mutex
mutex
锁是操作系统中的一种重要的同步原语,Go 语言在标准库中提供了互斥锁和读写锁两种实现,分别对应了
sync.Mutex
,互斥锁,读读互斥,读写互斥,写写互斥sync.RWMutex
,读写锁,读读共享,读写互斥,写写互斥
它们的业务使用场景非常常见,用于在并发情况下保护某一片共享内存能够顺序地访问和修改,正如下面的例子
import (
"fmt"
"sync"
)
func main() {
var i int
var wg sync.WaitGroup
var mu sync.Mutex
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
viewI := i
mu.Unlock()
viewI++
mu.Lock()
i = viewI
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(i)
}
如果没有锁的保护,那么该函数每次执行输出的结果都可能不同的,无法预测,显然在大部分场景中我们都不希望发生这样的情况。这个案例对于大部分人来说都很简单,或许你对已经对锁的使用得心应手,但未必了解 Go 语言的锁内部是如何实现的,它本身的代码并不复杂,本文接下来会进行较为详细的讲解。
Locker
在开始之前我们先看一个类型sync.Locker
,它是 Go 定义的一组接口
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
它提供的方法非常的简单易懂,就是加锁和解锁,不过由于 Go 接口实现优于约定的特性,所以大部分人可能都从来没见过它,这里也只是简单提一嘴,因为它确实没那么重要,后面所讲的两个锁也是都实现了该接口。
Mutex
互斥锁Mutex
的类型定义位于sync/mutex.go
文件中,它是一个结构体类型,如下
type Mutex struct {
state int32
sema uint32
}
字段释义如下:
state
,字段表示锁的状态sema
,即信号量 semaphore,有关它的介绍会在后面讲到
先来讲讲这个state
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)
state
是一个 32 位的整数类型,低 3 位用于表示上面的三种状态,总共有三种状态,这三种状态并非独立的,它们可以共存。
mutexLocked=1
,被锁住mutexWoken=2
,被唤醒mutexStarving=4
,饥饿模式
高 29 位用于表示有多少个协程正在等待锁,所以理论上来说一个互斥锁最多可以被 2^29+1 个协程同时使用,不过现实中不太可能会有这么多的协程,即便每个只占 2KB(初始栈空间大小),创建如此数量的协程所需要的内存空间也要 1TB 左右。
+-----------------------------------+---------------+------------+-------------+
| waiter | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
| 29 bits | 1 bit | 1 bit | 1 bit |
+-----------------------------------+---------------+------------+-------------+
互斥锁一共有两种运行模式,一是正常模式,二是饥饿模式。正常模式就是按照协程在阻塞等待队列中到达的顺序来持有锁,即 FIFO,这是一般的情况,也是性能最好的时候,因为大家都按照访问的顺序来持有锁就不会有问题。饥饿模式就是不一般的情况,这个饥饿指的是等待协程长时间无法持有锁而一直处于阻塞状态,并不是说互斥锁是饥饿状态,那么什么时候协程会处于饥饿状态呢?官方给出一个例子,有一个先到的协程,因为无法持有互斥锁而阻塞,后续由于锁的释放被唤醒了,就在这时候来了另一个代码刚刚运行到这块尝试持有锁的协程(喜欢插队的),由于后者正处于运行的状态(正在占用 CPU 时间片),后者能够成功竞争到锁的几率是很高的,并且在极端情况下像这样的协程可能会有很多,那么刚刚唤醒的协程就会一直无法持有锁(一直插队没完没了了),明明是它先到的,却一直无法获得锁。
const (
starvationThresholdNs = 1e6
)
为了避免这种情况,Go 设置了一个等待阈值starvationThresholdNs
,如果有协程超过 1ms 仍未持有锁,互斥锁就会进入饥饿模式。在饥饿模式下,互斥锁的所有权会直接移交给等待队列中最前面的协程,新来的协程不会尝试持有锁,而且进入队尾等待。就这样,饥饿模式下互斥锁的所有权会全部由等待队列中协程逐个持有(让排队的人先获得锁,插队的后面去),当协程成功持有锁后,如果自己是最后一个等待协程或等待时间小于 1ms,就会将互斥锁切换回正常模式。这种饥饿模式的设计,就避免了一些协程长时间无法持有锁而“饿死”的情况。
TryLock
互斥锁提供了两个方法来进行加锁:
Lock()
,以阻塞的方式获取锁TryLock()
,非阻塞的方式获取锁
先来看看TryLock
的代码,因为它的实现更简单
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}
它开始时会进行检查,如果锁已经被持有了,或者处于饥饿状态(即很多协程正在等待锁),那么当前协程无法获得锁。否则的话通过 CAS 操作尝试更新状态为 mutexLocked
,如果 CAS 操作返回false
,则表示在此期间有其它协程成功获得了锁,那么当前协程无法获得锁,否则成功获得锁。从这里的代码可以看出, TryLock()
的调用者就是那个尝试插队的人,因为它不管有没有协程正在等待,就直接抢夺锁(old 可能不等于 0)。
Lock
下面是Lock
的代码,它也会使用 CAS 操作来尝试直接持有锁,只不过它更“老实”,它只会在没有协程阻塞等待时才会去直接持有锁(old=0)。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
倘若它发现有协程正在阻塞等待,那么它就会“老实”地到后面排队,进入lockslow
自旋流程等待锁(互斥锁的核心)。首先会准备一些变量
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
waitStartTime
: 用于记录等待开始的时间,检查是否进入饥饿模式。starving
: 表示当前协程是否已经超过 1ms 未获得锁。awoke
: 标记当前协程是否已被唤醒。iter
: 计数器,记录自旋的次数。old
: 获取当前互斥锁的状态
然后进入 for 循环,判断当前协程能否进入自旋状态。
自旋是一种多线程间同步机制,又称为忙等待(busy-waiting),线程未持有锁时不会直接挂起切换线程上下文而是进入空转,这个过程中一直占用 CPU 时间片,如果是在锁竞争不大或是持有锁的时间很短的场情况下,这样做确实可以避免频繁切换线程上下文,能够有效地提高性能,然而它并不是万能的,在 Go 语言中滥用自旋有可能导致以下危险的后果:
- CPU 占用过高:自旋的协程过多会消耗大量的 CPU 资源,尤其是在锁被占用的时间较长时
- 影响协程调度:处理器 P 的总数量是有限的,如果有很多自旋的协程占用了 P,那么其它执行用户代码的协程就无法及时的被调度
- 缓存一致性问题:自旋锁的忙等待特性会导致线程在高速缓存(cache)中反复读取锁的状态,如果自旋的协程在不同的核心上运行,并且锁的状态没有被及时更新到全局内存中,导致协程读到的锁状态不准确,并且频繁的缓存一致性同步也会显著降低性能,
所以并不是所有的协程都能够进入自旋状态,它需要经过以下的严格判断
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}
条件如下:
当前锁已经被持有且不能处于饥饿状态,否则意味着已经有协程长时间无法获得锁,这时候直接进入阻塞流程。
进入
runtime.sync_runtime_canSpin
判断流程const ( active_spin = 4 ) func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
自旋次数小于
runtime.active_spin
,默认是4次,次数多了浪费资源。CPU核数大于1,单核系统自旋没有任何意义。
当前的
gomaxprocs
大于空闲 P 和正在自旋的 P 数量之和加 1,即说明当前没有足够的可用处理器来进行自旋当前P的本地队列必须空的,否则说明有其它用户任务要执行,不能进行自旋
如果判断可以自旋的话,就会调用runtime.sync_runtime_doSpin
进入自旋,实际上它就是执行了30次PAUSE
指令。
const (
active_spin_cnt = 30
)
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
如果不能进行自旋,就只会有两种下场:成功获得锁和进入等待队列陷入阻塞,不过在此之前还有很多事处理:
- 如果不为饥饿模式,就尝试获取锁
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
- 如果锁已经被占用或现在是饥饿模式,则等待的协程数+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
- 如果当前协程已经处于饥饿状态,且锁仍然被占用,则进入饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
- 如果当前协程自旋被唤醒,则加上
mutexWoken
标志
if awoke {
new &^= mutexWoken
}
然后就开始尝试通过 CAS 去更新锁的状态,更新失败就直接开始下一轮循环
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
}else {
...
}
更新成功的话就开始下面的判断。
原状态并非饥饿模式,且没有协程占用锁,那么当前协程可以直接持有锁,退出流程,继续执行用户代码。
if old&(mutexLocked|mutexStarving) == 0 { break }
尝试持有锁失败,记录下等待时间,其中 LIFO 如果为 true,表示队列后进先出,否则就是 FIFO 先进先出。
queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }
尝试获取信号量,进入
runtime.semacquire1
函数,如果能获取信号量就直接返回不会阻塞,否则的话就会调用runtime.gopark
挂起当前协程等待信号量的释放。runtime_SemacquireMutex(&m.sema, queueLifo, 1)
走到这一步有两种可能,一是直接成功获取信号量,二是阻塞刚刚被唤醒成功获得信号量,不管是哪一种都成功获得了信号量,如果现在是饥饿模式,就可以直接获得锁。
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break }
如果不是饥饿模式,则重置
iter
,重新开始自旋流程。awoke = true iter = 0
至此,加锁的流程就结束了,整个过程比较复杂,过程中用到了自旋等待和信号量阻塞等待两种方式,平衡了性能和公平性,适用于大多数的锁竞争情况。
Unlock
解锁的流程相对而言要简单很多,它首先会尝试快速解锁,如果new
为 0 的话表示现在没有等待协程,且不是饥饿模式,即解锁成功,可以直接返回。
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
否则就需要进入unlockslow
的流程
首先判断是否已经解锁了
if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }
如果是饥饿模式,就直接释放信号量,完成解锁。在饥饿模式下,当前解锁的协程将直接将锁的所有权交给下一个等待的协程。
if new&mutexStarving == 0 { ... } else { runtime_Semrelease(&m.sema, true, 1) }
不是饥饿模式,进入正常解锁流程
如果没有协程正在等待,或者有其它被唤醒的协程已经获得了锁,又或者说锁进入了饥饿模式
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }
否则的话,就释放信号量唤醒下一个等待的协程,将当前锁的状态设置为
mutexWoken
new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state
最后,解锁的流程就结束了。
RWMutex
读写互斥锁RWMutex
的类型定义位于sync/rwmutex.go
文件中,它的实现也基于互斥锁。
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}
下面是各个字段的释义
w
,一个互斥锁,写者协程持有该互斥锁时,其它写者协程和读者协程将被阻塞。writerSem
,写信号量,用于阻塞写者协程来等待读者协程,写者协程获取信号量,读者协程释放信号量。readerSem
,读信号量,用于阻塞读者协程来等待写者协程,读者协程获取信号量,写者协程释放信号量。readerCount
,核心字段,整个读写锁都靠它来维护状态。readerWait
,表示写者协程被阻塞时,需要等待的读者协程个数
读写锁大致的原理就是,通过互斥锁来使得写者协程间互斥,通过两个信号量writerSem
和readerSem
来使得读写互斥,读读共享。
readerCount
由于这个readerCount
变化比较多,且很重要,所以单独拎出来说,它大致上归纳为以下几种状态
- 0,当前既没有读者协程活跃也没有写者协程活跃,处于空闲的状态
-rwmutexMaxReaders
,一个写者协程已经持有了互斥锁,当前没有活跃的读者协程-rwmutexMaxReaders+N
,一个写者协程已经持有了写锁,当前的读者协程需要阻塞等待写者协程释放写锁- ``N-rwmutexMaxReaders`,一个写者协程已经持有了互斥锁,需要阻塞等待剩余读者协程释放读锁
N
,当前有 N 个活跃读者协程,即加了N
个读锁,没有活跃的写者协程
其中rwmutexMaxReaders
是一个常量值,它的值是互斥锁可以阻塞等待协程数量的 2 倍,因为一半是读者协程,一半是写者协程。
const rwmutexMaxReaders = 1 << 30
整个读写锁部分就这个readerCount
比较复杂,理解了它的变化也就搞明白了读写锁的工作流程。
TryLock
还是老样子,先来看看最简单的TryLock()
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}
开始时,它会尝试调用互斥锁的TryLock()
,如果失败了就直接返回。然后用 CAS 操作尝试去将readerCount
的值从 0 更新为 -rwmutexMaxReaders
。0 代表的是没有正在工作的读者协程,-rwmutexMaxReaders
表示现在写者协程已经持有了互斥锁。CAS 操作更新失败就将互斥锁解锁,成功的话就返回 true
。
Lock
接下来是Lock()
,它的实现也很简单。
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}
首先它会跟其它的写者协程竞争直到持有互斥锁,然后进行这么一个操作,先原子地减去-rwmutexMaxReaders
,然后再将得到的新值非原子地加上 rwmutexMaxReaders
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
拆成两步来看
这是为了通知其它读者协程现在正有写者协程尝试持有锁,在
TryLock
部分已经讲过了。rw.readerCount.Add(-rwmutexMaxReaders)
又加上
rwmutexMaxReaders
得到了 r,这个 r 代表的就是现在正在工作的读者协程数量。r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
然后判断是否有读者协程正在工作,然年后将readerWait
的值加r
,最终还是不为 0 的话表示需要等待这些读者协程工作完,则进入 runtime_SemacquireRWMutex
流程尝试获取信号量writerSem
,该信号量是由读者协程释放的,如果能拿到信号量就表示读者协程已经工作完毕,否则的话就需要进入阻塞队列等待(这部分信号量的逻辑跟互斥锁那块基本上一致)。
UnLock
然后是UnLock()
,释放写锁。
func (rw *RWMutex) Unlock() {
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
它的流程如下
前面提到过在加锁的时候会将
readerCount
更新为负值,这里再加上rwmutexMaxReaders
,就表示现在没有写者协程正在工作,然后得到的值就是正在阻塞等待的读者协程数量。r := rw.readerCount.Add(rwmutexMaxReaders)
如果它本身就是 0 或大于 0,代表着写锁已经被释放了
if r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") }
释放信号量
readerSem
,唤醒等待的读者协程for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }
最后再释放互斥锁,唤醒等待的写者协程。
rw.w.Unlock()
释放写锁完成。
TryRLock
接下来看看读锁部分,这是TryRLock
的代码
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}
它总共只干两件事
判断是否有写者协程正在工作,有的话则加锁失败。
c := rw.readerCount.Load() if c < 0 { return false }
尝试将
readerCount
加 1,如果更新成功了则加锁成功if rw.readerCount.CompareAndSwap(c, c+1) { return true }
否则继续循环判断直到退出
可以看到这里依赖的readerCount
都是在写锁部分维护的,这也是为什么要先讲写锁的原因,因为复杂核心的地方都在写锁部分维护。
RLock
RLock
的逻辑更简单
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
它会尝试将readerCount
的值加 1,如果得到的新值还是小于 0,说明写者协程正在工作,则进入readerSem
信号量阻塞流程,当前协程会进入阻塞队列等待。
RUnLock
RUnLock
的也是一样的简单易懂
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
它首先会尝试讲readerCount
减一,表示活跃读者协程数量减一,如果得到的值大于 0 表示可以直接释放,因为现在没有写者协程持有互斥锁,小于 0 表示有写者协程已经持有了互斥锁,它正在等待当前的所有读者协程完成工作。接下来进入 runlockSlow
的流程
如果原来的
readerCount
值为 0(锁是空闲的)或者为-rwmutexMaxReaders
(写者协程没有需要等待的读者协程,即读锁已经全部释放),则表示当前没有活跃的读者协程,不需要解锁if r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex") }
如果有活跃的读者协程的话,则将
readerWait
减一,如果当前读者协程是最后一个活跃的读者,则释放writerSem
信号量,唤醒等待的写者协程。if rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) }
释放读锁的流程结束。
Semaphore
互斥锁里面的信号量就是一个单纯的uint32
整型,通过原子地减一和加一来表示信号量的获取和释放,在运行时负责维护信号量的结构是 runtime.semaRoot
,它的类型定义就位于runtime/sema.go
文件中。semaRoot
使用一个平衡二叉树(treap)来组织和管理信号量,树中的每一个节点代表一个信号量,节点类型是 *sudog
,它是一个双向链表,维护了对应信号量的等待队列, 节点通过 *sudog.elem
(信号量地址)保持唯一性,并通过 *sudog.ticket
字段保证树的平衡性。
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}
信号量树semaRoot
依赖于一个更低层级的互斥锁runtime.mutex
来保证它的并发安全性。
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
// 用于内存对齐,提高性能
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
semaRoot
在运行时存储在一个全局的semaTable
中,它看起来是一个定长的数组,用来存储多个信号量树的根节点集合,但实际上从运作方式上来看,它其实就是一个哈希表。表中的每个元素包含一个 semaRoot
和一些填充字节(pad
),用于对齐内存和避免缓存行竞争。semTabSize
是信号量表的大小常量,指定了表的长度为 251,通常选择一个质数,可以减少哈希冲突,提高散列效率。
func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
rootFor
方法就相当于哈希函数,它接受一个 uint32
类型的指针 addr
(即信号量的地址),返回该地址所对应的 semaRoot
结构体的指针。这行代码首先将 addr
转换为一个 uintptr
,然后右移 3 位,相当于除以 8(因为一个字节占 8 位,指针地址除以 8 可以将其映射为数组的索引),通过对 semTabSize
取模,确保索引在信号量表的大小范围内,通过索引得到了semaRoot
后,再去平衡树里面寻找与信号量对应的*sudog
等待队列。
Acquire
获取信号量,对应的实现是runtime.semacquire1
函数,
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)
它接收下面几个参数:
addr
,信号量的地址lifo
,影响平衡树的出队顺序,默认是 FIFO,LIFO 即后进先出,当协程等待锁的时间不为 0 时(至少已经阻塞过一次了),它就是true
profile
,用于锁性能分析的标志skipframes
,跳过的栈帧数目reason
,阻塞的原因
下面会简述信号量获取的整个流程:
判断协程状态,如果当前协程不是正在被调度的协程,直接抛出异常
gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") }
判断是否能获取信号量,并且尝试通过非阻塞的方法获取信号量,如果能获取的话既可以直接返回。
for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } }
如果不能非阻塞的获取,就会进入循环通过正常手段去获取信号量,首先通过
acquireSudog()
从缓存中获取一个*sudog
,该结构表示一个阻塞等待的协程s := acquireSudog()
然后从全局表中得到信号量树
root := semtable.rootFor(addr)
进入循环,给信号量树加锁,再次判断是否能获取信号量,不能的话就将其加入信号量树中,然后调用
gopark
将其挂起等待,直到被唤醒继续重复这一过程,一直循环到获得信号量。for { lockWithRank(&root.lock, lockRankRoot) root.nwait.Add(1) if cansemacquire(addr) { root.nwait.Add(-1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } }
最后被唤醒的时候会释放
*sudog
,将其归还至缓存中。releaseSudog(s)
Release
释放信号量,唤醒阻塞等待的协程,该功能由runtime.semrelease1
函数实现
func semrelease1(addr *uint32, handoff bool, skipframes int)
它接收如下参数
addr
,信号量地址handoff
,表示是否将当前 P 正在调度的 G 直接切换为唤醒的 G,仅在饥饿模式的时候为true
skipframes
,跳过的栈帧数
下面简述释放的整个过程:
获取信号量树,然后信号量加一,表示释放一个信号量
root := semtable.rootFor(addr) atomic.Xadd(addr, 1)
如果等待协程数为 0,则直接返回
if root.nwait.Load() == 0 { return }
给信号量树加锁,二次判断是否有等待的协程
lockWithRank(&root.lock, lockRankRoot) if root.nwait.Load() == 0 { unlock(&root.lock) return }
从信号量树中获得一个阻塞等待的协程,
nwait
减一,然后释放信号量的锁s, t0, tailtime := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } unlock(&root.lock)
判断能否获取信号量
if handoff && cansemacquire(addr) { s.ticket = 1 }
readyWithTime
函数会直接将唤醒的协程 G 作为 P 下一个将要运行的协程,也就是修改*p.runnext=g
。readyWithTime(s, 5+skipframes)
倘若
handoff
为true
的话,那么goyield
就会让当前释放信号量的协程 G 与当前 M 解绑,并重新加入 P 本地运行队列的尾部,然后开始新一轮调度循环,以便可以让被唤醒的协程 G 立即得到调度if s.ticket == 1 && getg().m.locks == 0 { goyield() }
信号量的获取与释放的流程就是这些,Go 语言中用到信号量的不止互斥锁,放在这里是因为信号量跟互斥锁的关联性是最大的,官方甚至都在注释上写明了
// Asynchronous semaphore for sync.Mutex.
了解完信号量后,再回头去看互斥锁就会更加清晰。
提示
有关于semaRoot
信号量树,它的出队入队因为涉及自平衡操作的实现比较繁琐,深究这些跟本文的主题无关且没有意义,所以将其屏蔽掉了,感兴趣可以自行了解源代码。
小结
互斥锁sync.Mutex
通过自旋和信号量两种机制实现来协程的等待。自旋是非阻塞的,但需要严格限制使用,因为它会消耗 CPU 资源;而信号量则是阻塞的,能够有效避免不必要的资源消耗。为了实现更加公平的竞争机制,Go 通过区分正常模式和饥饿模式来保证协程在竞争锁的过程中能够更加平衡。与 runtime.mutex
这种底层锁相比,sync.Mutex
作为面向用户的锁,设计时考虑了更多的实际使用场景。
读写锁sync.RWMutex
,通过互斥锁sync.Mutex
来实现写写互斥,并在此基础上 额外增加了两个信号量,用于实现读写互斥和读读共享,从而支持多种并发场景。
虽然锁的实现看起来较为复杂,但一旦理解了 Mutex
的原理,再去学习 sync
标准库中的其他同步工具就会变得轻松许多。