syncmap
syncmap
go 标准库提供的sync.Map
是一个并发安全的 map,使用它时不需要使用锁之类的方式来控制,其实现不算特别复杂,去掉注释总共也就两三百行代码。
结构
type Map struct {
mu Mutex
read atomic.Pointer[readOnly]
dirty map[any]*entry
misses int
}
它总共只有四个字段,分别如下
read
,只读的 map,可以理解为对dirty
的缓存dirty
,一个普通的 mapmisses
,访问read
时没有命中的次数mu
,保护dirty
的并发安全
read
是sync.readonly
类型,其内部依旧是一个 map,其中的amended
字段表示dirty
是否包含read
所没有的 key。
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.
}
另外entry
类型结构如下,p
是一个指向 value 的指针。
type entry struct {
p atomic.Pointer[any]
}
对于一个 entry 而言,它有三种可能的情况
- 正常情况,存放了对应的值
p
为nil
,表示该键值对已被删除,此时 dirty 可能为空,或者其可能依旧存在于 dirty 中。p == expunged
,expunged
是一个空的接口对象,同样代表了键值对已经被删除且不存在于 dirty 中。
标准库 map 的并发安全是通过读写分离来实现的,read
和dirty
所存储的entry
指针都是指向的同一片 value,read
是只读的,所以多个协程访问时也不会有安全问题,dirty
是可以被修改的,受到互斥锁的保护,misses
记录了 key 访问没有命中的次数,当次数累计到一定的值后,当前的dirty
就会转变为read
,misses
清零,这就是sync.Map
大致的工作逻辑,后续会对其操作进行更加细致的分析。
读
读操作对应Map.Load
方法,代码如下
func (m *Map) Load(key any) (value any, ok bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
它首先会访问 read,如果存在的话就直接返回,否则就会去尝试持有mu
互斥锁,然后再去访问 read,因为在获得锁的期间 dirty 有可能晋升为 read,倘若还是没有找到,最终就会去访问 dirty,并记录一次 miss,然后解锁。
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
通过missLocked
方法可以看出,dirty 晋升为 read 的阈值条件是m.misses >= len(m.dirty)
。
写
写操作对应的是Store
方法,不过实际上也是由Swap
方法来完成,previous
代表着先前的值,loaded
表示 key 是否存在。
func (m *Map) Swap(key, value any) (previous any, loaded bool)
写操作的流程分为两部分,如果访问的 key 存在于 read 中的话,那么就会直接获取对应的entry
,然后通过 CAS 来更新entry
的值,期间不需要上锁。
read := m.loadReadOnly()
if e, ok := read.m[key]; ok {
if v, ok := e.trySwap(&value); ok {
if v == nil {
return nil, false
}
return *v, true
}
}
在自旋的期间,如果p == expunged
则代表着该 key 已经被删除了,就会直接返回。
func (e *entry) trySwap(i *any) (*any, bool) {
for {
p := e.p.Load()
if p == expunged {
return nil, false
}
if e.p.CompareAndSwap(p, i) {
return p, true
}
}
}
如果 key 不存在于 read 中,就会尝试获取锁来进行接下来的操作,接下来分三种情况。第一种情况,在获取锁的期间 dirty 晋升为了 read,如果访问到的entry
是expunged
,则说明它已经被删除了,且不存在于 dirty 中,这时需要将其添加到 dirty 中,然后再存储对应的值。
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
}
第二种情况,read 中没有,但是 dirty 中有,也是直接存储对应的值
if e, ok := m.dirty[key]; ok {
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
}
第三种情况,read 中没有,dirty 中也没有,在这里如果read.amended
为false
的话,代表着 dirty 是空的,然后会使用m.dirtyLocked
将 read 中所有未删除的键值对复制到 ditry 中,然后将read.amended
标记为true
,最后会直接新建一个 entry 来存放对应的值。
else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(&readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
删
删除操作对应的是LoadAndDelete
方法,它的思路与读操作几乎完全一致,只是多了一个delete
函数的调用。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
删除键值对的时候永远只会对 ditry 执行delete
操作,对应 read 而言,只会将它所存储的 entry 的值修改为nil
。
func (e *entry) delete() (value any, ok bool) {
for {
p := e.p.Load()
if p == nil || p == expunged {
return nil, false
}
if e.p.CompareAndSwap(p, nil) {
return *p, true
}
}
}
遍历
遍历操作对应着Range
方法
func (m *Map) Range(f func(key, value any) bool) {
read := m.loadReadOnly()
if read.amended {
m.mu.Lock()
read = m.loadReadOnly()
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(&read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
在遍历时只会遍历 read,如果read.amended
为true
,代表 read 中的 key 有缺失,这时会直接将 ditry 晋升为 read,然后通过for range
循环来遍历,并对每一个键值对调用回调函数。
性能
sync.Map
采用了读写分离的方式来进行并发控制,它更适合读多写少的场景,因为在大部分情况下访问一个键值对的时候不需要加锁。但是如果要新增一个元素的话,就需要持有一个全局锁,它会阻塞当前 map 的所有操作,这就导致了写性能的低下,所以sync.Map
并不适用于所有情况,对于读少写多的情况,可以采用分段锁的方式来实现,这样可以避免阻塞全局,这里推荐一个开源实现orcaman/concurrent-map: a thread-safe concurrent map for go (github.com),采用分片的方式实现,且支持泛型,在性能和使用体验上都会好一些。