SpringBoot整合RabbitMQ实现消息延迟队列

环境依赖

SpringBoot 3.1.0

JDK 17

前期准备

安装MQ:  liunx+docker+rabbitmq安装延迟队列插件

实例

实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:

1.添加 RabbitMQ 的 Maven 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置 RabbitMQ

在 application.properties 配置文件中添加 RabbitMQ 的连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/
# 手动应答
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次从队列中取一个,轮询分发,默认是公平分发
spring.rabbitmq.listener.simple.prefetch=1
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5

3.配置文件

@Configuration
public class RabbitMQOrderConfig {/*** 订单交换机*/public static final String ORDER_EXCHANGE = "order_exchange";/*** 订单队列*/public static final String ORDER_QUEUE = "order_queue";/*** 订单路由key*/public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";/*** 死信交换机*/public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";/*** 死信队列 routingKey*/public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";/*** 死信队列*/public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";/*** 延迟时间 (单位:ms(毫秒))*/public  static final Integer DELAY_TIME = 10000;/*** 创建死信交换机*/@Bean("orderDeadLetterExchange")public Exchange orderDeadLetterExchange() {return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);}/*** 创建死信队列*/@Bean("orderDeadLetterQueue")public Queue orderDeadLetterQueue() {return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();}/*** 绑定死信交换机和死信队列*/@Bean("orderDeadLetterBinding")public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();}/*** 创建订单交换机*/@Bean("orderExchange")public Exchange orderExchange() {return new TopicExchange(ORDER_EXCHANGE, true, false);}/*** 创建订单队列*/@Bean("orderQueue")public Queue orderQueue() {Map<String, Object> args = new HashMap<>(3);//消息过期后,进入到死信交换机args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);//消息过期后,进入到死信交换机的路由keyargs.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);//过期时间,单位毫秒args.put("x-message-ttl", DELAY_TIME);return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();}/*** 绑定订单交换机和队列*/@Bean("orderBinding")public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();}
}

4.定义消息实体类

定义一个消息体类,用来存储需要发送的消息:

@Slf4j
@Data
@Builder
public class OrderMessage implements Serializable {/*** 商户订单号*/private String orderId;/*** 支付宝订单号*/private String tradeNo;
}

5.定义消息发送者

定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:

@Slf4j
@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrderMessage(OrderMessage message) {//为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定rabbitTemplate.setMandatory(true);//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息rabbitTemplate.setReturnsCallback(returned -> {int code = returned.getReplyCode();System.out.println("code=" + code);System.out.println("returned=" + returned);});rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);log.info("===============延时队列生产消息====================");log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);}
}

6.定义消息消费者

定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {@RabbitHandlerpublic void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {log.info("收到消息:{}",new Date());log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());log.info("message:{}", message);log.info("content:{}", orderMessage);}
}

这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。

7.定义一个controller

@Slf4j
@Api(tags = "延迟消息接口")
@RestController
@RequestMapping("/rabbitmq_order_delay_message")
public class RabbitMQDelayMessageController {@Autowiredprivate MessageSender sender;/*** 发送消息* @return*/@RequestMapping(value = "/sendMsg", method = RequestMethod.GET)@ResponseBodypublic void sendMsg() {OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();sender.sendOrderMessage(orderMessage);}
}

启动项目,请求运行结果:

总的xml:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>com.xiaoleilu</groupId><artifactId>hutool-all</artifactId><version>3.0.7</version>
</dependency><dependency><groupId>io.swagger</groupId><artifactId>swagger-annotations</artifactId><version>${swagger-annotations.version}</version>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version><scope>compile</scope>
</dependency>

问题总结

1.Invalid argument, ‘x-delayed-type’ must be an existing exchange type

需要创建一个交换机

2.Connection refused: no further information

请检查配置 application.xml配置的rabbimq不生效,可以将配置放到application.properties

3.Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
这种情况:

1.消费者内部重复签收导致签收异常

​    解决方案:增加配置手动处理应答

        1.配置新增

spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收

        2.代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    public void consumer(String body, Message message, Channel channel) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("收到消息:" + new Date());System.out.println("msgTag=" + msgTag);System.out.println("message=" + message);System.out.println("body=" + body);channel.basicAck(msgTag, false);}catch (Exception e) {log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);} finally {// 处理完之后手动签收(这里再次签收)channel.basicAck(msgTag, false);}}

2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)

​ 解决方案:去除channel.basicAck(msgTag, false)

4.Failed to convert message

消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25158.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vyper重入漏洞解析

什么是重入攻击 Reentrancy攻击是以太坊智能合约中最具破坏性的攻击之一。当一个函数对另一个不可信合约进行外部调用时&#xff0c;就会发生重入攻击。然后&#xff0c;不可信合约会递归调用原始函数&#xff0c;试图耗尽资金。 当合约在发送资金之前未能更新其状态时&#…

Dubbo 3.x源码(20)—Dubbo服务引用源码(3)

基于Dubbo 3.1&#xff0c;详细介绍了Dubbo服务的发布与引用的源码。 此前我们学习了调用createProxy方法&#xff0c;根据服务引用参数map创建服务接口代理引用对象的整体流程&#xff0c;我们知道会调用createInvokerForRemote方法创建远程引用Invoker&#xff0c;这是Dubbo …

深入理解Python:多线程与多进程编程

深入理解Python:多线程与多进程编程 在现代软件开发中,充分利用计算机的多核处理能力来提升程序的性能是至关重要的。Python提供了多线程和多进程两种并发编程方式。本文将深入探讨这两种并发编程的基本概念、使用方法以及各自的优缺点,并通过实际代码示例展示其应用。 目…

总结七大排序算法

插入排序 直接插入排序是一种简单的插入排序法&#xff0c;其基本思想是&#xff1a;把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 。实际中我们玩扑克牌时&#xff0c;就用了…

QML键盘事件的用法和示例

文章目录 处理单一按键处理组合键处理快捷键在QML中,键盘事件通常通过Keys对象和相关的事件处理器来管理。这些事件处理器可以直接添加到任何接受键盘输入的QML元素中。这里介绍一下各种键盘事件的处理方式。 处理单一按键 处理单一按键的方式比较简单,直接在元素的Keys对象…

Python Mistune库:Markdown解析和处理

更多Python学习内容&#xff1a;ipengtao.com Mistune是一个用于Python的快速且功能强大的Markdown解析库。它以其高性能和灵活性著称&#xff0c;能够轻松扩展和定制。Mistune支持标准的Markdown语法&#xff0c;并且可以通过插件扩展支持更多功能&#xff0c;例如数学公式、高…

数据结构严蔚敏版精简版-栈和队列以及c语言代码实现

1栈的定义和特权 栈(stack)是限定仅在表尾进行插入或删除操作的线性表。 注&#xff1a;虽然说栈的实现就是一端插入和删除&#xff0c;但不一定是在“表尾”&#xff0c;这个“表尾”是广义的。 头插法实现链栈 尾插法实现链栈 因此&#xff0c;对栈来说&#xff0c;表尾…

4. 流程控制语句

文章目录 4.1 if 条件语句4.1.1 单分支4.1.2 双分支4.1.3 多分支 4.2 退出程序4.3 case语句 4.1 if 条件语句 4.1.1 单分支 语法如下&#xff1a; if <条件表达式> then指令 fi或者 if <条件表达式>; then指令 fi使用示例&#xff1a;判断是否已经成功挂载光盘&…

JAVA 整合 RabbitMQ

maven pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/…

Web后端开发(请求-实体参数)(二)

简单实体对象&#xff1a;请求参数名与形参对象属性名相同&#xff0c;定义对象(POJO)接收即可 RequestMapping("/simplePojo") public String simplePojo(User user){System.out.println(user);return "OK"; } public class User{private String name;p…

算法——Floyd判圈算法

介绍 Floyd判圈算法用于判断一个链表中是否有环。 思想 使用快慢指针fast, slow&#xff0c;快指针每次走两步fast fast.next.next&#xff0c;慢指针每次走一步slow slow.next。当出现fast null || fast.next null时&#xff0c;说明链表不存在环&#xff0c;如果存在环…

1.2-自然语言的分布式表示-基于计数的方法

本篇笔记对应的视频链接为&#xff1a; 3-基于计数的方法表示单词-将文字转换成编号的预处理工作_哔哩哔哩_bilibili&#xff1b;4-基于计数的方法表示单词-使用共现矩阵进行单词的分布式表示_哔哩哔哩_bilibili&#xff1b;5-基于计数的方法表示单词-单词之间相似度计算_哔哩哔…

计算机网络 —— 网络层(CIDR)

计算机网络 —— 网络层&#xff08;CIDR&#xff09; CIDR的提出背景什么是CIDR基本概念划分示例应用优势 举个例子路由聚合常用数字 我们今天来看IPv4地址划分的另一种方法 —— CIDR。 CIDR的提出背景 CIDR&#xff08;无类域间路由&#xff0c;Classless Inter-Domain Ro…

大众汽车裁员加速,38万元遣散费起步

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 几周前&#xff0c;大众汽车宣布了一项新的裁员计划。 一、裁员行动与额外福利并行 大众汽车近期在裁员行动上取得了显著进展&#xff0c;其遣散…

深度解析:AI Prompt 提示词工程的兴起、争议与未来发展

PART1: 提示词工程的兴起 在人工智能领域中&#xff0c;一个新的领域——提示词工程&#xff08;prompt engineering&#xff09;——开始显露头角。 这个领域的核心在于精心设计输入&#xff0c;以引导AI模型产生特定的、期望的输出。 随着AI技术的飞速发展&#xff0c;特别…

Spark MLlib 机器学习

Spark MLlib是一个在Apache Spark上构建的机器学习库&#xff0c;用于解决大规模数据集上的机器学习问题。它提供了一组丰富的机器学习算法和工具&#xff0c;可以用于分类、回归、聚类、推荐和协同过滤等任务。同时&#xff0c;它还提供了一些特征提取、特征转换和特征选择的工…

无头+单向+非循环链表的实现

这里写目录标题 1. 链表1.1 链表的概念及结构1.2 链表的分类 2. 接口实现3. 链表的实现3.1 打印链表3.2 头插3.3 尾插3.4 头删3.5 尾删3.6 单链表查找3.7 在pos之前插入3.8 在pos之后插入3.9 删除pos位置的值3.10 删除pos位置之后的值3.11 链表的释放3.12 动态申请一个节点 4. …

《精通ChatGPT:从入门到大师的Prompt指南》第11章:Prompt与AI的未来

第11章&#xff1a;Prompt与AI的未来 11.1 技术发展的新方向 在迅速发展的人工智能领域&#xff0c;Prompt工程作为与AI模型交互的核心方式&#xff0c;正处于技术创新的前沿。未来几年&#xff0c;Prompt工程将沿着多个新方向发展&#xff0c;这些方向不仅会改变我们与AI互动…

Golang 高级面试题

在准备 Golang 高级面试时,通常会涉及到多种关键领域。本文将涵盖各个领域的具体问题示例和实现代码。 数据结构与算法 实现堆、链表、栈、队列、哈希表 1.最小堆: 最小堆是一种完全二叉树,树中每个节点的值都小于等于其子节点的值。常用于实现优先队列。 package main…

Spring AI 第三讲Embeddings(嵌入式) Model API 第一讲OpenAI 嵌入

Spring AI 支持 OpenAI 的文本嵌入模型。OpenAI 的文本嵌入测量文本字符串的相关性。嵌入是一个浮点数向量&#xff08;列表&#xff09;。两个向量之间的距离可以衡量它们之间的相关性。距离小表示关联度高&#xff0c;距离大表示关联度低。 先决条件 您需要与 OpenAI 创建一…