golang实现延迟队列(delay queue)

golang实现延迟队列

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

  1. 定义Task结构体,包含
  • ExecuteTime time.Time
  • Job func()
  1. 定义DelayQueue
  • TaskQueue []Task
  • func AddTask
  • func RemoveTask
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package mainimport ("fmt""time"
)/*
基于go实现延迟队列
*/
type Task struct {ExecuteTime time.TimeJob         func()
}type DelayQueue struct {Tasks []*Task
}func (d *DelayQueue) AddTask(t *Task) {d.Tasks = append(d.Tasks, t)
}func (d *DelayQueue) RemoveTask() {//FIFO: remove the first task to enqueued.Tasks = d.Tasks[1:]
}func (d *DelayQueue) ExecuteTask() {for len(d.Tasks) > 0 {//dequeue a taskcurrentTask := d.Tasks[0]if time.Now().Before(currentTask.ExecuteTime) {//if the task execution time is not up, waittime.Sleep(currentTask.ExecuteTime.Sub(time.Now()))}//execute the taskcurrentTask.Job()//remove task who has been executedd.RemoveTask()}}func main() {fmt.Println("start delayQueue")delayQueue := &DelayQueue{}firstTask := &Task{ExecuteTime: time.Now().Add(time.Second * 1),Job: func() {fmt.Println("executed task 1 after delay")},}delayQueue.AddTask(firstTask)secondTask := &Task{ExecuteTime: time.Now().Add(time.Second * 7),Job: func() {fmt.Println("executed task 2 after delay")},}delayQueue.AddTask(secondTask)delayQueue.ExecuteTask()fmt.Println("all tasks have been done!!!")
}

效果:
在这里插入图片描述

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

  1. 初始化redis连接
  2. 延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package mainimport ("fmt""github.com/go-redis/redis"log "github.com/ziyifast/log""time"
)/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"func initClient() (err error) {redisdb = redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // not set passwordDB:       0,  //use default db})_, err = redisdb.Ping().Result()if err != nil {log.Errorf("%v", err)return err}return nil
}func main() {err := initClient()if err != nil {log.Errorf("init redis client err: %v", err)return}addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())//执行队列中的任务getAndExecuteTask()
}// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {err := redisdb.ZAdd(DelayQueueKey, redis.Z{Score:  float64(executeTime),Member: task,}).Err()if err != nil {panic(err)}
}// 从redis中取一个task并执行
func getAndExecuteTask() {for {tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{Min:    "-inf",Max:    fmt.Sprintf("%d", time.Now().Unix()),Offset: 0,Count:  1,}).Result()if err != nil {time.Sleep(time.Second * 1)continue}//处理任务for _, task := range tasks {fmt.Println("Execute task: ", task)//执行完任务之后用 ZREM 移除该任务redisdb.ZRem(DelayQueueKey, task)}time.Sleep(time.Second * 1)}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696598.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧驿站_智慧文旅驿站_轻松的驿站智慧公厕_5G智慧公厕驿站_5G模块化智慧公厕

多功能城市智慧驿站是在智慧城市建设背景下,所涌现的一种创新型社会配套设施。其中,智慧公厕作为城市智慧驿站的重要功能基础,具备社会配套不可缺少的特点,所以在应用场景上,拥有广泛的需求和要求。那么,城…

#12解决request中getReader()和getInputStream()只能调用一次的问题

目录 1、背景 2、解决方案 2.1、自定义HttpServletRequestWrapper 2.2、JsonRequestHeaderParamsHelper 2.3、HttpServletRequestReplacedFilter 2.4、使用 1、背景 当前系统Content-Type为application/json,参数接收方式采用RequestBody和RequestParam&#…

平时积累的FPGA知识点(10)

平时在FPGA群聊等积累的FPGA知识点,第10期: 41 ZYNQ系列芯片的PL中使用PS端送过来的时钟,这些时钟名字是自动生成的吗? 解释:是的。PS端设置的是ps_clk,用report_clocks查出来的时钟名变成了clk_fpga_0&a…

Linux篇:进程

一. 前置知识 1.1冯诺依曼体系结构 我们常见的计算机,如笔记本。我们不常见的计算机,如服务器,大部分都遵守冯诺依曼体系 为什么计算机要采用冯诺依曼体系呢? 在计算机出现之前有很多人都提出过计算机体系结构,但最…

时序数据库TimescaleDB,实战部署全攻略

📢📢📢📣📣📣 哈喽!大家好,我是【IT邦德】,江湖人称jeames007,10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】!😜&am…

C++ Primer 笔记(总结,摘要,概括)——第5章 语句

目录 5.1 简单语句 5.2 语句作用域 5.3 条件语句 5.3.1 if语句 5.3.2 switch语句 5.4 迭代语句 5.4.1 while语句 5.4.2 传统的for语句 5.4.3 范围for语句 5.4.4 do while语句 5.5 跳转语句 5.5.1 break语句 5.5.2 continue语句 5.5.3 goto语句 5.6 try语句块和异常处理 5…

2024华北医院信息网络大会第二轮更新通知

大会背景 近年来,我国医疗行业信息化取得了飞跃式的发展,医疗信息化对医疗行业有着重要的支撑作用。2021年国家卫健委、中医药管理局联合印发《公立医院高质量发展促进行动(2021-2025年)》,提出重点建设“三位一体”智…

【青龙】快速搭建青龙面板,部署属于你自己的应用!

青龙面板是一个支持 Python3、JavaScript、Shell、Typescript 的定时任务管理平台。 废话不多说,直接开始。 这里使用一台 雨云 的云服务器作为演示。雨云注册地址:https://www.rainyun.com/ 优惠码:lz932 使用优惠码注册后绑定微信可获得8折…

【Chrono Engine学习总结】4-vehicle-4.3-两个vehicle碰撞测试

由于Chrono的官方教程在一些细节方面解释的并不清楚,自己做了一些尝试,做学习总结。 今天突发奇想,想试一下,是否可以实现两个vehicle的碰撞? 1、两辆vehicle的仿真 官方提供了demo_VEH_TwoCars这个demo&#xff0c…

C++入门04 函数的参数传递、引用类型与重载

图源:文心一言 听课笔记简单整理,供小伙伴们参考,包含以下内容“🐋3.11 引用类型、🐋3.14 内联函数、🐋3.15 默认参数值、🐋3.16 函数重载、🐋3.17 C系统函数”~🥝&…

LabVIEW多通道压力传感器实时动态检测

LabVIEW多通道压力传感器实时动态检测 介绍了一种基于LabVIEW的多通道压力传感器实时动态检测系统,解决压阻式压力传感器温度补偿过程的复杂度,提高测量的准确性。通过自动轮询检测方法,结合硬件检测模型和多通道检测系统设计,本…

集合框架之List集合

目录 ​编辑 一、什么是UML 二、集合框架 三、List集合 1.特点 2.遍历方式 3.删除 4.优化 四、迭代器原理 五、泛型 六、装拆箱 七、ArrayList、LinkedList和Vector的区别 ArrayList和Vector的区别 LinkedList和Vector的区别 一、什么是UML UML(Unif…

基于ORB-SLAM2与YOLOv8剔除动态特征点(三种方法)

基于ORB-SLAM2与YOLOv8剔除动态特征点(三种方法) 写上篇文章时测试过程比较乱,写的时候有些地方有点失误,所以重新写了这篇 本文内容均在RGB-D环境下进行程序测试 本文涉及到的动态特征点剔除速度均是以https://cvg.cit.tum.de/data/datasets/rgbd-dat…

编写程序,实现shell功能——项目训练——day08

c c今天做了一个实战项目训练,编写一个程序,实现shell功能,我们称之为minishell。 主要是利用Linux中IO接口实现,实现的功能有: 1.ls ls -a ls -l cd cp mv pwd c…

软件License授权原理

软件License授权原理 你知道License是如何防止别人破解的吗?本文将介绍License的生成原理,理解了License的授权原理你不但可以防止别人破解你的License,你甚至可以研究别人的License找到它们的漏洞。喜欢本文的朋友建议收藏关注,…

【Linux】进程状态

进程状态 进程状态的简要介绍运行状态进程排队 阻塞状态挂起状态Linux中的进程状态 进程状态的简要介绍 进程状态指的是一个操作系统中正在运行的进程当前所处的状态。根据不同的操作系统,进程状态可能会有一些细微的差别,但最主要的是以下三种状态 运行…

Java——方法的使用

目录 一.方法的概念及使用 1 什么是方法(method) 2.方法定义 3 方法调用的执行过程 4 实参和形参的关系(重要) 5.没有返回值的方法 二.方法重载 1.为什么需要方法重载 2.方法重载概念 3.方法签名 三.递归 1.递归的概念 2.递归执行过程分析 3. 递归练习 一.方法的…

猫头虎分享已解决Bug || 容器编排问题:OrchestrationFailure, ContainerManagementError

博主猫头虎的技术世界 🌟 欢迎来到猫头虎的博客 — 探索技术的无限可能! 专栏链接: 🔗 精选专栏: 《面试题大全》 — 面试准备的宝典!《IDEA开发秘籍》 — 提升你的IDEA技能!《100天精通鸿蒙》 …

【Python】【VS Code】VS Code中python.json和setting.json文件配置说明

目录 1. python.json配置 2. setting.json配置 3. 解决中文乱码 4. 实现效果 1. python.json配置 python.json 获取步骤:文件 -> 首选项 -> 配置用户代码片段 -> python 此为VS Code的头文件设置,复制以下内容到 python.json {"HEADER…

个人做抖店如何能够快速起店?掌握好技巧是关键!建议收藏!

大家好,我是电商小布。 相信我们每个朋友在店铺开通后,最关心的事情就是小店成功起店了。 那么个人做抖店想要快速起店,该怎么来进行操作呢? 接下来,小布重点给大家说三点: 首先来说一下小店的主体类型…