select
select
select是一种可以同时监听多个管道状态的结构,它的语法跟switch类似
import (
"context"
"log/slog"
"os"
"os/signal"
"time"
)
func main() {
finished := make(chan struct{})
ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer stop()
slog.Info("running")
go func() {
time.Sleep(time.Second * 2)
finished <- struct{}{}
}()
select {
case <-ctx.Done():
slog.Info("shutting down")
case <-finished:
slog.Info("finished")
}
}
这段代码通过将context,管道,select三者结合使用实现了一个简单的程序平滑退出的逻辑,代码中select同时监听着ctx.Done
和finished
两个管道,它退出的条件有两个,一是操作系统发送退出信号,二是finished
管道有消息可以读取即用户代码任务完成,这样我们就可以在程序退出时做收尾工作。
众所周知,select有两个非常重要的特性,一是非阻塞,在管道的发送与接收的源代码中可以看到对于select做了一些处理,可以在非阻塞的情况下判断管道是否可用,二是随机化,如果有多个管道可用的话它会随机选一个来执行,不遵守既定的顺序可以让每个管道都相对公平地得到执行,否则在极端情况下一些管道可能永远也不会被处理。因为它的工作全部都跟管道有关,所以先建议阅读chan这篇文章,了解了管道后再来了解select会畅通很多。
结构
运行时只有一个runtime.scase
结构体表示select的分支,每一个case
的运行时表示就是scase
。
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}
其中的c
指的是管道,elem
表示接收或发送元素的指针,实际上select关键字指的是runtime.selectgo
函数。
原理
select的使用方式被go分成了四种情况来进行优化,这一点可以在cmd/compile/internal/walk.walkSelectCases
函数中看到对于这四种情况的处理逻辑。
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// optimization: zero-case select
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
// optimization: one-case select: single op.
if ncas == 1 {
...
}
// optimization: two-case select but one is default: single non-blocking op.
if ncas == 2 && dflt != nil {
...
}
...
return init
}
优化
编译器会对前三种情况进行优化,第一种情况是case数量为0时即一个空的select,我们都知道空的select语句会造成当前协程永久阻塞。
select{}
之所以会阻塞是因为编译器将其翻译成了对runtime.block
函数的直接调用
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}
而block
函数又调用了runtime.gopark
函数,使得当前协程变为_Gwaitting
状态,并进入永久阻塞,再也不会得到调度。
第二种情况,只有一个case且不是default,这种情况编译器会直接将其翻译成对管道的收发操作,并且还是阻塞式的,例如下面的这种代码
func main() {
ch := make(chan int)
select {
case <-ch:
// do something
}
}
它会被翻译成对runtime.chanrecv1
函数的直接调用,从汇编代码中就可以看出来
TEXT main.main(SB), ABIInternal, $2
...
LEAQ type:chan int(SB), AX
XORL BX, BX
PCDATA $1, $0
CALL runtime.makechan(SB)
XORL BX, BX
NOP
CALL runtime.chanrecv1(SB)
ADDQ $16, SP
POPQ BP
...
在只有一个case的情况下对管道进行发送数据也是同样的道理,它会被翻译成对runtime.chansend1
函数的直接调用,同样也是阻塞式的。
第三种情况,有两个case且其中一个是default
func main() {
ch := make(chan int)
select {
case ch <- 1:
// do something
default:
// do something
}
}
这种情况会将其翻译成一个对runtime.selectnbsend
调用的if
语句,如下
if selectnbsend(ch, 1) {
// do something
} else {
// do something
}
如果是接收管道数据就会翻译成对runtime.selectnbrecv
的调用
ch := make(chan int)
select {
case x, ok := <-ch:
// do something
default:
// do something
}
if selected, ok = selectnbrecv(&v, c); selected {
// do something
} else {
// do something
}
指的是注意的是,这种情况下对管道的接收或发送是非阻塞式的,我们可以很明显的看到其中的blcok
参数为false
。
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
而不论是对管道发送或接收数据,在blcok
为false
时都有一个快速路径可以在不加锁的情况下判断是否可以发送或接收数据,正如下所示
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
return true, false
}
...
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if !block && c.closed == 0 && full(c) {
return false
}
...
}
对于读取管道时,如果管道是空的就会直接返回,对于写管道时,如果管道未关闭且已经满了也会直接返回,在一般情况下它们是会造成协程阻塞的,但是与select结合使用就不会。
处理
上面三种只是对特殊情况的优化,正常使用的select关键字会被翻译成对runtime.selectgo
函数的调用,它的处理逻辑长达400多行。
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
编译器会将所有的case语句收集一个scase
数组,然后传递给selectgo
函数,处理完成后返回两个返回值
- 第一个是随机选取的管道下标,表示哪一个管道被处理了,没有的话返回-1
- 第二个表示对于读管道操作而言是否成功读取
这里简单解释下其参数
cas0
,scase
数组的头部指针,前半部分存放的是写管道case,后半部分存放的读管道case,以nsends
来区分order0
,它的长度是scase
数组的两倍,前半部分分配给pollorder
数组,后半部分分配给lockorder
数组nsends
和nrecvs
表示读/写管道case的数量,两者之和就是case的总数block
表示是否阻塞,如果有default
case就代表非阻塞,其值为true
,否则为true
。pc0
,指向一个[ncases]uintptr
的数组头部,用于竞态分析,后面可以忽略它,对于理解select而言没什么帮助
假设有下面的代码
func main() {
ch := make(chan int)
select {
case ch <- 1:
println(1)
case ch <- 2:
println(2)
case ch <- 3:
println(3)
case ch <- 4:
println(4)
default:
println("default")
}
}
查看其汇编形式,这里为了方便理解省去了部分代码
0x0000 00000 TEXT main.main(SB), ABIInterna
...
0x0023 00035 CALL runtime.makechan(SB)
0x0028 00040 MOVQ $1, main..autotmp_2+72(SP) // 1 2 3 4几个临时变量
0x0031 00049 MOVQ $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL runtime.selectgo(SB) // 调用runtime.selectgo函数
0x00cd 00205 TESTQ AX, AX
0x00d0 00208 JLT 352 // 跳转到default分支
0x00d6 00214 PCDATA $1, $-1
0x00d6 00214 JEQ 320 // 跳转到分支4
0x00d8 00216 CMPQ AX, $1
0x00dc 00220 JEQ 288 // 跳转到分支3
0x00de 00222 NOP
0x00e0 00224 CMPQ AX, $2
0x00e4 00228 JNE 258 // 跳转到分支2
0x00e6 00230 PCDATA $1, $0
0x00e6 00230 CALL runtime.printlock(SB)
0x00eb 00235 MOVL $3, AX
0x00f0 00240 CALL runtime.printint(SB)
0x00f5 00245 CALL runtime.printnl(SB)
0x00fa 00250 CALL runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP 379
0x0102 00258 CALL runtime.printlock(SB)
0x0107 00263 MOVL $4, AX
0x010c 00268 CALL runtime.printint(SB)
0x0111 00273 CALL runtime.printnl(SB)
0x0116 00278 CALL runtime.printunlock(SB)
0x011b 00283 JMP 379
0x011d 00285 NOP
0x0120 00288 CALL runtime.printlock(SB)
0x0125 00293 MOVL $2, AX
0x012a 00298 CALL runtime.printint(SB)
0x012f 00303 CALL runtime.printnl(SB)
0x0134 00308 CALL runtime.printunlock(SB)
0x0139 00313 JMP 379
0x013b 00315 NOP
0x0140 00320 CALL runtime.printlock(SB)
0x0145 00325 MOVL $1, AX
0x014a 00330 CALL runtime.printint(SB)
0x014f 00335 CALL runtime.printnl(SB)
0x0154 00340 CALL runtime.printunlock(SB)
0x0159 00345 JMP 379
0x015b 00347 NOP
0x0160 00352 CALL runtime.printlock(SB)
0x0165 00357 LEAQ go:string."default\n"(SB)
0x016c 00364 MOVL $8, BX
0x0171 00369 CALL runtime.printstring(SB)
0x0176 00374 CALL runtime.printunlock(SB)
0x017b 00379 PCDATA $1, $-1
0x017b 00379 ADDQ $160, SP
0x0182 00386 POPQ BP
0x0183 00387 RET
可以看到在调用selectgo
函数后是有一个判断+跳转逻辑存在的,通过这些我们不难反推出其原来的样子
casei, ok := runtime.selectgo()
if casei == -1 {
println("default")
} else if casei == 3 {
println(4)
} else if casei == 2 {
println(3)
} else if casei == 1 {
println(2)
} else {
println(1)
}
编译器生成的实际代码可能和这个有出入,但大致意思是差不多的。所以编译器会在调用完selectgo
函数后同时使用if
语句来判断轮到哪一个管道被执行,并且在调用之前,编译器还会生成一个for循环来收集scase
数组不过这里省略掉了。
在知晓了外部是如何使用selectgo
函数以后,下面就来了解selectgo
函数内部是如何工作的。它首先会初始化几个数组,nsends+nrecvs
表示case的总数,从下面的代码也可以看出case数量的最大值也就是1 << 16
,pollorder
决定了管道的执行顺序,lockorder
决定了管道的锁定顺序。
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// 它的长度是scase数组的两倍,前半部分分配给pollorder数组,后半部分分配给lockorder数组。
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
接下来初始化pollorder
数组,它存放的是待执行管道的sacses
数组下标
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
它会遍历整个scases
数组,然后通过runtime.fastrandn
生成[0, i]之间的随机数,再将它与i
交换,过程中会跳过管道为nil
的case,遍历完成后就得到了一个元素被打乱了的pollorder
数组,如下图所示
然后对pollorder
数组根据管道的地址大小使用堆排序就得到了lockorder
数组,再调用runtime.sellock
按照顺序将其上锁
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != c {
c = c0
lock(&c.lock)
}
}
}
这里值得注意的是,对管道按照地址大小排序是为了避免死锁,因为select操作本身不需要锁允许并发。假设按照pollorder
随机顺序加锁,那么考虑下面代码的情况
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)
poll := func() {
select {
case ch1 <- 1:
println(1)
case ch2 <- 2:
println(2)
case ch3 <- 3:
println(3)
case ch4 <- 4:
println(4)
default:
println("default")
}
}
// A
go poll()
// B
go poll()
// C
go poll()
三个协程ABC都走到了加锁这一步骤,并且它们彼此加锁顺序都是随机的互不相同,有可能造成这样一种情况,如下图所示
假设ABC加锁顺序跟上图一样,那么造成死锁的可能性就非常大,比如A会先持有ch2
的锁,然后去尝试获取ch1
的锁,但假设ch1
已经被协程B锁住了,协程B又会去尝试获取ch2
的锁,那么这样就造成了死锁。
如果所有协程都按照同样的顺序加锁,就不会发送死锁问题,这也是lockorder
要按照地址大小来进行排序的根本原因。
上完锁之后,就开始了真正的处理阶段,首先遍历pollorder
数组,按照之前打乱的顺序访问管道,逐个遍历找到一个可用的管道
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends { // 读管道
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
} else { // 写管道
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}
可以看到这里对读/写管道做了6种情况的处理,下面分别进行讲解。第一种情况,读取管道且有发送方正在等待发送,这里会走到runtime.recv
函数,其作用已经讲过了,它最终会唤醒发送方协程,再唤醒之前回调函数会将全部管道解锁。
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc
第二种情况,读取管道,没有发送方正在等待,缓冲区元素数量大于0,这里会直接从缓冲区中读取数据,其逻辑跟runtime.chanrecv
中完全一致,然后解锁。
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
第三种情况,读取管道,但管道已经关闭了,且缓冲区中没有剩余元素,这里会先解锁然后直接返回。
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retc
第四种情况,向已关闭的管道发送数据,这里会先解锁然后panic
,
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
第五种情况,有接收方正在阻塞等待,这里会调用runitme.send
函数,并最终唤醒接收方协程,在唤醒之前回调函数会将全部管道解锁。
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc
第六种情况,没有接收方协程等待,将要发送的数据放入缓冲区,然后解锁。
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
然后上面所有情况最后都会走入retc
这个分支,而它要做的只有返回选中的管道下标casi
以及代表着是否读取成功的recvOk
。
retc:
return casi, recvOK
第七种情况,没有找到可用的管道,且代码中包含default
分支,则解锁管道然后直接返回,这里返回的casi
为-1即表示没有可用的管道。
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}
最后一种情况,没有找到可用的管道,且代码中不包含default
分支,那么当前协程会陷入阻塞状态,在这之前selectgo
会将当前协程加入所有监听管道的recvq/sendq
队列中
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
sg.c = c
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
这里会将创建若干个sudog
并将其和对应的管道链接起来,如下图所示
然后由runtime.gopark
阻塞,在阻塞前会将管道解锁,解锁的工作由runtime.selparkcommit
函数完成,它被作为回调函数传入了gopark
中。
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false
被唤醒后的第一件事情就是解除sudog
与管道的链接
sellock(scases, lockorder)
gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
然后将sudog
从之前管道的等待队列中移除
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
在上面的过程中一定会找到一个唤醒方协程所处理的管道,然后根据caseSuccess
来做出最后的处理。对于写操作而言,sg.success
为false
代表管道已经关闭了,而且整个go运行时也只有close
函数会主动将该字段设置为false
,这表明当前协程是唤醒方通过close
函数唤醒的。对于读操作而言,如果是被发送方唤醒的,数据读取操作也早在被唤醒前由发送方通过runtime.send
函数完成了,其值为true
,如果是被close
函数唤醒的,跟前面一样都是直接返回。
c = cas.c
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
selunlock(scases, lockorder)
goto retc
到此整个select的逻辑都大致理清楚了,上面分了好几种情况,可见select处理起来还是比较复杂的。