1 Linux CFS scheduling

Before we get to Go scheduling, let's take a quick look at how Linux schedules processes. If you're not interested you can skip to section 2 without losing the thread; if you are, the following are recommended reading:
Linux CFS Scheduler: Principles, Design, and Kernel Implementation
The kernel's CFS documentation

1.1 CFS overview

CFS is short for Completely Fair Scheduler. Suppose we have one CPU (100% of its capacity) and two tasks running, each granted 50% via cgroups; CFS is responsible for dividing the CPU time slices between those two tasks (the scheduled threads).
So CFS keeps processes balanced in terms of CPU time, giving each process its fair share of the processor.

1.2 How CFS roughly works

All processes (or threads) that need to be scheduled are kept on a red-black tree keyed by vruntime, the task's virtual runtime. After a task runs, its key is updated as vruntime = actual runtime * 1024 / task weight and the task is re-inserted according to the new value (so heavy CPU users drift toward the right of the tree); every scheduling decision picks the leftmost node, i.e. the task with the smallest vruntime.
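To make the formula a bit more concrete, here is a tiny illustration in ordinary Go (not kernel code). The constant 1024 is the weight of a nice-0 task, and 335 is approximately the kernel's weight for a nice +5 task; both values come from the kernel's nice-to-weight table.

// A toy illustration of the vruntime formula above (plain Go, not kernel code).
package main

import "fmt"

const nice0Weight = 1024 // weight of a nice-0 task

// vruntimeDelta returns how much virtual runtime a task accrues for
// actualNs nanoseconds of real CPU time at the given weight.
func vruntimeDelta(actualNs, weight uint64) uint64 {
	return actualNs * nice0Weight / weight
}

func main() {
	// Both tasks run 10ms of real time, but the low-weight task's vruntime
	// grows about 3x faster, so CFS (which always runs the task with the
	// smallest vruntime) will pick it less often.
	fmt.Println(vruntimeDelta(10_000_000, 1024)) // 10000000
	fmt.Println(vruntimeDelta(10_000_000, 335))  // 30567164
}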

2 Go scheduling

Official GMP design document. Now let's look at the related concepts in the Go scheduler, namely GMP:

  • g: a user-space "thread", i.e. a goroutine, the unit of scheduling in Go; it mainly holds the goroutine's execution stack and its current state
  • m: a kernel thread, the task that the CFS scheduler above actually schedules
  • p: roughly a queue holding the resources (runnable Gs) an M needs for scheduling; every M that schedules Gs has a P, except an M blocked too long in a system call, which is unbound from its P so that a new (or idle) M can be bound to that P
  • sched: the global scheduler state (schedt), which manages the idle Ms and Ps, the global run queue, and so on

Let's first think through a couple of questions:

  • We have 10 runnable Gs and only 4 Ms: how should they be scheduled (g > m)?
  • We have 2 Gs and 4 Ms (g < m): some Ms will sit idle.

So our scheduler has to know the state of every M and hand each M the right Gs.
In Go 1.0 the scheduler was a multi-threaded scheduler: Gs lived in one global queue and every M fetched Gs from it. This did let every M get a G, but the big problem was heavy lock contention on that queue.
Go 1.1 introduced the work-stealing scheduler (the design document above describes the mechanism) and with it the GMP model, where each M is bound to a P. That brought a new problem: if one G occupies an M for a long time, the other Gs waiting on that M's P never get to run.
Go 1.2 added cooperative preemption, and Go 1.14 replaced it with signal-based preemption; we'll cover signal-based preemption in detail later.

(figure img.png: the overall scheduling model) Now that we have a rough picture of the model, let's look at M in more detail. Put simply, an M runs work from its own P; if the P has nothing, it checks the global queue and tries to steal from other Ps, and if it still finds nothing it spins. If an M stays in a system call too long, its P is taken away and handed to a spinning/idle M if one exists, otherwise a new M is created for it. A toy sketch of this work-finding order follows; after that, let's look at the actual M code.
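The following is a self-contained toy, not runtime code: the real runtime.findRunnable also handles timers, netpoll, GC work, atomics, spinning and parking. It is meant only to show the priority order an M follows when looking for work.

// A toy, single-threaded model of the order in which an M looks for work:
// its own P's local queue, then the global queue, then stealing half of
// another P's queue.
package main

import "fmt"

type toyP struct{ runq []string }

var globalRunq = []string{}

// steal takes half of the first non-empty victim's queue.
func steal(victims []*toyP) []string {
	for _, v := range victims {
		if n := len(v.runq); n > 0 {
			half := (n + 1) / 2
			stolen := v.runq[:half]
			v.runq = v.runq[half:]
			return stolen
		}
	}
	return nil
}

func findWork(self *toyP, others []*toyP) (string, bool) {
	if len(self.runq) > 0 { // 1. own local run queue
		g := self.runq[0]
		self.runq = self.runq[1:]
		return g, true
	}
	if len(globalRunq) > 0 { // 2. global run queue
		g := globalRunq[0]
		globalRunq = globalRunq[1:]
		return g, true
	}
	if stolen := steal(others); stolen != nil { // 3. steal from other Ps
		self.runq = append(self.runq, stolen...)
		return findWork(self, nil)
	}
	return "", false // 4. nothing found: the real M would spin, then park
}

func main() {
	idle := &toyP{}
	busy := &toyP{runq: []string{"g1", "g2"}}
	fmt.Println(findWork(idle, []*toyP{busy})) // g1 true (stolen from the busy P)
}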

2.1 The main GMP structures

(figure img_2.png: the m structure) An M is an operating-system thread. Let's first look at a few of its important fields.

// src/runtime/runtime2.go
type m struct {
	// the g used to run scheduling code (the scheduling stack)
	g0      *g     // goroutine with scheduling stack
	// the g that handles signals
	gsignal       *g                // signal-handling g
	// thread-local storage
	tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
	// the g currently running on this m
	curg          *g       // current running goroutine

	// the p attached for executing Go code
	p             puintptr // attached p for executing go code (nil if not executing go code)
	// the next p, stashed for this m to use when it starts running
	nextp         puintptr
	// the p this m was attached to before entering a system call
	oldp          puintptr // the p that was attached before executing a syscall

	// the m has no work and is actively looking for some, i.e. whether it is spinning
	spinning      bool // m is out of work and is actively looking for work

	// cgo-related fields
	ncgocall      uint64        // number of cgo calls in total
	ncgo          int32         // number of cgo calls currently in progress
	cgoCallersUse atomic.Uint32 // if non-zero, cgoCallers in use temporarily
	cgoCallers    *cgoCallers   // cgo traceback if crashing in cgo call
	// links this m to the other m's, i.e. the allm linked list
	alllink *m // on allm
}

Now let's look at the p structure. Simply put, P exists to make work stealing possible: it is the local queue of Gs handed to an M, and it also carries per-P resources the M uses while running, such as the memory allocation cache, so those can be used without locking.

// src/runtime/runtime2.go
// many other fields are trimmed here; only the key ones are kept
type p struct {
	id          int32
	// status of this p
	status      uint32 // one of pidle/prunning/...
	// link for building p lists
	link        puintptr
	// back-link to the m this p is attached to
	m           muintptr   // back-link to associated m (nil if idle)
	// Go memory-allocation caches; covered in detail in a later article
	mcache      *mcache
	pcache      pageCache

	// pool of available defer structs
	deferpool    []*_defer // pool of available defer structs (see panic.go)
	deferpoolbuf [32]*_defer


	// queue of runnable goroutines, accessed without locks
	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	
	// in short: a g that gets to jump the queue
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	//
	// Note that while other P's may atomically CAS this to zero,
	// only the owner P can CAS it to a valid G.
	runnext guintptr

	// list of available (dead) g's for reuse
	// Available G's (status == Gdead)
	gFree struct {
		gList
		n int32
	}

	// TODO: covered in detail later together with channel data sending
	sudogcache []*sudog
	sudogbuf   [128]*sudog

	// per-P cache of mspans for memory allocation; covered in the memory-allocation article
	// Cache of mspan objects from the heap.
	mspancache struct {
		// We need an explicit length here because this field is used
		// in allocation codepaths where write barriers are not allowed,
		// and eliminating the write barrier/keeping it eliminated from
		// slice updates is tricky, more so than just managing the length
		// ourselves.
		len int
		buf [128]*mspan
	}

	// set when this p should enter the scheduler as soon as possible
	// preempt is set to indicate that this P should be enter the
	// scheduler ASAP (regardless of what G is running on it).
	preempt bool

}
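Before moving on, it may help to see how runqhead, runqtail and runq cooperate. The following is a minimal, single-threaded sketch of that ring-buffer indexing; it is purely illustrative. The real runtime uses atomic load-acquire/store-release so other Ps can steal from the head concurrently, and spills to runqputslow when the ring is full.

// A single-threaded sketch of the ring-buffer indexing behind p.runq.
// head and tail only ever grow; the slot index is taken modulo the buffer
// length, and tail-head is the number of queued items.
package main

import "fmt"

type localRunq struct {
	head, tail uint32
	buf        [256]int // stand-in for the [256]guintptr slots
}

// put enqueues g at the tail; returns false when full (the runtime would
// then spill half of the queue to the global run queue).
func (q *localRunq) put(g int) bool {
	if q.tail-q.head >= uint32(len(q.buf)) {
		return false
	}
	q.buf[q.tail%uint32(len(q.buf))] = g
	q.tail++
	return true
}

// get dequeues from the head: FIFO order for the owner P.
func (q *localRunq) get() (int, bool) {
	if q.head == q.tail {
		return 0, false
	}
	g := q.buf[q.head%uint32(len(q.buf))]
	q.head++
	return g, true
}

func main() {
	var q localRunq
	q.put(1)
	q.put(2)
	fmt.Println(q.get()) // 1 true
	fmt.Println(q.get()) // 2 true
}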

Next, let's take a quick look at the g structure.

// src/runtime/runtime2.go
type g struct {
	// the stack memory; its bounds are [lo, hi)
	//type stack struct {
	//    lo uintptr
	//    hi uintptr
	//}
	stack       stack   // offset known to runtime/cgo
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic    *_panic // innermost panic - offset known to liblink
	_defer    *_defer // innermost defer
	// the m this g is currently running on
	m         *m      // current m; offset known to arm liblink
	sched     gobuf
	// parameter passed in when this g is woken up
	param        unsafe.Pointer
	atomicstatus atomic.Uint32
	stackLock    uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid         uint64
	schedlink    guintptr

	// preemption signal
	preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
}

g mainly defines the execution stack plus scheduling and debugging state; when a goroutine is launched its arguments are copied onto the stack and the entry address of the function to run is saved.
Next let's look at the sched structure. sched mainly manages the Ps, the global G run queue, the defer pool, and the Ms.

// src/runtime/runtime2.go
type schedt struct {
	lock mutex
	
	// m-related fields
	// idle m's waiting for work
	midle        muintptr // idle m's waiting for work
	// number of idle m's waiting for work
	nmidle       int32    // number of idle m's waiting for work
	nmidlelocked int32    // number of locked m's waiting for work
	// number of m's created so far; also serves as the next m id
	mnext        int64    // number of m's that have been created and next M ID
	maxmcount    int32    // maximum number of m's allowed (or die)
	nmsys        int32    // number of system m's not counted for deadlock
	nmfreed      int64    // cumulative number of freed m's

	ngsys atomic.Int32 // number of system goroutines

	// list of idle p's
	pidle        puintptr // idle p's
	// number of idle p's
	npidle       atomic.Int32
	// number of spinning m's
	nmspinning   atomic.Int32  // See "Worker thread parking/unparking" comment in proc.go.

	// the global run queue
	// Global runnable queue.
	runq     gQueue
	runqsize int32
	
	// Global cache of dead G's.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks, i.e. g's that still have a stack
		noStack gList // Gs without stacks
		n       int32
	}

	// central cache of sudog structs; the p struct above holds a per-P cache
	// Central cache of sudog structs.
	sudoglock  mutex
	sudogcache *sudog

	// Central pool of available defer structs.
	deferlock mutex
	deferpool *_defer

	// freem is the list of m's waiting to be freed when their
	// m.exited is set. Linked through m.freelink.
	freem *m
	
}

During Go's startup sequence we already encountered schedinit. With the GMP structures above in mind, let's look at how the scheduler is initialized.

// src/runtime/proc.go
// The bootstrap sequence is:
//
//	call osinit
//	call schedinit
//	make & queue new G
//	call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
    // (most of schedinit is trimmed here; only the m and p initialization calls are kept)
    // initialize the m
    mcommoninit(gp.m, -1)
    // initialize the p's
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
}

Initializing m

// src/runtime/proc.go
func mcommoninit(mp *m, id int64) {
	gp := getg()

	// record the creation call stack only when not on g0 (the g0 stack is not meaningful to the user)
	// g0 stack won't make sense for user (and is not necessary unwindable).
	if gp != gp.m.g0 {
		callers(1, mp.createstack[:])
	}

	lock(&sched.lock)
	
	if id >= 0 {
		mp.id = id
	} else {
		// reserve a new id and update the count of m's
		mp.id = mReserveID()
	}
    // seed this m's fastrand state
	lo := uint32(int64Hash(uint64(mp.id), fastrandseed))
	hi := uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
	if lo|hi == 0 {
		hi = 1
	}
	// Same behavior as for 1.17.
	// TODO: Simplify this.
	if goarch.BigEndian {
		mp.fastrand = uint64(lo)<<32 | uint64(hi)
	} else {
		mp.fastrand = uint64(hi)<<32 | uint64(lo)
	}

    // OS-specific initialization of the new m
	mpreinit(mp)
	// signal-handling setup
	if mp.gsignal != nil {
		mp.gsignal.stackguard1 = mp.gsignal.stack.lo + stackGuard
	}

    // add to allm so that the GC doesn't free g.m while it is only held in a register or thread-local storage
	// Add to allm so garbage collector doesn't free g->m
	// when it is just in a register or thread-local storage.
	mp.alllink = allm

	// NumCgoCall() iterates over allm w/o schedlock,
	// so we need to publish it safely.
	atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
	unlock(&sched.lock)

	// cgo related
	// Allocate memory to hold a cgo traceback if the cgo call crashes.
	if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
		mp.cgoCallers = new(cgoCallers)
	}
}

Initializing p

Briefly, procresize does the following:

  • Resize allp to nprocs (the value of runtime.GOMAXPROCS()) and initialize any newly added Ps.
  • If the current M's P is still kept (its id is below nprocs), keep using it and set its status to _Prunning; otherwise release it and bind allp[0] to the current M.
  • Remove the Ps that are no longer needed from allp, moving the work left in their local queues to the global queue.
  • Finally walk over all Ps: those with no local work are put on the idle list, and the remaining Ps (other than the current one) that do have work are linked into a list of runnable Ps. A P's status is one of _Pidle, _Prunning, _Psyscall, _Pgcstop, _Pdead:
  • _Pidle: not running any G
  • _Prunning: bound to an M and running user code
  • _Psyscall: its M is executing a system call, so the P is not running user code
  • _Pgcstop: stopped for the GC stop-the-world
  • _Pdead: GOMAXPROCS shrank and this P is no longer needed (the logic is shown below)
func procresize(nprocs int32) *p {
	// the previous number of p's (gomaxprocs before this call)
	old := gomaxprocs
	if old < 0 || nprocs <= 0 {
		throw("procresize: invalid arg")
	}
	if traceEnabled() {
		traceGomaxprocs(nprocs)
	}

	// update statistics and record the time of this gomaxprocs change
	// update statistics
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now

	maskWords := (nprocs + 31) / 32

    // grow allp if necessary
    // essentially this checks whether user code has called runtime.GOMAXPROCS
    // to adjust the number of p's; the extra length check avoids taking the
    // lock when nprocs is clearly no larger than allp's visible length (len)
	// Grow allp if necessary.
	if nprocs > int32(len(allp)) {
		// synchronize with retake, which can run concurrently since it doesn't run on a P
		// Synchronize with retake, which could be running
		// concurrently since it doesn't run on a P.
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			// capacity is enough: re-slice allp to nprocs, reusing the existing array
			allp = allp[:nprocs]
		} else {
			// need a bigger array: allocate a new slice of p's
			nallp := make([]*p, nprocs)
			// copy the old p's so none of them are lost
			// Copy everything up to allp's cap so we
			// never lose old allocated Ps.
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}

		if maskWords <= int32(cap(idlepMask)) {
			idlepMask = idlepMask[:maskWords]
			timerpMask = timerpMask[:maskWords]
		} else {
			nidlepMask := make([]uint32, maskWords)
			// No need to copy beyond len, old Ps are irrelevant.
			copy(nidlepMask, idlepMask)
			idlepMask = nidlepMask

			ntimerpMask := make([]uint32, maskWords)
			copy(ntimerpMask, timerpMask)
			timerpMask = ntimerpMask
		}
		unlock(&allpLock)
	}

	// initialize the new p's
	// initialize new P's
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		}
		// the init details are covered further below
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}
	
	gp := getg()
	// if the current m already has a p and its id is still within the new nprocs (ids start at 0), keep using it
	if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {
		// continue to use the current P
		gp.m.p.ptr().status = _Prunning
		gp.m.p.ptr().mcache.prepareForSweep()
	} else {
		// release the current P and acquire allp[0].
		//
		// We must do this before destroying our current P
		// because p.destroy itself has write barriers, so we
		// need to do that from a valid P.
		if gp.m.p != 0 {
			if traceEnabled() {
				// Pretend that we were descheduled
				// and then scheduled again to keep
				// the trace sane.
				traceGoSched()
				traceProcStop(gp.m.p.ptr())
			}
			gp.m.p.ptr().m = 0
		}
		// switch to allp[0]
		gp.m.p = 0
		pp := allp[0]
		pp.m = 0
		pp.status = _Pidle
		// bind allp[0] to the current m
		acquirep(pp)
		if traceEnabled() {
			traceGoStart()
		}
	}
    // g.m.p is set now, so the bootstrap mcache0 is no longer needed
	// g.m.p is now set, so we no longer need mcache0 for bootstrapping.
	mcache0 = nil

	// release the resources of unused p's
	// release resources from unused P's
	for i := nprocs; i < old; i++ {
		pp := allp[i]
		pp.destroy()
		// can't free P itself because it can be referenced by an M in syscall
	}

	// Trim allp.
	if int32(len(allp)) != nprocs {
		lock(&allpLock)
		allp = allp[:nprocs]
		idlepMask = idlepMask[:maskWords]
		timerpMask = timerpMask[:maskWords]
		unlock(&allpLock)
	}

	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		pp := allp[i]
		if gp.m.p.ptr() == pp {
			continue
		}
		pp.status = _Pidle
		// the local run queue has no g's
		if runqempty(pp) {
			// put the p on the idle list
			pidleput(pp, now)
		} else {
			// the p's queue has g's, i.e. it has work: attach an m (may be nil if none is idle)
			pp.m.set(mget())
			// link the p into the runnable-p list
			pp.link.set(runnablePs)
			runnablePs = pp
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	if old != nprocs {
		// notify the GC CPU limiter that the number of procs changed
		// Notify the limiter that the amount of procs has changed.
		gcCPULimiter.resetCapacity(now, nprocs)
	}
	// return the list of p's that have local work
	return runnablePs
}
// initialize a p, either freshly created or previously destroyed
// init initializes pp, which may be a freshly allocated p or a
// previously destroyed p, and transitions it to status _Pgcstop.
func (pp *p) init(id int32) {
	// the p's id is its index in allp
	pp.id = id
	// a newly created p starts in the _Pgcstop state
	pp.status = _Pgcstop
	pp.sudogcache = pp.sudogbuf[:0]
	pp.deferpool = pp.deferpoolbuf[:0]
	pp.wbBuf.reset() 
	// allocate the cache objects for this P
	if pp.mcache == nil {
		// id == 0 means this is the first p, initialized during bootstrap
		if id == 0 {
			if mcache0 == nil {
				throw("missing mcache?")
			}
			// Use the bootstrap mcache0. Only one P will get
			// mcache0: the one with ID 0.
			pp.mcache = mcache0
		} else {
			pp.mcache = allocmcache()
		}
	}
	if raceenabled && pp.raceprocctx == 0 {
		if id == 0 {
			pp.raceprocctx = raceprocctx0
			raceprocctx0 = 0 // bootstrap
		} else {
			pp.raceprocctx = raceproccreate()
		}
	}
	lockInit(&pp.timersLock, lockRankTimers)

	// This P may get timers when it starts running. Set the mask here
	// since the P may not go through pidleget (notably P 0 on startup).
	timerpMask.set(id)
	// Similarly, we may not go through pidleget before this P starts
	// running if it is P 0 on startup.
	idlepMask.clear(id)
}

Initializing g

The g states are listed below; no need to memorize them, a rough impression is enough and you can always look them up later.

  • _Gidle: the goroutine was just allocated and has not been initialized yet
  • _Grunnable: the goroutine is on a run queue; it is not executing user code and does not own its stack
  • _Grunning: the goroutine may execute user code; it owns its stack, is not on a run queue, and has an M and a P assigned (g.m and g.m.p are valid)
  • _Gsyscall: the goroutine is executing a system call; it is not executing user code, owns its stack, is not on a run queue, and has an M assigned
  • _Gwaiting: the goroutine is blocked in the runtime
  • _Gmoribund_unused: currently unused
  • _Gdead: the goroutine is currently unused; it may have just exited, be on a free list, or be freshly initialized; it is not executing user code and may or may not have a stack; the G and its stack (if any) are owned by the M that exited the G or obtained it from the free list
  • _Genqueue_unused: currently unused
  • _Gcopystack: the goroutine's stack is being moved
  • _Gpreempted: the goroutine stopped itself for a suspendG preemption
// GC-related g states, covered in a later chapter
_Gscan          = 0x1000
_Gscanrunnable  = _Gscan + _Grunnable  // 0x1001
_Gscanrunning   = _Gscan + _Grunning   // 0x1002
_Gscansyscall   = _Gscan + _Gsyscall   // 0x1003
_Gscanwaiting   = _Gscan + _Gwaiting   // 0x1004
_Gscanpreempted = _Gscan + _Gpreempted // 0x1009

A goroutine is started via runtime.newproc; the overall flow is:

  • Call runtime.newproc1.
  • Use gfget to grab a g from p.gFree; if p.gFree is empty, pull a batch from the global free list via sched.gFree.stack.pop(), stopping once p.gFree reaches 32 g's or sched.gFree runs dry.
  • If no g was obtained, allocate a fresh one (call it newg) via malg and allgadd, then place newg into the P's runnext slot.
  • If runnext was empty, we're done; if it already held a g (call it oldg), oldg is pushed onto the tail of p.runq, the P's local queue. If runq is full,
  • runqputslow moves half of p.runq to the global queue and appends oldg at the end of that batch.
                             +---------------------+
                             |       go func       |
                             +---------------------+
                               |
                               |
                               v
                             +---------------------+
                             |  runtime.newproc1   |
                             +---------------------+
                               |
                               |
                               v
                             +---------------------+
                             |   malg && allgadd   |
                             +---------------------+
                               |
                               |
                               v
+--------------+             +-------------------------------+
|    wakep     | <---------- |            runqput            |
+--------------+             +-------------------------------+
                               |                      ^    ^
                               |                      |    |
                               v                      |    |
+--------------+ runnext nil +---------------------+  |    |
| pp.runq=newg | <---------- |       runnext       |  |    |
+--------------+             +---------------------+  |    |
  |                            |                      |    |
  |                            | runnext non-nil      |    |
  |                            v                      |    |
  |                          +---------------------+  |    |
  |                          |   pp.runnext=newg   |  |    |
  |                          +---------------------+  |    |
  |                            |                      |    |
  |                            |                      |    |
  |                            v                      |    |
  |                          +---------------------+  |    |
  |                          | put oldg to pp.runq |  |    |
  |                          +---------------------+  |    |
  |                            |                      |    |
  |                            | pp.runq full         |    |
  |                            v                      |    |
  |                          +---------------------+  |    |
  |                          |     runqputslow     | -+    |
  |                          +---------------------+       |
  |                                                        |
  +--------------------------------------------------------+
// src/runtime/proc.go

// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
	// the familiar call: get the current g
	gp := getg()
	// the caller's program counter (pc/ip register)
	pc := getcallerpc()
	systemstack(func() {
		// create a new g in state _Grunnable
		newg := newproc1(fn, gp, pc)
		// pp is the p described above, the one bound to this m
		pp := getg().m.p.ptr()
		// runqput tries to put g on the local runnable queue. If next is
		// false, runqput adds g to the tail of the queue. If next is true,
		// runqput puts g into the pp.runnext slot. If the run queue is full,
		// g ends up on the global queue. Executed only by the owner P.
		// Recall that pp.runnext is the queue-jumping g: it will run next.
		runqput(pp, newg, true)
        
		// TODO: my understanding here is still shallow; revisit on a second pass
		// if the main M has started, try to wake/add another P to execute Gs
		if mainStarted {
			wakep()
		}
	})
}

Now let's look at newproc1(), which creates the new g.

// Create a new g in state _Grunnable, starting at fn. callerpc is the
// address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler.
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
	if fn == nil {
		fatal("go of nil func value")
	}
	mp := acquirem() // disable preemption because we hold M and P in local vars.
	// the p bound to the current m
	pp := mp.p.ptr()
	// get a g for this p: take one from pp.gFree; if that is empty, pull a batch from the global list (sched.gFree) until sched.gFree is empty or pp.gFree holds >= 32
	newg := gfget(pp)
	// if no g was obtained (there are none during bootstrap)
	if newg == nil {
		// stackMin = 2048
		// allocate a g with a 2 KB stack
		newg = malg(stackMin)
		// move the newly created g from _Gidle to _Gdead
		casgstatus(newg, _Gidle, _Gdead)
		// add the g (still _Gdead) to allg; publishing it as _Gdead keeps the GC from scanning its uninitialized stack
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	// (omitted: copying the arguments and setting up the stack pointer, entry pc and other sched fields of the g)
	
	// the g went _Gidle -> _Gdead; now it becomes _Grunnable
	// with the registers and sched fields set up, move the g from _Gdead to _Grunnable
	casgstatus(newg, _Gdead, _Grunnable)
	// assign the goroutine id
	newg.goid = pp.goidcache
	// advance the cache for the next goroutine id
	pp.goidcache++
	if raceenabled {
		// allocate the data-race context
		newg.racectx = racegostart(callerpc)
		if newg.labels != nil {
			// See note in proflabel.go on labelSync's role in synchronizing
			// with the reads in the signal handler.
			racereleasemergeg(newg, unsafe.Pointer(&labelSync))
		}
	}
	if traceEnabled() {
		traceGoCreate(newg, newg.startpc)
	}
	// release the m, re-enabling preemption
	releasem(mp)
	return newg
}
func runqput(pp *p, gp *g, next bool) {
	// next is true here when called from newproc
	if next {
	// install gp into runnext, retrying the CAS until it succeeds
	retryNext:
		oldnext := pp.runnext
		if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
	t := pp.runqtail
	// if the local queue isn't full, put the g at its tail
	if t-h < uint32(len(pp.runq)) {
		pp.runq[t%uint32(len(pp.runq))].set(gp)
		atomic.StoreRel(&pp.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	// the local queue is full: move half of it, plus this g, to the global queue
	if runqputslow(pp, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}

// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
// move half of the p's local queue to the global queue, appending gp at the end of the batch
func runqputslow(pp *p, gp *g, h, t uint32) bool {
	var batch [len(pp.runq)/2 + 1]*g

	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len(pp.runq)/2) {
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
	}
	if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp

	if randomizeScheduler {
		for i := uint32(1); i <= n; i++ {
			j := fastrandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}
	var q gQueue
	q.head.set(batch[0])
	q.tail.set(batch[n])

	// Now put the batch on global queue.
	lock(&sched.lock)
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
	return true
}

Let's summarize how a g is created:

  • Try to reuse a previously-run g from the local P's gFree list; if that list is empty, pull up to 32 g's from the global free list into the P, then take one.
  • If no g is available (none exist during bootstrap), allocate a new g with an execution stack; it starts in the _Gidle state.
  • The new g is then published as _Gdead; the entry address of the function, its arguments, the stack pointer and so on are saved into its sched, after which the status is switched to _Grunnable.
  • The new goroutine is given an id and placed into p.runnext (jumping the queue); the g it displaces, if any, goes to the tail of p.runq, and if p.runq is also full, runqputslow moves half of p.runq, together with that g, to the global queue.
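To close the loop with user code: each go statement in the snippet below is compiled into a call to runtime.newproc, which walks exactly the path summarized above (reuse or allocate a g, publish it as _Grunnable, put it into the current P's runnext/local run queue, and possibly wake another P). This is an ordinary program for observation, not runtime code.

// Each go statement here becomes a runtime.newproc call; the spawned g's
// land in the current P's runnext/local run queue before other Ps pick
// them up or steal them.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	fmt.Println("Ps (GOMAXPROCS):", runtime.GOMAXPROCS(0))

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) { // compiled into newproc(fn)
			defer wg.Done()
			_ = id * id
		}(i)
	}
	fmt.Println("goroutines just after spawning:", runtime.NumGoroutine())
	wg.Wait()
}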