The consumer side of schedule

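The previous section gave an overview of the producer side. Now let's look at the consumer side. The overall flow is shown in the diagram below, and the rest of this section walks through the code for each step.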

+ - - - - - - - - - - -+                             +- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -+
' schedule:            '                             ' top:                                                                                          '
'                      '                             '                                                                                               '
' +------------------+ '     +-----------------+     ' +--------------+     +----------------+     +-------------+     +---------+     +-----------+ '     +----+
' |   findrunnable   | ' --> |       top       | --> ' |   wakefing   | --> |    runqget1    | --> | globrunqget | --> | netpoll | --> | runqsteal | ' --> | gc |
' +------------------+ '     +-----------------+     ' +--------------+     +----------------+     +-------------+     +---------+     +-----------+ '     +----+
'                      '                             '                                                                                               '
'                      '                             +- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -+
' +------------------+ '     +-----------------+       +--------------+     +----------------+
' |     schedule     | ' --> | runtime.execute | -->   | runtime.gogo | --> | runtime.goexit |
' +------------------+ '     +-----------------+       +--------------+     +----------------+
' +------------------+ '
' |  globrunqget/61  | '
' +------------------+ '
' +------------------+ '
' | runnableGcWorker | '
' +------------------+ '
' +------------------+ '
' |     runqget      | '
' +------------------+ '
'                      '
+ - - - - - - - - - - -+

The schedule() main function

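1 Get a g via findRunnable(); this is explained in detail with code below
2 Run the g on the current m via execute(gp, inheritTime)
3 gogo(&gp.sched), which is implemented in assembly
4 After goexit() finishes, control returns to schedule()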

// Non-essential code has been removed
func schedule() {
	// Get the current m
	mp := getg().m
top:
	// Code that checks whether this g may be scheduled is omitted
	pp := mp.p.ptr()
	// Get a runnable g
	gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
	execute(gp, inheritTime)
}
func execute(gp *g, inheritTime bool) {
	mp := getg().m
	// Assign gp.m before entering _Grunning so running Gs have an
	// M.
	mp.curg = gp
	gp.m = mp
	// Change the g's status
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		mp.p.ptr().schedtick++
	}
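	// Hand over to assembly: gogo restores the context saved in gp.sched and resumes at the saved pc.
	// When the g's function returns, it returns into runtime.goexit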
	gogo(&gp.sched)
}
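
The casgstatus call above is a compare-and-swap on the g's status, so only a caller that sees the expected old status can move the g to the new one. Below is a minimal sketch of that idea, not the runtime's code: toyG, casStatus and the two status constants are hypothetical names, and unlike the real casgstatus this version makes a single attempt instead of retrying.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical status values; the real runtime defines more states.
const (
	gRunnable uint32 = iota + 1
	gRunning
)

// toyG stands in for the runtime's g; only the status field is modelled.
type toyG struct {
	status atomic.Uint32
}

// casStatus attempts one oldval -> newval transition and reports whether it succeeded.
// Assumption: a single attempt is enough for this illustration.
func casStatus(gp *toyG, oldval, newval uint32) bool {
	return gp.status.CompareAndSwap(oldval, newval)
}

func main() {
	gp := &toyG{}
	gp.status.Store(gRunnable)

	fmt.Println(casStatus(gp, gRunnable, gRunning)) // the g was runnable, so this succeeds
	fmt.Println(casStatus(gp, gRunnable, gRunning)) // the g is already running, so this fails
}
```

The second call fails because the status is no longer gRunnable, which is what keeps two M's from running the same g at once.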

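When the g's function returns, control lands in runtime.goexit, which eventually runs goexit0(gp *g) on g0. goexit0 mainly moves the g from _Grunning to _Gdead and cleans up the resources attached to it.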

// goexit continuation on g0.
func goexit0(gp *g) {
	mp := getg().m
	pp := mp.p.ptr()
	// Cleanup code has been removed
	casgstatus(gp, _Grunning, _Gdead)
	dropg()
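	// Put the g on the local free list (pp.gFree). Once that list reaches 64 entries,
	// g's are moved to the global gFree until fewer than 32 remain locally
	gfput(pp, gp)
	// Re-enter the scheduler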
	schedule()
}
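
The free-list handling in gfput can be sketched as a small two-level cache. This is a toy model under stated assumptions: freeLists, put and the two constants are hypothetical names, ints stand in for dead g structures, and the thresholds (spill at 64, drain until fewer than 32 remain) reflect my reading of the runtime rather than a guaranteed contract. The real code also takes a lock around the global list, which is omitted here.

```go
package main

import "fmt"

// Assumed thresholds for the per-P free list.
const (
	localFreeMax  = 64 // start draining once the local list reaches this size
	localFreeKeep = 32 // drain until the local list is below this size
)

// freeLists is a toy model: ints stand in for dead g structures.
type freeLists struct {
	local  []int // per-P free list (pp.gFree in the runtime)
	global []int // shared free list (sched.gFree in the runtime)
}

// put caches a dead g locally and spills part of the local list
// to the global list once the local list is full.
func (f *freeLists) put(g int) {
	f.local = append(f.local, g)
	if len(f.local) < localFreeMax {
		return
	}
	for len(f.local) >= localFreeKeep {
		last := len(f.local) - 1
		f.global = append(f.global, f.local[last])
		f.local = f.local[:last]
	}
}

func main() {
	var f freeLists
	for g := 0; g < 64; g++ {
		f.put(g)
	}
	fmt.Println(len(f.local), len(f.global))
}
```

Keeping part of the list local means the next few goroutine creations on this P can reuse a g without touching shared state.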

That covers the overall flow. Now let's look at the details inside schedule.

schedule.findRunnable

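Finding a g is the core of the scheduler's consumer side, and it all happens in the function below. First, recall the findrunnable -> top part of the flow diagram above.
The main steps are: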

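  • Check whether GC needs a stop-the-world; if so, wait until it is over
  • checkTimers: run due timers and keep now and pollUntil for the later stealing step
  • findRunnableGCWorker: if GC has work to do, return the GC worker g
  • If the global queue has g's, take from it once every 61 scheduler ticks (pp.schedtick%61 == 0). globrunqget returns the head of sched.runq and may move
    further g's to the local queue. The batch size is n = min(sched.runqsize, sched.runqsize/gomaxprocs+1, len(pp.runq)/2), capped at max when max > 0; this check passes max=1, so only one g is taken
  • wakefing: check whether the finalizer g needs to be woken, and wake it if so
  • runqget takes a g from the local queue: pp.runnext first; if that is empty, from the head of pp.runq
  • If the local queue yields nothing, globrunqget takes from the global queue with the same batch rule as in the 1/61 check, this time with max=0 (no cap)
  • netpoll returns a batch of ready g's. The first one is returned directly and the rest are queued by injectglist. Without a current P, all of them go to the global queue and startIdle tries to get idle P's (_Pidle) to run them, then returns. With a P, up to npidle g's
    go to sched.runq (the global queue), and the remaining g's go to the local queue pp.runq; whatever does not fit in pp.runq goes to sched.runq
  • netpoll itself will be covered in detail in a later chapter
  • stealWork: steal g's from other P's. It runs only if this M is already spinning or the number of spinning M's is less than half the number of busy P's. It makes 4 passes, visiting the P's in random order
  • After that, similar checks are repeated; if there is still nothing to run, the M releases its P and goes to sleep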
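
The batch-size rule in globrunqget can be written as a small pure function. This is a sketch of the calculation as I read it in recent Go releases, not the runtime's code: batchSize and its parameter names are hypothetical, with globalLen standing for sched.runqsize, procs for gomaxprocs, max for the caller's cap (0 meaning no cap) and localCap for len(pp.runq).

```go
package main

import "fmt"

// batchSize returns how many g's one call would take from the global queue.
// The first of them is returned to the caller; the rest go to the local queue.
func batchSize(globalLen, procs, max, localCap int) int {
	if globalLen == 0 {
		return 0
	}
	n := globalLen/procs + 1 // a fair share for this P, at least one
	if n > globalLen {
		n = globalLen // cannot take more than the queue holds
	}
	if max > 0 && n > max {
		n = max // caller-imposed cap, e.g. 1 for the 1/61 check
	}
	if n > localCap/2 {
		n = localCap / 2 // never fill more than half of the local queue
	}
	return n
}

func main() {
	fmt.Println(batchSize(100, 4, 0, 256))  // fair share: 100/4 + 1
	fmt.Println(batchSize(100, 4, 1, 256))  // capped by max
	fmt.Println(batchSize(2000, 4, 0, 256)) // capped by half the local queue
}
```

The fair-share term keeps one P from draining the global queue while other P's are also looking for work.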
// Other code has been removed; only the most basic scheduling logic is kept
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
	mp := getg().m
top:
	pp := mp.p.ptr()
 
	// Once every 61 ticks, take a g from the global queue
	if pp.schedtick%61 == 0 && sched.runqsize > 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 1)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}
	
	// Take a g from the local run queue (pp.runnext first)
	if gp, inheritTime := runqget(pp); gp != nil {
		return gp, inheritTime, false
	}

	// Take a g from the global queue
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}
	// netpoll: fetch a batch of ready g's from epoll
	if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
		if list := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false, false
		}
	}
	// Steal g's from the runq of other P's.
	// now and pollUntil come from the checkTimers call omitted above
	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
		if !mp.spinning {
			mp.becomeSpinning()
		}

		gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false
		}
		if newWork {
			// There may be new timer or GC work; restart to
			// discover.
			goto top
		}

		now = tnow
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
			pollUntil = w
		}
	}
	// TODO: the remaining logic will be covered in a later revision
}
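
The lookup order in findRunnable can be tried out with a toy model. This is a simplified sketch, not the runtime's algorithm: toyP, toySched, pop and find are hypothetical names, g's are plain strings, and GC workers, netpoll, spinning and locking are all left out. The toy also visits victims in slice order and runs the first stolen g, which are simplifications of my own.

```go
package main

import "fmt"

// toyP models a P: a tick counter, the runnext slot and a local run queue.
type toyP struct {
	tick    int      // scheduling rounds so far, like pp.schedtick
	runnext string   // "" means empty
	runq    []string // local run queue, head at index 0
}

// toySched models the global run queue and the set of P's.
type toySched struct {
	global []string // global run queue, head at index 0
	ps     []*toyP
}

// pop removes and returns the head of q, or "" if q is empty.
func pop(q *[]string) string {
	if len(*q) == 0 {
		return ""
	}
	g := (*q)[0]
	*q = (*q)[1:]
	return g
}

// find returns the next g for pp and the place it came from.
func (s *toySched) find(pp *toyP) (g, from string) {
	// Every 61st tick, look at the global queue first so it cannot starve.
	if pp.tick%61 == 0 {
		if g := pop(&s.global); g != "" {
			return g, "global (1/61 check)"
		}
	}
	// The local P: runnext first, then the head of the local queue.
	if pp.runnext != "" {
		g, pp.runnext = pp.runnext, ""
		return g, "runnext"
	}
	if g := pop(&pp.runq); g != "" {
		return g, "local"
	}
	// Then the global queue.
	if g := pop(&s.global); g != "" {
		return g, "global"
	}
	// Finally steal half of another P's queue, rounding up.
	for _, victim := range s.ps {
		if victim == pp || len(victim.runq) == 0 {
			continue
		}
		n := (len(victim.runq) + 1) / 2
		stolen := victim.runq[:n]
		victim.runq = victim.runq[n:]
		pp.runq = append(pp.runq, stolen[1:]...)
		return stolen[0], "stolen"
	}
	return "", "none"
}

func main() {
	p0 := &toyP{runnext: "g1", runq: []string{"g2"}}
	p1 := &toyP{runq: []string{"g5", "g6", "g7"}}
	s := &toySched{global: []string{"g9"}, ps: []*toyP{p0, p1}}

	for {
		g, from := s.find(p0)
		if g == "" {
			break
		}
		fmt.Println(g, "from", from)
		p0.tick++
	}
}
```

Because the tick counter starts at 0, the first round takes g9 from the global queue before looking at runnext. Later rounds drain the local P and then fall back to stealing from p1.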