SpringBoot使用RabbitMQ实现延迟队列

SpringBoot使用RabbitMQ实现延迟队列

    • 需求和目标
    • 名词解释
    • 实现方式
    • 引入依赖
    • 添加配置文件
    • 配置类
    • 死信队列消费者
    • 即时队列消费者
    • 延迟消息发送
    • 结果
    • 注意

需求和目标

商城系统,用户下单后若15分钟内仍未完成支付,则自动取消订单,若已支付,不做其他特殊操作
系统还需要支持即时消息的功能,即发即收。

名词解释

①即时队列:即发即收
②延迟队列:发了消息,没有接收方,只有消息过期后才被处理
③死信队列:延迟队列上的消息过期后,会被自动转发到死信队列中,从而最终达到延迟的目的

实现方式

本文采用RabbitMQ自身属性:
TTL(Time To Live存活时间) + DLX(Dead-Letter-Exchange死信交换机)
实现延迟队列,先将消息发到指定了TTL时长的队列A中,队列A没有消费者,也就是说,队列A中的消息肯定会过期,等消息过期后,就会加入到队列B,也就是死信队列,B队列是有消费者在监听的,一旦收到消息,就进行后续的逻辑处理,从而达到延迟效果。
这种实现方式只能为队列设置消息延迟的时长,不能为每个消息指定延迟时长,粒度比较粗,请注意使用的业务场景!

引入依赖

<!--rabbitmq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置文件

分别声明了:即时、延迟、死信的相关信息
其中,延迟和死信是相互配合形成了延迟队列

# rabbitMQ配置
mq:rabbit:host: 127.0.0.1:5672virtualHost: /username: testUserpassword: 123456normal-exchange: wms_exchange_normalnormal-queue: wms_queue_normalnormal-routing-key: wms_routing_key_normaldelay-exchange: wms_exchange_delaydelay-queue: wms_queue_delaydelay-routing-key: wms_routing_key_delaydlx-exchange: wms_exchange_dlxdlx-queue: wms_queue_dlxdlx-routing-key: wms_routing_key_dlx

配置类

package com.nwd.common.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 从配置文件中读取参数@Value("${mq.rabbit.host}")String HOST;@Value("${mq.rabbit.username}")String USERNAME;@Value("${mq.rabbit.password}")String PASSWORD;@Value("${mq.rabbit.normal-exchange}")String NORMAL_EXCHANGE;@Value("${mq.rabbit.normal-queue}")String NORMAL_QUEUE;@Value("${mq.rabbit.normal-routing-key}")String NORMAL_ROUTING_KEY;@Value("${mq.rabbit.delay-exchange}")String DELAY_EXCHANGE;@Value("${mq.rabbit.delay-queue}")String DELAY_QUEUE;@Value("${mq.rabbit.delay-routing-key}")String DELAY_ROUTING_KEY;@Value("${mq.rabbit.dlx-exchange}")String DLX_EXCHANGE;@Value("${mq.rabbit.dlx-queue}")String DLX_QUEUE;@Value("${mq.rabbit.dlx-routing-key}")String DLX_ROUTING_KEY;//创建mq连接@Bean(name = "connectionFactory")public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setUsername(USERNAME);connectionFactory.setPassword(PASSWORD);//connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);//该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的hostconnectionFactory.setAddresses(HOST);//connectionFactory.setPort(Integer.parseInt(port));return connectionFactory;}// 即时队列===========================================@Beanpublic Queue normalQueue() {return new Queue(NORMAL_QUEUE);}@Beanpublic DirectExchange normalDirectExchange(){return new DirectExchange(NORMAL_EXCHANGE);}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalDirectExchange()).with(NORMAL_ROUTING_KEY);}// 即时队列===========================================// 延迟队列===========================================@Beanpublic Queue delayQueue(){Map<String,Object> map = new HashMap<>();//message在该队列queue的存活时间最大为15分钟map.put("x-message-ttl", 10000*6*15);//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)map.put("x-dead-letter-exchange", DLX_EXCHANGE);//x-dead-letter-routing-key参数是给这个DLX指定路由键map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(DELAY_QUEUE,true,false,false,map);}@Beanpublic DirectExchange delayDirectExchange(){return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding(){return BindingBuilder.bind(delayQueue()).to(delayDirectExchange()).with(DELAY_ROUTING_KEY);}// 延迟队列===========================================// 死信队列===========================================@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE);}@Beanpublic DirectExchange dlxDirectExchange(){return new DirectExchange(DLX_EXCHANGE);}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}// 死信队列===========================================
}

死信队列消费者

package com.nwd.module.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信队列消息处理* 此队列消费到的,是经过延迟之后的消息* @author niuwenda* @since 2024-06-03  09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.dlx-queue}")
public class DlxMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 处理消息的业务逻辑log.info("RabbitMq:死信队列接收到消息,{}",msg);// 此处应判断订单是否已完成支付,若未完成,后续继续编写取消订单逻辑// .....} catch (Exception e) {// 发生异常时,打印日志并拒绝消息(不重新放入队列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 处理拒绝消息的异常}*/}}
}

即时队列消费者

保证系统有即发即收的功能,此处代码与订单需求无关

package com.nwd.module.mq;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** mq消息接收处理器* @author niuwenda* @since 2024-06-03  09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.normal-queue}")
public class MqMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 处理消息的业务逻辑log.info("RabbitMq1:接收到消息,{}",msg);JSONObject msgObj = JSONObject.parseObject(msg);// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 发生异常时,打印日志并拒绝消息(不重新放入队列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 处理拒绝消息的异常}*/}}
}

延迟消息发送

可以写在controller中,测试时,用接口调用来发送消息

@Resource
private RabbitTemplate rabbitTemplate;@Value("${mq.rabbit.delay-exchange}")
private String exchange;rabbitTemplate.convertAndSend(exchange, routingKey, param);
log.info("RabbitMq发送消息成功:{}", param);

结果

可看到,消息延迟了10秒收到

2024-06-03 16:09:23.640  INFO  RabbitMqUtil : RabbitMq发送消息成功:helloMQ
2024-06-03 16:09:33.655  INFO DlxMsgConsumer : RabbitMq:死信队列接收到消息,helloMQ

注意

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息,如果时间过长,建议使用任务调度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/22057.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重组蛋白的定量定性方法,你了解吗?

重组蛋白的定量和定性分析是蛋白质工程和生物技术中至关重要的步骤&#xff0c;用于确保蛋白质的表达、纯度和功能性符合预期。以下是小编整理的一些常用的方法以及实验介绍&#xff0c;希望这些方法帮助研究人员详细了解重组蛋白的特性。 主要的定性方法 1 WB&#xff08;Wes…

AIGC 011-SAM第一个图像分割大模型-分割一切!

AIGC 011-SAM第一个图像分割大模型-分割一切&#xff01; 文章目录 0 论文工作1论文方法2 效果 0 论文工作 这篇论文介绍了 Segment Anything (SA) 项目&#xff0c;这是一个全新的图像分割任务、模型和数据集。SA 项目是一个具有里程碑意义的工作&#xff0c;它为图像分割领域…

基于springboot的多媒体素材库源码数据库

基于springboot的多媒体素材库源码数据库 近年来&#xff0c;信息化管理行业的不断兴起&#xff0c;使得人们的日常生活越来越离不开计算机和互联网技术。首先&#xff0c;根据收集到的用户需求分析&#xff0c;对设计系统有一个初步的认识与了解&#xff0c;确定多媒体素材库…

迎七一党史知识竞赛答题怎么做

迎七一党史知识竞赛答题&#xff0c;不仅是对于党史知识的检验&#xff0c;更是对于参赛者学习态度和综合能力的考量。在参与这类竞赛时&#xff0c;我们需要做好充分的准备&#xff0c;掌握一定的答题技巧&#xff0c;才能取得好的成绩。 首先&#xff0c;我们要深入了解竞赛…

FFmpeg播放器的相关概念【1】

播放器框架 相关术语 •容器&#xff0f;文件&#xff08;Conainer/File&#xff09;&#xff1a;即特定格式的多媒体文件&#xff0c;比如mp4、flv、mkv等。 • 媒体流&#xff08;Stream&#xff09;&#xff1a;表示时间轴上的一段连续数据&#xff0c;如一段声音数据、一段…

UFS Explorer Professional Recovery: 如何从启用了 mSATA 缓存的 Drobo 设备中恢复数据

天津鸿萌科贸发展有限公司是 UFS Explorer Professional Recovery 数据恢复软件的授权代理商。 UFS Explorer Professional Recovery 数据恢复软件提供综合性的解决方案&#xff0c;用于解决复杂的数据恢复案例&#xff0c;包括那些采用特殊存储技术的案例&#xff0c;或介质受…

上海亚商投顾:创业板指震荡收涨 超70家ST股跌停

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日震荡震荡&#xff0c;创业板指走势稍强&#xff0c;盘中一度涨超1%&#xff0c;黄白二线分化严重。算…

vue ts 导入 @/assets/ 红色显示的问题解决

vue ts 导入 /assets/ 红色显示的问题解决 一、问题描述 在使用的时候这样导入会出现如上的错误。 在使用的时候&#xff0c;导入的类型也没有对应的代码提示&#xff0c;说明导入有问题。 二、解决 在 tsconfig.json 中添加如下内容&#xff1a; {"compilerOptions&…

AI大模型探索之路-实战篇15: Agent智能数据分析平台之整合封装Tools和Memory功能代码

系列篇章&#x1f4a5; AI大模型探索之路-实战篇4&#xff1a;深入DB-GPT数据应用开发框架调研 AI大模型探索之路-实战篇5&#xff1a;探索Open Interpreter开放代码解释器调研 AI大模型探索之路-实战篇6&#xff1a;掌握Function Calling的详细流程 AI大模型探索之路-实战篇7…

模式识别判断题

贝叶斯估计的方法类似于贝叶斯决策&#xff0c;也需要定义损失函数。&#xff08;正确&#xff09; 解释&#xff1a;贝叶斯估计是一种基于贝叶斯定理的参数估计方法&#xff0c;它在估计参数时考虑了参数的先验分布。与贝叶斯决策类似&#xff0c;贝叶斯估计也需要定义损失函数…

46.ThreadPoolExcutor接口

线程池状态 ThreadPoolExcutor使用int高3位来表示线程池状态&#xff0c;低29位表示线程数量 状态高三位接收新任务处理阻塞队列任务说明RUNNING111YYSHUTDOWN000NY不会接收新任务&#xff0c;但会处理阻塞队列剩余任务&#xff0c;比较温和&#xff0c;已经提交的任务都会执…

15.1 测试-重要性与testing包

1. 测试的重要性 1.1 单元测试 单元测试是针对一小部分代码进行独立地测试。 单元测试的对象通常是单个函数或方法&#xff0c;而要测试的是它在接受给定的输入后&#xff0c;能否产生符合预期的输出。 单元测试的作用主要表现在以下两个方面&#xff1a; 验证程序的最小…

C++ STL-迭代器函数对象适配器

目录 一.迭代器 二. 函数对象 三. 适配器 一.迭代器 是一种通用的指针类型&#xff0c;可以用来遍历 STL 容器中的元素。 具有以下作用和意义&#xff1a; 提供一种通用的方式来访问容器中的元素。允许对不同类型的容器进行统一的操作。增强了代码的灵活性和可扩展性。 一…

The Best Toolkit 最好用的工具集

The Best Toolkit 工欲善其事&#xff0c;必先利其器&#xff0c;整理过往工作与生活中遇到的最好的工具软件 PDF合并等 PDF24 Tools PDF查看器 SumatraPDF 可以使用黑色来查看&#xff0c;相对不伤眼睛&#xff0c;也有电子书相关的阅读器 Kindle pdf裁边工具 briss 软件卸载…

【C++题解】1085 - 寻找雷劈数

问题&#xff1a;1085 - 寻找雷劈数 类型&#xff1a;for循环 题目描述&#xff1a; 把整数 3025 从中剪开分为 30 和 25 两个数&#xff0c;此时再将这两数之和平方&#xff0c;计算结果又等于原数。 (3025)(3025)55553025 &#xff0c;这样的数叫“雷劈数”。 求所有符合这…

Photoshop版本选择及系统要求

1、ps2018cc/2020cc版本 适合新手&#xff0c;增加了很多智能化操作&#xff0c;非常方便好上手。 2020&#xff1a; 2、ps2015版本 cc2015版本不论是功能还是硬件上&#xff0c;都是不二选择&#xff0c;适合于配置较低的电脑&#xff0c;该有的基本功能它都有。 3、2021/2…

std::numeric_limits::max和宏定义重复报错问题

问题描述 今天在编译Beckhoff ADS开源组件的时候发现编译报错&#xff0c;报错代码如下 long AdsDevice::ReadReqEx2(uint32_t group, uint32_t offset, size_t length, void* buffer, uint32_t* bytesRead) const {if (length > std::numeric_limits<uint32_t>::ma…

Algorand 的复兴之路:改变游戏规则,打造 RWA 第一公链

TLDR 发布 AlgoKit 2.0&#xff0c;支持 Python 原生语言&#xff0c;打造开发者友好的开发环境&#xff0c;Algorand 的开发者社区规模迅速扩大。 升级共识激励机制&#xff0c;用 ALGO 奖励共识节点参与共识的执行&#xff0c;增加 ALGO 的应用场景&#xff0c;同时进一步确…

GB28181的主动、被动的含义

GB28181有点象视频会议&#xff1a; 终端通过SIP登录服务器。管理员点击某个终端&#xff0c;进行视频。 就是这个主动、被动没有听说。于是问了同事&#xff0c;他说&#xff1a; 主动被动是从服务器角度来看的。所谓被动&#xff0c;就是服务器开一个端口&#xff0c;被动接…

钢结构乙级资质延期,企业如何降低经营风险

当企业面临钢结构乙级资质延期时&#xff0c;为了降低经营风险&#xff0c;可以采取以下措施&#xff1a; 1. 提前规划与准备 资质延续规划&#xff1a;在资质到期前&#xff0c;提前规划资质延续的相关工作&#xff0c;包括准备所需材料、明确流程和时间节点等。 项目评估&…