Go 协程池 Gopool VS ants 原理解析

写过高并发的都知道,控制协程数量是问题的关键,如何高效利用协程,本文将介绍gopool和ants两个广泛应用的协程池,通过本文你可以了解到:

1. 实现原理
2. 使用方法
3. 区别

背景

虽然通过go func()即可轻量级实现并发,但如果通过for循环创建go
func(),会出现成千上万个协程(1个协程2KB,过多协程会达到GB级别)导致系统资源耗尽,为了保护系统资源,往往需要控制协程数量。

协程池的核心思想是限制并发执行的 goroutine 数量,从而有效管理资源使用和避免过度并发带来的问题。

使用原生Channel实现协程池

Channel 支持多个协程之间的数据通信,并且可以通过有限长度的channel当做协程池,最大长度即为协程池最大容量。
在这里插入图片描述

核心原理

关键点1:定义协程池结构和构造函数

首先,定义一个协程池的结构体,它包含一个 channel 用于控制并发数量,以及一个 WaitGroup 用于等待所有任务完成。

package mainimport ("sync""fmt""time"
)type GoroutinePool struct {maxGoroutines intpool          chan struct{}wg            sync.WaitGroup
}func NewGoroutinePool(maxGoroutines int) *GoroutinePool {return &GoroutinePool{maxGoroutines: maxGoroutines,pool:          make(chan struct{}, maxGoroutines),}
}

关键点2:实现任务提交和执行的方法

为协程池添加方法以提交和执行任务。使用 channel 来控制同时运行的 goroutine 数量。

func (p *GoroutinePool) Submit(task func()) {p.wg.Add(1)p.pool <- struct{}{} // 获取令牌,如果池已满,这里会阻塞go func() {defer p.wg.Done()defer func() { <-p.pool }() // 释放令牌task() // 执行任务}()
}func (p *GoroutinePool) Wait() {p.wg.Wait()
}

举个🌰

func main() {pool := NewGoroutinePool(5) // 创建一个大小为5的协程池for i := 0; i < 10; i++ {count := ipool.Submit(func() {fmt.Printf("Running task %d, timeNow: %v\n", count, time.Now())time.Sleep(1 * time.Second) // 模拟耗时任务})}pool.Wait() // 等待所有任务完成fmt.Println("All tasks completed.")
}
// 输出
Running task 1, timeNow: 2024-08-14 15:46:19.937473 +0800 CST m=+0.000572251
Running task 2, timeNow: 2024-08-14 15:46:19.937135 +0800 CST m=+0.000234542
Running task 3, timeNow: 2024-08-14 15:46:19.937344 +0800 CST m=+0.000443501
Running task 0, timeNow: 2024-08-14 15:46:19.937135 +0800 CST m=+0.000234167
Running task 4, timeNow: 2024-08-14 15:46:19.937104 +0800 CST m=+0.000203709
Running task 9, timeNow: 2024-08-14 15:46:20.938929 +0800 CST m=+1.002042292
Running task 6, timeNow: 2024-08-14 15:46:20.938935 +0800 CST m=+1.002048334
Running task 8, timeNow: 2024-08-14 15:46:20.938965 +0800 CST m=+1.002078501
Running task 7, timeNow: 2024-08-14 15:46:20.938963 +0800 CST m=+1.002076542
Running task 5, timeNow: 2024-08-14 15:46:20.938948 +0800 CST m=+1.002061167
All tasks completed.

可以看到,因为协程池大小为5,所以前5个任务都在0s执行,后5个在1s执行。

说实话,原生的Channel实现,个人感觉已经能够完全覆盖日常使用,但是公司和业界还是推出了很多协程池的实现,存在即合理,我们来学习一下~

使用gopkg/gopool实现协程池

这个包在字节内部已经广泛应用,咨询作者后,才发现这个包已经开源啦
https://github.com/bytedance/gopkg/tree/main/util/gopool

核心原理

这三个“池”纯属个人理解,抽象了两个池的概念
三个“池”的关系

在这里插入图片描述

实现流程

黄色 为新任务到来业务处理流程
在这里插入图片描述

协程池结构体

给用户侧暴露的协程池结构体为pool,存储了名称,最大容量,任务链表,正在进行的任务数量。通过NewPool来进行创建。通过pool.Go()方法进行添加事件处理

type pool struct {// pool 的名字,打 metrics 和打 log 时用到name string// pool 的容量,也就是最大的真正在工作的 goroutine 的数量// 为了性能考虑,可能会有小误差cap int32// 配置信息config *Config// 任务链表taskHead  *tasktaskTail  *tasktaskLock  sync.MutextaskCount int32// 记录正在运行的 worker 数量workerCount int32// 用来标记是否关闭closed int32// worker panic 的时候会调用这个方法panicHandler func(context.Context, interface{})
}// name 必须是不含有空格的,只能含有字母、数字和下划线,否则 metrics 会失败
func NewPool(name string, cap int32, config *Config) Pool {p := &pool{name:   name,cap:    cap,config: config,}return p
}// 添加事件处理
func (p *pool) Go(f func()) {p.CtxGo(context.Background(), f)  // 在下文分析
}

关键点1:任务队列池

taskPool,存储对象为*task

用于存放待执行任务的队列,如果当前任务数<=最大容量,直接新建工作协程执行;如果当前任务数量>最大容量,新的任务会被链表链接到上一个任务尾部,不新建工作协程。这样所有的任务通过链表链接在一起,形成了任务队列池。

var taskPool sync.Poolfunc init() {taskPool.New = newTask
}type task struct {ctx context.Contextf   func()next *task
}func newTask() interface{} {return &task{}
}// 核心代码-将新任务放到任务队列池并执行
func (p *pool) CtxGo(ctx context.Context, f func()) {t := taskPool.Get().(*task)t.ctx = ctxt.f = fp.taskLock.Lock()// 链接节点if p.taskHead == nil {p.taskHead = tp.taskTail = t} else {p.taskTail.next = tp.taskTail = t}p.taskLock.Unlock()atomic.AddInt32(&p.taskCount, 1)// 如果 pool 已经被关闭了,就 panicif atomic.LoadInt32(&p.closed) == 1 {panic("use closed pool")}// 判断条件满足以下两个条件:// 1. task 数量大于阈值// 2. 目前的 worker 数量小于上限 p.cap// 或者目前没有 workerif (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {p.incWorkerCount()// 新建工作协程执行w := workerPool.Get().(*worker)w.pool = pw.run()  // 下文——工作协程池的处理}
}

关键点2:工作协程池

workerPool,存储对象为*worker

存储当前执行的协程,针对一个工作协程,判断当前节点的任务并执行,并且通过for循环一直寻找下一个待执行的任务,如果没有下一个节点,则跳出循环。

var workerPool sync.Poolfunc init() {workerPool.New = newWorker
}type worker struct {pool *pool
}func newWorker() interface{} {return &worker{}
}// 核心代码-协程池如何处理
func (w *worker) run() {// 启动协程,再复杂的包起协程其实都是这样简单🤷🏻‍♀️go func() {for {var t *taskw.pool.taskLock.Lock()if w.pool.taskHead != nil {t = w.pool.taskHeadw.pool.taskHead = w.pool.taskHead.nextatomic.AddInt32(&w.pool.taskCount, -1)}// 如果没有任务要做了,就释放资源,退出if t == nil {w.close()w.pool.taskLock.Unlock()// 工作协程对象回收w.Recycle()return}w.pool.taskLock.Unlock()func() {defer func() {if r := recover(); r != nil {logs.CtxFatal(t.ctx, "GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())if w.pool.config.EnablePanicMetrics {panicMetricsClient.EmitCounter(panicKey, 1, metrics.T{Name: "pool", Value: w.pool.name})}w.pool.panicHandler(t.ctx, r)}}()t.f()}()// 任务对象回收t.Recycle()}}()
}

核心原理

  1. 复用Go协程:每一个协程作为一个工作协程,执行完当前任务后,会进入for循环寻找下一个执行
  2. 减少内存分配:使用sync.Pool包来减少内存分配的频率,通过重用已经存在但不再使用的对象(即工作协程)来达到这一目的。同时这个包帮你完成了对象创建、回收操作;
  3. 显示记录容量:通过int32类型记录最大容量,记录当前执行数量,为了保证并发安全,通过atomic.AddInt32、atomic.LoadInt32等方法进行加减操作;
  4. 控制执行顺序:通过链表指定任务链,实现(非严格意义)FIFO,同时采用sync.Mutex来保证链表流转过程中的并发安全;之前的channel是通过阻塞的形式,实现(非严格意义)FIFO;

缺点:说实话,我内心是希望它能够把sync.WaitGroup包做一个集成,让它能够控制各个协程结束的时间,当然这个也不是协程池原生定义里的内容。

举个🌰

func main() {// 写法1_ = gopool.RegisterPool(gopool.NewPool("test", 5, gopool.NewConfig()))p := gopool.GetPool("test")// 写法2// p := gopool.NewPool("test", 5, gopool.NewConfig())var wg sync.WaitGroupfor i := 0; i < 10; i++ {count := iwg.Add(1)p.Go(func() {fmt.Printf("Running task %d, timeNow: %v\n", count, time.Now())time.Sleep(1 * time.Second) // 模拟耗时任务wg.Done()})}wg.Wait()fmt.Println("All tasks completed.")
}
// 输出
Running task 1, timeNow: 2024-08-15 10:32:16.729052 +0800 CST m=+0.023291751
Running task 3, timeNow: 2024-08-15 10:32:16.72925 +0800 CST m=+0.023490418
Running task 4, timeNow: 2024-08-15 10:32:16.729255 +0800 CST m=+0.023495168
Running task 0, timeNow: 2024-08-15 10:32:16.72905 +0800 CST m=+0.023289626
Running task 2, timeNow: 2024-08-15 10:32:16.729062 +0800 CST m=+0.023302334
Running task 6, timeNow: 2024-08-15 10:32:17.729546 +0800 CST m=+1.023782501
Running task 8, timeNow: 2024-08-15 10:32:17.729611 +0800 CST m=+1.023847543
Running task 5, timeNow: 2024-08-15 10:32:17.729532 +0800 CST m=+1.023768209
Running task 9, timeNow: 2024-08-15 10:32:17.729643 +0800 CST m=+1.023879126
Running task 7, timeNow: 2024-08-15 10:32:17.729586 +0800 CST m=+1.023822626
All tasks completed.

可以看到,因为协程池大小为5,所以前5个任务都在0s执行,后5个在1s执行。

使用panjf2000/ants实现协程池

https://github.com/panjf2000/ants Github 12.6k stars MIT license

核心原理

不得不说这个开源Readme写的还是比较好的,可以抄几张图贴在这里🐶
另外,作者还有一篇专门的文档来介绍 GMP 并发调度器深度解析之手撸一个高性能 goroutine pool|Strike Freedom , 不过这篇文档介绍的是第一个版本,目前已经更新到v2.10了,本文也主要介绍这个版本
流程图
在这里插入图片描述

活动图

step1: 容量为4的协程池,接收了6个任务
[图片]
step2: 有4个立马执行,另外两个等待
[图片]

step3: 剩余的两个执行
[图片]

step4: 执行结束,worker都空闲下来
[图片]

协程池结构体

给用户侧暴露的协程池结构体为Pool,存储了最大容量,当前工作协程的数量,定时器等。通过NewPool来进行创建。通过pool.Submit()方法进行添加事件处理。

type Pool struct {poolCommon
}type poolCommon struct {// 协程池容量capacity int32// 当前正在执行的go routinue数量running int32// 作者自己实现的自旋锁 (用atomic.StoreUint32, atomic.CompareAndSwapUint32 实现)lock sync.Locker// 核心 - 工作协程池workers workerQueue// 记录协程池当前的状态是否已关闭state int32// 条件变量,处理任务等待和唤醒cond *sync.Cond// 所有都执行完成标志位allDone chan struct{}// 保证关闭协程池仅一次once *sync.Once// 通过 sync.Pool 的模式加快对worker对象的获取workerCache sync.Pool// 等待执行的数量waiting int32// 清除过期 worker 的终结标记purgeDone int32purgeCtx  context.ContextstopPurge context.CancelFunc// 定时更新协程池当前时间的终结标记ticktockDone int32ticktockCtx  context.ContextstopTicktock context.CancelFuncnow atomic.Valueoptions *Options
}// NewPool 初始化
func NewPool(size int, options ...Option) (*Pool, error) {if size <= 0 {size = -1}opts := loadOptions(options...)if !opts.DisablePurge {if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{poolCommon: poolCommon{capacity: int32(size),allDone:  make(chan struct{}),lock:     syncx.NewSpinLock(),once:     &sync.Once{},options:  opts,}}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}// 这里提供了两种工作协程的组织模式,// queueTypeLoopQueue为循环队列模式;queueTypeStack为列表(栈)模式// 如果需要预先分配资源,则采用循环队列模式,否则采用列表(栈)模式if p.options.PreAlloc {if size == -1 {return nil, ErrInvalidPreAllocSize}p.workers = newWorkerQueue(queueTypeLoopQueue, size)} else {p.workers = newWorkerQueue(queueTypeStack, 0)}p.cond = sync.NewCond(p.lock)p.goPurge()    // 回收工作协程p.goTicktock() // 开启定时任务,记录当前的时间,根据时间判断过期对应的工作协程  (这个名字有点像 TikTok 🐶)return p, nil
}// 添加事件处理 
func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}w, err := p.retrieveWorker()  // 核心代码 - 如何获取一个空闲的工作协程if w != nil {w.inputFunc(task)}return err
}

关键点1:任务队列“池”

上述流程图中核心的实现逻辑 这里的“池”带引号,因为实际上并没有对任务队列组织起来放到一起,仅仅是流程性的逻辑

这里接收到的任务,如果可以执行,则立即返回,如果不能够执行,则阻塞住,等待通知有空闲的工作协程。其实这里并没有任务队列池的概念,反而是即用即处理,不满足条件阻塞的方式。

func (p *Pool) retrieveWorker() (w worker, err error) {p.lock.Lock()retry:// 尝试获取一个空闲的工作协程if w = p.workers.detach(); w != nil {p.lock.Unlock()return}// 如果没达到上限,直接取一个工作协程做执行,这里也是用 sync.Pool 包来优化对象的获取if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()w = p.workerCache.Get().(*goWorker)w.run()return}// 如果处于非阻塞模式 或 待处理的调用者数量达到最大限制值,退出  (这是一个optional的配置项,核心流程可以不关注这个)if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {p.lock.Unlock()return nil, ErrPoolOverload}// 阻塞等待工作协程p.addWaiting(1)p.cond.Wait() // 通过 sync.Cond 阻塞住,如果通过则表示已经有工作协程空闲p.addWaiting(-1)if p.IsClosed() {p.lock.Unlock()return nil, ErrPoolClosed}// 🐶 我咋感觉这块用for循环也行呢?哈哈,goto 这个用法感觉很罕见goto retry
}

关键点2:工作协程池

工作协程池为 workerQueue 结构体,里面的具体工作协程的元素是 worker。

type worker interface {run()                     finish()  lastUsedTime() time.TimeinputFunc(func())inputParam(interface{})
}type workerQueue interface {len() int             // worker 数量isEmpty() bool        // worker 数量是否为0insert(worker) error  // 添加工作协程detach() worker       // 获取一个工作协程refresh(duration time.Duration) []worker // 清除过期 worker 并返回它们reset()               // 重置工作协程池
}

可以看到这两种都是interface形式,在官方包中给了两种实现的模式。

  1. workerQueue 工作协程池 - 两个实现
    [图片]
workerStack列表形式存储工作协程,不需要预先分配空间 【默认】
loopQueue循环链表形式存储工作协程,预先分配空间,eg:有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配
  1. worker 工作协程 - 两个实现
    [图片]
goWorker工作协程 【默认】
goWorkerWithFunc带方法的工作协程,eg:如果每次调用的方法都相同,则会默认这种模式,避免每次方法的绑定消耗资源

不同的工作协程池实现,会有完全不同的流转逻辑,下面详细介绍一下。

workerStack-列表/栈

列表形式的存储,但是弹出元素detach和追加元素insert,都是从尾部进行,满足后进先出(LIFO)的特点。
在这里插入图片描述

type workerStack struct {items  []worker  // 列表形式存储expiry []worker  // 存储过期的工作协程
}func newWorkerStack(size int) *workerStack {return &workerStack{items: make([]worker, 0, size),}
}func (wq *workerStack) len() int {return len(wq.items)
}func (wq *workerStack) isEmpty() bool {return len(wq.items) == 0
}func (wq *workerStack) insert(w worker) error {wq.items = append(wq.items, w)  // 尾部添加元素return nil
}func (wq *workerStack) detach() worker {l := wq.len()if l == 0 {return nil}w := wq.items[l-1]  // 尾部获取元素wq.items[l-1] = nil // 避免内存泄漏wq.items = wq.items[:l-1]return w
}func (wq *workerStack) refresh(duration time.Duration) []worker {n := wq.len()if n == 0 {return nil}expiryTime := time.Now().Add(-duration)index := wq.binarySearch(0, n-1, expiryTime)wq.expiry = wq.expiry[:0]if index != -1 {wq.expiry = append(wq.expiry, wq.items[:index+1]...)m := copy(wq.items, wq.items[index+1:])for i := m; i < n; i++ {wq.items[i] = nil}wq.items = wq.items[:m]}return wq.expiry
}// 二分法搜索,用于获取临界过期的工作协程
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {for l <= r {mid := l + ((r - l) >> 1) // avoid overflow when computing midif expiryTime.Before(wq.items[mid].lastUsedTime()) {r = mid - 1} else {l = mid + 1}}return r
}func (wq *workerStack) reset() {for i := 0; i < wq.len(); i++ {wq.items[i].finish()wq.items[i] = nil}wq.items = wq.items[:0]
}
loopQueue-循环队列

loopQueue 实现基于循环队列,定义如下。循环队列包含一个长度为 size 的切片,head 表示队列头指针,tail 表示队列尾指针,也就是最后一个元素的下一位,isFull 表示队列是否已满,当 head 和 tail 指向同一位置时用于区分队列是空还是满。
在这里插入图片描述
在这里插入图片描述

type loopQueue struct {items  []worker    // 用于初始分配workerexpiry []worker    // 过期workerhead   int         // 头结点tail   int         // 尾结点size   int         // 总大小isFull bool        // 是否队列已满
}func newWorkerLoopQueue(size int) *loopQueue {return &loopQueue{items: make([]worker, size),size:  size,}
}func (wq *loopQueue) len() int {if wq.size == 0 || wq.isEmpty() {return 0}if wq.head == wq.tail && wq.isFull {return wq.size}if wq.tail > wq.head {return wq.tail - wq.head}return wq.size - wq.head + wq.tail
}func (wq *loopQueue) isEmpty() bool {return wq.head == wq.tail && !wq.isFull
}func (wq *loopQueue) insert(w worker) error {if wq.size == 0 {return errQueueIsReleased}if wq.isFull {return errQueueIsFull}wq.items[wq.tail] = wwq.tail = (wq.tail + 1) % wq.size   // tail结点向后遍历,如果超过长度,则取余从头开始if wq.tail == wq.head {wq.isFull = true}return nil
}func (wq *loopQueue) detach() worker {if wq.isEmpty() {return nil}w := wq.items[wq.head]             // 从head结点取出worker,先进先出wq.items[wq.head] = nilwq.head = (wq.head + 1) % wq.sizewq.isFull = falsereturn w
}func (wq *loopQueue) refresh(duration time.Duration) []worker {expiryTime := time.Now().Add(-duration)index := wq.binarySearch(expiryTime)if index == -1 {return nil}wq.expiry = wq.expiry[:0]if wq.head <= index {wq.expiry = append(wq.expiry, wq.items[wq.head:index+1]...)for i := wq.head; i < index+1; i++ {wq.items[i] = nil}} else {wq.expiry = append(wq.expiry, wq.items[0:index+1]...)wq.expiry = append(wq.expiry, wq.items[wq.head:]...)for i := 0; i < index+1; i++ {wq.items[i] = nil}for i := wq.head; i < wq.size; i++ {wq.items[i] = nil}}head := (index + 1) % wq.sizewq.head = headif len(wq.expiry) > 0 {wq.isFull = false}return wq.expiry
}func (wq *loopQueue) binarySearch(expiryTime time.Time) int {var mid, nlen, basel, tmid intnlen = len(wq.items)// if no need to remove work, return -1if wq.isEmpty() || expiryTime.Before(wq.items[wq.head].lastUsedTime()) {return -1}// example// size = 8, head = 7, tail = 4// [ 2, 3, 4, 5, nil, nil, nil,  1]  true position//   0  1  2  3    4   5     6   7//              tail          head////   1  2  3  4  nil nil   nil   0   mapped position//            r                  l// base algorithm is a copy from worker_stack// map head and tail to effective left and rightr := (wq.tail - 1 - wq.head + nlen) % nlenbasel = wq.headl := 0for l <= r {mid = l + ((r - l) >> 1) // avoid overflow when computing mid// calculate true mid position from mapped mid positiontmid = (mid + basel + nlen) % nlenif expiryTime.Before(wq.items[tmid].lastUsedTime()) {r = mid - 1} else {l = mid + 1}}// return true position from mapped positionreturn (r + basel + nlen) % nlen
}func (wq *loopQueue) reset() {if wq.isEmpty() {return}retry:if w := wq.detach(); w != nil {w.finish()goto retry}wq.items = wq.items[:0]wq.size = 0wq.head = 0wq.tail = 0
}
goWorker-工作协程

在工作协程池中的存储的具体执行者是goWorker。

type goWorker struct {// 所属协程池的引用,绑定的是同一个poolpool *Pool// 任务通道,通过这个通道将类型为func()的函数作为任务发送给goWorkertask chan func()// 记录goWorker被放回协程池的时间lastUsed time.Time
}// run 方法启动一个协程,用来执行指定的函数。
// 从task通道中不断地接收任务,获取函数变量后直接执行,然后将goWorker对象放回协程池。
// 这个 for 循环将一直从task通道接收任务,直至通道关闭或取出nil任务才会终止,期间协程将一直保持运行。
// 也就是说,每个 goWorker 只会启动一次协程,后续可以重复利用这个协程,这就是 ants 高性能的关键所在。
func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {w.pool.once.Do(func() {close(w.pool.allDone)})}w.pool.workerCache.Put(w)if p := recover(); p != nil {if ph := w.pool.options.PanicHandler; ph != nil {ph(p)} else {w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())}}// 通知有空闲的工作协程w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()                                       // 用户的程序if ok := w.pool.revertWorker(w); !ok {    // 将本协程释放,插入工作协程池中return}}}()
}// 将本协程释放,插入工作协程池中
func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {p.cond.Broadcast()return false}worker.lastUsed = p.nowTime()p.lock.Lock()// To avoid memory leaks, add a double check in the lock scope.if p.IsClosed() {p.lock.Unlock()return false}// 将worker插入到工作协程池if err := p.workers.insert(worker); err != nil {p.lock.Unlock()return false}// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.p.cond.Signal()p.lock.Unlock()return true
}func (w *goWorker) finish() {w.task <- nil
}func (w *goWorker) lastUsedTime() time.Time {return w.lastUsed
}func (w *goWorker) inputFunc(fn func()) {w.task <- fn
}func (w *goWorker) inputParam(interface{}) {panic("unreachable")
}

核心原理

  1. 复用Go协程:每一个协程作为一个工作协程,执行完当前任务后,会标记为已结束,进入空闲状态,通过工作协程池的堆栈复用/循环队列复用,寻找下一个执行,这个应该是核心提高性能的地方。
  2. 减少内存分配:构造了两种使用sync.Pool包来减少内存分配的频率,通过重用已经存在但不再使用的对象(即工作协程)来达到这一目的。
  3. 协程池组织模式:提供了两种:列表/堆栈,循环队列,用来控制是否需要进行提前的协程池分配,以及存取协程过程中分配方式:(非严格意义)LIFO,(非严格意义)FIFO
  4. 空闲协程回收:ants使用时钟精确控制哪些工作协程需要进行垃圾回收,这一部分在本文没有详细介绍

举个🌰

func main() {pool, _ := ants.NewPool(5)var wg sync.WaitGroupfor i := 0; i < 10; i++ {count := iwg.Add(1)_ = pool.Submit(func() {fmt.Printf("Running task %d, timeNow: %v\n", count, time.Now())time.Sleep(1 * time.Second) // 模拟耗时任务wg.Done()})}wg.Wait()fmt.Println("All tasks completed.")
}

// 输出

Running task 1, timeNow: 2024-08-15 10:32:16.729052 +0800 CST m=+0.023291751
Running task 3, timeNow: 2024-08-15 10:32:16.72925 +0800 CST m=+0.023490418
Running task 4, timeNow: 2024-08-15 10:32:16.729255 +0800 CST m=+0.023495168
Running task 0, timeNow: 2024-08-15 10:32:16.72905 +0800 CST m=+0.023289626
Running task 2, timeNow: 2024-08-15 10:32:16.729062 +0800 CST m=+0.023302334
Running task 6, timeNow: 2024-08-15 10:32:17.729546 +0800 CST m=+1.023782501
Running task 8, timeNow: 2024-08-15 10:32:17.729611 +0800 CST m=+1.023847543
Running task 5, timeNow: 2024-08-15 10:32:17.729532 +0800 CST m=+1.023768209
Running task 9, timeNow: 2024-08-15 10:32:17.729643 +0800 CST m=+1.023879126
Running task 7, timeNow: 2024-08-15 10:32:17.729586 +0800 CST m=+1.023822626
All tasks completed.

可以看到,因为协程池大小为5,所以前5个任务都在0s执行,后5个在1s执行。

对比一下

共同点

  1. 通过sync.Pool (可以理解成 GMP 模型中的P)注册一个工作协程,通过复用工作协程,达到减少内存分配,减轻垃圾回收的压力。
  2. 都拥有类似的任务队列池、工作协程池的概念,实现对每个任务由哪个工作协程执行的工作。
  3. 都配置了垃圾回收的方式。
  4. 调用方式相似&&简单。

不同点

原生Channel实现gopkg/gopoolpanjf2000/ants
运行效率
复用方式通过工作协程自己for循环寻找未执行任务通过堆栈/循环队列管控空闲工作协程,控制工作协程如何分配给任务
错误处理专门的可配置的方法专门的可配置的方法
集成metrics集成了公司自研的metrics
灵活度可支持扩容的阈值有大量optional参数,可以配置是否阻塞、是否预分配、回收周期、支持动态扩容协程池大小

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/65626.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无人机无法返航紧急处理方式!

一、检查飞行环境 了解禁飞原因和规定&#xff1a;首先&#xff0c;需要了解所在地区的无人机飞行规定&#xff0c;确认是否存在禁飞区或限飞区。如果处于禁飞区&#xff0c;应遵守相关规定&#xff0c;不要强行飞行。 检查天气情况&#xff1a;恶劣的天气条件&#xff08;如…

NLP论文速读(NeurIPS 2024)|BERT作为生成式上下文学习者BERTs are Generative In-Context Learners

论文速读|BERTs are Generative In-Context Learners 论文信息&#xff1a; 简介&#xff1a; 本文探讨了在自然语言处理&#xff08;NLP&#xff09;领域中&#xff0c;上下文学习&#xff08;in-context learning&#xff09;的能力&#xff0c;这通常与因果语言模型&#x…

vue3<script setup>中使用Swiper

swiper网址 Swiper中文网-轮播图幻灯片js插件,H5页面前端开发 Swiper - The Most Modern Mobile Touch Slider 安装 Swiper npm安装&#xff1a; npm install swiper yarn安装&#xff1a; yarn add swiper 导入带有所有模块&#xff08;捆绑包&#xff09;的 Swiper //…

今日收获(C语言)

一.文件的打开 有这样一个结构体&#xff0c;它内部是文件信息区&#xff0c;文件信息区中的变化可以影响到硬盘中的数据。这个结构体的名字是FILE。我们如果想要写代码对文件进行各种操作&#xff0c;就需要一个指向文件信息区的指针&#xff0c;这个指针的类型是FILE*&#…

node.js卸载并重新安装(超详细图文步骤)

卸载node.js 重新安装nodejs 一、卸载 1、首先进入控制面板卸载程序 2、卸载后 到文件夹中进行进一步的删除 删除上述的几个文件夹 每个人可能不一样&#xff0c;总之是找到自己的nodejs安装路径&#xff0c;下面是我的 ①删除C:UsersAdminAppDataRoaming路径下的npm相关文件…

仓颉编程语言:编程世界的 “文化瑰宝”

我的个人主页 在当今编程领域百花齐放的时代&#xff0c;各种编程语言争奇斗艳&#xff0c;服务于不同的应用场景和开发者群体。然而&#xff0c;有这样一种编程语言&#xff0c;它承载着独特的文化内涵&#xff0c;宛如编程世界里一颗熠熠生辉的“文化瑰宝”&#xff0c;那就…

Android使用JAVA调用JNI原生C++方法

1.native-lib.cpp为要生成so库的源码文件 2.JNI函数声明说明 NewStringUTF函数会返回jstring JNI函数声明规则 3.JAVA中声明及调用JNI函数 声明&#xff1a; 调用

DAY178内网渗透之内网对抗:横向移动篇入口差异切换上线IPC管道ATSC任务Impacket套件UI插件

1.内网横向移动 1、横向移动篇-入口点分析-域内域外打点 2、横向移动篇-IPC利用-连接通讯&计划任务, 3、横向移动篇-IPC利用-命令模式&工具套件 1.1 横向移动入口知识点 收集到域内用户和凭据后&#xff0c;为后续利用各种协议密码喷射通讯上线提供条件&#xff0c;…

宠物行业的出路:在爱与陪伴中寻找增长新机遇

在当下的消费市场中&#xff0c;如果说有什么领域能够逆势而上&#xff0c;宠物行业无疑是一个亮点。当人们越来越注重生活品质和精神寄托时&#xff0c;宠物成为了许多人的重要伴侣。它们不仅仅是家庭的一员&#xff0c;更是情感的寄托和生活的调剂。然而&#xff0c;随着行业…

MySQL数据库——索引结构之B+树

本文先介绍数据结构中树的演化过程&#xff0c;之后介绍为什么MySQL数据库选择了B树作为索引结构。 文章目录 树的演化为什么其他树结构不行&#xff1f;为什么不使用二叉查找树&#xff08;BST&#xff09;&#xff1f;为什么不使用平衡二叉树&#xff08;AVL树&#xff09;&a…

大模型—Ollama 结构化输出

Ollama 结构化输出 Ollama现在支持结构化输出,使得可以按照由JSON模式定义的特定格式来约束模型的输出。Ollama的Python和JavaScript库已经更新,以支持结构化输出。 结构化输出的用例包括: 从文档中解析数据从图像中提取数据结构化所有语言模型响应比JSON模式更可靠和一致开…

欧拉计划 Project Euler 35 题解

欧拉计划 Problem 35 题解 题干思路code暴力筛法rotate函数使用语法示例代码 题干 思路 一个很自然的思路就是暴力找&#xff0c;遍历一百万之内的所有数&#xff0c;也可以先把一百万以内所有的素数筛出来然后从中取选。这里我使用的是暴力算法。 code 暴力 #include <…

pytorch基础之注解的使用--003

Title 1.学习目标2.定义3.使用步骤4.结果 1.学习目标 针对源码中出现一些注解的问题&#xff0c;这里专门写一篇文章进行讲解。包括如何自定义注解&#xff0c;以及注意事项&#xff0c;相信JAVA中很多朋友业写过&#xff0c;但是今天写的是Python哦。。。 2.定义 在 Python…

C#编写的金鱼趣味小应用 - 开源研究系列文章

今天逛网&#xff0c;在GitHub中文网上发现一个源码&#xff0c;里面有这个金鱼小应用&#xff0c;于是就下载下来&#xff0c;根据自己的C#架构模板进行了更改&#xff0c;最终形成了这个例子。 1、 项目目录&#xff1b; 2、 源码介绍&#xff1b; 1) 初始化&#xff1b; 将样…

高效搭建Nacos:实现微服务的服务注册与配置中心

一、关于Nacos 1.1 简介 Nacos&#xff08;Dynamic Naming and Configuration Service&#xff09;是阿里巴巴开源的一款动态服务发现、配置管理和服务管理平台。它旨在帮助开发者更轻松地构建、部署和管理分布式系统&#xff0c;特别是在微服务架构中。Nacos 提供了简单易用…

112、Qt MSVC编译Qtxlsx

先参考103、QT搭建Excel表环境-使用Qtxlsx库文档&#xff0c;下载xlsx源码以及安装perl环境 并配置VS2019和perl环境变量 Qtxlsx库源码下载&#xff1a;https://github.com/dbzhang800/QtXlsxWriter 解压至非中文路径下 打开Qt自带的MSVC 2019命令框进入文件夹并运行命令生成…

频域滤波为什么使用psf2otf函数?

MATLAB中circshift函数是psf2otf函数的核心&#xff0c;在MATLAB中circshift函数的原理分析——psf2otf函数的核心直观解释了为什么需要循环移位。 MATLAB提出了psf2otf函数&#xff0c;先做循环移位&#xff0c;再计算离散傅里叶变换。如果有空域的卷积核&#xff0c;通过这个…

PySide6 SQLite3 做的 电脑组装报价系统

一、数据库结构说明 1. 配件类别表 (component_categories) 字段名类型说明约束category_idINTEGER类别IDPRIMARY KEY, AUTOINCREMENTcategory_nameTEXT类别名称NOT NULL, UNIQUEdescriptionTEXT类别描述 2. 配件表 (components) 字段名类型说明约束component_idINTEGER配件…

Android 部分操作(待补充

新建的线性布局.xml文件&#xff0c;文件名是 linearlayout.xml&#xff0c;根元素设置LinearLayout&#xff1b; 对于线性布局&#xff0c;调整第一个元素相对于顶部的位置&#xff0c;通过属性 layout_marginTop 设置后调整第一个元素的位置&#xff0c;后边的元素会依次向…

Android笔试面试题AI答之Android基础(7)

Android入门请看《Android应用开发项目式教程》&#xff0c;视频、源码、答疑&#xff0c;手把手教 文章目录 1.Android开发如何提高App的兼容性&#xff1f;**1. 支持多版本 Android 系统****2. 适配不同屏幕尺寸和分辨率****3. 处理不同硬件配置****4. 适配不同语言和地区**…