syncmap

寒江蓑笠翁大约 11 分钟

syncmap

go标准库提供的sync.Map是一个并发安全的map,使用它时不需要使用锁之类的方式来控制,其实现不算特别复杂,去掉注释总共也就两三百行代码。

结构

type Map struct {
    mu Mutex
    read atomic.Pointer[readOnly]
    dirty map[any]*entry
    misses int
}

它总共只有四个字段,分别如下

  • read,只读的map,可以理解为对dirty的缓存
  • dirty,一个普通的map
  • misses,访问read时没有命中的次数
  • mu,保护dirty的并发安全

readsync.readonly类型,其内部依旧是一个map,其中的amended字段表示dirty是否包含read所没有的key。

type readOnly struct {
	m       map[any]*entry
	amended bool // true if the dirty map contains some key not in m.
}

另外entry类型结构如下,p是一个指向value的指针。

type entry struct {
	p atomic.Pointer[any]
}

对于一个entry而言,它有三种可能的情况

  1. 正常情况,存放了对应的值
  2. pnil,表示该键值对已被删除,此时dirty可能为空,或者其可能依旧存在于dirty中。
  3. p == expungedexpunged是一个空的接口对象,同样代表了键值对已经被删除且不存在于dirty中。

标准库map的并发安全是通过读写分离来实现的,readdirty所存储的entry指针都是指向的同一片value,read是只读的,所以多个协程访问时也不会有安全问题,dirty是可以被修改的,受到互斥锁的保护,misses记录了key访问没有命中的次数,当次数累计到一定的值后,当前的dirty就会转变为readmisses清零,这就是sync.Map大致的工作逻辑,后续会对其操作进行更加细致的分析。

读操作对应Map.Load方法,代码如下

func (m *Map) Load(key any) (value any, ok bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

它首先会访问read,如果存在的话就直接返回,否则就会去尝试持有mu互斥锁,然后再去访问read,因为在获得锁的期间dirty有可能晋升为read,倘若还是没有找到,最终就会去访问dirty,并记录一次miss,然后解锁。

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

通过missLocked方法可以看出,dirty晋升为read的阈值条件是m.misses >= len(m.dirty)

写操作对应的是Store方法,不过实际上也是由Swap方法来完成,previous代表着先前的值,loaded表示key是否存在。

func (m *Map) Swap(key, value any) (previous any, loaded bool)

写操作的流程分为两部分,如果访问的key存在于read中的话,那么就会直接获取对应的entry,然后通过CAS来更新entry的值,期间不需要上锁。

read := m.loadReadOnly()
if e, ok := read.m[key]; ok {
    if v, ok := e.trySwap(&value); ok {
        if v == nil {
            return nil, false
        }
        return *v, true
    }
}

在自旋的期间,如果p == expunged则代表着该key已经被删除了,就会直接返回。

func (e *entry) trySwap(i *any) (*any, bool) {
	for {
		p := e.p.Load()
		if p == expunged {
			return nil, false
		}
		if e.p.CompareAndSwap(p, i) {
			return p, true
		}
	}
}

如果key不存在于read中,就会尝试获取锁来进行接下来的操作,接下来分三种情况。第一种情况,在获取锁的期间dirty晋升为了read,如果访问到的entryexpunged,则说明它已经被删除了,且不存在于dirty中,这时需要将其添加到dirty中,然后再存储对应的值。

read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
    if e.unexpungeLocked() {
        m.dirty[key] = e
    }
    if v := e.swapLocked(&value); v != nil {
        loaded = true
        previous = *v
    }
}

第二种情况,read中没有,但是dirty中有,也是直接存储对应的值

if e, ok := m.dirty[key]; ok {
    if v := e.swapLocked(&value); v != nil {
        loaded = true
        previous = *v
    }
}

第三种情况,read中没有,dirty中也没有,在这里如果read.amendedfalse的话,代表着dirty是空的,然后会使用m.dirtyLocked将read中所有未删除的键值对复制到ditry中,然后将read.amended标记为true,最后会直接新建一个entry来存放对应的值。

else {
    if !read.amended {
        // We're adding the first new key to the dirty map.
        // Make sure it is allocated and mark the read-only map as incomplete.
        m.dirtyLocked()
        m.read.Store(&readOnly{m: read.m, amended: true})
    }
    m.dirty[key] = newEntry(value)
}

func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read := m.loadReadOnly()
	m.dirty = make(map[any]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

删除操作对应的是LoadAndDelete方法,它的思路与读操作几乎完全一致,只是多了一个delete函数的调用。

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			delete(m.dirty, key)
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if ok {
		return e.delete()
	}
	return nil, false
}

删除键值对的时候永远只会对ditry执行delete操作,对应read而言,只会将它所存储的entry的值修改为nil

func (e *entry) delete() (value any, ok bool) {
	for {
		p := e.p.Load()
		if p == nil || p == expunged {
			return nil, false
		}
		if e.p.CompareAndSwap(p, nil) {
			return *p, true
		}
	}
}

遍历

遍历操作对应着Range方法

func (m *Map) Range(f func(key, value any) bool) {
	read := m.loadReadOnly()
	if read.amended {
		m.mu.Lock()
		read = m.loadReadOnly()
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(&read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

在遍历时只会遍历read,如果read.amendedtrue,代表read中的key有缺失,这时会直接将ditry晋升为read,然后通过for range循环来遍历,并对每一个键值对调用回调函数。

性能

sync.Map采用了读写分离的方式来进行并发控制,它更适合读多写少的场景,因为在大部分情况下访问一个键值对的时候不需要加锁。但是如果要新增一个元素的话,就需要持有一个全局锁,它会阻塞当前map的所有操作,这就导致了写性能的低下,所以sync.Map并不适用于所有情况,对于读少写多的情况,可以采用分段锁的方式来实现,这样可以避免阻塞全局,这里推荐一个开源实现orcaman/concurrent-map: a thread-safe concurrent map for go (github.com)open in new window,采用分片的方式实现,且支持泛型,在性能和使用体验上都会好一些。