深入理解 Go 语言 Goroutine 的工作原理

一、设计思路

1、设计描述
  • 启动服务之时先初始化一个 Goroutine Pool 池,这个 Pool 维护了一个类似栈的 LIFO 队列,里面存放负责处理任务的 Worker
  • 然后在 client 端提交 task 到 Pool 中之后,在 Pool 内部,接收 task 之后的核心操作是
    • 检查当前 Worker 队列中是否有可用的 Worker,如果有,取出执行当前的 task
    • 没有可用的 Worker,判断当前在运行的 Worker 是否已超过该 Pool 的容量
    • 每个 Worker 执行完任务之后,放回 Pool 的队列中等待

2、Pool struct 
type sig struct{}type f func() error// pool从客户端获取任务,它限制 goroutines 总数,并且回收再使用
type Pool struct {capacity int32                   // 协程池容量running int32                    // 正在运行的goroutine数量expiryDuration time.Duration     // 为每个worker设置一个过期时间workers []*Worker                // 存放空闲 worker,请求进入 Pool先检查workers若有则取出绑定任务执行release chan sig                 // 当关闭该 Pool 支持通知所有 worker 退出运行以防 goroutine 泄露lock sync.Mutex                  // 同步操作锁once sync.Once                   // 确保 Pool 关闭操作只会执行一次
}
3、初始化 Pool 并启动定期清理过期 worker 任务
//新建一个线程池实例
func NewPool(size int) (*Pool,error) {return NewTimingPool(size,DefaultCleanIntervalTime)
}//产生一个带有自定义定时器的线程池实例
func NewTimingPool(size, expiry int) (*Pool,error) {if size <= 0{return nil,ErrInvalidPoolSize}if expiry <= {return nil,ErrInvalidPoolExpiry}p := &Pool{capacity:       int32(size),freeSignal:     make(chan sig, math.MaxInt32),release:        make(chan sig, 1),expiryDuration: time.Duration(expiry) * time.Second,}// 启动定期清理过期worker任务,独立goroutine运行,进一步节省系统资源p.monitorAndClear()return p, nil
}
4、提交任务到 Pool
  • 第一个 if 判断当前 Pool 是否已被关闭,若是则不再接受新任务,否则获取一个 Pool 中可用的 worker,绑定该 task 执行
func (p *Pool) Submit(task f) error {if len(p.release) > 0 {return ErrPoolClosed}w := p.getWorker()w.task <- taskreturn nil
}
5、获取可用 worker(核心)
  • p.getWorker()源码
//返回一个可用的 worker 来运行这些任务
func (p *Pool) getWorker() *Worker {var w *Workerwaiting :=false    //标志变量,判断当前正在运行的 worker 数量是否已到达 Pool 的容量上限p.lock。Lock()    //加锁,检测队列中是否有可用 worker,并进行相应操作idleWorkers := p.workersn := len(idleWorkers) - 1if n < 0 {	 // 当前队列中无可用worker// 判断运行worker数目已达到该Pool的容量上限,置等待标志waiting = p.Running() >= p.Cap()} else {		// 当前队列有可用worker,从队列尾部取出一个使用w = idleWorkers[n]idleWorkers[n] = nilp.workers = idleWorkers[:n]}p.lock.Unlock()	  // 检测完成,解锁if waiting {	 // Pool容量已满,新请求等待for {		 // 利用锁阻塞等待直到有空闲workerp.lock.Lock()idleWorkers = p.workersl := len(idleWorkers) - 1if l < 0 {p.lock.Unlock()continue}w = idleWorkers[l]idleWorkers[l] = nilp.workers = idleWorkers[:l]p.lock.Unlock()break}// 当前无空闲worker但是Pool还没有满,则可以直接新开一个worker执行任务} else if w == nil {w = &Worker{pool: p,task: make(chan f, 1),}w.run()// 运行worker数加一p.incRunning()}return w
}
6、执行任务
  • 结合前面的 p.Submit(task f) 和 p.getWorker() ,提交任务到 Pool 之后,获取一个可用 worker
  • 每新建一个 worker 实例之时都需要调用 w.run() 启动一个 goroutine 监听 worker 的任务列表 task ,一有任务提交进来就执行
  • 所以,当调用 worker 的 sendTask(task f) 方法提交任务到 worker 的任务队列之后,马上就可以被接收并执行
  • 当任务执行完之后,会调用 w.pool.putWorker(w *Worker) 方法将这个已经执行完任务的 worker 从当前任务解绑放回 Pool 中,以供下个任务可以使用
  • 至此,一个任务从提交到完成的过程就此结束,Pool 调度将进入下一个循环。
// Worker是运行任务的实际执行者,它启动一个接受任务并执行函数调用的goroutine
type Worker struct {pool *Pool   // 每个pool对应一个workertask chan f  // 任务是一项应该完成的工作recycleTime time.Time	 // 当将一个worker放回队列时,recycleTime将被更新。
}// Run启动一个goroutine以重复执行函数调用的过程
func (w *Worker) run() {go func() {// 循环监听任务列表,一旦有任务立马取出运行for f := range w.task {if f == nil {// 退出goroutine,运行worker数减一w.pool.decRunning()return}f()// worker回收复用w.pool.putWorker(w)}}()
}
7、worker回收(goroutine 复用)
// putWorker将一个worker放回空闲池,回收goroutines
func (p *Pool) putWorker(worker *Worker) {// 写入回收时间,亦即该worker的最后一次结束运行的时间worker.recycleTime = time.Now()p.lock.Lock()p.workers = append(p.workers, worker)p.lock.Unlock()
}
8、动态扩容或者缩小池容量
// ReSize更改此池的容量
func (p *Pool) ReSize(size int) {if size == p.Cap() {return}atomic.StoreInt32(&p.capacity, int32(size))diff := p.Running() - sizeif diff > 0 {for i := 0; i < diff; i++ {p.getWorker().task <- nil}}
}
9、定期清理过期 Worker
  • 定期检查空闲 worker 队列中是否有已过期的 worker 并清理
  • 因为采用了 LIFO 后进先出队列存放空闲 worker,所以该队列默认已经是按照 worker 的最后运行时间由远及近排序
  • 可以方便地按顺序取出空闲队列中的每个 worker 并判断它们的最后运行时间与当前时间之差是否超过设置的过期时长
  • 若是,则清理掉该 goroutine,释放该 worker,并且将剩下的未过期 worker 重新分配到当前 Pool 的空闲 worker 队列中,进一步节省系统资源
//  定期清理过期 Worker
func (p *Pool) periodicallyPurge() {heartbeat := time.NewTicker(p.expiryDuration)for range heartbeat.C {currentTime := time.Now()p.lock.Lock()idleWorkers := p.workersif len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 {p.lock.Unlock()return}n := 0for i, w := range idleWorkers {if currentTime.Sub(w.recycleTime) <= p.expiryDuration {break}n = iw.task <- nilidleWorkers[i] = nil}n++if n >= len(idleWorkers) {p.workers = idleWorkers[:0]} else {p.workers = idleWorkers[n:]}p.lock.Unlock()}
}

二、pool 使用

1、公共池
package mainimport ("fmt""sync""time""github.com/panjf2000/ants/v2"
)func demoFunc() {time.Sleep(10 * time.Millisecond)fmt.Println("Hello World!")
}func main() {// 在retrieveWorker()中可能有一些调用者在等待,因此我们需要唤醒它们来防止那些无限阻塞的调用者defer ants.Release()var wg sync.WaitGroupsyncCalculateSum := func() {demoFunc()wg.Done()}for i := 0; i < 1000; i++ {wg.Add(1)_ = ants.Submit(syncCalculateSum)}wg.Wait()fmt.Printf("running goroutines: %d\n", ants.Running())fmt.Printf("finish all tasks.\n")
}/*Hello World!Hello World!running goroutines: 1000finish all tasks.*/
2、方法绑定池
package mainimport ("fmt""github.com/panjf2000/ants/v2""sync"
)func myFunc(i interface{}) {fmt.Printf("run with %d\n", i)
}func main() {defer ants.Release()var wg sync.WaitGroup// 使用池和函数,设置goroutine pool的容量为10,超时时间为1秒。p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {myFunc(i)wg.Done()})defer p.Release()// 逐个提交任务for i := 0; i < 1000; i++ {wg.Add(1)_ = p.Invoke(int32(i))}wg.Wait()fmt.Printf("running goroutines: %d\n", p.Running())
}/*
run with 976
run with 990
run with 971
running goroutines: 10*/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/220763.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库数据恢复—Mysql数据库误删表数据的数据恢复案例

mysql数据库数据恢复环境&#xff1a; 本地服务器&#xff0c;windows server操作系统 &#xff0c;部署有mysql单实例&#xff0c;数据库引擎类型为innodb&#xff0c;独立表空间&#xff0c;无数据库备份&#xff0c;未开启binlog。 mysql数据库故障&#xff1a; 工作人员使…

勒索病毒最新变种.mallox勒索病毒来袭,如何恢复受感染的数据?

导言&#xff1a; 威胁着我们数据安全的勒索病毒如.mallox已经变得愈发狡猾和具有挑战性。本文91数据恢复将深入介绍.mallox勒索病毒的特征、恢复受害数据的方法&#xff0c;以及一些预防措施&#xff0c;助您更好地应对这一威胁。 如果受感染的数据确实有恢复的价值与必要性&…

HarmonyOS应用开发初体验

9月25日华为秋季全场景新品发布会上&#xff0c;余承东宣布&#xff0c;全面启动鸿蒙原生应用&#xff0c;HarmonyOS NEXT开发者预览版将在2024年第一季度面向开发者开放。 最近鸿蒙开发可谓是火得一塌糊涂&#xff0c;各大培训平台都开设了鸿蒙开发课程。美团发布了鸿蒙高级工…

ZLMediaKit 编译以及测试(Centos 7.9 环境)

文章目录 一、前言二、编译器1、获取代码2、编译器2.1 编译器版本要求2.2 安装编译器 3、安装cmake4、依赖库4.1 依赖库列表4.2 安装依赖库4.2.1 安装libssl-dev和libsdl-dev4.2.2 安装 ffmpeg-devel依赖和ffmpeg依赖 三、构建和编译项目&#xff08;启用WebRTC功能&#xff09…

JavaWeb笔记之MySQL数据库

#Author 流云 #Version 1.0 一、引言 1.1 现有的数据存储方式有哪些&#xff1f; Java程序存储数据&#xff08;变量、对象、数组、集合&#xff09;&#xff0c;数据保存在内存中&#xff0c;属于瞬时状态存储。 文件&#xff08;File&#xff09;存储数据&#xff0c;保存…

电商类app如何进行软件测试?有必要进行第三方软件测试吗?

电商类app在开发过程中&#xff0c;软件测试是一个非常重要的环节。通过软件测试&#xff0c;可以确保app在发布和使用过程中的稳定性和安全性。那么&#xff0c;电商类app究竟如何进行软件测试?是否有必要进行第三方软件测试? 一、电商类app如何进行软件测试?   1. 内部…

武汉小程序开发全攻略:从创意到上线,10个必备步骤详解

在移动互联网快速发展的今天&#xff0c;武汉小程序开发成为越来越受关注的领域。作为专业从业者&#xff0c;我将为您详细解读武汉小程序开发的全攻略&#xff0c;从创意到上线的十个必备步骤&#xff0c;助您轻松掌握小程序开发的要点。 步骤一&#xff1a;明确小程序定位与…

python numpy 两种方法将相同shape的一维数组合并为二维数组

1 np.column_stack 最简单的一种方法 将多个一维数据按【列】合并为二维数组 import numpy as np# a b 都是一维数组 a np.array((1,2,3)) b np.array((2,3,4))# 变成二维 merge np.column_stack((a,b)) # array([[1, 2],[2, 3],[3, 4]])2 np.hstack 尽管该函数也是对【列…

FPGA乒乓操作详解,知道与FIFO的区别吗?

FPGA乒乓操作是一种高效的数据流控制处理技巧&#xff0c;它主要应用于需要快速且连续数据处理和缓冲的场合。乒乓操作的核心在于利用两个缓冲区交替存储数据流&#xff0c;从而实现数据的无缝实时传输和处理。 本文将详细介绍乒乓操作的基本原理、应用场景以及与FIFO的区别。…

超越GPT-4!谷歌AI大模型Gemini震撼发布

原创 | 文 BFT机器人 在Open AI风头正盛之际&#xff0c;谷歌大杀器终于上线&#xff01; 当地时间12月6日&#xff0c;谷歌CEO桑达尔・皮查伊宣布正式推出其规模最大、功能最强大的新大型语言模型Gemini 1.0版。 据悉&#xff0c;Gemini 1.0是谷歌筹备了一年之久“对抗”GPT-…

python通过selenium获取输入框的文本值爬取编辑框内容

以百度首页的输入框为例,当输入‘你好‘后&#xff0c;html中的value的值会变成‘你好’ from selenium import webdriver web webdriver.Chrome() web.get(http://www.baidu.com) # 初始页面 cc web.find_element_by_xpath(//*[id"kw"]) #定位输入通过复制xpat…

Excel单元格隐藏如何取消?

Excel工作表中的有些单元格隐藏了数据&#xff0c;如何取消隐藏行列呢&#xff1f;今天分享几个方法给大家 方法一&#xff1a; 选中隐藏的区域&#xff0c;点击右键&#xff0c;选择【取消隐藏】就可以了 方法二&#xff1a; 如果工作表中有多个地方有隐藏的话&#xff0c;…

数据分析基础之《numpy(1)—介绍》

一、numpy介绍 1、numpy 数值计算库 num - numerical 数值化的 py - python 2、numpy是一个开源的python科学计算库&#xff0c;用于快速处理任意维度的数组 numpy支持常见的数组和矩阵操作。对于同样的数值计算任务&#xff0c;使用numpy比直接使用python要简洁的多 numpy使…

二、如何保证架构的质量、架构前期准备、技术填补与崩溃预防、系统重构

1、如何保证架构的质量 -- 稳定性和健壮性 2、正确的选择是良好的开端 -- 架构前期准备 ① 架构师分类&#xff1a;系统架构师、应用架构师、业务架构师 3、技术填补与崩溃预防 4、系统重构

Python创建代理IP池详细教程

一、问题背景 在进行网络爬虫或数据采集时&#xff0c;经常会遇到目标网站对频繁访问的IP进行封禁的情况&#xff0c;为了规避这种封禁&#xff0c;我们需要使用代理IP来隐藏真实IP地址&#xff0c;从而实现对目标网站的持续访问。 二、代理IP池的基本概念 代理IP池是一个包…

RLC防孤岛负载测试的操作和维护

孤岛现象是指当电网因故障或停电而与主电网断开连接时&#xff0c;某些部分仍然保持供电的现象。这种情况下&#xff0c;如果电力系统的保护设备不能及时检测到孤岛并切断供电&#xff0c;可能会导致严重的安全事故。因此&#xff0c;进行RLC防孤岛负载测试对于确保电力系统的安…

亿欧网首届“元创·灵镜”科技艺术节精彩纷呈,实在智能AI Agent智能体展现硬核科技图景

12月4日-10日&#xff0c;持续一周的首届“元创灵镜”科技艺术节在海南陵水香水湾拉开帷幕&#xff0c;虚实交互创造出的“海岛之镜”开幕式呈现出既真实又虚幻的未来感&#xff0c;融入前沿科技元素的艺术装置作品在“虚实之镜&自然生长”科技艺术展诠释着浪漫想象&#x…

C# WPF上位机开发(树形控件在地图软件中的应用)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们聊过图形软件的开发方法。实际上&#xff0c;对于绘制的图形&#xff0c;我们一般还会用树形控件管理一下。举个例子&#xff0c;一个地图…

功率信号源可以用在工业哪些产品上

功率信号源是一种关键的工业设备&#xff0c;其在各种产品和系统中发挥着至关重要的作用。这些信号源产生电信号&#xff0c;用于测试、校准、仿真和驱动各种工业设备。以下是功率信号源在工业中广泛应用的一些方面。 1.电源测试和校准 功率信号源常用于测试和校准电源系统。在…

【Spark精讲】Spark存储原理

目录 类比HDFS的存储架构 Spark的存储架构 存储级别 RDD的持久化机制 RDD缓存的过程 Block淘汰和落盘 类比HDFS的存储架构 HDFS集群有两类节点以管理节点-工作节点模式运行&#xff0c;即一个NameNode(管理节点)和多个DataNode(工作节点)。 Namenode管理文件系统的命名空…