Computers support several I/O models; the ones you run into most often today are non-blocking I/O (NIO) and the multiplexing calls select and epoll.
The right approach is to match the I/O model to the scenario at hand.
For the details, see my earlier post on the evolution of I/O models ("io模型演進").
Go is a natural fit for concurrency. Why? First, its lightweight goroutines; second, it abstracts away the complexity of I/O and simplifies the whole flow.
For example, hitting an HTTP service takes only a few lines of code:
tr := &recordingTransport{}
client := &Client{Transport: tr}
url := "http://dummy.faketld/"
client.Get(url) // Note: doesn't hit network
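The snippet above is lifted from net/http's own tests (recordingTransport is a test helper that never touches the network). A minimal runnable version, assuming you have network access and that http://example.com/ is reachable, looks like this:

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
)

func main() {
    // One call hides the dial, the epoll registration, and the goroutine parking.
    resp, err := http.Get("http://example.com/")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(body), "bytes read")
}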
So what has Go done to I/O under the hood to make it this simple?
From here on we assume you already have some familiarity with goroutine scheduling.
Recall that in Go every P (processor) is bound to a virtual M (machine thread), and every M has a g0; g0 walks its local run queue, or falls back to the global queue, to pick the next G to run.
We also know that a running G eventually hands execution back to g0 so the scheduler can pick something else. So when an I/O event occurs, how does a G hand control back to g0? That is where today's protagonist comes in: netpoll.
Go's network poller uses an I/O multiplexing model to handle I/O, but it did not choose the most common system call, select. select can also provide I/O multiplexing, but it comes with significant limitations: the number of monitored descriptors is capped (FD_SETSIZE, typically 1024), the fd sets have to be copied between user and kernel space on every call, and the kernel scans all descriptors linearly.
Instead, the Go runtime wraps a single, platform-independent network poller and provides platform-specific implementations for epoll, kqueue, Solaris event ports, AIX and Windows:
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go
These modules implement the same functionality on different platforms and form a typical tree structure: when compiling a Go program, the compiler selects the branch of the tree that matches the target platform via build constraints.
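As an illustration (these file and package names are made up, not the runtime's actual files), per-platform selection with build constraints looks roughly like this: two files provide the same function, and only the one matching the target GOOS is compiled.

// file: poll_linux.go (hypothetical example)
//go:build linux

package mypoll

// open registers fd with the Linux-specific poller (epoll).
func open(fd uintptr) { /* epoll_ctl(EPOLL_CTL_ADD, ...) */ }

// file: poll_darwin.go (hypothetical example)
//go:build darwin

package mypoll

// open registers fd with the Darwin-specific poller (kqueue).
func open(fd uintptr) { /* kevent(EV_ADD, ...) */ }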
The functions each implementation must provide are:
netpollinit — initializes the network poller; sync.Once and the netpollInited variable guarantee it only runs once.
netpollopen — registers edge-triggered events for a file descriptor. It is reached via poll_runtime_pollOpen, which hands the goroutine's pollDesc to the kernel-side epoll instance, tying the user-space goroutine to kernel readiness notifications.
netpoll — polls the network and returns a list of goroutines that are ready to run. Its argument determines the behavior:
  - less than 0: block until a descriptor becomes ready
  - equal to 0: poll without blocking
  - greater than 0: block, but for at most the given duration
netpollBreak — wakes the network poller, e.g. when a timer moves the time forward and the current poll has to be interrupted.
netpollIsPollDescriptor — reports whether a file descriptor is one used by the poller itself.
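For a concrete feel, the Linux implementation of netpollopen in runtime/netpoll_epoll.go looks roughly like this (taken from older Go releases; newer versions differ in detail): it registers the fd edge-triggered for both reads and writes and stashes the pollDesc pointer in the event's data field so netpoll can find it again.

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    // edge-triggered, interested in read, write and peer close
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    // store the pollDesc pointer so a ready event can be mapped back to its goroutines
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}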
netpoll revolves around two important structs, pollDesc and pollCache:
// pollDesc
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool      // marks event scanning error happened
    user    uint32    // user settable cookie
    rseq    uintptr   // protects from stale read timers
    rg      uintptr   // pdReady, pdWait, G waiting for read or nil
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline
    wseq    uintptr   // protects from stale write timers
    wg      uintptr   // pdReady, pdWait, G waiting for write or nil
    wt      timer     // write deadline timer
    wd      int64     // write deadline
    self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

// pollCache
type pollCache struct {
    lock  mutex
    first *pollDesc
    // PollDesc objects must be type-stable,
    // because we can get ready notification from epoll/kqueue
    // after the descriptor is closed/reused.
    // Stale notifications are detected using seq variable,
    // seq is incremented when deadlines are changed or descriptor is reused.
}
A few of these fields deserve attention:

- rseq and wseq detect that the file descriptor has been reused or a timer has been reset (protection against stale notifications);
- rg and wg act as binary semaphores: each holds pdReady, pdWait, the goroutine waiting for the fd to become readable/writable, or nil;
- rd and wd are the read and write deadlines;
- rt and wt are the timers that enforce those deadlines.

Go's I/O event handling is wrapped uniformly under runtime/netpoll; the user-facing half lives in the internal/poll package, which calls into the runtime. internal/poll defines a pollDesc of its own, but it is little more than a pointer to the runtime object (one detail about internal packages: they cannot be imported from outside the standard library):
type pollDesc struct {
    runtimeCtx uintptr
}
Ultimately everything funnels down into the runtime; internal/poll just layers convenient, more abstract methods such as Read and Write on top. The runtime entry points it relies on are declared as follows:
func runtime_pollServerInit()                                // initialize the poller (once)
func runtime_pollOpen(fd uintptr) (uintptr, int)             // register an fd, returns the runtime pollDesc
func runtime_pollClose(ctx uintptr)                          // unregister and release
func runtime_pollWait(ctx uintptr, mode int) int             // wait until the fd is ready
func runtime_pollWaitCanceled(ctx uintptr, mode int) int     // wait, acknowledging cancellation on failure
func runtime_pollReset(ctx uintptr, mode int) int            // reset state so the descriptor can be reused
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) // set the read/write deadline
func runtime_pollUnblock(ctx uintptr)                        // unblock any waiters
func runtime_isPollServerDescriptor(fd uintptr) bool

// ctx is the value returned by runtime_pollOpen (a pointer to the runtime pollDesc for an fd), not a context.Context
// mode is 'r' or 'w' -- read and write are the only two kinds of I/O events after all
// d is a time value, similar in spirit to a time.Duration
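To see how the glue fits together, here is a rough sketch of how internal/poll wires its pollDesc methods onto those runtime functions (simplified from internal/poll/fd_poll_runtime.go; error handling is trimmed, so treat it as an outline rather than the exact source):

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)             // create the epoll/kqueue instance exactly once
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) // register the fd with the poller
    if errno != 0 {
        return syscall.Errno(errno)
    }
    pd.runtimeCtx = ctx
    return nil
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode) // parks the goroutine until the fd is ready
    return convertErr(res, isFile)               // map the integer code back to an error
}

func (pd *pollDesc) waitRead(isFile bool) error  { return pd.wait('r', isFile) }
func (pd *pollDesc) waitWrite(isFile bool) error { return pd.wait('w', isFile) }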
The concrete implementations of these functions live in the runtime; let's pick out a few important ones:
// Push the goroutines whose I/O has become ready onto the ready-to-run list.
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}
// Called during polling.
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to pdWait
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // gopark is the key call here: it gives up the current goroutine's right to run,
        // returning control to g0 so the scheduler can pick something else.
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

// Fetch the goroutine parked on this I/O; returns nil if there is nothing to wake
// (for example, the descriptor has been closed).
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set pdReady for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait {
                old = 0
            }
            return (*g)(unsafe.Pointer(old))
        }
    }
}
Food for thought:
A timeout that fires is not necessarily genuine I/O waiting; the goroutine may simply not have been given the chance to run again yet.
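Deadlines on a connection are enforced through the same pollDesc timers (rd/rt and wd/wt above). A small sketch from the user side, assuming example.com:80 is just a placeholder endpoint:

package main

import (
    "log"
    "net"
    "time"
)

func main() {
    conn, err := net.Dial("tcp", "example.com:80")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // SetReadDeadline arms the pollDesc's read deadline timer.
    if err := conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)); err != nil {
        log.Fatal(err)
    }

    buf := make([]byte, 4096)
    _, err = conn.Read(buf) // parks via runtime_pollWait until data arrives or the deadline fires
    if ne, ok := err.(net.Error); ok && ne.Timeout() {
        // The timeout can also absorb scheduling delay, not just slow I/O.
        log.Println("read timed out:", ne)
    }
}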
Writes are something we initiate actively; reads are passive. So how does a read actually happen?
Let's start with a struct: in Go, every network connection and file read/write is identified by an FD (defined in the internal/poll package).
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex

    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Writev cache.
    iovecs *[]syscall.Iovec

    // Semaphore signaled when file is closed.
    csema uint32

    // Non-zero if this file has been set to blocking mode.
    isBlocking uint32

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket. Immutable.
    IsStream bool

    // Whether a zero byte read indicates EOF. This is false for a
    // message based socket connection.
    ZeroReadIsEOF bool

    // Whether this is a file rather than a network socket.
    isFile bool
}
As you can see, the FD embeds a pollDesc, and through it reaches the runtime's platform-specific I/O event implementations.
When we perform a Read, the core loop looks like this (code excerpt):
for {
    n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
    if err != nil {
        n = 0
        if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil {
                continue
            }
        }
    }
    err = fd.eofError(n, err)
    return n, err
}
On EAGAIN it blocks in waitRead, which internally boils down to a call to runtime_pollWait.
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}
The heavy lifting here is done by netpollblock, which we covered above: if the I/O is not yet ready, it gives up the goroutine's execution slot; if the fd is already readable or writable, the read simply proceeds.
The overall flow: listenStream -> bind & listen & init -> pollDesc.init -> poll_runtime_pollOpen -> runtime.netpollopen -> epollctl(EPOLL_CTL_ADD).
A diagram makes this easier to follow (full disclosure: I was lazy and borrowed the picture).
When Go hits an I/O event, it always goes through this uniform wrapper: the fd is first registered with the system poller (epoll in this article), then the goroutine gives up the CPU (gopark), and the scheduler runs other goroutines. When the I/O completes, the scheduler checks with epoll for readiness (the epoll ready list); if a goroutine is ready it is popped and resumed, otherwise other goroutines keep running. (Popping from the ready list has some logic of its own as well, such as delayed handling.)
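To tie this back to user code: the plain TCP echo server below goes through exactly the path above. This is a sketch that assumes port 8080 is free on the local machine.

package main

import (
    "log"
    "net"
)

func main() {
    // net.Listen walks listenStream -> pollDesc.init -> poll_runtime_pollOpen,
    // which registers the listening socket with epoll (EPOLL_CTL_ADD).
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    defer ln.Close()

    for {
        // Accept parks this goroutine via runtime_pollWait until a connection arrives.
        conn, err := ln.Accept()
        if err != nil {
            log.Println(err)
            continue
        }
        go func(c net.Conn) {
            // Closing releases the fd and lets the runtime recycle its pollDesc.
            defer c.Close()
            buf := make([]byte, 1024)
            // Read also parks on the poller when no data is available yet.
            if n, err := c.Read(buf); err == nil {
                c.Write(buf[:n]) // echo back
            }
        }(conn)
    }
}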
In runtime/proc.go:
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()
    // ......

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
    // ......
}
In addition, sysmon also drives netpoll:
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
    lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)
    // ......

    // poll network if not polled for more than 10ms
    lastpoll := int64(atomic.Load64(&sched.lastpoll))
    if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
        atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
        list := netpoll(0) // non-blocking - returns list of goroutines
        if !list.empty() {
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before injectglist.
            // Otherwise it can lead to the following situation:
            // injectglist grabs all P's but before it starts M's to run the P's,
            // another M returns from syscall, finishes running its G,
            // observes that there is no work to do and no other running M's
            // and reports deadlock.
            incidlelocked(-1)
            injectglist(&list)
            incidlelocked(1)
        }
    }
    // ......
}
Note that the epoll instance itself is maintained by the kernel, not by Go: the readiness bookkeeping lives in kernel space, and the runtime only asks it for events.
FD_CLOEXEC sets a descriptor's close-on-exec flag: a descriptor marked this way is closed automatically when the process exec's another program, so child processes do not inherit it.
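A small sketch (assuming Linux and the syscall package) of the two common ways to get close-on-exec behavior on a raw socket:

package main

import (
    "log"
    "syscall"
)

func main() {
    // Create the socket with close-on-exec already set (atomic on Linux).
    fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
    if err != nil {
        log.Fatal(err)
    }
    defer syscall.Close(fd)

    // Or mark an existing descriptor after the fact; redundant here, shown for illustration.
    syscall.CloseOnExec(fd)
}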
pollDesc objects are managed by the pollCache and are invisible to the GC (they are allocated with persistentalloc). That means that for ordinary I/O we must close descriptors explicitly, as in the defer Close calls above, so that the object referenced from epoll gets cleaned up (the teardown path ends in poll_runtime_pollClose, which returns the pollDesc to the pollCache; the close itself is signalled via the runtime semaphore, poll_runtime_Semrelease).
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
    pd := (*pollDesc)(add(mem, i*pdSize))
    pd.link = c.first
    c.first = pd
}
The Go runtime also runs a thread that monitors the application and helps you track down potential bottlenecks. That thread is called sysmon, the system monitor. In the GMP model it is not attached to any P, which means the scheduler does not take it into account, so it is effectively always able to run.
The sysmon thread has a wide remit; its main responsibilities include:

- polling the network when it has not been polled for more than 10ms and injecting any ready goroutines (the snippet above);
- retaking Ps that are stuck in long system calls and preempting goroutines that have been running for too long;
- forcing a garbage collection if none has happened for a long time;
- returning unused heap memory to the operating system.