Go语言ErrGroup

Go语言ErrGroup

在并发编程里,sync.WaitGroup 并发原语的使用频率非常高,它经常用于协同等待的场景:goroutine A 在检查

点等待一组执行任务的 worker goroutine 全部完成,如果在执行任务的这些 goroutine 还没全部完成,

goroutine A 就会阻塞在检查点,直到所有 woker goroutine 都完成后才能继续执行。

如果在 woker goroutine 的执行过程中遇到错误并想要处理该怎么办? WaitGroup 并没有提供传播错误的功能,

遇到这种场景我们该怎么办? Go 语言在扩展库提供了 ErrorGroup 并发原语正好适合在这种场景下使用,它在

WaitGroup 的基础上还提供了,错误传播以及上下文取消的功能。

Go 扩展库通过 errorgroup.Group 提供 ErrorGroup 原语的功能,它有三个方法可调用:

func WithContext(ctx context.Context) (*Group, context.Context)
func (g *Group) Go(f func() error)
func (g *Group) Wait() error

接下来我们让主 goroutine 使用 ErrorGroup 代替 WaitGroup 等待所以子任务的完成,ErrorGroup 有一个特点是

会返回所以执行任务的 goroutine 遇到的第一个错误。试着执行一下下面的程序,观察程序的输出。

package mainimport ("fmt""golang.org/x/sync/errgroup""net/http"
)func main() {var urls = []string{"http://www.golang.org/","http://www.baidu.com/","http://www.noexist11111111.com/",}g := new(errgroup.Group)for _, url := range urls {url := urlg.Go(func() error {resp, err := http.Get(url)if err != nil {fmt.Println(err)return err}fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)return resp.Body.Close()})}if err := g.Wait(); err != nil {fmt.Println(err)} else {fmt.Println("All success!")}
}

输出:

Get "http://www.noexist11111111.com/": dial tcp: lookup www.noexist11111111.com: no such host
get [http://www.baidu.com/] success: [200]
Get "http://www.golang.org/": dial tcp 172.217.24.113:80: connectex: A connection attempt failed because the connected party did not properly respond after a period o
f time, or established connection failed because connected host has failed to respond.
Get "http://www.noexist11111111.com/": dial tcp: lookup www.noexist11111111.com: no such host

ErrorGroup 有一个特点是会返回所以执行任务的 goroutine 遇到的第一个错误:

package mainimport ("fmt""golang.org/x/sync/errgroup""log""time"
)func main() {var eg errgroup.Groupfor i := 0; i < 100; i++ {i := ieg.Go(func() error {time.Sleep(2 * time.Second)if i > 90 {fmt.Println("Error:", i)return fmt.Errorf("Error occurred: %d", i)}fmt.Println("End:", i)return nil})}if err := eg.Wait(); err != nil {log.Fatal(err)}
}

上面程序,遇到 i 大于 90 的都会产生错误结束执行,但是只有第一个执行时产生的错误被 ErrorGroup 返回,程

序的输出大概如下:

输出:

......
End: 35
End: 38
End: 28
End: 37
End:38;2;127;0;0m2023/06/29 14:18:03 Error occurred: 98
32
Error: 92
End: 23
End: 30
Error: 95
Error: 94
End: 74
End: 25
......

最早执行遇到错误的 goroutine 输出了Error: 98 但是所有未执行完的其他任务并没有停止执行,那么想让程序遇

到错误就终止其他子任务该怎么办呢?我们可以用 errgroup.Group 提供的 WithContext 方法创建一个带可取消

上下文功能的 ErrorGroup。

使用 errorgroup.Group 时注意它的两个特点:

  • errgroup.Group 在出现错误或者等待结束后都会调用 Context 对象的 cancel 方法同步取消信号。
  • 只有第一个出现的错误才会被返回,剩余的错误都会被直接抛弃。
package mainimport ("context""fmt""golang.org/x/sync/errgroup""log""time"
)func main() {eg, ctx := errgroup.WithContext(context.Background())for i := 0; i < 100; i++ {i := ieg.Go(func() error {time.Sleep(2 * time.Second)select {case <-ctx.Done():fmt.Println("Canceled:", i)return nildefault:if i > 90 {fmt.Println("Error:", i)return fmt.Errorf("Error: %d", i)}fmt.Println("End:", i)return nil}})}if err := eg.Wait(); err != nil {log.Fatal(err)}
}

Go 方法单独开启的 gouroutine 在执行参数传递进来的函数时,如果函数返回了错误,会对 ErrorGroup 持有的

err 字段进行赋值并及时调用 cancel 函数,通过上下文通知其他子任务取消执行任务。所以上面更新后的程序运

行后有如下类似的输出。

......
Canceled: 87
Canceled: 34
Canceled: 92
Canceled: 86
Cancled: 78
Canceled: 46
Cancel[38;2;127;0;0m2023/06/29 14:22:07 Error: 99
ed: 45
Canceled: 44
Canceled: 77
Canceled: 43
Canceled: 50
Canceled: 42
Canceled: 25
Canceled: 76
Canceled: 24
Canceled: 75
Canceled: 40
......

errorgroup源码:

在上面的例子中,子 goroutine 出现错误后,会 cancle 到其他的子任务,但是我们并没有看到调用 ctx 的 cancel

方法,下面我们看下源码,看看内部是怎么处理的。errgroup 的设计非常精练,全部代码如下:

package errgroupimport ("context""sync"
)// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {cancel func()wg sync.WaitGrouperrOnce sync.Onceerr     error
}// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {ctx, cancel := context.WithCancel(ctx)return &Group{cancel: cancel}, ctx
}// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {g.wg.Wait()if g.cancel != nil {g.cancel()}return g.err
}// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {g.wg.Add(1)go func() {defer g.wg.Done()if err := f(); err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}}()
}

可以看到,errgroup 的实现依靠于结构体 Group,它通过封装 sync.WaitGroup,继承了 WaitGroup 的特性,在

Go() 方法中新起一个子任务 goroutine,并在 Wait() 方法中通过 sync.WaitGroup 的 Wait 进行阻塞等待。

同时 Group 利用 sync.Once 保证了它有且仅会保留第一个子 goroutine 错误。

Group 通过嵌入 context.WithCancel 方法产生的 cancel 函数,能够在子 goroutine 发生错误时,及时通过调用

cancle 函数,将 Context 的取消信号及时传播出去。

再看一个实际应用的例子:

package mainimport ("context""fmt""golang.org/x/sync/errgroup"
)func main() {g, ctx := errgroup.WithContext(context.Background())dataChan := make(chan int, 20)// 数据生产端任务子goroutineg.Go(func() error {defer close(dataChan)for i := 1; ; i++ {if i == 10 {return fmt.Errorf("data 10 is wrong")}dataChan <- ifmt.Println(fmt.Sprintf("sending %d", i))}})// 数据消费端任务子goroutinefor i := 0; i < 3; i++ {g.Go(func() error {for j := 1; ; j++ {select {case <-ctx.Done():return ctx.Err()case number := <-dataChan:fmt.Println(fmt.Sprintf("receiving %d", number))}}})}// 主任务goroutine等待pipeline结束数据流err := g.Wait()if err != nil {fmt.Println(err)}fmt.Println("main goroutine done!")
}
# 输出
sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 2
receiving 1
receiving 3
data 10 is wrong
main goroutine done!

自己实现一个 ErrGroup:

package mainimport ("context""errors""fmt""sync""sync/atomic""time"
)const (M = 2N = 8
)func main() {ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)defer cancel()result := make([]int, N+1)errCh := make(chan error, 1)var firstSendErr int32wg := new(sync.WaitGroup)done := make(chan struct{}, 1)limit := make(chan struct{}, M)for i := 1; i <= N; i++ {limit <- struct{}{}var quit boolselect {// context已经被cancel,不需要起新的goroutine了case <-ctx.Done():quit = truedefault:}if quit {break}wg.Add(1)go func(x int) {defer func() {wg.Done()<-limit}()if ret, err := doTask(ctx, x); err != nil {if atomic.CompareAndSwapInt32(&firstSendErr, 0, 1) {errCh <- err// cancel其他的请求cancel()}} else {result[x] = ret}}(i)}go func() {wg.Wait()close(done)}()select {case err := <-errCh:handleErr(err, result[1:])<-donecase <-done:if len(errCh) > 0 {err := <-errChhandleErr(err, result[1:])return}fmt.Println("success handle all task:", result[1:])}
}func handleErr(err error, result []int) {fmt.Println("task err occurs: ", err, "result", result)
}func doTask(ctx context.Context, i int) (ret int, err error) {fmt.Println("task start", i)defer func() {fmt.Println("task done", i, "err", err)}()select {// 模拟处理任务时间case <-time.After(time.Second * time.Duration(i)):// 处理任务要支持被context cancel,不然就一直等到处理完再返回了case <-ctx.Done():fmt.Println("task canceled", i)return -1, ctx.Err()}// 模拟出现错误if i == 6 {return -1, errors.New("err test")}return i, nil
}
# 输出
task start 2
task start 1
task done 1 err <nil>
task start 3
task done 2 err <nil>
task start 4
task done 3 err <nil>
task start 5
task done 4 err <nil>
task start 6
task done 5 err <nil>
task start 7
task done 6 err err test
task canceled 7
task done 7 err context canceled
task err occurs:  err test result [1 2 3 4 5 0 0 0]

总结:

使用 errorgroup.Group 时注意它的特点:

  • 继承了 WaitGroup 的功能

  • errgroup.Group 在出现错误或者等待结束后都会调用 Context 对象 的 cancel 方法同步取消信号。

  • 只有第一个出现的错误才会被返回,剩余的错误都会被直接抛弃。

  • context 信号传播:如果子任务 goroutine 中有循环逻辑,则可以添加 ctx.Done 逻辑,此时通过 context 的

    取消信号,提前结束子任务执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/5284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Godot实用代码-存取存档的程序设计

1. Settings.gd 全局变量 用于保存玩家设置 对应Settings.json 2. Data.gd 全局变量 用于保存玩具数据 对应Data.json 实践逻辑指南 1.在游戏开始的时候&#xff08;游戏场景入口的_ready()处&#xff0c; Settings.gd

day09面试题

面试题 说说对 React 的理解?有哪些特性?说说 Real DOM 和 Virtual DOM 的区别?优缺点?说说 React 生命周期有哪些不同阶段?每个阶段对应的方法是?说说 React 中的 setState 执行机制&#xff1f;说说对 React 中类组件和函数组件的理解?有什么区别? 说说对 React 的理…

Linux内核结构与特性简介

系统调用接口&#xff1a;位于最上层&#xff0c;实现了一些基本的功能&#xff0c;如read和write等系统调用。这是用户空间程序与内核交互的接口&#xff0c;提供了对内核功能的访问。 内核代码&#xff1a;位于系统调用接口之下&#xff0c;可以看作是独立于体系结构的通用内…

RabbitMQ的基本使用

RabbitMQ的基本使用 引入程序集&#xff1a;RabbitMQ.Client 生产者 /// <summary> /// ProducerWrites 写入消息 ConsumerConsumption 消费消息 /// </summary> public class ProducerWrites {public static void Send(){string path AppDomain.CurrentDomain.…

qt和vue交互

1、首先在vue项目中引入qwebchannel /******************************************************************************** Copyright (C) 2016 The Qt Company Ltd.** Copyright (C) 2016 Klarlvdalens Datakonsult AB, a KDAB Group company, infokdab.com, author Milian …

CLIP概述

文章目录 Learning Transferable Visual Models From Natural Language Supervision(使用自然语言的监督信号训练一个可迁移的视觉模型)AbstractIntroduction and Motivating WorkApproachNatural Language SupervisionCreating a Suffciently Large DatasetSelecting an Eff…

13_Linux无设备树Platform设备驱动

目录 Linux驱动的分离与分层 驱动的分隔与分离 驱动的分层 platform平台驱动模型简介 platform总线 platform驱动 platform设备 platform设备程序编写 platform驱动程序编写 测试APP编写 运行测试 Linux驱动的分离与分层 像I2C、SPI、LCD 等这些复杂外设的驱动就不…

Fortinet Accelerate 2023·中国区巡展收官丨让安全成就未来

7月18日&#xff0c;2023 Fortinet Accelerate Summit在上海成功举办&#xff01;这亦象征着“Fortinet Accelerate2023中国区巡展”圆满收官。Fortinet携手来自多个典型行业的百余位代表客户&#xff0c;以及Telstra - PBS 太平洋电信、Tenable等多家生态合作伙伴&#xff0c;…

利用数据分析告警机制,实现鸿鹄与飞书双向集成

需求描述 实现鸿鹄与飞书的双向集成&#xff0c;依赖鸿鹄的告警机制&#xff0c;可以发送用户关心的信息到飞书。同时依赖飞书强大的卡片消息功能&#xff0c;在飞书消息里面能够通过链接&#xff08;如下图&#xff09;返回到鸿鹄以方便用户进一步排查和分析问题。 解决方案 1…

【PHP面试题75】PHP有哪些魔术变量,如何使用他们?

文章目录 一、前言二、魔术变量2.1 __LINE__2.2 __FILE__2.3 __DIR__2.4 __FUNCTION__2.5 __CLASS__2.6 __TRAIT__2.7 __METHOD__2.8 __NAMESPACE__ 三、总结 一、前言 本文已收录于PHP全栈系列专栏&#xff1a;PHP面试专区。 计划将全覆盖PHP开发领域所有的面试题&#xff0c;…

CGT Asia嘉年华|2023第四届亚洲细胞与基因治疗 创新峰会(广州站)10月升级启航

近年来&#xff0c;全球CGT发展突飞猛进&#xff0c;为遗传罕见病、难治性慢性病和肿瘤患者带来了新的希望&#xff0c;也成为整个国际领域科技竞争的未来焦点。国家发改委发布的《“十四五”生物经济发展规划》明确指出要重点发展基因诊疗、干细胞治疗、免疫细胞治疗等新技术&…

利用鸿鹄优化共享储能的SCADA 系统功能,赋能用户数据自助分析

摘要 本文主要介绍了共享储能的 SCADA 系统大数据架构&#xff0c;以及如何利用鸿鹄来更好的优化 SCADA 系统功能&#xff0c;如何为用户进行数据自助分析赋能。 1、共享储能介绍 说到共享储能&#xff0c;可能不少朋友比较陌生&#xff0c;下面我们简单介绍一下共享储能的价值…

IntelliJ IDEA - Error:Module ‘name‘ production: java.lang.NullPointerException

问题描述 Error:Module name production: java.lang.NullPointerException 原因分析 一般出现这种情况多见于 IDEA 自身的问题&#xff0c;比如&#xff1a;切换分支或者拉取最新代码时结构相差过大&#xff0c;所以解决 IDEA 自身缓存的问题即可 解决方案 Build > Rebuil…

Python高光谱遥感数据处理与高光谱遥感机器学习方法深度应用

目录 ​第一章 高光谱基础 第二章 高光谱开发基础&#xff08;Python&#xff09; 第三章 高光谱机器学习技术&#xff08;python&#xff09; 第四章 典型案例操作实践 更多推荐 本教程提供一套基于Python编程工具的高光谱数据处理方法和应用案例。 涵盖高光谱遥感的基础…

2023年7月18日,File类,IO流,线程

File类 1. 概述 File&#xff0c;是文件和目录路径的抽象表示 File只关注文件本身的信息&#xff0c;而不能操作文件里的内容 。如果需要读取或写入文件内容&#xff0c;必须使用IO流来完成。 在Java中&#xff0c;java.io.File 类用于表示文件或目录的抽象路径名。它提供了一…

Rancher 管理 Kubernetes 集群

//Rancher 简介 Rancher 是一个开源的企业级多集群 Kubernetes 管理平台&#xff0c;实现了 Kubernetes 集群在混合云本地数据中心的集中部署与管理&#xff0c; 以确保集群的安全性&#xff0c;加速企业数字化转型。超过 40000 家企业每天使用 Rancher 快速创新。 官网&#…

selenium.chrome怎么写扩展拦截或转发请求?

Selenium WebDriver 是一组开源 API&#xff0c;用于自动测试 Web 应用程序&#xff0c;利用它可以通过代码来控制chrome浏览器&#xff01; 有时候我们需要mock接口的返回&#xff0c;或者拦截和转发请求&#xff0c;今天就来实现这个功能。 代码已开源&#xff1a; https:/…

HTML语法

文章目录 前言HTML 文件基本结构常见标签标签种类特殊符号图片链接a链接 双标签链接 列表表格 &#xff1a;表单多行文本域: 前言 HTML是有标签组成的 <body>hello</body>大部分标签成对出现. 为开始标签, 为结束标签. 少数标签只有开始标签, 称为 “单标签”. 开…

Helm 安装prometheus-stack 使用local pv持久化存储数据

目录 背景&#xff1a; 环境准备&#xff1a; 1. 磁盘准备 2. 磁盘分区格式化 local storage部署 1. 节点打标签 2. 创建local pv storageClass和prometheus-pv Prometheus-stack部署 1. 下载helm chart包 2. values.yaml 参数解释 3. 部署prometheus-stack 4. 查看…

Baichuan-13B:130亿参数的开源语言模型,引领中文和英文benchmark

Baichuan-13B: 一个强大的开源大规模语言模型 标题&#xff1a;Baichuan-13B&#xff1a;130亿参数的开源语言模型&#xff0c;引领中文和英文benchmark Baichuan-13B是由百川智能开发的一个开源大规模语言模型项目&#xff0c;包含了130亿参数。该模型在中文和英文的权威ben…