SpringBoot整合RabbitMQ实现消息延迟队列

环境依赖

SpringBoot 3.1.0

JDK 17

前期准备

安装MQ:  liunx+docker+rabbitmq安装延迟队列插件

实例

实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:

1.添加 RabbitMQ 的 Maven 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置 RabbitMQ

在 application.properties 配置文件中添加 RabbitMQ 的连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/
# 手动应答
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次从队列中取一个,轮询分发,默认是公平分发
spring.rabbitmq.listener.simple.prefetch=1
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5

3.配置文件

@Configuration
public class RabbitMQOrderConfig {/*** 订单交换机*/public static final String ORDER_EXCHANGE = "order_exchange";/*** 订单队列*/public static final String ORDER_QUEUE = "order_queue";/*** 订单路由key*/public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";/*** 死信交换机*/public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";/*** 死信队列 routingKey*/public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";/*** 死信队列*/public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";/*** 延迟时间 (单位:ms(毫秒))*/public  static final Integer DELAY_TIME = 10000;/*** 创建死信交换机*/@Bean("orderDeadLetterExchange")public Exchange orderDeadLetterExchange() {return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);}/*** 创建死信队列*/@Bean("orderDeadLetterQueue")public Queue orderDeadLetterQueue() {return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();}/*** 绑定死信交换机和死信队列*/@Bean("orderDeadLetterBinding")public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();}/*** 创建订单交换机*/@Bean("orderExchange")public Exchange orderExchange() {return new TopicExchange(ORDER_EXCHANGE, true, false);}/*** 创建订单队列*/@Bean("orderQueue")public Queue orderQueue() {Map<String, Object> args = new HashMap<>(3);//消息过期后,进入到死信交换机args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);//消息过期后,进入到死信交换机的路由keyargs.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);//过期时间,单位毫秒args.put("x-message-ttl", DELAY_TIME);return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();}/*** 绑定订单交换机和队列*/@Bean("orderBinding")public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();}
}

4.定义消息实体类

定义一个消息体类,用来存储需要发送的消息:

@Slf4j
@Data
@Builder
public class OrderMessage implements Serializable {/*** 商户订单号*/private String orderId;/*** 支付宝订单号*/private String tradeNo;
}

5.定义消息发送者

定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:

@Slf4j
@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrderMessage(OrderMessage message) {//为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定rabbitTemplate.setMandatory(true);//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息rabbitTemplate.setReturnsCallback(returned -> {int code = returned.getReplyCode();System.out.println("code=" + code);System.out.println("returned=" + returned);});rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);log.info("===============延时队列生产消息====================");log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);}
}

6.定义消息消费者

定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {@RabbitHandlerpublic void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {log.info("收到消息:{}",new Date());log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());log.info("message:{}", message);log.info("content:{}", orderMessage);}
}

这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。

7.定义一个controller

@Slf4j
@Api(tags = "延迟消息接口")
@RestController
@RequestMapping("/rabbitmq_order_delay_message")
public class RabbitMQDelayMessageController {@Autowiredprivate MessageSender sender;/*** 发送消息* @return*/@RequestMapping(value = "/sendMsg", method = RequestMethod.GET)@ResponseBodypublic void sendMsg() {OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();sender.sendOrderMessage(orderMessage);}
}

启动项目,请求运行结果:

总的xml:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>com.xiaoleilu</groupId><artifactId>hutool-all</artifactId><version>3.0.7</version>
</dependency><dependency><groupId>io.swagger</groupId><artifactId>swagger-annotations</artifactId><version>${swagger-annotations.version}</version>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version><scope>compile</scope>
</dependency>

问题总结

1.Invalid argument, ‘x-delayed-type’ must be an existing exchange type

需要创建一个交换机

2.Connection refused: no further information

请检查配置 application.xml配置的rabbimq不生效,可以将配置放到application.properties

3.Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
这种情况:

1.消费者内部重复签收导致签收异常

​    解决方案:增加配置手动处理应答

        1.配置新增

spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收

        2.代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    public void consumer(String body, Message message, Channel channel) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("收到消息:" + new Date());System.out.println("msgTag=" + msgTag);System.out.println("message=" + message);System.out.println("body=" + body);channel.basicAck(msgTag, false);}catch (Exception e) {log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);} finally {// 处理完之后手动签收(这里再次签收)channel.basicAck(msgTag, false);}}

2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)

​ 解决方案:去除channel.basicAck(msgTag, false)

4.Failed to convert message

消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25158.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vyper重入漏洞解析

什么是重入攻击 Reentrancy攻击是以太坊智能合约中最具破坏性的攻击之一。当一个函数对另一个不可信合约进行外部调用时&#xff0c;就会发生重入攻击。然后&#xff0c;不可信合约会递归调用原始函数&#xff0c;试图耗尽资金。 当合约在发送资金之前未能更新其状态时&#…

Dubbo 3.x源码(20)—Dubbo服务引用源码(3)

基于Dubbo 3.1&#xff0c;详细介绍了Dubbo服务的发布与引用的源码。 此前我们学习了调用createProxy方法&#xff0c;根据服务引用参数map创建服务接口代理引用对象的整体流程&#xff0c;我们知道会调用createInvokerForRemote方法创建远程引用Invoker&#xff0c;这是Dubbo …

总结七大排序算法

插入排序 直接插入排序是一种简单的插入排序法&#xff0c;其基本思想是&#xff1a;把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 。实际中我们玩扑克牌时&#xff0c;就用了…

Python Mistune库:Markdown解析和处理

更多Python学习内容&#xff1a;ipengtao.com Mistune是一个用于Python的快速且功能强大的Markdown解析库。它以其高性能和灵活性著称&#xff0c;能够轻松扩展和定制。Mistune支持标准的Markdown语法&#xff0c;并且可以通过插件扩展支持更多功能&#xff0c;例如数学公式、高…

数据结构严蔚敏版精简版-栈和队列以及c语言代码实现

1栈的定义和特权 栈(stack)是限定仅在表尾进行插入或删除操作的线性表。 注&#xff1a;虽然说栈的实现就是一端插入和删除&#xff0c;但不一定是在“表尾”&#xff0c;这个“表尾”是广义的。 头插法实现链栈 尾插法实现链栈 因此&#xff0c;对栈来说&#xff0c;表尾…

算法——Floyd判圈算法

介绍 Floyd判圈算法用于判断一个链表中是否有环。 思想 使用快慢指针fast, slow&#xff0c;快指针每次走两步fast fast.next.next&#xff0c;慢指针每次走一步slow slow.next。当出现fast null || fast.next null时&#xff0c;说明链表不存在环&#xff0c;如果存在环…

1.2-自然语言的分布式表示-基于计数的方法

本篇笔记对应的视频链接为&#xff1a; 3-基于计数的方法表示单词-将文字转换成编号的预处理工作_哔哩哔哩_bilibili&#xff1b;4-基于计数的方法表示单词-使用共现矩阵进行单词的分布式表示_哔哩哔哩_bilibili&#xff1b;5-基于计数的方法表示单词-单词之间相似度计算_哔哩哔…

计算机网络 —— 网络层(CIDR)

计算机网络 —— 网络层&#xff08;CIDR&#xff09; CIDR的提出背景什么是CIDR基本概念划分示例应用优势 举个例子路由聚合常用数字 我们今天来看IPv4地址划分的另一种方法 —— CIDR。 CIDR的提出背景 CIDR&#xff08;无类域间路由&#xff0c;Classless Inter-Domain Ro…

大众汽车裁员加速,38万元遣散费起步

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 几周前&#xff0c;大众汽车宣布了一项新的裁员计划。 一、裁员行动与额外福利并行 大众汽车近期在裁员行动上取得了显著进展&#xff0c;其遣散…

深度解析:AI Prompt 提示词工程的兴起、争议与未来发展

PART1: 提示词工程的兴起 在人工智能领域中&#xff0c;一个新的领域——提示词工程&#xff08;prompt engineering&#xff09;——开始显露头角。 这个领域的核心在于精心设计输入&#xff0c;以引导AI模型产生特定的、期望的输出。 随着AI技术的飞速发展&#xff0c;特别…

无头+单向+非循环链表的实现

这里写目录标题 1. 链表1.1 链表的概念及结构1.2 链表的分类 2. 接口实现3. 链表的实现3.1 打印链表3.2 头插3.3 尾插3.4 头删3.5 尾删3.6 单链表查找3.7 在pos之前插入3.8 在pos之后插入3.9 删除pos位置的值3.10 删除pos位置之后的值3.11 链表的释放3.12 动态申请一个节点 4. …

《精通ChatGPT:从入门到大师的Prompt指南》第11章:Prompt与AI的未来

第11章&#xff1a;Prompt与AI的未来 11.1 技术发展的新方向 在迅速发展的人工智能领域&#xff0c;Prompt工程作为与AI模型交互的核心方式&#xff0c;正处于技术创新的前沿。未来几年&#xff0c;Prompt工程将沿着多个新方向发展&#xff0c;这些方向不仅会改变我们与AI互动…

Transformer学习之SwinTransformer

1.算法简介 本文主要参考自以下链接&#xff0c;整理成线上的形式用于备忘&#xff0c;排版太麻烦了直接贴图&#xff0c;参考的朋友慎重&#xff0c;不如直接看参考链接&#xff0c;后期有了新的理解继续更正。 参考链接1&#xff1a;Swin-Transformer网络结构详解_swin tran…

【文件导出2】导出html文件数据

导出html文件数据 文章目录 导出html文件数据前言一、实现代码1.controller层2.接口层3.接口实现类4.FileUtil 工具类 二、文件导出效果总结 前言 springBoot项目实现在线导出html文件数据的功能。 一、实现代码 1.controller层 GetMapping("/record/_export") Ap…

Flutter中同步与异步

一&#xff0c;同步/异步的理解 1&#xff0c;await&#xff1a;同步机制 同步操作会阻止其他操作执行&#xff0c;直到完成为止。同步就好比打电话一样&#xff0c;打电话时都是一个人在说另一个人听&#xff0c;一个人在说的时候另一个人等待&#xff0c;等另一个人说完后再…

【Git】远程操作 -- 详解

一、理解分布式版本控制系统 我们目前所说的所有内容&#xff08;工作区、暂存区、版本库等等&#xff09;都是在本地&#xff0c;也就是在我们的笔记本或者计算机上。而我们的 Git 其实是分布式版本控制系统。 上面这段话是什么意思呢&#xff1f; 可以简单理解为&#xff1…

USB (2)

USB transaction 以2.0的枚举过程为例。 首先是TOKEN TRANSACTION&#xff0c;其次是DATA TRANSACTION&#xff0c;再次是Handshake Transaction。 上面的SETUP TRANSACTION是TOKEN TRANSACTION的一种。另外三种是OUT, IN, SOF。 在每个TRANSACTION中又包含了3个STAGE&#x…

如何在恢复出厂设置后从 Android 恢复照片

在某些情况下&#xff0c;您可能会考虑将 Android 设备恢复出厂设置。需要注意的是&#xff0c;恢复出厂设置后&#xff0c;所有设置、用户数据甚至应用程序数据都将被清除。因此&#xff0c;如果您将 Android 设备恢复出厂设置&#xff0c;甚至在里面留下了一些珍贵的照片&…

java判断对象是否还在被引用

1、代码取消强引用后&#xff0c;gc回收对象 public static void main(String[] args) {Object obj new Object();WeakReference<Object> weakRef new WeakReference<>(obj);System.out.println(weakRef.get());obj null; // 取消强引用,后续gc会被回收,如果不…

1.基于-LABVIEW的自动售卖机开发(前面板)

1.项目简介 随着科技的进步和人们生活节奏的加快&#xff0c;自动售卖机在日常生活中扮演着越来越重要的角色。它们不仅提高了商品购买的便捷性&#xff0c;还节省了人力成本。为了实现更加智能化和高效的售卖服务&#xff0c;本项目旨在开发一款基于LabVIEW平台的自动售卖机系…