chan
chan
channel是一个特殊的数据结构,是go语言贯彻CSP思想的典型代表,CSP思想的核心就是进程之间通过消息通信来进行数据的交换,对应的,通过channel我们可以很轻松地在协程之间通信。
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}
除了通信之外,通过channel也还可以实现协程同步之类的操作,而在需要并发的系统中,channel的身影几乎随处可见,为了能够更好的理解channel工作方式,下面就会介绍其原理。
结构
channel在运行时对于的表示是runtime.hchan
结构体,它所包含的字段并不多,如下
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
从上面可以很明显的看到lock
字段,channel实际上是一个有锁的同步环形队列,其它的字段介绍如下
qcount
,表示总数据数dataqsize
,环形队列的大小buf
,指向大小为dataqsize
的数组的指针,也就是环形队列closed
,channel是否关闭sendx
,recvx
,表示发送和接收的索引sendq
,recvq
,表示发送和接收的协程链表,其组成元素是runtime.sudog
type waitq struct { first *sudog last *sudog }
通过下面一张图可以很清晰的明白channel的结构
当对channel使用len
和cap
函数时,返回的实际上是它的hchan.qcoun
和hchan.dataqsiz
字段。
创建
正常来说创建管道有且只有一种方式,使用make
函数创建
ch := make(chan int, size)
编译器会将其翻译成对runtime.makechan
函数的调用,由它来负责管道的实际创建,它的代码如下所示。
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
这部分逻辑比较简单,主要是在给管道分配内存,它首先会根据传入的size
和元素类型elem.size
来计算预计需要的内存大小,然后分为三种情况来处理
size
为0,只分配hchanSize
- 元素不包含指针,则分配对应内存大小的空间,并且环形队列的内存与管道的内存是连续的
- 元素包含指针,管道和环形队列的内存单独分配
如果环形队列中存放的是指针元素的话,因为它们引用了外部的元素,GC在标记-清除阶段就可能会扫描这些指针,当存放的是非指针元素时分配在连续的内存上就避免了不必要的扫描。内存分配完毕后,最后再更新其它的一些记录信息的字段。
顺便提一下,当管道容量是64位整数的时候,会使用runtime.makechan64
函数来进行创建,它本质上也是对runtime.makechan
的调用,只是多做了一个类型检查。
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}
一般来说size
都最好不要超过math.MaxInt32
。
发送
向管道发送数据时,我们会将要发送的数据置于箭头的右方
ch <- struct{}{}
编译器会将其翻译成runtime.chansend1
,真正负责发送数据的是runtime.chansend
函数,chansend1
会向其传递elem
指针,它指向发送元素的指针。
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
它首先会检查管道是否为nil
,block
表示当前的发送操作是否是阻塞的(block
的值与select
结构有关),如果阻塞发送且管道是nil
则直接崩溃。在非阻塞发送情况下,会在不加锁的情况下直接判断管道是否满了,如果满了就直接返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}
随后才开始加锁,并检测管道是否关闭,如果已经关闭了就会panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}
再之后从recvq
队列中出队一个sudog
,然后由runtime.send
函数进行发送。
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
send
函数内容如下,它会更新recvx
和sendx
,然后使用runtime.memmove
函数将通信数据的内存直接复制到接收方协程的目标元素地址上,然后通过runtime.goready
函数使接收方协程变为_Grunnable
状态,以便重新参与调度。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}
在上面的这个过程中,因为可以找到等待接收的协程,所以数据就直接发送给了接收方,并没有存放在环形队列中,倘若没有可用的接受方协程且容量足够,就会将其放入环形队列缓冲区中,然后直接返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
倘若缓冲区满了,如果是在非阻塞发送的情况下就会直接返回
if !block {
unlock(&c.lock)
return false
}
如果是阻塞发送,则会进入下面的代码流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}
首先它会将当前的协程构造成sudog
并加入hchan.sendq
等待发送协程队列,然后由runtime.gopark
使当前协程阻塞,变为_Gwaitting
状态直到再次被接收方唤醒,并且在会通过runtime.KeepLAlive
对要发送的数据进行保活来确保接收方成功复制。当被唤醒后就会进入接下来的收尾流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}
可以看到的是对于管道发送数据而言总共有下面几种情况
- 管道为
nil
,程序崩溃 - 管道已关闭,发生
panic
revq
队列不为空,直接发送给接收方- 没有协程等待,加入缓冲区
- 缓冲区满了,发送协程进入阻塞状态,等待其它协程接收数据
值得注意的是,在上面发送的逻辑中没有看到对于缓冲区溢出数据的处理,这部分数据不可能抛弃掉,它保存在了sudog.elem
,由接收方来进行处理。
接收
在go中从管道接收数据的语法有两种,第一种是只读取数据
data <- ch
第二种是判断数据是否读取成功
data, ok <- ch
上面两种语法会被编译器翻译成对runtime.chanrecv1
和runtime.chanrecv1
的调用,不过它们实际上只是对runtime.chanrecv
的调用。接收的逻辑开头部分与发送的逻辑类似,都会先对管道判空。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}
然后在非阻塞读取情况下,不加锁判断管道是否为空,如果管道未关闭就直接返回,管道已关闭则清空接收元素的内存。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}
然后加锁访问hchan.sendq
队列,通过下面if c.closed != 0
这个分支可以看到,即便管道已经关闭了,但如果管道中还有元素存在,并不会直接返回,依然会往下执行消费元素的代码,这也是为什么管道关闭后仍然允许读取的原因。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}
如果管道没有关闭,就会查看sendq
队列是否有协程正在等待发送,是的话就由runitme.recv
来处理该发送方协程。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
第一种情况,管道容量为0即无缓冲管道,接收方会直接通过runtime.recvDirect
函数从发送方复制数据,第二种情况缓冲区已满,虽然在前面并没有看到判断缓冲区是否满了的逻辑,但实际上当缓冲区容量不为0且有发送方等待发送就已经代表了缓冲区已经满了,因为只有缓冲区满了发送方才会阻塞等待发送,这部分逻辑是由发送方来进行判断的。然后接收方会从缓冲区将头部元素出队并将其内存复制到目标接收元素的指针,再将发送方协程要发送的数据复制后并入队(在这里我们就看到了接收方对于溢出缓冲区数据的处理方式),最后会由runtime.goready
去唤醒发送方协程,使其变为_Grunnable
状态,以便重新加入调度。
倘若没有等待发送的协程,就会去查看缓冲区是否有等待消费的元素,将头部元素出队并复制其内存到接收方目标元素,然后返回。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}
到最后如果没有管道中没有可消费的元素,就会由runtime.gopark
将当前协程变为_Gwwaiting
状态,阻塞等待直到被发送方协程唤醒。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}
被唤醒后,就会返回,此时返回的success
值来自sudog.success
,如果发送方成功发送数据那么该字段应该由发送方设置为true
,这部分逻辑我们可以在runtime.send
函数中看到。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
相对应的,在发送方runtime.chansend
末尾对于sudog.success
判断,其来源也是接收方在runtime.recv
函数中的设置,通过这些可以发现接收方和发送方两者相辅相成才能让管道正常运作。总的来说,接收数据要比发送数据稍微复杂一些,总共有以下几种情况
- 管道为
nil
,程序崩溃 - 管道已关闭,如果管道是空的就直接返回,如果不为空则跳到第5个情况执行
- 缓冲区容量为0,
sendq
中有等待发送的协程,则直接复制发送方的数据,然后唤醒发送方。 - 缓冲区满了,
sendq
中有等待发送的协程,将缓冲区头部元素出队,发送方的数据入队,然后唤醒发送方。 - 缓冲区没满且数量不为0,将缓冲区头部元素出队,然后返回。
- 缓冲区是空的,进入阻塞状态,等待被发送方唤醒。
关闭
对于关闭管道而言,我们会使用内置函数close
close(ch)
编译器会将其翻译成对runtime.closechan
的调用,如果传递的管道为nil
或者已关闭,则会直接panic
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}
然后将这个管道的sendq
和recvq
中的所有阻塞的协程都出队并将它们全部都通过runtime.goready
唤醒
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
提示
顺带一提,go允许单向管道,有着下面几个规则
- 只读管道不能发送数据
- 只读管道不能关闭
- 只写管道不能读取数据
这些错误早在编译期的类型检查阶段就会找出来,不会留到运行时,感兴趣可以到下面这两个包阅读相关代码
cmd/compile/internal/types2
cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")
判断关闭
在很早的时候(go1之前),有一个内置函数closed
用于判断管道是否关闭,不过后面很快就被删掉了。这是因为管道的使用场景通常都是多协程的情况,假设它返回true
确实可以代表管道已经关闭了,但是如果它返回了false
,那么并不能代表管道就真的没有关闭,因为谁也不知道在下一刻谁会把管道关闭掉,所以这个返回值是不可信的,如果以这个返回值为依据来判断是否向管道发送数据就更是危险了,因为向已关闭的管道发送数据会发生panic
。
如果实在需要,可以自己实现一个。一种方案是通过写管道来判断管道是关闭,代码如下
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}
不过这样也是有副作用的,如果没关闭的话就会向里面写入冗余的数据,而且会进入defer-recover处理过程,造成额外的性能损失,所以写方案可以直接放弃。读方案的话可以考虑,不过不能直接读管道,因为直接读block
参数值为true
将会阻塞协程,应该结合select
来使用,管道与select
结合时就是非阻塞的。
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}
这种只是看起来要比上面好一点点,它的情况仅仅适用于管道已关闭且管道缓冲区中没有元素,如果有元素的话还会平白无故的消费掉这个元素,还是没有一个很好的实现。
但其实我们根本就不需要判断管道是否关闭,理由在开头已经讲过了因为返回值并不可信,正确地使用管道并正确的关闭才是我们应该做的,所以
- 永远不要在接收方关闭管道,关闭只读管道不能通过编译这点已经很明确地告诉你不要这么做了,交给发送方来做这件事。
- 如果有多个发送方,应该单独让一个协程来完成关闭操作,确保
close
只有一方调用且只会调用一次。 - 传递管道时,最好限制只读或只写
遵循这几个原则,就能确保不会出太大的问题。