1、总体架构
ants协程池,在使用上有多种方式(使用方式参考这篇文章:golang线程池ants-四种使用方法),但是在实现的核心就一个,如下架构图:
总的来说,就是三个数据结构: Pool、WorkerStack、goWorker以及这三个结构实现的方法,了解了这些,基本上对ants的实现原理就了如指掌了。
2、详细实现
2.1 worker的设计实现
worker结构如下:
type goWorker struct {// pool who owns this worker.pool *Pool// task is a job should be done.task chan func()// lastUsed will be updated when putting a worker back into queue.lastUsed time.Time
}
该结构设计非常简单,三个成员:归属的线程池、执行函数、该worker最后一次运行时间,goWorker结构实现如下接口:
type worker interface {run()finish()lastUsedTime() time.TimeinputFunc(func())inputParam(interface{})
}
核心函数run,该函数从管道task里获取到任务函数,并执行,执行完成后,将此worker放回协程池(此时worker阻塞等待任务到来,调用函数:w.pool.revertWorker(w)放回池子中),以便复用:
func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {w.pool.once.Do(func() {close(w.pool.allDone)})}w.pool.workerCache.Put(w)if p := recover(); p != nil {if ph := w.pool.options.PanicHandler; ph != nil {ph(p)} else {w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())}}// Call Signal() here in case there are goroutines waiting for available workers.w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
}
finish函数,调用该函数,代表此worker的生命周期结束:
func (w *goWorker) finish() {w.task <- nil
}
这个时候run函数从遍历task管道中结束,进入defer函数,worker放入workerCache,备用。
inputFunc很容易理解,将任务放入管道,让worker去执行:
func (w *goWorker) inputFunc(fn func()) {w.task <- fn
}
2.2 workerStack结构
type workerStack struct {items []workerexpiry []worker
}
该结构就两个成员,都为worker的切片,items切片用于存储正常执行的worker,expiry存放过期的worker,workStack结构实现了如下接口:
type workerQueue interface {len() intisEmpty() boolinsert(worker) errordetach() workerrefresh(duration time.Duration) []worker // clean up the stale workers and return themreset()
}
len函数:返回正在运行worker的长度
isEmpty函数:判断是否有正在运行的worker
insert函数:将worker插入切片。
detach函数:获取一个worker。
refresh:更新所有worker,淘汰过期worker。
reset:清除所有worker。
重点看refresh函数:
func (wq *workerStack) refresh(duration time.Duration) []worker {n := wq.len()if n == 0 {return nil}expiryTime := time.Now().Add(-duration)index := wq.binarySearch(0, n-1, expiryTime)wq.expiry = wq.expiry[:0]if index != -1 {wq.expiry = append(wq.expiry, wq.items[:index+1]...)m := copy(wq.items, wq.items[index+1:])for i := m; i < n; i++ {wq.items[i] = nil}wq.items = wq.items[:m]}return wq.expiry
}
这个函数用于根据给定的时间间隔duration
来刷新工作队列中的过期项。主要执行以下步骤:
-
获取队列长度:首先,通过调用
wq.len()
获取工作队列wq
中当前元素的数量n
。如果队列为空(即n == 0
),则直接返回nil
,表示没有过期项。 -
计算过期时间:通过
time.Now().Add(-duration)
计算出一个时间点,这个时间点是duration
时间之前的时间,即认为是“过期”的时间点。 -
二分查找:使用
wq.binarySearch(0, n-1, expiryTime)
在队列中查找第一个过期项的位置(即第一个最后使用时间早于expiryTime
的项)。这个函数返回一个索引,如果找到这样的项,则返回该项的索引;如果没有找到,则返回-1
。 -
清理过期项:
- 首先,清空
wq.expiry
切片,用它来存储所有过期的项。 - 如果找到了过期项(即
index != -1
),则将wq.items
中从0
到index
(包含index
)的所有项(即所有过期项)追加到wq.expiry
中。 - 然后,使用
copy
函数将wq.items
中从index+1
到n-1
的所有项向前移动,覆盖掉前面的过期项。这里m
是copy
函数返回的值,表示实际复制的元素数量,即队列中剩余的非过期项的数量。 - 接下来,遍历
wq.items
中从m
到n-1
的所有位置,将它们设置为nil。
- 最后,通过
wq.items = wq.items[:m]
更新wq.items
的长度,去除所有过期的项。
- 首先,清空
-
返回过期项:函数返回
wq.expiry
,这是一个包含所有被移除的过期项的切片。
需要注意的是,wq.items
是一个切片,用于存储工作项;wq.expiry
也是一个切片,用于临时存储过期的项。
2.3 Pool结构
pool结构的定义源码稍作改了一下,之前poolCommon的结构就是Pool的结构,目前最新版本做了一个简单的封装。
type Pool struct {poolCommon
}
type poolCommon struct {// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool// which submits a new task to the same pool.capacity int32// running is the number of the currently running goroutines.running int32// lock for protecting the worker queue.lock sync.Locker// workers is a slice that store the available workers.workers workerQueue// state is used to notice the pool to closed itself.state int32// cond for waiting to get an idle worker.cond *sync.Cond// done is used to indicate that all workers are done.allDone chan struct{}// once is used to make sure the pool is closed just once.once *sync.Once// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.workerCache sync.Pool// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lockwaiting int32purgeDone int32purgeCtx context.ContextstopPurge context.CancelFuncticktockDone int32ticktockCtx context.ContextstopTicktock context.CancelFuncnow atomic.Valueoptions *Options
}
创建一个线程池:
// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {if size <= 0 {size = -1}opts := loadOptions(options...)if !opts.DisablePurge {if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{poolCommon: poolCommon{capacity: int32(size),allDone: make(chan struct{}),lock: syncx.NewSpinLock(),once: &sync.Once{},options: opts,}}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if p.options.PreAlloc {if size == -1 {return nil, ErrInvalidPreAllocSize}p.workers = newWorkerQueue(queueTypeLoopQueue, size)} else {p.workers = newWorkerQueue(queueTypeStack, 0)}p.cond = sync.NewCond(p.lock)p.goPurge()p.goTicktock()return p, nil
}
看如下几行代码:
p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}
workerCache为sync.Pool类型,sync.Pool
是Go语言标准库中提供的一个对象池化的工具,旨在通过复用对象来减少内存分配的频率并降低垃圾回收的开销,从而提高程序的性能。其内部维护了一组可复用的对象。当你需要一个对象时,可以尝试从sync.Pool
中获取。如果sync.Pool
中有可用的对象,它将返回一个;否则,它会调用你提供的构造函数来创建一个新对象,sync.Pool
的New
字段是一个可选的函数,用于在池中无可用对象时创建新的对象。
这里这样写即为:当无可用的worker时,则通过New函数创建一个新的worker。
创建workder列表,内部其实就是创建了了一个切片,类型为workerStack,用于管理所有的worker。
p.workers = newWorkerQueue(queueTypeStack, 0)
NewPool函数执行完成后,一个协程池就创建完成了。
协程池创建完成后,需要用来处理任务,如何将任务函数传递到worker去执行呢?看如下函数:
// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}w, err := p.retrieveWorker()if w != nil {w.inputFunc(task)}return err
}
函数的入参为一个无返回值、无入参的函数,因此所有需要worker执行的函数都是func()类型,w, err := p.retrieveWorker(),取出一个空闲worker,取出成功后,将任务传递到worker内部:w.inputFunc(task),注意,当线程池中所有worker都忙碌时,inputFunc函数阻塞,一直到有worker空闲。
其他主要的函数,从池中获取worker的函数:
func (p *Pool) retrieveWorker() (w worker, err error) {p.lock.Lock()retry:// First try to fetch the worker from the queue.if w = p.workers.detach(); w != nil {p.lock.Unlock()return}// If the worker queue is empty, and we don't run out of the pool capacity,// then just spawn a new worker goroutine.if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()w = p.workerCache.Get().(*goWorker)w.run()return}// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {p.lock.Unlock()return nil, ErrPoolOverload}// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)if p.IsClosed() {p.lock.Unlock()return nil, ErrPoolClosed}goto retry
}
这个函数,获取worker有三个逻辑:
- 当池中有空闲worker,直接获取。
- 当池中没有空闲worker,从缓存workerCache中取出过期的worker使用,复用资源,降低开销。
- 等待有worker执行完任务释放。(阻塞情况)
revertWorker,将worker放回池中,以执行下次的任务。
func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {p.cond.Broadcast()return false}worker.lastUsed = p.nowTime()p.lock.Lock()// To avoid memory leaks, add a double check in the lock scope.// Issue: https://github.com/panjf2000/ants/issues/113if p.IsClosed() {p.lock.Unlock()return false}if err := p.workers.insert(worker); err != nil {p.lock.Unlock()return false}// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.p.cond.Signal()p.lock.Unlock()return true
}
以上就为ants线程池实现的主要技术细节,希望对各位热爱技术的同学们提供一些些帮助。
3、总结
ants协程池是一个高性能、易用的Go语言协程池库,它通过复用goroutines、自动调度任务、定期清理过期goroutines等方式,帮助开发者更加高效地管理并发任务。无论是处理网络请求、数据处理还是其他需要高并发性能的场景,ants协程池都是一个值得推荐的选择。