内容目录
概述
go语言中的map并不是并发安全的,在Go 1.6之前,并发读写map会导致读取到脏数据,在1.6之后则程序直接panic,所以go 1.9之前的解决方案是额外绑定一个锁,封装成一个新的struct或者单独使用锁都可以。直到sync.Map出现提供了一种空间换时间有效减少锁的实现方法。
原理
为了减少并发抢锁导致的阻塞,sync.Map分出了read和dirty两个map,里面存的都是指针。存、删和查都先操作read,并用atomic进行并发保护,速度较快,直到read不能满足需求才去操作dirty,操作dirty的时用Mutex锁进行并发保护,速度较慢。
源码分析
主要结构
//用于保存value的interface指针,通过atomic进行原子操作 type entry struct { p unsafe.Pointer // *interface{} }
//Map.read 用的就是readOnly,对其进行操作的时候,使用atomic进行保护 type readOnly struct { m map[interface{}]*entry // 存储写入的数据 amended bool // 如果Map.dirty有些数据不在中的时候,这个值为true }
//sync.Map的主结构 type Map struct { mu Mutex // 锁,操作dirty的时候用的 read atomic.Value // 存的是readOnly结构体,用atomic保护进行操作,无需加锁 dirty map[interface{}]*entry//加锁进行操作,和read构成冗余,misses达到len(dirty)后升级为read misses int// 未命中read的次数 }
主要方法
//加载方法,也就是提供一个键key,查找对应的值value,如果不存在,通过ok反映 func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 不存在,且dirty中有新数据 if !ok && read.amended { m.mu.Lock()//加锁 // 双检查,避免加锁的时候m.dirty提升为m.read,这个时候m.read可能被替换了。 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] // 不管m.dirty中存不存在,都将misses计数加一 // missLocked()中满足条件后就会提升m.dirty m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } return e.load()// 使用原子操作读取数据 } // Map.misses += 1, 如果misses == len(dirty) ,dirty升级为read ,然后dirty指向nil func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
// 更新或者新增一个entry func (m *Map) Store(key, value interface{}) { read, _ := m.read.Load().(readOnly) // 如果m.read存在这个键,并且这个entry没有被标记删除,尝试直接存储。 // 因为m.dirty也指向这个entry,所以m.dirty也保持最新的entry。 if e, ok := read.m[key]; ok && e.tryStore(&value) { return } // 如果m.read不存在或者已经被标记删除 m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok {//m.read存在并已被标记删除时 if e.unexpungeLocked() {//标记成未被删除 m.dirty[key] = e //m.dirty中不存在这个键,所以加入m.dirty } e.storeLocked(&value)//更新 } else if e, ok := m.dirty[key]; ok {// m.dirty存在这个键时 e.storeLocked(&value)//更新 } else {// key不在read里面,也不在dirty里面时 if !read.amended {// amended 若为false,则表示dirty未被初始化过 m.dirtyLocked()// 初始化dirty,将dirty中未被删除的数据全都复制到dirty中,read中指向nil的数据才会被标记为expunged m.read.Store(readOnly{m: read.m, amended: true})//将amended改为true } m.dirty[key] = newEntry(value)// 将值存入dirty } m.mu.Unlock() } func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { m.dirty[k] = e } } } func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { // 将已经删除标记为nil的数据标记为expunged if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
//删除一个键值 func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended {//不在read中,且dirty中有新数据 m.mu.Lock() //双检查 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { delete(m.dirty, key)//直接删除dirty的数据 } m.mu.Unlock() } if ok { // read中存在key,将这个key标记为删除状态,但并不删除数据 e.delete() } }
//遍历map(通过回调的方式) func (m *Map) Range(f func(key, value interface{}) bool) { read, _ := m.read.Load().(readOnly) if read.amended {// amended==true,表示dirty中有read没有的数据,此时dirty的数据最全 m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended {// 双检查,判断获取锁之前,该值是否变了 // 将dirty升级为read read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } // 遍历read.m,将值传入回调函数f for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }
特别提醒
sync.Map在初始化时会将read中所有未删除的数据复制到dirty,而频繁往map中插入新数据会导致dirty中有大量read中没有的数据,从而导致read的命中率过低,需要频繁调用锁进行操作,并且未命中次数达到len(dirty)后,dirty会被升级为read,再次有新数据插入的时候,又会重复dirty初始化的过程。这一系列流程均会造成较大的开销影响整体性能。
适用场景
综上sync.Map适用于读多更新多,新增少的场景。
注:这里更新多特指当read中存在该键且未被标记删除时更新操作,此场景可直接原子更新无需加锁。
注:这里更新多特指当read中存在该键且未被标记删除时更新操作,此场景可直接原子更新无需加锁。
——
虽然说操作是原子的, 但是它前面还是有锁保持着. 可能锁的开销较小, 很快就释放.
我的理解正确吗
read, _ := m.read.Load().(readOnly)
// 如果m.read存在这个键,并且这个entry没有被标记删除,尝试直接存储。
// 因为m.dirty也指向这个entry,所以m.dirty也保持最新的entry。
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
你跟下e.tryStore(&value)的代码
atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i))
这里是原子操作并没有锁的