schedule消费端
上一节我们大概了解下生产端的逻辑,我们再来一起看下消费端逻辑,大概逻辑如下图所示,下面我们一步一步看下每个流程的具体代码
+ - - - - - - - - - - -+ +- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -+
' schedule: ' ' top: '
' ' ' '
' +------------------+ ' +-----------------+ ' +--------------+ +----------------+ +-------------+ +---------+ +-----------+ ' +----+
' | findrunnable | ' --> | top | --> ' | wakefing | --> | runqget1 | --> | globrunqget | --> | netpoll | --> | runqsteal | ' --> | gc |
' +------------------+ ' +-----------------+ ' +--------------+ +----------------+ +-------------+ +---------+ +-----------+ ' +----+
' ' ' '
' ' +- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -+
' +------------------+ ' +-----------------+ +--------------+ +----------------+
' | schedule | ' --> | runtime.execute | --> | runtime.gogo | --> | runtime.goexit |
' +------------------+ ' +-----------------+ +--------------+ +----------------+
' +------------------+ '
' | globrunqget/61 | '
' +------------------+ '
' +------------------+ '
' | runnableGcWorker | '
' +------------------+ '
' +------------------+ '
' | runqget | '
' +------------------+ '
' '
+ - - - - - - - - - - -+
schedule()主函数
1 通过findrunnable()来获取一个g,下文详细根据代码讲解
2 通过execute(gp, inheritTime)将g运行在当前m上
3 gogo(&gp.sched) //通过汇编实现的,具体
4 goexit()执行完毕后,重新回到schedule()
//已删除多余部分代码
func schedule() {
//获取m
mp := getg().m
top:
//省略判断该go是否可被调度的代码
pp := mp.p.ptr()
//获取一个可被调度的g
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
execute(gp, inheritTime)
}
func execute(gp *g, inheritTime bool) {
mp := getg().m
// Assign gp.m before entering _Grunning so running Gs have an
// M.
mp.curg = gp
gp.m = mp
//修改g的状态
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
mp.p.ptr().schedtick++
}
//交由汇编去执行,主要逻辑为把g执行需要的东西恢复,从pc寄存器开始执行,执行完后调用runtime.Goexit
gogo(&gp.sched)
}
通过runtime.Goexit后,最终执行goexit0(gp *g),主要改变g状态由_Grunning到_Gdead,以及清理g上的资源,
// goexit continuation on g0.
func goexit0(gp *g) {
mp := getg().m
pp := mp.p.ptr()
//删除了清理工作代码
casgstatus(gp, _Grunning, _Gdead)
dropg()
//将g放入本地glist,如果本地glist大于64,则保持本地gFree32个元素,其余的移至全局gFree
gfput(pp, gp)
//重新执行调度
schedule()
}
大体流程已经看完了,我们在来看下schedule内部的细节部分
schedule.findRunnable
寻找g的过程,可以说是调度消费端的核心都在下面函数里,我们先回顾下上文流程图里也就是findrunnable->top的过程
主要过程为
- 检测是否需要gc stw,直到gc结束
- checkTimers保存now与pollUntil方便后面窃取g
- findRunnableGCWorker如果由gc需要执行,则返回gc的g
- 如果全局队列有g,61/1概率去全局队列拿g,在globrunqget里触发将全局队列g偷一部分到本地队列,偷g的逻辑为
pp.runq.len/2>(sched.runq.len/gomaxprocs)+1?pp.runq.len/2:(sched.runq.len/gomaxprocs)+1,同时返回sched.runq.head做为g1,return - wakefing 看是否有需要唤醒的g,有则唤醒
- 通过runqget从本地队列里取g,优先获取pp.runnext,如果为空,则从本地队列tail里取
- 本地队列没取到,通过globrunqget从全局队列里拿,拿的逻辑跟上文61/1一样
- netpoll里取一批go通过injectglist放入队列,如果没有p,全部添加到全局队列,并试图通过startIdle从_Pidle来获取p并返回,获取到p了后,将npidle个g
放入sched.runq(全局队列),剩余的g添加到本地队列pp.runq中,pp.runq满了则全部放入sched.runq - netpoll我们在后续章节想起讲解
- stealWork:窃取其它p上的g,如果空闲m数量小于工作的p一半时,执行stealWork窃取其它p上的g,循环4次,随机从一个p中拿g
- 后面在重复类似上面的逻辑,再找不到就退出
//删掉了其它部分代码,仅保留最基础的调度代码
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
mp := getg().m
top:
pp := mp.p.ptr()
// 1/61从全局取拿g
if pp.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp := globrunqget(pp, 1)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
// 从本地p.runnext拿g
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, false
}
// 从全局取g
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(pp, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
// netpoll 从epoll中获取一批g
if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
}
//偷其它p上p.runq的g
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
if !mp.spinning {
mp.becomeSpinning()
}
gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp != nil {
// Successfully stole.
return gp, inheritTime, false
}
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top
}
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}
//todo 剩余一部分逻辑后面二刷写
}