详解Go语言中的Goroutine组(Group)在项目中的使用

背景(Why)

Go语言通过其内置的goroutine和通道(channel)机制,提供了强大的并发支持。goroutine的开销非常低,一个goroutine仅占用几KB的内存,可以轻松创建成千上万个goroutine来处理并发任务。然而,随着并发任务数量的增加,管理goroutine的生命周期、处理错误以及保证资源正确回收变得越来越复杂。例如,我们需要处理以下场景:

  • 错误处理困难:如果某个goroutine发生错误或panic,需要有机制捕获这些错误并作出相应处理。
  • 资源管理复杂:确保所有goroutine在完成任务后正确回收资源,防止资源泄漏。
  • 任务调度不灵活:在多个goroutine之间调度任务,确保高效执行和公平分配。在goroutine执行前后进行必要的操作,如日志记录或环境准备。
  • 同步复杂性:确保所有goroutine都在某个时间点前完成,或者在发生重大错误时取消所有未完成的goroutine。

为了解决这些问题,引入了一个Group结构体,提供了一种更高级的方式来管理一组goroutine。

What

定义一个Group结构体来实现goroutine组管理

type Group struct {chs []func(ctx context.Context) error  // 保存所有要在组中执行的任务name string         // 组名err  error          // 保存组中发生的第一个错误ctx  context.Context // 组的上下文,用于控制任务的执行panicCb  func([]byte) bool // 在发生 panic 时调用的回调函数beforeCb func()            // 在任务执行之前调用的回调函数panicTimeout time.Duration // 调用 panicCb 之间的时间间隔ch           chan func(ctx context.Context) error // 任务通道cancel       func()         // 取消任务的函数wg           sync.WaitGroup // 等待组内所有任务完成errOnce    sync.Once        // 确保 err 只被设置一次workerOnce sync.Once        // 确保 worker 只被启动一次panicTimes int8             // 最大允许 panic 的次数
}
  • chs []func(ctx context.Context) error

    • 类型:切片,包含多个函数,这些函数接受 context.Context 作为参数并返回错误。
    • 用途:保存所有要在组中执行的任务。
  • name string

    • 类型:字符串。
    • 用途:保存组的名称。
  • err error

    • 类型:错误。
    • 用途:保存组中第一个发生的错误。
  • ctx context.Context

    • 类型:上下文。
    • 用途:控制任务的执行,可以用来取消任务或者设置任务的超时时间。
  • panicCb func([]byte) bool

    • 类型:函数,接受一个字节切片参数(panic 的堆栈信息)并返回布尔值。
    • 用途:当组中的任务发生 panic 时调用的回调函数。
  • beforeCb func()

    • 类型:函数,无参数无返回值。
    • 用途:在每个任务执行之前调用的回调函数。
  • panicTimeout time.Duration

    • 类型:持续时间。
    • 用途:两次调用 panicCb 之间的时间间隔。如果某个任务频繁地发生 panic,而每次 panic 都调用 panicCb,这可能会导致系统性能下降或产生大量日志。通过设置 panicTimeout,可以限制 panicCb 的调用频率,确保在一个指定的时间间隔内不会多次调用 panicCb
  • ch chan func(ctx context.Context) error

    • 类型:通道,包含函数,这些函数接受 context.Context 作为参数并返回错误。
    • 用途:用于在组内传递任务。
  • cancel func()

    • 类型:函数,无参数无返回值。
    • 用途:用于取消组内的所有任务。
  • wg sync.WaitGroup

    • 类型:等待组。
    • 用途:用于等待组内所有任务完成。
  • errOnce sync.Once

    • 类型:同步 Once。
    • 用途:确保 err 只被设置一次。
  • workerOnce sync.Once

    • 类型:同步 Once。
    • 用途:确保 worker 只被启动一次。
  • panicTimes int8

    • 类型:整数(8位)。
    • 用途:最大允许的 panic 次数。

创建NewGroup函数 

NewGroup函数用于创建一个新的goroutine组实例,初始化相关参数,并设置panic处理回调函数。

func NewGroup(option Option) *Group {log = logger.SLogger("goroutine")name := "default"if len(option.Name) > 0 {name = option.Name}g := &Group{name:         name,panicCb:      option.PanicCb,panicTimes:   option.PanicTimes,panicTimeout: option.PanicTimeout,}//如果 option 中未提供 panicCb,则使用默认的 panicCb 回调函数。这个函数会记录 panic 的信息,并增加 goroutineCrashedVec 指标。if g.panicCb == nil {g.panicCb = func(crashStack []byte) bool {log.Errorf("recover panic: %s", string(crashStack))goroutineCrashedVec.WithLabelValues(name).Inc()return true}}goroutineGroups.Inc()return g
}

3.定义GOMAXPROCS 方法

GOMAXPROCS 函数用于设置并发执行的最大 goroutine 数量。具体来说,它通过创建一个缓冲通道来限制并发执行的 goroutine 数量,并启动相应数量的 goroutine 来处理通道中的任务。

// GOMAXPROCS set max goroutine to work.
func (g *Group) GOMAXPROCS(n int) {if n <= 0 {panic("goroutine: GOMAXPROCS must great than 0")}g.workerOnce.Do(func() { // 确保该逻辑只执行一次g.ch = make(chan func(context.Context) error, n) // 创建缓冲通道,大小为 nfor i := 0; i < n; i++ { // 启动 n 个 goroutine 来处理通道中的任务go func() {for f := range g.ch {g.do(f) // 调用 g.do 方法执行任务}}()}})
}

使用 sync.Once 确保逻辑只执行一次。创建一个缓冲大小为 n 的通道 g.ch,用于存储任务。

启动 n 个 goroutine,循环从通道 g.ch 中获取任务并执行 g.do(f) 方法。每个 goroutine 都会持续从通道中获取任务并执行,直到通道被关闭。

for f := range g.ch 这种结构中,如果通道 g.ch 中没有任务,读取操作将会阻塞,直到有新的任务被写入通道。 也就是说开了n个goroutine在g.ch中等待任务发放和执行任务,所以最大并发的goroutine数量为n。某个goroutine从通道 g.ch 中取出的任务 f 不会在另一个 goroutine 的循环中再次出现,每个任务只会被一个 goroutine 处理一次。

4. 定义Go方法

Go方法用于启动一个新的goroutine,并将其添加到组中进行管理。如果Group已经初始化了工作通道,也就是如果有通道 g.ch,则尝试将任务发送到通道,如果通道已满(无法立即发送),则将函数 f 添加到 g.chs 列表中,等待稍后执行。如果没有通道 g.ch,则立即启动一个新的 goroutine 来执行任务。

func (g *Group) Go(f func(ctx context.Context) error) {g.wg.Add(1)goroutineCounterVec.WithLabelValues(g.name).Inc()if g.ch != nil {select {case g.ch <- f:default:g.chs = append(g.chs, f)}return}go g.do(f)
}

使用通道 g.ch 来限制同时运行的 goroutine 数量。当通道已满时,新的任务会被暂存到 g.chs 列表中。如果没有设置并发限制(即 g.chnil),则每次调用 Go 方法都会立即启动一个新的 goroutine 来执行任务。也就是提供了两种模式可供选择!

4. 定义Wait方法

Wait 方法用于等待所有通过 Go 方法启动的 goroutine 完成执行,并返回第一个非空错误(如果有)。

func (g *Group) Wait() error {if g.ch != nil {for _, f := range g.chs {g.ch <- f}}g.wg.Wait()if g.ch != nil {close(g.ch) // let all receiver exit}if g.cancel != nil {g.cancel()}return g.err
}

Wait 方法的设计确保了所有通过 Go 方法启动的 goroutine 都能够正确完成执行,并清理所有相关的资源。如果有任何 goroutine 返回错误,该方法会返回第一个非空错误。这个方法提供了一种优雅的方式来管理并发任务的生命周期和错误处理。

5. 定义具体执行任务的方法do方法

do方法负责在 goroutine 中执行任务,并处理可能发生的 panic。do方法执行传入的任务f,。如果任务中发生panic,do方法会根据配置的重试次数进行重试,并调用panicCb回调函数。

func (g *Group) do(f func(ctx context.Context) error) {//如果定义了 beforeCb 回调函数,调用它。这可以在每次任务开始前执行一些操作,如初始化工作或记录日志。if g.beforeCb != nil {g.beforeCb()}//初始化上下文ctx := g.ctxif ctx == nil {ctx = context.Background()}//设定重试次数为 g.panicTimes - 1。在 do 方法内部可能会递减该值来控制 panic 的重试逻辑。panicTimes := g.panicTimes - 1var (err   error//run 是一个匿名函数,用于执行传入的任务 f(ctx),并在任务完成后进行错误处理和资源清理。run   func()start = time.Now())run = func() {//通过 recover 捕获 panic 信息,并将堆栈信息存储在 buf 中,记录错误信息,并根据 panicCb 回调函数的返回值决定是否重试。defer func() {if r := recover(); r != nil {goroutineCrashedVec.WithLabelValues(g.name).Inc()isPanicRetry := truebuf := make([]byte, 4096) //nolint:gomndbuf = buf[:runtime.Stack(buf, false)]if e, ok := r.(error); ok {buf = append([]byte(fmt.Sprintf("%s\n", e.Error())), buf...)}if g.panicCb != nil {isPanicRetry = g.panicCb(buf)}//如果 panicCb 回调函数定义了,调用它,并判断是否继续重试。if isPanicRetry && panicTimes > 0 {panicTimes--if g.panicTimeout > 0 {time.Sleep(g.panicTimeout)}goroutineRecoverVec.WithLabelValues(g.name).Inc()//重试执行函数run()return} else {//如果重试次数用完了,则更新监控指标,记录 panic 发生的次数和恢复的次数。goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}err = fmt.Errorf("goroutine: panic recovered: %s", r)} else {//没有发生panic,则只用记录指标goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}//如果有err,则记录在g实例的字段中if err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}g.wg.Done()}()err = f(ctx)}run()
}

HOW

下面是一个使用Group管理goroutine的示例代码:

func demoFunc(){fmt.Println("finish")
}
func main() {group := NewGroup(Option{Name:         "example-group",PanicCb:      nil, // 使用默认的panic处理回调PanicTimes:   3,   // 最大重试次数PanicTimeout: time.Second * 2, // 重试间隔})// 在这个group中启动5个goroutine执行任务,即增加五个func到group.chfor i := 0; i < 5; i++ {group.Go(func(ctx context.Context) error {// 在这里放入你要执行的函数(任务)demoFunc()return nil})}// 等待所有任务完成if err := group.Wait(); err != nil {fmt.Printf("group execution completed with error: %v\n", err)} else {fmt.Println("group execution completed successfully")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/46315.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端web性能统计

前端web性能统计 1. 背景2. 业界方案2.1 腾讯2.2 蚂蚁金服2.3 字节跳动2.4 美团 3. 相关观念3.1 RAIL模型3.2 性能指标3.3 真实用户监控3.4 performance 4. 性能监控工具介绍5. 推荐采用方案 1. 背景 在如今的数字时代&#xff0c;网站和应用程序的性能对用户体验至关重要。用…

Bootstrap 栅格系统的工作原理?

Bootstrap的栅格系统是一种响应式的网格布局系统&#xff0c;用于在不同屏幕尺寸下实现页面布局的自适应。栅格系统基于12列的布局&#xff0c;可以让开发者轻松地创建响应式的网页布局。 工作原理如下&#xff1a; 容器&#xff08;Container&#xff09;&#xff1a;Bootstra…

STM32MP135裸机编程:唯一ID(UID)、设备标识号、设备版本

0 资料准备 1.STM32MP13xx参考手册1 唯一ID&#xff08;UID&#xff09;、设备标识号、设备版本 1.1 寄存器说明 &#xff08;1&#xff09;唯一ID 唯一ID可以用于生成USB序列号或者为其它应用所使用&#xff08;例如程序加密&#xff09;。 &#xff08;2&#xff09;设备…

Java实习修炼:力扣第116题之填充每个节点的下一个右侧指针

摘要 LeetCode第116题要求填充每个节点的下一个右侧指针&#xff0c;并指向其下一个右侧节点。本题考察了二叉树的遍历和指针操作。本文将介绍如何使用Java语言解决这个问题&#xff0c;并提供详细的代码实现。 1. 问题描述 给定一个完美二叉树&#xff0c;节点数量为m&…

Git代码管理工具 — 3 Git基本操作指令详解

目录 1 获取本地仓库 2 基础操作指令 2.1 基础操作指令框架 2.2 git status查看修改的状态 2.3 git add添加工作区到暂存区 2.4 提交暂存区到本地仓库 2.5 git log查看提交日志 2.6 git reflog查看已经删除的记录 2.7 git reset版本回退 2.8 添加文件至忽略列表 1 获…

0.单片机工作原理

文章目录 最小系统 单片机芯片 时钟电路 复位电路 电源 最小系统 单片机芯片 本次51单片机的芯片为&#xff1a;STC89C52 Flash(闪存)程序存储器&#xff1a;存储程序的空间 SRAM&#xff1a;数据存储器&#xff0c;可用于存放程序执行的中间结果和过程数据 DPTR&#xff1a;…

2024-07-14 Unity插件 Odin Inspector2 —— Essential Attributes

文章目录 1 说明2 重要特性2.1 AssetsOnly / SceneObjectsOnly2.2 CustomValueDrawer2.3 OnValueChanged2.4 DetailedInfoBox2.5 EnableGUI2.6 GUIColor2.7 HideLabel2.8 PropertyOrder2.9 PropertySpace2.10 ReadOnly2.11 Required2.12 RequiredIn&#xff08;*&#xff09;2.…

决策树算法入门到精通:全面解析与案例实现

1. 介绍决策树算法 决策树的基本概念和原理 决策树是一种基于树形结构的分类和回归方法&#xff0c;通过对数据集进行递归地划分&#xff0c;每个内部节点表示一个属性上的判断&#xff0c;每个叶节点代表一种类别或者数值。 决策树在机器学习中的应用场景 分类问题&#xf…

删除矩阵中0所在行 matlab

%for验证 new[]; for i1:size(old,1)if old(i,4)~0 %assume 0所在列在第4列new(end1,:)old(i,:);end enda(a(:,2)0,:)[]参考&#xff1a; 两种方式

Java:使用JMH做Benchmark基准测试

BenchMark 又叫做基准测试&#xff0c;主要用来测试一些方法的性能&#xff0c;可以根据不同的参数以不同的单位进行计算&#xff08;例如可以使用吞吐量为单位&#xff0c;也可以使用平均时间作为单位&#xff0c;在 BenchmarkMode 里面进行调整&#xff09;。 依赖 <dep…

机器人相关工科专业课程体系

机器人相关工科专业课程体系 前言传统工科专业机械工程自动化/控制工程计算机科学与技术 新兴工科专业智能制造人工智能机器人工程 总结Reference: 前言 机器人工程专业是一个多领域交叉的前沿学科&#xff0c;涉及自然科学、工程技术、社会科学、人文科学等相关学科的理论、方…

Linux编程(三)—makefile快速编译

起因 linux环境下&#xff0c;编译c程序很麻烦&#xff0c;后面g -o demo demo.cpp ……往往跟了许多许多东西&#xff0c;这些每次编译的时候都要书写&#xff0c;所以就产生了makefile快速编译方式&#xff0c;具体操作如下。 怎么用makefile? 第一步&#xff1a;下载 m…

WPF学习(2) -- 样式基础

一、代码 <Window x:Class"学习.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schemas.microsoft.com/expression/blend/2008&…

gtest单元测试:进程冻结与恢复管理模块的单元测试实现

文章目录 1. 概要2. 进程管理接口详解2.1 进程冻结与恢复的基本概念2.2 进程查找与PID获取2.3 进程冻结与恢复的实现2.3.1 进程冻结2.3.2 进程恢复 2.4 进程终止2.5 进程状态监控与控制 3. dummy_process的设计与实现3.1 创建dummy_process脚本3.2 启动dummy_process3.3 终止du…

SSLRec代码分析

文章目录 encoder-models-general_cfautocf.py data_utilsdata_handler_general_cf.py输入输出说明使用方法 trainertuner.py encoder-models-general_cf autocf.py import torch as t # 导入PyTorch并重命名为t from torch import nn # 从PyTorch导入神经网络模块 import …

MySQL 聚簇索引和非聚簇索引有什么区别?

聚簇索引&#xff08;主键索引&#xff09;、非聚簇索引&#xff08;二级索引&#xff09;。 这两者之间的最主要的区别是 B 树的叶子节点存放的内容不同&#xff1a; 聚簇索引的 B 树叶子节点存放的是主键值完整的记录&#xff1b;非聚簇索引的 B 树叶子节点存放的是索引值主…

Spring Boot项目实战:短信功能分布式限流

项目背景与需求 项目名称&#xff1a;充电桩项目升级&#xff1a;进行微服务架构升级关键功能&#xff1a;短信服务&#xff0c;用于用户登录、注册等 短信功能设计考虑 短信模板存储&#xff1a;需考虑存储方式发送次数限制&#xff1a;防止恶意攻击&#xff0c;设计60秒内…

【面试八股总结】C++内存管理:内存分区、内存泄漏、new和delete、malloc和free

参考资料&#xff1a;代码随想录、阿秀 一、内存分区 &#xff08;1&#xff09;栈区 在执行函数时&#xff0c;函数内部局部变量的存储单元都可以在栈上创建&#xff0c;函数执行结束时这些存储单元自动被释放。栈内存分配运算内置于处理器的指令集中&#xff0c;效率很高&am…

Postman下载及使用说明

Postman使用说明 Postman是什么&#xff1f; ​ Postman是一款接口对接工具【接口测试工具】 接口&#xff08;前端接口&#xff09;是什么&#xff1f; ​ 前端发送的请求普遍被称为接口 ​ 通常有网页的uri参数格式json/key-value请求方式post/get响应请求的格式json 接…

Java---匿名内部类及Lambda表达式简化函数式接口的匿名内部类

匿名内部类 什么是匿名内部类 一种特殊的局部内部类&#xff1b; 所谓匿名&#xff1a;指的是程序员不需要为这个类声明名字。 特点&#xff1a;匿名内部类本质就是一个子类或者实现类&#xff0c;定义类的同时会创建出对象 作用&#xff1a;更方便的创建子类&#xff08;实…