RabbitMQ 如何使用延迟队列

RabbitMQ 如何使用延迟队列

目录

  1. 前置条件
  2. 场景描述
  3. RabbitMQ 延迟队列机制
  4. 实现步骤
    • 1. 安装 RabbitMQ 延迟队列插件
    • 2. 创建延迟队列和交换机
    • 3. 发布延迟消息
    • 4. 消费延迟消息
  5. 示例代码
    • 1. 延迟队列配置
    • 2. 发布消息的 Producer 代码
    • 3. 消费消息的 Consumer 代码
  6. 注意事项

前置条件

  • 操作系统:CentOS 7
  • RabbitMQ:版本 3.8.0+
  • Erlang:版本 21.0+
  • RabbitMQ 延迟队列插件:rabbitmq_delayed_message_exchange

场景描述

假设我们正在设计一个线上售卖电影票的系统,用户购票后有 15 分钟时间进行付款,如果用户在 15 分钟内未付款,订单将自动取消并释放电影票库存。这里,我们可以利用 RabbitMQ 的延迟队列机制,在用户购票时发送一条延迟消息到 RabbitMQ,并设定延迟时间为 15 分钟。如果用户未在 15 分钟内完成付款,延迟消息将被消费者接收并处理订单取消的逻辑。

RabbitMQ 延迟队列机制

RabbitMQ 本身不直接支持延迟队列功能,需要借助 rabbitmq_delayed_message_exchange 插件来实现。该插件为 RabbitMQ 提供了一种新的消息交换机类型——x-delayed-message,可以基于消息属性设置延迟时间,在设定的延迟时间后,将消息发送到目标队列。

实现步骤

1. 安装 RabbitMQ 延迟队列插件

# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez# 将插件移动到 RabbitMQ 插件目录
mv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/plugins/# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2. 创建延迟队列和交换机

我们将使用 x-delayed-message 类型的交换机,并设定延迟队列,用于处理延迟消息。

# 创建交换机
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message \arguments='{"x-delayed-type":"direct"}'# 创建队列
rabbitmqadmin declare queue name=delayed_queue# 绑定交换机与队列
rabbitmqadmin declare binding source=delayed_exchange destination=delayed_queue routing_key=order.payment

3. 发布延迟消息

在发布消息时,可以设置消息属性 x-delay 来指定延迟时间。

# 使用 rabbitmqadmin 发布延迟消息
rabbitmqadmin publish exchange=delayed_exchange routing_key=order.payment \payload="{'order_id': '12345', 'status': 'PENDING_PAYMENT'}" \properties='{"headers":{"x-delay":900000}}'

4. 消费延迟消息

消费者将从延迟队列中消费消息并执行订单取消逻辑。

示例代码

1. 延迟队列配置

在 Spring Boot 项目中,可以通过以下配置来创建延迟交换机和队列。

@Configuration
public class RabbitConfig {public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";public static final String DELAYED_QUEUE_NAME = "delayed_queue";public static final String ROUTING_KEY = "order.payment";// 创建延迟交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 创建延迟队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}// 绑定延迟队列与交换机@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();}
}

2. 发布消息的 Producer 代码

@Component
public class OrderProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrderMessage(String orderId) {Map<String, Object> message = new HashMap<>();message.put("order_id", orderId);message.put("status", "PENDING_PAYMENT");MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 15 * 60 * 1000); // 延迟 15 分钟Message msg = new Message(new ObjectMapper().writeValueAsBytes(message), messageProperties);rabbitTemplate.convertAndSend(RabbitConfig.DELAYED_EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);}
}

3. 消费消息的 Consumer 代码

@Component
public class OrderConsumer {@RabbitListener(queues = RabbitConfig.DELAYED_QUEUE_NAME)public void processOrderCancellation(Message message) {try {Map<String, Object> orderMessage = new ObjectMapper().readValue(message.getBody(), Map.class);String orderId = (String) orderMessage.get("order_id");// 取消订单逻辑System.out.println("Order " + orderId + " has been canceled due to non-payment.");} catch (Exception e) {e.printStackTrace();}}
}

注意事项

  1. 插件兼容性:请确保 rabbitmq_delayed_message_exchange 插件与您的 RabbitMQ 版本兼容,否则可能导致插件无法加载。
  2. Erlang 版本:RabbitMQ 依赖于 Erlang,因此确保您的 Erlang 版本满足 RabbitMQ 版本的最低要求。
  3. 延迟时间限制:合理设置延迟时间,避免消息被延迟过长时间导致系统不可预测的性能问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/836742.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS怎样命名才能更好的理解

经常因为不知道怎么给css命名的小伙伴看过来 CSS命名方法对于维护可读性和易于理解的代码至关重要。这里有一个基于BEM&#xff08;Block Element Modifier&#xff09;的命名方法&#xff0c;它被广泛认为是提高CSS可维护性和可读性的有效方式&#xff1a; BEM&#xff08;B…

栈的实现与OJ括号匹配

今日备忘录: "不破不立. " 本文索引 1. 前言2. 顺序表与链表的区别3. 什么是栈4. 栈的实现5. OJ括号匹配6. 总结 1. 前言 人总是在坍塌中重建, 有些东西必须摧毁, 才能迎来新生, 不管是那些消耗你的人, 还是令你感到焦虑的事情, 还是一份你觉得毫无意义并且又不喜欢…

(车载)毫米波雷达信号处理中的恒虚警检测(CFAR)技术概述

说明 恒虚警检测(Constant False-Alarm Rate, CFAR)是雷达目标(信号)检测中很重要的一个概念&#xff0c;从事雷达相关科研或工程研发的或多或少应该都接触过。CFAR这项技术在工程实践上其实是比较简单的(至少在我了解的车载雷达领域)&#xff0c;不过这项技术也有很多可以深挖…

Ansys ACT的一个例子

由XML和IronPython文件组成&#xff0c;文件结构如下&#xff1a; ExtSample.xml <extension version"1" name"ExtSample1"><guid shortid"ExtSample1">2cc739d5-9011-400f-ab31-a59e36e5c595</guid><script src"sam…

10分钟了解Flink SQL使用

Flink 是一个流处理和批处理统一的大数据框架&#xff0c;专门为高吞吐量和低延迟而设计。开发者可以使用SQL进行流批统一处理&#xff0c;大大简化了数据处理的复杂性。本文将介绍Flink SQL的基本原理、使用方法、流批统一&#xff0c;并通过几个例子进行实践。 1、Flink SQL基…

【Linux】17. 进程间通信 --- 管道

1. 什么是进程间通信(进程间通信的目的) 数据传输&#xff1a;一个进程需要将它的数据发送给另一个进程 资源共享&#xff1a;多个进程之间共享同样的资源。 通知事件&#xff1a;一个进程需要向另一个或一组进程发送消息&#xff0c;通知它&#xff08;它们&#xff09;发生了…

Springboot自动装配源码分析

版本 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </par…

基于zhdate的Python公历、农历互算

zhdate 是公历、农历换算的python工具包。 生活中有时候需要计算跟农历和天数有关的日期&#xff0c;于是对zhdate进行了封装&#xff0c;实现了如下功能&#xff1a; 1 公历 -> 公历 : 天数 2 公历 -> 农历 : 天数 3 农历 -> 公历 : 天数 4 农历 -> 农历 …

第六十节 Java设计模式 - 过滤器/标准模式

Java设计模式 - 过滤器/标准模式 过滤器模式使用不同的条件过滤对象。 这些标准可以通过逻辑操作链接在一起。 过滤器模式是一种结构型模式。 例子 import java.util.List; import java.util.ArrayList;class Employee {private String name;private String gender;private…

决策树学习记录

对于一个决策树的决策面&#xff1a; 他其实是在任意两个特征基础上对于所有的点进行一个分类&#xff0c;并且展示出不同类别的之间的决策面&#xff0c;进而可以很清楚的看出在这两个特征上各个数据点种类的分布。 对于多输出的问题&#xff0c;在利用人的上半张脸来恢复下半…

ICode国际青少年编程竞赛- Python-4级训练场-复杂嵌套for循环

ICode国际青少年编程竞赛- Python-4级训练场-复杂嵌套for循环 1、 for i in range(4):Dev.step(i6)for j in range(3):Dev.turnLeft()Dev.step(2)2、 for i in range(4):Dev.step(i3)for j in range(4):Dev.step(2)Dev.turnRight()Dev.step(-i-3)Dev.turnRight()3、 for i …

产品经理考完NPDP后有必要考PMP吗?

NPDP由美国产品开发与管理协会&#xff08;PDMA&#xff09;所发起&#xff0c;是国际公认的唯一的新产品开发专业认证。而PMP则由PMI组织和出题&#xff0c;在项目管理领域较为权威。一个产品管理&#xff0c;一个项目管理&#xff0c;很多人考了NPDP之后&#xff0c;还会再考…

知识付费课程分销系统,网课平台哪个好?你知道几个平台呢?

疫情期间&#xff0c;教育行业受到了很大的冲击&#xff0c;很多线下机构转型线上&#xff0c;就连教师也都在家做上了直播课程&#xff0c;网课平台哪个好?你知道几个平台呢? 目前的线上教学平台有企业微信、腾讯视频会议、QQ视频电话、雨课堂、钉钉。 一、企业微信 1. 平台…

Windows关闭NGINX命令

1、首先用cmd进入NGINX的目录下,输入下面命令&#xff0c;查看nginx是否启动 tasklist /fi "imagename eq nginx.exe"2、关闭nginx taskkill /f /t /im nginx.exe3、启动&#xff1a;start nginx 4、重启&#xff1a;nginx -s reload

【牛客】SQL211 获取当前薪水第二多的员工的emp_no以及其对应的薪水salary

1、描述 有一个薪水表salaries简况如下&#xff1a; 请你获取薪水第二多的员工的emp_no以及其对应的薪水salary&#xff0c; 若有多个员工的薪水为第二多的薪水&#xff0c;则将对应的员工的emp_no和salary全部输出&#xff0c;并按emp_no升序排序。 2、题目建表 drop table …

ctfshow 源码审计 web301--web305

web301 在checklogin.php 发现了 $sql"select sds_password from sds_user where sds_username".$username." order by id limit 1;";在联合查询并不存在的数据时&#xff0c;联合查询就会构造一个虚拟的数据就相当于构造了一个虚拟账户&#xff0c;可以…

iOS 更改button文字和图片的位置

1.上代码&#xff1a; [self.selectAlbumButtonsetTitleEdgeInsets:UIEdgeInsetsMake(0, -36,0,0)]; [self.selectAlbumButtonsetImageEdgeInsets:UIEdgeInsetsMake(0,80,0,0)]; [self.selectCloudDiskButtonsetTitleEdgeInsets:UIEdgeInsetsMake(0, -36,0,0)]; [self.sele…

Springboot-配置文件中敏感信息的加密:三种加密保护方法比较

一. 背景 当我们将项目部署到服务器上时&#xff0c;一般会在jar包的同级目录下加上application.yml配置文件&#xff0c;这样可以在不重新换包的情况下修改配置。 一般会将数据库连接&#xff0c;Redis连接等放到配置文件中。 例如配置数据库连接&#xff1a; spring:serv…

排序-插入排序的优化--半插入排序

半插入排序&#xff08;有时也称为二分查找插入排序&#xff09;是对传统插入排序的一种优化。基本思想是在执行插入操作时&#xff0c;不是简单地从前向后遍历已排序序列来寻找插入位置&#xff0c;而是使用二分查找法来确定新元素的正确位置&#xff0c;从而减少比较次数&…

MSMQ消息队列

MQ是一种企业服务的消息中间节技术&#xff0c;这种技术常常伴随着企业服务总线相互使用&#xff0c;构成了企业分布式开发的一部分&#xff0c;如果考虑到消息的发送和传送之间是可以相互不联系的并且需要分布式架构&#xff0c;则可以考虑使用MQ做消息的中间价技术&#xff0…