Go 之常用并发学习

在 Go语言实战 中看到有些并发相关的例子,讲解得也比较详细,于是乎写来加深下印象。

无缓冲通道

无缓冲通道在接收前没有能力保存任何值。我自己找了书上的示例来加深一下印象。

模拟网球比赛

package mainimport ("fmt""math/rand""sync""time"
)var (wg sync.WaitGroup
)func init() {rand.Seed(time.Now().UnixNano()) // 让每次运行生成的随机数不相同
}func main() {count := make(chan int)wg.Add(2)go player("Nadal", count)go player("Looking", count)count <- 1wg.Wait()
}func player(name string, count chan int) {defer wg.Done()for {ball, ok := <-countif !ok {fmt.Printf("Player %s Win\n", name)return}n := rand.Intn(100)if n%13 == 0 {fmt.Printf("Player %s Missed\n", name)close(count)return}fmt.Printf("Player %s Hit %d\n", name, ball)count <- ball}
}

模拟接力赛

接力赛中,接力棒只能在一个人手中。

package mainimport ("fmt""sync""time"
)var (wg sync.WaitGroup
)func main() {baton := make(chan int)wg.Add(1)go Runner(baton)baton <- 1wg.Wait()
}func Runner(baton chan int)  {var newRunner intrunner := <- batonfmt.Printf("Runner %d Running with baton\n", runner)if runner != 4 {newRunner = runner + 1fmt.Printf("Runner %d to the line\n", newRunner)go Runner(baton)}time.Sleep(100 * time.Millisecond)if runner == 4 {fmt.Printf("Runner %d finished, Race over\n", runner)wg.Done()return}fmt.Printf("Runner %d Exchange with runner %d \n", runner, newRunner)baton <- newRunner
}
Runner 1 Running with baton
Runner 2 to the line
Runner 1 Exchange with runner 2 
Runner 2 Running with baton
Runner 3 to the line
Runner 2 Exchange with runner 3 
Runner 3 Running with baton
Runner 4 to the line
Runner 3 Exchange with runner 4 
Runner 4 Running with baton
Runner 4 finished, Race over

有缓冲通道

缓冲通道不强制发送和接收同时完成。

当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通 道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。从一个已经关闭且没有数据的通道 里获取数据,总会立刻返回,并返回一个通道类型的零值。如果在获取通道时还加入了可选的标 志,就能得到通道的状态信息。

模拟任务分发和处理

package mainimport ("fmt""math/rand""sync""time"
)const (numberGoroutines = 4taskLoad         = 10
)var wg sync.WaitGroupfunc init() {rand.Seed(time.Now().UnixNano())
}func main() {tasks := make(chan string, taskLoad)wg.Add(numberGoroutines)for gr := 1; gr <= numberGoroutines; gr++ {go worker(tasks, gr)}for post := 1; post <= taskLoad; post++ {tasks <- fmt.Sprintf("Task: %d", post)}close(tasks) // 任务发布完毕后关闭通道,关闭通道不影响其它Goroutine对已发布内容的正常接收wg.Wait()
}func worker(tasks chan string, worker int) {defer wg.Done()for {task, ok := <-tasksif !ok {fmt.Printf("Worker: %d : Shutting down\n", worker)return}fmt.Printf("Worker: %d : Started %s\n", worker, task)sleep := rand.Int63n(100)time.Sleep(time.Duration(sleep) * time.Millisecond)fmt.Printf("Worker: %d : Completed %s\n", worker, task)}
}
Worker: 4 : Started Task: 4
Worker: 2 : Started Task: 2
Worker: 3 : Started Task: 3
Worker: 1 : Started Task: 1
Worker: 3 : Completed Task: 3
Worker: 3 : Started Task: 5
Worker: 2 : Completed Task: 2
Worker: 2 : Started Task: 6
Worker: 1 : Completed Task: 1
Worker: 1 : Started Task: 7
Worker: 1 : Completed Task: 7
Worker: 1 : Started Task: 8
Worker: 3 : Completed Task: 5
Worker: 3 : Started Task: 9
Worker: 4 : Completed Task: 4
Worker: 4 : Started Task: 10
Worker: 2 : Completed Task: 6
Worker: 2 : Shutting down
Worker: 3 : Completed Task: 9
Worker: 3 : Shutting down
Worker: 1 : Completed Task: 8
Worker: 1 : Shutting down
Worker: 4 : Completed Task: 10
Worker: 4 : Shutting down

runner

runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以
用runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。

runner/runner.go

package runnerimport ("errors""os""os/signal""time"
)type Runner struct {interrupt chan os.Signalcomplete  chan errortimeout   <-chan time.Time // 单向通道,只允许接收tasks     []func(int)
}var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")func New(d time.Duration) *Runner {return &Runner{interrupt: make(chan os.Signal, 1),complete:  make(chan error),timeout:   time.After(d),}
}func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}func (r *Runner) Start() error {signal.Notify(r.interrupt, os.Interrupt) // 如果有中断,会将中断信号发送到 r.interruptgo func() {r.complete <- r.run()}()select {case err := <-r.complete:return err // 如果提前中断,err 是 ErrInterrupt,正常结束则是 nilcase <-r.timeout:return ErrTimeout}
}func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}task(id)}return nil
}func (r *Runner) gotInterrupt() bool {select {case <-r.interrupt:signal.Stop(r.interrupt)return truedefault:return false}
}

main.go 

package mainimport ("github.com/test/runner""log""os""time"
)const timeout = 3 * time.Secondfunc main() {log.Println("Starting work.")r := runner.New(timeout)r.Add(createTask(), createTask(), createTask())if err := r.Start(); err != nil {switch err {case runner.ErrTimeout:log.Println("Terminating due to timeout.")os.Exit(1)case runner.ErrInterrupt:log.Println("Terminating due to interrupt.")os.Exit(2)}}log.Println("Process ended.")
}func createTask() func(int) {return func(id int) {log.Printf("Processor - Task #%d.", id)time.Sleep(time.Duration(id) * time.Second)}
}

超时退出

2024/04/21 17:26:30 Starting work.
2024/04/21 17:26:30 Processor - Task #0.
2024/04/21 17:26:30 Processor - Task #1.
2024/04/21 17:26:31 Processor - Task #2.
2024/04/21 17:26:33 Terminating due to timeout.

中断退出

2024/04/21 17:28:18 Starting work.
2024/04/21 17:28:18 Processor - Task #0.
2024/04/21 17:28:18 Processor - Task #1.
2024/04/21 17:28:19 Terminating due to interrupt.

正常退出

2024/04/21 17:30:40 Starting work.
2024/04/21 17:30:40 Processor - Task #0.
2024/04/21 17:30:40 Processor - Task #1.
2024/04/21 17:30:41 Processor - Task #2.
2024/04/21 17:30:43 Process ended.

pool

pool 使用有缓冲通道实现资源池。

pool/pool.go

package poolimport ("errors""io""log""sync"
)type Pool struct {m         sync.Mutexresources chan io.Closerfactory   func() (io.Closer, error)closed    bool
}var ErrPoolClosed = errors.New("pool has been closed")func New(fn func() (io.Closer, error), size uint) (*Pool, error) {if size <= 0 {return nil, errors.New("size value too small")}return &Pool{resources: make(chan io.Closer, size),factory:   fn,}, nil
}func (p *Pool) Acquire() (io.Closer, error) {select {case r, ok := <-p.resources:log.Println("Acquire:", "Shared resource")if !ok {return nil, ErrPoolClosed}return r, nildefault:log.Println("Acquire:", "New resource")return p.factory()}
}func (p *Pool) Release(r io.Closer) {p.m.Lock()defer p.m.Unlock()if p.closed {r.Close()return}select {case p.resources <- r:log.Println("Release:", "In queue")default:log.Println("Release:", "Closing")r.Close()}
}func (p *Pool) Close() {p.m.Lock()defer p.m.Unlock()if p.closed {return}p.closed = trueclose(p.resources)for r := range p.resources {r.Close()}
}

main.go

package mainimport ("github.com/test/pool""io""log""math/rand""sync""sync/atomic""time"
)const (maxGoroutines   = 5pooledResources = 2
)type dbConnection struct {ID int32
}func (dbConn *dbConnection) Close() error {log.Println("Close: Connection", dbConn.ID)return nil
}var idCounter int32func createConnection() (io.Closer, error) {id := atomic.AddInt32(&idCounter, 1)log.Println("Create: New Connection", id)return &dbConnection{id}, nil
}func main() {var wg sync.WaitGroupwg.Add(maxGoroutines)p, err := pool.New(createConnection, pooledResources)if err != nil {log.Println(err)}for query := 0; query < maxGoroutines; query++ {go func(q int) {defer wg.Done()performQueries(q, p)}(query)}wg.Wait()log.Println("Shutdown program")p.Close()
}func performQueries(query int, p *pool.Pool) {conn, err := p.Acquire()if err != nil {log.Println(err)return}defer p.Release(conn)time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}

运行结果 

2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 1
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 3
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 4
2024/04/21 20:29:20 Create: New Connection 2
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 5
2024/04/21 20:29:20 QID[0] CID[2]
2024/04/21 20:29:20 Release: In queue
2024/04/21 20:29:20 QID[2] CID[5]
2024/04/21 20:29:20 Release: In queue
2024/04/21 20:29:20 QID[4] CID[1]
2024/04/21 20:29:20 Release: Closing
2024/04/21 20:29:20 Close: Connection 1
2024/04/21 20:29:21 QID[1] CID[4]
2024/04/21 20:29:21 Release: Closing
2024/04/21 20:29:21 Close: Connection 4
2024/04/21 20:29:21 QID[3] CID[3]
2024/04/21 20:29:21 Release: Closing
2024/04/21 20:29:21 Close: Connection 3
2024/04/21 20:29:21 Shutdown program
2024/04/21 20:29:21 Close: Connection 2
2024/04/21 20:29:21 Close: Connection 5

work

work 使用无缓冲通道创建资源池。work 新建时就生成指定个数 goroutine 循环等待(无缓冲通道无数据阻塞)消费任务,然后主线程再分发相应任务到通道,消费 goroutine 再继续执行消费任务。

work/work.go

package workimport "sync"type Worker interface {Task()
}type Pool struct {work chan Workerwg   sync.WaitGroup
}func New(maxGoroutines int) *Pool {p := Pool{work: make(chan Worker),}p.wg.Add(maxGoroutines)for i := 0; i < maxGoroutines; i++ {go func() {defer p.wg.Done()for w := range p.work { // 如果 work 关闭,for range 循环结束w.Task()}}()}return &p
}func (p *Pool) Run(w Worker) {p.work <- w
}func (p *Pool) Shutdown() {close(p.work) // 关闭通道,避免 New 产生的 goroutine 一直阻塞不退出p.wg.Wait()
}

main.go

package mainimport ("github.com/test/work""log""sync""time"
)var names = []string{"steve","jason","looking",
}type namePrinter struct {name string
}func (m *namePrinter) Task() {log.Println(m.name)time.Sleep(time.Second)
}const times = 3func main() {p := work.New(2)var wg sync.WaitGroupwg.Add(times * len(names))for i := 0; i < times; i++ {for _, name := range names {np := namePrinter{name: name}go func() {defer wg.Done()p.Run(&np)}()}}wg.Wait()p.Shutdown()
}

运行结果

2024/04/21 21:39:28 steve
2024/04/21 21:39:28 looking
2024/04/21 21:39:29 jason
2024/04/21 21:39:29 looking
2024/04/21 21:39:30 steve
2024/04/21 21:39:30 jason
2024/04/21 21:39:31 looking
2024/04/21 21:39:31 steve
2024/04/21 21:39:32 jason

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/1456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信有关白名单IP

一、商家支付 二、公众号

开启智慧之旅,AI与机器学习驱动的微服务设计模式探索

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 &#x1f680; 转载自热榜文章&#x1f525;&#xff1a;探索设计模式的魅力&#xff1a;开启智慧…

基于SpringBoot + Vue实现的时装购物管理系统设计与实现+毕业论文+开题报告+答辩PPT

介绍 系统包含用户、管理员两个角色 管理员&#xff1a;首页、个人中心、用户管理、商品分类管理、颜色管理、商品信息管理、商品评价管理、系统管理、订单管理 用户:首页、个人中心、商品评价管理、我的收藏管理、订单管理 前台首页:首页、商品信息、商品资讯、个人中心、后台…

【MySQL】查询(进阶)

文章目录 前言1、新增2、聚合查询2.1聚合函数2.1.1count2.1.2sum2.1.3avg2.1.4max和min 2.2、GROUP BY子句2.3HAVING 3、联合查询/多表查询3.1内连接和外连接3.2自连接3.3子查询3.4合并查询 前言 在前面的内容中我们已经把查询的基本操作介绍的差不多了&#xff0c;接下来我们…

Llama 3 实测效果炸裂,一秒写数百字(附镜像站)

这几天大火的llama 3刚刚在https://askmanyai.cn上线了&#xff01; 玩了一会儿&#xff0c;这个生成速度是真的亚麻呆住。文案写作和代码生成直接爽到起飞&#xff0c;以往gpt要写一两分钟的千字文&#xff0c;llama 3几秒钟就写完了。而且效果甚至感觉更好&#xff1f; 效果惊…

Java 中的重写与重载

目录 重写 重写的条件 重写的示例 重载&#xff08;Overload&#xff09; 重载的条件 重载的示例 区别总结 Java 作为一门面向对象的编程语言&#xff0c;提供了丰富的多态性支持&#xff0c;其中重写&#xff08;Override&#xff09;和重载&#xff08;Overload&#…

el-menu 有一级二级三级菜单

效果如下 菜单代码如下 <el-menu:default-active"menuDefaultActive"class"el-menu-box":text-color"menuTextColor":active-text-color"menuActiveTextColor":unique-opened"true"><!-- 一级菜单 --><tem…

浏览器原理之浏览器同源策略

一 什么是同源策略 同源策略&#xff08;Same-Origin Policy, SOP&#xff09;是一种重要的安全策略&#xff0c;用于Web浏览器中。它限制了一个源&#xff08;origin&#xff09;的文档或脚本如何与另一个源的资源进行交互。这有助于防止恶意文档窃取另一个文档的数据&#x…

二、python+前端 实现MinIO分片上传

python前端 实现MinIO分片上传 一、背景二、流程图三、代码 一、背景 问题一&#xff1a;前端 -> 后端 ->对象存储 的上传流程&#xff0c;耗费带宽。 解决方案&#xff1a;上传流程需要转化为 前端 -> 对象存储&#xff0c;节省上传带宽 问题二&#xff1a;如果使用…

Crypto量化高频体验总结

Crypto量化高频体验总结 人工智能与量化交易算法知识库 2024-04-21 21:02 美国 以下文章来源于Quant搬砖工 &#xff0c;作者quant搬砖队工头 Quant搬砖工. 稳健的收益要一点一点赚&#xff0c;量化的板砖要一块一块搬&#xff01; 前言 前两天在翻历史文章的时候&#xf…

【高阶数据结构】并查集 -- 详解

一、并查集的原理 1、并查集的本质和概念 &#xff08;1&#xff09;本质 并查集的本质&#xff1a;森林。 &#xff08;2&#xff09;概念 在一些应用问题中&#xff0c;需要将 n 个不同的元素划分成一些不相交的集合。 开始时&#xff0c;每个元素自成一个单元素集合&…

SpringBoot 集成Nacos注册中心和配置中心-支持自动刷新配置

SpringBoot 集成Nacos注册中心和配置中心-支持自动刷新配置 本文介绍SpringBoot项目集成Nacos注册中心和配置中心的步骤&#xff0c;供各位参考使用 1、配置pom.xml 文件 在pom.xml文件中定义如下配置和引用依赖&#xff0c;如下所示&#xff1a; <properties><pr…

buuctf之ciscn_2019_c_1

ciscn_2019_c_1 一、查看属性二、静态分析三、动态分析四、思路五、exp 一、查看属性 首先还是必要的查看属性环节&#xff1a; 可以知道该文件是一个x86架构下的64位小端ELF文件&#xff0c;开启了栈不可执行&#xff08;NX&#xff09; 执行一下&#xff0c;先有一个选择&…

ROS2 王牌升级:Fast-DDS 性能直接碾压 zeroMQ 「下」

以下内容为本人的学习笔记&#xff0c;如需要转载&#xff0c;请声明原文链接 微信公众号「ENG八戒」https://mp.weixin.qq.com/s/aU1l3HV3a9YnwNtC1mTiOA 性能比较 下面就以官网的测试数据为准&#xff0c;让我们一起来看看它们的性能差别到底怎样。 本次比较仅针对 Fast RT…

SQL语法基础-其他函数V

SQL语法基础-其他函数V 一、数据类型转换函数二、系统信息函数三、条件表达式函数四、XML相关函数五、JSON函数&#xff08;从Oracle 12c开始支持&#xff09; 一、数据类型转换函数 这类函数用于将数据从一种类型转换为另一种类型&#xff0c;非常有用于数据清洗和准备阶段。…

60道计算机二级模拟试题选择题(含答案和解析)

点击下载《60道计算机二级模拟试题选择题&#xff08;含答案和解析&#xff09;》 1. 前言 本文设计了一份针对计算机二级考试的选择题&#xff0c;旨在考察考生对计算机基础知识和应用技能的掌握情况。试题涵盖了计算机基础知识、操作系统、办公软件、计算机网络等多个方面&…

【CVPR2023】《A2J-Transformer:用于从单个RGB图像估计3D交互手部姿态的锚点到关节变换网络

这篇论文的标题是《A2J-Transformer: Anchor-to-Joint Transformer Network for 3D Interacting Hand Pose Estimation from a Single RGB Image》&#xff0c;作者是Changlong Jiang, Yang Xiao, Cunlin Wu, Mingyang Zhang, Jinghong Zheng, Zhiguo Cao, 和 Joey Tianyi Zhou…

polkit服务启动失败

使用systemctl 命令报错 Authorization not available. Check if polkit service is running or see debug message for more information. 查看polkit状态是失败的状态&#xff0c;报缺少libstdc.so.6 systemctl status polkit 需要安装libstdc.so.6库 先加载所有安装包 …

Java学习Go(入门)

下载Go 《官网下载golang》 直接点Download&#xff0c;然后根据你自己的操作系统进行下载&#xff0c;我这里以win10为例 安装go 默认安装到C:\Program Files\Go&#xff0c;这里我们可以选择安装到其他盘&#xff0c;也可以选择默认安装。初学者建议直接一路next。 安装完…

IMUGNSS的误差状态卡尔曼滤波器(ESKF)---更新过程

IMU&GNSS的误差状态卡尔曼滤波器&#xff08;ESKF&#xff09;---更新过程 ESKF的更新过程 ESKF的更新过程 前面介绍的是ESKF的运动过程&#xff0c;现在考虑更新过程。假设一个抽象的传感器能够对状态变量产生观测&#xff0c;其观测方程为抽象的h,那么可以写为 其中z为…