在计算机科学中,关联数组 (
Associative Array
),又称映射 (Map
)、字典 (Dictionary
) 是一个抽象的数据结构,它包含着类似于(键,值)的有序对。
从应用的角度来看,Go 语言的 map
数据结构主要有以下常见问题:
本文通过分析标准库中 map
的内部实现,尝试解答上面的问题。
map
的源代码文件路径为 $GOROOT/src/runtime/map.go
,笔者的 Go 版本为 go1.19 linux/amd64
。
const (
bucketCntBits = 3
// 8: 一个 bucket 存放元素的数量上限
bucketCnt = 1 << bucketCntBits
// 触发扩容的负载因子: 6.5
// 官方的解释是这个值是根据测试结果统计出来的
// 负载量超过负载因子时,发生扩容
// 负载量 = 元素数量 / bucket 桶数量
loadFactorNum = 13
loadFactorDen = 2
// 保持内联的键值对大小上限, 超过这个大小,键和值会被转换为指针
// 必须在 uint8 所表示的范围内 [0 - 255]
maxKeySize = 128
maxElemSize = 128
// 数据偏移量根据 bmap 结构体的大小进行对齐
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
// tophash 除了放正常的高 8 位的 hash 值
// 还会在空闲、迁移时存储一些自定义的状态值
emptyRest = 0 // 槽未使用,且在较高的索引或溢出处不再有非空单元
emptyOne = 1 // 槽未使用
evacuatedX = 2 // 对应的键值存在,并且已经迁移到扩容后的 map 的前半部分
evacuatedY = 3 // 对应的键值存在,并且已经迁移到扩容后的 map 的后半部分
evacuatedEmpty = 4 // 槽未使用, 桶迁移完成
minTopHash = 5 // 正常填充槽的最小 TopHash 值
iterator = 1 // 可能有一个使用 bucket 的迭代器
oldIterator = 2 // 可能有一个使用 oldbuckets 的迭代器
hashWriting = 4 // 一个等待写入 map 的 goroutine
sameSizeGrow = 8 // 等量扩容: 当前的 map 增长是一个相同大小的新 map
// 用于迭代器检查的哨兵 bucket ID
noCheck = 1<<(8*goarch.PtrSize) - 1
)
我们暂时只需要关注两个常量数值:
bucket
最多可以放 8 个元素map
负载因子上限为 6.5map
负载量的计算方式为:
负载量 = 元素数量 / bucket 桶数量
bmap
对象表示 map
中用于存储数据的 bucket
, 也就是数据桶。
type bmap struct {
// tophash 通常包含此 bucket 中每个 key 的 hash 值高位字节
// 如果 tophash[0] < minTopHash, 那么其表示 bucket 的迁移状态
tophash [bucketCnt]uint8
}
编译器会将 bmap
结构体转换为下面的结构体:
type bmap struct {
tophash [bucketCnt]uint8
keys [bucketCnt]keytype
values [bucketCnt]valuetype
// 溢出 bucket 的指针
overflow uintptr
}
需要注意的是: key + value
并不是配对存储的,而是分别存储在不同的数组中。
把所有的 key
放在一起,然后把所有的 value
放在一起,可以消除为了内存对齐而导致的内存填充 (主要是为了节省内存),如 map[int64]int8
这种较为极端的情况。
配对存储 | 独立存储 | |
---|---|---|
map[int64]int8 | 128 字节 | 72 字节 |
key/value
大小为 16 字节,8 个元素一共 128 字节如果 map
的键和值都不包含指针,并且可以被内联 (<=128 bytes), 那么标记 bucket
桶不含指针,这样可以避免 GC
扫描。
但是 bmap.overflow
字段是一个指针,为了保证溢出的 bucket
桶存活 (不被 GC),在 hmap.extra.overflow
和 hmap.extra.oldoverflow
中存储了指向所有溢出桶的指针。
type mapextra struct {
// overflow 和 oldoverflow 仅在 key 和 elem 都不包含指针时使用
// overflow 包含 hmap.buckets 的溢出桶
// oldoverflow 包含 hmap.oldbuckets 的溢出桶
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow 指向空闲的溢出桶
nextOverflow *bmap
}
hmap
对象表示 map
数据类型的主体结构。
type hmap struct {
// map 中的元素个数
// 必须放在 struct 的第一个位置
// 因为内置的 len 函数会从这里读取 (减少指令,提升性能,和 sync.Once 的 done 字段作用一样)
count int
flags uint8
// bucket 桶数量对应的指数
// bucket 桶数量 = 2^B
// 超过 6.5 * 2^B 个元素,就需要扩容
B uint8
// 溢出 bucket 桶的数量
noverflow uint16
// 哈希种子, 为哈希函数的结果引入随机性
hash0 uint32
// bucket 桶存放数据的数组
// 如果 count 字段等于 0,则为 nil
buckets unsafe.Pointer
// 只有在扩容期间不为 nil
// 扩容期间指向 buckets
oldbuckets unsafe.Pointer
// 迁移进度计数器
// 小于计数器的 bucket 桶表示已经迁移完成
nevacuate uintptr
// 当键和值都可以内联的时,就会使用这个字段
extra *mapextra
}
makemap_small
方法实现了两种 map
创建方式:
map
, 例如 make(map[k]v)map
, 例如 make(map[k]v, hint), 前提是在编译时可以推断出参数 hint
最多为 8 并且分配到堆上func makemap_small() *hmap {
h := new(hmap)
// 生成哈希种子
h.hash0 = fastrand()
return h
}
如果创建的 map
不符合 makemap_small
方法的条件, 会调用 makemap
方法来创建。
// 如果 h != nil, map 可以在 h 中创建
// 如果 h.buckets != nil, 可以复用 h.buckets 数组
func makemap(t *maptype, hint int, h *hmap) *hmap {
if h == nil {
h = new(hmap)
}
// 生成哈希种子
h.hash0 = fastrand()
// 计算需要的 bucket 数量
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
// 创建 bucket 桶用于保存数据的数组
// 如果 B == 0, buckets 字段懒加载
// 如果 map 长度较大,那么这部分内存初始化会花较长时间
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
mapaccess1
,mapaccess2
,mapaccessK
几个方法差不多,都可以用来访问 map
元素,这里以 mapaccess2
作为示例说明。
mapaccess2
不仅可以返回参数 key 对应的元素,还可以返回第二个参数用来表示 key
对应的元素是否存在。
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
if h == nil || h.count == 0 {
// map == nil 或元素数量为 0, 直接返回未找到
return unsafe.Pointer(&zeroVal[0]), false
}
if h.flags&hashWriting != 0 {
// 并发读写 map 错误
throw("concurrent map read and map write")
}
// 不同类型的 key,所用的 hash 算法是不一样的
hash := t.hasher(key, uintptr(h.hash0))
// 找到对应的 bucket 桶
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 如果 map 正在扩容
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
// 如果当前扩容机制是翻倍扩容
// 说明之前的 buckets 只有现在的一半
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
// 如果旧桶中的数据还未迁移完成
// 说明 key 对应的值可能存在于旧桶中
b = oldb
}
}
// 取高 8 位的值
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) {
// 一个桶在满 8 个元素后,会创建新的桶
// 然后挂在原来的桶的 overflow 指针上
for i := uintptr(0); i < bucketCnt; i++ {
// 循环对比桶中各个元素的哈希值
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
// 如果当前元素后面的元素都是空的,说明没有更多的元素了
// 直接跳转到下一个溢出桶
break bucketloop
}
continue
}
// 如果找到了相等的哈希值,那说明 key 对应的值可能存在于当前桶中
// 根据偏移量取出对应的 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.key.equal(key, k) {
// 仅当当前 key 和参数 key 完全相同时
// 才说明对应的元素找到了
return e, true
}
}
}
// 没有找到 key 对应的值,返回零值和 false
return unsafe.Pointer(&zeroVal[0]), false
}
mapassign
方法用于设置 map
的值 ,和 mapaccess
方法不同的地方在于: 如果 key
不存在于 map
中,则为它分配对应的槽,然后设置值。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
// map 未初始化时不能赋值
panic(plainError("assignment to entry in nil map"))
}
if h.flags&hashWriting != 0 {
// 并发写错误
throw("concurrent map writes")
}
// 调用对应类型的 hash 算法
hash := t.hasher(key, uintptr(h.hash0))
// 调用 hash 算法后设置 hashWriting 状态标识
// 因为 t.hasher 可能发生 panic, 此时实际的设置操作还未完成
h.flags ^= hashWriting
// 初始化第一个桶
if h.buckets == nil {
h.buckets = newobject(t.bucket)
}
again:
// 计算低 8 位
bucket := hash & bucketMask(h.B)
if h.growing() {
// 如果 map 当前正在扩容
// 顺便调用一次增量扩容
growWork(t, h, bucket)
}
// 计算出存储的桶地址, 转换为 bucket 对象
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 计算高 8 位
top := tophash(hash)
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ {
// 循环对比桶中各个元素的哈希值
if b.tophash[i] != top {
// 在 b.tophash[i] != top 的情况下
// 理论上有可能会是一个空槽位
// 一般情况下 map 的槽位分布是这样的,e 表示 empty:
// [h1][h2][h3][h4][h5][e][e][e]
// 但在执行过 delete 操作时,可能会变成这样:
// [h1][h2][e][e][h5][e][e][e]
// 所以如果再插入的话,会尽量往前面的位置插入
// [h1][h2][e][e][h5][e][e][e]
// ^
// ^
// 这个位置
// 所以在循环的时候还要顺便把前面的空位置先记下来
if isEmpty(b.tophash[i]) && inserti == nil {
// 如果这个槽位没有被占,说明可以往这里放入 key 和 value
...
}
if b.tophash[i] == emptyRest {
// 如果当前元素后面的元素都是空的,说明没有更多的元素了
// 直接跳转到下一个溢出桶
break bucketloop
}
continue
}
// 如果找到了相等的哈希值,那说明 key 对应的值可能存在于当前桶中
// 根据偏移量取出对应的 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 如果两个 key 的首 8 位、最后 8 位 hash 值一样,会进行比较,和处理 hash 碰撞一样
// 如果相同的哈希位置的 key 和要插入的 key 不相等
// 直接跳过
if !t.key.equal(key, k) {
continue
}
// 如果当前 key 和参数 key 完全相同
// 但是对应的位置已经有值了,直接更新
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// 设置完成,直接跳转到 done 条件标签
goto done
}
// 当前桶没有可用的位置,从溢出桶继续查找
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// 没有找到 key, 分配新的内存空间
// 如果触发了负载因子上限或者已经有太多溢出 bucket,并且当前没有在进行扩容,那么开始扩容操作
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
// 扩容的时候会把当前的 bucket 桶放入 oldbucket
// 如果还没有分配新的 bucket 桶, 则跳转到 again 重试一次
// 重试的时候在 growWork 方法里会把这个 key 对应的 bucket 桶提前分配好
goto again
}
if inserti == nil {
// 当前的桶和它挂载的溢出桶都满了,分配一个新的桶
...
}
// 将键值存储到对应的位置
...
// map 元素数量 + 1
h.count++
done:
if h.flags&hashWriting == 0 {
// 并发写错误
throw("concurrent map writes")
}
return elem
}
mapdelete
方法用于删除 map
中指定的 key
, 其内部逻辑和 mapassign
赋值方法大多数都是相同的。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
// 如果 map 还未初始化或元素数量为 0, 直接返回
if h == nil || h.count == 0 {
return
}
if h.flags&hashWriting != 0 {
// 并发写错误
throw("concurrent map writes")
}
// 调用对应类型的 hash 算法
hash := t.hasher(key, uintptr(h.hash0))
// 调用 hash 算法后设置 hashWriting 状态标识
// 因为 t.hasher 可能发生 panic, 此时实际的删除操作还未完成
h.flags ^= hashWriting
// 计算低 8 位
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
// 计算出存储的桶地址, 转换为 bmap 结构
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
// 计算高 8 位
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
// 循环对比桶中各个元素的哈希值
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
// 如果当前元素后面的元素都是空的,说明没有更多的元素了
// 直接跳转到下一个溢出桶
break search
}
continue
}
// 根据偏移量取出对应的 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 对应的槽标记为未使用
b.tophash[i] = emptyOne
// map 元素数量 + 1
h.count--
}
}
if h.flags&hashWriting == 0 {
// 并发写错误
throw("concurrent map writes")
}
}
makeBucketArray
方法用于初始化 bucket
桶对应的底层数组。
b
表示 bucket 桶的数量对应的指数 (例如传入 3, 表示桶的数量为 2 ^ 3 = 8)dirtyalloc
表示是否复用其他的数组,如果为 nil
会分配一个新数组,否则会清空参数 dirtyalloc
,然后重用这部分内存作为新数组func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := bucketShift(b)
nbuckets := base
// 当 bucket 数量小于 2^4 时
// 由于数据较少、使用溢出桶的可能性较低,会省略创建的过程以减少额外开销
// 当 bucket 桶的数量大于等于 2^4 时
// 额外创建 2 ^ (b - 4) 个溢出桶
if b >= 4 {
// 加上溢出桶的数量
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
if dirtyalloc == nil {
// 分配新的数组内存
buckets = newarray(t.bucket, int(nbuckets))
} else {
// 复用参数数组
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}
if base != nbuckets {
// 预分配一些溢出桶
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
从应用层面来说,map
在使用前一定要进行初始化,如果对于存储数据的数量有大概的了解,可以在初始化时 预分配
一定的容量,可以有效提高性能。
从内部实现来说,map
通过编译器和运行时实现了常规操作,通过 拉链法
来解决哈希碰撞问题 (每个 bmap
对象都有一个 overflow
指针),
此外,通过将每个 bucket
中元素的前 8 位哈希值存入 tophash
, 以空间换时间,可以加速遍历 bucket
中元素。