近期再写map的demo时出现了下面一段报错,于是带着疑惑去看了一下源码
目的:主要想知道为啥map不让并发读写
fatal error: concurrent map read and map write
一.map的数据结构
先有个印象,后续会详细介绍
// A header for a Go map.
type hmap struct {count int // 键值对数量flags uint8B uint8 // noverflow uint16 // hash0 uint32 // buckets unsafe.Pointer oldbuckets unsafe.Pointer nevacuate uintptr extra *mapextra // optional fields
}type mapextra struct {overflow *[]*bmap // 溢出桶oldoverflow *[]*bmap // 扩容时老的溢出桶nextOverflow *bmap // 下一个可用的溢出桶
}
bucket结构如下
// A bucket for a Go map.
type bmap struct {tophash [bucketCnt]uint8 // bucketCnt=8
}
实际上编译的时候会加点料,就是上图画的那个样子
type bmap struct {topbits [8]uint8keys [8]keytypevalues [8]valuetypepad uintptroverflow uintptr
}
二.创建map
func main() {mp := make(map[int]int, 32)mp[1] = 1
}
直接使用go tool compile -S main.go查看
1.makemap() 主要是初始化map
func makemap(t *maptype, hint int, h *hmap) *hmap {mem, overflow := math.MulUintptr(uintptr(hint), t.Bucket.Size_)if overflow || mem > maxAlloc {hint = 0}// initialize Hmapif h == nil {h = new(hmap)}h.hash0 = fastrand()// 找到一个B,使得map的装载因子(后续介绍)再正常范围内// 也就是 一个bucket能装8个key-val,根据你设定的key-val数量确定bucket数量// 又因为2^B=buckets 以此确定B的大小,有兴趣自己看源码B := uint8(0)for overLoadFactor(hint, B) {B++}h.B = Bif h.B != 0 {var nextOverflow *bmap// 创建bucket数组h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)if nextOverflow != nil {// 当B>=4时创建溢出桶overflowh.extra = new(mapextra)h.extra.nextOverflow = nextOverflow}}return h
}
三.前置知识(纯个人理解,我关注的重点不在这)
1.哈希函数
(纯个人实力有限,粗略理解)查找key过程使用哈希函数,我没过多了解,就是大概理解成数学中的函数y=f(x)(个人理解),就像下面的第一张函数图一样,x可以随意取值,经过f(x)计算后得到的值范围没有x的取值范围大,也就是说把x理解成输入源,y理解成输出源,这样容易出现可能不同的x值,算出来的y值一样,例如y=x^2,x取+-1,结果都是1,简单理解成哈希碰撞。
2.解决哈希碰撞办法
2.1 开放地址法
核心思想主要是依次探测和比较数组中的元素以判断目标键值对是否存在哈希表中。
个人理解:秉持着跟谁都不争的态度,有人住了就往下接着找个安静的地方呆着(按照索引大小向下)
2.2 拉链法
核心思想主要是发生冲突了就用链表连起来。
golang的map实现中就结合了这两种方法
四.查找key的过程,我剔除了无关的过程
再回顾一下bmap(bucket)的结构,只是一个tophash [bucketCnt]uint8,这个数组只是存储了key的对应哈希值的前8位,用于加速查找key和表示key处于某个状态
首先,确定key在哪个桶,也就是用key计算出的hash值 % buckets数组的长度的结果作为桶的索引
其次,index = x % y = x & (y -1) = hash & (2^B -1),假设B=2,那么index = hash & 0000 0011,也就是说index的值只与hash的后2位有关。
1.mapaccess1()
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {if h == nil || h.count == 0 {if t.HashMightPanic() {t.Hasher(key, 0) // see issue 23734}return unsafe.Pointer(&zeroVal[0])}// 看到一开始想知道为啥的报错信息了 不能并发读写mapif h.flags&hashWriting != 0 {fatal("concurrent map read and map write")}// 计算key对应的hash值hash := t.Hasher(key, uintptr(h.hash0))// 2^B - 1 后面做与运算m := bucketMask(h.B)// 用hash值与m做与运算求key所在的bucket 简单点理解就是做%模运算b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize)))// 计算hash值前8位top := tophash(hash)
bucketloop:for ; b != nil; b = b.overflow(t) { // 外层遍历上面找到的bucket以及bucket内的overflow(溢出桶)for i := uintptr(0); i < bucketCnt; i++ { // 内层循环遍历bucket内的8个key-valif b.tophash[i] != top {// 如果hash前8位不相等if b.tophash[i] == emptyRest { // 这个emptyRest表示第i个后面的key-val都是空,不用找了,省点力气break bucketloop}// 接着比较下一个continue}// 找到了hash前8位相等的对应的keyk := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))if t.IndirectKey() {k = *((*unsafe.Pointer)(k))}// 接着比较bmap存储的key是否是自己想要找的if t.Key.Equal(key, k) { // 相等返回vale := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))if t.IndirectElem() {e = *((*unsafe.Pointer)(e))}return e}}}//没有找到就返回该map存储类型对应的零值return unsafe.Pointer(&zeroVal[0])
}
2.tophash() 计算hash值的前8位,并且给其加上一个minTopHash,主要是为了区分正常key的状态和一些key的特殊状态
// 空的 cell,也是初始时 bucket 的状态
empty = 0
// 空的 cell,表示 cell 已经被迁移到新的 bucket
evacuatedEmpty = 1
// key,value 已经搬迁完毕,但是 key 都在新 bucket 前半部分,
// 后面扩容部分会再讲到。
evacuatedX = 2
// 同上,key 在后半部分
evacuatedY = 3
// tophash 的最小正常值
minTopHash = 4
func tophash(hash uintptr) uint8 {top := uint8(hash >> (goarch.PtrSize*8 - 8))if top < minTopHash {top += minTopHash}return top
}
花了个个人理解的过程图,如下:
五.获取map值的两种方式
1.mapacees2()
第一种获取值的方式和上面四描述的一样,第二种也就是比第一种多了一个bool的返回值,表示key有没有命中。(先忽略map处于扩容的时候获取值,后续在展开)
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {if h == nil || h.count == 0 {if t.HashMightPanic() {t.Hasher(key, 0) // see issue 23734}return unsafe.Pointer(&zeroVal[0]), false}if h.flags&hashWriting != 0 {fatal("concurrent map read and map write")}hash := t.Hasher(key, uintptr(h.hash0))m := bucketMask(h.B)b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize)))top := tophash(hash)
bucketloop:for ; b != nil; b = b.overflow(t) {for i := uintptr(0); i < bucketCnt; i++ {if b.tophash[i] != top {if b.tophash[i] == emptyRest {break bucketloop}continue}k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))if t.IndirectKey() {k = *((*unsafe.Pointer)(k))}if t.Key.Equal(key, k) {e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))if t.IndirectElem() {e = *((*unsafe.Pointer)(e))}return e, true}}}return unsafe.Pointer(&zeroVal[0]), false
}
六.扩容和遍历过程
1.mapiterinit()
这个函数主要是为了初始化hint这个结构体
type hiter struct {key unsafe.Pointer //key指针 elem unsafe.Pointer // value指针t *maptype // map类型h *hmap // map headerbuckets unsafe.Pointer // 初始化时指向的bucketbptr *bmap // 当前遍历到的 bucketoverflow *[]*bmap // 当前遍历到的 bucket的overflow(溢出桶)oldoverflow *[]*bmap // 老的bucket的overflow(溢出桶)startBucket uintptr // 起始遍历的bucket索引offset uint8 // 遍历开始的cell编号 每个bucket有8个cell(key-val键值对)wrapped bool // 是否从头遍历了B uint8 // B的大小i uint8 // 当前遍历的cell编号bucket uintptr // 当前指向的bucketcheckBucket uintptr // 因为扩容,需要检查的bucket
}
func mapiterinit(t *maptype, h *hmap, it *hiter) {// r 为随机数var r uintptrif h.B > 31-bucketCntBits {r = uintptr(fastrand64())} else {r = uintptr(fastrand())}// r & (2^B - 1) 这个也就是r%bucket数组的长度 结果就是0-2^B-1// 决定从哪个bucket遍历it.startBucket = r & bucketMask(h.B)// 这个(bucket-1)=0000 0111 r右移B位之后&(bucket-1) 的结果就是0-7it.offset = uint8(r >> h.B & (bucketCnt - 1))// iterator stateit.bucket = it.startBucketmapiternext(it)
}
1.扩容条件和种类
条件1:装载因子loadFactor = 总key-val数量 / buckets数组的长度(2^B)。当loadFactor>6.5时会触发扩容。至于为啥是6.5,注解里面写的很清楚
// 装载因子 溢出桶占比 键值对占比 key的命中占比 key没有命中占比
// loadFactor %overflow bytes/entry hitprobe missprobe
// 4.00 2.13 20.77 3.00 4.00
// 4.50 4.05 17.30 3.25 4.50
// 5.00 6.85 14.77 3.50 5.00
// 5.50 10.55 12.94 3.75 5.50
// 6.00 15.27 11.67 4.00 6.00
// 6.50 20.90 10.79 4.25 6.50
// 7.00 27.14 10.15 4.50 7.00
// 7.50 34.03 9.73 4.75 7.50
// 8.00 41.10 9.40 5.00 8.00
条件2:overflow溢出桶太多了:当overflow的数量大于等于2^B(也就是buckets数组的长度)
这个条件1主要是为了防止buckets的每一个bucket都装满了key-val,也就是key-val太多了,buckets数组装的太满。
这个条件2主要是为了弥补条件1的不足,就是会有这样的情况,当bucket的key-val很多的时候,但是loadFactor还没超过6.5也就触发不了条件1扩容2,然后不停的插入和删掉溢出桶的key-val,导致每个溢出桶里面的key-val都分布的不满不均匀(也就是房子很多,住的人很少),这样的查找效率就会很低
扩容种类1:增量扩容,也就是B+1,就是创建一个新的长度为原来buckets数组长度的两倍
对应条件1是因为key-val太多,bucket太少,所以就创建多点bucket
扩容种类2:等量扩容,也就是创建一个新的长度等于原来buckets数组
条件2是因为overflow溢出桶太多,那就重新创建bucket,然后整理overflow的key-val就行
图解:
需要注意的是增量扩容,扩容前的bucket里面的key-val会被打散分散到两个bucket。比如扩容前bucket0里面有两个key-val分别是key-val1和key-val2,那么扩容后会被分散到bucket0和bucket(2^B).下面解释为啥会被分散到这两个桶:
首先,确定key在哪个桶,也就是用key计算出的hash值 % buckets数组的长度的结果作为桶的索引
其次,index = x % y = x & (y -1) = hash & (2^B -1),假设B=2,那么index = hash & 0000 0011,也就是说index的值只与hash的后2位有关。
最后,增量扩容之后key的存放位置其实就是直接与hash的B+1位有直接关系。
例子:假设原本B=2,key1的hash值后三位101,key2的hash值后三位是001,因为B=2,所以就取后两位作为key的存放桶索引,因为两个key的后两位都是01也就是放到索引为1的bucket,扩容之后B=3,然后key1就放到了101也就是索引为5的bucket,key2也就放到了001也就是索引为1的bucket。
具体的扩容过程后续写、插入、修改key时展开,接下来接着看遍历过程。
2.遍历过程
mapiternext()
func mapiternext(it *hiter) {h := it.hif h.flags&hashWriting != 0 {fatal("concurrent map iteration and map write")}t := it.tbucket := it.bucket // 如果处于扩容过程此bucket是扩容后的bucketb := it.bptri := it.icheckBucket := it.checkBucketnext:// 一开始b默认就是nilif b == nil {if bucket == it.startBucket && it.wrapped { // 表示当前的遍历的bucket就是一开始遍历的bucket并且已经从头遍历过一轮buckets 例如 startBucket = 3 遍历顺序从 3 4 0 1 2 3重新回到原始的桶位置 此时还没找到就返回it.key = nilit.elem = nilreturn}if h.growing() && it.B == h.B { // 此时如果正在扩容oldbucket := bucket & it.h.oldbucketmask() // 先找到当前原本扩容前对应的老桶b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.BucketSize)))if !evacuated(b) { // 如果老桶的数据并没有迁移到新桶checkBucket = bucket // 标记此老桶} else {// 如果老桶的数据已经迁移到了新桶,那就遍历新桶b = (*bmap)(add(it.buckets, bucket*uintptr(t.BucketSize)))checkBucket = noCheck // 标记不需要检查老桶}} else { // 此时没有扩容 直接遍历当前的桶b = (*bmap)(add(it.buckets, bucket*uintptr(t.BucketSize)))checkBucket = noCheck // 标记不需要检查老桶}bucket++if bucket == bucketShift(it.B) { // 检查bucket++后是否到了最后一个bucketbucket = 0 it.wrapped = true // 标记从头遍历}i = 0}for ; i < bucketCnt; i++ { // 遍历选中的bucket的8个celloffi := (i + it.offset) & (bucketCnt - 1) // 随机决定从哪个cell往下从头遍历if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty { // 如果当前的cell是空 跳过continue}k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.KeySize))if t.IndirectKey() {k = *((*unsafe.Pointer)(k))}e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+uintptr(offi)*uintptr(t.ValueSize))if checkBucket != noCheck && !h.sameSizeGrow() { // 如果正在扩容并且不是等量扩容if t.ReflexiveKey() || t.Key.Equal(k, k) {hash := t.Hasher(k, uintptr(h.hash0))if hash&bucketMask(it.B) != checkBucket { // 过滤出老桶中扩容后属于新桶的key-valcontinue}} else {if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {continue}}}if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||!(t.ReflexiveKey() || t.Key.Equal(k, k)) { // 老桶数据没迁移完就用老桶数据it.key = kif t.IndirectElem() {e = *((*unsafe.Pointer)(e))}it.elem = e} else { // 老桶迁移完就用新通数据rk, re := mapaccessK(t, h, k)// 检查一下数据在新桶是否存在if rk == nil {continue // key has been deleted}it.key = rkit.elem = re}it.bucket = bucketif it.bptr != b { // avoid unnecessary write barrier; see issue 14921it.bptr = b}it.i = i + 1it.checkBucket = checkBucketreturn}b = b.overflow(t) // 遍历下一个overflowi = 0 // cell索引置零goto next
}
个人理解的动画图:
假设当前map正在扩容,B此时等于2,buckets数组长度是4,oldmap的部分bucket数据为迁移如下图所示
假设这个时候开始遍历,情形如下图所示:
1.假设startbucket=1,startIndex=2,因为map正在扩容,这个时候遍历bucket1时会先去检查bucket1所属的老桶也就是左边的old bucket1,看看old bucket1数据有没有迁移完,没有迁移完就使用old bucket1数据右边 new bucket1的数据(也就是扩容之后数据还是存放在桶1的数据,主要看低B位数据,原因在上面扩容说过了),因为当前左边的old bucket1的数据已经迁移完成,所以就正常使用右边new bucket1的数据就行。遍历bucket内的数据按照startindex开始的顺序往下,到结尾就回到开头一直到再次回到startindex的位置。所以此时遍历得到的值如下:
2.此时完成循环遍历到右边的new bucket2,按照1所述的规则,检查到new bucket2的数据原本属于左边的old bucket0,又因为左边的old bucket0的数据还未迁移,所以此时会去old bucket0中找到扩容后数据右边new bucket2的数据(也就是lowbit=10)。此时遍历得到的值如下:
3.接着外层循环遍历到右边的new bucket3,按照规则, 此时遍历得到的值如下:
4.接着外层循环遍历到右边的new bucket0,此时遍历得到的值如下:
七.写和更新过程
1.mapassign()
注意:1.map的扩容不是一次性的,而是“渐进性”,也就是一点点的把数据搬到新的bucket。随着key的插入、更新、删除都会触发数据搬运,这三种操作一次最少搬运一个bucket数据,最多两个。2.value的插入和更新使用汇编完成。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {if h == nil {panic(plainError("assignment to entry in nil map"))}// 如果当前有goroutine正在写map 直接fatalif h.flags&hashWriting != 0 {fatal("concurrent map writes")}hash := t.Hasher(key, uintptr(h.hash0))// 标记当前goroutine正在写maph.flags ^= hashWritingif h.buckets == nil {h.buckets = newobject(t.Bucket) // newarray(t.Bucket, 1)}again:bucket := hash & bucketMask(h.B) // 根据key的hash值对buckets数组长度取模找到key对应的bucket 这里用&运算代替取模操作if h.growing() { // 如果当前正在扩容growWork(t, h, bucket) // 下面解释}b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize))) // 找到key对应的buckettop := tophash(hash) // 获取key的hash值的前8位var inserti *uint8 // 插入的索引var insertk unsafe.Pointer // 插入的keyvar elem unsafe.Pointer // 插入的val
bucketloop:for { // 外层遍历bucket以及bucket内的overflowfor i := uintptr(0); i < bucketCnt; i++ { // 内层循环遍历bucket的8个cellif b.tophash[i] != top {if isEmpty(b.tophash[i]) && inserti == nil { // 记录第一个空的插槽,如果遍历完当前bucket以及overflow之后没有找到对应的key,就把值插入这个位置inserti = &b.tophash[i]insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))}if b.tophash[i] == emptyRest { // 这个emptyRest表示i后面的所有cell都是kong不用遍历了break bucketloop}continue}k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))if t.IndirectKey() {k = *((*unsafe.Pointer)(k))}if !t.Key.Equal(key, k) { // 前8位hash相等之后接着在比较一下keycontinue}// already have a mapping for key. Update it.if t.NeedKeyUpdate() { // 如果找到了对应的key,就执行更新操作typedmemmove(t.Key, k, key)}elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))goto done}ovf := b.overflow(t)if ovf == nil {break}b = ovf}if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { // 如果当前没有扩容,并且key的数量太多或者overflow数量太多,就进行扩容hashGrow(t, h)goto again // Growing the table invalidates everything, so try again}if inserti == nil { // 到此如果inserti为nil表示上面全部遍历完还没找到对应的key并且也没有空的位置 所有就需要创建overflownewb := h.newoverflow(t, b)inserti = &newb.tophash[0]insertk = add(unsafe.Pointer(newb), dataOffset)elem = add(insertk, bucketCnt*uintptr(t.KeySize))}// store new key/elem at insert positionif t.IndirectKey() {kmem := newobject(t.Key)*(*unsafe.Pointer)(insertk) = kmeminsertk = kmem}if t.IndirectElem() {vmem := newobject(t.Elem)*(*unsafe.Pointer)(elem) = vmem}typedmemmove(t.Key, insertk, key) // 插入key*inserti = toph.count++done:if h.flags&hashWriting == 0 { //再次检查有没有别的goroutine在写mapfatal("concurrent map writes")}h.flags &^= hashWriting // 解除对此map的写状态if t.IndirectElem() {elem = *((*unsafe.Pointer)(elem))}// val的放入是用汇编写的return elem
}
2.hashGrow()
主要就是判断是需要进行增量扩容还是等量扩容,然后创建新buckets
func hashGrow(t *maptype, h *hmap) {// 默认进行增量扩容bigger := uint8(1)if !overLoadFactor(h.count+1, h.B) { // 如果不是因为装载因子超过阈值而触发的扩容,也就是进行等量扩容bigger = 0h.flags |= sameSizeGrow}oldbuckets := h.bucketsnewbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)flags := h.flags &^ (iterator | oldIterator)if h.flags&iterator != 0 {flags |= oldIterator}// commit the grow (atomic wrt gc)h.B += biggerh.flags = flagsh.oldbuckets = oldbucketsh.buckets = newbucketsh.nevacuate = 0h.noverflow = 0if h.extra != nil && h.extra.overflow != nil {// Promote current overflow buckets to the old generation.if h.extra.oldoverflow != nil {throw("oldoverflow is not nil")}h.extra.oldoverflow = h.extra.overflowh.extra.overflow = nil}if nextOverflow != nil {if h.extra == nil {h.extra = new(mapextra)}h.extra.nextOverflow = nextOverflow}// the actual copying of the hash table data is done incrementally// by growWork() and evacuate().
}
3.growWork()
扩容前夕,主要的搬运数据函数是evacuate(),这里的第一次搬运的数据是确认搬迁老的 bucket 对应正在使用的 bucket。第二次是搬运老桶数组中还没搬运的索引最小的哪个bucket里面的数据。
func growWork(t *maptype, h *hmap, bucket uintptr) {// make sure we evacuate the oldbucket corresponding// to the bucket we're about to useevacuate(t, h, bucket&h.oldbucketmask())// evacuate one more oldbucket to make progress on growingif h.growing() {evacuate(t, h, h.nevacuate)}
}
4.evacuate()
这里的xy的x表示新buckets的前一半,y就是后一半
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.BucketSize)))newbit := h.noldbuckets() // 老桶数组的长度if !evacuated(b) { // 如果还未迁移数据var xy [2]evacDstx := &xy[0]x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.BucketSize)))x.k = add(unsafe.Pointer(x.b), dataOffset)x.e = add(x.k, bucketCnt*uintptr(t.KeySize))if !h.sameSizeGrow() { // 增量扩容y := &xy[1]y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.BucketSize)))y.k = add(unsafe.Pointer(y.b), dataOffset)y.e = add(y.k, bucketCnt*uintptr(t.KeySize))}for ; b != nil; b = b.overflow(t) { // 外层遍历bucket以及bucket内的overflowk := add(unsafe.Pointer(b), dataOffset)e := add(k, bucketCnt*uintptr(t.KeySize))for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.KeySize)), add(e, uintptr(t.ValueSize)) { // 遍历bucket内的celltop := b.tophash[i]if isEmpty(top) { // 标记当前key已经被搬移b.tophash[i] = evacuatedEmptycontinue}if top < minTopHash {throw("bad map state")}k2 := kif t.IndirectKey() {k2 = *((*unsafe.Pointer)(k2))}var useY uint8 // 默认是0if !h.sameSizeGrow() { // 如果不是等量扩容hash := t.Hasher(k2, uintptr(h.hash0))// 这里的特例是相同的key两次算出的hash值不一样,只有在float变量的Nan()出现if h.flags&iterator != 0 && !t.ReflexiveKey() && !t.Key.Equal(k2, k2) {useY = top & 1top = tophash(hash)} else {if hash&newbit != 0 { // 这里就是hash % 新桶的长度 只不过是换成&运算useY = 1}}}if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {throw("bad evacuatedN")}// 表示数据搬移到Yb.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedYdst := &xy[useY] // evacuation destinationif dst.i == bucketCnt { // 如果当前cell溢出,申请一个溢出桶dst.b = h.newoverflow(t, dst.b)dst.i = 0dst.k = add(unsafe.Pointer(dst.b), dataOffset)dst.e = add(dst.k, bucketCnt*uintptr(t.KeySize))}dst.b.tophash[dst.i&(bucketCnt-1)] = top // 更新top hash值if t.IndirectKey() {*(*unsafe.Pointer)(dst.k) = k2 // 更新key指针} else {typedmemmove(t.Key, dst.k, k) // 更新key}if t.IndirectElem() {*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e) // 更新值指针} else {typedmemmove(t.Elem, dst.e, e) // 更新值}dst.i++ // 接着搬移下一个celldst.k = add(dst.k, uintptr(t.KeySize))dst.e = add(dst.e, uintptr(t.ValueSize))}}// 如果老的桶已经搬迁完没有goroutine使用让gc回收if h.flags&oldIterator == 0 && t.Bucket.PtrBytes != 0 {b := add(h.oldbuckets, oldbucket*uintptr(t.BucketSize))ptr := add(b, dataOffset)n := uintptr(t.BucketSize) - dataOffsetmemclrHasPointers(ptr, n)}}// 更新搬迁进度if oldbucket == h.nevacuate {advanceEvacuationMark(h, t, newbit)}
}
5.advanceEvacuationMark()
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {h.nevacuate++stop := h.nevacuate + 1024if stop > newbit {stop = newbit}for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {h.nevacuate++}if h.nevacuate == newbit { // 全部数据都迁移完成h.oldbuckets = nil // 清除老的bucketsif h.extra != nil {h.extra.oldoverflow = nil}h.flags &^= sameSizeGrow}
}
八.删除过程
1.mapdelete()
其实就是先看看是否在扩容,如果是就帮忙扩容,然后接着遍历bucket寻找要删除的key,找到就删除,然后就是判断是否需要给cell标记emptyRest
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {if h == nil || h.count == 0 {if t.HashMightPanic() {t.Hasher(key, 0) // see issue 23734}return}if h.flags&hashWriting != 0 {fatal("concurrent map writes")}hash := t.Hasher(key, uintptr(h.hash0))h.flags ^= hashWritingbucket := hash & bucketMask(h.B)if h.growing() {growWork(t, h, bucket)}b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))bOrig := btop := tophash(hash)
search:for ; b != nil; b = b.overflow(t) {for i := uintptr(0); i < bucketCnt; i++ {if b.tophash[i] != top {if b.tophash[i] == emptyRest {break search}continue}k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))k2 := kif t.IndirectKey() {k2 = *((*unsafe.Pointer)(k2))}if !t.Key.Equal(key, k2) {continue}// 删除keyif t.IndirectKey() {*(*unsafe.Pointer)(k) = nil} else if t.Key.PtrBytes != 0 {memclrHasPointers(k, t.Key.Size_)}e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))// 删除valueif t.IndirectElem() {*(*unsafe.Pointer)(e) = nil} else if t.Elem.PtrBytes != 0 {memclrHasPointers(e, t.Elem.Size_)} else {memclrNoHeapPointers(e, t.Elem.Size_)}b.tophash[i] = emptyOneif i == bucketCnt-1 { // 如果i是最后一个if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest { // 并且下一个overflow不为空和下一个overflow的第一个cell不为emptyRest goto notLast}} else {if b.tophash[i+1] != emptyRest { // 如果当前删除的cell后面的cell不为空goto notLast}}// 到此表明删除的cell后面一个cell是emptyRest,所以需要往前遍历,把为空的cell置为emptyRestfor { // 从i开始往前找,如果cell为空就置为emptyRestb.tophash[i] = emptyRestif i == 0 {if b == bOrig {break // beginning of initial bucket, we're done.}// Find previous bucket, continue at its last entry.c := bfor b = bOrig; b.overflow(t) != c; b = b.overflow(t) {}i = bucketCnt - 1} else {i--}if b.tophash[i] != emptyOne {break}}notLast: // 正常删除结束h.count-- // 键值对数量减一if h.count == 0 {h.hash0 = fastrand()}break search}}if h.flags&hashWriting == 0 {fatal("concurrent map writes")}h.flags &^= hashWriting
}