文章目录
- 基于Spring Boot的RabbitMQ延时队列技术实现
- 延时队列应用场景
- 基本概念
- 实现延时队列
- 添加依赖
- 基础配置
- 配置类设计
- 消息生产者
- 消息消费者
- 两种TTL设置方式
- 订单超时关闭实例
- 订单服务
- 消息处理
- 延迟消息插件
- 安装插件
- 配置延迟交换机
基于Spring Boot的RabbitMQ延时队列技术实现
延时队列应用场景
- 订单系统:30分钟未支付订单自动取消
1. 用户下单 → 发送延时消息(30分钟TTL)
2. 消息进入普通队列等待
3. 30分钟后消息过期 → 转入死信队列
4. 消费者检查订单状态:- 未支付 → 执行关闭操作- 已支付 → 忽略
- 定时通知:预约提醒服务
场景:会议开始前15分钟提醒
1. 创建会议时发送延时消息
2. 消息存活直到会议开始前15分钟
3. 触发通知服务发送提醒
- 异步重试:失败任务延时重试机制
消息处理失败时:
1. 首次失败 → 延时5秒重试
2. 二次失败 → 延时30秒重试
3. 三次失败 → 进入死信队列人工处理
- 物流跟踪:预计送达时间状态更新
基本概念
延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消息被拒绝且不重新入队:消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
- 消息过期:消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 队列达到最大长度:要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过 dead-letter-exchange
属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)。
RabbitMQ 本身没有直接的延时队列功能,通常是通过死信队列和**TTL(Time-To-Live)**来实现的。
[生产者] → [普通队列(设置TTL)] → (消息过期)→ [死信队列] → [消费者]
实现延时队列
添加依赖
<!-- amqp 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Mybatis-Plus包 -->
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version>
</dependency>
<!-- MySQL驱动包 -->
<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope>
</dependency>
<!-- lombok包 -->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
基础配置
server:port: 8080
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/smbms?useUnicode=true&characterEncoding=UTF-8&useSSL=falseusername: rootpassword: rootrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
mybatis-plus:type-aliases-package: com.hz.pojo #类型别名所在的包#控制台打印sql语句configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmap-underscore-to-camel-case: false # 驼峰映射
死信队列三要素
- DLX (Dead-Letter-Exchange):死信转发交换机
- DLK (Dead-Letter-Routing-Key):死信路由键
- TTL (Time-To-Live):消息存活时间
配置类设计
@Configuration
public class RabbitMQConfig {// 业务交换机public static final String BUSINESS_EXCHANGE = "business.exchange";// 业务队列public static final String BUSINESS_QUEUE = "business.queue";// 死信交换机public static final String DLX_EXCHANGE = "dlx.exchange";// 死信队列public static final String DLX_QUEUE = "dlx.queue";// 业务队列路由键private static final String BUSINESS_ROUTING_KEY = "business.key";// 死信路由键private static final String DLX_ROUTING_KEY = "dlx.key";// 声明业务交换机(直连型)@Beanpublic DirectExchange businessExchange() {return new DirectExchange(BUSINESS_EXCHANGE);}// 声明死信交换机@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE);}// 声明业务队列(绑定死信属性)@Beanpublic Queue businessQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 设置死信交换机args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键args.put("x-message-ttl", 10000); // 队列统一TTL(单位:毫秒)return new Queue(BUSINESS_QUEUE, true, false, false, args);}// 声明死信队列@Beanpublic Queue dlxQueue() {return new Queue(DLX_QUEUE);}// 绑定业务队列到交换机@Beanpublic Binding businessBinding() {return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(BUSINESS_ROUTING_KEY);}// 绑定死信队列到交换机@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}
}
消息生产者
@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送延时消息* @param message 消息内容* @param ttl 单位:秒*/public void sendDelayMessage(String message, int ttl) {// 消息属性设置MessagePostProcessor processor = message -> {message.getMessageProperties().setExpiration(String.valueOf(ttl * 1000)); // 消息级别TTLreturn message;};rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE,RabbitMQConfig.BUSINESS_ROUTING_KEY,message,processor);}
}
消息消费者
@Component
public class MessageConsumer {@Autowiredprivate BillService billService;@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)public void processDelayMessage(String billCode) {System.out.println("收到延时消息:" + billCode);billService.closeBill(billCode);System.out.println("超时未支付,订单已关闭--------------");}
}
两种TTL设置方式
队列级别TTL
args.put("x-message-ttl", 10000);
队列中所有消息统一过期时间;消息实际存活时间 = 队列TTL;性能更优(RabbitMQ统一处理)
消息级别TTL
message.getMessageProperties().setExpiration("5000");
每个消息可以设置不同TTL;实际存活时间取最小值(队列TTL vs 消息TTL);需要逐个处理消息,性能开销较大
订单超时关闭实例
订单服务
@Service
public class BillService {@Autowiredprivate MessageProducer messageProducer;@Resourceprivate BillMapper billMapper;public void createBill(Bill bill) {// 保存订单到数据库bill.setIsPayment(1); // 设置初始状态 1:未支付 2:已支付 3:已关闭billMapper.insert(bill);// 发送延时消息(10s)messageProducer.sendDelayMessage(bill.getBillCode(), 10);}public void closeBill(String billCode) {Bill bill = billMapper.selectOne(new QueryWrapper<Bill>().eq("billCode", billCode));if (bill != null && bill.getIsPayment() == 1) {bill.setIsPayment(3);billMapper.updateById(bill);}}
}
消息处理
@RestController
@RequestMapping("/bill")
public class BillController {@Autowiredprivate BillService billService;@GetMapping("/send")public String send(){// 创建测试订单Bill bill = new Bill();bill.setBillCode("BILL2025_999");bill.setProductName("可口可乐");// 创建账单并发送延时消息billService.createBill(bill);return "订单创建成功,10秒后未支付将自动关闭。订单号:" + bill.getBillCode();}
}
流程:
-
访问
localhost:8080/bill/send
创建测试订单 -
订单初始状态为待支付(1)
-
消息经过10秒延迟进入死信队列
-
消费者处理消息时检查订单状态
-
若仍为未支付状态,更新为已关闭(3)
延迟消息插件
RabbitMQ 提供了官方插件 rabbitmq_delayed_message_exchange
,它允许你发送延迟消息而无需设置消息的 TTL 和死信队列。这个插件提供了一个新的交换机类型 x-delayed-message
,可以用来实现消息的延迟投递。
安装插件
可以从 RabbitMQ 的插件页面下载,或者直接使用以下命令进行安装(假设 RabbitMQ 安装在默认位置):
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装完成后,重启 RabbitMQ 服务。
配置延迟交换机
@Bean
public CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}// 发送消息时设置延迟头
rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {msg.getMessageProperties().setHeader("x-delay", 5000);return msg;
});