Go-知识并发控制WaitGroup

Go-知识并发控制WaitGroup

  • 1. 认识 WaitGroup
  • 2. 基本原理
    • 2.1 信号量
    • 2.2 数据结构
    • 2.3 Add
    • 2.4 Wait
    • 2.5 Done
  • 3. 小例子
    • 3.1 主协程等待子协程执行完成
    • 3.2 子协程等待主协程信号
    • 3.3 GetFirst
  • 4. 总结

gitio: https://a18792721831.github.io/

1. 认识 WaitGroup

WaitGroup 是Go 应用开发过程中经常使用的并发控制技术。
WaitGroup 可理解为 Wait-Goroutine-Group,即等待一组 goroutine 结束。
比如主协程需要等待启动的N个子协程完成,主协程在退出,那么可以认为子协程就是一组。
比如主协程创建10个子协程,子协程睡眠1秒,然后子协程退出,主协程等待子协程退出后再退出:

func TestWaitGroup(t *testing.T) {sum := 10// 创建一个 waitGroupwg := sync.WaitGroup{}// 设置组内需要等待的数量是 10wg.Add(sum)for i := 0; i < sum; i++ {ti := igo func() {name := fmt.Sprintf("goroutine %d", ti)fmt.Println(name, "start")time.Sleep(3 * time.Second)fmt.Println(name, "end")// 每执行完一个,就将数量减少1wg.Done()}()}// 阻塞等待数量减少到0wg.Wait()fmt.Println("all goroutine done")
}

执行如下:
在这里插入图片描述

在上面程序中wg内部维护了一个计数器:

  • 启动goroutine前通过Add(sum)方法将计数器设置为待启动的goroutine的个数
  • 启动goroutine后,使用Wait方法阻塞自己,等待计数器变为0
  • 每个goroutine执行结束后通过Done方法将计数器减1
  • 计数器变为0后,阻塞的goroutine呗唤醒

WaitGroup也可以嵌套调用,以实现更复杂的并发管理逻辑,不过复杂就意味着难实现。

2. 基本原理

2.1 信号量

信号量是UNIX系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。
信号量可简单理解为一个数值:

  • 当信号量 > 0 时,表示资源可用,获取信号量时系统自动将信号量减1
  • 当信号量 == 0 时,表示资源不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒。

2.2 数据结构

在源码包src/sync/waitgroup.go定义了数据结构:
在这里插入图片描述

上面的 noCopy 是内部结构,防止拷贝 WaitGroup 进行使用
在这里插入图片描述

而 state 分为两部分,高32位是当前还未执行结束的 goroutine 计数器 counter ,低32位是等待 goroutine-group 结束的 goroutine 数量 waiter count,即有多少个等候者。
sema 则是信号量。
同时 WaitGroup 对外暴露了三个方法:
Add(delta int): 将 delta 值加到 state 的高 32 位,也就是未执行结束的 goroutine 数量加 x
Wait: waiter 递增1,并阻塞等待信号 sema ,表示等候者+1
Done: counter 递减1,按照 waiter 数值释放相应次数信号量

2.3 Add

Add方法做了两件事,第一是把传入的值累加到 counter 中,因为传入的值可以为负值,也就是说 counter 有可能变成 0 或者负值。
第二就是当 counter 值变为 0 时,根据 waiter count 的值释放等量的信号量,把等待的 goroutine 全部唤醒。 如果 counter 值变为负值,则触发 panic 。

// Add将增量 (可能为负) 添加到WaitGroup计数器。
// 如果计数器变为零,则释放在等待时阻塞的所有goroutines。
// 如果计数器为负,则添加panics。
//
// 请注意,当计数器为零时发生的具有正增量的调用
// 必须在等待之前发生。具有负delta的呼叫,或具有
// 当计数器大于零时开始的正增量,可能会发生
// 任何时候。
// 通常,这意味着对Add的调用应在语句之前执行
// 创建要等待的goroutine或其他事件。
// 如果一个WaitGroup被重用等待几个独立的事件集,
// 新的添加调用必须在所有以前的等待调用返回后发生。
// 请参见WaitGroup示例。
func (wg *WaitGroup) Add(delta int) {// 将传入的值,加到 state 的 高 32 位// 这里也就是 counter 的值,当前还未结束的 goroutine 的值,也就是执行 goroutine 的值state := wg.state.Add(uint64(delta) << 32)// counterv := int32(state >> 32)// waiter count 等待 goroutine 结束的 值// waiter count 是由 Wait 方法 递增的,所以  waiter count 一定是 大于等于0w := uint32(state)// 如果 counter 小于 0 触发 panic if v < 0 {panic("sync: negative WaitGroup counter")}// 如果 waiter count 大于 0 表示已经有 goroutine 在等待了,而且 counter 等待执行的数量和设置的相同,// 此时在 调用 Add ,表示 Add 与 Wait 并发调用了 if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 如果 counter 大于 0 ,但是 waiter count 等于 0 ,表示 有 counter 个 goroutine 待执行,// 但是没有 goroutine 阻塞等待,所以直接设置 counter 就结束了if v > 0 || w == 0 {return}// 当waiters> 0时,此goroutine已将counter设置为0。// 现在不能有状态的并发突变://-Adds不能与Wait同时发生,//-如果看到counter == 0,Wait不会增加waiters。// 仍然做一个便宜的健全性检查来检测WaitGroup滥用。if wg.state.Load() != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.// 到了这里 counter 一定等于 0 , 而且 waiter count 大于 0// 将 counter 设置为 0 wg.state.Store(0)// 根据 waiter count 的数量,释放信号量for ; w != 0; w-- {runtime_Semrelease(&wg.sema, false, 0)}
}

如果按照串行的执行方式,一般都是先 Add 在 Wait ,然后在 Done ,但是在并发的环境中,执行的先后顺序存在不确定性,所以,在Add里面会对各种情况做判断。

2.4 Wait

Wait 方法也是两个操作,第一个是累加 waiter count ,第二个是阻塞并等待信号量。

// 等待块,直到WaitGroup计数器为零。
func (wg *WaitGroup) Wait() {// 死循环,死循环的目的是 做 cas 的时候,如果失败,不断重试// 这也是 Java 中乐观锁的实现,死循环 casfor {// 获取 counter 和 waiter count state := wg.state.Load()// counter v := int32(state >> 32)// waiter countw := uint32(state)// 如果 counter 等于 0 ,表示等待执行的 goroutine 已经全部都执行完了,那么就无需等待了if v == 0 {// Counter is 0, no need to wait.return}// 增加 waiter count 计数。// 等待 信号的数量加1// 使用 cas 累加,如果累加失败,那么就 for 循环重试if wg.state.CompareAndSwap(state, state+1) {// 阻塞并等待信号量runtime_Semacquire(&wg.sema)// 如果 得到了信号量,但是 waiter count 和 counter 都不为0 那么失败// 这里可以认为是使用成本比较低的方式做了健壮性检查if wg.state.Load() != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}// 跳出循环return}}
}

使用死循环的CAS保证并发的时候,也能正确的累加 waiter count.

2.5 Done

Done 方法只做一件事,就是把 counter 减 1,但是需要注意的是, counter 减 1 的操作是通过 Add 方法实现的。
所以 Done 等价于 Add(-1)
在这里插入图片描述

在回过头看看 Add 的逻辑:

// Add将增量 (可能为负) 添加到WaitGroup计数器。
// 如果计数器变为零,则释放在等待时阻塞的所有goroutines。
// 如果计数器为负,则添加panics。
//
// 请注意,当计数器为零时发生的具有正增量的调用
// 必须在等待之前发生。具有负delta的呼叫,或具有
// 当计数器大于零时开始的正增量,可能会发生
// 任何时候。
// 通常,这意味着对Add的调用应在语句之前执行
// 创建要等待的goroutine或其他事件。
// 如果一个WaitGroup被重用等待几个独立的事件集,
// 新的添加调用必须在所有以前的等待调用返回后发生。
// 请参见WaitGroup示例。
func (wg *WaitGroup) Add(delta int) {// 将传入的值,加到 state 的 高 32 位// 这里也就是 counter 的值,当前还未结束的 goroutine 的值,也就是执行 goroutine 的值state := wg.state.Add(uint64(delta) << 32)// counterv := int32(state >> 32)// waiter count 等待 goroutine 结束的 值// waiter count 是由 Wait 方法 递增的,所以  waiter count 一定是 大于等于0w := uint32(state)// 如果 counter 小于 0 触发 panic if v < 0 {panic("sync: negative WaitGroup counter")}// 如果 waiter count 大于 0 表示已经有 goroutine 在等待了,而且 counter 等待执行的数量和设置的相同,// 此时在 调用 Add ,表示 Add 与 Wait 并发调用了 if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 如果 counter 大于 0 ,但是 waiter count 等于 0 ,表示 有 counter 个 goroutine 待执行,// 但是没有 goroutine 阻塞等待,所以直接设置 counter 就结束了// 在 Done 递减的时候,如果 counter 不等于 0 ,那么就结束了, 如果 counter > 0 同时 waiter count 也大于0呢if v > 0 || w == 0 {return}// 当waiters> 0时,此goroutine已将counter设置为0。// 现在不能有状态的并发突变://-Adds不能与Wait同时发生,//-如果看到counter == 0,Wait不会增加waiters。// 仍然做一个便宜的健全性检查来检测WaitGroup滥用。if wg.state.Load() != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.// 到了这里 counter 一定等于 0 , 而且 waiter count 大于 0// 将 counter 设置为 0 wg.state.Store(0)// 根据 waiter count 的数量,释放信号量for ; w != 0; w-- {runtime_Semrelease(&wg.sema, false, 0)}
}

可以这么简单的理解,初始化 x 个 counter 等待数量,需要等待的 goroutine 通过 Wait 方法阻塞等待信号量,并且将 waiter count 加1,
执行 Done 的 goroutine 将 counter 数量减1,当 counter 减少到 0 的时候,释放 waiter count 个信号量,唤醒 goroutine 继续执行。

3. 小例子

3.1 主协程等待子协程执行完成

func TestWaitGroup(t *testing.T) {sum := 10wg := sync.WaitGroup{}wg.Add(sum)for i := 0; i < sum; i++ {ti := igo func() {name := fmt.Sprintf("goroutine %d", ti)fmt.Println(name, "start")time.Sleep(3 * time.Second)fmt.Println(name, "end")wg.Done()}()}wg.Wait()fmt.Println("all goroutine done")
}

主协程等待10个子协程执行完成后,继续执行。

3.2 子协程等待主协程信号

func TestWaitGroup(t *testing.T) {sum := 10wg := sync.WaitGroup{}wg.Add(1)for i := 0; i < sum; i++ {go func() {fmt.Println("goroutine wait")// 子协程等待资源信号wg.Wait()fmt.Println("goroutine done")}()}time.Sleep(time.Second)// 主协程资源准备好了,发出信号,让子协程继续处理wg.Done()time.Sleep(time.Second)
}

这种使用方式不多,考虑如下场景:
主协程查询到100条数据,子协程需要先准备一下,执行一段逻辑,然后在执行分配的数据。
如果是主协程数据准备好了,在启动子协程,处理数据,那么如果处理数据之前的操作比较耗时,那么性能就比较差。
这个时候就可以使用上面这种用法,主协程先创建子协程,子协程先执行逻辑,到达从主协程获取数据前,使用 wait 进行等待。
主协程数据准备好了,在使用 done 通知 子协程继续。

3.3 GetFirst

func TestWaitGrou(t *testing.T) {sum := 10wg := sync.WaitGroup{}wg.Add(sum)for i := 0; i < sum; i++ {idx := igo func() {fmt.Printf("goroutine %d start\n", idx)if idx == 6 {// 因为 Done 就是 调用  Add(-1) , 所以如果某个 goroutine 执行成功,那么可以使用 Add(-sum) 一次性解除 主协程的等待// 其他子协程的执行实际上就没有意义了,因为只需要执行成功的任意一个就行了time.Sleep(time.Second)fmt.Printf("goroutine %d success\n", idx)wg.Add(0 - sum)}// 模拟超时time.Sleep(3 * time.Second)wg.Done()fmt.Println("goroutine done")}()}fmt.Println("wait some one ok")wg.Wait()fmt.Println("master done")
}

这种用法可以用在查询中:
比如知道一个资源id,但是不知道这个资源id是哪个部门的,假设每个部门一个数据库,那么就需要将所有的部门都遍历一遍,才能知道是哪个部门的。
如果是串行,那么当某个部门的数据库,查询这个资源id有数据返回,那么查询结束,否则就继续查询,直到全部查询完成。
如果是并行,那么当某个子协程返回数据,那么就可以结束了,因为隐含了一个逻辑,就是当资源在A部门的时候,就不可能在其他部门,因为一个资源不能多次分配。
但是使用这种方法的时候需要注意,其他子协程在 done 的时候,因为 counter 会变成负数,导致 panic 。

4. 总结

WaitGroup 通常用于等待一组“工作协程”结束的场景,其内部维护两个计数器,可以称为“工作协程”计数器和“等待协程”计数器。WaitGroup 对外提供的三个方法分工明确:

  • Add(delta int) 方法用于增加“工作协程”计数,通常在启动新的“工作协程”之前调用。
  • Done 方法用于减少“工作协程”计数,每次调用递减1,通常在“工作协程”内部且在临近返回之前调用。
  • Wait 方法用于增加“等待协程”计数,通常在所有“工作协程”全部启动之后调用。

Done 方法除了负责递减“工作协程”计数,还会在“工作协程”计数变为0时检查“等待协程”计数器,并把“等待协程”唤醒。需要注意,Done 方法递减“工作协程”计数后,如果“工作协程”
计数变为负数,则会触发panic,这就要求 Add 方法调用要早于 Done 方法。
此外,通过 Add 方法累加的“工作协程”计数要与实际需要等待的“工作协程”数量一致,否则 实际“工作协程”调用 Done 递减“工作协程”计数,无法使计数等于0,那么“等待协程”永远不会被唤醒,就发生了死锁。
Go运行时检测到死锁触发panic。

WaitGroup 如果遇到派生的 goroutine ,就需要将 WaitGroup 不断的传递,还要保证数量能对上,否则就会产生死锁,对于存在派生的,多层级的 goroutine 的时候,WaitGroup 的使用难度大大增加。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/20616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

公网IP地址如何查询?

公网IP地址是指在互联网中可以被全球范围内的设备访问的IP地址。在网络通信中&#xff0c;公网IP地址扮演着重要的角色&#xff0c;它可以标识设备在互联网中的位置。查询公网IP地址是一种常见的网络管理需求&#xff0c;因为它能够提供网络设备的准确位置信息&#xff0c;方便…

AI 绘画爆火背后:扩散模型原理及实现

节前&#xff0c;我们星球组织了一场算法岗技术&面试讨论会&#xff0c;邀请了一些互联网大厂朋友、参加社招和校招面试的同学。 针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 合集&#x…

Java进阶学习笔记34——Arrays类

Arrays&#xff1a; 用来操作数组的工具类。 解释说明&#xff1a; 只要知道代码这么写就可以了。 package cn.ensource.d5_arrays;import java.util.Arrays; import java.util.function.IntToDoubleFunction;public class ArraysTest1 {public static void main(String[] arg…

LTspice仿真中设置电阻随时间变化的方法

背景&#xff1a; 笔者找了很多资料都没有看到如何设置电阻、电容等参数随时间变化。但在实际模拟中&#xff0c;总会遇到需要模拟这些量的变化。故撰写此文&#xff0c;供大家参考。 除了模拟随时间变化外&#xff0c;同样的思路也可以模拟随其他变量变化 效果展示 设置电…

32【Aseprite 作图】石头——拆解

1 石头先画轮廓&#xff0c;还是2 4 1 1 2 2 2&#xff0c;这样画一个圆的轮廓 或者2 1 1 3 5 1 1 1 1 2 4 &#xff0c; 2 最暗一层的黑色&#xff0c;做阴影部分&#xff0c;就是7 4 3 2 做最深的部分 各个地方画一些浅色的&#xff0c;做高光部分&#xff0c;上面的高光偏圆…

015、列表_应用场景

1.消息队列 如图所示,Redis的lpush+brpop命令组合即可实现阻塞队列,生产者客户端使用lrpush从列表左侧插入元素,多个消费者客户端使用brpop命令阻塞式的“抢”列表尾部的元素,多个客户端保证了消费的负载均衡和高可用性。 2.文章列表 每个用户有属于自己的文章列表,现…

收银系统源码-千呼新零售2.0【智慧供应链】

千呼新零售2.0系统是零售行业连锁店一体化收银系统&#xff0c;包括线下收银线上商城连锁店管理ERP管理商品管理供应商管理会员营销等功能为一体&#xff0c;线上线下数据全部打通。 适用于商超、便利店、水果、生鲜、母婴、服装、零食、百货等连锁店使用。 详细介绍请查看下…

FinalShell 配置SSH密钥登陆

转载请标明出处&#xff1a;http://blog.csdn.net/donkor_/article/details/139355489 文章目录 前言生成密钥服务器配置公钥本地配置私钥存储私钥FinalShell配置 总结 前言 本机FinalShell 配置SSH密钥登陆服务器&#xff0c;这样就不再需要使用密码进行登陆了。由于FinalSh…

【StableDiffusion秋叶包反斜杠问题】Failed to find xxx\sd-webui-aki-v4.8\...\xxx.pth

一、问题发生 1.在我额外安装预处理器时报错 意思是没办法找到有这么一个包&#xff08;但我已经把这个包扔进去了&#xff09; 完整报错&#xff1a; Failed to find S:\app_AI\stableDiffusion-webui-aki\sd-webui-aki-v4.8\extensions\sd-webui-controlnet\annotator\dow…

电机测试方法的介绍与功能实现(T测试方法)

目录 概述 1 理论介绍 2 实现原理 2.1 旋转式编码器原理 2.2 系统实现框图 2.3 测速原理 2.4 计算速度值 3 STM32Cube配置项目 3.1 软件版本信息 3.2 配置项目 4 代码实现 4.1 电机速度控制 4.2 速度计算函数 4.3 功能实现 5 测试 概述 本文主要介绍测试电机速…

Vue项目运行页面禁止缩放【移动端和PC端都禁止缩放】解决方案

Vue项目运行页面禁止缩放【移动端和PC端都禁止缩放】解决方案&#xff0c;有的人手很J,总喜欢放大缩小&#xff0c;从而会导致页面错乱&#xff0c;以下是解决方案&#xff0c;简单有效 效果图PC&#xff1a;滚轮缩放和其他缩放都会禁止 移动端效果图&#xff1a;各种手机平板…

SSL发送邮件时如何配置客户端确保安全性?

怎么使用SSL安全协议通过AokSend发送加密的电子邮件&#xff1f; SSL是一种常用的加密通信协议&#xff0c;用于确保数据在客户端和服务器之间的安全传输。AokSend将讨论如何通过配置客户端确保SSL发送邮件的安全性&#xff0c;并介绍如何使用SSL安全协议通过AokSend发送加密的…

zibll-V7.7最新版2024完美破解授权可用(含授权教程)

最近这个正版安装包流出来了,试了一下用以前的绕过授权方法&#xff0c;一样可以授权。 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/89379057 更多资源下载&#xff1a;关注我。

力扣200. 岛屿数量(BFS)

Problem: 200. 岛屿数量 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.定义方向数组&#xff1a;定义一个方向数组 DIRECTIONS&#xff0c;表示上、下、左、右四个方向的移动。 2.获取网格的行数和列数同时初始化一个计数器 numIslands 用于记录岛屿的数量。 …

什么是 Redis 缓存?它解决了什么问题?怎么使用它?

前言 写在前面&#xff0c;让我们从 3 个问题开始今天的文章&#xff1a;什么是 Redis 缓存&#xff1f;它解决了什么问题&#xff1f;怎么使用它&#xff1f; 在笔者近 3 年的 Java 一线开发经历中&#xff0c;尤其是一些移动端、用户量大的互联网项目&#xff0c;经常会使用…

数学建模 —— 数学规划模型(5)

目录 一、数学规划 1.1 数学规划问题一般形式 二、常见规划模型 2.1 线性规划&#xff08;Linear Programming&#xff09; 2.1.1 定义 2.1.2 一般形式 2.1.3 标准形式 2.1.4 求解 2.2 整数规划&#xff08;Integer Programming&#xff09; 2.2.1 单目标规划 2.…

RT_thread nano移植Finsh

参考连接: https://blog.csdn.net/baseball214/article/details/131341722 移植的前提是,你已经有一个可以使用的nano功能. 1.将rtthread-nano-master\rt-thread\components文件复制到工程. 2.添加Finsh中的.c以及相关.h头文件路径 3.注释掉finsh_config.h文件中以下两个宏…

C语言中的数据类型转换:隐式类型转换与显示类型转换

一. 简介 本文简单学习一下&#xff0c;C语言中的数据类型转换。重点学习一下隐式类型转换。 二. C语言中的数据类型转换&#xff1a;隐式类型转换与显示类型转换 类型转换&#xff08;TypeCasting&#xff09;&#xff1a;在C语言中是将一种数据类型值转换为另一种数据类型…

transfomer中attention为什么要除以根号d_k

简介 得到矩阵 Q, K, V之后就可以计算出 Self-Attention 的输出了&#xff0c;计算的公式如下: A t t e n t i o n ( Q , K , V ) S o f t m a x ( Q K T d k ) V Attention(Q,K,V)Softmax(\frac{QK^T}{\sqrt{d_k}})V Attention(Q,K,V)Softmax(dk​ ​QKT​)V 好处 除以维…

leetcode102. 二叉树的层序遍历

一、题目描述&#xff1a; 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 二、输入输出实例&#xff1a; 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&am…