用go实现一个任务调度类 (泛型)

用go实现一个任务调度类 (泛型)

源码地址:
https://github.com/robinfoxnan/BirdTalkServer/blob/main/server/core/workmanager.go

1.概述

实现了一个简单的任务管理系统,允许用户定义任务和工作者,并将任务分配给工作者进行处理。这个系统旨在提供一个灵活的任务管理框架,可以根据需要动态地添加和移除工作者,以及处理任务。

2.主要功能

  1. 定义了 Task 接口和 Worker 接口,用于表示任务和工作者;
  2. 提供了基础的任务类型 BaseTask 和基础的工作者类型 BaseWorker,用户可以基于这些基础类型来实现自定义的任务和工作者。需要在 BaseTask结构上继承一个新的结构,并实现Process方法;
  3. 实现了一个泛型任务管理器 Manager,用于管理工作者并分配任务给工作者。根据最大工作者个数和任务队列长度,动态地添加工作者。提供了停止所有工作者的方法,提供了方法来等待所有工作者完成任务。

3.类型和接口

3.1Task 任务接口

type Task interface {Process()
}

任务接口定义了一个 Process() 方法,用于执行任务的处理逻辑。

3.2Worker 接口

type Worker interface {Init(id int64, taskChan chan Task, wg *sync.WaitGroup, f WorkerCleanF)Start()Stop()
}

工作者接口定义了三个方法:

  • Init() 方法用于初始化工作者。创建后,设置工作者ID,任务通道,同步组,以及一个析构函数类似的清理函数;
  • Start() 方法用于启动工作者协程,开始处理任务;
  • Stop() 方法用于停止工作者;(关闭通道)

3.3BaseTask 结构体

这是一个最基础的示例,后续自定义结构可以包含这个结构:

type BaseTask struct {Id int64
}

基础任务结构体包含一个任务 ID,实现了 Task 接口的 Process() 方法,用于执行任务的处理逻辑。

3.4BaseWorker 结构体

type BaseWorker struct {Id       int64waitGrp  *sync.WaitGrouptaskChan chan TaskcleanFun WorkerCleanFquitChan chan struct{}
}

基础工作者结构体包含工作者 ID、等待组、任务通道、清理函数和退出通道,实现了 Worker 接口的 Init()Start()Stop() 方法,用于初始化工作者、启动工作者和停止工作者。

4. Manager 结构体

type Manager[T Task, W Worker] struct {workers       map[int64]W    // 使用一个map管理各个协程maxWorkers    int64          // 最大协程数量workerCounter int64          // 使用原子方式计数taskChan      chan Task      // 任务通道lock          sync.Mutex     // map用的锁wg            sync.WaitGroup // 同步组newWorkerFunc func() W       // 用于创建泛型中工作者结构的函数exiting       int32          // 退出状态标记,防止停止过程中加入任务workerIdSeq   int64          // 协程序号,可以用雪花算法代替,一般应该够用
}

任务管理器结构体包含了一个工作者映射、最大工作者数量、工作者计数器、任务通道、互斥锁、等待组、新建工作者函数、退出标志和工作者 ID 序列,提供了方法来添加任务、移除工作者、等待所有工作者完成任务和停止所有工作者。

5. 使用示例

最简单的一个测试示例

    manager := NewManager[Task, *BaseWorker](20, NewBaseWorker)// 添加示例任务到管理器go func() {for i := 0; i < 10; i++ {var t = &BaseTask{Id: int64(i)}manager.AddTask(t)}}()time.Sleep(time.Minute * 1)manager.StopAll()// 等待所有工作者完成任务manager.Wait()

我们需要重新定义一个结构用于表示任务,通常需要更多的字段

type CustomTask struct {BaseTaskAdditionalInfo string// 这里添加更多的字段
}// 实现 Task 接口的 Run 方法,
// 必须要实现这个函数,这是任务调度的功能入口,在协程中运行
func (t *CustomTask) Process() {fmt.Printf("CustomTask with additional info '%s' is running\n", t.AdditionalInfo)// 调用父类的 Process 方法//t.BaseTask.Process()
}

重写测试:


func TestWorkers(t *testing.T) {manager := NewManager[Task, *BaseWorker](20, NewBaseWorker)// 添加示例工作者到管理器// 添加示例任务到管理器go func() {for i := 0; i < 10; i++ {var t = &BaseTask{Id: int64(i)}manager.AddTask(t)}for i := 10; i < 16; i++ {var t = &CustomTask{BaseTask: BaseTask{Id: int64(i)}, AdditionalInfo: "Custom Info"}manager.AddTask(t)}}()time.Sleep(time.Minute * 1)manager.StopAll()// 等待所有工作者完成任务manager.Wait()
}

结论

各个语言实现的这个轮子基本都差不多。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/768780.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM 用串口来实现灯的点亮

main.c #include "uart4.h"//封装延时函数void delay(int ms){int i,j;for(i0;i<ms;i){for(j0;j<2000;j){}}}int main(){led_init();uart4_init();//char buf[128];char *str;//char i;while(1){/*igetchar();putchar(i1);putchar(\n);putchar(\r);*/strgets(…

013_Linux(上传rz,下载sz,tar,zip,unzip)

目录 一、上传、下载 1、通过鼠标操作 &#xff08;1&#xff09;下载 &#xff08;2&#xff09;上传 2、通过命令操作 rz、sz &#xff08;1&#xff09;下载 sz &#xff08;2&#xff09;上传 rz 二、压缩、解压 1、tar命令 &#xff08;1&#xff09;压缩 &…

Jenkins hudson.plugins.git.GitSCM.ALLOW_LOCAL_CHECKOUT 属性设置问题

ERROR: Checkout of Git remote ‘local/path’ aborted because it references a local directory, which may be insecure. You can allow local checkouts anyway by setting the system property ‘hudson.plugins.git.GitSCM.ALLOW_LOCAL_CHECKOUT’ to true. Finished: F…

PMP考试备考——项目管理标准

项目的定义和特性 项目 是为创造独特的产品、服务或成果而进行的临时性工作。这意味着项目具有以下几个关键特征&#xff1a; 临时性&#xff1a;项目有一个明确的开始和结束日期&#xff0c;不同于持续运营的工作。独特性&#xff1a;每个项目都有其独特之处&#xff0c;即使…

QT----基于QT的人脸考勤系统ubuntu系统运行,编译开发板

目录 1 Ubantu编译opencv和seetaface库1.1 Ubantu编译opencv1.2 Ubuntu编译seetaface1.3 安装qt 2 更改代码2.1 直接运行报错/usr/bin/ld: cannot find -lGL: No such file or directory2.2 遇到报错摄像头打不开2.3 修改部分代码2.4 解决中文语音输出问题 3 尝试交叉编译rk358…

【 Mysql8.0 忘记登录密码 可以试试 】

** Mysql8.0 忘记登录密码 可以试试 ** 2024-3-21 段子手168 1、首先停止 mysql 服务 &#xff0c;WIN R 打开运行&#xff0c;输入 services.msc 回车打开服务&#xff0c;找到 mysql 服务&#xff0c;停止。 然后 WIN R 打开运行&#xff0c;输入 CMD 打开控制台终端输…

yolov5训练并生成rknn模型部署在RK3588开发板上,实现NPU加速推理

简介 RK3588是瑞芯微&#xff08;Rockchip&#xff09;公司推出的一款高性能、低功耗的集成电路芯片。它采用了先进的28纳米工艺技术&#xff0c;并配备了八核心的ARM Cortex-A76和Cortex-A55处理器&#xff0c;以及ARM Mali-G76 GPU。该芯片支持多种接口和功能&#xff0c;适…

构建Pytorch虚拟环境教程

构建PyTorch虚拟环境通常涉及使用诸如Anaconda或venv等工具来管理Python环境&#xff0c;以便在一个独立的空间中安装PyTorch和其他依赖项。以下是使用Anaconda创建PyTorch虚拟环境的步骤&#xff08;适用于不同操作系统&#xff0c;包括Windows、Linux和MacOS&#xff09;&…

数据分析-Pandas数据分类的转换控制

数据分析-Pandas数据分类的转换控制 数据分析和处理中&#xff0c;难免会遇到各种数据&#xff0c;那么数据呈现怎样的规律呢&#xff1f;不管金融数据&#xff0c;风控数据&#xff0c;营销数据等等&#xff0c;莫不如此。如何通过图示展示数据的规律&#xff1f; 数据表&am…

vue的URL和函数优化

URL优化 async…await 用于同步接收网络请求的结果 常规的代码 export async function articleGetAllService () {//发送异步请求&#xff0c;获取所有文章数据//同步等待服务器响应的结果&#xff0c;并返回&#xff0c;async,awaitreturn await axios.get(http://localhost:…

【 Vue.js 属性 | 生命周期 】

computed计算属性 规则&#xff1a; 1.用已有的属性计算不存在的属性 2.默认调用一次get() 3.简写时注意&#xff1a; 只有值不发生改变才可以是用简写&#xff08;函数&#xff09;&#xff0c;值发生改变必须使用对象&#xff0c;才可以配置set()方法 4.底层原理使用 Object.…

shell脚本入门练习(非常详细)零基础入门到精通,收藏这一篇就够了

【脚本1】打印形状 打印等腰三角形、直角三角形、倒直角三角形、菱形 #!/bin/bash \# 等腰三角形 read \-p "Please input the length: " n for i in \seq 1 $n\ do for ((j\$n;j>i;j--)) do echo \-n " " done for m in \seq 1 $i\ do…

淘宝1688京东...商品详情数据采集,按关键词搜索商品列表

淘宝、1688、京东等电商平台的商品详情数据采集以及按关键词搜索商品列表&#xff0c;通常可以通过以下几种方法实现&#xff1a; 请求示例&#xff0c;API接口接入Anzexi58 一、使用API接口 这些电商平台通常都提供开放API接口&#xff0c;允许开发者调用接口获取所需的数据…

【Linux】网络编程套接字一

网络编程套接字一 1.预备知识1.1理解源IP地址和目的IP地址1.2认识端口号1.3认识TCP协议1.4认识UDP协议1.5网络字节序 2.socket编程接口3.UDP网络程序3.1UDP Server服务器端3.2UDP Client客户端 4.根据UDP客户端服务端做的设计4.1字典热加载4.2shell命令行4.3聊天室 5.windows客…

WSL下Ubuntu+RTX4090安装CUDA+cuDnn+Pytorch

安装驱动 首先需要明确的是&#xff0c;在WSL下安装Ubuntu&#xff0c;如果要使用主机的GPU卡&#xff0c;只需要在主机Windows上安装驱动&#xff0c;Linux中不需要安装驱动&#xff0c;可以在Linux中使用nvidia-smi命令查看驱动版本。 安装CUDA 避坑注意事项&#xff1a;如…

网络原理(6)——IP协议

目录 一、网段划分 现在的网络划分&#xff1a; 1、一般情况下的家庭网络环境 2、IP地址 3、子网掩码 4、网关 以前的网络划分&#xff1a; 二、特殊IP 1、环回 IP 2、主机号为全 0 的IP 3、广播地址IP 三、路由选择&#xff08;路线规划&#xff09; 一、网段划分…

毕业论文降重(gpt+完美降重指令),sci论文降重gpt指令——超级好用,重复率低于4%

1. 降重方法&#xff1a;gpt降重指令 2. gpt网站 https://yiyan.baidu.com/ https://chat.openai.com/ 3. 降重指令——非常好用&#xff01;&#xff01;sci论文&#xff0c;本硕大论文都可使用&#xff01; 请帮我把下面句子重新组织&#xff0c;通过调整句子逻辑&#xff0…

通过命令在Windows入站出站放行上放行端口8090, 8443, 5222, 8021

可以通过循环结构来简化操作&#xff0c;下面分别创建入站和出站规则的示例&#xff1a; 入站规则 $ports 8090, 8443, 5222, 8021foreach ($port in $ports) {New-NetFirewallRule -DisplayName "Allow Inbound Port $($port)" -Direction Inbound -Action Allow…

day6:STM32MP157——串口通信实验

使用的是cortex A7内核 【串口通信的工作原理】 本次实验使用的是uart4的串口&#xff0c;分别使用了uart4_tx和uart4_rx两个引脚。根据板子的原理图我们可以知道&#xff0c;他们分别对应着芯片的PG11和PB2 从引脚名字也可以知道使用了GPIO口&#xff0c;所以本次实验同样需…

array go 语言的数组 /切片

内存地址通过& package mainimport "fmt"func main() {var arr [2][3]int16fmt.Println(arr)fmt.Printf("arr的地址是: %p \n", &arr)fmt.Printf("arr[0]的地址是 %p \n", &arr[0])fmt.Printf("arr[0][0]的地址是 %p \n"…