channel详解

chan须知

在golang中,chan用于多个go中的数据交互,分有buff和无buff,定义如下,无buf会阻塞发送端以及接收端,知道buf里数据被取走发送端才会解除阻塞
有buf则在buf满之前只阻塞接收端,下面我们取看看底层数据结构,以及各种情况go如何调度

make(chan int,1) //无buff
make(chan int,10) //有buff

我们先来找下chan在底层的定义,通过dlv找到创建chan的具体实现是runtime.makechan

//chan.go
package main
func main(){
    _ = make(chan int,1)
}
#通过dlv找到创建chan go具体的代码
root@node1:~/work/go/demo# dlv debug chan.go 
Type 'help' for list of commands.
(dlv) b main.main
Breakpoint 1 set at 0x460dc6 for main.main() ./chan.go:3
(dlv) c
> main.main() ./chan.go:3 (hits goroutine(1):1 total:1) (PC: 0x460dc6)
     1:	package main
     2:	
=>   3:	func main(){
     4:	    _ = make(chan int,1)
     5:	}
(dlv) disass
TEXT main.main(SB) /root/work/go/demo/chan.go
	chan.go:3	0x460dc0	493b6610	cmp rsp, qword ptr [r14+0x10]
	chan.go:3	0x460dc4	7629		jbe 0x460def
=>	chan.go:3	0x460dc6*	4883ec18	sub rsp, 0x18
	chan.go:3	0x460dca	48896c2410	mov qword ptr [rsp+0x10], rbp
	chan.go:3	0x460dcf	488d6c2410	lea rbp, ptr [rsp+0x10]
	chan.go:4	0x460dd4	488d0585440000	lea rax, ptr [rip+0x4485]
	chan.go:4	0x460ddb	bb01000000	mov ebx, 0x1
	chan.go:4	0x460de0	e87b38faff	call $runtime.makechan
	chan.go:5	0x460de5	488b6c2410	mov rbp, qword ptr [rsp+0x10]
	chan.go:5	0x460dea	4883c418	add rsp, 0x18
	chan.go:5	0x460dee	c3		ret
	chan.go:3	0x460def	e84cccffff	call $runtime.morestack_noctxt
	chan.go:3	0x460df4	ebca		jmp $main.main
#通过不断输入si跳转到runtime.makechan后 找到文件位置如下
##> runtime.makechan() /usr/local/go/src/runtime/chan.go:72 (PC: 0x404660)

下面我们一起来看看具体的数据结构

channel 数据结构

//可以简单理解成环形buf, 具体结构看下图
type hchan struct {
	//整个chan <-放了多少数据
	qcount   uint           // total data in the queue
	//总共可以放多少数据
	dataqsiz uint           // size of the circular queue
	//存放数据的数组
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	//能够收发元素的大小
	elemsize uint16
	closed   uint32
	//make 里chan的类型
	elemtype *_type // element type
	//发送处理到的index
	sendx    uint   // send index
	//接收buf的index
	recvx    uint   // receive index
	//被阻塞的接收g列表,双向链表
	recvq    waitq  // list of recv waiters
	//被阻塞的发送g列表
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

这段数据结构还挺简单的,主要就是一个环形数组,我们继续看make初始化方法

//篇幅有限,删除掉了一堆校验
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	
	var c *hchan
	switch {
	//mem为0,对应我们make(chan type,0)这种情况,仅分配一段内存
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
    //存储的类型不是指针类型,分配一段内存空间
	//补充 elem.ptrdata对应是该结构指针截止的长度位置
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	//默认分配内存
    default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)
	return c
}

初始分配就是分配好hchan的结构体,接下来我们看发送和接收数据。

channel 发送数据

在上文中的代码我们改成如下结果,根据dlv分析得到执行函数未runtime.chansend1,下面我们看看具体的代码,由于代码比较复杂,我们先看流程图,后根据每种情况
看具体代码

//chan.go
package main
func main(){
	chan1 := make(chan int,10)
	chan1 <- 1
	chan1 <- 2
}
#寻找发送数据代码位置
> main.main() ./chan.go:5 (PC: 0x458591)
Warning: debugging optimized function
	chan.go:4	0x458574	488d05654b0000	lea rax, ptr [rip+0x4b65]
	chan.go:4	0x45857b	bb0a000000	mov ebx, 0xa
	chan.go:4	0x458580	e8fbb6faff	call $runtime.makechan
	chan.go:4	0x458585	4889442410	mov qword ptr [rsp+0x10], rax
	chan.go:5	0x45858a	488d1d2f410200	lea rbx, ptr [rip+0x2412f]
=>	chan.go:5	0x458591	e8cab8faff	call $runtime.chansend1
	chan.go:6	0x458596	488b442410	mov rax, qword ptr [rsp+0x10]
	chan.go:6	0x45859b	488d1d26410200	lea rbx, ptr [rip+0x24126]
	chan.go:6	0x4585a2	e8b9b8faff	call $runtime.chansend1
	chan.go:7	0x4585a7	488b6c2418	mov rbp, qword ptr [rsp+0x18]
	chan.go:7	0x4585ac	4883c420	add rsp, 0x20
# runtime.chansend1() /usr/local/go/src/runtime/chan.go:144 (PC: 0x403e60)
# chan 发送数据流程图
                                                                                       +-----------+
                                                                                       | put data  |
                                                                                       +-----------+
                                                                                         ^
                                                                                         | buf没满
                                                                                         |
+--------------+  chan not nil   +-------------------+     +------------+  not sudog   +-----------+  buf满了   +-----------------------+     +---------------------+
|   channel    | --------------> |   non blocking    | --> |    lock    | -----------> |   chan    | -------> | pack sudog && enqueue | --> | go park && schedule |
+--------------+                 +-------------------+     +------------+              +-----------+          +-----------------------+     +---------------------+
  |                                |                         |
  | chan is nil                    | is close && full        | get sudog
  v                                v                         v
+--------------+                 +-------------------+     +------------+              +-----------+          +-----------------------+
|    gopark    |                 |   return false    |     |    send    | -----------> | copy data | -------> |        goready        |
+--------------+                 +-------------------+     +------------+              +-----------+          +-----------------------+

由上图得知,我们chan <- i会遇到几种情况

  • 如果recvq上存在被阻塞的g,则会直接讲数据发给当前g,设置成下一个运行的g
  • 如果chan buf没满,直接将数据存到buf中
  • 如果都不满足,创建一个sudog,并将其加入chan的sendq队列,当前g陷入阻塞等待chan的数据被接收 下面我们再结合代码看下具体的实现

首先判断chan为空,永久阻塞,

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    //c为空且为阻塞,park住等待接收端接收
	if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

然后判断非阻塞且未被关闭且buf满了,返回false,然后加锁

if !block && c.closed == 0 && full(c) {
    return false
}
lock(&c.lock)

然后判断hchan.waitq上是否有等待的g,也就是代码里<-chan的g,如果拿到了g,则执行send

if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

dequeue()是拿g的逻辑,waitq是一个sudog的双向链表,存了first,last的sudog,sudog我们在runtime系列里会有一节详解,

func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}

		return sgp
	}
}

下面我们再来看看send的逻辑,sendDirect将chan里的数据拷贝给g,同时通过goready唤醒对应取数据的g


func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

当waitq上没有g在等待的时候,如果buf没满,将数据放到buf上继续

if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

当buf满了,阻塞当前这个发送的g,并通过acquireSudog获取一个sudog绑定上当前的g,放入sendq的g队列中,同时gopack,等待调度重新唤醒,直到接收了数据后,
将sudog通过releaseSudog释放回池

// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	//获取sudog
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true

chan接收数据

我们还是通过dlv找到接收数据具体调用为 CALL runtime.chanrecv1(SB),下面我们来看看具体流程,流程图如下

                                                                                                               +------------------+
                                                                                                               |       recv       |
                                                                                                               +------------------+
                                              block                                                              ^
                                     +-----------------------------------+                                       | sendq.dequeue()
                                     |                                   v                                       |
+----------------+  chan not nil   +--------+  not block && not recv   +------+     +--------------------+     +------------------+  buf为空   +-----------------------+     +-------------------+
|  channelrecv   | --------------> | block? | -----------------------> | lock | --> |    chan close?     | --> |    not close     | -------> | pack sudog && enqueue | --> | go park && schedule |
+----------------+                 +--------+                          +------+     +--------------------+     +------------------+          +-----------------------+     +---------------------+
  |                                                                                   |                          |
  | chan is nil                                                                       | close && qcount=0        | buf不为空
  v                                                                                   v                          v
+----------------+                                                                  +--------------------+     +------------------+
| gopark forever |                                                                  |       return       |     |    copy value    |
+----------------+                                                                  +--------------------+     +------------------+

下面让我们来看看具体代码,与发送重复的代码就省略了
先加锁,如果chan已经被关闭了且没有多余的数据了,直接返回,未关闭则先从sendq上取一个sudog,上文我们提到满了的时候则会有g通过sudog被gopark在sendq上
所以优先查看是否有被gopark的发送sudog,在recv方法里去ready,recv主要是唤醒发送的发送sudog,跟上文的send方法类似

lock(&c.lock)
	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	} else {
		if sg := c.sendq.dequeue(); sg != nil {
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

当没没有send的sudog时候,判断buf是不是为空,不为空则取出数据,buf为空的情况下如果不需要阻塞,直接解锁返回

if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
    if !block {
        unlock(&c.lock)
    return false, false
}

后面跟我们的send逻辑类似.获取一个sudog,添加进hchan.recvq里,等待chansend里逻辑唤醒,唤醒后通过releaseSudog放回sudog

// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success