目录
- 一、简介
- 1.1、延迟级别
- 二、Maven依赖
- 三、application配置
- 四、生产者
- 4.1、同步发送延迟消息
- 4.2、异步发送延迟消息
- 五、延迟级别修改
- 5.1、 修改Broker端配置
- 5.2、 通过Broker的运维命令修改
- 5.3、 规则遵循
一、简介
在之前的文章中,我讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,以及批量发送消息,今天我们讲讲延迟消息。延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
1.1、延迟级别
Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
二、Maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>06-send-delay-message</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>com.alian</groupId><artifactId>common-rocketmq-dto</artifactId><version>1.0.0-SNAPSHOT</version></dependency></dependencies></project>
父工程已经在我上一篇文章里,通用公共包也在我上一篇文章里有说明,包括消费者。具体参考:RocketMQ笔记(一)SpringBoot整合RocketMQ发送同步消息
三、application配置
application.properties
server.port=8006# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.group=delay_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout=3000
# 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-server=true
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed=3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold=0
四、生产者
在 RocketMQ 中,RocketMQTemplate的syncSend方法,它允许你批量发送同步消息,主要参数:
- topic:主题
- Message:消息内容
- timeout:发送超时时间
- delayLevel:延迟级别
测试类都引入依赖
@Autowiredprivate RocketMQTemplate rocketMQTemplate;
4.1、同步发送延迟消息
@Testpublic void syncSendStringMessageWithBuilderDelayLevel() {String topic = "string_message_topic";String message = "我是一条同步延迟文本消息:syncSendStringMessageWithBuilderDelayLevel";Message<String> msg = MessageBuilder.withPayload(message)// 消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 3秒发送超时,延迟级别为3(也就是要10秒后才能被消费者消费)SendResult sendResult = rocketMQTemplate.syncSend(topic, msg, 3000, 3);//会覆盖Message中的消息延迟级别log.info("同步发送返回的结果:{}", sendResult);}
运行结果:
生产者
2024-03-12 19:39:08.982 INFO 19476 --- [ main] com.alian.delay.SendDelayMessageTest : 同步发送延迟消息返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F0000014C1418B4AAC23CDD7F240000, offsetMsgId=C0A800EA00002A9F0000000000063D8D, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=0], queueOffset=1]
消费者
2024-03-12 19:39:18.987 INFO 6844 --- [_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 我是一条同步延迟文本消息:syncSendStringMessageWithBuilderDelayLevel
从上面的结果可以看到延迟级别为3时,生产消息和消费到消息的是时间大概为10秒钟。
4.2、异步发送延迟消息
@Testpublic void asyncSendStringMessageWithBuilderDelayLevel() {String topic = "string_message_topic";String message = "我是一条异步延迟文本消息:asyncSendStringMessageWithBuilderDelayLevel";Message<String> msg = MessageBuilder.withPayload(message)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 3秒发送超时,延迟级别为3(也就是要10秒后才能被消费者消费)rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 异步发送成功的回调逻辑log.info("异步消息发送文本消息成功: " + sendResult);}@Overridepublic void onException(Throwable e) {// 异步发送失败的回调逻辑log.info("异步消息发送文本消息失败: " + e.getMessage());}}, 3000, 4);}
运行结果:
生产者
2024-03-12 19:41:06.800 INFO 19068 --- [ublicExecutor_1] com.alian.delay.SendDelayMessageTest : 异步延迟消息发送文本消息成功: SendResult [sendStatus=SEND_OK, msgId=7F0000014A7C18B4AAC23CDF4B3D0000, offsetMsgId=C0A800EA00002A9F000000000006409A, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=1], queueOffset=0]
消费者
2024-03-12 19:41:36.778 INFO 6844 --- [_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 我是一条异步延迟文本消息:asyncSendStringMessageWithBuilderDelayLevel
从上面的结果可以看到延迟级别为4时,生产消息和消费到消息的是时间大概为30秒钟。
五、延迟级别修改
虽说之前的延迟级别能满足很多情况下的需求,但是总有些特殊的要求,比如要超过2个小时之类的,能否自定义呢?RocketMQ 5.0 解除了 4.x 版本延时消息延迟级别的时间限制,现在生产者可以设置任意延迟时间。我提供两种方式:
5.1、 修改Broker端配置
这种方式需要修改Broker的配置文件,重启Broker后新配置才会生效。
- 停止Broker
- 修改${ROCKETMQ_HOME}/conf/broker.conf配置文件中的messageDelayLevel参数
- messageDelayLevel的值是一个字符串,代表着允许设置的延迟级别,默认为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
- 比如要新增1分30秒的延迟级别,可以修改为"1s 5s 10s 30s 1m 90s 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
- 保存配置文件,重启Broker使配置生效
既然知道批量消息是作为一个整体的,那么肯定就会对消息大小有限制,在Apache RocketMQ中,
5.2、 通过Broker的运维命令修改
另一种方式是不重启Broker,通过Broker的运维命令行工具updateMessageDelayLevel动态更新。
- 通过控制台或命令行连接Broker的自带运维工具
- 执行updateMessageDelayLevel newDelayLevel 命令,比如updateMessageDelayLevel “1s 5s 10s 1m30s 2m”
- 此时无需重启,Broker就会动态切换为新的延迟级别配置
5.3、 规则遵循
无论使用哪种方式,自定义延迟级别时需要遵守一些规则:
- delayLevel字符串用空格分隔每个级别,不支持其他分隔符,每个级别的延迟用数字加单位组成,支持单位s/m/h/d分别表示秒/分/小时/天
- 级别必须从小到大排列,不允许重复或无序
- 理论上级别的范围是1s~2年,超过2年视为非法配置