SpringBoot使用RabbitMQ实现延迟队列
- 需求和目标
- 名词解释
- 实现方式
- 引入依赖
- 添加配置文件
- 配置类
- 死信队列消费者
- 即时队列消费者
- 延迟消息发送
- 结果
- 注意
需求和目标
商城系统,用户下单后若15分钟内仍未完成支付,则自动取消订单,若已支付,不做其他特殊操作
系统还需要支持即时消息的功能,即发即收。
名词解释
①即时队列:即发即收
②延迟队列:发了消息,没有接收方,只有消息过期后才被处理
③死信队列:延迟队列上的消息过期后,会被自动转发到死信队列中,从而最终达到延迟的目的
实现方式
本文采用RabbitMQ自身属性:
TTL(Time To Live存活时间) + DLX(Dead-Letter-Exchange死信交换机)
实现延迟队列,先将消息发到指定了TTL时长的队列A中,队列A没有消费者,也就是说,队列A中的消息肯定会过期,等消息过期后,就会加入到队列B,也就是死信队列,B队列是有消费者在监听的,一旦收到消息,就进行后续的逻辑处理,从而达到延迟效果。
这种实现方式只能为队列设置消息延迟的时长,不能为每个消息指定延迟时长,粒度比较粗,请注意使用的业务场景!
引入依赖
<!--rabbitmq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置文件
分别声明了:即时、延迟、死信的相关信息
其中,延迟和死信是相互配合形成了延迟队列
# rabbitMQ配置
mq:rabbit:host: 127.0.0.1:5672virtualHost: /username: testUserpassword: 123456normal-exchange: wms_exchange_normalnormal-queue: wms_queue_normalnormal-routing-key: wms_routing_key_normaldelay-exchange: wms_exchange_delaydelay-queue: wms_queue_delaydelay-routing-key: wms_routing_key_delaydlx-exchange: wms_exchange_dlxdlx-queue: wms_queue_dlxdlx-routing-key: wms_routing_key_dlx
配置类
package com.nwd.common.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 从配置文件中读取参数@Value("${mq.rabbit.host}")String HOST;@Value("${mq.rabbit.username}")String USERNAME;@Value("${mq.rabbit.password}")String PASSWORD;@Value("${mq.rabbit.normal-exchange}")String NORMAL_EXCHANGE;@Value("${mq.rabbit.normal-queue}")String NORMAL_QUEUE;@Value("${mq.rabbit.normal-routing-key}")String NORMAL_ROUTING_KEY;@Value("${mq.rabbit.delay-exchange}")String DELAY_EXCHANGE;@Value("${mq.rabbit.delay-queue}")String DELAY_QUEUE;@Value("${mq.rabbit.delay-routing-key}")String DELAY_ROUTING_KEY;@Value("${mq.rabbit.dlx-exchange}")String DLX_EXCHANGE;@Value("${mq.rabbit.dlx-queue}")String DLX_QUEUE;@Value("${mq.rabbit.dlx-routing-key}")String DLX_ROUTING_KEY;//创建mq连接@Bean(name = "connectionFactory")public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setUsername(USERNAME);connectionFactory.setPassword(PASSWORD);//connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);//该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的hostconnectionFactory.setAddresses(HOST);//connectionFactory.setPort(Integer.parseInt(port));return connectionFactory;}// 即时队列===========================================@Beanpublic Queue normalQueue() {return new Queue(NORMAL_QUEUE);}@Beanpublic DirectExchange normalDirectExchange(){return new DirectExchange(NORMAL_EXCHANGE);}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalDirectExchange()).with(NORMAL_ROUTING_KEY);}// 即时队列===========================================// 延迟队列===========================================@Beanpublic Queue delayQueue(){Map<String,Object> map = new HashMap<>();//message在该队列queue的存活时间最大为15分钟map.put("x-message-ttl", 10000*6*15);//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)map.put("x-dead-letter-exchange", DLX_EXCHANGE);//x-dead-letter-routing-key参数是给这个DLX指定路由键map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(DELAY_QUEUE,true,false,false,map);}@Beanpublic DirectExchange delayDirectExchange(){return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding(){return BindingBuilder.bind(delayQueue()).to(delayDirectExchange()).with(DELAY_ROUTING_KEY);}// 延迟队列===========================================// 死信队列===========================================@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE);}@Beanpublic DirectExchange dlxDirectExchange(){return new DirectExchange(DLX_EXCHANGE);}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}// 死信队列===========================================
}
死信队列消费者
package com.nwd.module.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信队列消息处理* 此队列消费到的,是经过延迟之后的消息* @author niuwenda* @since 2024-06-03 09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.dlx-queue}")
public class DlxMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 处理消息的业务逻辑log.info("RabbitMq:死信队列接收到消息,{}",msg);// 此处应判断订单是否已完成支付,若未完成,后续继续编写取消订单逻辑// .....} catch (Exception e) {// 发生异常时,打印日志并拒绝消息(不重新放入队列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 处理拒绝消息的异常}*/}}
}
即时队列消费者
保证系统有即发即收的功能,此处代码与订单需求无关
package com.nwd.module.mq;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** mq消息接收处理器* @author niuwenda* @since 2024-06-03 09:50*/
@Slf4j
@Component
@RabbitListener(queues = "${mq.rabbit.normal-queue}")
public class MqMsgConsumer {@RabbitHandler(isDefault = true)public void process(String msg, Message message, Channel channel) {try {// 处理消息的业务逻辑log.info("RabbitMq1:接收到消息,{}",msg);JSONObject msgObj = JSONObject.parseObject(msg);// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 发生异常时,打印日志并拒绝消息(不重新放入队列)System.out.println("Error processing message: " + e.getMessage());/*try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 处理拒绝消息的异常}*/}}
}
延迟消息发送
可以写在controller中,测试时,用接口调用来发送消息
@Resource
private RabbitTemplate rabbitTemplate;@Value("${mq.rabbit.delay-exchange}")
private String exchange;rabbitTemplate.convertAndSend(exchange, routingKey, param);
log.info("RabbitMq发送消息成功:{}", param);
结果
可看到,消息延迟了10秒收到
2024-06-03 16:09:23.640 INFO RabbitMqUtil : RabbitMq发送消息成功:helloMQ
2024-06-03 16:09:33.655 INFO DlxMsgConsumer : RabbitMq:死信队列接收到消息,helloMQ
注意
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息,如果时间过长,建议使用任务调度。