Go协程池gopool源码解析

1、gopool简介

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines.

It is an alternative to the go keyword.

gopool的用法非常简单,将曾经我们经常使用的go func(){...}替换为gopool.Go(func(){...})即可

此时gopool将会使用默认的配置来管理你启动的协程,也可以选择针对业务场景配置池子大小以及扩容上限

old:

go func() {// do your job
}()

new:

gopool.Go(func(){// do your job
})

2、核心数据结构

1)、Pool

Pool是一个定义了协程池的接口,代码如下:

// util/gopool/pool.go
type Pool interface {// Name returns the corresponding pool name.// 协程池的名称Name() string// SetCap sets the goroutine capacity of the pool.// 设置协程池内goroutine的容量SetCap(cap int32)// Go executes f.// 执行f函数Go(f func())// CtxGo executes f and accepts the context.// 带ctx,执行f函数CtxGo(ctx context.Context, f func())// SetPanicHandler sets the panic handler.// 设置发生panic时调用的函数SetPanicHandler(f func(context.Context, interface{}))
}

gopool提供了Pool这个接口的默认实现pool,代码如下:

// util/gopool/pool.go
type pool struct {// The name of the pool// 协程池的名字name string// capacity of the pool, the maximum number of goroutines that are actually working// 协程池实际工作的goroutine的最大数量cap int32// Configuration information// 配置信息config *Config// linked list of tasks// task队列的元信息,每一个task代表一个待执行的函数taskHead  *tasktaskTail  *tasktaskLock  sync.MutextaskCount int32// Record the number of running workers// 当前有多少个worker在运行中,每个worker代表一个goroutineworkerCount int32// This method will be called when the worker panic// 协程池中的协程引发的panic会由该函数处理panicHandler func(context.Context, interface{})
}

pool数据结构如下图:

2)、task
// util/gopool/pool.go
type task struct {// 当前task的ctxctx context.Context// 当前task需要执行的函数ff func()// 指向下一个task的指针next *task
}

task是一个链表结构,可以把它理解为一个待执行的任务,包含了当前task需要执行的函数f func()以及指向下一个task的指针

一个协程池pool对应了一组task,pool维护了指向链表的头尾的两个指针:taskHead和taskTail以及链表的长度taskCount和对应的锁taskLock

3)、worker
// util/gopool/worker.go
type worker struct {pool *pool
}

一个worker就是逻辑上的一个执行器,它对应到一个协程池pool

当一个worker被唤起,将会开启一个goroutine,不断从pool中的task链表获取任务并执行,代码如下:

// util/gopool/worker.go
func (w *worker) run() {go func() {for {var t *task// 操作pool中的task链表前,加锁保证并发安全w.pool.taskLock.Lock()if w.pool.taskHead != nil {// 拿到taskHead准备执行t = w.pool.taskHead// 更新链表的head以及数量w.pool.taskHead = w.pool.taskHead.nextatomic.AddInt32(&w.pool.taskCount, -1)}if t == nil {// if there's no task to do, exit// 如果前一步拿到的taskHead为空,说明无任务需要执行,清理后返回(关闭goroutine)w.close()w.pool.taskLock.Unlock()w.Recycle()return}w.pool.taskLock.Unlock()// 执行任务,针对panic会recover,并调用配置的handlerfunc() {defer func() {if r := recover(); r != nil {msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())logger.CtxErrorf(t.ctx, msg)if w.pool.panicHandler != nil {w.pool.panicHandler(t.ctx, r)}}}()t.f()}()t.Recycle()}}()
}

3、核心API

来看下使用gopool的核心APIGo(f func()),实现如下:

// util/gopool/gopool.go
func Go(f func()) {CtxGo(context.Background(), f)
}func CtxGo(ctx context.Context, f func()) {defaultPool.CtxGo(ctx, f)
}
// util/gopool/pool.go
func (p *pool) CtxGo(ctx context.Context, f func()) {// 创建一个task对象,将ctx和待执行的函数赋值t := taskPool.Get().(*task)t.ctx = ctxt.f = f// 将task插入pool的链表的尾部,更新链表数量p.taskLock.Lock()if p.taskHead == nil {p.taskHead = tp.taskTail = t} else {p.taskTail.next = tp.taskTail = t}p.taskLock.Unlock()atomic.AddInt32(&p.taskCount, 1)// The following two conditions are met:// 1. the number of tasks is greater than the threshold.// 2. The current number of workers is less than the upper limit p.cap.// or there are currently no workers.// 以下任意条件满足时,创建新的worker并唤起执行// 1.待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)// 2.无worker运行if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {// worker数量+1p.incWorkerCount()// 创建一个新的worker,并把当前pool赋值w := workerPool.Get().(*worker)w.pool = p// 唤起worker执行w.run()}
}

以下任意条件满足时,会扩容worker:

  1. 待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)
  2. 无worker运行

gopool自行维护一个defaultPool,这是一个默认的pool结构体,在引入包的时候就进行初始化。当我们直接调用gopool.Go()时,本质上是调用了defaultPool的同名方法

// util/gopool/gopool.go
var defaultPool Poolfunc init() {defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
// util/gopool/config.go
const (defaultScalaThreshold = 1
)type Config struct {// threshold for scale.// new goroutine is created if len(task chan) > ScaleThreshold.// defaults to defaultScalaThreshold.// 控制扩容的阈值,一旦待执行的task超过此值,且worker数量未达到上限,就开始启动新的workerScaleThreshold int32
}func NewConfig() *Config {c := &Config{ScaleThreshold: defaultScalaThreshold,}return c
}

defaultPool的名称为gopool.DefaultPool,池子容量一万,扩容阈值为1

当调用gopool.Go()时,gopool就会更新维护的任务链表,并且判断是否需要扩容worker:

  • 若此时已经有很多worker启动(底层一个worker对应一个goroutine),不需要扩容,就直接返回
  • 若判断需要扩容,就创建一个新的worker,并调用worker.run()方法启动,各个worker会异步地检查pool里面的任务链表是否还有待执行的任务,如果有就执行

gopool中三个角色的定位:

  • task是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构
  • worker是一个实际执行任务的执行器,它会异步启动一个goroutine执行协程池里面未执行的task
  • pool是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的worker

gopool核心实现原理如下图:

4、使用sync.Pool进行性能优化

gopool中多次使用了sync.Pool来池化对象的创建,复用woker和task对象

task池化:

// util/gopool/pool.go
var taskPool sync.Poolfunc init() {taskPool.New = newTask
}func newTask() interface{} {return &task{}
}func (t *task) Recycle() {t.zero()taskPool.Put(t)
}

worker池化:

// util/gopool/worker.go
var workerPool sync.Poolfunc init() {workerPool.New = newWorker
}func newWorker() interface{} {return &worker{}
}func (w *worker) Recycle() {w.zero()workerPool.Put(w)
}

参考:

解析 Golang 协程池 gopool 设计与实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/800857.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux进阶之路】地址篇

文章目录 一、ipv4地址1. 基本概念2. 分类3.CIDR4.特殊的ip地址 二、IP协议1. 协议字段2.分片与重组3.路由 三、NAT技术1.公有和私有2.NAT3.NAPT 四、ARP协议1.MAC地址2.ARP 五、DHCP协议六、DNS协议尾序 一、ipv4地址 1. 基本概念 概念&#xff1a;IP地址&#xff0c;英文全…

从零自制docker-8-【构建实现run命令的容器】

文章目录 log "github.com/sirupsen/logrus"args...go moduleimport第三方包失败package和 go import的导入go build . 和go runcli库log.SetFormatter(&log.JSONFormatter{})error和nil的关系cmd.Wait()和cmd.Start()arg……context.Args().Get(0)syscall.Exec和…

【Leetcode每日一题】 递归 - 验证二叉搜索树(难度⭐⭐)(53)

1. 题目解析 题目链接&#xff1a;98. 验证二叉搜索树 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 中序遍历是二叉树遍历中的一种重要方式&#xff0c;它按照左子树、根节点、右子树的顺序访问每个节点。这种方式…

2024mathorcup妈妈杯数学建模A题思路模型

2024mathorcup妈妈杯数学建模A题思路模型&#xff1a;比赛开始后第一时间更新&#xff0c;更新见文末名片&#xff0c;下面对2022年B题进行介绍&#xff1a; 2022Mathorcup B题题目介绍 ​ B题无人仓的搬运机器人调度问题本题考在无人仓内的仓库管理问题之一&#xff0c;搬运机…

机场数据治理系列介绍(5)民用机场智慧能源系统评价体系设计

目录 一、背景 二、体系设计 1、评价体系设计维度 2、评价体系相关约定 3、评价指标体系框架设计 4、能源利用评价指标 5、环境友好评价指标 6、智慧管控评价指标 7、安全保障评价指标 三、具体落地措施 一、背景 在“双碳”国策之下&#xff0c;各类机场将能源系统建…

深入解析template,掌握C++模板的精髓!

掌握C模板&#xff08;template&#xff09;的优雅之道&#xff01; 一、什么是模板&#xff1f;二、模板如何工作&#xff1f;三、C 中的模板类型3.1、 类模板3.2、 函数模板 四、模板参数推导4.1、模板参数推导示例4.2、函数模板参数推导4.3、类模板参数推导&#xff08;C17 …

2024年MathorCup妈妈杯数学建模思路C题思路解析+参考成品

1 赛题思路 (赛题出来以后第一时间在群内分享&#xff0c;点击下方群名片即可加群) 2 比赛日期和时间 报名截止时间&#xff1a;2024年4月11日&#xff08;周四&#xff09;12:00 比赛开始时间&#xff1a;2024年4月12日&#xff08;周五&#xff09;8:00 比赛结束时间&…

GPU部署ChatGLM3

首先&#xff0c;检查一下自己的电脑有没有CUDA环境&#xff0c;没有的话&#xff0c;去安装一个。我的电脑是4060显卡&#xff0c;买回来就自带这些环境了。没有显卡的话&#xff0c;也不要紧&#xff0c;这个懒人安装包支持CPU运行&#xff0c;会自动识别没有GPU&#xff0c;…

启明智显M4核心板驱动17寸屏 为您打造无与伦比的视觉盛宴

近日&#xff0c;启明智显推出M4核心板驱动17寸屏&#xff0c;8 Link LVDS接口下1280*1024分辨率为用户展现了超强的视觉体验。 M4核心板采用纯国产架构&#xff0c;内置了16位DDR内存&#xff0c;为设备提供强大的数据处理能力和高效的运行速度。无论是处理复杂的任务还是进…

【简单讲解下C++max函数的使用】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

Java | Leetcode Java题解之第13题罗马数字转整数

题目&#xff1a; 题解&#xff1a; class Solution {Map<Character, Integer> symbolValues new HashMap<Character, Integer>() {{put(I, 1);put(V, 5);put(X, 10);put(L, 50);put(C, 100);put(D, 500);put(M, 1000);}};public int romanToInt(String s) {int …

考研数学|刷题用汤家凤《1800》还是张宇《1000》?看完这篇你就懂了

考研数学的复习是一个系统的过程&#xff0c;不同的习题集有各自的特点和适用场景。汤家凤的1800题和张宇的1000题都是非常受欢迎的考研数学复习资料&#xff0c;它们各有侧重点和优势。 汤家凤的1800题以其全面性和基础性著称&#xff0c;题目覆盖了考研数学的各个知识点&…

缓存穿透问题

缓存穿透 &#xff1a;缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库。 常见的两种解决方案&#xff1a; 1.缓存空对象 优点&#xff1a;实现简单&#xff0c;维护方便 缺点&#xff1a;占用…

【40分钟速成智能风控4】传统风险管理体系

目录 人工审核 纸质材料评估 电话回访 线下走访尽调 专家模型 业务规则库 专家调查权重法 熵权法 随着大数据和机器学习技术的发展与成熟&#xff0c;智能风控已经逐步取代传统风控&#xff0c;成为国内互联网金融机构主流的风险管理模式。一方面&#xff0c;传统风控是…

U盘中病毒了会影响电脑吗 U盘中病毒了怎么恢复数据 easyrecovery数据恢复软件免费版 easyrecovery绿色版破解版激活密钥无需注册

EasyRecovery是世界著名数据恢复公司 Ontrack 的技术杰作&#xff0c;EasyRecovery破解版是一个威力非常强大的硬盘数据恢复工具&#xff0c;能够帮你恢复丢失的数据以及重建文件系统。您只需要按软件提示一步一步操作&#xff0c;就能恢复出你电脑上的文档、表格、图片、音频、…

LeetCode 热题 100 | 多维动态规划(一)

目录 1 多维动态规划 2 62. 不同路径 3 64. 最小路径和 菜鸟做题&#xff0c;语言是 C&#xff08;细品动态规划 ing&#xff09; 1 多维动态规划 目前的感觉&#xff1a;抽象为二维数组。 2 62. 不同路径 题眼&#xff1a;“机器人每次只能向下或者向右移动一步”。…

什么是sso?

SSO&#xff08;Single Sign-On&#xff09;&#xff0c;即单点登录&#xff0c;是一种安全协议&#xff0c;它允许用户在多个应用程序之间使用同一组登录凭据进行身份验证。这意味着用户只需要登录一次&#xff0c;就可以访问多个需要身份验证的应用程序。 SSO的工作原理如下…

亚信安慧AntDB数据库分享“UltraSync特性介绍”技术演讲,助力客户降本增效

3月30日&#xff0c;由中国开源软件联盟组织的PostgreSQL技术峰会活动在南京举办&#xff0c;各数据库厂商研发带头人、企业资深DBA和众多技术爱好者齐聚一堂。湖南亚信安慧科技有限公司&#xff08;简称&#xff1a;亚信安慧&#xff09; AntDB-T产品线研发负责人梁博受邀参会…

「每日跟读」英语常用句型公式 第6篇

「每日跟读」英语常用句型公式 第6篇 1. As ___ as possible 越 ___ 越好 As soon as possible (ASAP)(越快越好) As happy as possible (越快乐越好) As prepared as possible (越有准备越好) As much/many as possible (越多越好 *不可数/可数) As early as possible …

探秘KMP算法:解密字符串匹配的黑科技

KMP算法 在正式进入KMP算法之前&#xff0c;不得不先引经据典一番&#xff0c;因为直接去理解KMP&#xff0c;你可能会很痛苦&#xff08;别问&#xff0c;问就是我也痛苦过&#xff09;。所以做好前面的预热工作非常非常重要&#xff0c;为了搞明白KMP&#xff0c;在没见到KMP…