目录
本文以go1.12.5版本分析,map相关的源码在runtime包的map开头的几个文件中,主要为map.go。
go的map底层实现方式是hash表(C++的map是红黑树实现,而C++ 11新增的unordered_map则与go的map类似,都是hash实现)。go map的数据被置入一个由桶组成的有序数组中,每个桶最多可以存放8个key/value对。key的hash值(32位)的低阶位用于在该数组中定位到桶,而高8位则用于在桶中区分key/value对。
go map的hash表中的基本单位是桶,每个桶最多存8个键值对,超了,则会链接到额外的溢出桶。所以go map是基本数据结构是hash数组+桶内的key-value数组+溢出的桶链表
当hash表超过阈值需要扩容增长时,会分配一个新的数组,新数组的大小一般是旧数组的2倍。这里从旧数组将数据迁移到新数组,不会一次全量拷贝,go会在每次读写Map时以桶为单位做动态搬迁疏散。
我们先了解基本的数据结构,后面再看源码就简单多了。
map主要由两个核心的结构,即基础结构和桶实现:
const ( // 关键的变量
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits // 一个桶最多存储8个key-value对
loadFactorNum = 13 // 扩散因子:loadFactorNum / loadFactorDen = 6.5。
loadFactorDen = 2 // 即元素数量 >= (hash桶数量(2^hmp.B) * 6.5 / 8) 时,触发扩容
)
// map的基础数据结构
type hmap struct {
count int // map存储的元素对计数,len()函数返回此值,所以map的len()时间复杂度是O(1)
flags uint8 // 记录几个特殊的位标记,如当前是否有别的线程正在写map、当前是否为相同大小的增长(扩容/缩容?)
B uint8 // hash桶buckets的数量为2^B个
noverflow uint16 // 溢出的桶的数量的近似值
hash0 uint32 // hash种子
buckets unsafe.Pointer // 指向2^B个桶组成的数组的指针,数据存在这里
oldbuckets unsafe.Pointer // 指向扩容前的旧buckets数组,只在map增长时有效
nevacuate uintptr // 计数器,标示扩容后搬迁的进度
extra *mapextra // 保存溢出桶的链表和未使用的溢出桶数组的首地址
}
// 桶的实现结构
type bmap struct {
// tophash存储桶内每个key的hash值的高字节
// tophash[0] < minTopHash表示桶的疏散状态
// 当前版本bucketCnt的值是8,一个桶最多存储8个key-value对
tophash [bucketCnt]uint8
// 特别注意:
// 实际分配内存时会申请一个更大的内存空间A,A的前8字节为bmap
// 后面依次跟8个key、8个value、1个溢出指针
// map的桶结构实际指的是内存空间A
}
// map.go里很多函数的第1个入参是这个结构,从成员来看很明显,此结构标示了键值对和桶的大小等必要信息
// 有了这个结构的信息,map.go的代码就可以与键值对的具体数据类型解耦
// 所以map.go用内存偏移量和unsafe.Pointer指针来直接对内存进行存取,而无需关心key或value的具体类型
type maptype struct {
typ _type
key *_type
elem *_type
bucket *_type // internal type representing a hash bucket
keysize uint8 // size of key slot
valuesize uint8 // size of value slot
bucketsize uint16 // size of bucket
flags uint32
}
C++使用模板可以根据不同的类型生成map的代码。
golang则通过上述maptype结构体传递键值对的类型大小等信息,从而map.go直接用指针操作对应大小的内存来实现全局一份map代码同时适用于不同类型的键值对。这点上可以认为相比C++用模板实现map的方式,go map的目标文件的代码量会更小。
map底层创建时,会初始化一个hmap结构体,同时分配一个足够大的内存空间A。其中A的前段用于hash数组,A的后段预留给溢出的桶。于是hmap.buckets指向hash数组,即A的首地址;hmap.extra.nextOverflow初始时指向内存A中的后段,即hash数组结尾的下一个桶,也即第1个预留的溢出桶。所以当hash冲突需要使用到新的溢出桶时,会优先使用上述预留的溢出桶,hmap.extra.nextOverflow依次往后偏移直到用完所有的溢出桶,才有可能会申请新的溢出桶空间。
上图中,当需要分配一个溢出桶时,会优先从预留的溢出桶数组里取一个出来链接到链表后面,这时不需要再次申请内存。但当预留的桶被用完了,则需要申请新的内存给溢出桶。
使用make(map[k]v, hint)创建map时会调用makemap()函数,代码逻辑比较简单。
值得注意的是,makemap()创建的hash数组,数组的前面是hash表的空间,当hint >= 4时后面会追加2^(hint-4)个桶,之后再内存页帧对齐又追加了若干个桶(参见2.2章节结构图的hash数组部分)
所以创建map时一次内存分配既分配了用户预期大小的hash数组,又追加了一定量的预留的溢出桶,还做了内存对齐,一举多得。
// make(map[k]v, hint), hint即预分配大小
// 不传hint时,如用new创建个预设容量为0的map时,makemap只初始化hmap结构,不分配hash数组
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 省略部分代码
// 随机hash种子
h.hash0 = fastrand()
// 2^h.B 为大于hint*6.5(扩容因子)的最小的2的幂
B := uint8(0)
// overLoadFactor(hint, B)只有一行代码:return hint > bucketCnt && uintptr(hint) > loadFactorNum*(bucketShift(B)/loadFactorDen)
// 即B的大小应满足 hint <= (2^B) * 6.5
// 一个桶能存8对key-value,所以这就表示B的初始值是保证这个map不需要扩容即可存下hint个元素对的最小的B值
for overLoadFactor(hint, B) {
B++
}
h.B = B
// 这里分配hash数组
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
// makeBucketArray()会在hash数组后面预分配一些溢出桶,
// h.extra.nextOverflow用来保存上述溢出桶的首地址
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
// 分配hash数组
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := bucketShift(b) // base代表用户预期的桶的数量,即hash数组的真实大小
nbuckets := base // nbuckets表示实际分配的桶的数量,>= base,这就可能会追加一些溢出桶作为溢出的预留
if b >= 4 {
// 这里追加一定数量的桶,并做内存对齐
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
// 后面的代码就是申请内存空间了,此处省略
// 这里大家可以思考下这个数组空间要怎么分配,其实就是n*sizeof(桶),所以:
// 每个桶前面是8字节的tophash数组,然后是8个key,再是8个value,最后放一个溢出指针
// sizeof(桶) = 8 + 8*sizeof(key) + 8*sizeof(value) + 8
return buckets, nextOverflow
}
go map的插入操作,调用mapassign()函数。
同学们或许在某些资料上了解过:
上述两个限制,在mapassign()函数开头能找到答案:
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// 不熟悉指针操作的同学,用指针传参往往会踩空指针的坑
// 这里大家可以思考下,为什么h要非空判断?
// 如果一定要在这里支持空map并检测到map为空时自动初始化,应该怎么写?
// 提示:指针的指针
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
// 在这里做并发判断,检测到并发写时,抛异常
// 注意:go map的并发检测是伪检测,并不保证所有的并发都会被检测出来。而且这玩意是在运行期检测。
// 所以对map有并发要求时,应使用sync.map来代替普通map,通过加锁来阻断并发冲突
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
hash := alg.hash(key, uintptr(h.hash0)) // 这里得到uint32的hash值
h.flags ^= hashWriting // 置Writing标志,key写入buckets后才会清除标志
if h.buckets == nil { // map不能为空,但hash数组可以初始是空的,这里会初始化
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
...
}
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
again:
bucket := hash & bucketMask(h.B) // 这里用hash值的低阶位定位hash数组的下标偏移量
if h.growing() {
growWork(t, h, bucket) // 这里是map的扩容缩容操作,我们在第4章单独讲
}
// 通过下标bucket,偏移定位到具体的桶
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
top := tophash(hash) // 这里取高8位用于在桶内定位键值对
...
}
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
var inserti *uint8 // tophash插入位置
var insertk unsafe.Pointer // key插入位置
var val unsafe.Pointer // value插入位置
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if isEmpty(b.tophash[i]) && inserti == nil {
// 找到个空位,先记录下tophash、key、value的插入位置
// 但要遍历完才能确定要不要插入到这个位置,因为后面有可能有重复的元素
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
}
if b.tophash[i] == emptyRest {
break bucketloop // 遍历完整个溢出链表,退出循环
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if !alg.equal(key, k) {
continue
}
// 走到这里说明map里找到一个重复的key,更新key-value,跳到第5步
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
goto done // 更新Key后跳到第5步
}
ovf := b.overflow(t)
if ovf == nil {
break // 遍历完整个溢出链表,没找到能插入的空位,结束循环,下一步再追加一个溢出桶进来
}
b = ovf // 继续遍历下一个溢出桶
}
...
}
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
// 这里判断要不要扩容,我们第4章再讲
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
if inserti == nil { // inserti == nil说明上1步没找到空位,整个链表是满的,这里添加一个新的溢出桶上去
newb := h.newoverflow(t, b) // 分配新溢出桶,优先用3.1章节预留的溢出桶,用完了则分配一个新桶内存
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
val = add(insertk, bucketCnt*uintptr(t.keysize))
}
// 当key或value的类型大小超过一定值时,桶只存储key或value的指针。这里分配空间并取指针
if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectvalue() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(val) = vmem
}
typedmemmove(t.key, insertk, key) // 在桶中对应位置插入key
*inserti = top // 插入tophash,hash值高8位
h.count++ // 插入了新的键值对,h.count数量+1
...
}
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
done:
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting // 释放hashWriting标志位
if t.indirectvalue() {
val = *((*unsafe.Pointer)(val))
}
return val // 返回value可插入位置的指针,注意,value还没插入
}
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
...
b.tophash[i] = emptyOne // 先标记删除
// 如果b.tophash[i]不是最后一个元素,则暂时先占着坑。emptyOne标记的位置暂时不能被插入新元素(见3.2章节插入函数)
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
for { // 如果b.tophash[i]是最后一个元素,则把末尾的emptyOne全部清除置为emptyRest
b.tophash[i] = emptyRest
if i == 0 {
if b == bOrig {
break // beginning of initial bucket, we‘re done.
}
// Find previous bucket, continue at its last entry.
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
i = bucketCnt - 1
} else {
i--
}
if b.tophash[i] != emptyOne {
break
}
}
...
}
查找操作由mapaccess开头的一组函数实现。前面的章节在插入和删除之前都得先定位查找到元素,逻辑是类似的,也比较简单,就不细说了:
map的迭代是通过hiter结构和对应的两个辅助函数实现的。hiter结构由编译器在调用辅助函数之前创建并传入,每次迭代结果也由hiter结构传回。下方的it即是hiter结构体的指针变量。
mapiterinit()函数主要是决定我们从哪个位置开始迭代,为什么是从哪个位置,而不是直接从hash数组头部开始呢?《go程序设计语言》好像提到过,hash表中数据每次插入的位置是变化的(其实是因为实现的原因,一方面hash种子是随机的,这导致相同的数据在不同的map变量内的hash值不同;另一方面即使同一个map变量内,数据删除再添加的位置也有可能变化,因为在同一个桶及溢出链表中数据的位置不分先后),所以为了防止用户错误的依赖于每次迭代的顺序,map作者干脆让相同的map每次迭代的顺序也是随机的。
迭代顺序随机的实现方式也简单,直接从随机的一个位置开始就行了:
于是,map的遍历过程如下:
func mapiterinit(t *maptype, h *hmap, it *hiter) {
...
// 随机一个偏移量来开始
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
it.startBucket = r & bucketMask(h.B)
it.offset = uint8(r >> h.B & (bucketCnt - 1))
...
mapiternext(it) // 初始化迭代器的同时也返回第1对key/value
}
上一节迭代循环的过程很清晰了,这里我们说明几个重要的参数:
此外,迭代还需要关注扩容缩容的情况:
go map的扩容缩容都是grow相关的函数,这里扩容是真的,缩容是伪缩容,后面我会解释。我们先看下触发条件:
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
...
}
// overLoadFactor()返回true则触发扩容,即map的count大于hash桶数量(2^B)*6.5
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
// tooManyOverflowBuckets(),顾名思义,溢出桶太多了触发缩容
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
return noverflow >= uint16(1)<<(B&15)
}
map只在插入元素即mapassign()函数中对是否扩容缩容进行触发,条件即是上面这段代码:
仔细观察触发的代码,扩容和缩容是同一个函数,这是怎么做到的呢?在hashGrow()开始,会先判断是否满足扩容条件,如果满足就表明这次是扩容,不满足就一定是缩容条件触发了。扩容和缩容剩下的逻辑,主要区别就在于容量变化,就是hmap.B参数,扩容时B+1则hash表容量扩大1倍,缩容时hash表容量不变。
到这里触发的主要工作已经完成,接下来就是怎么把元素搬迁到新hash表里了。如果现在就一次全量搬迁过去,显然接下来会有比较长的一段时间map被占用(不支持并发)。所以搬迁的工作是异步增量搬迁的。
在插入和删除的函数内都有下面一段代码用于在每次插入和删除操作时,执行一次搬迁工作:
if h.growing() { // 当前处于搬迁状态
growWork(t, h, bucket) // 调用搬迁函数
}
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 将当前需要处理的桶搬迁
evacuate(t, h, bucket&h.oldbucketmask())
if h.growing() { // 再多搬迁一个桶
evacuate(t, h, h.nevacuate)
}
}
现在可以解释为什么我把map的缩容叫做伪缩容了:因为缩容仅仅针对溢出桶太多的情况,触发缩容时hash数组的大小不变,即hash数组所占用的空间只增不减。也就是说,如果我们把一个已经增长到很大的map的元素挨个全部删除掉,hash表所占用的内存空间也不会被释放。
所以如果要实现“真缩容”,需自己实现缩容搬迁,即创建一个较小的map,将需要缩容的map的元素挨个搬迁过来:
// go map缩容代码示例
myMap := make(map[int]int, 1000000)
// 假设这里我们对bigMap做了很多次插入,之后又做了很多次删除,此时bigMap的元素数量远小于hash表大小
// 接下来我们开始缩容
smallMap := make(map[int]int, len(myMap))
for k, v := range myMap {
smallMap[k] = v
}
myMap = smallMap // 缩容完成,原来的map被我们丢弃,交给gc去清理
时间复杂度,go map是hash实现,我们先不管具体原理,江湖套路hash实现的就叫它O(1)的时间复杂度:
go采用的hash算法应是很成熟的算法,极端情况暂不考虑。所以综合情况下go map的时间复杂度应为O(1)
空间复杂度分析:
首先我们不考虑因删除大量元素导致的空间浪费情况(这种情况现在go是留给程序员自己解决),只考虑一个持续增长状态的map的一个空间使用率:
由于溢出桶数量超过hash桶数量时会触发缩容,所以最坏的情况是数据被集中在一条链上,hash表基本是空的,这时空间浪费O(n)。
最好的情况下,数据均匀散列在hash表上,没有元素溢出,这时最好的空间复杂度就是扩散因子决定了,当前go的扩散因子由全局变量决定,即loadFactorNum/loadFactorDen = 6.5。即平均每个hash桶被分配到6.5个元素以上时,开始扩容。所以最小的空间浪费是(8-6.5)/8 = 0.1875,即O(0.1875n)
结论:go map的空间复杂度(指除去正常存储元素所需空间之外的空间浪费)是O(0.1875n) ~ O(n)之间。
原文:https://www.cnblogs.com/ExMan/p/11791083.html