跳至主要內容

syncmap

寒江蓑笠翁2024年2月19日大约 11 分钟

syncmap

go 标准库提供的sync.Map是一个并发安全的 map,使用它时不需要使用锁之类的方式来控制,其实现不算特别复杂,去掉注释总共也就两三百行代码。

结构

type Map struct {
    mu Mutex
    read atomic.Pointer[readOnly]
    dirty map[any]*entry
    misses int
}

它总共只有四个字段,分别如下

readsync.readonly类型,其内部依旧是一个 map,其中的amended字段表示dirty是否包含read所没有的 key。

type readOnly struct {
  m       map[any]*entry
  amended bool // true if the dirty map contains some key not in m.
}

另外entry类型结构如下,p是一个指向 value 的指针。

type entry struct {
  p atomic.Pointer[any]
}

对于一个 entry 而言,它有三种可能的情况

  1. 正常情况,存放了对应的值
  2. pnil,表示该键值对已被删除,此时 dirty 可能为空,或者其可能依旧存在于 dirty 中。
  3. p == expungedexpunged是一个空的接口对象,同样代表了键值对已经被删除且不存在于 dirty 中。

标准库 map 的并发安全是通过读写分离来实现的,readdirty所存储的entry指针都是指向的同一片 value,read是只读的,所以多个协程访问时也不会有安全问题,dirty是可以被修改的,受到互斥锁的保护,misses记录了 key 访问没有命中的次数,当次数累计到一定的值后,当前的dirty就会转变为readmisses清零,这就是sync.Map大致的工作逻辑,后续会对其操作进行更加细致的分析。

读操作对应Map.Load方法,代码如下

func (m *Map) Load(key any) (value any, ok bool) {
  read := m.loadReadOnly()
  e, ok := read.m[key]
  if !ok && read.amended {
    m.mu.Lock()
    read = m.loadReadOnly()
    e, ok = read.m[key]
    if !ok && read.amended {
      e, ok = m.dirty[key]
      m.missLocked()
    }
    m.mu.Unlock()
  }
  if !ok {
    return nil, false
  }
  return e.load()
}

它首先会访问 read,如果存在的话就直接返回,否则就会去尝试持有mu互斥锁,然后再去访问 read,因为在获得锁的期间 dirty 有可能晋升为 read,倘若还是没有找到,最终就会去访问 dirty,并记录一次 miss,然后解锁。

func (m *Map) missLocked() {
  m.misses++
  if m.misses < len(m.dirty) {
    return
  }
  m.read.Store(&readOnly{m: m.dirty})
  m.dirty = nil
  m.misses = 0
}

通过missLocked方法可以看出,dirty 晋升为 read 的阈值条件是m.misses >= len(m.dirty)

写操作对应的是Store方法,不过实际上也是由Swap方法来完成,previous代表着先前的值,loaded表示 key 是否存在。

func (m *Map) Swap(key, value any) (previous any, loaded bool)

写操作的流程分为两部分,如果访问的 key 存在于 read 中的话,那么就会直接获取对应的entry,然后通过 CAS 来更新entry的值,期间不需要上锁。

read := m.loadReadOnly()
if e, ok := read.m[key]; ok {
    if v, ok := e.trySwap(&value); ok {
        if v == nil {
            return nil, false
        }
        return *v, true
    }
}

在自旋的期间,如果p == expunged则代表着该 key 已经被删除了,就会直接返回。

func (e *entry) trySwap(i *any) (*any, bool) {
  for {
    p := e.p.Load()
    if p == expunged {
      return nil, false
    }
    if e.p.CompareAndSwap(p, i) {
      return p, true
    }
  }
}

如果 key 不存在于 read 中,就会尝试获取锁来进行接下来的操作,接下来分三种情况。第一种情况,在获取锁的期间 dirty 晋升为了 read,如果访问到的entryexpunged,则说明它已经被删除了,且不存在于 dirty 中,这时需要将其添加到 dirty 中,然后再存储对应的值。

read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
    if e.unexpungeLocked() {
        m.dirty[key] = e
    }
    if v := e.swapLocked(&value); v != nil {
        loaded = true
        previous = *v
    }
}

第二种情况,read 中没有,但是 dirty 中有,也是直接存储对应的值

if e, ok := m.dirty[key]; ok {
    if v := e.swapLocked(&value); v != nil {
        loaded = true
        previous = *v
    }
}

第三种情况,read 中没有,dirty 中也没有,在这里如果read.amendedfalse的话,代表着 dirty 是空的,然后会使用m.dirtyLocked将 read 中所有未删除的键值对复制到 ditry 中,然后将read.amended标记为true,最后会直接新建一个 entry 来存放对应的值。

else {
    if !read.amended {
        // We're adding the first new key to the dirty map.
        // Make sure it is allocated and mark the read-only map as incomplete.
        m.dirtyLocked()
        m.read.Store(&readOnly{m: read.m, amended: true})
    }
    m.dirty[key] = newEntry(value)
}

func (m *Map) dirtyLocked() {
  if m.dirty != nil {
    return
  }

  read := m.loadReadOnly()
  m.dirty = make(map[any]*entry, len(read.m))
  for k, e := range read.m {
    if !e.tryExpungeLocked() {
      m.dirty[k] = e
    }
  }
}

删除操作对应的是LoadAndDelete方法,它的思路与读操作几乎完全一致,只是多了一个delete函数的调用。

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
  read := m.loadReadOnly()
  e, ok := read.m[key]
  if !ok && read.amended {
    m.mu.Lock()
    read = m.loadReadOnly()
    e, ok = read.m[key]
    if !ok && read.amended {
      e, ok = m.dirty[key]
      delete(m.dirty, key)
      m.missLocked()
    }
    m.mu.Unlock()
  }
  if ok {
    return e.delete()
  }
  return nil, false
}

删除键值对的时候永远只会对 ditry 执行delete操作,对应 read 而言,只会将它所存储的 entry 的值修改为nil

func (e *entry) delete() (value any, ok bool) {
  for {
    p := e.p.Load()
    if p == nil || p == expunged {
      return nil, false
    }
    if e.p.CompareAndSwap(p, nil) {
      return *p, true
    }
  }
}

遍历

遍历操作对应着Range方法

func (m *Map) Range(f func(key, value any) bool) {
  read := m.loadReadOnly()
  if read.amended {
    m.mu.Lock()
    read = m.loadReadOnly()
    if read.amended {
      read = readOnly{m: m.dirty}
      m.read.Store(&read)
      m.dirty = nil
      m.misses = 0
    }
    m.mu.Unlock()
  }

  for k, e := range read.m {
    v, ok := e.load()
    if !ok {
      continue
    }
    if !f(k, v) {
      break
    }
  }
}

在遍历时只会遍历 read,如果read.amendedtrue,代表 read 中的 key 有缺失,这时会直接将 ditry 晋升为 read,然后通过for range循环来遍历,并对每一个键值对调用回调函数。

性能

sync.Map采用了读写分离的方式来进行并发控制,它更适合读多写少的场景,因为在大部分情况下访问一个键值对的时候不需要加锁。但是如果要新增一个元素的话,就需要持有一个全局锁,它会阻塞当前 map 的所有操作,这就导致了写性能的低下,所以sync.Map并不适用于所有情况,对于读少写多的情况,可以采用分段锁的方式来实现,这样可以避免阻塞全局,这里推荐一个开源实现orcaman/concurrent-map: a thread-safe concurrent map for go (github.com),采用分片的方式实现,且支持泛型,在性能和使用体验上都会好一些。