WaitGroup原理分析

背景

在实际业务开发中,我们会遇到以下场景:请求数据库,批量获取1000条数据记录后,处理数据
为了减少因一次批量获取的数据太多,导致的数据库延时增加,我们可以把一次请求拆分成多次请求,并发去处理,当所有的并发请求完成后,再继续处理这些返回的数据
golang中的WaitGroup,就可以帮助我们实现上述的场景

快速入门

背景:开启10个goroutine并发执行,等待所有goroutine执行完成后,当前goroutine打印执行完成

func TestWaitGroup(t *testing.T) {var wg sync.WaitGroupfor i := 0; i < 10; i++ {index := igo func() {wg.Add(1)defer wg.Done()fmt.Println(fmt.Sprintf("%+v 正在执行", index))}()}wg.Wait()fmt.Println("TestWaitGroup method done")
}

源码分析

golang版本:1.18.2

源码路径:src/sync/waitgroup.go

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
// WaitGroup 等待 goroutine 集合完成
// 主 goroutine 调用 Add 设置等待的 goroutine 数量
// 然后每个 goroutine 运行并在完成时调用 Done
// 同时,Wait 可以用来阻塞,直到所有 goroutine 都完成
type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers only guarantee that 64-bit fields are 32-bit aligned.// For this reason on 32 bit architectures we need to check in state()// if state1 is aligned or not, and dynamically "swap" the field order if// needed.// 64位值:高32位是计数器,低32位是waiter计数// 64位原子操作需要64位对齐,但32位编译器仅保证64位字段是32位对齐的// 因此,在 32 位架构上,我们需要在 state() 中检查 state1 是否对齐,并在需要时动态“交换”字段顺序state1 uint64state2 uint32
}

noCopy:WaitGroup在首次使用后,不能被复制
state1,state2:一共占用12字节,保存了三类信息:4字节保存goroutine计数,4字节保存waiter计数,4字节保存信号量
WaitGroup对外提供了以下三个方法:

// 设置等待的goroutine数量
func (wg *WaitGroup) Add(delta int)
// goroutine执行完成
func (wg *WaitGroup) Done()
// 阻塞等待所有的goroutine都执行完成
func (wg *WaitGroup) Wait()

Add

// state returns pointers to the state and sema fields stored within wg.state*.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {// state1 is 64-bit aligned: nothing to do.return &wg.state1, &wg.state2} else {// state1 is 32-bit aligned but not 64-bit aligned: this means that// (&state1)+4 is 64-bit aligned.state := (*[3]uint32)(unsafe.Pointer(&wg.state1))return (*uint64)(unsafe.Pointer(&state[1])), &state[0]}
}// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
// Add 将 delta(可能为负)添加到 WaitGroup 计数器。
// 如果计数器变为零,则所有在 Wait 上阻塞的 goroutine 都会被释放。
// 如果计数器变为负数,则添加panic。
// 请注意,计数器为零时发生的具有正增量的调用必须在等待之前发生。 
// 具有负增量的调用或在计数器大于零时开始的具有正增量的调用可能随时发生。
// 通常,这意味着对 Add 的调用应该在创建 goroutine 或其他要等待的事件的语句之前执行。
// 如果重用一个 WaitGroup 来等待几个独立的事件集,新的 Add 调用必须在所有先前的 Wait 调用返回后发生。
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyif delta < 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}// 记录goroutine计数state := atomic.AddUint64(statep, uint64(delta)<<32)// 获取goroutine计数v := int32(state >> 32)// 获取waiter计数w := uint32(state)if race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(semap))}// goroutine计数小于0if v < 0 {panic("sync: negative WaitGroup counter")}// w != 0说明已经执行了Wait且还有阻塞等待的goroutine,此时不允许在执行Addif w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 存在没有执行完成的goroutine,或者当前没有waiter,直接返回if v > 0 || w == 0 {return}// This goroutine has set counter to 0 when waiters > 0.// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.// 此时goroutine计数为0,且waiter计数大于0,不然上一步就返回了// 现在以下状态不能同时发生:// 1. 并发调用Add和Wait// 2. 当goroutine计数为0时,Wait不会继续增加waiter计数// 仍然做一个廉价的健全性检查来检测 WaitGroup 的滥用,防止以上情况发生if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.// 重置waiter计数*statep = 0// 唤醒所有的waiterfor ; w != 0; w-- {runtime_Semrelease(semap, false, 0)}
}

delta代表本次需要记录的goroutine计数,可能为负数
64位原子操作需要64位对齐,但32位编译器仅保证64位字段是32位对齐的
当state1是64位对齐时,state1高32位是goroutine计数,低32位是waiter计数
当state1不是64位对齐时,动态“交换”字段顺序
记录goroutine计数的变化delta
如果goroutine计数小于0,则直接panic
如果已经执行了Wait且还有阻塞等待的goroutine,此时不允许在执行Add
如果存在没有执行完成的goroutine,或者当前没有waiter,直接返回
当goroutine计数为0,且waiter计数大于0时,现在以下状态不能同时发生:

并发调用Add和Wait
当goroutine计数为0时,Wait不会继续增加waiter计数

简单校验通过后,重置waiter计数为0,唤醒所有阻塞等待的waiter

Done

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}

调用Add,delta = -1,代表goroutine计数-1

Wait

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyrace.Disable()}for {state := atomic.LoadUint64(statep)// 获取goroutine计数v := int32(state >> 32)// 获取waiter计数w := uint32(state)// goroutine计数为0,不需要等待,直接返回if v == 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.// waiter计数+1if atomic.CompareAndSwapUint64(statep, state, state+1) {if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(semap))}// 阻塞,等待goroutine计数为0后唤醒继续执行runtime_Semacquire(semap)// Wait还没有执行完成,就开始复用WaitGroupif *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}

调用state(),保证字段内存对齐
如果goroutine计数为0,不需要等待,直接返回
尝试对waiter计数+1,若失败,则继续下一轮重试
对waiter计数+1成功,则阻塞当前goroutine,等待goroutine计数为0后唤醒继续执行
唤醒继续执行后,简单判断是否存在Wait还没有执行完成,就开始复用WaitGroup的情况,如果有,则panic;如果没有,则直接返回

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/209355.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#-快速剖析文件和流,并使用

目录 一、概述 二、文件系统 1、检查驱动器信息 2、Path 3、文件和文件夹 三、流 1、FileStream 2、StreamWriter与StreamReader 3、BinaryWriter与BinaryReader 一、概述 文件&#xff0c;具有永久存储及特定顺序的字节组成的一个有序、具有名称的集合&#xff1b; …

大模型的全方位评估

摘要&#xff1a; 评估通过提供一种跟踪进度、理解模型以及记录其能力和偏差的方法&#xff0c;为基础大模型提供了背景。基础大模型挑战了机器学习中标准评估范式实现这些目标的能力&#xff0c;因为它们距离特定任务只有一步之遥。为了设想适合基础模型的评估新范式&#xff…

枚举 LeetCode2048. 下一个更大的数值平衡数

如果整数 x 满足&#xff1a;对于每个数位 d &#xff0c;这个数位 恰好 在 x 中出现 d 次。那么整数 x 就是一个 数值平衡数 。 给你一个整数 n &#xff0c;请你返回 严格大于 n 的 最小数值平衡数 。 如果n的位数是k&#xff0c;n它的下一个大的平衡数一定不会超过 k1个k1…

图论——最小生成树

图论——最小生成树 A wise man changes his mind, a fool never will 生成树 一个连通图的生成树是一个极小的连通子图&#xff0c;它包含图中全部的n个顶点&#xff0c;但只有构成一棵树的n-1条边。 最小生成树 在这些边中选择N-1条出来&#xff0c;连接所有的N个点。这N-1…

Java后端的登录、注册接口是怎么实现的

目录 Java后端的登录、注册接口是怎么实现的 Java后端的登录接口是怎么实现的 Java后端的注册接口怎么实现&#xff1f; 如何防止SQL注入攻击&#xff1f; Java后端的登录、注册接口是怎么实现的 Java后端的登录接口是怎么实现的 Java后端的登录接口的实现方式有很多种&a…

使用git出现的问题

保证 首先保证自己的git已经下载 其次保证自己的gitee账号已经安装并且已经生成ssh公钥 保证自己要push的代码在要上传的文件夹内并且配置文件等都在父文件夹&#xff08;也就是文件没有套着文件&#xff09; 问题 1 $ git push origin master gitgitee.com: Permission de…

近似同态加密的 IND/SIM-CPA+ 安全性:对于 CKKS 实际有效的攻击

参考文献&#xff1a; [LM21] Li B, Micciancio D. On the security of homomorphic encryption on approximate numbers[C]//Advances in Cryptology–EUROCRYPT 2021: 40th Annual International Conference on the Theory and Applications of Cryptographic Techniques, Z…

【Linux】命令expect使用详解

&#x1f984; 个人主页——&#x1f390;个人主页 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341; 感谢点赞和关注 &#xff0c;每天进步一点点&#xff01;加油&#xff01;&…

【上海大学数字逻辑实验报告】五、记忆元件测试

一、实验目的 掌握R-S触发器、D触发器和JK触发器的工作原理及其相互转换。学会用74LS00芯片构成钟控RS触发器。学会用74LS112实现D触发器学会在Quartus II上用D触发器实现JK触发器。 二、实验原理 基本R-S触发器是直接复位-置位的触发器&#xff0c;它是构成各种功能的触发器…

AI文档助手,当下热门的AI文档助手【2024】

在当今信息爆炸的时代&#xff0c;文档创作的需求愈发庞大。为了满足用户对高效、准确、原创性文档的需求&#xff0c;人工智能技术的应用日益广泛。本文将专心分享AI文档助手领域的热门推荐。 AI文档助手的背景与应用 AI文档助手作为人工智能技术在文档创作领域的一大应用&am…

nginx配置自建SSL证书

文章目录 前言配置SSL证书SSL证书放在 Nginx 而不放在应用服务器上的好处Nginx只能转发http协议吗Nginx转发TCP协议会收到端口限制吗Nginx本身能将Websocket数据转化成TCP数据吗总结 前言 之前的一篇文章《自建CA并生成自签名SSL证书》中讲到为什么要自建CA和自签名SSL证书&am…

velocity-engine-core是什么?Velocity模板引擎的使用

velocity-engine-core是什么&#xff1f;Velocity模板引擎的使用 1. 常见的模板引擎2. Velocity 的语法3.Velocity的使用 相信在日常开发中或多或少都听过或者使用过模板引擎&#xff0c;比如熟知的freemarker, thymeleaf等。而模板引擎就是为了实现View和Data分离而产生的。 而…

C++封装、继承(单继承)、多态详细分析。

系列文章目录 文章目录 系列文章目录摘要一、基本概念二、多态的分类三、多态的实现3.1 类型兼容与函数重写3.2 动态联编与静态联编3.3 虚函数3.4 动态多态的实现过程 总结参考文献 摘要 多态性特征是 C中最为重要的一个特征&#xff0c;熟练使用多态是学好 C的关键&#xff0…

Kotlin关键字二——constructor和init

在关键字一——var和val中最后提到了构造函数&#xff0c;这里就学习下构造函数相关的关键字: constructor和init。 主要构造(primary constructor) kotlin和java一样&#xff0c;在定义类时就自动生成了无参构造 // 会生成默认的无参构造函数 class Person{ }与java不同的是…

configure脚本的常用参数

下面是一些常用的configure选项参数及其解释&#xff1a; --prefix<directory>&#xff1a;指定安装目录--with-<package>&#xff1a;指定依赖的外部库或软件包--enable-<feature>&#xff1a;启用某个特性--disable-<feature>&#xff1a;禁用某个特…

原创 | 数据的确权、流通、入表与监管研究(一):数据与确权

作者&#xff1a;张建军&#xff0c;中国电科首席专家&#xff0c;神州网信技术总监 本文约7100字&#xff0c;建议阅读10分钟 本文主要介绍数据与数据分类、数据确权规则、数据的所有权与其他权利等方面内容&#xff0c;并进行案例分析。 2022年12月发布的《关于构建数据基础制…

Linux 和 macOS 的主要区别在哪几个方面呢?

(꒪ꇴ꒪ )&#xff0c;Hello我是祐言QAQ我的博客主页&#xff1a;C/C语言&#xff0c;数据结构&#xff0c;Linux基础&#xff0c;ARM开发板&#xff0c;网络编程等领域UP&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff0c;让我们成为一个强大的攻城狮&#xff0…

uniapp实战 —— 弹出层 uni-popup (含vue3子组件调父组件的方法)

效果预览 弹出的内容 src\pages\goods\components\ServicePanel.vue <script setup lang"ts"> // 子组件调父组件的方法 const emit defineEmits<{(event: close): void }>() </script><template><view class"service-panel"…

ALSA Compress-Offload API

概述 从 ALSA API 的早期开始&#xff0c;它就被定义为支持 PCM&#xff0c;或考虑到了 IEC61937 等固定比特率的载荷。参数和返回值以帧计算是常态&#xff0c;这使得扩展已有的 API 以支持压缩数据流充满挑战。 最近这些年&#xff0c;音频数字信号处理器 (DSP) 常常被集成…

git如何配置多个远程仓库,并且进行切换

一、配置多个远程仓库并进行切换&#xff0c;请按照以下步骤进行操作&#xff1a; 打开命令行终端&#xff0c;并进入您的 Git 仓库所在的目录。添加第一个远程仓库&#xff0c;使用以下命令&#xff1a;git remote add origin <第一个远程仓库的 URL>这里将远程仓库命名…