go并发设计模式runner模式

go并发设计模式runner模式

在这里插入图片描述

真正运行的程序不可能是单线程运行的,go语言中最值得骄傲的就是CSP模型了,可以说go语言是CSP模型的实现。

假设现在有一个程序需要实现,这个程序有以下要求:

在这里插入图片描述

  • 程序可以在分配的时间内完成工作,正常终止;
  • 程序没有及时完成工作,“自杀”;
  • 接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作

数据类型设计

程序需要在规定时间内完成工作的最简单方法就是使用goroutine和channel,我们需要一个chan用来接收操作完成的信号,完成任务的函数可能有错误信息返回,因此我们这里定义一个错误类型的通道,用来通知什么时候完成任务以及完成任务的错误信息。

complete chan error

任务执行超时的最简单方法就是使用time包提供的After函数,当指定的时间内没有完成任务那么就出发一下超时通道,因为只需要接收超时的信号,因此只需要定义一个单向接收通道即可

timeout <-chan time.Time

当发生系统中断事件时,程序能立刻清理状态然后清理资源并停止工作,因此我们需要一个信号通道来接收操作系统发送的中断信号,这里我们使用signal包提供的Notify函数来注册信号,当操作系统发送信号时,会通过信号通道发送信号

interrupt chan os.Signal

程序最重要的是能够处理任务,用户需要处理多少任务提前是不能确定的,我们需要一个任务列表,这里我们使用一个切片来保存这些任务。

tasks []func(int)

经过上述设计,我们定义一个Runner结构体,用来保存这些通道和任务列表。

// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {// interrupt channel 用来接收操作系统发送的信号interrupt chan os.Signal// complete channel 用来通知任务已经完成complete chan error// timeout channel 用来通知任务已经超时的接收通道timeout <-chan time.Time// tasks 用来保存任务列表tasks []func(int)
}

错误系统设计

错误系统设计,我们希望在任务执行完成或者超时或者操作系统发送的中断信号时返回错误,因此我们定义两个个错误变量,分别用来保存超时错误,中断错误和正常完成错误。

// ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
var ErrTimeout = errors.New("received timeout")// ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
var ErrInterrupt = errors.New("received interrupt")

数据类型说明

Signal

os.Signal 是一个接口类型,是对不同操作系统上捕获的信号的一个抽象接口,用来从操作系统接收中断事件。

// A Signal represents an operating system signal.
// The usual underlying implementation is operating system-dependent:
// on Unix it is syscall.Signal.
type Signal interface {String() stringSignal() // to distinguish from other Stringers
}

Error

error 是一个接口类型,用来表示错误,所有错误类型都实现了error接口,因此我们可以通过error接口来判断错误类型。

Time

time.Time 是一个结构体类型,用来表示一个时间,包含年月日时分秒纳秒等信息。

type Time struct {// wall and ext encode the wall time seconds, wall time nanoseconds,// and optional monotonic clock reading in nanoseconds.//// From high to low bit position, wall encodes a 1-bit flag (hasMonotonic),// a 33-bit seconds field, and a 30-bit wall time nanoseconds field.// The nanoseconds field is in the range [0, 999999999].// If the hasMonotonic bit is 0, then the 33-bit field must be zero// and the full signed 64-bit wall seconds since Jan 1 year 1 is stored in ext.// If the hasMonotonic bit is 1, then the 33-bit field holds a 33-bit// unsigned wall seconds since Jan 1 year 1885, and ext holds a// signed 64-bit monotonic clock reading, nanoseconds since process start.wall uint64ext  int64// loc specifies the Location that should be used to// determine the minute, hour, month, day, and year// that correspond to this Time.// The nil location means UTC.// All UTC times are represented with loc==nil, never loc==&utcLoc.loc *Location
}

方法设计

在go中方法需要示例进行调用,因此我们最后定义一个用来创建Runner实例的New方法,避免用户自行创建实例,导致示例的创建不统一。

名为 New 的工厂函数。这个函数接收一个 time.Duration 类型的值,并返回 Runner 类型的指针。这个函数会创建一个 Runner 类型的值,并初始化每个通道字段。因为 task 字段的零值是 nil,已经满足初始化的要求,所以没有被明确初始化。每个通道字段都有独立的初始化过程

通道 interrupt 被初始化为缓冲区容量为 1 的通道。这可以保证通道至少能接收一个来自语言运行时的 os.Signal 值,确保语言运行时发送这个事件的时候不会被阻塞。如果 goroutine没有准备好接收这个值,这个值就会被丢弃。例如,如果用户反复敲 Ctrl+C 组合键,程序只会在这个通道的缓冲区可用的时候接收事件,其余的所有事件都会被丢弃。

通道 complete 被初始化为无缓冲的通道。当执行任务的 goroutine 完成时,会向这个通道发送一个 error 类型的值或者 nil 值。之后就会等待 main 函数接收这个值。一旦 main 接收了这个 error 值, goroutine 就可以安全地终止了。

最后一个通道 timeout 是用 time 包的 After 函数初始化的。 After 函数返回一个time.Time 类型的通道。语言运行时会在指定的 duration 时间到期之后,向这个通道发送一个 time.Time 的值。

// New 返回一个Runner实例
func New(d time.Duration) *Runner {return &Runner{// 1个缓冲的信号通道interrupt: make(chan os.Signal, 1),// 没有缓冲的信号通道,如果没有接受者那么会阻塞complete: make(chan error),timeout:  time.After(d),}
}

Add 方法用来添加任务,因为需要执行的任务前期并不确定有多少,因此Add接收一个名为tasks的可变参数,可变参数可以接受任意数量的值作为传入参数。这个例子里,这些传入的值必须是一个接收一个整数且什么都
不返回的函数。函数执行时的参数 tasks 是一个存储所有这些传入函数值的切片。

// Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}

run 方法会迭代 tasks 切片,并按顺序执行每个函数

func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}// 执行注册的任务task(id)}return nil
}

gotInterrupt 展示了带 default 分支的 select 语句的经典用法。代码试图从 interrupt 通道去接收信号。一般来说, select 语句在没有任何要接收的数据时会阻塞,不过有了 default 分支就不会阻塞了。 default 分支会将接收 interrupt 通道的阻塞调用转变为非阻塞的。如果 interrupt 通道有中断信号需要接收,就会接收并处理这个中断。如果没有需要接收的信号,就会执行 default 分支。当收到中断信号后,代码会通过调用 Stop 方法来停止接收之后的所有事件。之后函数返回 true。如果没有收到中断信号,在第 99 行该方法会返回 false。本质上,gotInterrupt 方法会让 goroutine 检查中断信号,如果没有发出中断信号,就继续处理工作。

// gotInterrupt 检查是否接收到中断信号
func (r *Runner) gotInterrupt() bool {select {// 如果有中断信号那么返回truecase <-r.interrupt:// 接收到中断信号,停止后续再接收到中断信号signal.Stop(r.interrupt)return true// 没有终端信号返回false,继续执行default:return false}
}

一切步骤都执行完了,现在开始执行任务

// Start 方法用来开始执行任务,并监视通道事件
func (r *Runner) Start() error {// 我们希望接收所有中断信号signal.Notify(r.interrupt, os.Interrupt)// 异步执行任务go func() {r.complete <- r.run()}()select {// 当任务处理完成时该通道会返回case err := <-r.complete:return err// 当任务处理程序运行超时时发出信号case <-r.timeout:return ErrTimeout}
}

将以上代码全部都整合到runner.go文件中

// Package runner 处理任务的运行和声明周期管理
package runnerimport ("errors""os""os/signal""time"
)// Runner 在给定的超时时间内执行一组任务
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {// interrupt channel 用来接收操作系统发送的信号interrupt chan os.Signal// complete channel 用来通知任务已经完成complete chan error// timeout channel 用来通知任务已经超时timeout <-chan time.Time// tasks 用来保存任务列表tasks []func(int)
}// ErrTimeout 定义一个超时错误, 会在人物执行超时时被返回
var ErrTimeout = errors.New("received timeout")// ErrInterrupt 定义一个中断错误, 会在收到操作系统事件的时候返回
var ErrInterrupt = errors.New("received interrupt")// New 返回一个Runner实例
func New(d time.Duration) *Runner {return &Runner{// 1个缓冲的信号通道interrupt: make(chan os.Signal, 1),// 没有缓冲的信号通道,如果没有接受者那么会阻塞complete: make(chan error),timeout:  time.After(d),}
}// Add 方法用来添加任务,这个任务是一个接收int类型的ID作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}// Start 方法用来开始执行任务,并监视通道事件
func (r *Runner) Start() error {// 我们希望接收所有中断信号signal.Notify(r.interrupt, os.Interrupt)// 异步执行任务go func() {r.complete <- r.run()}()select {// 当任务处理完成时该通道会返回case err := <-r.complete:return err// 当任务处理程序运行超时时发出信号case <-r.timeout:return ErrTimeout}}func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}// 执行注册的任务task(id)}return nil
}// gotInterrupt 检查是否接收到中断信号
func (r *Runner) gotInterrupt() bool {select {// 如果有中断信号那么返回truecase <-r.interrupt:// 接收到中断信号,停止后续再接收到中断信号signal.Stop(r.interrupt)return true// 没有终端信号返回false,继续执行default:return false}
}

在main.go中进行调用

package mainimport ("log""os""time""code/runner"
)// timeout 定义程序执行超时时间,如果超过这个时间还没执行完成会失败退出.
const timeout = 3 * time.Second// 主函数入口
func main() {log.Println("Starting work.")// 调用New创建 runner对象.r := runner.New(timeout)// 向任务队列中添加需要顺序执行的任务r.Add(createTask(), createTask(), createTask())// Run 执行人物,并按照返回错误处理if err := r.Start(); err != nil {switch err {case runner.ErrTimeout:log.Println("Terminating due to timeout.")os.Exit(1)case runner.ErrInterrupt:log.Println("Terminating due to interrupt.")os.Exit(2)}}// 记录执行结果log.Println("Process ended.")
}// createTask 返回一个入参为int的函数
func createTask() func(int) {return func(id int) {log.Printf("Processor - Task #%d.", id)time.Sleep(time.Duration(id) * time.Second)}
}

源码已经放到gitee需要的自行下载:
https://gitee.com/andrewgithub/note_lab/blob/main/example/go/concurrent_mode/runner/runner.go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/61794.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

03-13、SpringCloud Alibaba第十三章,升级篇,服务降级、熔断和限流Sentinel

SpringCloud Alibaba第十三章&#xff0c;升级篇&#xff0c;服务降级、熔断和限流Sentinel 一、Sentinel概述 1、Sentinel是什么 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保…

【服务器问题】xshell 登录远程服务器卡住( 而 vscode 直接登录不上)

打开 xshell ssh 登录远程服务器&#xff1a;卡在下面这里&#xff0c;迟迟不继续 当 SSH 连接卡在 Connection established. 之后&#xff0c;但没有显示远程终端提示符时&#xff0c;这通常意味着连接已经成功建立&#xff0c;说明不是网络连接和服务器连接问题&#xff0c;…

图片预处理技术介绍4——降噪

图片预处理 大家好&#xff0c;我是阿赵。   这一篇将两种基础的降噪算法。   之前介绍过均值模糊和高斯模糊。如果从降噪的角度来说&#xff0c;模糊算法也算是降噪的一类&#xff0c;所以之前介绍的两种模糊可以称呼为均值降噪和高斯降噪。不过模糊算法对原来的图像特征的…

Linux 网络编程之TCP套接字

前言 上一期我们对UDP套接字进行了介绍并实现了简单的UDP网络程序&#xff0c;本期我们来介绍TCP套接字&#xff0c;以及实现简单的TCP网络程序&#xff01; &#x1f389;目录 前言 1、TCP 套接字API详解 1.1 socket 1.2 bind 1.3 listen 1.4 accept 1.5 connect 2、…

AI/ML 基础知识与常用术语全解析

目录 一.引言 二.AI/ML 基础知识 1.人工智能&#xff08;Artificial Intelligence&#xff0c;AI&#xff09; (1).定义 (2).发展历程 (3).应用领域 2.机器学习&#xff08;Machine Learning&#xff0c;ML&#xff09; (1).定义 (2).学习方式 ①.监督学习 ②.无监督…

计算机网络常见面试题总结(上)

计算机网络基础 网络分层模型 OSI 七层模型是什么&#xff1f;每一层的作用是什么&#xff1f; OSI 七层模型 是国际标准化组织提出的一个网络分层模型&#xff0c;其大体结构以及每一层提供的功能如下图所示&#xff1a; 每一层都专注做一件事情&#xff0c;并且每一层都需…

蓝桥杯准备训练(lesson1,c++方向)

前言 报名参加了蓝桥杯&#xff08;c&#xff09;方向的宝子们&#xff0c;今天我将与大家一起努力参赛&#xff0c;后序会与大家分享我的学习情况&#xff0c;我将从最基础的内容开始学习&#xff0c;带大家打好基础&#xff0c;在每节课后都会有练习题&#xff0c;刚开始的练…

Unity类银河战士恶魔城学习总结(P156 Audio Settings音频设置)

【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili 教程源地址&#xff1a;https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了音频的大小设置与保存加载 音频管理器 UI_VolumeSlider.cs 定义了 UI_VolumeSlider 类&#xff0c;用于处理与音频设置相关的…

如何为 ext2/ext3/ext4 文件系统的 /dev/centos/root 增加 800G 空间

如何为 ext2/ext3/ext4 文件系统的 /dev/centos/root 增加 800G 空间 一、引言二、检查当前磁盘和分区状态1. 使用 `df` 命令检查磁盘使用情况2. 使用 `lsblk` 命令查看分区结构3. 使用 `fdisk` 或 `parted` 命令查看详细的分区信息三、扩展逻辑卷(如果使用 LVM)1. 检查 LVM …

java调用ai模型:使用国产通义千问完成基于知识库的问答

整体介绍&#xff1a; 基于RAG&#xff08;Retrieval-Augmented Generation&#xff09;技术&#xff0c;可以实现一个高效的Java智能问答客服机器人。核心思路是将预先准备的问答QA文档&#xff08;例如Word格式文件&#xff09;导入系统&#xff0c;通过数据清洗、向量化处理…

【C++boost::asio网络编程】有关异步Server样例以及伪闭包延长连接生命周期方法的笔记

异步Server 客户端源码Session类start函数handle_readhandle_write Server类构造函数start_accepthandle_accept 可能会造成的隐患利用伪闭包延长连接的生命周期 客户端源码 #include <iostream> #include <boost/asio.hpp> #include <string> int main() {…

力扣hot100道【贪心算法后续解题方法心得】(三)

力扣hot100道【贪心算法后续解题方法心得】 十四、贪心算法关键解题思路1、买卖股票的最佳时机2、跳跃游戏3、跳跃游戏 | |4、划分字母区间 十五、动态规划什么是动态规划&#xff1f;关键解题思路和步骤1、打家劫舍2、01背包问题3、完全平方式4、零钱兑换5、单词拆分6、最长递…

【linux】(23)对象存储服务-MinIo

MinIO 是一个高性能的对象存储服务&#xff0c;兼容 Amazon S3 API。 Docker安装MinIo 前提条件 确保您的系统已经安装了 Docker。如果还没有安装 Docker&#xff0c;可以参考 Docker 官方文档进行安装。 1. 拉取 MinIO Docker 镜像 首先&#xff0c;从 Docker Hub 拉取 Mi…

MySQL有哪些日志?

MySQL主要有三种日志&#xff1a;undo log、redo log、binlog。前两种是InnoDB特有的&#xff0c;binlog是MySQL的Server层中的。 Buffer Pool buffer pool是MySQL的缓冲池&#xff0c;里面存储了数据页、索引页、undo页等&#xff08;与数据库不一致的即为脏页&#xff09;。…

机器学习周志华学习笔记-第13章<半监督学习>

机器学习周志华学习笔记-第13章&#xff1c;半监督学习&#xff1e; 卷王&#xff0c;请看目录 13半监督学习13.1 生成式方法13.2 半监督SVM13.3 基于分歧的方法13.4 半监督聚类 13半监督学习 前面我们一直围绕的都是监督学习与无监督学习&#xff0c;监督学习指的是训练样本包…

SpringCloud框架学习(第六部分:Sentinel实现熔断与限流)

目录 十四、SpringCloud Alibaba Sentinel实现熔断与限流 1.简介 2.作用 3.下载安装 4.微服务 8401 整合 Sentinel 入门案例 5.流控规则 &#xff08;1&#xff09;基本介绍 &#xff08;2&#xff09;流控模式 Ⅰ. 直接 Ⅱ. 关联 Ⅲ. 链路 &#xff08;3&#xff0…

【Java基础面试题009】Java的I/O流是什么?

相关知识补充&#xff1a;黑马-字符集、IO流&#xff08;一&#xff09;.pdf Autism_Btkrsr/Blog_md_to_pdf - 码云 - 开源中国 (gitee.com) 黑马-IO流&#xff08;二&#xff09;.pdf Autism_Btkrsr/Blog_md_to_pdf - 码云 - 开源中国 (gitee.com) 回答重点 Java的I/O&…

第六届国际科技创新学术交流会暨管理科学信息化与经济创新发展(MSIEID 2024)

重要信息 大会官网&#xff1a;msieid2024.iaecst.org &#xff08;点击了解大会&#xff0c;参会等内容&#xff09; 大会时间&#xff1a;2024年12月6-8日 大会地点&#xff1a;中国-广州 大会简介 随着全球化和信息化的不断深入&#xff0c;管理科学、信息化和经济发展…

python学opencv|读取视频(一)灰度视频制作和保存

【1】引言 上一次课学习了用opencv读取图像&#xff0c;掌握了三个函数&#xff1a;cv.imread()、cv.imshow()、cv.imwrite() 相关链接如下&#xff1a; python学opencv|读取图像-CSDN博客 这次课我们继续&#xff0c;来学习用opencv读取视频。 【2】学习资源 首先是官网…

题外话 (火影密令)

哥们&#xff01; 玩火影不&#xff01; 村里人全部评论&#xff01; 不评论的忍战李全保底&#xff01; 哥们&#xff01; 密令领了不&#xff01; “1219村里人集合”领了吗&#xff01; 100金币&#xff01; 哥们&#xff01; 我粉丝没人能上影&#xff01; 老舅说的…