golang中的并发模型

并发模型

传统的编程语言(如C++、Java、Python等)并非为并发而生的,因此它们面对并发的逻辑多是基于操作系统的线程。其并发的执行单元(线程)之间的通信利用的也是操作系统提供的线程或进程间通信的原语,比如共享内存、信号、管道、消息队列、套接字等。在这些通信原语中,使用最多、最广泛同时也最高效的是结合了线程同步原语(比如锁以及更为低级的原子操作)的共享内存方式,因此,可以说传统语言的并发模型是基于共享内存的模型

Untitled

这些传统的就基于共享内存的并发模型难用且易错,在大型程序中,开发人员在设计并发程序时需要根据线程模型对程序进行建模同时规划线程之间的通信方式,且程序难以阅读、理解、维护

Go采用了CSP(Communicating Sequential Process,通信顺序进程)模型

一个符合CSP模型的并发程序应该是一组通过输入/输出原语连接起来的P的集合

Untitled

CSP模型旨在简化并发程序的编写,让并发程序的编写与编写顺序程序一样简单。Tony Hoare认为输入/输出应该是基本的编程原语,数据处理逻辑(CSP中的P)仅需调用输入原语获取数据,顺序处理数据,并将结果数据通过输出原语输出

CSP理论中的P(Process,进程)是个抽象概念,它代表任何顺序处理逻辑的封装,它获取输入数据(或从其他P的输出获取),并生产可以被其他P消费的输出数据。

为了实现CSP模型中的输入/输出原语,Go引入了goroutine(P)之间的通信原语channel。通过channel将goroutine(P)组合与连接在一起,这使得设计和编写大型并发系统变得更为简单和清晰

虽然CSP模型已经成为Go语言支持的主流并发模型,但Go也支持传统的基于共享内存的并发模型,并提供基本的低级同步原语(主要是sync包中的互斥锁、条件变量、读写锁、原子操作等

那么在实践中应该如何选择是使用channel还是低级同步原语下的共享内存?

Go始终推荐以CSP模型风格构建并发程序,尤其是在复杂的业务层面。这将提升程序逻辑的清晰度,大大降低并发设计的复杂性,并让程序更具可读性和可维护性;

对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据,可以使用更为高效的低级同步原语(如sync.Mutex),以保证goroutine对数据的同步访问。

并发模式

在语言层面,Go针对CSP模型提供了三种并发原语。

  • goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元。
  • channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。
  • select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。

深入了解一下在实践中这些原语的常见组合方式,即并发模式:

创建模式

go关键字+function/method 创建 goroutine:

go fmt.println("I'm a goroutine")
​
c := srv.NewConn(rw)
go c.serve(connCtx)

在稍微复杂的程序里,需要考虑通过原语的承载体channel在goroutine间建立联系,所以通常采用以下方式建立goroutine:

type T struct {...}func spwan(f func()) chan T {c := make(chan T)go func() {...f()...}()return c
}func main() {
//使用c与新创建的goroutine通信c := spawn(func(){})
}

在内部创建一个goroutine并返回一个channel类型变量函数

spwan函数创建的新的goroutine和调用spwan函数的goroutine通过channel建立联系

函数得以实现得益于channel作为go语言的一等公民(first-class citizen)的存在:channel可以像变量一样被初始化、传递和赋值。上面例子中的spawn只返回了一个channel变量、

退出模式

goroutine的执行函数返回意味着goroutine退出。但有些时候会要求优雅退出,以下为方案:

分离(detached)模式

是使用最广泛的goroutine退出模式

创建它的goroutine不需要关心它的退出,这类goroutine在启动后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这类goroutine有两个常见用途。

一次性任务:用来执行任务完成后既退出,比如此标准库代码:

// $GOROOT/src/net/dial.gofunc (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {...if oldCancel := d.Cancel; oldCancel != nil {subCtx, cancel := context.WithCancel(ctx)defer cancel()
//有数据处理后既退出go func() {select {case <-oldCancel:cancel()case <-subCtx.Done():}}()ctx = subCtx}...
}

常驻后台执行的一些特定任务:如监视(monitor)、观察(watch)等。其实现通常采用for {…}或for { select{…} }代码段形式,并多以定时器(timer)或事件(event)驱动执行。

// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {// 每个P都有一个运行在后台的用于标记的Gfor _, p := range allp {if p.gcBgMarkWorker == 0 {go gcBgMarkWorker(p) // 为每个P创建一个goroutine,以运行gcBgMarkWorkernotetsleepg(&work.bgMarkReady, -1)noteclear(&work.bgMarkReady)}}
}func gcBgMarkWorker(_p_ *p) {gp := getg()...for { // 常驻后台处理GC事宜...}
}

Join模式

在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。

在Go中,我们有时候也有类似的需求:goroutine的创建者需要等待新goroutine结束。

  • 等待一个goroutine退出

先看一段实例代码

func worker(args ...interface{}) {if len(args) == 0 {return}interval, ok := args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))
}func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {c := make(chan struct{})go func() {f(args...)c <- struct{}{}}()return c
}func main() {done := spawn(worker, 5)println("spawn a worker goroutine")<-doneprintln("worker done")
}

这个channel的用途就是在两个goroutine之间建立退出事件的“信号”通信机制。main goroutine在创建完新goroutine后便在该channel上阻塞等待,直到新goroutine退出前向该channel发送了一个信号。

运行过后

Untitled

  • 获取goroutine的退出状态

如果不仅要等goroutine退出还要精准获取其结束状态,可以通过自定义类型的channel实现这一需求:

var OK = errors.New("ok")func worker(args ...interface{}) error {if len(args) == 0 {return errors.New("invalid args")}interval, ok := args[0].(int)if !ok {return errors.New("invalid interval arg")}time.Sleep(time.Second * (time.Duration(interval)))return OK
}func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {c := make(chan error)go func() {c <- f(args...)}()return c
}func main() {done := spawn(worker, 5)println("spawn worker1")err := <-donefmt.Println("worker1 done:", err)done = spawn(worker)println("spawn worker2")err = <-donefmt.Println("worker2 done:", err)
}

将channel中承载的类型由struct{}改为了error,这样channel承载的信息就不只是一个信号了,还携带了有价值的信息:新goroutine的结束状态。运行上述示例:

Untitled

  • 等待多个goroutine退出

有时候必须等待全部新goroutine退出,可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式:

func worker(args ...interface{}) {if len(args) == 0 {return}interval, ok := args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))
}func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {c := make(chan struct{})var wg sync.WaitGroupfor i := 0; i < n; i++ {wg.Add(1)go func(i int) {name := fmt.Sprintf("worker-%d:", i)f(args...)println(name, "done")wg.Done() // worker done!}(i)}go func() {wg.Wait()c <- struct{}{}}()return c
}func main() {done := spawnGroup(5, worker, 3)println("spawn a group of workers")<-doneprintln("group workers done")
}

通过sync.WaitGroup,spawnGroup每创建一个goroutine都会调用wg.Add(1),新创建的goroutine会在退出前调用wg.Done。

在spawnGroup中还创建了一个用于监视的goroutine,该goroutine调用sync.WaitGroup的Wait方法来等待所有goroutine退出。

在所有新创建的goroutine退出后,Wait方法返回,该监视goroutine会向done这个channel写入一个信号,这时main goroutine才会从阻塞在done channel上的状态中恢复,继续往下执行。

运行上述示例代码:

支持超时机制的等待

设置合理的退出时间,如若没有退出,则继续执行下一步:

func main() {done := spawnGroup(5, worker, 30)println("spawn a group of workers")timer := time.NewTimer(time.Second * 5)defer timer.Stop()select {case <-timer.C:println("wait group workers exit timeout!")case <-done:println("group workers done")}
}

notify-and-wait模式

main goroutine的停止代表着整个程序的停止,如果不事先通知退出,则容易导致业务数据损坏、不完整

我们可以通过notify-and-wait(通知并等待)模式来满足这一场景的要求。虽然这一模式也不能完全避免损失,但是它给了各个goroutine一个挽救数据的机会,从而尽可能减少损失。

  • 通知并等待一个goroutine的退出
func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))
}func spawn(f func(int)) chan string {quit := make(chan string)go func() {var job chan int // 模拟job channelfor {select {case j := <-job:f(j)case <-quit:quit <- "ok"}}}()return quit
}func main() {quit := spawn(worker)println("spawn a worker goroutine")time.Sleep(5 * time.Second)// 通知新创建的goroutine退出println("notify the worker to exit...")quit <- "exit"timer := time.NewTimer(time.Second * 10)defer timer.Stop()select {case status := <-quit:println("worker done:", status)case <-timer.C:println("wait worker exit timeout")}
}

执行

此时,spawn函数不仅发送退出信号给创建者还承载创建者发送的退出信号,形成了一个双向的数据通道

  • 通知并等待多个goroutine退出

channel存在一个特性:当使用close关闭channel时,所有阻塞到该channel上的goroutine都会得到通知,所以可以利用这一特性实现这一模式:

func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))
}func spawnGroup(n int, f func(int)) chan struct{} {quit := make(chan struct{})job := make(chan int)var wg sync.WaitGroupfor i := 0; i < n; i++ {wg.Add(1)go func(i int) {defer wg.Done() // 保证wg.Done在goroutine退出前被执行name := fmt.Sprintf("worker-%d:", i)for {j, ok := <-jobif !ok {println(name, "done")return}// 执行这个jobworker(j)}}(i)}go func() {<-quitclose(job) // 广播给所有新goroutinewg.Wait()quit <- struct{}{}}()return quit
}func main() {quit := spawnGroup(5, worker)println("spawn a group of workers")time.Sleep(5 * time.Second)// 通知 worker goroutine 组退出println("notify the worker group to exit...")quit <- struct{}{}timer := time.NewTimer(time.Second * 5)defer timer.Stop()select {case <-timer.C:println("wait group workers exit timeout!")case <-quit:println("group workers done")}
}

创建者直接利用了worker goroutine接收任务(job)的channel来广播退出通知,而实现这一广播的代码就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过“comma ok”模式获取的ok值为false,也就表明该channel已经被关闭,于是worker goroutine执行退出逻辑(退出前wg.Done()被执行)。

执行:

退出模式的应用

由于goroutine的运行状态不同,因此很难用同种框架全面管理,所以我们可以只实现一个“超时等待退出”框架,以统一解决各种运行状态

一组goroutine的退出有两种情况,第一种情况是并发退出,当goroutine的退出先后数据对数据处理无影响时可使用;另一种则是串行退出,也就是次序错误可能导致程序状态混乱和错误

  • 并发退出
  • 串行退出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Unity】单例模式及游戏声音管理类应用

【Unity】单例模式及游戏声音管理类应用 描述 在日常游戏项目开发中&#xff0c;单例模式是一种常用的设计模式&#xff0c;它允许在应用程序的生命周期中只创建一个对象实例&#xff0c;并提供对该实例的全局访问点。通过使用单例模式&#xff0c;可以提高代码的可维护性和可…

2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-B卷

2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-B卷 2024年山东省职业院校技能大赛中职组 “网络安全”赛项竞赛试题-B卷A模块基础设施设置/安全加固&#xff08;200分&#xff09;A-1&#xff1a;登录安全加固&#xff08;Windows, Linux&#xff09;A-2&#…

Verilog基础:仿真时x信号的产生和x信号对于各运算符的特性

相关阅读 Verilog基础https://blog.csdn.net/weixin_45791458/category_12263729.html?spm1001.2014.3001.5482 信号爆x也许是所有IC人的噩梦&#xff0c;满屏的红色波形常让人头疼不已&#xff0c;但x信号的产生原因却常常只有几种&#xff0c;只要遵循一定的代码规范&#…

超聚变服务器关闭超线程CPU的步骤(完整版)

前言: 笨鸟先飞&#xff0c;好记性不如烂笔头。 我们项目都用不到超线程CPU&#xff0c;所以调测设备的时候都需要关掉&#xff0c;最近新设备换成了超聚变的服务器&#xff0c;这篇记录我关闭&#xff08;超聚变&#xff09;服务器超线程CPU的方法步骤。 关闭超线程CPU的步骤…

Django command执行脚本

python web项目中经常会使用到脚本&#xff0c;一般来说有两种很简单的方法&#xff0c;一种是直接python function&#xff0c;另一种就是 django 自定义command。 对比常规脚本 这里举个简单的例子&#xff0c;比如初始化数据、文件名称为initialize_data.py &#xff08;1…

集合的运算

集合的运算 #include <stdio.h> #include <stdlib.h> void print(int size, char arr[]) {if (size 0) {printf("null");}for (int i 0; i < size; i) {printf("%c", arr[i]);}printf("\n"); } int main() {char U[] { a,b,c,…

JS-项目实战-鼠标悬浮变手势(鼠标放单价上生效)

1、鼠标悬浮和离开事件.js //当页面加载完成后执行后面的匿名函数 window.onload function () {//get:获取 Element:元素 By:通过...方式//getElementById()根据id值获取某元素let fruitTbl document.getElementById("fruit_tbl");//table.rows:获取这个表格…

基于单片机音乐弹奏播放DS1302万年历显示及源程序

一、系统方案 1、本设计采用51单片机作为主控器。 2、DS1302计时显示年月日时分秒。 3、按键可以弹奏以及播放音乐&#xff0c;内置16首音乐。 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 /时钟显示**/ void init_1602_ds1302() { write…

文件转换,简简单单,pdf转word,不要去找收费的了,自己学了之后免费转,之后就复制粘贴就ok了

先上一个链接pdf转word文件转换 接口层 PostMapping("pdfToWord")public String pdfToWord(RequestParam("file") MultipartFile file) throws IOException {String fileName FileExchangeUtil.pdfToWord(file.getInputStream(),file.getName());return…

机器学习-搜索技术:从技术发展到应用实战的全面指南

在本文中&#xff0c;我们全面探讨了人工智能中搜索技术的发展&#xff0c;从基础算法如DFS和BFS&#xff0c;到高级搜索技术如CSP和优化问题的解决方案&#xff0c;进而探索了机器学习与搜索的融合&#xff0c;最后展望了未来的趋势和挑战&#xff0c;提供了对AI搜索技术深刻的…

Lesson 04 模板入门

C&#xff1a;渴望力量吗&#xff0c;少年&#xff1f; 文章目录 一、泛型编程1. 引入2. 函数模板&#xff08;1&#xff09;函数模板概念&#xff08;2&#xff09;函数模板格式&#xff08;3&#xff09;函数模板的原理&#xff08;4&#xff09;函数模板的实例化&#xff08…

uniapp优化h5项目-摇树优化,gzip压缩和删除console.log

1.摇树优化 勾选摇树优化,打包删除死代码 2.gzip压缩和删除console.log 安装插件webpack和compression-webpack-plugin webpack插件 npm install webpack4.46.0 --save-devcompression-webpack-plugin插件 npm install compression-webpack-plugin6.1.1 --save-devconst Com…

C语言链表

head.h typedef struct Node_s{int data; //数据域struct Node_s *pNext; //指针域 } Node_t, *pNode_t;void headInsert(pNode_t *ppHead, pNode_t *ppTail, int data); void print(pNode_t pHead); void tailInsert(pNode_t *ppHead, pNode_t *ppTail, int data); void sort…

PMCW体制雷达系列文章(4) – PMCW雷达之抗干扰

说明 本文作为PMCW体制雷达系列文章之一&#xff0c;主要聊聊FMCW&PMCW两种体制雷达的干扰问题。事实上不管是通信领域还是雷达领域&#xff0c;对于一切以电磁波作为媒介的信息传递活动&#xff0c;干扰是无处不在的。近年来&#xff0c;随着雷达装车率的提高&#xff0c;…

QQ五毛项目记

问题与挑战&#xff1a;某公司为了实现某马总造福全人类&#xff0c;红旗插遍全球的宏伟目标&#xff0c;为应对后续用户激增的问题。特别安排了一次针对全体用户的秒杀活动&#xff1a;于XXXX年XX月XX日XX时XX分XX秒开始的秒杀五毛钱一百个QQ币的活动。每个账户仅限一次&#…

Stable Diffusion 启动时 got an unexpected keyword argument ‘socket_options‘ 错误解决

Stable Diffusion 启动时 got an unexpected keyword argument socket_options 错误解决 问题解决方法 问题 Launching Web UI with arguments: Traceback (most recent call last):File "launch.py", line 48, in <module>main()File "launch.py"…

HTTP Error 500.31 - Failed to load ASP.NET Core runtime

在winserver服务器上部署net6应用后&#xff0c;访问接口得到以下提示&#xff1a; 原因是因为没有安装net6的运行时和环境&#xff0c;我们可以在windows自带的 “事件查看器” 查看原因。 可以直接根据给出的地址去官网下载sdk环境&#xff0c;安装即可 下载对应的net版本…

CentOS Linux release 7.9.2009 (Core)中安装配置Tomcat

一、安装JDK 部分内容可以参考我这篇文章&#xff1a;Windows11与CentOS7下配置与检测JDK与Maven环境变量 中的 2.2 安装jdk-8u371-linux-x64.tar.gz和配置环境变量/etc/profile //1、安装redhat-lsb yum install -y redhat-lsb//2、查看系统版本信息 lsb_release -a //3、查…

结构体——C语言初阶

一.结构体的声明&#xff1a; &#xff08;1&#xff09;结构的基础知识&#xff1a; 结构体是一种构造数据类型把不同类型的数据组合成一个整体结构体是一些值的集合&#xff0c;这些值称为成员变量。结构的每个成员可以是不同类型的变量需要注意的是&#xff0c;结构体是一种…

【C++入门】拷贝构造运算符重载

目录 1. 拷贝构造函数 1.1 概念 1.2 特征 1.3 常用场景 2. 赋值运算符重载 2.1 运算符重载 2.2 特征 2.3 赋值运算符 前言 拷贝构造和运算符重载是面向对象编程中至关重要的部分&#xff0c;它们C编程中的一个核心领域&#xff0c;本期我详细的介绍拷贝构造和运算符重载。 1. …