使用的go版本为 go1.21.2
首先我们写一个简单的Pool的使用代码
package mainimport "sync"var bytePool = sync.Pool{New: func() interface{} {b := make([]byte, 1024)return &b},
}func main() {for j := 0; j < 10; j++ {obj := bytePool.Get().(*[]byte) // 获取一个[]byte_ = objbytePool.Put(obj) // 用完再给放回去}
}
pool对象池的作用
- 减少内存分配: 通过池,可以减少对内存的频繁分配和释放,提高程序的内存利用率。
- 避免垃圾回收压力: 对象池中的对象在被使用后不会立即被释放,而是放回到池中等待复用。这有助于减轻垃圾回收的压力,因为对象可以在多次使用后才被真正释放。
- 提高性能: 复用对象可以避免不必要的对象创建和销毁开销,从而提高程序的性能。
从demo上看好像没啥卵用,我们来进行一些压力测试
package mainimport ("sync""testing"
)var bytePool = sync.Pool{New: func() interface{} {b := make([]byte, 1024)return &b},
}func BenchmarkByteMake(b *testing.B) {b.ReportAllocs()b.ResetTimer()for i := 0; i < b.N; i++ {for j := 0; j < 10000; j++ {obj := make([]byte, 1024)_ = obj}}
}func BenchmarkBytePool(b *testing.B) {b.ReportAllocs()b.ResetTimer()for i := 0; i < b.N; i++ {for j := 0; j < 10000; j++ {obj := bytePool.Get().(*[]byte) // 获取一个1024长度的[]byte_ = objbytePool.Put(obj) // 用完再给放回去}}
}
看一下压测效果
可以看到执行效率高了好多倍
项目中没实际用到过,不过我们可以翻一下开源项目中是怎么用的
redis-v9
Pool结构体
比较复杂有点套娃的意思
//代码位于 GOROOT/src/sync/pool.go L:49
type Pool struct {//防止Pool被复制, 君子协议,编译可以通过,某些编辑器会报waring//静态检测 go vet会出错//有兴趣可以看一下这里 https://github.com/golang/go/issues/8005#issuecomment-190753527noCopy noCopylocal unsafe.Pointer // 本地池,对应类型[P]poolLocal P指的是 GMP中的P.ID字段localSize uintptr // 本地池大小victim unsafe.Pointer // 上一个周期的本地池victimSize uintptr // 上一个周期的本地池大小New func() any // 创建对象的方法,这个需要我们自己实现
}type poolLocal struct { //本地池poolLocalInternal// 用128取模,确保结构体占据整数个缓存行,从而防止伪共享.pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}type poolLocalInternal struct {private interface{} // 本地P的私有字段shared poolChain // 双端链表, 任何P都可以进行popTail
}//代码位于 GOROOT/src/sync/poolqueue.go L:194
type poolChain struct { // 双向队列//头部head *poolChainElt//尾部tail *poolChainElt
}type poolChainElt struct { //环状队列poolDequeue // next 由生产者原子性地写入,并由消费者原子性地读取, 从非nil转换为nil// prev 由消费者原子性地写入,并由生产者原子性地读取, 从非nil转换为nilnext, prev *poolChainElt
}//代码位于 GOROOT/src/sync/poolqueue.go L:19
type poolDequeue struct {//一个字段两个含义,高32位为头,低32位为尾部headTail uint64//环形缓存//vals[i].typ 为nil 说明该槽位为空vals []eface
}type eface struct { //类型与值typ, val unsafe.Pointer
}
Get函数
//代码位于 GOROOT/src/sync/pool.go L:127
func (p *Pool) Get() any {if race.Enabled { // 使用竞态检查race.Disable() //竞态检查 禁用}l, pid := p.pin() //获取当前P的ID 与 poolLocal 详细见下方x := l.private //看看私有属性是否存在对象,如果存在就可以直接返回l.private = nilif x == nil { ////优先从链表的头部获取,x, _ = l.shared.popHead()if x == nil {// 慢读取路径x = p.getSlow(pid)}}runtime_procUnpin() //取消 P 的禁止抢占if race.Enabled { // 使用竞态检查race.Enable() //竞态检查 启用if x != nil {race.Acquire(poolRaceAddr(x))}}if x == nil && p.New != nil { //调度new方法重新生成一个对象x = p.New()}return x
}
pin函数
//代码位于 GOROOT/src/sync/pool.go L:127
func (p *Pool) pin() (*poolLocal, int) {//获取P的idpid := runtime_procPin()// 原子操作获取本地池大小// 本地池s := runtime_LoadAcquintptr(&p.localSize) // load-acquirel := p.local // load-consumeif uintptr(pid) < s { //如果当前P.id 没有在local中越界,直接去获取值return indexLocal(l, pid), pid}return p.pinSlow() //慢获取
}func (p *Pool) pinSlow() (*poolLocal, int) {//取消P的禁止抢占runtime_procUnpin()allPoolsMu.Lock() //加锁defer allPoolsMu.Unlock()pid := runtime_procPin() //获取P的id//获取本地池的大小与本地池s := p.localSizel := p.localif uintptr(pid) < s { //如果当前P.id 没有在local中越界,直接去获取值return indexLocal(l, pid), pid}if p.local == nil { //如果local为空,将他加入到allPools中allPools = append(allPools, p)}// GOMAXPROCS在GC之间发送了变化,重新分配p.load与p.localSizesize := runtime.GOMAXPROCS(0)local := make([]poolLocal, size)atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-releaseruntime_StoreReluintptr(&p.localSize, uintptr(size)) // store-releasereturn &local[pid], pid
}
getSlow函数
//代码位于 GOROOT/src/sync/pool.go L:156
func (p *Pool) getSlow(pid int) any {// 原子获取本地池大小// 本地池size := runtime_LoadAcquintptr(&p.localSize) // load-acquirelocals := p.local // load-consume// 尝试从别的P poolLocal尾部获取local// 这个循环的方式有点东西(pid+i+1)%int(size),优先从非pid的下标获取,最后一次是pidfor i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i+1)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}// 原子获取上一周期本地池大小size = atomic.LoadUintptr(&p.victimSize)if uintptr(pid) >= size { //如果pid大于size 说明让回收掉了return nil}locals = p.victiml := indexLocal(locals, pid)if x := l.private; x != nil {//看看私有属性是否存在对象,如果存在就可以直接返回l.private = nilreturn x}// 尝试从别的P poolLocal尾部获取localfor i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}//将victimSize设置为0atomic.StoreUintptr(&p.victimSize, 0)return nil
}
Put函数
//代码位于 GOROOT/src/sync/pool.go L:95
func (p *Pool) Put(x any) {if x == nil { //如果写入的x为nil之间返回return}if race.Enabled { //使用竞态检查if fastrandn(4) == 0 {// Randomly drop x on floor.return}race.ReleaseMerge(poolRaceAddr(x))race.Disable() // 竞态检查 禁用}l, _ := p.pin() // 获取PoolLocalif l.private == nil { // 如果私有属性没有赋值l.private = x} else { //将x写入头l.shared.pushHead(x)}runtime_procUnpin()if race.Enabled { //使用竞态检查race.Enable() //竞态检查 启用}
}
pushHead函数解读
//代码位于 GOROOT/src/sync/poolqueue.go L:228
func (c *poolChain) pushHead(val any) {d := c.headif d == nil { //如果head为空,将head初始化为8长度的eface数组const initSize = 8 // Must be a power of 2d = new(poolChainElt)d.vals = make([]eface, initSize)c.head = dstorePoolChainElt(&c.tail, d) //将新创建的节点,当做尾节点}if d.pushHead(val) { //对象入队return}// 走到这里说明满了。可扩容为2倍newSize := len(d.vals) * 2// 扩容大小 (1 << 32) / 4 超出将这个设置为(1 << 32) / 4if newSize >= dequeueLimit { newSize = dequeueLimit}//新建poolChainElt将prev指向dd2 := &poolChainElt{prev: d}d2.vals = make([]eface, newSize)c.head = d2 //将新创建的节点,当做头节点storePoolChainElt(&d.next, d2) // 将老的节点指向,新节点d2.pushHead(val) //对象入队
}
延迟处理下标小技巧
package mainimport ("fmt"
)func main() {pid := 1size := 20for i := 0; i < int(size); i++ {if i == pid {continue}fmt.Println(i)}// 优化版本 pid会在最后一个打印处理for i := 0; i < size; i++ {index := (pid + i + 1) % size// 前面处理完以后直接returnfmt.Println(index)}
}
总结
我们从上面的源码分析了解Pool的数据结构、Get、Put这些基本操作原理,在项目中我们可以使用比特位来减少内存的占用,从源码分析我们得知Go官方设计不允许进行Pool复制(君子协议), 还学到了一个延迟处理下标的小技巧。