cond
cond
sync.Cond
是 Go 标准库中的条件变量,它是唯一一个需要手动初始化的同步工具。与其他同步原语不同,sync.Cond
需要传入一个互斥锁 (sync.Mutex
) 来保护共享资源的访问。它允许协程在某个条件满足之前进入等待状态,并在条件满足时被唤醒。
示例代码
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// 创建一个条件变量,并传入互斥锁
cd := sync.NewCond(&mu)
// 添加 4 个待处理的协程
wg.Add(4)
// 创建 3 个协程,每个协程都会等待条件满足
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// 条件不满足时,协程会被阻塞在此
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// 创建一个协程,用来更新条件并唤醒其他协程
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // 更新共享变量
mu.Unlock()
if i > 100 {
cd.Broadcast() // 条件满足时唤醒所有等待的协程
break
}
time.Sleep(time.Millisecond * 10) // 模拟工作负载
}
}()
// 等待所有协程完成
wg.Wait()
}
在上面的示例中,共享变量 i
被多个协程并发访问和修改。通过互斥锁 mu
来确保在并发条件下,访问 i
的操作是安全的。然后,通过 sync.NewCond(&mu)
创建了一个条件变量 cd
,它依赖于 mu
锁来保证在等待时对共享资源的访问是同步的。
- 三个等待的协程:每个协程通过
cd.Wait()
阻塞自己,直到条件满足(i > 100
)。这些协程会在共享资源i
的值更新之前一直处于阻塞状态。 - 一个更新条件并唤醒其他协程的协程:当条件满足时(即
i > 100
),这个协程通过cd.Broadcast()
唤醒所有等待的协程,让它们继续执行。
结构
type Cond struct {
// L is held while observing or changing the condition
L Locker
notify notifyList
}
type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait atomic.Uint32
notify uint32
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}
其结构并不复杂:
L
,互斥锁,这里的类型是Locker
接口,而不是具体的锁类型notify
,等待协程的通知链表
比较重要的是runtime.notifyList
结构
wait
,原子值,记录了有多少个等待协程notify
,指向下一个将要被唤醒的协程,从 0 开始递增lock
,互斥锁,并不是我们传入的锁,而是runtime
内部实现的一个锁head
,tail
,链表指针
它总共就三个方法
Wait
, 阻塞等待Signal
,唤醒一个等待协程Broadcast
,唤醒所有等待协程
它的大部分实现都被隐藏在了runtime
库下,这些实现位于runtime/sema.go
文件中,以至于在标准库中它的代码非常简短,其基本原理就是一个加了锁的阻塞队列。
Wait
Wait
方法会让协程自身陷入阻塞等待,直到被唤醒。
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
它首先会将自身加入notifyList
中,但其实只是将notifyList.wait
加一而已,这里的操作就相当于len(notifyList)-1
,得到了最后一个元素的下标
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}
真正的加入操作是在notifyListWait
函数中完成
func notifyListWait(l *notifyList, t uint32) {
...
}
在该函数中,它首先会对链表进行上锁,然后快速判断当前协程是否已经被唤醒了,如果已经唤醒了就直接返回,不需要阻塞等待。
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}
如果没有被唤醒,则构造成sudog
加入队列,然后通过gopark
挂起。
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
被唤醒后释放sudog
结构
releaseSudog(s)
Signal
Signal
会按照队列先入先出的顺序唤醒阻塞的协程
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}
它的流程如下
不加锁直接判断,
l.wait
是否等于l.notify
,相等则表示所有协程都已经唤醒if l.wait.Load() == atomic.Load(&l.notify) { return }
加锁后,再判断一次是否都已经被唤醒
lockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }
l.notify
加一atomic.Store(&l.notify, t+1)
循环遍历链表,找到需要被唤醒的协程,最后通过
runtime.goready
来唤醒协程。for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast
会唤醒所有阻塞的协程
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}
它的流程基本上是一致的
无锁检查,是否都已经被唤醒了
// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }
加锁,清空链表,然后释放锁,后续新到达的协程会被添加到链表头部
lockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)
遍历链表,唤醒所有协程
for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
小结
sync.Cond
最常见的使用场景是需要在多个协程之间同步某些条件,通常应用于生产者-消费者模型、任务调度等场景。在这些场景中,多个协程需要等待某些条件满足才能继续执行,或者需要在条件改变时通知多个协程。它提供了一种灵活且高效的方式来管理协程间的同步。通过与互斥锁配合使用, sync.Cond
可以确保共享资源的访问安全,并且可以在特定条件满足时控制协程的执行顺序。理解其内部实现原理有助于我们更好地掌握并发编程的技巧,尤其是在涉及复杂条件同步时。