RocketMQ 延迟消息

RocketMQ 延迟消息

RocketMQ 消费者启动流程

什么是延迟消息

RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。

使用场景

用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理。

RocketMQ 开源版本的消息不支持任意时间精度,只支持5s 10s 1m等等。

Broker 如何处理延迟消息

消息投递如下:

  1. 生产者发送一个延迟消息到一个 topic
  2. Broker 判断是个延迟消息后,将消息暂存
  3. Broker 通过延迟服务, 先检查消息是否过期,如果到期将消息投递到目标 topic
  4. 消费者消费topic中的投递延迟消息。

开源RocketMQ 的消息不支持任意精度,默认支持 18个 level:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker 在启动的时候,会创建一个内部 topic:“SCHEDULE_TOPIC_XXXX” 根据延迟 level 数量,创建对应数量的 队列。 也就是说 18 level 对应了18 个队列。

具体可以在 代码TopicConfigManager.java 中 看到:

private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;

要注意的是,Broker 一般是集群模式
部署,也就是说,每个Broker 都会有18个队列。

TopicConfigManager#TopicConfigManager(BrokerController brokerController)

生产者消息延迟发送

代码示例如下:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

Broker 存储延迟消息

上一篇文章已经谈到,Broker 收到消费者消息后,会进行消息存储,然后再转发到消费队列(ConsumerQueue),然后再推给消费者。

其实一旦消息转发到

存储延迟消息的流程也类似

  1. 确定延迟消息投递到topic 哪个队列。存储生产者写入的消息时,将消息转发到 ConsumeQueue 中,消费者就能消费到。 延迟消息不能立即消息到,于是将 topic 名称修改为 SCHEDULE_TOPIC_XXX,并根据延迟消息级别,确定投递到哪个队列上。同时还会将原来消息要发送到的目标 topic 和队列记录投递到哪个队列。

代码在CommitLog#asyncPutMessage 中

设置延迟消息的投递队列信息代码如下:

 // Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 如果设置的级别超过了最大级别,重置延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 计算延迟消息应该投递到 SCHEDULE_TOPIC_XXXX 到哪个队列。topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 记录原始 topic ,queueid,方便后期投递到目标 topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投递目标为 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}

消息转发

消息转发过程其实中会对延迟消息做一些特殊处理

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/35461.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostgreSQL查询慢sql原因和优化方案

PostgreSQL sql查询慢优化方案有一下几种解决方案: 1.关闭会话 查询慢sql的执行会话,关闭进程。 查看数据库后台连接进程 SELECT count(*) FROM pg_stat_activity;SELECT * FROM pg_stat_activity; 查看数据库后台连接进程,但是此条SQL不…

用HARU-Net增强核分割:一种基于混合注意的残差u块网络

文章目录 Enhancing Nucleus Segmentation with HARU-Net: A Hybrid Attention Based Residual U-Blocks Network摘要本文方法损失函数后处理消融实验 Enhancing Nucleus Segmentation with HARU-Net: A Hybrid Attention Based Residual U-Blocks Network 摘要 核图像分割是…

W6100-EVB-PICO 做TCP Server进行回环测试(六)

前言 上一章我们用W6100-EVB-PICO开发板做TCP 客户端连接服务器进行数据回环测试,那么本章将用开发板做TCP服务器来进行数据回环测试。 TCP是什么?什么是TCP Server?能干什么? TCP (Transmission Control Protocol) 是一种面向连…

zabbix监控安装部署

目录 一、环境 二、配置 1.配置yum源,这里用的清华的 2.过滤一下安装包,查看依赖包 安装依赖包 3.配置数据库 开机自启 创建数据库 创建用户 授权 导入数据到数据库 查看zabbix数据库有没有表和数据 4.修改zabbix配置文件 1.修改zabbix配置…

去趋势化一个心电图信号、信号功率谱、低通IIR滤波器并平滑信号、对滤波器引起的延迟进行补偿研究(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

SPM实现framework自动管理和分发

一、前言 Swift Package Manager (SPM) 是苹果官方提供的用于管理 Swift 项目的依赖关系和构建过程的工具。它是一个集成在 Swift 编程语言中的包管理器,用于解决在开发过程中管理和构建包依赖项的需求。 那么如何使用SPM管理和分发Objective C编写的二进制库呢&a…

不同路径 II——力扣63

class Solution {public:int uniquePathsWithObstacles(vector<vector<int>>& obstacleGrid) {int n=

【APITable】教程:创建并运行一个自建小程序

1.进入APITable&#xff0c;在想要创建小程序的看板页面点击右上角的【小程序】&#xff0c;进入小程序编辑页面。 2.创建一个新的小程序区。 点击【 添加小程序】 点击创建小程序&#xff0c;选择模板&#xff0c;输入名字。 3.确定后进入小程序部署引导页面。 4.打开Xshell 7…

初识鸿蒙跨平台开发框架ArkUI-X

HarmonyOS是一款面向万物互联时代的、全新的分布式操作系统。在传统的单设备系统能力基础上&#xff0c;HarmonyOS提出了基于同一套系统能力、适配多种终端形态的分布式理念&#xff0c;能够支持手机、平板、智能穿戴、智慧屏、车机等多种终端设备&#xff0c;提供全场景&#…

99. for循环练习题-3种方式输出0-9

【目录】 文章目录 99. for循环练习题-3种方式输出0-91. for循环和while循环的区别2. 输出 0~(n-1)的数字2.1 基础代码2.2 自定义函数代码2.3 异常处理语句代码 【正文】 99. for循环练习题-3种方式输出0-9 1. for循环和while循环的区别 for循环和while循环都用于重复执行特定…

idea模板的使用(配置xml文件模板)

1. 问题的引出 我们在日常项目中可以发现&#xff0c;sql映射文件和mybatis主配置文件&#xff0c;以及application.yml文件中有很多固定不变的内容&#xff0c;为了方面使用&#xff0c;所以可以把这些xml文件设置为模板 2. 创建模板的步骤 按照图片一步一步进行即可 点击…

(二)结构型模式:1、适配器模式(Adapter Pattern)(C++实现示例)

目录 1、适配器模式&#xff08;Adapter Pattern&#xff09;含义 2、适配器模式应用场景 3、适配器模式的UML图学习 4、C实现适配器模式的示例 1、适配器模式&#xff08;Adapter Pattern&#xff09;含义 将一个接口转换为客户端所期待的接口&#xff0c;从而使两个接口…

Mac安装nvm教程及使用

nvm 是 node 版本管理器&#xff0c;也就是说一个 nvm 可以管理多个 node 版本&#xff08;包含 npm 与 npx&#xff09;&#xff0c;可以方便快捷的安装、切换 不同版本的 node。 1、直接通过brew安装 执行命令&#xff1a;brew install nvm PS&#xff1a; 如果没有安装br…

Oracle database 静默安装 oracle 11g 一键安装

基于oracle安装包中应答文件实现一键安装 支持环境&#xff1a; Linux &#xff1a;centerOS 7 oracle &#xff1a;11.2.0 Oracle应答文件 runInstaller应答文件 /database/response/db_install.rsp netca应答文件 /database/response/netca.rsp dbca应答文件 /database/re…

python中的运算符号含义,python基本运算符的操作

本篇文章给大家谈谈python的运算符号有哪些类型&#xff0c;以及python各运算符号的功能说明&#xff0c;希望对各位有所帮助&#xff0c;不要忘了收藏本站喔。 1.算数运算符&#xff08;最常见的&#xff09; 标准算数运算符&#xff08;加减乘除&#xff09; 取余运算…

UML-状态图

目录 状态图 状态图的图符 状态机 状态 ​转换 电话机状态图 活动图和状态图区别&#xff1a; 状态图 状态图(Statechart Diagram)是描述一个实体基于事件反应的动态行为&#xff0c;显示了该实体如何根据当前所处的状态对不同的事件做出反应。通常我们创建一个UML状态…

Jmeter设置中文的两种方式,建议使用第二种

方案一 进入jmeter图像化界面&#xff0c;选择Options下的Choose Language&#xff0c;再选择Chinese(Simplified)。这个就是选择语言为简体中文&#xff08;缺陷&#xff1a;这个只是在本次使用时为中文&#xff0c;下次打开默认还是英文的&#xff09; 方案二&#xff08;…

数学建模(二)线性规划

课程推荐&#xff1a;6 线性规划模型基本原理与编程实现_哔哩哔哩_bilibili 目录 一、线性规划的实例与定义 1.1 线性规划的实例 1.2 线性规划的定义 1.3 最优解 1.4 线性规划的Mathlab标准形式 1.5 使用linprog函数 二、线性规划模型建模实战与代码 2.1 问题提出 2.2…

机器学习深度学习——seq2seq实现机器翻译(详细实现与原理推导)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——seq2seq实现机器翻译&#xff08;数据集处理&#xff09; &#x1f4da;订阅专栏&#xff1a;机器学习&…

机器学习编译系列

机器学习编译MLC 1. 引言2. 机器学习编译--概述2.1 什么是机器学习编译 1. 引言 陈天奇目前任教于CMU&#xff0c;研究方向为机器学习系统。他是TVM、MXNET、XGBoost的主要作者。2022年夏天&#xff0c;陈天奇在B站开设了《机器学习编译》的课程。   《机器学习编译》课程共分…