協程調度時機三:搶佔式調度
說明
雖然我們一直強調golang調度器是非搶佔式。非搶佔式的一個最大壞處是無法保證公平性,如果一個g處於死循環狀態,那麼其他協程可能就會被餓死。 所幸的是,Golang在1.4版本中加入了搶佔式調度的邏輯,搶佔式調度必然可能在g執行的某個時刻被剝奪cpu,讓給其他協程。
實現
還記得我們之前說過Golang的sysmon協程么,該協程會定期喚醒作系統狀態檢查,我們前面說過了它如何檢查處於Psyscall狀態的p,以便讓處於系統調用狀態的P可以被繼續執行,不至於餓死。 除了檢查這個意外,sysmon還檢查處於Prunning狀態的P,檢查它的目的就是避免這裡的某個g佔用了過多的cpu時間,並在某個時刻剝奪其cpu運行時間。
static uint32 retake(int64 now)n{n uint32 i, s, n;n int64 t;n P *p;nn Pdesc *pd;n n = 0;nn for(i = 0; i < runtime·gomaxprocs; i++) {n p = runtime·allp[i];n if(p==nil)n continue;n pd = &pdesc[i];n s = p->status;n if(s == Psyscall) {n ......n } else if(s == Prunning) {n // Preempt G if its running for more than 10ms. n t = p->schedtick;n if(pd->schedtick != t) {n pd->schedtick = t;n pd->schedwhen = now;n continue;n }n if(pd->schedwhen + 10*1000*1000 > now)n continue;n // 如果自從上次發生調度時間已經超過了10ms n preemptone(p);n }n }n return n;n}nn// 這裡的搶佔只是將g的preempt設置為true n// 只有在g進行函數調用時才會檢查該標誌位 n// 並進而可能發生調度,非常弱 nstatic bool preemptone(P *p)n{n M *mp;n G *gp;nn mp = p->m;n if(mp == nil || mp == g->m)n return false;n gp = mp->curg;n if(gp == nil || gp == mp->g0)n return false;n gp->preempt = true;nn // Every call in a go routine checks for stack overflow by n // comparing the current stack pointer to gp->stackguard0. n // Setting gp->stackguard0 to StackPreempt folds n // preemption into the normal stack overflow check. n gp->stackguard0 = StackPreempt;n return true;n}n
之前我們說過在函數調用時會進行堆棧檢測,現在將gp->stackGuard0設置為StackPreempt(-1314,非常小的值),肯定會調用一次runtime.morestack,邏輯如下:
TEXT runtime·morestack(SB),NOSPLIT,$0-0 n // Cannot grow scheduler stack (m->g0).n get_tls(CX)n MOVQ g(CX), BX n MOVQ g_m(BX), BX n MOVQ m_g0(BX), SI n CMPQ g(CX), SI n JNE 2(PC)n INT $3 nn // Cannot grow signal stack (m->gsignal).n MOVQ m_gsignal(BX), SI n CMPQ g(CX), SI n JNE 2(PC)n INT $3 n // Called from f.n // Set m->morebuf to fs caller.n MOVQ 8(SP), AX // fs callers PCn MOVQ AX, (m_morebuf+gobuf_pc)(BX)n LEAQ 16(SP), AX // fs callers SPn MOVQ AX, (m_morebuf+gobuf_sp)(BX)n get_tls(CX)n MOVQ g(CX), SIn MOVQ SI, (m_morebuf+gobuf_g)(BX)nn // Set g->sched to context in f.n MOVQ 0(SP), AX // fs PCn MOVQ AX, (g_sched+gobuf_pc)(SI)n MOVQ SI, (g_sched+gobuf_g)(SI)n LEAQ 8(SP), AX // fs SPn MOVQ AX, (g_sched+gobuf_sp)(SI)n MOVQ DX, (g_sched+gobuf_ctxt)(SI)n MOVQ BP, (g_sched+gobuf_bp)(SI)n // Call newstack on m->g0s stack.n MOVQ m_g0(BX), BX n MOVQ BX, g(CX)nn MOVQ (g_sched+gobuf_sp)(BX), SP n CALL runtime·newstack(SB)nn MOVQ $0, 0x1003 // crash if newstack returnsn RET n
最終調用newstack來進行堆棧擴容:
func newstack() {n thisg := getg()n // TODO: double check all gp. shouldnt be getg().n if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {n throw("stack growth after fork")n }n if thisg.m.morebuf.g.ptr() != thisg.m.curg {n print("runtime: newstack called from g=", thisg.m.morebuf.g, "n"+"tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "n")n morebuf := thisg.m.morebuf n traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())n throw("runtime: wrong goroutine in newstack")n }nn gp := thisg.m.curg n morebuf := thisg.m.morebuf n thisg.m.morebuf.pc = 0n thisg.m.morebuf.lr = 0n thisg.m.morebuf.sp = 0n thisg.m.morebuf.g = 0n rewindmorestack(&gp.sched)nn // NOTE: stackguard0 may change underfoot, if another threadn // is about to try to preempt gp. Read it just once and use that samen // value now and below.n preempt := atomicloaduintptr(&gp.stackguard0) == stackPreemptn if preempt {n if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {n // Let the goroutine keep running for now.n // gp->preempt is set, so it will be preempted next time.n gp.stackguard0 = gp.stack.lo + _StackGuardn gogo(&gp.sched) // never returnn }n }n ......n // 進行重新調度n if preempt {n if gp == thisg.m.g0 {n throw("runtime: preempt g0")n }n if thisg.m.p == 0 && thisg.m.locks == 0 {n throw("runtime: g is running but p is not")n }nn if gp.preemptscan {n for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {n // Likely to be racing with the GC asn // it sees a _Gwaiting and does then // stack scan. If so, gcworkdone willn // be set and gcphasework will simplyn // return.n }n if !gp.gcscandone {n scanstack(gp)n gp.gcscandone = truen }n gp.preemptscan = falsen gp.preempt = falsen casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)n casgstatus(gp, _Gwaiting, _Grunning)n gp.stackguard0 = gp.stack.lo + _StackGuardn gogo(&gp.sched) // never returnn }nn // Act like goroutine called runtime.Gosched.n casgstatus(gp, _Gwaiting, _Grunning)n // 放棄當前協程,調度新協程執行n gopreempt_m(gp) // never returnn }n}n
這裡需要注意兩個東西:
- thisg := getg():這個代表當前執行newstack()函數的堆棧,也是當前線程的g0的stack;
- gp := thisg.m.curg:這個代表的是申請棧擴容的協程,與上面的thisg不是一個東西。
因為雖然調用了newstack,但是對於stackguard0==stackPreempt的協程來說,它的目的壓根不是堆棧擴容,而是發起一次調度,所以直接進入了gopreempt_m,這裡將當前協程掛起,並發起一次schedule().
推薦閱讀:
※如何理解 slice 的 capacity?
※選擇學習 C 語言、Go 語言、C++11 各有哪些優缺點?
※為啥 Erlang 沒有像 Go、Scala 語言那樣崛起?
※現在想再學習一門編程語言,應該選擇go還是python?
※Golang裡面defer的執行順序為什麼是逆序的?