channel详解
chan须知
在golang中,chan用于多个go中的数据交互,分有buff和无buff,定义如下,无buf会阻塞发送端以及接收端,知道buf里数据被取走发送端才会解除阻塞
有buf则在buf满之前只阻塞接收端,下面我们取看看底层数据结构,以及各种情况go如何调度
make(chan int,1) //无buff
make(chan int,10) //有buff
我们先来找下chan在底层的定义,通过dlv找到创建chan的具体实现是runtime.makechan
//chan.go
package main
func main(){
_ = make(chan int,1)
}
#通过dlv找到创建chan go具体的代码
root@node1:~/work/go/demo# dlv debug chan.go
Type 'help' for list of commands.
(dlv) b main.main
Breakpoint 1 set at 0x460dc6 for main.main() ./chan.go:3
(dlv) c
> main.main() ./chan.go:3 (hits goroutine(1):1 total:1) (PC: 0x460dc6)
1: package main
2:
=> 3: func main(){
4: _ = make(chan int,1)
5: }
(dlv) disass
TEXT main.main(SB) /root/work/go/demo/chan.go
chan.go:3 0x460dc0 493b6610 cmp rsp, qword ptr [r14+0x10]
chan.go:3 0x460dc4 7629 jbe 0x460def
=> chan.go:3 0x460dc6* 4883ec18 sub rsp, 0x18
chan.go:3 0x460dca 48896c2410 mov qword ptr [rsp+0x10], rbp
chan.go:3 0x460dcf 488d6c2410 lea rbp, ptr [rsp+0x10]
chan.go:4 0x460dd4 488d0585440000 lea rax, ptr [rip+0x4485]
chan.go:4 0x460ddb bb01000000 mov ebx, 0x1
chan.go:4 0x460de0 e87b38faff call $runtime.makechan
chan.go:5 0x460de5 488b6c2410 mov rbp, qword ptr [rsp+0x10]
chan.go:5 0x460dea 4883c418 add rsp, 0x18
chan.go:5 0x460dee c3 ret
chan.go:3 0x460def e84cccffff call $runtime.morestack_noctxt
chan.go:3 0x460df4 ebca jmp $main.main
#通过不断输入si跳转到runtime.makechan后 找到文件位置如下
##> runtime.makechan() /usr/local/go/src/runtime/chan.go:72 (PC: 0x404660)
下面我们一起来看看具体的数据结构
channel 数据结构
//可以简单理解成环形buf, 具体结构看下图
type hchan struct {
//整个chan <-放了多少数据
qcount uint // total data in the queue
//总共可以放多少数据
dataqsiz uint // size of the circular queue
//存放数据的数组
buf unsafe.Pointer // points to an array of dataqsiz elements
//能够收发元素的大小
elemsize uint16
closed uint32
//make 里chan的类型
elemtype *_type // element type
//发送处理到的index
sendx uint // send index
//接收buf的index
recvx uint // receive index
//被阻塞的接收g列表,双向链表
recvq waitq // list of recv waiters
//被阻塞的发送g列表
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
这段数据结构还挺简单的,主要就是一个环形数组,我们继续看make初始化方法
//篇幅有限,删除掉了一堆校验
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
//mem为0,对应我们make(chan type,0)这种情况,仅分配一段内存
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
//存储的类型不是指针类型,分配一段内存空间
//补充 elem.ptrdata对应是该结构指针截止的长度位置
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
//默认分配内存
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
初始分配就是分配好hchan的结构体,接下来我们看发送和接收数据。
channel 发送数据
在上文中的代码我们改成如下结果,根据dlv分析得到执行函数未runtime.chansend1,下面我们看看具体的代码,由于代码比较复杂,我们先看流程图,后根据每种情况
看具体代码
//chan.go
package main
func main(){
chan1 := make(chan int,10)
chan1 <- 1
chan1 <- 2
}
#寻找发送数据代码位置
> main.main() ./chan.go:5 (PC: 0x458591)
Warning: debugging optimized function
chan.go:4 0x458574 488d05654b0000 lea rax, ptr [rip+0x4b65]
chan.go:4 0x45857b bb0a000000 mov ebx, 0xa
chan.go:4 0x458580 e8fbb6faff call $runtime.makechan
chan.go:4 0x458585 4889442410 mov qword ptr [rsp+0x10], rax
chan.go:5 0x45858a 488d1d2f410200 lea rbx, ptr [rip+0x2412f]
=> chan.go:5 0x458591 e8cab8faff call $runtime.chansend1
chan.go:6 0x458596 488b442410 mov rax, qword ptr [rsp+0x10]
chan.go:6 0x45859b 488d1d26410200 lea rbx, ptr [rip+0x24126]
chan.go:6 0x4585a2 e8b9b8faff call $runtime.chansend1
chan.go:7 0x4585a7 488b6c2418 mov rbp, qword ptr [rsp+0x18]
chan.go:7 0x4585ac 4883c420 add rsp, 0x20
# runtime.chansend1() /usr/local/go/src/runtime/chan.go:144 (PC: 0x403e60)
# chan 发送数据流程图
+-----------+
| put data |
+-----------+
^
| buf没满
|
+--------------+ chan not nil +-------------------+ +------------+ not sudog +-----------+ buf满了 +-----------------------+ +---------------------+
| channel | --------------> | non blocking | --> | lock | -----------> | chan | -------> | pack sudog && enqueue | --> | go park && schedule |
+--------------+ +-------------------+ +------------+ +-----------+ +-----------------------+ +---------------------+
| | |
| chan is nil | is close && full | get sudog
v v v
+--------------+ +-------------------+ +------------+ +-----------+ +-----------------------+
| gopark | | return false | | send | -----------> | copy data | -------> | goready |
+--------------+ +-------------------+ +------------+ +-----------+ +-----------------------+
由上图得知,我们chan <- i会遇到几种情况
- 如果recvq上存在被阻塞的g,则会直接讲数据发给当前g,设置成下一个运行的g
- 如果chan buf没满,直接将数据存到buf中
- 如果都不满足,创建一个sudog,并将其加入chan的sendq队列,当前g陷入阻塞等待chan的数据被接收 下面我们再结合代码看下具体的实现
首先判断chan为空,永久阻塞,
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//c为空且为阻塞,park住等待接收端接收
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
然后判断非阻塞且未被关闭且buf满了,返回false,然后加锁
if !block && c.closed == 0 && full(c) {
return false
}
lock(&c.lock)
然后判断hchan.waitq上是否有等待的g,也就是代码里<-chan的g,如果拿到了g,则执行send
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
dequeue()是拿g的逻辑,waitq是一个sudog的双向链表,存了first,last的sudog,sudog我们在runtime系列里会有一节详解,
func (q *waitq) dequeue() *sudog {
for {
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
if y == nil {
q.first = nil
q.last = nil
} else {
y.prev = nil
q.first = y
sgp.next = nil // mark as removed (see dequeueSudoG)
}
if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
continue
}
return sgp
}
}
下面我们再来看看send的逻辑,sendDirect将chan里的数据拷贝给g,同时通过goready唤醒对应取数据的g
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
当waitq上没有g在等待的时候,如果buf没满,将数据放到buf上继续
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
当buf满了,阻塞当前这个发送的g,并通过acquireSudog获取一个sudog绑定上当前的g,放入sendq的g队列中,同时gopack,等待调度重新唤醒,直到接收了数据后,
将sudog通过releaseSudog释放回池
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
//获取sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
chan接收数据
我们还是通过dlv找到接收数据具体调用为 CALL runtime.chanrecv1(SB),下面我们来看看具体流程,流程图如下
+------------------+
| recv |
+------------------+
block ^
+-----------------------------------+ | sendq.dequeue()
| v |
+----------------+ chan not nil +--------+ not block && not recv +------+ +--------------------+ +------------------+ buf为空 +-----------------------+ +-------------------+
| channelrecv | --------------> | block? | -----------------------> | lock | --> | chan close? | --> | not close | -------> | pack sudog && enqueue | --> | go park && schedule |
+----------------+ +--------+ +------+ +--------------------+ +------------------+ +-----------------------+ +---------------------+
| | |
| chan is nil | close && qcount=0 | buf不为空
v v v
+----------------+ +--------------------+ +------------------+
| gopark forever | | return | | copy value |
+----------------+ +--------------------+ +------------------+
下面让我们来看看具体代码,与发送重复的代码就省略了
先加锁,如果chan已经被关闭了且没有多余的数据了,直接返回,未关闭则先从sendq上取一个sudog,上文我们提到满了的时候则会有g通过sudog被gopark在sendq上
所以优先查看是否有被gopark的发送sudog,在recv方法里去ready,recv主要是唤醒发送的发送sudog,跟上文的send方法类似
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
当没没有send的sudog时候,判断buf是不是为空,不为空则取出数据,buf为空的情况下如果不需要阻塞,直接解锁返回
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
后面跟我们的send逻辑类似.获取一个sudog,添加进hchan.recvq里,等待chansend里逻辑唤醒,唤醒后通过releaseSudog放回sudog
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success