1、哈希表
哈希表是一个很常见的数据结构,用来存储无序的 key/value
对,给定的 key
可以在 O(1)
时间复杂度内查找、更新或删除对应的 value
。
设计一个好的哈希表,需要着重关注两个关键点:哈希函数、冲突处理。
1.1 哈希函数
理想情况下,哈希函数既要有优异的性能,又要让结果在映射范围内均匀的分布,使得增删改查的时间复杂度为 𝑂(1)
。
不过实际情况是,key
的数量往往会大于映射的范围,这势必产生哈希冲突,以及更差的读写性能。最坏的结果可能会是所有操作的时间复杂度可能会达到 𝑂(𝑛)
,比如结果都映射到一个 bucket
(桶)上,最后变成线性查找了。
因此好的哈希函数对于哈希表来说至关重要。
1.2 冲突处理
当多个不同的 key
通过哈希函数计算得到同一个结果,这就是哈希冲突(或叫哈希碰撞)。解决的哈希冲突的办法,常见的有开放地址法和拉链法。
1.2.1 开放地址法
开放地址法是一种线性探测的方法,当发生哈希冲突时,它会按照某种规律(如线性探测、二次探测或双散列等)在哈希表中寻找下一个可用的空位来存储冲突的元素。
具体来说,当计算出的哈希值对应的槽位已被占用时,可以按照探测序列依次检查相邻的槽位,直到找到一个空槽位为止。
如上图,底层数组有四个元素,key3
经过计算
index := hash("key3") % len(len)
得到 key1
一样的索引,这时发现索引所在的槽位不为空,那么就向右侧依次探测不为空的槽位,直到找到空槽位那么就把 key/value
写进去。
当需要查找某个 key
对应的 value
时,会从索引的位置开始线性探测数组
- 步骤一:若对比索引所在位置的
key
相等,停止查找,取出value
- 步骤二:若对比索引所在位置的
key
不相等,则依次向右探测- 重复步骤一,找到则取出
value
- 若最终没有找到则返回空
- 重复步骤一,找到则取出
删除 key
和查找的方法同理。
最后,来看看开放地址发的优缺点:
- 优点:
- 不需要额外的存储空间来存储链表节点。
- 缺点:
- 容易产生聚集现象,即连续的槽位被占用,导致查找效率降低。
- 当哈希表越来越满时,查找、插入和删除操作的效率会下降。
1.2.2 拉链法
拉链法是在每个哈希表的槽位上链接一个链表。当发生哈希冲突时,将具有相同哈希值的元素添加到对应槽位的链表中。查找、插入和删除操作都在相应的链表中进行。
拉链法一般会使用数组加上链表,是哈希表最常见的实现方法,像 Golang
、Java
、PHP
等编程言都用拉链法实现哈希表,另外 Redis
中字典也是使用拉链法实现哈希表的,详见拙作 Redis 之字典。
同样的,也来看看拉链法的优缺点:
- 优点:
- 解决了聚集问题,提高了查找、插入和删除操作的效率。
- 动态地分配链表空间,适应于元素数量不确定的场景。
- 缺点:
- 需要额外的存储空间来存储链表节点。
- 如果链表过长,查找、插入和删除操作的效率仍然会受到影响。
1.2.3 装载因子
装载因子 = 元素数量 ÷ 桶数量
哈希表中的装载因子是衡量哈希表使用程度的一个重要参数,它表示哈希表中已存储的元素数量与哈希表总槽位数量之间的比例。装载因子越大,哈希的读写性能就越差。
拿开放地址法来比方,底层数组长度为 8
,目前已写入 6
个,那么装载因子为 6/8
也就是 0.75
,快接近 1
(为1
时说明已经满了),这时再写入会导致碰撞的次数变多,哈希表的性能变得越差。
对比拉链法来说,也是一样。因为哈希表读写操作主要耗时在计算哈希
、定位桶
和遍历链表
这三个过程。
对于装载因子越大,哈希的读写性能就越差
,解决的办法是动态的增大哈希表的长度,当装载因子超过某个阈值时增加哈希表的长度,自动扩容。大致步骤是:
- 创建新的哈希表,容量为原来的两倍
- 遍历原有哈希表的元素,并重新计算索引,写入新的哈希表中
- 遍历过程中,对于写操作只在新的哈希表中进行,查询和删除在新旧两个哈希表中分别进行
- 遍历完成后,释放原来的哈希表
2. Golang 的 map
注意: Golang 版本为 1.19.12
2.1 数据结构
先来看看 map
相关的常量
const (// 一个桶里面最多可以装的键值对的个数,8对。bucketCntBits = 3bucketCnt = 1 << bucketCntBits // 1 << 3 ,2的三次方,也就是 8// 触发扩容操作的最大装载因子的临界值loadFactor = 6.5// 为了保持内联,键 和 值 的最大长度都是128字节,如果超过了128个字节,就存储它的指针maxKeySize = 128maxValueSize = 128// 数据偏移应该是 bmap 的整数倍,但是需要正确的对齐。dataOffset = unsafe.Offsetof(struct {b bmapv int64}{}.v)// tophash 的一些值empty = 0 // 没有键值对evacuatedEmpty = 1 // 没有键值对,并且桶内的键值被迁移走了。evacuatedX = 2 // 键值对有效,并且已经迁移了一个表的前半段evacuatedY = 3 // 键值对有效,并且已经迁移了一个表的后半段minTopHash = 4 // 最小的 tophash// 标记iterator = 1 // 当前桶的迭代子oldIterator = 2 // 旧桶的迭代子hashWriting = 4 // 一个goroutine正在写入mapsameSizeGrow = 8 // 当前字典增长到新字典并且保持相同的大小// 迭代子检查桶ID的哨兵noCheck = 1<<(8*sys.PtrSize) - 1
)
通过上面字段可以得出,Golang
的 map
的每个桶中可以存 8
个 key/value
对。最大装载因子为 6.5
,这个是官方统计出来的最优值,源码中(src/runtime/map.go
)有介绍这个值是如何得来的,这里就不做过多介绍。若初始化有个 4
个桶,那么在存储 4*6.5
也就是在 26
个键值对后,就需要进行扩容了,不然查询效率就会降低,当然了,map
里元素越多,感觉越明显。
type hmap struct {count int // map 的长度flags uint8 // 标识,比如正在写数据B uint8 // log以2为底,桶个数的对数 (总共能存 6.5 * 2^B 个元素)noverflow uint16 // 近似溢出桶的个数,当B小于16时是准确值,大于等于16时是大概的值。hash0 uint32 // 哈希种子buckets unsafe.Pointer // 有 2^B 个桶的数组. count==0 的时候,这个数组为 nil.oldbuckets unsafe.Pointer // 旧的桶数组一半的元素nevacuate uintptr // 扩容增长过程中的计数器extra *mapextra // 可选字段
}type mapextra struct {// 如果 key 和 value 都不包含指针,并且可以被 inline(<=128 字节)// 使用 extra 来存储 overflow bucket,这样可以避免 GC 扫描整个 map// 然而 bmap.overflow 也是个指针。这时候我们只能把这些 overflow 的指针// 都放在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中了// overflow 包含的是 hmap.buckets 的 overflow 的 bucket// oldoverflow 包含扩容时的 hmap.oldbuckets 的 overflow 的 bucketoverflow *[]*bmapoldoverflow *[]*bmap// 指向空闲的 overflow bucket 的指针nextOverflow *bmap
}
整个 hmap
结构占 48
个字节内存
fmt.Println(unsafe.Sizeof(hmap{})) // 48
其中
count
,代表map
当前元素的个数,通过len(m)
获取B
, 是log
以2
为底,桶个数的对数,比如 B 为3
,那么此时map
中bucket
的个数为2^3
那就是8
hash0
,哈希种子,可以增加哈希分布的随机性,从而有效地防止哈希冲突攻击buckets
指向具体bmap
数组数组的指针mapextra
主要存储溢出桶的,通过nextOverflow
进行相连
再来看看具体存放数据桶的数据结构
// bucket 本体
type bmap struct {topbits [8]uint8 // 用于存储每个键的哈希值的高位,用于快速比较和查找keys [8]keytype // 存储键的数组values [8]valuetype// 存储值的数组pad uintptr // 用于内存对齐的填充字段overflow uintptr // 指向溢出桶的指针,如果有的话
}
前面介绍过,桶的元素个数为 8
,所以 bmap
中 前三个字段数组长度都是 8
topbits
主要定位key
具体在哪个桶overflow
指向溢出桶的指针,用于存储溢出元素,形成一个溢出链表。当一个桶中的元素超过8
个时,多余的元素会存储在溢出桶中。keys/values
这里放一起介绍,和其他哈希表实现不同的是,Golang
中分别把key
和value
单独聚合存储,主要是为了节省内存- 比如
map[int64]int8
,key
占8
个字节,而value
仅占1
个字节,内存对齐的要则要16
个字节,但若是分开存储,8
个 value 才占8
个字节,这样就可以起到节省内存的作用 pad
主要作用为了内存对齐进行填充字段用的,比如上面哈希表中只有两个value
,那么pad
就可以填充6
个字节
- 比如
2.2 初始化
// make(map[k]v, hint)
// 如果编译器认为 map 和第一个 bucket 可以直接创建在栈上,h 和 bucket 可能都是非空
// h != nil,可以直接在 h 内创建 map
// 如果 h.buckets != nil,其指向的 bucket 可以作为第一个 bucket 来使用
func makemap(t *maptype, hint int, h *hmap) *hmap {// 计算 hint 个元素所需的内存大小,并检查是否会溢出或超过允许的最大分配内存 maxAllocmem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)// 如果发生溢出或内存大小超过限制,则将 hint 设置为 0if overflow || mem > maxAlloc {hint = 0}// 初始化 hmapif h == nil {h = new(hmap)}h.hash0 = fastrand()// 按照提供的元素个数,计算初始桶的数量B := uint8(0)for overLoadFactor(hint, B) {B++}h.B = B// 分配初始的 hash table// 如果 B == 0,buckets 字段会由 mapassign 来 lazily 分配// 因为如果 hint 很大的话,对这部分内存归零会花比较长时间if h.B != 0 {var nextOverflow *bmap// 初始化 bucket 和 nextOverflow h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)if nextOverflow != nil {h.extra = new(mapextra)h.extra.nextOverflow = nextOverflow}}return h
}
这里先看下 B
的计算逻辑,这里为了更好认清计算逻辑,把相关代码抽取出来,可以直接运行的。
package _0240626import ("fmt""testing"
)const (// Maximum number of key/elem pairs a bucket can hold.bucketCntBits = 3bucketCnt = 1 << bucketCntBits// Maximum average load of a bucket that triggers growth is 6.5.// Represent as loadFactorNum/loadFactorDen, to allow integer math.loadFactorNum = 13loadFactorDen = 2PtrSize = 4 << (^uintptr(0) >> 63)
)func TestB(t *testing.T) {B := uint8(0)hint := 8for overLoadFactor(hint, B) {B++}fmt.Println(B, 1<<B)
}// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
func overLoadFactor(count int, B uint8) bool {return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}// bucketShift returns 1<<b, optimized for code generation.
func bucketShift(b uint8) uintptr {// Masking the shift amount allows overflow checks to be elided.return uintptr(1) << (b & (PtrSize*8 - 1))
}
还是得强调一点 B
是以 2
为底桶个数的对数,比如 B
为 0
,说只需一个桶。另外还需记住每个桶中存放的 key/value
对的个数是 8
。
好,接下来主要看看 overLoadFactor
判断逻辑
count > bucketCnt
,其中bucketCnt
为上面提到的一个桶存8
个元素uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
,这里主要为了计算提供的桶能否存的下给的元素数量- 装载因子为
6.5
,比如B
为0
,那么桶的个数就是1<<0
就是1
个桶,可以最多存2*6.5
,也就是13
个元素 - 少于
13
元素,就不用累加B
,否则就需要累加B
,并重复计算,直到提供的桶可以存的下count
个元素
- 装载因子为
OK
,确定了桶的数量,接下来就可以初始化桶了
// makeBucketArray 为 map buckets 初始化底层数组
// 1<<b 是需要分配的最小数量 buckets
// dirtyalloc 要么是 nil,要么就是之前由 makeBucketArray 使用同样的 t 和 b 参数
// 分配的数组
// 如果 dirtyalloc 是 nil,那么就会分配一个新数组,否则会清空掉原来的 dirtyalloc
// 然后重用这部分内存作为新的底层数组
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {// base = 1 << b 也就是桶的数量base := bucketShift(b)nbuckets := base// 对于比较小的 b 来说,不太可能有 overflow buckets// 这里省掉一些计算消耗if b >= 4 {// Add on the estimated number of overflow buckets// required to insert the median number of elements// used with this value of b.nbuckets += bucketShift(b - 4)sz := t.bucket.size * nbucketsup := roundupsize(sz)if up != sz {nbuckets = up / t.bucket.size}}if dirtyalloc == nil {buckets = newarray(t.bucket, int(nbuckets))} else {// dirtyalloc 之前就是用上面的 newarray(t.bucket, int(nbuckets))// 生成的,所以是非空buckets = dirtyallocsize := t.bucket.size * nbucketsif t.bucket.kind&kindNoPointers == 0 {memclrHasPointers(buckets, size)} else {memclrNoHeapPointers(buckets, size)}}if base != nbuckets {// 我们预分配了一些 overflow buckets// 为了让追踪这些 overflow buckets 的成本最低// 我们这里约定,如果预分配的 overflow bucket 的 overflow 指针是 nil// 那么通过增加指针可以找到更多可用的溢出桶.// 我们需要一个安全的非空指针来作为 last overflow bucket,直接用 buckets 就行了nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))last.setoverflow(t, (*bmap)(buckets))}return buckets, nextOverflow
}
这里需着重看下 b
的值,以 4
为界线
- 小于
4
,不创建溢出桶 ,比如3
,那么就初始化1<<3
也就是8
个正常桶(hmap
结构中buckets
指向的bmap
数组) - 大于或等于
4
,创建溢出捅(此时正常桶的个数为1<<4
即16
个)- 溢出桶数量规则,比如
b
为4
,在不考虑内存对齐的情况下,1<<(4-4)
也就是1
- 溢出桶和正常桶在内存上是连续的
- 最后一个溢出桶的溢出指针设置为桶数组的起始地址,可以明确地表示这是链表的末端,从而避免无限循环
- 溢出桶数量规则,比如
仔细阅读 makemap
函数的源码实现,发现在有溢出捅的情况下,在初始化 mapextra
后,仅仅用 nextOverflow
指向第一个空闲的溢出桶,但是 overflow
字段却没有提及。
if h.B != 0 {var nextOverflow *bmap// 初始化 bucket 和 nextOverflow h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)if nextOverflow != nil {h.extra = new(mapextra)h.extra.nextOverflow = nextOverflow}
}
这里就稍微向后跳下进度,因为在 mapextra
中 overflow
是在写入 key/value
时生成的。
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {var ovf *bmapif h.extra != nil && h.extra.nextOverflow != nil {// We have preallocated overflow buckets available.// See makeBucketArray for more details.ovf = h.extra.nextOverflowif ovf.overflow(t) == nil {// We're not at the end of the preallocated overflow buckets. Bump the pointer.h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))} else {// This is the last preallocated overflow bucket.// Reset the overflow pointer on this bucket,// which was set to a non-nil sentinel value.ovf.setoverflow(t, nil)h.extra.nextOverflow = nil}} else {ovf = (*bmap)(newobject(t.bucket))}h.incrnoverflow()if t.bucket.ptrdata == 0 {h.createOverflow()*h.extra.overflow = append(*h.extra.overflow, ovf)}b.setoverflow(t, ovf)return ovf
}
也就是在 makemap
初始化的时候,若有溢出捅,正常桶和溢出桶是一块分配的,同时nextOverflow
指向第一个溢出捅,同时把最后一个溢出桶的 overflow
指针指向第一个正常桶。接着在写入的时候,会在正常桶都满的情况下,会去往溢出桶里写:
- 若有空闲的溢出捅(
h.extra.nextOverflow
)- 那么获取此空闲的溢出桶
ovf = h.extra.nextOverflow
- 同时还需判断此溢出捅的
overflow
是否为nil
- 为
nil
, 说明还存在空闲的溢出捅,那么h.extra.nextOverflow
指向此空闲的溢出捅 - 不为
nil
,说明是最后一个溢出桶(上面提到过在分配溢出捅的时候会把最后一个溢出桶的oveflow
指向第一个正常桶),同时把当前溢出桶的overflow
和h.extra.nextOverflow
都置为nil
- 为
- 那么获取此空闲的溢出桶
- 若没有空闲的溢出捅
- 那么就生成一个新的桶
ovf = (*bmap)(newobject(t.bucket))
- 那么就生成一个新的桶
- 更新溢出桶的数量(当
B
小于16
时是准确值,大于等于16
时是大概的值) - 若
map
中的key/value
类型都不是指针(map[int64]int8
),那么就会把溢出捅的指针放入extra
的overflow
切片数组中,这么做主要是方便GC
,提升性能- 当对一个长度
100万
个key/value
对的map
中进行GC
时,若按照传统方法,会依次扫描bmap
中的overflow
,这会相当耗时 - 而一旦对这些
overflow
进行管理,那么当GC
扫描时,只需把extra.overflow
置黑即可,这可是相当大的性能提升(有兴趣的读者可以写个map[int64]int8
和map[string]int8
测试用例,先往里写入100万
个key/value
对,手动开启GC
,然后统计下耗时)
- 当对一个长度
- 最后把待写入的桶的
overflow
指针指向ovf
这个溢出捅
ok,接下来看看 hmap
在不同类型下的内存布局吧
2.2.1 没有分配预分配溢出桶
当
B
小于4
,是不会预分配溢出捅
2.2.2 有分配预分配溢出桶
当
B
大于或等于4
,则会预分配溢出捅
2.3 写入
// t 是 map 的类型信息,h 是 map 的实际数据结构,key 是要插入或更新的键。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {if h == nil {// 如果 map 是 nil,则抛出错误。panic(plainError("assignment to entry in nil map"))}if raceenabled {// 如果启用了竞态检测,获取调用者的程序计数器 (PC)。callerpc := getcallerpc()pc := abi.FuncPCABIInternal(mapassign)// 检测写操作是否会引起竞态。racewritepc(unsafe.Pointer(h), callerpc, pc)// 检测对 key 的读操作。raceReadObjectPC(t.key, key, callerpc, pc)}if msanenabled {// MemorySanitizer 检测对 key 的读取。msanread(key, t.key.size)}if asanenabled {// AddressSanitizer 检测对 key 的读取。asanread(key, t.key.size)}if h.flags&hashWriting != 0 {// 如果 hashWriting 标志已经被设置,说明有并发写操作,抛出错误。fatal("concurrent map writes")}// 计算键的哈希值。hash := t.hasher(key, uintptr(h.hash0))// 在调用 t.hasher 之后设置 hashWriting 标志,// 因为 t.hasher 可能会 panic,如果发生 panic,我们并没有进行写操作。h.flags ^= hashWritingif h.buckets == nil {// 如果 buckets 是 nil,则初始化一个新的 bucket。h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)}again:// 计算哈希值对应的 bucket 索引。bucket := hash & bucketMask(h.B)if h.growing() {// 如果 map 正在扩容,则执行扩容任务。growWork(t, h, bucket)}b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))top := tophash(hash)var inserti *uint8 // 指向要插入位置的 tophashvar insertk unsafe.Pointer // 指向要插入的 key 位置var elem unsafe.Pointer // 指向要插入的元素位置bucketloop:for {for i := uintptr(0); i < bucketCnt; i++ {// 检查每个槽位的 tophash。if b.tophash[i] != top {// 如果槽位为空,记录第一个空槽的位置。if isEmpty(b.tophash[i]) && inserti == nil {inserti = &b.tophash[i]insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))}// 如果遇到 emptyRest,说明后续的槽位都是空的,退出循环。if b.tophash[i] == emptyRest {break bucketloop}continue}// 计算当前槽位的 key 的地址。k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))if t.indirectkey() {// 如果是间接存储,获取实际的 key 地址。k = *((*unsafe.Pointer)(k))}// 比较键是否相等。if !t.key.equal(key, k) {continue}// 找到已存在的键,更新值。if t.needkeyupdate() {// 如果需要更新键,则复制新的键到当前位置。typedmemmove(t.key, k, key)}// 获取元素的地址。elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))goto done}// 检查溢出桶。ovf := b.overflow(t)if ovf == nil {break}b = ovf}// 没有找到键,分配新的插槽并添加条目。// 如果负载因子过高或溢出桶过多,并且不在扩容中,开始扩容。if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {hashGrow(t, h)goto again // 扩容会使所有内容失效,因此需要重试。}if inserti == nil {// 当前桶及所有溢出桶已满,分配一个新的溢出桶。newb := h.newoverflow(t, b)inserti = &newb.tophash[0]insertk = add(unsafe.Pointer(newb), dataOffset)elem = add(insertk, bucketCnt*uintptr(t.keysize))}// 在插入位置存储新键和值。if t.indirectkey() {// 如果键是间接存储,分配新内存并存储指针。kmem := newobject(t.key)*(*unsafe.Pointer)(insertk) = kmeminsertk = kmem}if t.indirectelem() {// 如果值是间接存储,分配新内存并存储指针。vmem := newobject(t.elem)*(*unsafe.Pointer)(elem) = vmem}// 复制键到插入位置。typedmemmove(t.key, insertk, key)// 设置插入位置的 tophash。*inserti = top// 更新元素计数。h.count++done:if h.flags&hashWriting == 0 {// 确保 hashWriting 标志已设置。fatal("concurrent map writes")}// 清除 hashWriting 标志。h.flags &^= hashWritingif t.indirectelem() {// 如果值是间接存储,返回实际的值指针。elem = *((*unsafe.Pointer)(elem))}return elem
}
阅读完 map
写入的源码后,终于知道 concurrent map writes
这个报错的出处了。因为 map
是非线程安全的,也就是一个 map
是不能同时有写入、读取的。
if h.flags&hashWriting != 0 {// 如果 hashWriting 标志已经被设置,说明有并发写操作,抛出错误。fatal("concurrent map writes")
}
同时,也补充了在 makemap
初始化中,B
为 0
也就是 1<<0
即 1
个桶时的生成之处了。
if h.buckets == nil {h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
接下来着重介绍下如何定位桶以及对应得 key
的写入位置。
hash := t.hasher(key, uintptr(h.hash0))
bucket := hash & bucketMask(h.B)
if h.growing() {growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
top := tophash(hash)
- 先计算
key
对应的哈希值,这里会把初始化的hash0
加进去,使其更加分布均匀 - 定位具体的桶,也就是比较低
1<<B - 1
位- 比如
B
为3
,也就是1<<3
即8
个桶 - 那么
hash & (1<<3 -1 )
也就是获取hash
低七位是哪个值,就能定位到哪个桶了
- 比如
- 定位具体的
key
- 先求
hash
的高八位的值 - 再遍历对应
tophash
数组的第几个位置,进而知道key
位于哪个槽了(若槽中有值,直接更新,否则就写入)- 若是
8
个tophash
都没有,说明满了,那么就遍历overflow
- 如果当前桶及所有溢出桶已满,那么就分配一个新的溢出桶
- 若是
- 先求
在写入的时候,还得考虑到是否扩容
// 如果负载因子过高或溢出桶过多,并且不在扩容中,开始扩容。
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {hashGrow(t, h)goto again // 扩容会使所有内容失效,因此需要重试。
}
- 溢出桶的数量超过桶数量
- 装载因子超过
6.5
具体的扩容过程,留在本篇博文的最后来讲。
2.4 查询
// mapaccess2 用于在 map 中查找键,返回值和是否找到的布尔值。
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {// 如果启用了竞态检测且 map 非空,获取调用者 PC 和当前函数 PC。if raceenabled && h != nil {callerpc := getcallerpc()pc := abi.FuncPCABIInternal(mapaccess2)// 检测 map 的读操作是否会引起竞态。racereadpc(unsafe.Pointer(h), callerpc, pc)raceReadObjectPC(t.key, key, callerpc, pc)}// 如果启用了 MemorySanitizer 且 map 非空,读取键的内存。if msanenabled && h != nil {msanread(key, t.key.size)}// 如果启用了 AddressSanitizer 且 map 非空,读取键的内存。if asanenabled && h != nil {asanread(key, t.key.size)}// 如果 map 是 nil 或者元素个数为 0。if h == nil || h.count == 0 {// 如果哈希函数可能 panic,则计算哈希以检查。if t.hashMightPanic() {t.hasher(key, 0) // 见 issue 23734}// 返回零值指针和 false,表示未找到。return unsafe.Pointer(&zeroVal[0]), false}// 检查是否有并发读写。if h.flags&hashWriting != 0 {fatal("concurrent map read and map write")}// 计算键的哈希值。hash := t.hasher(key, uintptr(h.hash0))m := bucketMask(h.B)// 找到对应的 bucket。b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))if c := h.oldbuckets; c != nil {// 如果存在旧的 buckets,检查是否需要缩小掩码。if !h.sameSizeGrow() {m >>= 1}oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))// 如果旧的 bucket 未被迁移,则使用旧的。if !evacuated(oldb) {b = oldb}}top := tophash(hash)
bucketloop:// 遍历 bucket 和溢出桶。for ; b != nil; b = b.overflow(t) {for i := uintptr(0); i < bucketCnt; i++ {// 检查 tophash 是否匹配。if b.tophash[i] != top {// 如果遇到 emptyRest,说明后续无有效数据,退出循环。if b.tophash[i] == emptyRest {break bucketloop}continue}// 获取当前键的地址。k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))if t.indirectkey() {k = *((*unsafe.Pointer)(k))}// 比较键是否相等。if t.key.equal(key, k) {// 获取元素的地址。e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))if t.indirectelem() {e = *((*unsafe.Pointer)(e))}// 返回元素指针和 true。return e, true}}}// 未找到键,返回零值指针和 false。return unsafe.Pointer(&zeroVal[0]), false
}
发现和写入的逻辑差不多
- 计算
key
的哈希值hash
- 依据
hash
低B
位定位所在桶 - 依据
hash
高8
位定位在桶中的存储位置 - 若当前桶未找到则查找对应的溢出捅
- 若对应位置有数据,则对比此位置上
key
,以确定是否是要查找的数据 - 若
map
处于扩容阶段,那么优先从oldbuckets
查找 - 若未找到键,则返回零值指针和
false
。
等等,查询的时候还有个知识点没介绍到,那就是遍历 map
无序性,也就是说遍历 map
时顺序打印出来的 key/value
是和写入的顺序不一致的,而且每次打印的都不一样,顺序是随机的
func TestRandMap(t *testing.T) {m := map[int]int{0: 0,1: 1,2: 2,3: 3,4: 4,5: 5,6: 6,}for i := 0; i < len(m); i++ {for k, v := range m {fmt.Printf("key=%d,value=%d ", k, v)}fmt.Println()}
}
输出如下
key=5,value=5 key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4
key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6
key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5 key=6,value=6
key=6,value=6 key=0,value=0 key=1,value=1 key=2,value=2 key=3,value=3 key=4,value=4 key=5,value=5
那 Golang
是如何实现的呢?
type hiter struct {key unsafe.Pointer // 当前键的指针,必须在第一个位置。写入 nil 表示迭代结束。elem unsafe.Pointer // 当前值的指针,必须在第二个位置。t *maptype // `map` 类型的描述信息。h *hmap // `map` 的内部结构指针。buckets unsafe.Pointer // 迭代器初始化时的 bucket 指针。bptr *bmap // 当前 bucket 的指针。overflow *[]*bmap // 用于保持 hmap.buckets 的溢出桶存活。oldoverflow *[]*bmap // 用于保持 hmap.oldbuckets 的溢出桶存活。startBucket uintptr // 迭代开始时的桶索引。offset uint8 // 桶内的起始偏移量(随机化)。wrapped bool // 是否已经从桶数组的末尾绕回到开头。B uint8 // 当前 `map` 的 bucket 数量的对数(`B` 表示 `2^B` 个桶)。i uint8 // 当前桶内的偏移量(键值对索引)。bucket uintptr // 当前正在迭代的桶索引。checkBucket uintptr // 用于检查的桶索引。
}
每次在遍历 map
时,会维护一个迭代器 hiter
,着重关注下 startBucket
和 offset
这两个字段
- 遍历
map
时从哪个桶开始遍历 - 遍历桶时,从哪个位置开始遍历
接下来在看看如何初始化这个字段
func mapiterinit(t *maptype, h *hmap, it *hiter) {// 如果启用了数据竞争检测,并且哈希表不为空if raceenabled && h != nil {// 获取调用者的程序计数器callerpc := getcallerpc()// 记录读取操作racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiterinit))}// 设置迭代器的类型信息it.t = t// 如果哈希表为空或计数为零,则直接返回if h == nil || h.count == 0 {return}// 验证迭代器的大小是否正确if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {throw("hash_iter size incorrect") // 参见 cmd/compile/internal/reflectdata/reflect.go}// 设置迭代器的哈希表指针it.h = h// 获取当前哈希表的桶状态it.B = h.Bit.buckets = h.bucketsif t.bucket.ptrdata == 0 {// 如果桶没有指针数据,则分配当前溢出桶数组并记住当前和旧的指针// 这确保即使在迭代过程中哈希表扩展和/或增加溢出桶时,// 也能保留所有相关的溢出桶。h.createOverflow()it.overflow = h.extra.overflowit.oldoverflow = h.extra.oldoverflow}// 决定从哪里开始迭代var r uintptrif h.B > 31-bucketCntBits {// 如果桶的数量大于31减去桶数位数,则使用64位随机数r = uintptr(fastrand64())} else {// 否则使用32位随机数r = uintptr(fastrand())}// 随机选择一个起始桶it.startBucket = r & bucketMask(h.B)// 随机选择桶内的起始偏移量it.offset = uint8(r >> h.B & (bucketCnt - 1))// 初始化迭代器状态it.bucket = it.startBucket// 标记有一个迭代器存在// 可以与另一个 mapiterinit() 并发运行。if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {// 原子操作设置标志位,表明存在迭代器atomic.Or8(&h.flags, iterator|oldIterator)}// 预先执行一次迭代,初始化迭代器的 key 和 elem 指针mapiternext(it)
}
重点是这处
// 随机选择一个起始桶
it.startBucket = r & bucketMask(h.B)
// 随机选择桶内的起始偏移量
it.offset = uint8(r >> h.B & (bucketCnt - 1))
也就是每次遍历的时候,起始桶的选择是随机的,同时桶内的起始偏移量也是随机的,这样就实现了无序性。
2.5 删除
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {// 如果启用了数据竞争检测,并且哈希表不为空if raceenabled && h != nil {// 获取调用者的程序计数器callerpc := getcallerpc()pc := abi.FuncPCABIInternal(mapdelete)// 记录写操作racewritepc(unsafe.Pointer(h), callerpc, pc)// 记录对 key 的读操作raceReadObjectPC(t.key, key, callerpc, pc)}// 如果启用了 MemorySanitizer(MSAN)(MSAN)检测,并且哈希表不为空if msanenabled && h != nil {// 记录对 key 的读操作msanread(key, t.key.size)}// 如果启用了地址清毒(ASAN)检测,并且哈希表不为空if asanenabled && h != nil {// 记录对 key 的读操作asanread(key, t.key.size)}// 如果哈希表为空或计数为零,则直接返回if h == nil || h.count == 0 {if t.hashMightPanic() {// 计算哈希值,处理可能的 panict.hasher(key, 0) // 参见 issue 23734}return}// 如果哈希表标志位中有 hashWriting,则表示并发写操作,抛出异常if h.flags&hashWriting != 0 {fatal("concurrent map writes")}// 计算 key 的哈希值hash := t.hasher(key, uintptr(h.hash0))// 设置 hashWriting 标志位,在调用 t.hasher 之后设置,因为 t.hasher 可能会 panich.flags ^= hashWriting// 计算 key 对应的桶索引bucket := hash & bucketMask(h.B)// 如果正在扩展哈希表,执行扩展操作if h.growing() {growWork(t, h, bucket)}// 获取对应桶的指针b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))bOrig := b// 计算 tophash 值top := tophash(hash)
search:// 遍历桶及其溢出桶for ; b != nil; b = b.overflow(t) {for i := uintptr(0); i < bucketCnt; i++ {// 如果 tophash 不匹配,继续下一个if b.tophash[i] != top {if b.tophash[i] == emptyRest {break search}continue}// 获取 key 的指针k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))k2 := kif t.indirectkey() {k2 = *((*unsafe.Pointer)(k2))}// 如果 key 不匹配,继续下一个if !t.key.equal(key, k2) {continue}// 清空 key 的指针if t.indirectkey() {*(*unsafe.Pointer)(k) = nil} else if t.key.ptrdata != 0 {memclrHasPointers(k, t.key.size)}// 清空 elem 的指针e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))if t.indirectelem() {*(*unsafe.Pointer)(e) = nil} else if t.elem.ptrdata != 0 {memclrHasPointers(e, t.elem.size)} else {memclrNoHeapPointers(e, t.elem.size)}// 标记为已删除b.tophash[i] = emptyOne// 如果桶末尾连续出现 emptyOne 状态,将其改为 emptyRest 状态if i == bucketCnt-1 {if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {goto notLast}} else {if b.tophash[i+1] != emptyRest {goto notLast}}for {b.tophash[i] = emptyRestif i == 0 {if b == bOrig {break // 初始桶的开始,迭代结束}// 找到前一个桶,从其最后一个条目继续c := bfor b = bOrig; b.overflow(t) != c; b = b.overflow(t) {}i = bucketCnt - 1} else {i--}if b.tophash[i] != emptyOne {break}}notLast:// 减少哈希表计数h.count--// 如果哈希表为空,重置哈希种子if h.count == 0 {h.hash0 = fastrand()}break search}}// 检查 hashWriting 标志位,确保其被清除if h.flags&hashWriting == 0 {fatal("concurrent map writes")}// 清除 hashWriting 标志位h.flags &^= hashWriting
}
删除 key
的操作,在定位桶和 key
的流程上和写入及查询差不多。
找到对应的 key
以后,如果此位置上存储的是指针,那么就把指针置为 nil。如果是值就清空它所在的内存。还要清理 tophash
里面的值最后把 map
的 key
总个数计数器 count
减1 。
若是处在扩容过程中,那么删除操作会在扩容以后在新的 bmap
里面删除。
2.6 扩容
前面在 2.3
写入章节里介绍过,扩容满足以下两个条件的任意一个即可
- 溢出桶的数量超过桶数量
- 装载因子超过
6.5
针对这两种情况,实现的扩容机制是不一样的
2.6.1 等量扩容
若是溢出桶的数量超过桶数量,那么就会创建新桶保存数据,垃圾回收会清理老的溢出桶并释放内存。
什么情况会触发等量扩容(sameSizeGrow
)呢?
如果在写入后又删除了大量的数据,这样始终不满足条件 2
,且在之后的写入过程中,由于之前的桶已满,那么只能新增溢出桶并写入。
2.6.2 增量扩容
触发扩容的时机是在写入的时候
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {hashGrow(t, h)goto again // Growing the table invalidates everything, so try again
}
也就是当前 hmap
没有在扩容,并且满足上面条件任意一个,接下来看看 hasGrow
逻辑
func hashGrow(t *maptype, h *hmap) {// 如果哈希表的负载因子已经达到临界值,则增加桶的数量。// 否则,如果有太多的溢出桶,则保持相同数量的桶,并等量扩展。bigger := uint8(1)if !overLoadFactor(h.count+1, h.B) {// 如果哈希表没有超过负载因子,只是有太多的溢出桶,则不增加桶的数量。bigger = 0h.flags |= sameSizeGrow // 设置同大小增长标志}oldbuckets := h.buckets // 保存当前的桶数组newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 创建新的桶数组// 设置新的哈希表标志flags := h.flags &^ (iterator | oldIterator) // 清除迭代器标志if h.flags&iterator != 0 {flags |= oldIterator // 如果当前存在迭代器,则设置旧迭代器标志}// 提交扩展(与GC原子操作)h.B += bigger // 增加桶数量的指数h.flags = flags // 更新标志h.oldbuckets = oldbuckets // 保存旧的桶数组h.buckets = newbuckets // 设置新的桶数组h.nevacuate = 0 // 重置迁移计数h.noverflow = 0 // 重置溢出桶计数// 如果有溢出桶,将当前溢出桶提升到旧桶中if h.extra != nil && h.extra.overflow != nil {if h.extra.oldoverflow != nil {throw("oldoverflow is not nil") // 如果旧溢出桶不为空,抛出异常}h.extra.oldoverflow = h.extra.overflow // 提升当前溢出桶到旧溢出桶h.extra.overflow = nil // 清空当前溢出桶}// 如果有下一个溢出桶,设置到额外信息中if nextOverflow != nil {if h.extra == nil {h.extra = new(mapextra) // 如果额外信息为空,则创建一个新的}h.extra.nextOverflow = nextOverflow // 设置下一个溢出桶}// 实际的数据复制由 growWork() 和 evacuate() 逐步完成
}
函数的开头也交代了何时增量扩容、何时等量扩容
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {// 如果哈希表没有超过负载因子,只是有太多的溢出桶,则不增加桶的数量。bigger = 0h.flags |= sameSizeGrow // 设置同大小增长标志
}
增量扩容是指新的正常桶比原来扩大一倍
bigger := uint8(1)
h.B += bigger // 增加桶数量的指数
比如原来 B
为 3
共计 8
个桶,那么增量扩容后正常桶的数量为 1<<(3+1)
即 16
。
同时,h.oldbuckets
保存旧的 h.buckets
桶数组,h.buckets
保存新的桶数组
oldbuckets := h.buckets // 保存当前的桶数组
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 创建新的桶数组
h.oldbuckets = oldbuckets // 保存旧的桶数组
h.buckets = newbuckets // 设置新的桶数组
在此函数的最后,也阐明了实际的数据迁移是由 growWork()
和 evacuate()
逐步完成。
2.6.3 扩容机制
接下来看看 growWork()
。
在前面介绍写入
和删除
的时候,都会检查当前是否在扩容
if h.growing() {// 如果 map 正在扩容,则执行扩容任务。growWork(t, h, bucket)
}// growing reports whether h is growing. The growth may be to the same size or bigger.
func (h *hmap) growing() bool {return h.oldbuckets != nil
}
也就是通过 h.oldbuckets
是否不为 nil
来判断当前 map
是否在扩容,若 map
正在扩容,则执行扩容。
func growWork(t *maptype, h *hmap, bucket uintptr) {// 确保我们移动的 oldbucket 对应的是我们马上就要用到的那一个evacuate(t, h, bucket&h.oldbucketmask())// 如果还在 growing 状态,再多移动一个 oldbucketif h.growing() {evacuate(t, h, h.nevacuate)}
}
这里可以看到每次迁移两个桶,一个是当前的桶,另外一个是 hmap
里指向的 nevacuate
的桶,这是增量迁移。这个和 Redis
的 rehash
惰性迁移一致,只是 Golang
中在写入和删除的时候迁移。这样就既不影响写入、删除,又进行了扩容,也算是一种取舍吧。
真正的迁移是在 evacuate()
函数中
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {// 获取旧桶的指针b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))// 计算新桶的数量newbit := h.noldbuckets()// 如果旧桶还没有被迁移if !evacuated(b)) {// TODO: 如果没有迭代器使用旧桶,可以复用溢出桶而不是使用新桶。// xy 包含 x 和 y(低位和高位)的迁移目标。var xy [2]evacDstx := &xy[0]x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))x.k = add(unsafe.Pointer(x.b), dataOffset)x.e = add(x.k, bucketCnt*uintptr(t.keysize))if !h.sameSizeGrow() {// 仅在增长时计算 y 指针。// 否则 GC 可能会看到错误的指针。y := &xy[1]y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))y.k = add(unsafe.Pointer(y.b), dataOffset)y.e = add(y.k, bucketCnt*uintptr(t.keysize))}// 处理旧桶中的每个溢出桶for ; b != nil; b = b.overflow(t) {k := add(unsafe.Pointer(b), dataOffset)e := add(k, bucketCnt*uintptr(t.keysize))for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {top := b.tophash[i]if isEmpty(top) {b.tophash[i] = evacuatedEmptycontinue}if top < minTopHash {throw("bad map state")}k2 := kif t.indirectkey() {k2 = *((*unsafe.Pointer)(k2))}var useY uint8if !h.sameSizeGrow() {// 计算哈希以决定是将此键/元素发送到桶 x 还是桶 y。hash := t.hasher(k2, uintptr(h.hash0))if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {// 如果 key != key (NaNs),那么哈希可能会完全不同于旧哈希。// 而且,它不是可重现的。在存在迭代器的情况下,重现性是必须的,// 因为我们的迁移决策必须与迭代器做出的决策相匹配。// 幸运的是,我们可以自由地将这些键任意发送。// 我们让 tophash 的低位驱动迁移决策。// 我们为下一级重新计算一个新的随机 tophash,// 以便这些键在多次扩展后均匀分布在所有桶中。useY = top & 1top = tophash(hash)} else {if hash&newbit != 0 {useY = 1}}}if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {throw("bad evacuatedN")}b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedYdst := &xy[useY] // 迁移目标if dst.i == bucketCnt {dst.b = h.newoverflow(t, dst.b)dst.i = 0dst.k = add(unsafe.Pointer(dst.b), dataOffset)dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))}dst.b.tophash[dst.i&(bucketCnt-1)] = top // 掩码 dst.i 作为一种优化,以避免边界检查if t.indirectkey() {*(*unsafe.Pointer)(dst.k) = k2 // 复制指针} else {typedmemmove(t.key, dst.k, k) // 复制元素}if t.indirectelem() {*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)} else {typedmemmove(t.elem, dst.e, e)}dst.i++// 这些更新可能会将这些指针越过键或元素数组的末尾。// 不过我们在桶的末尾有溢出指针来防止指向桶的末尾之外。dst.k = add(dst.k, uintptr(t.keysize))dst.e = add(dst.e, uintptr(t.elemsize))}}// 取消链接溢出桶并清除键/元素以帮助 GC。if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))// 保留 b.tophash,因为迁移状态在此处维护。ptr := add(b, dataOffset)n := uintptr(t.bucketsize) - dataOffsetmemclrHasPointers(ptr, n)}}// 更新迁移标记if oldbucket == h.nevacuate {advanceEvacuationMark(h, t, newbit)}
}
这里需着重关注下这个 evacDst
数据结构
// evacDst 结构体表示在调整大小过程中,数据迁移的目的地
// 它包含了当前目标桶、索引以及指向键/元素存储的指针信息
type evacDst struct {b *bmap // 当前目标桶(指向 bmap 结构的指针)i int // 键/元素在 b 中的索引(在桶中存储键/元素的位置)k unsafe.Pointer // 指向当前键存储的指针(指向键内存位置的原始指针)e unsafe.Pointer // 指向当前元素存储的指针(指向元素内存位置的原始指针)
}
这个 evacDst
的主要作用是 rehash
和数据迁移。
// xy 包含 x 和 y(低位和高位)的迁移目标。
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))if !h.sameSizeGrow() {// 仅在增长时计算 y 指针。// 否则 GC 可能会看到错误的指针。y := &xy[1]y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))y.k = add(unsafe.Pointer(y.b), dataOffset)y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
这里的 x
、y
分别代表扩容后新桶的前半段与后半段,若是等量扩容的话,那么只有 x
。
因为在等量扩容下,旧桶与新桶之间是一对一的关系,因此这里不过做就做过多介绍。
这里说下增量扩容,需要重新计算 key
的 hash
哈希值,然后确认在低位桶还是在高位桶。
newbit := h.noldbuckets()
hash := t.hasher(k2, uintptr(h.hash0))
if hash&newbit != 0 {useY = 1
}
这里我把相关代码给摘了出来,先计算 newbit
,也就是水位值。为何这么说呢,看看这个值是如何计算的。
func (h *hmap) noldbuckets() uintptr {oldB := h.Bif !h.sameSizeGrow() {oldB--}return bucketShift(oldB)
}// bucketShift returns 1<<b, optimized for code generation.
func bucketShift(b uint8) uintptr {return uintptr(1) << (b & (goarch.PtrSize*8 - 1))
}
比如增量扩容后 B
为 4
,newbit
就是 1<<(4-1)
也就是 8
,二进制为 1000
。我们知道二进制与 &
操作是相同位的值都为 1
那么结果为 1
,否则为 0
,而 Golang
是采用位运算来获取桶的编号的。
先获取 key
的哈希值,然后拿低 B
位再和 newbit
做与运算,一旦结果为 0
说明桶是小于水位值的,也就是低位桶,否则就是高位桶。
举个示例:旧桶 B
为 2
,也就是 4
个桶,其中在第二个桶中有 2
个 key
的哈希值低 3
位分别为 010
、110
。增量扩容后, B
变成了 3
,所以 010
、110
便分别落入 2
和 6
号桶。不过还得告诉程序扩容后落在地位桶 x
还是高位桶 y
。这里把上面的算法套用一下就知道了,010&100
结果为 0
落入低位桶 x
,110&100
结果为 1
落入高位桶 y
。
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // 迁移目标
useY
的作用就是这里了,目标桶依据 useY
是落入的地位桶还是高位桶。之前介绍过 xy
数组的第一位为低位桶(新桶的前半段),第二位为高位桶(新桶的后半段)。
const(evacuatedX = 2 // key/elem is valid. Entry has been evacuated to first half of larger table.evacuatedY = 3 // same as above, but evacuated to second half of larger table.
)
这里就是标记旧桶中 b.tophash[i]
已被迁移,至于迁移到低位桶还是高位桶就依据 evacuatedX + useY
来确定了
- evacuatedX,低位桶,也就是新桶的前半段
- evacuatedY,高位桶,也就是新桶的后半段,当
userY
为1
时,evacuatedX+userY
就是高位桶
if dst.i == bucketCnt {dst.b = h.newoverflow(t, dst.b)dst.i = 0dst.k = add(unsafe.Pointer(dst.b), dataOffset)dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
如果迁移的桶都满了,那么新建个溢出捅,重置索引,并把 dst.k
和 dst.e
指向新的键和值存储位置。
dst.b.tophash[dst.i&(bucketCnt-1)] = top // 掩码 dst.i 作为一种优化,以避免边界检查if t.indirectkey() {*(*unsafe.Pointer)(dst.k) = k2 // 复制指针} else {typedmemmove(t.key, dst.k, k) // 复制元素}if t.indirectelem() {*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)} else {typedmemmove(t.elem, dst.e, e)}dst.i++dst.k = add(dst.k, uintptr(t.keysize))dst.e = add(dst.e, uintptr(t.elemsize))
前面准备工作都做好了,这里就是迁移数据了。
if oldbucket == h.nevacuate {advanceEvacuationMark(h, t, newbit)
}
这里主要是更新哈希表的迁移进度标记 (nevacuate
),以便逐步完成哈希表的增长 rehash
过程。
前面在介绍 growWork()
函数提到过,扩容时每次迁移两个值,一个是当前操作的值,另一个就是按照迁移进度正常迁移的值。这里就是对比下,是否刚好这两个值是一样的,是的话则更新下进度标记。
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {// 将当前迁移标记推进一个桶h.nevacuate++// 实验表明1024是一个过大的值,但作为保障以确保O(1)行为stop := h.nevacuate + 1024// 确保停止标记不超过新桶数量if stop > newbit {stop = newbit}// 遍历并推进迁移标记,直到到达停止标记或遇到尚未迁移的桶for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {h.nevacuate++}// 如果迁移标记到达了新桶数量,表示所有旧桶都已被迁移if h.nevacuate == newbit { // newbit 表示旧桶的数量// 迁移完成,释放旧的主桶数组h.oldbuckets = nil// 同时可以丢弃旧的溢出桶,如果它们没有被迭代器引用if h.extra != nil {h.extra.oldoverflow = nil}// 清除 sameSizeGrow 标志,表示增长过程已完成h.flags &^= sameSizeGrow}
}
这个函数的作用是增加哈希的 nevacuate
计数器,并在所有的旧桶都被迁移后清空 bmap
的 oldbuckets
和 oldoverflow
。