Golang RabbitMQ实现的延时队列

文章目录

  • 前言
  • 一、延时队列与应用场景
  • 二、RabbitMQ如何实现延时队列
    • 实现延时队列的基本要素
    • 整体的实现原理如下
  • 三、Go语言实战
    • 生产者
    • 消费者


前言

之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。

一、延时队列与应用场景

延迟队列是一种特殊类型的消息队列,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景。使用延迟队列可以灵活地控制消息的发送和处理时间,适用于很多场景,如订单超时处理、提醒任务等

具体应用场景有如下:

  1. 订单取消:当订单生成时,将订单消息发送到延迟队列中,并设置延迟时间为十分钟。消费者在十分钟后接收到订单消息并进行关闭操作。

  2. 店铺商品提醒:在店铺创建时,将提醒消息发送到延迟队列中,并设置延迟时间为十天。消费者在十天后接收到消息并发送提醒通知。

  3. 用户登录提醒:用户注册成功后,将提醒消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并发送短信提醒。

  4. 退款通知:当用户发起退款时,将通知消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并通知相关运营人员。

  5. 会议提醒:在会议预定时,将提醒消息发送到延迟队列中,并设置延迟时间为预定时间前十分钟。消费者在指定时间点前十分钟接收到消息并发送会议参加通知。

通过使用延迟队列,可以在指定的时间点触发任务,避免了轮询的低效方式,并且能够满足大量数据和时效性的需求。这种方法提供了更高的性能和实时性,并有效减轻了系统的负载。
下图是订单超时处理的流程图。
在这里插入图片描述

二、RabbitMQ如何实现延时队列

虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的。
通过设置消息的 TTL 和 DLX 等参数,可以将消息转发到一个指定的队列中,以便在一定的时间后再进行处理。

实现延时队列的基本要素

1、存在一个倒计时机制:Time To Live(TTL)
2、当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)

基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间 而rabbitmq会有限取两个值的最小
**基于第二点,**是用到了rabbitmq的过期消息处理机制: . x-dead-letter-exchange 将过期的消息发送到指定的 exchange 中 . x-dead-letter-routing-key 将过期的消息发送到自定的 route当中

整体的实现原理如下

发送者将消息发送到延时队列上并设置过期时间,当过期时间到达时,消息会被自动转发到指定的交换机和队列中供接收者消费。
1、建立与 RabbitMQ 服务器的连接并创建通道。
2、发送者通过 ch.Publish 方法将消息发送到延时队列(“test_delay”)上,设置消息的过期时间。
3、延时队列中的消息在到达过期时间后会自动被发送到 “logs” 交换机,由交换机将消息广播给所有绑定的队列。
4、接收者通过监听 “test_logs” 队列接收并处理消息。当有消息到达时,会触发回调函数进行处理。
也就是说要实现延时队列,消费者必须试实现两个队列。
一个是延时队列(“test_delay”),另一个是接收延时消息的队列(“test_logs”)。

这两个队列的作用如下:
延时队列(“test_delay”):这个队列用于接收需要延时发送的消息。发送者通过将消息发送到延时队列,设置消息的过期时间。当消息过期时,RabbitMQ 会自动将消息转发到指定的交换机和队列中。
接收延时消息的队列(“test_logs”):这个队列用于接收延时消息。在示例中,这个队列是通过将 “test_logs” 队列绑定到 “logs” 交换机上来实现的。交换机会将消息广播给所有绑定的队列,因此当延时消息到达过期时间后,会被发送到这个队列中供消费者进行处理。
通过使用两个队列,消息可以被延时发送到指定的队列,并在过期后自动转发到接收队列,实现了延时发送和消费的功能。

三、Go语言实战

生产者

首先建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,通过 ch.Publish 方法将消息发送到延时队列上。这里使用的是空字符串作为交换机(exchange),表示不选择任何交换机,只将消息发送到指定的队列(“test_delay”)。在消息的属性中,设置了消息的过期时间为 5 秒。


func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()body := "hello"// 将消息发送到延时队列上err = ch.Publish("", 				// exchange 这里为空则不选择 exchange"test_delay",     	// routing keyfalse,  			// mandatoryfalse,  			// immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),Expiration: "5000",	// 设置五秒的过期时间})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}

消费者

同样建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,声明了一个名为 “logs” 的交换机,类型为 “fanout”,并且可持久化,表示该交换机会将消息广播给所有绑定的队列。接着,声明了一个常规的队列 “test_logs”,并将其绑定到 “logs” 交换机上。之后,声明了一个延时队列 “test_delay”,并设置了该队列的 x-dead-letter-exchange 参数为 “logs”,即当消息过期时将消息发送到 “logs” 交换机。最后,通过 ch.Consume 方法监听 “test_logs” 队列,并在回调函数中处理接收到的消息。


func main() {// 建立链接conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个主要使用的 exchangeerr = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列q, err := ch.QueueDeclare("test_logs",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")/*** 注意,这里是重点!!!!!* 声明一个延时队列, ß我们的延时消息就是要发送到这里*/_, errDelay := ch.QueueDeclare("test_delay",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitamqp.Table{// 当消息过期时把消息发送到 logs 这个 exchange"x-dead-letter-exchange":"logs",},   // arguments)failOnError(errDelay, "Failed to declare a delay_queue")err = ch.QueueBind(q.Name, // queue name, 这里指的是 test_logs"",     // routing key"logs", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")// 这里监听的是 test_logsmsgs, err := ch.Consume(q.Name, // queue name, 这里指的是 test_logs"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/71536.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

结构体对齐原理及在STM32中的设计原则和实现

在嵌入式系统开发中&#xff0c;结构体作为一种常见的数据组织方式&#xff0c;在内存中的布局方式对于程序性能和内存占用具有重要影响。本文将深入探讨单片机C语言中的结构体对齐原理、重要性以及不同的对齐方式&#xff0c;并通过示例演示结构体对齐如何影响内存占用、访问性…

【代码随想录】Day 50 动态规划11 (买卖股票Ⅲ、Ⅳ)

买卖股票Ⅲ https://leetcode.cn/problems/best-time-to-buy-and-sell-stock-iii/ 无语了。。。 写的很好就是怎么都过不了。。。 还是就用代码随想录的写法吧。。。 class Solution { public:int maxProfit(vector<int>& prices) {int n prices.size();vector&…

权限提升-Windows本地提权-AT+SC+PS命令-进程迁移-令牌窃取-getsystem+UAC

权限提升基础信息 1、具体有哪些权限需要我们了解掌握的&#xff1f; 后台权限&#xff0c;网站权限&#xff0c;数据库权限&#xff0c;接口权限&#xff0c;系统权限&#xff0c;域控权限等 2、以上常见权限获取方法简要归类说明&#xff1f; 后台权限&#xff1a;SQL注入,数…

DCMM数据能力成熟度评估模型--学习笔记(1)

DCMM数据能力成熟度评估模型--学习笔记 1、DCMM简介、结构组成和成熟度评估等级划分1.1 DCMM简介1.2 DCMM结构组成1.3 DCMM关键过程域1.3.1、数据战略&#xff08;指导方针&#xff09;1.3.2、数据治理 &#xff08;机制保障&#xff09;1.3.3、数据架构 (施工图纸)1.3.4、数据…

【Java】线程都有哪几种状态

文章目录 前言传统线程模型&#xff08;操作系统&#xff09;中线程状态Java线程中的状态线程的运行流程 前言 首先我们要知道&#xff0c;在传统&#xff08;操作系统&#xff09;的线程模型中线程被分为五种状态&#xff0c;在java线程中&#xff0c;线程被分为六种状态。 …

iOS 16.4更新指南:问题解答与新功能一览

我应该更新到iOS 16.4吗&#xff1f;这是许多iPhone用户在新更新可用时问自己的一个常见问题。最新的iOS版本提供了各种功能和改进&#xff0c;因此更新的诱惑力很大。 但是&#xff0c;在更新之前&#xff0c;你应该考虑几个因素&#xff0c;以确保安装过程顺利成功。这些因素…

云计算中的负载均衡技术,确保资源的平衡分配

文章目录 1. 硬件负载均衡器2. 软件负载均衡器3. DNS负载均衡4. 内容分发网络&#xff08;CDN&#xff09; &#x1f388;个人主页&#xff1a;程序员 小侯 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 ✨收录专栏&#xff1a;云计算 ✨文章内…

Android studio 实现生成二维码和扫描二维码

效果图 build.gradle(:app)添加依赖 dependencies {implementation com.google.zxing:core:3.3.3implementation com.journeyapps:zxing-android-embedded:3.6.0implementation com.google.zxing:javase:3.0.0 }Manifests.xml <uses-permission android:name"android…

线程中的join()、wait() 和 notify()详解及练习题

一、join() Thread 类提供了 join() 方法&#xff0c;用于等待当前线程所调用的其他线程执行完毕。 1、当一个线程调用另一个线程的 join() 方法时&#xff0c;它会被阻塞&#xff0c;直到被调用的线程执行完毕或达到指定的超时时间。 比如&#xff1a;当主线程main中调用了…

“搞事情”?OpenAl将于11月召开其首届开发者大会

摘要&#xff1a;OpenAI也要召开它的第一届开发者大会了。这次活动&#xff0c;或许标志着OpenAI向其下一阶段的商业开发迈出了关键一步。 昨天&#xff0c;OpenAI宣布将于11月6日举办其首次开发者大会。在这场名为“OpenAI DevDay”的活动中&#xff0c;OpenAI的技术人员将进行…

白鲸开源 DataOps 平台加速数据分析和大模型构建

作者 | 李晨 编辑 | Debra Chen 数据准备对于推动有效的自助式分析和数据科学实践至关重要。如今&#xff0c;企业大都知道基于数据的决策是成功数字化转型的关键&#xff0c;但要做出有效的决策&#xff0c;只有可信的数据才能提供帮助&#xff0c;随着数据量和数据源的多样…

安防监控/视频存储/视频汇聚平台EasyCVR如何接入智能分析网关V4?

TSINGSEE青犀AI边缘计算网关硬件 —— 智能分析网关目前有5个版本&#xff1a;V1、V2、V3、V4、V5&#xff0c;每个版本都能实现对监控视频的智能识别和分析&#xff0c;支持抓拍、记录、告警等&#xff0c;每个版本在算法模型及性能配置上略有不同。硬件可实现的AI检测包括&am…

nowcoder NC10 大数乘法

题目链接&#xff1a; https://www.nowcoder.com/practice/c4c488d4d40d4c4e9824c3650f7d5571?tpId196&tqId37177&rp1&ru/exam/company&qru/exam/company&sourceUrl%2Fexam%2Fcompany&difficultyundefined&judgeStatusundefined&tags&tit…

222. 完全二叉树的节点个数

题目链接&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 我的想法&#xff1a; 递归法 万金油--层次遍历法 当然上面两中都是笨方法&#xff0c;就算不是完全二叉树也能算&#xff0c;没有用到完全二叉树的特性。 我的代码&#xff1…

linux线程讲解

1.线程概述 一个进程在同一时刻只做一件事情&#xff0c;进程是程序执行的一个实例。 线程是操作系统能够进行运算调度的最小单位&#xff0c;一个进程中可以并发多个线程&#xff0c;每条线程并行执行不同的任务。 进程&#xff1a;资源分配的最小单位。线程&#xff0c;程…

50etf期权最多能开仓多少手?

50etf期权限仓限额的操作&#xff0c;是为了能更好防范和控制期权交易的风险&#xff0c;无论是期货还是期权&#xff0c;在交易中都有规定的持仓限额&#xff0c;不能超过某个额度&#xff0c;那么50etf期权最多能开仓多少手&#xff1f;下文为你们全面介绍&#xff01;本文来…

【数据结构】单链表详解

当我们学完顺序表的时候&#xff0c;我们发现了好多问题如下&#xff1a; 中间/头部的插入删除&#xff0c;时间复杂度为O(N)增容需要申请新空间&#xff0c;拷贝数据&#xff0c;释放旧空间。会有不小的消耗。增容一般是呈2倍的增长&#xff0c;势必会有一定的空间浪费。例如当…

纯手工总结超详细关于计算机网络的五层知识点,看看你都掌握了没

纯手工总结超详细关于计算机网络的五层知识点&#xff0c;看看你都掌握了没 文章目录 纯手工总结超详细关于计算机网络的五层知识点&#xff0c;看看你都掌握了没1.应用层1.1 HTTP协议1.1.1 URL1.1.2 HTTP方法1.1.3 HTTP请求1.1.4 HTTP状态码1.1.5 HTTP会话保持 1.2 HTTPS协议 …

Linux上安装FTP

1、登录FTP&#xff0c;执行安装命令 yum -y install vsftpd 2、启动FTP服务器&#xff0c;设置开启自启动 systemctl enable vsftpd.service systemctl start vsftpd.service systemctl status vsftpd.service #查看状态, 显示active说明FTP启动成功 3、修改FTP配置文件/et…

数据通信——传输层TCP(可靠传输机制的滑动窗口)

引言 之前提到过拥塞问题&#xff0c;如果大量数据疯狂涌入&#xff0c;接收端无法及时处理就会导致数据丢包&#xff0c;从而使得通信受到干扰。之前的连续ARQ如果不加以节制&#xff0c;疯狂发送报文&#xff0c;接收端无法及时返回ACK就会导致网络瘫痪。 滑动窗口机制协议 这…