RocketMQ笔记(六)SpringBoot整合RocketMQ发送延迟消息

目录

    • 一、简介
      • 1.1、延迟级别
    • 二、Maven依赖
    • 三、application配置
    • 四、生产者
      • 4.1、同步发送延迟消息
      • 4.2、异步发送延迟消息
    • 五、延迟级别修改
      • 5.1、 修改Broker端配置
      • 5.2、 通过Broker的运维命令修改
      • 5.3、 规则遵循

一、简介

  在之前的文章中,我讲过了,同步发送单条消息,异步发送单条消息,发送单向消息,发送顺序消息,以及批量发送消息,今天我们讲讲延迟消息。延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

1.1、延迟级别

  Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level)延迟时间投递等级(delay level)延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

二、Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>06-send-delay-message</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>com.alian</groupId><artifactId>common-rocketmq-dto</artifactId><version>1.0.0-SNAPSHOT</version></dependency></dependencies></project>

  父工程已经在我上一篇文章里,通用公共包也在我上一篇文章里有说明,包括消费者。具体参考:RocketMQ笔记(一)SpringBoot整合RocketMQ发送同步消息

三、application配置

application.properties

server.port=8006# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.group=delay_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout=3000
# 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-server=true
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed=3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold=0

四、生产者

  在 RocketMQ 中,RocketMQTemplatesyncSend方法,它允许你批量发送同步消息,主要参数:

  • topic:主题
  • Message:消息内容
  • timeout:发送超时时间
  • delayLevel:延迟级别

测试类都引入依赖

	@Autowiredprivate RocketMQTemplate rocketMQTemplate;

4.1、同步发送延迟消息

    @Testpublic void syncSendStringMessageWithBuilderDelayLevel() {String topic = "string_message_topic";String message = "我是一条同步延迟文本消息:syncSendStringMessageWithBuilderDelayLevel";Message<String> msg = MessageBuilder.withPayload(message)// 消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 3秒发送超时,延迟级别为3(也就是要10秒后才能被消费者消费)SendResult sendResult = rocketMQTemplate.syncSend(topic, msg, 3000, 3);//会覆盖Message中的消息延迟级别log.info("同步发送返回的结果:{}", sendResult);}

运行结果:
生产者

2024-03-12 19:39:08.982  INFO 19476 --- [           main] com.alian.delay.SendDelayMessageTest     : 同步发送延迟消息返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F0000014C1418B4AAC23CDD7F240000, offsetMsgId=C0A800EA00002A9F0000000000063D8D, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=0], queueOffset=1]

消费者

2024-03-12 19:39:18.987  INFO 6844 --- [_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 我是一条同步延迟文本消息:syncSendStringMessageWithBuilderDelayLevel

从上面的结果可以看到延迟级别为3时,生产消息和消费到消息的是时间大概为10秒钟。

4.2、异步发送延迟消息

    @Testpublic void asyncSendStringMessageWithBuilderDelayLevel() {String topic = "string_message_topic";String message = "我是一条异步延迟文本消息:asyncSendStringMessageWithBuilderDelayLevel";Message<String> msg = MessageBuilder.withPayload(message)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 3秒发送超时,延迟级别为3(也就是要10秒后才能被消费者消费)rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 异步发送成功的回调逻辑log.info("异步消息发送文本消息成功: " + sendResult);}@Overridepublic void onException(Throwable e) {// 异步发送失败的回调逻辑log.info("异步消息发送文本消息失败: " + e.getMessage());}}, 3000, 4);}

运行结果:
生产者

2024-03-12 19:41:06.800  INFO 19068 --- [ublicExecutor_1] com.alian.delay.SendDelayMessageTest     : 异步延迟消息发送文本消息成功: SendResult [sendStatus=SEND_OK, msgId=7F0000014A7C18B4AAC23CDF4B3D0000, offsetMsgId=C0A800EA00002A9F000000000006409A, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=1], queueOffset=0]

消费者

2024-03-12 19:41:36.778  INFO 6844 --- [_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 我是一条异步延迟文本消息:asyncSendStringMessageWithBuilderDelayLevel

从上面的结果可以看到延迟级别为4时,生产消息和消费到消息的是时间大概为30秒钟。

五、延迟级别修改

  虽说之前的延迟级别能满足很多情况下的需求,但是总有些特殊的要求,比如要超过2个小时之类的,能否自定义呢?RocketMQ 5.0 解除了 4.x 版本延时消息延迟级别的时间限制,现在生产者可以设置任意延迟时间。我提供两种方式:

5.1、 修改Broker端配置

  这种方式需要修改Broker的配置文件,重启Broker后新配置才会生效。

  • 停止Broker
  • 修改${ROCKETMQ_HOME}/conf/broker.conf配置文件中的messageDelayLevel参数
  • messageDelayLevel的值是一个字符串,代表着允许设置的延迟级别,默认为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
  • 比如要新增1分30秒的延迟级别,可以修改为"1s 5s 10s 30s 1m 90s 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
  • 保存配置文件,重启Broker使配置生效

  既然知道批量消息是作为一个整体的,那么肯定就会对消息大小有限制,在Apache RocketMQ中,

5.2、 通过Broker的运维命令修改

  另一种方式是不重启Broker,通过Broker的运维命令行工具updateMessageDelayLevel动态更新。

  • 通过控制台或命令行连接Broker的自带运维工具
  • 执行updateMessageDelayLevel newDelayLevel 命令,比如updateMessageDelayLevel “1s 5s 10s 1m30s 2m”
  • 此时无需重启,Broker就会动态切换为新的延迟级别配置

5.3、 规则遵循

  无论使用哪种方式,自定义延迟级别时需要遵守一些规则:

  • delayLevel字符串用空格分隔每个级别,不支持其他分隔符,每个级别的延迟用数字加单位组成,支持单位s/m/h/d分别表示秒/分/小时/天
  • 级别必须从小到大排列,不允许重复或无序
  • 理论上级别的范围是1s~2年,超过2年视为非法配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/808656.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端小白的学习之路(Vue2 四)

提示&#xff1a;学习vue2的第四天&#xff0c;笔记记录&#xff1a;混入(mixins),插槽(slot),过渡与动画(transition) 目录 一、混入(mixins) 二、插槽(solt) 1.匿名插槽 2.具名插槽 三、过渡与动画(transition) 1.过渡 1&#xff09;单元素过渡 Ⅰ.通用类名 Ⅱ.指定…

每天学点儿Python(6) -- 列表和枚举

列表是Python中内置的可变序列&#xff0c;类使用C/C中的数组&#xff0c;使用 [ ] 定义列表&#xff0c;列表中的元素与元素之间用英文逗号&#xff08; , &#xff09;分隔&#xff0c; 但是Python中列表可以存储任意类型的数据&#xff0c;且可以混存&#xff08;即类型可以…

图层、窗口、画布、视图

本文内容主要参考《Android图形显示系统》 图形显示系统会涉及到图层、窗口、画布和视图等概念&#xff0c;下面分别对它们进行简单介绍。 1&#xff09;图层&#xff1a;图层是SurfaceFlinger中的概念&#xff0c;使用Layer表示&#xff0c;SurfaceFlinger在合成最终显示的图…

项目管理-人情世故

综述&#xff1a;对于事业生活&#xff0c;人情世故我觉得在生活工作中比较重要&#xff0c;下面是我说的自己想法&#xff0c;有啥不对的&#xff0c;可以一起沟通确认。 事业上 总的来说&#xff0c;我们大多数为人情世故大家觉得没啥&#xff0c;实际上我觉得也挺重要的。…

Partisia Blockchain 何以落地隐私技术的高能场景应用?

致力于隐私保护、互操作性和可持续创新的 Layer1 区块链新星&#xff0c;Partisia Blockchain 以安全公平标榜&#xff0c;带给加密用户无忧交易的体验环境。对于这样一个融合零知识证明&#xff08;ZK&#xff09;技术和多方计算&#xff08;MPC&#xff09;的全新项目来说&am…

18_SPI通信外设

SPI通信外设 SPI通信外设SPI外设简介SPI框图SPI基本结构主模式全双工连续传输非连续传输 SPI通信外设 SPI外设简介 STM32内部集成了硬件SPI收发电路&#xff0c;可以由硬件自动执行时钟生成、数据收发等功能&#xff0c;减轻CPU的负担 可配置8位/16位数据帧、高位先行/低位先…

TypeScript尚硅谷学习

第二章&#xff1a;面向对象 面向对象是程序中一个非常重要的思想&#xff0c;它被很多同学理解成了一个比较难&#xff0c;比较深奥的问题&#xff0c;其实不然。面向对象很简单&#xff0c;简而言之就是程序之中所有的操作都需要通过对象来完成。 举例来说&#xff1a; 操作…

wsl 2在windows11上的设置

详细参考&#xff1a;Manual installation steps for older versions of WSL | Microsoft Learn 1.系统组件要打开 分别是&#xff1a;Hyper-V、虚拟机平台、适用于Windows的Linux子系统 2.以管理员方式运行命令行&#xff0c;逐步执行下面的命令 update to WSL 2, you must…

opc ua 环境构建(记录一)

1、准备 Siemens Simatic WinCC v7.5 二、配置 SIMATIC NET与S7-200 SMART 集成以太网口OPC 通信(TIA平台) 硬件: ①S7-200 SMART ②PC 机 ( 集成以太网卡) 软件: ① STEP 7-Micro/WIN SMART V2.1 ② STEP 7 Professional(TIA Portal V13 SP1 Upd 9) ③ SIMATIC NET …

在直播间卖云,云厂商终于“疯了”

图片&#xff5c;电影《疯狂的石头》截图 ©自象限原创 作者丨程心 云厂商们&#xff0c;在直播间打起来了&#xff01; 继阿里云在罗永浩直播间亮相、京东云硬刚友商之后&#xff0c;腾讯云也开始在“直播间”送起了福利。 4月8日&#xff0c;腾讯云发布新一代AIGC存…

设计基于锁的并发数据结构

1. 线程安全的栈容器 #include <exception> #include <memory> #include <mutex> #include <stack>struct empty_stack : std::exception {const char *what () const throw(); };template <typename T> class threadsafe_stack { private:std:…

记录vite打包并上传到npm

开始 起因&#xff1a;我们单位这个项目用的vitereact使用print打印 开发环境没问题、一到打包时就卡住、所以我就想单独打包成组件在引用看看还有问题么、结果还真可以&#xff01;又是离谱的一天 首先需要把npm的分支切换成官网地址、因为只有官网地址才能登陆npm账号 这里说…

FreeRTOS学习 -- 移植

一、添加FreeRTOS源码 在基础工程中新建一个名为FreeRTOS的文件夹&#xff0c;创建FreeRTOS文件夹以后将FreeRTOS的源码添加到这个文件夹中。 portable 文件夹&#xff0c;只需要保留keil、MemMang 和 RVDS这三个文件夹&#xff0c;其他的都可以删除掉。 移植FreeRTOSConfig…

SimOne协作版正式发布!“云+端”一体化,加速自动驾驶技术迭代!

创新的“云端”一体化方案 让11大于2 两端登录 场景共享 本地算法 云端并发 颠覆传统自动驾驶研发工作方式 加速自动驾驶算法迭代与优化 SimOne协作版正式发布&#xff01; 什么是SimOne协作版&#xff1f; SimOne协作版&#xff0c;一个创新的“云端”一体化产品。 它将…

【数学建模】机器人避障问题

已知&#xff1a; 正方形5的左下顶点坐标 ( 80 , 60 ) (80,60) (80,60)&#xff0c;边长 150 150 150机器人与障碍物的距离至少超过 10 10 10个单位规定机器人的行走路径由直线段和圆弧组成&#xff0c;其中圆弧是机器人转弯路径。机器人不能折线转弯&#xff0c;转弯路径由与…

如何在rosbag中获取第一帧数据

文章目录 1. 找出感兴趣的话题名2. 在一个终端中启动rosbag play3. 在另一个终端中使用rostopic echo4. 继续播放bag文件&#xff1a; 1. 找出感兴趣的话题名 首先&#xff0c;你需要知道你感兴趣的话题名称。可以通过rosbag info your_bagfile.bag来查看bag文件中包含的话题。…

代码随想录-算法训练营day02【数组02:滑动窗口、螺旋矩阵】

代码随想录-035期-算法训练营【博客笔记汇总表】-CSDN博客 https://docs.qq.com/doc/DUGRwWXNOVEpyaVpG?uc71ed002e4554fee8c262b2a4a4935d8977.有序数组的平方 &#xff0c;209.长度最小的子数组 &#xff0c;59.螺旋矩阵II &#xff0c;总结 建议大家先独立做题&#xff0c;…

基于 MATLAB 和 App Designer 的 UI 交互框架开发的一款电力系统潮流计算工具

基于 MATLAB 和 App Designer 的 UI 交互框架开发的一款电力系统潮流计算工具 文章目录 基于 MATLAB 和 App Designer 的 UI 交互框架开发的一款电力系统潮流计算工具一、软件介绍二、软件功能1、数据输入 2、潮流作业设置3、 潮流结果报表及可视化三、 软件设计思路1 、牛顿拉…

【Vue3语法单文件——自用】

1. Vue3基础语法 <script setup> import { ref,computed } from vue// 定义响应式的变量 const count ref(0) const author ref({name: John Doe,books: [Vue 2 - Advanced Guide,Vue 3 - Basic Guide,Vue 4 - The Mystery] }) //定义props const props defineProps(…

为什么forEach中的await不起作用

在JavaScript的forEach方法中使用await是无效的&#xff0c;因为forEach方法不支持异步操作的等待。 forEach是一个数组的遍历方法&#xff0c;它会对数组中的每个元素依次执行提供的回调函数。而在JavaScript中&#xff0c;await关键字只能在异步函数(async函数)中使用&#…