RocketMQ 投递消息方式以及消息体结构分析:Message、MessageQueueSelector

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • Message
    • Properties
    • Topic、Tag
    • Keys
    • Queue
  • SendCallback
  • MessageQueue
  • MessageQueueSelector
  • FAQ
  • 总结

前言

在踏入投递消息 send 方法源码解读开始前,要先搞清楚消息的结构以及消息的不同的方式实现,以便于在阅读底层源码时不至于要重头过来再梳理各自的类、属性结构起到了什么作用,在这里再啰嗦啰嗦,让 RocketMQ 源码能够更加完整.

RocketMQ 专栏篇:

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

保护数据完整性:探索 RocketMQ 分布式事务消息的力量

RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

RocketMQ MQClientInstance、生产者实例启动源码分析

Message

RocketMQ 消息构成的结构如下:

  1. Topic:表示要发送消息的主题
  2. body:表示消息的存储内容
  3. Properties:表示消息属性
  4. transactionId:在事务消息中使用

Keys、Tag:无论是 RocketMQ Tag 过滤还是延迟消息等都会利用 Properties 消息属性机制,这些特殊信息都使用了系统保留的属性 Key,设置自定义属性时要避免和系统属性 Key 相冲突

服务端会根据 Keys 创建哈希索引,设置以后,可以在 Console 系统通过 Topic、Keys 来查询消息,由于是哈希索引,尽可能保证 Key 唯一,例如:订单号、会员 Id 等

RocketMQ 系统保留的属性 Key 集合,在使用消息属性机制时尽量避免:org.apache.rocketmq.common.message.MessageConst

Properties

在 Message 结构体中可以设置的属性如下:

字段名默认值必要性说明
Topicnull必填消息所属 Topic 名称
Bodynull必填消息体
Tagsnull选填消息标签:方便服务器过滤消息使用
消息属性:TAGS
Keysnull选填代表这条消息的业务关键词
消息属性:KEYS
Flag0选填由应用程序设置,RocketMQ 不作干预
DelayTimeLevel0选填消息延迟级别,0 表示不延时,
大于 0 会延迟特定的时间才会被消息
消息属性:DELAY
WaitStoreMsgOKtrue选填表示消息是否在服务器落盘后才返回 SEND_OK
消息属性:WAIT

Topic、Tag

Topic、Tag 都是用来在业务上分类的标识,区别在于 Topic 是一级分类,Tag 理解为是二级分类,使用 Tag 可以实现对 Topic 中的消息进行过滤

Topic:消息主题,通过 Topic 对不同的业务消息进行分类

Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出时就带上的属性

Tag 属性可以为 Topic 做另外一层业务上再细粒化地区分子业务

在这里插入图片描述

什么时候该用 Topic,什么该用 Tag?

一般可以从以下几方面进行判别:

  1. 消息类型是否一致:普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分

    除了普通消息以外,其余的消息类型应该使用对应的后缀拼接,如:事务 _transaction、延迟 _delay、顺序 _order

  2. 业务是否相关联:没有直接关联的消息,如淘宝交易消息与京东物流消息使用不同的 Topic 区分;若同样的是京东物流消息,电器类订单、男装类订单、鞋类订单的消息可以使用 Tag 进行划分

  3. 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分

  4. 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic

总的来说,针对消息分类,可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag;通常情况下,不同的 Topic 之间消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如:全集、子集的关系,流程先后的关系.

Keys

RocketMQ 每个消息可以在业务层面上设置唯一标识码 Keys 字段,方便将来定位消息丢失的问题,Broker 会为每个消息创建哈希索引,应用可以通过 Topic、Key 来查询这条消息的内容,以及消息被谁消费,由于是哈希索引,请务必保证 Key 尽可能唯一,这样可以避免潜在的哈希冲突

// 订单Id
String orderId = "2024016163739";
message.setKeys(orderId);

Queue

为了支持高并发以及水平扩展,需要对 Topic 进行分区「类比于 Kafka Partition 机制」在 RocketMQ 中被称之为队列,一个 Topic 下可能有多个队列,并且可能分布在不同的 Broker 上

在这里插入图片描述

一般来说一条消息,若没有重复发送(比如:因为客户端没有响应而进行重试)则只会存在 Topic 其中的一个队列,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点:MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset,队列可以提升消息发送和消费的并发度.

在这里谈到了 Topic、Queue 之间的关联,也就有必要说说 RocketMQ 中的 AKF 原则了

在这里插入图片描述

  • X 轴:处理的服务节点的单点故障问题,支持横向扩展、全量镜像
  • Y 轴:在 RocketMQ 服务节点基础上根据业务来划分出不同的 Topic
  • Z 轴:基于 Topic 分配出不同的 Queue 分区,每个 Queue 可能分散到不同的 Broker 服务节点上

SendCallback

当 send 方法 SendCallback 参数不为空,说明它是属于异步发送的方式,异步发送:发送方发出一条消息以后,不等待服务端返回响应,会接着发送下一条消息

SendCallback 是基于异步发送时需要的指定的回调接口,它提供了两个方法:

public interface SendCallback {void onSuccess(final SendResult sendResult);void onException(final Throwable e);
}

消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如:视频上传后通知启动转码服务,转码完成后通知推送转码结果等

同步发送示例代码:

DefaultMQProducer mqProducer = new DefaultMQProducer();
mqProducer.setProducerGroup("vnjohn");
mqProducer.setNamesrvAddr("172.16.249.10:9876");
mqProducer.start();
Message message = new Message();
message.setTopic("vnjohn");
message.setWaitStoreMsgOK(Boolean.FALSE);
message.setKeys("2024016163739");
message.setBody("Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
message.setTags("blog");
mqProducer.send(message);

异步发送示例代码:

mqProducer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("message send success...");}@Overridepublic void onException(Throwable e) {System.out.println("message send exception...");}
});

异步发送、同步发送代码唯一区别在于调用 send 接口的参数不同,异步发送不会等待发送返回,取而代之的是 send 方法需要传入 SendCallback 的实现,SendCallback 接口主要有 onSuccess、onException 两个方法,表示消息发送成功和消息发送失败

MessageQueue

当 send 方法时,指定了 MessageQueue 参数,说明要将消息指定投递给 Topic 其中的一个队列:Queue

// MessageQueue:指定 Topic 主题、Broker、Topic-Queue
private String topic;
private String brokerName;
private int queueId;

比如我要将消息投递给 「Topic:vnjohn」 下的 「Queue:0」所属的 「brokerName:broker-0」

mqProducer.send(message, new MessageQueue("vnjohn", "broker-0", 0));

MessageQueueSelector

在生产者投递消息时,通过该类可以指定将需要保证顺序消费的消息「通过 args 来指定」放入到同一个队列中,确保在消费时可以通过消费这一个队列保证消费的顺序性

生产顺序性:

RocketMQ 通过生产者和服务端的协议保障 单个生产者以串行的方式 发送消息,并按序存储和持久化,如需保证消息生产的有序性,则必须满足以下条件:

  1. 单一生产者:消息的生产的顺序仅支持单一生产者,不同生产者分布在不同的系统,即使设置为相同的分区键-arg,不同生产者之间产生的消息也无法判定其顺序
  2. 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用了多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序

满足以上条件的生产者,将顺序消息发送至服务端以后,会保证设置了同一分区键的消息,按照发送的顺序存储在同一个队列中

在 RocketMQ 中,Topic 下只存在一个 Queue 时,生产者可以保证 全局有序性,否则生产者就只能保证 局部有序性

对于一个指定的 Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 RocketMQ 中支持分区顺序消息,如下图所示:我们可以按照某一个标准对消息进行分区(比如图中的 ShardingKey),同一个 ShardingKey 消息会被分配到同一个队列中,并按照顺序被消费

在这里插入图片描述

顺序消息的应用场景非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统之间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理

例如:创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行;如果是普通消息,订单-A 的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保证顺序,而顺序消息发送时将 ShardingKey 相同(同一个订单号)的消息路由到同一个逻辑队列中

提到的 SharingKey 指的就是在往服务端有序发送消息时,必须指定的一个参数:Object arg,假设它就是订单 id

在这里插入图片描述

MessageQueueSelector 是一个接口,它的定义如下:

public interface MessageQueueSelector {MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

其中 mqs 是可以发送的队列,msg 是消息,arg 是 send 方法时传递的 Object 对象,返回的是该消息需要发送到的队列

通过 MessageQueueSelector 顺序发送消息的示例代码如下:

MessageQueueSelector customMessageQueueSelector = (mqs, msg, arg) -> {int shardingKey = arg.hashCode();int messageQueueIndex = mqs.size() % shardingKey;return mqs.get(messageQueueIndex);
};
mqProducer.send(message, customMessageQueueSelector, "20240107015444");

生产环境中建议选择最细粒度的分区键进行拆分,例如:将订单 ID、用户 ID 作为分区键关键字,可实现同一个终端用户的消息按照顺序处理,不同用户的消息无须保证顺序.

FAQ

若一个 Broker 掉线,队列总数发送变化会如何?

若发送变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序;若不发生变化,那消息将会发送到掉线 Broker 队列上,必然是失败的;因此 RocketMQ 提供了两种模式:严格顺序、顺序可用性,如果要保证严格顺序而不是可用性,创建 Topic 时要指定 -o (-order)参数,表示顺序消息主题

$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]

其次就是要保证 nameserver 中的配置参数:orderMessageEnablereturnOrderTopicConfigToBroker 必须是 true,如果上述任意一个条件不满足,则是保证可用性而不是严格顺序

总结

该篇博文主要介绍 Message 结构体有哪些以及它里面属性的作用,同步与异步发送之间的区别参数:SendCallback 回调接口,指定 Queue 投递消息:MessageQueue,生产者侧确保消息能够有序地投递:MessageQueueSelector,以及在 RocketMQ 如何保证全局有序、局部有序:生产者确保单 Queue 全局有序、生产者确保多 Queue 局部有序,希望该篇博文你能够喜欢,感谢三连支持❤️

学习指南针:

Rocket 官方文档

RocketMQ GitHub

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/602660.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

腾讯云取消免费10G CDN流量包:免费CDN时代结束

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 免费送了7-8年的腾讯云10G免费流量包&#xff0c;从2024年开始&#xff0c;停止赠送了!自此&#xff0c;国内绝大多数互联网大厂的CDN都开收费了! 大概从2016年开始&#xff0c;腾讯云为了抢夺CDN客户&#xff0…

基于Spring-boot-websocket的聊天应用开发总结

目录 1.概述 1.1 Websocket 1.2 STOMP 1.3 源码 2.Springboot集成WS 2.1 添加依赖 2.2 ws配置 2.2.1 WebSocketMessageBrokerConfigurer 2.2.2 ChatController 2.2.3 ChatInRoomController 2.2.4 ChatToUserController 2.3 前端聊天配置 2.3.1 index.html和main.j…

mnn-llm: 大语言模型端侧CPU推理优化

在大语言模型(LLM)端侧部署上&#xff0c;基于 MNN 实现的 mnn-llm 项目已经展现出业界领先的性能&#xff0c;特别是在 ARM 架构的 CPU 上。目前利用 mnn-llm 的推理能力&#xff0c;qwen-1.8b在mnn-llm的驱动下能够在移动端达到端侧实时会话的能力&#xff0c;能够在较低内存…

MySQL之视图外连接、内连接和子查询的使用

一、视图 1.1 含义 虚拟表&#xff0c;和普通表一样使用 1.2 操作 创建视图 create view 视图名 as 修改视图 方式一&#xff1a; create or replace view 视图名 as 【查看视图相关字段】 方式二&#xff1a; alter view 视图名 as 【查看的SQL语句】 查看视图 方式一&…

Nginx快速入门:Nginx实现高可用|结合keepalived实现主备节点(九)

0. 引言 在生产中我们要尽可能避免单点故障&#xff0c;nginx也不例外&#xff0c;因此搭建主备节点必不可少&#xff0c;今天我们来学习下如何利用keepalived实现主备 1. keepalived简介 keepalived 是一个LINUX系统中开源的负载均衡和故障转移软件&#xff0c;它主要用于高…

2024年阿里云、腾讯云、华为云、LightNode、硅云服务器如何选?怎么买最划算?[最新价格表]

很多小伙伴都有一颗上云的心&#xff0c;包括我自己 有事没事的折腾一下自己的小破站&#xff0c;也挺有意思的&#xff01; 那么&#xff0c;云服务器哪家好&#xff1f;优惠力度哪家大&#xff1f;活动入口哪里进&#xff1f;云服务器如何配置&#xff1f;如何选型&#xf…

K8S集群部署解决工作节点couldn‘t get current server API group list问题

最近在自己电脑上装了VMWare Player&#xff0c;在上面装了两个Ubuntu虚拟机&#xff0c;为了方便学习云原生技术&#xff0c;决定在上面装一个2个节点&#xff08;一个控制面&#xff0c;一个工作节点&#xff09;的K8S集群。 参考这篇文章&#xff1a; Ubuntu 22.04 搭建K8…

kubectl的插件安装工具krew

最近得知一个kubectl插件安装工具&#xff0c;叫做krew。 官网地址是&#xff1a;Krew – kubectl plugin manager 安装krew 按照官网的做法&#xff0c;一直安装失败&#xff0c;于是拆解步骤&#xff0c;一步一步下载离线安装。 1、下载krew.yaml 地址&#xff1a;https:…

Proteus 各版本安装指南

Proteus下载链接 https://pan.baidu.com/s/1vHgg8jK9KSHdxSU9SDy4vQ?pwd0531 1.鼠标右击【Proteus8.15(64bit&#xff09;】压缩包&#xff08;win11及以上系统需先点击“显示更多选项”&#xff09;【解压到Proteus8.15(64bit&#xff09; 】。 2.打开解压后的文件夹&#…

适用于生物行业的生信云平台

随着基因检测技术的不断发展&#xff0c;生物信息云平台在基因检测行业的应用越来越广泛。生物信息云平台是一种基于云计算的技术&#xff0c;可以将基因检测数据存储在云端&#xff0c;并通过数据分析、挖掘等技术手段&#xff0c;对基因数据进行处理、分析和解读。 这种技术的…

Linux安装nginx(带http ssl)

nginx安装 nginx文件 以及gcc pcre zlib openssl 网盘下载 1.安装gcc yum -y install gcc gcc-c 2.安装pcre rpm -ivh pcre-8.32-17.el7.x86_64.rpm --force --nodeps rpm -ivh pcre-devel-8.32-17.el7.x86_64.rpm --force --nodeps 3.安装zlib tar -zxvf zlib-1.2.11.ta…

metaSPAdes,megahit,IDBA-UB:宏基因组装软件安装与使用

metaSPAdes,megahit,IDBA-UB是目前比较主流的宏基因组组装软件 metaSPAdes安装 GitHub - ablab/spades: SPAdes Genome Assembler #3.15.5的预编译版貌似有问题&#xff0c;使用源码安装试试 wget http://cab.spbu.ru/files/release3.15.5/SPAdes-3.15.5.tar.gz tar -xzf SP…

C++ 给父类带参构造函数的赋值

在类的使用中&#xff0c;默认的构造函数不带任何参数&#xff0c;但是也会因为需要而使用带参数的构造函数。 在带参的构造函数中&#xff0c;是如何继承的呢&#xff0c;这里我们通过使用基类&#xff0c;子类&#xff0c;孙类的两重继承来观察&#xff0c;如何给带参构造函数…

完美版视频网站模板 – 苹果CMS v10大橙子vfed主题

源码下载&#xff1a; https://download.csdn.net/download/m0_66047725/88700504 这次提供的大橙子 vfed 模板 已经完美&#xff0c;只去除了授权验证和正版主题神秘后门&#xff0c;不影响任何功能体验性。主题优化&#xff1a;全站响应式自带主题设置面板自带联盟资源库大全…

【C/C++】开源串口库 CSerialPort 应用

文章目录 1、简述2、效果图2.1、命令行&#xff08;不带GUI&#xff09;2.2、GUI&#xff08;这里用的Qt&#xff09; 3、串口硬件知识普及4、核心实现4.1、Qt的pro文件4.2、main文件4.3、SSerialPort类4.3.1、头文件4.3.2、源文件 4.4、Linux下的CMakeLists.txt 1、简述 本文…

设计模式之单例模式的懒饿汉

懒汉式 说白了就是你不叫我我不动&#xff0c;你叫我我才动。 类初始化模式&#xff0c;也叫延迟占位模式。在单例类的内部由一个私有静态内部类来持有这个单例类的实例。因为在 JVM 中&#xff0c;对类的加载和类初始化&#xff0c;由虚拟机保证线程安全。 public class Singl…

cesium键盘控制模型

效果&#xff1a; 由于对添加模型和更新位置api进行二次了封装&#xff0c;下面提供思路 1.添加模型 const person reactive({modelTimer: null,position: {lon: 104.07274,lat: 30.57899,alt: 1200,heading: 0,pitch: 0,roll: 0,}, }); window.swpcesium.addEntity.addMo…

SSH 无密登录配置

1)配置 ssh (1)基本语法 ssh 另一台电脑的 IP 地址 (2)ssh 连接时出现 Host key verification failed 的解决方法 [yuxuan@yuxuan102 ~]$ ssh yuxuan103 ➢ 如果出现如下内容 Are you sure you want to continue connecting (yes/no)? ➢ 输入 yes,并回车 (3)退回到 …

简单 Web Server 程序的设计与实现 (2024)

1.题目描述 Web 服务是 Internet 最方便与受用户欢迎的服务类型&#xff0c;它的影响力也远远超出了专业技术范畴&#xff0c; 已广泛应用于电子商务、远程教育、远程医疗与信息服务等领域&#xff0c;并且有继续扩大的趋势。目前很多 的 Internet 应用都是基于 Web 技术的&…

Linux驱动学习—中断

1、中断基础概念 1.1 什么是中断 CPU在正常运行期间&#xff0c;由外部或者内部引起的时间&#xff0c;让CPU停下当前正在运行的程序&#xff0c;转而去执行触发他的中断所对应的程序&#xff0c;这就是中断。 响应中断的过程&#xff1a; <1>中断请求 <2>中断…