Go 控制协程(goroutine)的并发数量

在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求、数据库查询等等。

从运行效率角度考虑,在相关服务可以负载的前提下(限制最大并发数),尽可能高的并发。

在Go语言中,可以使用一些方法来控制协程(goroutine)的并发数量,以防止并发过多导致资源耗尽或性能下降

1、使用信号量(Semaphore)

可以使用 Go 语言中的 channel 来实现简单的信号量,限制并发数量

package mainimport ("fmt""sync"
)func worker(id int, sem chan struct{}) {sem <- struct{}{} // 占用一个信号量defer func() {<-sem // 方法运行结束释放信号量}()// 执行工作任务fmt.Printf("Worker %d: Working...\n", id)
}func main() {concurrency := 3sem := make(chan struct{}, concurrency)var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()worker(id, sem)}(i)}wg.Wait()close(sem)
}

sem 是一个有缓冲的 channel,通过控制 channel 中元素的数量,实现了一个简单的信号量机制 

 2、使用协程池

可以创建一个固定数量的协程池,将任务分发给这些协程执行。 

package mainimport ("fmt""sync"
)func worker(id int, jobs <-chan int, results chan<- int) {//jobs等待主要协程往jobs放数据for j := range jobs {fmt.Printf("协程池 %d: 协程池正在工作 %d\n", id, j)results <- j}
}func main() {const numJobs = 5    //协程要做的工作数量const numWorkers = 3 //协程池数量jobs := make(chan int, numJobs)results := make(chan int, numJobs)var wg sync.WaitGroup// 启动协程池for i := 1; i <= numWorkers; i++ {wg.Add(1)go func(id int) {defer wg.Done()worker(id, jobs, results)}(i)}// 提交任务for j := 1; j <= numJobs; j++ {jobs <- j}close(jobs)// 等待所有工作完成go func() {wg.Wait()close(results)}()// 处理结果for result := range results {fmt.Println("Result:", result)}
}

jobs 通道用于存储任务,results 通道用于存储处理结果。通过创建固定数量的工作协程,可以有效地控制并发数量。

3、使用其他包

Go 1.16 引入了 golang.org/x/sync/semaphore 包,它提供了一个更为灵活的信号量实现。

案例一

限制对外部API的并发请求 

假设我们有一个外部API,它对并发请求有限制,我们希望确保不超过这个限制。我们可以使用semaphore.Weighted来控制对API的并发访问

package mainimport ("context""fmt""golang.org/x/sync/semaphore""sync""time"
)func main() {/*1、在并发量一定的情况下,通过改变允许并发请求数可以更快处理请求任务(在CPU够用的前提下)2、sem := semaphore.NewWeighted(n),参数n就是权重量3、当一个协程需要获取的单位的权重越多,运行就会慢(比如权重总量n=5个,一个协程分配了2个,跟一个协程分配1个效率是不一样的)4、信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。*//*1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放*/// 记录开始时间startTime := time.Now()// 假设外部API允许的最大并发请求为5(信号量的总容量是5个权重单位)const (maxConcurrentRequests = 5)sem := semaphore.NewWeighted(maxConcurrentRequests)var wg sync.WaitGroup// 模拟对外部API的10个并发请求for i := 0; i < 10; i++ {wg.Add(1)go func(requestId int) {defer wg.Done()// 假设我们想要获取2个单位的权重if err := sem.Acquire(context.Background(), 2); err != nil {fmt.Printf("请求 %d 无法获取信号量: %v\n", requestId, err)return}defer sem.Release(2) // 请求完成后释放信号量// 模拟对API的请求处理fmt.Printf("请求 %d 开始...\n", requestId)time.Sleep(2 * time.Second) // 模拟网络延迟fmt.Printf("请求 %d 完成。\n", requestId)}(i)}wg.Wait()// 记录结束时间endTime := time.Now()// 计算并打印总耗时fmt.Printf("程序总耗时: %v\n", endTime.Sub(startTime))
}

信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。

以下是一些导致信号量没有足够可用权重的具体情况:

  1. 信号量初始化容量较小:如果信号量的总容量设置得较小,而并发请求的数量较大,那么很快就会出现权重不足的情况。

  2. 长时间占用权重:如果某些goroutine长时间占用权重单位而不释放,这会导致其他goroutine无法获取到权重,即使这些goroutine只是少数。

  3. 权重分配不均:在某些情况下,可能存在一些goroutine占用了不成比例的权重单位,导致其他goroutine无法获取足够的权重。

  4. 权重释放不及时:如果goroutine因为错误或异常情况提前退出,而没有正确释放它们所占用的权重,那么这些权重单位将不会被回收到信号量中。

  5. 高频率的请求:在短时间内有大量goroutine请求权重,即使它们请求的权重不大,累积起来也可能超过信号量的总容量。

  6. 信号量权重未正确管理:如果信号量的权重管理逻辑存在缺陷,例如错误地释放了过多的权重,或者在错误的时间点释放权重,也可能导致可用权重不足。

为了避免信号量没有足够的可用权重,可以采取以下措施:

  • 合理设置信号量容量:根据资源限制和并发需求合理设置信号量的总容量。
  • 及时释放权重:确保在goroutine完成工作后及时释放权重。
  • 使用超时:在Acquire调用中使用超时,避免无限期地等待权重。
  • 监控和日志记录:监控信号量的使用情况,并记录关键信息,以便及时发现和解决问题。
  • 权重分配策略:设计合理的权重分配策略,确保权重的公平和高效分配。

通过这些措施,可以更好地管理信号量的使用,避免因权重不足导致的并发问题。

案例二

假设有一个在线视频平台,它需要处理不同分辨率的视频转码任务。由于高清视频转码比标清视频更消耗计算资源,因此平台希望设计一个系统,能够优先处理更多标清视频转码请求,同时又不完全阻塞高清视频的转码,以保持整体服务质量和资源的有效利用。

package mainimport ("fmt""golang.org/x/net/context""golang.org/x/sync/semaphore""runtime""sync""time"
)// VideoTranscodeJob 视频转码任务
type VideoTranscodeJob struct {resolution stringweight     int64
}func main() {cpuCount := runtime.NumCPU()fmt.Printf("当前CPU个数%v\n", cpuCount)/*1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放*/// 初始化两个信号量,一个用于标清,一个用于高清,假设总共有8个CPU核心可用normalSem := semaphore.NewWeighted(6)  // 标清任务,分配6个单位权重,因为它们消耗资源较少highDefSem := semaphore.NewWeighted(2) // 高清任务,分配2个单位权重,因为它们更消耗资源var wg sync.WaitGroup//假设有20个需要转码的视频videoJobs := []VideoTranscodeJob{{"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2},{"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2},{"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2},{"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2},}for _, job := range videoJobs {wg.Add(1)go func(job VideoTranscodeJob) {defer wg.Done()var sem *semaphore.Weightedswitch job.resolution {case "SD":sem = normalSem //分配权重大,当前为6,任务在获取执行机会上有优势,但并不直接意味着执行速度快case "HD":sem = highDefSemdefault:panic("无效的分辨率")}if err := sem.Acquire(context.Background(), job.weight); err != nil {fmt.Printf("名为 %s 视频无法获取信号量: %v\n", job.resolution, err)return}defer sem.Release(job.weight) //释放权重对应的信号量// 模拟转码任务执行fmt.Printf("转码 %s 视频 (权重: %d)...\n", job.resolution, job.weight)//通过利用VideoTranscodeJob的weight值来模拟转码时间的长短,HD用时长则设置2比SD的1大,*时间就自然长,运行就时间长time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模拟不同分辨率视频转码所需时间fmt.Printf("------------------------%s 视频转码完成。。。\n", job.resolution)}(job)}wg.Wait()
}

标清(SD)和高清(HD),分别分配了不同的权重(1和2)。通过创建两个不同权重的信号量,我们可以控制不同类型任务的同时执行数量,从而优先保证标清视频的快速处理,同时也确保高清视频能够在不影响系统稳定性的情况下进行转码。这展示了带权重的并发控制如何帮助在资源有限的情况下优化任务调度和执行效率。

 注意:对协程分配的权重单位数不能大于对应上下文semaphore.NewWeighted(n)中参数n的单位数

案例三

package mainimport ("context""fmt""sync""time""golang.org/x/sync/semaphore"
)type weightedTask struct {id     intweight int64
}func main() {const (maxTotalWeight = 20 // 最大总权重)sem := semaphore.NewWeighted(maxTotalWeight)var wg sync.WaitGrouptasksCh := make(chan weightedTask, 10)// 发送任务for i := 1; i <= 10; i++ {tasksCh <- weightedTask{id: i, weight: int64(i)} // 假设任务ID即为其权重}close(tasksCh)// 启动任务处理器for task := range tasksCh {wg.Add(1)go func(task weightedTask) {defer wg.Done()if err := sem.Acquire(context.Background(), int64(task.id)); err != nil {fmt.Printf("任务 %d 无法获取信号量: %v\n", task.id, err)return}defer sem.Release(int64(task.id)) //释放// 模拟任务执行fmt.Printf("任务 %d (权重: %d) 正在运行...\n", task.id, task.weight)time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中简单用时间模拟权重影响fmt.Printf("任务 %d 完成.\n", task.id)}(task)}wg.Wait()
}

 

总结

选择哪种方法取决于具体的应用场景和需求。使用信号量是一种简单而灵活的方法,而协程池则更适用于需要批量处理任务的情况。golang.org/x/sync/semaphore 包提供了一个标准库外的更灵活的信号量实现         

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/16040.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

将list对象里的某一个属性取出组成一个新的list

使用Java8将对象里的某一个属性取出组成一个新的list List<Spgg1> listnew ArrayList<>();Spgg1 spgg1new Spgg1();spgg1.setSpdm("测试");spgg1.setGgdm("001");list.add(spgg1);Spgg1 spgg2new Spgg1();spgg2.setSpdm("测试2");sp…

Mysql 找出未提交事务的SQL及死锁

未提交事务&#xff1a; 通过查看information_schema.INNODB_TRX视图,您可以了解当前系统中正在运行的事务情况,从而进行问题排查和性能优化。 SELECT * FROM information_schema.innodb_trx; 通过trx_state为RUNNIG,trx_started判断是否有一直RUNNING的事务。 如果有未提交…

千帆【API接入】自定义组件简明教程

千帆【API接入】自定义组件简明教程 大家好&#xff0c;我是 JavaRoom&#xff0c;今天来给大家做一个自定义API接入来是心啊遣返组件创建案例简明教学。 1.准备工作 1.1 一个不欠费的千帆账号 财务总览地址&#xff1a;https://console.bce.baidu.com/billing/#/account/i…

“现代汽车中国前瞻软件赛杯” 牛客周赛 Round 43

A. 小红平分糖果&#xff08;签到&#xff09; // Problem: 小红平分糖果 // Contest: NowCoder // URL: https://ac.nowcoder.com/acm/contest/82394/A // Memory Limit: 524288 MB // Time Limit: 2000 ms // // Powered by CP Editor (https://cpeditor.org)#include<b…

作业-day-240524

使用sqlite3数据库&#xff0c;实现增删改查操作 #include <myhead.h>int do_add(sqlite3 *sqdb) {int add_num;char add_name[128];double add_score;printf("请输入要添加的学生编号:");scanf("%d",&add_num);printf("请输入要添加的学生…

以前:不会用电脑;现在:不会用AI

购买特价商务机票需要一些策略和灵活性。、 提前规划&#xff1a;商务舱票价通常在出发日期的2-3个月前最便宜。尽早规划您的行程并开始关注票价。 使用比价工具&#xff1a;使用Skyscanner、Kayak等比价网站来比较不同航空公司的商务舱票价。这些网站可以显示多个航空公司的价…

Jeecg | 如何解决 ERR Client sent AUTH, but no password is set 问题

最近在尝试Jeecg低代码开发&#xff0c;但是碰到了超级多的问题&#xff0c;不过总归是成功运行起来了。 下面说说碰到的最后一个配置问题&#xff1a;连接redis失败 Error starting ApplicationContext. To display the conditions report re-run your application with deb…

【ARMv7-A】——内联汇编

简介 使用内联汇编主要目的是为了提高效率,同时还是为了实现 C 语言无法实现的部分。 GNU内联汇编的基本格式: asm volatile("汇编语句": 输出部分: 输入部分: 会被修改的部分);ANSI C规范的关键字(前后都有两个下划线连接,中间没有空格): __asm__ __volati…

近期阅读论文

Exploring Hybrid Active-Passive RIS-Aided MEC Systems: From the Mode-Switching Perspective abstract 移动边缘计算&#xff08;MEC&#xff09;被认为是支持延迟敏感和计算密集型服务的有前途的技术。 然而&#xff0c;随机信道衰落特性导致的低卸载率成为制约MEC性能的…

Rust之函数、单元测试

1、函数 类似于C函数。 1.1、普通函数 在Rust中&#xff0c;函数的定义使用fn关键字&#xff0c;后跟函数名、参数列表、返回类型和函数体。函数体由一系列语句组成&#xff0c;用于执行特定的操作和计算。 函数定义&#xff1a; 使用fn关键字定义函数&#xff0c;函数由函数…

【C语言】C语言基础语法速览

C语言基础语法目录 C语言基础语法速览1. 变量类型1.1 类型字节数1.1 变量输出格式1.2 变量输入格式 2. 分支循环语句2.1 if分支语句2.2 switch 分支语句2.3 while循环语句2.4 do...while循环语句2.5 for循环语句 3. 数组3.1 一维数组3.2 二维数组 4. 结构体4.1 结构体类型定义4…

4个宝藏网站,免费即用,办公运营效率利器!

哈喽&#xff0c;各位小伙伴们好&#xff0c;我是给大家带来各类黑科技与前沿资讯的小武。 有很多朋友在日常办公时&#xff0c;需要发送邮件&#xff1b;在新媒体运营、设计及前端开发等工作场合中&#xff0c;都或多或少会遇上图片、视频等文件太大及格式问题需要压缩和转换…

数据结构(三)栈 队列 数组

2024年5月26日一稿(王道P78) 栈 基本概念 基本操作 顺序存储结构 基本操作 共享栈

微信小程序开发 懒加载+瀑布流+排序功能

在小程序的开发过程中&#xff0c;遇到了这样的功能需求&#xff1a;在保持瀑布流布局的情况下&#xff0c;使用懒加载来渲染页面&#xff0c;并且要求对其中的属性添加排序功能。 单独拿出来都是很好实现的功能&#xff0c;但是当三个功能联系在一起时&#xff0c;问题就出现…

数智乡村:是不是乡村治理治理的巨大进步呢?

一、什么是数智乡村 数智乡村是指借助信息技术和数据分析等手段&#xff0c;对乡村进行智能化管理和服务的模式。它可以提供更高效、精准、便利的服务&#xff0c;促进乡村经济发展和社会治理改善。因此&#xff0c;数智乡村确实可以被视为乡村治理的巨大进步。 二、数智乡村的…

大一久富农机实习与商业思维学习计划

学习目标&#xff1a; 大一久富农机实习与商业思维学习计划 一、目标 完成久富农机的实习&#xff0c;增强对农机行业的了解和实际操作能力。提升自身的商业思维能力&#xff0c;为未来的职业生涯打下坚实基础。组织并带领同学院的同学一起参加实习&#xff0c;增强团队合作…

腾讯发布ELLA:为扩散模型注入LLM能力,提升复杂场景的图像生成,准确率超90%

前言 近年来&#xff0c;基于扩散模型的文本到图像生成技术取得了显著进步&#xff0c;能够生成高质量、逼真的图像。然而&#xff0c;大多数扩散模型仍然使用CLIP作为文本编码器&#xff0c;这限制了它们理解复杂提示的能力&#xff0c;例如包含多个物体、详细属性、复杂关系…

python-pytorch seq2seq+luong dot attention笔记1.0.3

python-pytorch seq2seq+luong dot attention笔记1.0.0 可复用部分编写encoder编写Attention编写decoder编写seq2seq设定模型参数模型训练使用模型使用总结下一步参考链接可复用部分 主要将数据弄成如下格式: seq_example = [“你认识我吗”, “你住在哪里”, “你知道我的名…

ROS 2边学边练(51)-- 构建自定义(消息类型)RViz显示项

前言 一俩礼拜没有更新了&#xff0c;今天突然发现ROS 2突然有了新版本了&#xff0c;名为Jazzy&#xff0c;这更新速度与Qt有的一比。这么长时间没更新呢&#xff0c;一来工作上来活了&#xff0c;没那么多时间来继续ROS这块&#xff0c;二来&#xff0c;人懈怠了一点&#xf…

Grafana HTML Panel展示post获取后数据

<!DOCTYPE html> <html> <head><title>API 数据表格展示</title><script src"https://code.jquery.com/jquery-3.6.0.min.js"></script> </head> <body><table id"data-table" border"1&qu…