gmp
gmp
go语言最大的特点之一就是它对于并发的天然支持,仅需一个关键字就可以开启一个协程,就像下面例子所演示的一样。
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
fmt.Println("hello world!")
}()
go func() {
defer wg.Done()
fmt.Println("hello world too!")
}()
wg.Wait()
}
go语言的协程使用起来如此的简单,对于开发者来说几乎不需要做什么额外的工作,这也是它受欢迎的原因之一。不过在简单的背后,是一个并不简单的并发调度器在支撑着这一切,它的名字相信各位或多或少都应该听说过,因为其主要参与者分别由G(协程),M(系统线程),P(处理器)这三个成员组成,所以也被称为GMP调度器。GMP调度器的设计影响着整个go语言运行时的设计,GC,网络轮询器,可以说它就是整个语言最核心的一块,如果对它能够有一定的了解,在日后的工作中说不定会有些许帮助。
历史
Go语言的并发调度模型并发不是完全原创的,它吸收了很多前人的经验和教训,经过不断发展和改进才有了现在的样子。它借鉴过的语言有下面这些:
- Occam -1983
- Erlang - 1986
- Newsqueak - 1988
- Concurrent ML - 1993
- Alef - 1995
- Limbo - 1996
影响最为巨大的还是霍尔在1978年发表了一篇关于CSP(Communicate Sequential Process)的论文,该论文的基本思想是进程与进程之间通过通信来进行数据的交换。在上面的几门编程语言中无一不受到了CSP思想的影响,Erlang就是最为典型的一个面向消息的编程语言,著名开源消息队列中间件RabbitMQ就是采用Erlang编写的。到了现如今,随着的计算机和互联网的发展,并发支持几乎已经成为了一个现代语言的标配,结合了CSP思想的go语言便应运而生。
调度模型
首先来简单的介绍下GMP成的三个成员
- G,Goroutine,指的是go语言中的协程
- M,Machine,指是系统线程或者叫工作线程(worker thread),由操作系统来负责调度
- P,Processor,并非指CPU处理器,是go自己抽象的一个概念,指的是工作在系统线程上的处理器,通过它来调度每一个系统线程上的协程。
协程就是一种更加轻量的线程,规模更小,所需的资源也会更少,创建和销毁和调度的时机都是由go语言运行时来完成,而并非操作系统,所以它的管理成本要比线程低很多。不过协程也是依附于线程的,协程执行所需的时间片来自于线程,线程所需的时间片来自于操作系统,而不同的线程间的切换是有一定成本的,如何让协程利用好线程的时间片就是设计的关键所在。
1:N
解决问题的最好办法就是忽略这个问题,既然线程切换是有成本的,那直接不切换就行了。将所有的协程都分配到一个内核线程上,这样就只涉及到了协程间的切换。
线程与协程间的关系是1:N
,这样做有一个非常明显的缺点,当今时代的计算机几乎都是多核CPU,这样的分配无法利用充分利用多核CPU的性能。
N:N
另一种方法,一个线程对应一个协程,一个协程可以享受该线程的所有时间片,多个线程也可以利用好多核CPU的性能。但是,线程的创建和切换成本是比较高的,如果是一比一的关系,反而没有利用好协程的轻量这一优势。
M:N
M个线程对应N个协程,且M小于N。多个线程对应多个协程,每一个线程都会对应若干个协程,由处理器P来负责调度协程G如何使用线程的时间片。这种方法是相对而言比较好的一种,也是Go一直沿用至今的调度模型。
M只有与处理器P关联后才能执行任务,go会创建GOMAXPROCS
个处理器,所以真正能够用于执行任务的线程数量就是GOMAXPROCS
个,它的默认值是当前机器上的CPU逻辑核数,我们也可以手动去设置它的值。
通过代码修改
runtime.GOMAXPROCS(N)
,并且可以在运行时动态调整,调用后直接STW。设置环境变量
export GOMAXPROCS=N
,静态。
在实际情况下,M的数量会大于P的数量,因为在运行时会需要它们去处理其它的任务,比如一些系统调用,最大值是10000。
GMP这三个参与者以及调度器本身在运行时都有其对应的类型表示,它们都位于runtime/runtime2.go
文件中,下面会对其结构进行简单的介绍,方便在后面进行理解。
G
G在运行时的表现类型是runtime.g
结构体,是调度模型中最基本的调度单元,其结构如下所示,为了方便理解,删去了不少的字段。
type g struct {
stack stack // offset known to runtime/cgo
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m *m // current m; offset known to arm liblink
sched gobuf
goid uint64
waitsince int64 // approx time when the g become blocked
waitreason waitReason // if status==Gwaiting
atomicstatus atomic.Uint32
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
startpc uintptr // pc of goroutine function
parentGoid uint64 // goid of goroutine that created this goroutine
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
}
第一个字段就是属于该协程的栈的内存起始地址和结束地址
type stack struct {
lo uintptr
hi uintptr
}
_panic
和_defer
是分别指向panic
栈和defer
栈的指针
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m
正在执行当前g的协程
m *m // current m; offset known to arm liblink
preempt
表示当前协程是否需要被抢占,等价于g.stackguard0 = stackpreempt
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
atomicstatus
用于存储协程G的状态值,它有以下可选的值
名称 | 描述 |
---|---|
_Gidle | 刚被分配,且未被初始化 |
_Grunnable | 表示当前协程可以运行,位于等待队列中 |
_Grunning | 表示当前协程正在执行用户代码 |
_Gsyscall | 被分配了一个M,用于执行系统调用, |
_Gwaiting | 协程阻塞,阻塞的原因见下文 |
_Gdead | 表示当前协程未被使用,可能刚刚退出,也可能刚刚初始化 |
_Gcopystack | 表示协程栈正在移动,在此期间不执行用户代码,也不位于等待队列中 |
_Gpreempted | 阻塞自身进入抢占,等待被抢占方唤醒 |
_Gscan | GC正在扫描协程栈空间,可以其它状态共存 |
sched
用于存储协程上下文信息用于恢复协程的执行现场,可以看到里面存储着sp
,pc
,ret
指针。
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr // for framepointer-enabled architectures
}
waiting
表示当前协程正在等待的协程,waitsince
记录了协程发生阻塞的时刻,waitreason
表示协程阻塞的原因,可选的值如下。
var waitReasonStrings = [...]string{
waitReasonZero: "",
waitReasonGCAssistMarking: "GC assist marking",
waitReasonIOWait: "IO wait",
waitReasonChanReceiveNilChan: "chan receive (nil chan)",
waitReasonChanSendNilChan: "chan send (nil chan)",
waitReasonDumpingHeap: "dumping heap",
waitReasonGarbageCollection: "garbage collection",
waitReasonGarbageCollectionScan: "garbage collection scan",
waitReasonPanicWait: "panicwait",
waitReasonSelect: "select",
waitReasonSelectNoCases: "select (no cases)",
waitReasonGCAssistWait: "GC assist wait",
waitReasonGCSweepWait: "GC sweep wait",
waitReasonGCScavengeWait: "GC scavenge wait",
waitReasonChanReceive: "chan receive",
waitReasonChanSend: "chan send",
waitReasonFinalizerWait: "finalizer wait",
waitReasonForceGCIdle: "force gc (idle)",
waitReasonSemacquire: "semacquire",
waitReasonSleep: "sleep",
waitReasonSyncCondWait: "sync.Cond.Wait",
waitReasonSyncMutexLock: "sync.Mutex.Lock",
waitReasonSyncRWMutexRLock: "sync.RWMutex.RLock",
waitReasonSyncRWMutexLock: "sync.RWMutex.Lock",
waitReasonTraceReaderBlocked: "trace reader (blocked)",
waitReasonWaitForGCCycle: "wait for GC cycle",
waitReasonGCWorkerIdle: "GC worker (idle)",
waitReasonGCWorkerActive: "GC worker (active)",
waitReasonPreempted: "preempted",
waitReasonDebugCall: "debug call",
waitReasonGCMarkTermination: "GC mark termination",
waitReasonStoppingTheWorld: "stopping the world",
}
goid
和parentGoid
表示当前协程和父协程的唯一标识,startpc
表示当前协程入口函数的地址。
M
M
在运行时表现为runtime.m
结构体,是对工作线程的抽象
type m struct {
id int64
g0 *g // goroutine with scheduling stack
curg *g // current running goroutine
gsignal *g // signal-handling g
goSigStack gsignalStack // Go-allocated signal handling stack
p puintptr // attached p for executing go code (nil if not executing go code)
nextp puintptr
oldp puintptr // the p that was attached before executing a syscall
mallocing int32
throwing throwType
preemptoff string // if != "", keep curg running on this m
locks int32
dying int32
spinning bool // m is out of work and is actively looking for work
tls [tlsSlots]uintptr
...
}
同样的,M内部的字段也有很多,这里仅介绍部分字段方便理解。
id
,M的唯一标识符g0
,拥有调度栈的协程curg
,正在工作线程上运行的用户协程gsignal
,负责处理线程信号的协程goSigStack
,go分配的用于信号处理的栈空间p
,处理器P的地址,oldp
指向在执行系统调用前的P,nextp
指向新分配的Pmallocing
,用于表示当前是否正在分配新的内存空间throwing
,表示当M发生的错误类型preemptoff
,抢占标识符,当它为空串时表示当前正在运行的协程可以被抢占locks
,表示当前M"锁"的数量,不为0时禁止抢占dying
,表示M发生了无法挽回的panic
,有[0,3]
四个可选值,从低到高表示严重程度。spinning
,表示M正处于空闲状态,并且随时可用。tls
,线程本地存储
P
P在运行时由runtime.p
表示,负责调度M与G之间的工作,其结构如下所示
type p struct {
id int32
status uint32 // one of pidle/prunning/...
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
// Available G's (status == Gdead)
gFree struct {
gList
n int32
}
// preempt is set to indicate that this P should be enter the
// scheduler ASAP (regardless of what G is running on it).
preempt bool
...
}
status
表示P的状态,有以下几个可选值
值 | 描述 |
---|---|
_Pidle | P位于空闲状态,可以被调度器分配M,也有可能只是在其它状态间转换 |
_Prunning | P与M关联,并且正在执行用户代码 |
_Psyscall | 表示与P关联的M正在进行系统调用,在此期间P可能会被其它的M抢占 |
_Pgcstop | 表示P因GC而停止 |
_Pdead | P的大部分资源都被剥夺,将不再会被使用 |
下面几个字段记录了P中的runq
本地队列,可以看到本地队列的最大数量是256,超过此数量后G会被放到全局队列中去。
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext
表示下一个可用的G
runnext guintptr
其它的几个字段释义如下
id
,P的唯一标识符schedtick
,随着协程调度次数的增加而增加,在runtime.execute
函数中可见。syscalltick
,随着系统调用的次数增加而增加sysmontick
,记录了上一次被系统监控观察的信息m
,与P关联的MgFree
,空闲的G列表preempt
,表示P应该再次进入调度
全局队列的信息则存放在runtime.schedt
结构体中,是调度器在运行时的表示形式,如下。
type schedt struct {
...
midle muintptr // idle m's waiting for work
ngsys atomic.Int32 // number of system goroutines
pidle puintptr // idle p's
// Global runnable queue.
runq gQueue
runqsize int32
...
}
初始化
调度器的初始化位于go程序的引导阶段,负责引导go程序的就是runtime.rt0_go
函数,它由汇编实现位于文件runtime/asm_*.s
中,部分代码如下
TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0
...
...
CALL runtime·check(SB)
MOVL 24(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 32(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
CALL runtime·newproc(SB)
POPQ AX
// start this M
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
RET
可以通过下面两行可以看到对runtime·osinit
和runtime·schedinit
的调用。
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
前者负责初始化操作系统相关的工作,后者负责调度器的初始化,也就是runtime·schedinit
函数。它会在程序启动时负责初始化调度器运行所需的资源,下面是简化后的代码。
func schedinit() {
...
gp := getg()
sched.maxmcount = 10000
// The world starts stopped.
worldStopped()
...
stackinit()
mallocinit()
mcommoninit(gp.m, -1)
lock(&sched.lock)
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
unlock(&sched.lock)
...
// World is effectively started now, as P's can run.
worldStarted()
...
}
runtime.getg
函数由汇编实现,它的功能是获取到当前协程的运行时表示,也就是runtime.g
结构体的指针。通过sched.maxmcount = 10000
可以看到,在调度器初始化的时候就设置了M的最大数量为10000,这个值是固定的且没法修改。再之后就是初始化堆栈,然后才是runtime.mcommoninit
函数来初始化M,其函数实现如下
func mcommoninit(mp *m, id int64) {
gp := getg()
// g0 stack won't make sense for user (and is not necessary unwindable).
if gp != gp.m.g0 {
callers(1, mp.createstack[:])
}
lock(&sched.lock)
if id >= 0 {
mp.id = id
} else {
mp.id = mReserveID()
}
...
mpreinit(mp)
if mp.gsignal != nil {
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + stackGuard
}
// Add to allm so garbage collector doesn't free g->m
// when it is just in a register or thread-local storage.
mp.alllink = allm
// NumCgoCall() iterates over allm w/o schedlock,
// so we need to publish it safely.
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
...
}
该函数对M进行预初始化,主要做了以下工作
- 分配M的id
- 单独分配一个G用来处理线程信号,由
runtime.mpreinit
函数完成 - 将其作为全局M链表
runtime.allm
的头结点
接下来初始化P,其数量默认是CPU的逻辑核数,其次是环境变量的值。
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
最后由runtime.procresize
函数来负责初始化P,它会根据传入的数量来修改runtime.allp
这个存放所有P的全局切片。首先根据数量大小判断是否需要扩容
if nprocs > int32(len(allp)) {
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
然后再初始化每一个P
// initialize new P's
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
如果当前协程正在使用的P需要被销毁,则将其替换为allp[0]
,由runtime.acquirep
函数来完成M与新P的关联。
gp := getg()
if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {
gp.m.p.ptr().status = _Prunning
gp.m.p.ptr().mcache.prepareForSweep()
} else {
if gp.m.p != 0 {
gp.m.p.ptr().m = 0
}
gp.m.p = 0
pp := allp[0]
pp.m = 0
pp.status = _Pidle
acquirep(pp)
}
随后销毁不再需要的P,在销毁时会释放P的所有资源,将其本地队列中所有的G放入全局队列中,销毁完毕然后对allp
进行切片。
// release resources from unused P's
for i := nprocs; i < old; i++ {
pp := allp[i]
pp.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// Trim allp.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
最后将空闲的P链接成一个链表,并最终返回链表的头结点
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
pp := allp[i]
if gp.m.p.ptr() == pp {
continue
}
pp.status = _Pidle
if runqempty(pp) {
pidleput(pp, now)
} else {
pp.m.set(mget())
pp.link.set(runnablePs)
runnablePs = pp
}
}
return runnablePs
再之后,调度器初始化完毕,由runtime.worldStarted
将所有的P恢复运行。
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
CALL runtime·newproc(SB)
POPQ AX
// start this M
CALL runtime·mstart(SB)
然后会通过runtime.newproc
函数创建一个新的协程来启动go程序,随后调用runtime.mstart
来正式启动调度器的运行,它同样是由汇编实现,其内部会调用runtime.mstart0
函数进行创建,该函数部分代码如下
gp := getg()
osStack := gp.stack.lo == 0
if osStack {
size := gp.stack.hi
if size == 0 {
size = 16384 * sys.StackGuardMultiplier
}
gp.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
gp.stack.lo = gp.stack.hi - size + 1024
}
gp.stackguard0 = gp.stack.lo + stackGuard
gp.stackguard1 = gp.stackguard0
mstart1()
此时的M只有拥有一个协程g0
,该协程使用线程的系统栈,并非单独分配的栈空间。mstart0
函数会先初始化G的栈边界,然后交给mstart1
去完成剩下的初始化工作。
gp := getg()
gp.sched.g = guintptr(unsafe.Pointer(gp))
gp.sched.pc = getcallerpc()
gp.sched.sp = getcallersp()
asminit()
minit()
if gp.m == &m0 {
mstartm0()
}
if fn := gp.m.mstartfn; fn != nil {
fn()
}
if gp.m != &m0 {
acquirep(gp.m.nextp.ptr())
gp.m.nextp = 0
}
schedule()
在开始之前,首先会记录当前的执行现场,因为初始化成功之后就会进入调度循环并且永远也不会返回,其它的调用可以复用执行现场从mstart1
函数返回达到退出线程的目的。记录完毕后,由runtime.asminit
和runtime.minit
两个函数负责初始化系统栈,然后由runtime.mstartm0
函数设置用于处理信号的回调。执行回调函数m.mstartfn
后 ,runtime.acquirep
函数将M与先前创建好的P进行关联,最后进入调度循环。
这里调用的runtime.schudule
是整个go运行时的第一轮调度循环,代表着调度器正式开始工作。
线程
在调度器中,G想要执行用户代码要依靠P,而P要正常工作必须要与一个M关联,M指的就是系统线程。
创建
M的创建是由函数runtime.newm
完成的,它接受一个函数和P以及id作为参数,作为参数的函数不能闭包。
func newm(fn func(), pp *p, id int64) {
acquirem()
mp := allocm(pp, fn, id)
mp.nextp.set(pp)
mp.sigmask = initSigmask
newm1(mp)
releasem(getg().m)
}
在开始前,newm
会先调用runtime.allocm
函数来创建线程的运行时表示也就是M,在过程中会使用runtime.mcommoninit
函数来初始化M的栈边界。
func allocm(pp *p, fn func(), id int64) *m {
allocmLock.rlock()
// The caller owns pp, but we may borrow (i.e., acquirep) it. We must
// disable preemption to ensure it is not stolen, which would make the
// caller lose ownership.
acquirem()
gp := getg()
if gp.m.p == 0 {
acquirep(pp) // temporarily borrow p for mallocs in this function
}
mp := new(m)
mp.mstartfn = fn
mcommoninit(mp, id)
mp.g0.m = mp
releasem(gp.m)
allocmLock.runlock()
return mp
}
再之后由runtime.newm1
调用runtime.newosproc
函数来完成真正的系统线程的创建。
func newm1(mp *m) {
execLock.rlock()
newosproc(mp)
execLock.runlock()
}
runtim.newosproc
的实现会根据操作系统的不同而不同,具体怎么创建就不是我们要关心的事了,由操作系统负责,然后由runtime.mstart
来启动M的工作。
退出
runtime.gogo(&mp.g0.sched)
在初始的时候提到过,在调用mstart1
函数时将执行现场保存在了 g0
的sched
字段中,将该字段传给runtime.gogo
函数(汇编实现)就可以让线程跳到执行现场继续执行,在保存的时候用的是getcallerpc()
,所以恢复现场的时候是回到了mstar0
函数。
mstart1()
if mStackIsSystemAllocated() {
osStack = true
}
mexit(osStack)
执行现场恢复后,按照执行顺序就会进入mexit
函数来退出线程。
mp := getg().m
unminit()
lock(&sched.lock)
for pprev := &allm; *pprev != nil; pprev = &(*pprev).alllink {
if *pprev == mp {
*pprev = mp.alllink
}
}
mp.freeWait.Store(freeMWait)
mp.freelink = sched.freem
sched.freem = mp
unlock(&sched.lock)
handoffp(releasep())
mdestroy(mp)
exitThread(&mp.freeWait)
它总共做了以下几个主要的事情
- 调用
runtime.uminit
来撤销runtime.minit
的工作 - 从全局变量
allm
中删除该M - 将调度器的
freem
指向当前M - 由
runtime.releasep
将P与当前M解绑,并由runtime.handoffp
让P跟其它的M绑定以继续工作 - 由
runtime.destroy
负责销毁M的资源 - 最后由操作系统来退出线程
到此M就成功退出了。
暂停
当因为调度器调度,GC,系统调用等原因需要暂停M时,就会调用runtime.stopm
函数来暂停线程,下面是简化后的代码。
func stopm() {
gp := getg()
lock(&sched.lock)
mput(gp.m)
unlock(&sched.lock)
mPark()
acquirep(gp.m.nextp.ptr())
gp.m.nextp = 0
}
它首先会将M放入全局的空闲M列表中,然后由mPark()
将当前线程阻塞在notesleep(&gp.m.park)
这里,当被唤醒过后该函数就会返回
func mPark() {
gp := getg()
notesleep(&gp.m.park)
noteclear(&gp.m.park)
}
唤醒后的M会去寻找一个P进行绑定从而继续执行任务。
协程
协程的生命周期刚好对应着协程的几个状态,了解协程的生命周期对了解调度器会很有帮助,毕竟整个调度器就是围绕着协程来设计的,整个协程的生命周期就如下图所示。
_Gcopystack
是协程栈扩张时具有的状态,在协程栈部分进行讲解。
创建
协程的创建从语法层面上来讲只需要一个go
关键字加一个函数。
go doSomething()
编译后会变成runtime.newproc
函数的调用
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
pp := getg().m.p.ptr()
runqput(pp, newg, true)
if mainStarted {
wakep()
}
})
}
由runtime.newproc1
来完成实际的创建,在创建时首先会锁住M,禁止抢占,然后会去P的本地gfree
列表中寻找空闲的G来重复利用,如果找不到就由runtime.malg
创建一个新的G,并为其分配2kb的栈空间。此时G的状态为_Gdead
。
mp := acquirem() // disable preemption because we hold M and P in local vars.
pp := mp.p.ptr()
newg := gfget(pp)
if newg == nil {
newg = malg(stackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
在go1.18及以后,参数的拷贝不再由newproc1
函数完成,在这之前,会使用runtime.memmove
来拷贝函数的参数。现在的话只是负责重置协程的栈空间,将runtime.goexit
作为栈底由它来进行协程的退出处理,然后设置入口函数的PCnewg.startpc = fn.fn
表示从这里开始执行,设置完成后,此时G的状态为_Grunnable
。
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
totalSize = alignUp(totalSize, sys.StackAlign)
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.parentGoid = callergp.goid
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
casgstatus(newg, _Gdead, _Grunnable)
最后设置G的唯一标识符,然后释放M,返回创建的协程G。
newg.goid = pp.goidcache
pp.goidcache++
releasem(mp)
return newg
协程创建完毕后,会由runtime.runqput
函数尝试将其放入P的本地队列中,如果放不下就放到全局队列中。在协程创建的整个过程中,其状态变化首先是由_Gidle
变为_Gdead
,设置好入口函数后由_Gdead
变为_Grunnable
。
退出
在创建的时候go就已经将runtime.goexit
函数作为协程的栈底,那么当协程执行完毕后最终就会走入该函数,经过调用链goexit->goexit1->goexit0
,最终由runtime.goexit0
来负责协程的退出工作。
func goexit0(gp *g) {
mp := getg().m
pp := mp.p.ptr()
...
casgstatus(gp, _Grunning, _Gdead)
...
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
mp.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = waitReasonZero
gp.param = nil
gp.labels = nil
gp.timer = nil
dropg()
...
gfput(pp, gp)
...
schedule()
}
该函数主要做了下面几个事情
- 设置状态为
_Gdead
- 重置字段值
dropg()
切断M与G之间的关联gfput(pp, gp)
将当前G放入P的本地空闲列表中schedule()
进行新一轮调度,将M的执行权让给其它的G
退出后,协程的状态由_Grunning
变化为_Gdead
,在以后新建协程时仍有可能被重复利用。
系统调用
当协程G在执行用户代码时如果进行了系统调用,触发系统调用的方法有两种
syscall
标准库的系统调用- cgo调用
因为系统调用会阻塞工作线程,所以在此之前需要进行准备工作,由runtime.entersyscall
函数完成这一过程,但前者也只是对runtime.reentersyscall
函数的一个简单调用,实际的工作是由后者来完成的。首先会锁住当前的M,在进行准备期间G禁止被抢占,也禁止栈扩张,设置gp.stackguard0 = stackPreempt
表示在准备工作完成后将P的执行权将被其它的G抢占,然后保留协程的执行现场,方便系统调用返回后恢复。
gp := getg()
// Disable preemption because during this function g is in Gsyscall status,
// but can have inconsistent g->sched, do not let GC observe it.
gp.m.locks++
// Entersyscall must not call any function that might split/grow the stack.
// (See details in comment above.)
// Catch calls that might, by replacing the stack guard with something that
// will trip any stack check and leaving a flag to tell newstack to die.
gp.stackguard0 = stackPreempt
gp.throwsplit = true
// Leave SP around for GC and traceback.
save(pc, sp)
gp.syscallsp = sp
gp.syscallpc = pc
再之后,因为了防止长时间阻塞而影响其它G的执行,M与P会解绑,解绑后的M和G会因执行系统调用而阻塞,而P在解绑以后可能会与其它空闲的M绑定从而让P本地队列中其它的G能够继续工作。
casgstatus(gp, _Grunning, _Gsyscall)
gp.m.syscalltick = gp.m.p.ptr().syscalltick
pp := gp.m.p.ptr()
pp.m = 0
gp.m.oldp.set(pp)
gp.m.p = 0
atomic.Store(&pp.status, _Psyscall)
gp.m.locks--
在准备工作完成后,释放M的锁,在此期间G的状态由_Grunning
变为_Gsyscall
,P的状态变为_Psyscall
。
当系统调用返回后,线程M不再阻塞,对应的G也需要再次被调度来执行用户代码,由runtime.exitsyscall
函数来完成这个善后的工作。首先锁住当前的M,获取旧P的引用。
gp := getg()
gp.waitsince = 0
oldp := gp.m.oldp.ptr()
gp.m.oldp = 0
此时分为两种情况来处理,第一种情况是是否有P可以直接使用,runtime.exitsyscallfast
函数会判断原来的P是否可用,即P的状态是否为_Psyscall
,否则的话就会去找空闲的P。
func exitsyscallfast(oldp *p) bool {
gp := getg()
// Freezetheworld sets stopwait but does not retake P's.
if sched.stopwait == freezeStopWait {
return false
}
// Try to re-acquire the last P.
if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
// There's a cpu for us, so we can run.
wirep(oldp)
exitsyscallfast_reacquired()
return true
}
// Try to get any other idle P.
if sched.pidle != 0 {
var ok bool
systemstack(func() {
ok = exitsyscallfast_pidle()
})
if ok {
return true
}
}
return false
}
如果成功找到了可用的P,M会与P进行绑定,G由_Gsyscall
状态切换为_Grunning
状态,然后通过runtime.Gosched
G主动让出执行权,P进入调度循环寻找其它可用的G。
oldp := gp.m.oldp.ptr()
gp.m.oldp = 0
if exitsyscallfast(oldp) {
// There's a cpu for us, so we can run.
gp.m.p.ptr().syscalltick++
// We need to cas the status and scan before resuming...
casgstatus(gp, _Gsyscall, _Grunning)
// Garbage collector isn't running (since we are),
// so okay to clear syscallsp.
gp.syscallsp = 0
gp.m.locks--
if gp.preempt {
// restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
} else {
// otherwise restore the real stackGuard, we've spoiled it in entersyscall/entersyscallblock
gp.stackguard0 = gp.stack.lo + stackGuard
}
gp.throwsplit = false
if sched.disable.user && !schedEnabled(gp) {
// Scheduling of this goroutine is disabled.
Gosched()
}
return
}
假如没有找到的话,M会与G解绑,G由_Gsyscall
切换为_Grunnable
状态,然后再次尝试是否能找到空闲的P,如果没有找到就直接将G放入全局队列中,然后进入新一轮调度循环,旧M由runtime.stopm
进入空闲状态,等待日后的新任务。如果P找到了的话,旧M和G与新的P进行关联,然后继续执行用户代码,状态由_Grunnable
变为_Grunning
。
func exitsyscall0(gp *g) {
casgstatus(gp, _Gsyscall, _Grunnable)
dropg()
lock(&sched.lock)
var pp *p
if schedEnabled(gp) {
pp, _ = pidleget(0)
}
var locked bool
if pp == nil {
globrunqput(gp)
}
unlock(&sched.lock)
if pp != nil {
acquirep(pp)
execute(gp, false) // Never returns.
}
stopm()
schedule() // Never returns.
}
在退出系统调用后,G的状态最终有两种结果,一种是等待被调度的_Grunnable
,一种是继续运行的_Grunning
。
挂起
当前协程因为一些原因挂起的时候,状态会由_Grunnable
变为_Gwaiting
,挂起的原因有很多,可以是因为通道阻塞,select
,锁或者是time.sleep
,更多原因见G结构。拿time.Sleep
举例,它实际上是链接到了runtime.timesleep
,后者的代码如下。
func timeSleep(ns int64) {
if ns <= 0 {
return
}
gp := getg()
t := gp.timer
if t == nil {
t = new(timer)
gp.timer = t
}
t.f = goroutineReady
t.arg = gp
t.nextwhen = nanotime() + ns
if t.nextwhen < 0 { // check for overflow.
t.nextwhen = maxWhen
}
gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceBlockSleep, 1)
}
可以看出,它通过getg
获取当前的协程,再通过runtime.gopark
使得当前的协程挂起。runtime.gopark
会更新G和M的阻塞原因,释放M的锁
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waitTraceBlockReason = traceReason
mp.waitTraceSkip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
然后切换到系统栈上由runtime.park_m
来将G的状态切换为_Gwaiting
,然后切断M与G之间的关联并进入新的调度循环从而将执行权让给其它的G。挂起后,G即不执行用户代码,也不在本地队列中,只是保持着对M和P的引用。
mp := getg().m
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
schedule()
在runtime.timesleep
函数中有这么一行代码,指定了t.f
的值
t.f = goroutineReady
这个runtime.goroutineReady
函数的作用就是用于唤醒挂起的协程,它会调用runtime.ready
函数来唤醒协程
status := readgstatus(gp)
// Mark runnable.
mp := acquirem()
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(mp.p.ptr(), gp, next)
wakep()
releasem(mp)
唤醒后,将G的状态切换为_Grunnable
,然后将G放入P的本地队列中等待日后被调度。
协程栈
go语言中的协程是典型的有栈协程,每开启一个协程都会为其在堆上分配一个独立的栈空间,并且它会随着使用量的变化而增长或缩小。在调度器初始化的时候,runtime.stackinit
函数负责来初始化全局的栈空间缓存stackpool
和stackLarge
。
func stackinit() {
if _StackCacheSize&_PageMask != 0 {
throw("cache size must be a multiple of page size")
}
for i := range stackpool {
stackpool[i].item.span.init()
lockInit(&stackpool[i].item.mu, lockRankStackpool)
}
for i := range stackLarge.free {
stackLarge.free[i].init()
lockInit(&stackLarge.lock, lockRankStackLarge)
}
}
除此之外,每一个P都有一个自己独立的栈空间缓存mcache
type p struct {
...
mcache *mcache
...
}
type mcache struct {
_ sys.NotInHeap
nextSample uintptr
scanAlloc uintptr
tiny uintptr
tinyoffset uintptr
tinyAllocs uintptr
alloc [numSpanClasses]*mspan
stackcache [_NumStackOrders]stackfreelist
flushGen atomic.Uint32
}
线程缓存mcache
是每一个线程独立的且不是分配在堆内存上,访问的时候不需要加锁,这三个栈缓存在后续分配空间的时候都会用到。
分配
在新建协程时,如果没有可重复利用的协程,就会选择为其分配一个新的栈空间,它的大小默认为2KB。
newg := gfget(pp)
if newg == nil {
newg = malg(stackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
负责分配栈空间的函数就是runtime.stackalloc
func stackalloc(n uint32) stack
根据申请的栈内存大小是否小于32KB分为两种情况,32KB同时也是go中判断是小对象还是大对象的标准。倘若小于这个值就会从stackpool
缓存中去获取,当M与P绑定且M不允许被抢占时,就会去本地的线程缓存中获取。
if n < fixedStack<<_NumStackOrders && n < _StackCacheSize {
order := uint8(0)
n2 := n
for n2 > fixedStack {
order++
n2 >>= 1
}
var x gclinkptr
if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" {
lock(&stackpool[order].item.mu)
x = stackpoolalloc(order)
unlock(&stackpool[order].item.mu)
} else {
c := thisg.m.p.ptr().mcache
x = c.stackcache[order].list
if x.ptr() == nil {
stackcacherefill(c, order)
x = c.stackcache[order].list
}
c.stackcache[order].list = x.ptr().next
c.stackcache[order].size -= uintptr(n)
}
v = unsafe.Pointer(x)
}
倘若大于32KB,就会去stackLarge
缓存中获取,如果还不够的话就直接在堆上分配内存。
else {
var s *mspan
npage := uintptr(n) >> _PageShift
log2npage := stacklog2(npage)
// Try to get a stack from the large stack cache.
lock(&stackLarge.lock)
if !stackLarge.free[log2npage].isEmpty() {
s = stackLarge.free[log2npage].first
stackLarge.free[log2npage].remove(s)
}
unlock(&stackLarge.lock)
lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)
if s == nil {
// Allocate a new stack from the heap.
s = mheap_.allocManual(npage, spanAllocStack)
if s == nil {
throw("out of memory")
}
osStackAlloc(s)
s.elemsize = uintptr(n)
}
v = unsafe.Pointer(s.base())
}
完事后返回栈空间的低地址和高地址
return stack{uintptr(v), uintptr(v) + uintptr(n)}
扩容
默认的协程栈大小为2KB,足够轻量,所以创建一个协程的成本非常低,但这不一定够用,当栈空间不够用的时候就需要扩容。编译器会在函数的开头插入runtime.morestack
函数来检查当前协程是否需要进行栈扩容,如果需要的话就调用runtime.newstack
来完成真正的扩容操作。
提示
由于morestack
几乎会在所有函数的开头都被插入,所以栈扩容检查的时机也是一个协程抢占点。
thisg := getg()
gp := thisg.m.curg
// Allocate a bigger segment and move the stack.
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize * 2
// The goroutine must be executing in order to call newstack,
// so it must be Grunning (or Gscanrunning).
casgstatus(gp, _Grunning, _Gcopystack)
// The concurrent GC will not scan the stack while we are doing the copy since
// the gp is in a Gcopystack status.
copystack(gp, newsize)
casgstatus(gp, _Gcopystack, _Grunning)
gogo(&gp.sched)
可以看到,计算后的栈空间容量是原来的两倍,由runtime.copystack
函数来完成栈拷贝的工作,在拷贝前G的状态由_Grunning
切换为_Gcopystack
。
func copystack(gp *g, newsize uintptr) {
old := gp.stack
used := old.hi - gp.sched.sp
// allocate new stack
new := stackalloc(uint32(newsize))
// Compute adjustment.
var adjinfo adjustinfo
adjinfo.old = old
adjinfo.delta = new.hi - old.hi
// Copy the stack (or the rest of it) to the new location
memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy)
// Adjust remaining structures that have pointers into stacks.
// We have to do most of these before we traceback the new
// stack because gentraceback uses them.
adjustctxt(gp, &adjinfo)
adjustdefers(gp, &adjinfo)
adjustpanics(gp, &adjinfo)
if adjinfo.sghi != 0 {
adjinfo.sghi += adjinfo.delta
}
// Swap out old stack for new one
gp.stack = new
gp.stackguard0 = new.lo + stackGuard // NOTE: might clobber a preempt request
gp.sched.sp = new.hi - used
gp.stktopsp += adjinfo.delta
// Adjust pointers in the new stack.
var u unwinder
for u.init(gp, 0); u.valid(); u.next() {
adjustframe(&u.frame, &adjinfo)
}
stackfree(old)
}
该函数总共做了以下几个工作
- 分配新的栈空间
- 将旧栈内存通过
runtime.memmove
直接复制到新的栈空间中, - 调整包含栈指针的结构,比如defer,panic等
- 更新G的栈空间字段
- 通过
runtime.adjustframe
调整指向旧栈内存的指针 - 释放旧栈的内存
完成后,G的状态由_Gcopystack
切换为_Grunning
,并由runtime.gogo
函数让G继续执行用户代码。正是因为协程栈扩容的存在,所以go中的内存是不稳定的。
收缩
当G的状态为_Grunnable
,_Gsyscall
,_Gwaiting
时,GC会扫描协程栈的内存空间。
func scanstack(gp *g, gcw *gcWork) int64 {
switch readgstatus(gp) &^ _Gscan {
case _Grunnable, _Gsyscall, _Gwaiting:
// ok
}
...
if isShrinkStackSafe(gp) {
// Shrink the stack if not much of it is being used.
shrinkstack(gp)
}
...
}
实际的栈收缩工作由runtime.shrinkstack
来完成。
func shrinkstack(gp *g) {
if !isShrinkStackSafe(gp) {
throw("shrinkstack at bad time")
}
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize / 2
if newsize < fixedStack {
return
}
avail := gp.stack.hi - gp.stack.lo
if used := gp.stack.hi - gp.sched.sp + stackNosplit; used >= avail/4 {
return
}
copystack(gp, newsize)
}
当使用的栈空间不足原有的1/4时,就会通过runtime.copystack
将其缩小为原来的1/2,后面的工作跟之前就没什么两样了。
分段栈
从copystack
的过程可以看到,它会将旧栈内存拷贝到一个更大的栈空间上,不管是原来的栈还是新的栈它们的内存地址都是连续的。而在远古时期的go语言中,栈扩容时做法和现在不一样,那会觉得内存拷贝太消耗性能了,采用的是分段栈的思路,如果栈空间内存不够用了,就再申请一片新的栈空间,原有的栈空间内存不会释放也不会被拷贝,彼此之前通过指针链接起来,形成了一个栈链表,这也就是分段栈的由来,就如下图所示
这样做的好处在于不用拷贝原有的栈,但缺点也十分的明显,就是会十分频繁的触发栈扩容和缩容。当栈空间的空闲内存所剩无几时,新的函数调用会触发栈的扩容,当这些函数返回时,不再需要新的栈空间了后就又会触发缩容,假如这些函数调用的频率非常高,那么就会非常频繁的触发扩容和缩容,这种操作所造成的性能损耗是非常大的。
所以在go1.4之后换成了连续栈,连续栈因为分配了一个容量更大的栈空间,不会出现已使用内存达到临界值时因函数调用而频繁的触发扩缩容这种情况,并且由于内存地址是连续的,根据缓存的空间局部性原理而言,连续栈对CPU缓存也更加友好。
调度循环
在调度器初始化部分提到过,在runtime.mstart1
函数中,M与P成功关联后就会进入第一个runtime.schedule
调度循环正式开始调度G以执行用户代码。在调度循环中,这一部分就主要是P在发挥作用。M对应着系统线程,G对应着入口函数也就是用户代码,但P并不像M和G一样有着对应的实体,它只是一个抽象的概念,作为中间人处理着M和G之间的关系。
func schedule() {
mp := getg().m
top:
pp := mp.p.ptr()
pp.preempt = false
if mp.spinning {
resetspinning()
}
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
execute(gp, inheritTime)
}
上面的代码经过简化,删去了许多条件判断,最核心的点只有两个runtime.findRunnable
和runtime.execute
,前者负责找到一个G,并且一定会返回一个可用的G,后者负责让G继续执行用户代码。
对于findRunnable
函数而言,第一个G来源就是P的本地队列
// local runq
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, false
}
如果本地队列没有G,那么就尝试从全局队列中获取
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(pp, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
如果本地和全局队列中都没有找到,就会尝试从网络轮询器中获取
if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if traceEnabled() {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
}
倘若还找不到,最终就会从其它的P去偷取它本地队列中的G。在创建协程的时候提到过,P的本地队列中的G一大来源是当前协程派生的子协程,然而并非所有的协程都会创建子协程,这样就可能会出现一部分P非常忙碌,另一部分P是空闲的,这会导致一种情况,有的G因为一直在等待而迟迟无法被运行,而另一边的P十分清闲,什么事也没有。为了能够压榨所有的P,让它们发挥最大的工作效率,当P找不到G的时候,就会去其它P的本地队列中“偷取”能够执行的G,这样一来,每一个P都能拥有较为均匀的G队列,就很少会出现P与P之间隔岸观火的情况了。
gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp != nil {
// Successfully stole.
return gp, inheritTime, false
}
runtime.stealWork
会随机选一个P来进行偷取,真正的偷取工作由runtime.runqgrab
函数来完成,它会尝试偷取该P本地队列一半的G。
for {
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
if n > uint32(len(pp.runq)/2) { // read inconsistent h and t
continue
}
for i := uint32(0); i < n; i++ {
g := pp.runq[(h+i)%uint32(len(pp.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
整个偷取工作会进行四次,如果四次后也偷不到G那么就返回。如果最终无法找到,当前M会被runtime.stopm
给暂停,直到被唤醒后继续重复上面的步骤。当找到并返回了一个G后,就将其交给runtime.execute
来运行G。
mp := getg().m
mp.curg = gp
gp.m = mp
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + stackGuard
gogo(&gp.sched)
首先更新M的curg
,然后更新G的状态为_Grunning
,最后交给runtime.gogo
来恢复G的运行。
总的的来说,在调度循环中G的来源根据优先级来分有四个
- P的本地队列
- 全局队列
- 网络轮询器
- 从其它P的本地队列偷
runtime.execute
在执行过后并不会返回,刚获取的G也不会永远执行下去,在某一个时机触发调度以后,它的执行权会被剥夺,然后进入新一轮调度循环,将执行权让给其它的G。
调度策略
不同的G执行用户代码的时长可能不同,部分G可能会耗时很长,部分G耗时很短,执行时机长的G可能会导致其它的G迟迟无法得到执行,所以交替执行G才是正确的方式,在操作系统中这种工作方式被称之为并发。
协作式调度
协作式调度的基本思路是,让G自行将执行权让给其它的G,主要有两种方法。
第一种方法就是在用户代码中主动让权,go提供了runtime.Gosched()
函数,使用者可以自己决定在何时让出执行权,不过在很多时候调度器内部的工作细节都使用者来说都是一个黑盒,很难去判断到底什么时候该主动让权,对使用者的要求比较高,并且go的调度器力求对使用者屏蔽大部分细节,追求更简单的使用方法,在这种情况下让使用者也参与到调度工作中并非是什么好事。
第二种方法是抢占标记,虽然它的名字有抢占的字眼,但它本质上还是协作式调度策略。思路就是在函数的头部插入抢占检测代码runtime.morestack()
,插入的过程在编译期完成,前面提到过它原本是用于进行栈扩容检测的函数,因为其检测点是每一个函数的调用,这同样是一个进行抢占检测的良好时机。runtime.newstack
函数上半部分都是在进行抢占检测,下半部分则在进行栈扩容检测,前面为了避免干扰就将这部分省掉了,现在就来看看这部分干了什么。首先会根据gp.stackguard0
进行抢占判断,如果不需要的话就继续执行用户代码。
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
preempt := stackguard0 == stackPreempt
if preempt {
if !canPreemptM(thisg.m) {
gp.stackguard0 = gp.stack.lo + stackGuard
gogo(&gp.sched) // never return
}
}
当g.stackguard0 == stackPreempt
时,由runtime.canPreemptM()
函数来判断协程条件是否需要被抢占,代码如下,
func canPreemptM(mp *m) bool {
return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
可以看到能够被抢占需要满足四个条件
- M没有被锁住
- 没有正在分配内存
- 没有禁用抢占
- P处于
_Prunning
状态
而在以下两种种情况会将g.stackguard0
设置为stackPreempt
- 需要进行垃圾回收时
- 发生系统调用时
if preempt {
if gp.preemptShrink {
gp.preemptShrink = false
shrinkstack(gp)
}
// Act like goroutine called runtime.Gosched.
gopreempt_m(gp) // never return
}
最后就会走到runtime.gopreempt_m()
主动让出当前协程的执行权。首先切断M与G之间的联系,状态变为_Grunnbale
,然后将G放入全局队列中,最后进入调度循环将执行权让给其它的G。
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
schedule()
这样一来,所有的协程在进行函数调用时都可能会进入该函数进行抢占检测,这种策略得依赖函数调用这一时机才能触发抢占并且主动让权。在1.14之前,go一直都是沿用的这种调度策略,但这样会有一个问题,如果没有函数调用,就没法检测了,比如下面这段经典代码,应该在很多教程中都有出现
func main() {
// 限制P的数量只能为1
runtime.GOMAXPROCS(1)
// 协程1
go func() {
for {
// 该协程不停的空转
}
}()
// 进入系统调用,主协程让权给其它协程
time.Sleep(time.Millisecond)
println("exit")
}
代码中创建了一个空转的协程1,然后主协程因为系统调用主动让权,此时协程1正在被调度,但因为它根本就不调用函数,也就没法进行抢占检测,由于P只有一个,没有其它空闲的P,这样会导致主协程永远无法被调度,exit
也就永远无法输出,不过这种问题也仅限于go1.14之前。
抢占式调度
官方在go1.14加入了基于信号的抢占式调度策略,这是一种异步抢占策略,通过异步线程发送信号的方式来进行抢占线程,基于信号的抢占式调度目前只有有两个入口,分别是系统监控和GC。
在系统监控的循环中,会遍历每一个P,如果P正在调度的G执行时间超过了10ms,就会强制触发抢占。这部分工作由runtime.retake
函数完成,下面是简化的后代码。
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
pp := allp[i]
if pp == nil {
continue
}
pd := &pp.sysmontick
s := pp.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(pp.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(pp)
sysretake = true
}
}
}
unlock(&allpLock)
return uint32(n)
}
当需要进行垃圾回收时,如果G是状态是_Grunning
,也就是还在运行,同样也会触发抢占。
func suspendG(gp *g) suspendGState {
for i := 0; ; i++ {
switch s := readgstatus(gp); s {
case _Grunning:
gp.preemptStop = true
gp.preempt = true
gp.stackguard0 = stackPreempt
casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
now := nanotime()
if now >= nextPreemptM {
nextPreemptM = now + yieldDelay/2
preemptM(asyncM)
}
}
......
......
func preemptM(mp *m) {
if mp.signalPending.CompareAndSwap(0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
pendingPreemptSignals.Add(1)
}
signalM(mp, sigPreempt)
}
}
两个抢占入口最后都会进入runtime.preemptM
函数中,由它来完成抢占信号的发送。当信号成功发送后,在runtime.mstart
时通过runtime.initsig
注册的信号处理器回调函数runtime.sighandler
就会派上用场,如果检测到发送的是抢占信号,就会开始抢占。
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
...
if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
// Might be a preemption signal.
doSigPreempt(gp, c)
}
...
}
doSigPreempt
会修改目标协程的上下文,注入调用runtime.asyncPreempt
。
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
...
这样一来当重新切换回用户代码的时候,目标协程就会走到runtime.asyncPreempt
函数,在该函数中涉及到runtime.asyncPreempt2
的调用。
TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
PUSHQ BP
MOVQ SP, BP
// Save flags before clobbering them
PUSHFQ
// obj doesn't understand ADD/SUB on SP, but does understand ADJSP
ADJSP $368
// But vet doesn't know ADJSP, so suppress vet stack checking
...
CALL ·asyncPreempt2(SB)
...
RET
它会让当前协程停止工作并进行一轮新的调度循环从而将执行权让给其它协程。
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
这一过程过程都发生在runtime.asyncPreempt
函数中,它由汇编实现(位于runtime/preempt_*.s
中)并且会在调度完成后恢复先前被修改的协程上下文,以便让该协程在日后能够正常恢复。在采用异步抢占策略以后,之前的例子就不再会永久阻塞主协程了,当空转协程运行一定时间后就会被强制执行调度循环,从而将执行权让给了主协程,最终让程序能够正常结束。
小结
总的来说,触发调度的时机有以下几个:
- 函数调用
- 系统调用
- 系统监控
- 垃圾回收,垃圾回收对于执行时间过长的协程也会进行抢占
- 协程因管道,锁等原因而挂起
调度策略主要就是两大类,协作式和抢占式,协作式是主动让出执行权,抢占式是异步抢占执行权,两者共存才形成了如今的调度器。