RocketMQ为什么要保证订阅关系一致

这篇文章,笔者想聊聊 RocketMQ 最佳实践之一:保证订阅关系一致。

订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例所订阅的 Topic 、Tag 必须完全一致。

如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

1 订阅关系演示

首先我们展示正确的订阅关系:多个 Group ID 订阅了多个 Topic,并且每个 Group ID 里的多个消费者的订阅关系保持了一致。

接下来,我们展示错误的订阅关系。

从上图中,单个 Group ID 订阅了多个 Topic,但是该 Group ID 里的多个消费者的订阅关系并没有保持一致。 

代码逻辑角度来看,每个消费者实例内订阅方法的主题、 TAG、监听逻辑都需要保持一致。

接下来,我们实验相同消费组,两种不正确的场景,看看消费者和 Broker 服务有什么异常。

订阅主题不同,标签相同

订阅主题相同,标签不同

2 订阅主题不同,标签相同


当我们启动两个消费者后,消费者组名:myconsumerGroup。C1 消费者订阅主题 TopicTest , C2 消费者订阅主题 mytest

在 Broker 端的日志里,会不停的打印拉取消息失败的日志 :

2023-10-09 14:52:53 WARN PullMessageThread_2 -

the consumer’s subscription not exist, group: myconsumerGroup, topic:TopicTest

那么在这种情况下,C1 消费者是不可能拉取到消息,也就不可能消费到最新的消息。

为什么呢 ? 我们知道客户端会定时的发送心跳包到 Broker 服务,心跳包中会包含消费者订阅信息,数据格式样例如下:

"subscriptionDataSet": [{"classFilterMode": false,"codeSet": [],"expressionType": "TAG","subString": "*","subVersion": 1696832107020,"tagsSet": [],"topic": "TopicTest"},{"classFilterMode": false,"codeSet": [],"expressionType": "TAG","subString": "*","subVersion": 1696832098221,"tagsSet": [],"topic": "%RETRY%myconsumerGroup"}]

Broker 服务会调用 ClientManageProcessorheartBeat 方法处理心跳请求。

最终跟踪到代码: org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer

Broker 服务的会保存消费者信息,消费者信息存储在消费者表 consumerTable 。消费者表以消费组名为 key , 值为消费者组信息 ConsumerGroupInfo

#org.apache.rocketmq.broker.client.ConsumerManager

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =

new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

如果消费组的消费者信息 ConsumerGroupInfo 为空,则新建新的对象。

更新订阅信息时,订阅信息是按照消费组存放的,这步骤就会导致同一个消费组内的各个消费者客户端的订阅信息相互被覆盖。

回到消费者客户端,当消费者拉取消息时,Broker 服务会调用 PullMessageProcessor processRequest 方法 。

首先会进行前置判断,查询当前的主题的订阅信息若该主题的订阅信息为空,则打印告警日志,并返回异常的响应结果。


subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());    if (null == subscriptionData) {log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));return response;}

通过调研 Broker 端的代码,我们发现:相同消费组的订阅信息必须保持一致,否则同一个消费组内的各个消费者客户端的订阅信息相互被覆盖,从而导致某个消费者客户端无法拉取到新的消息。

C1 消费者无法消费主题 TopicTest 的消息数据,那么 C2 消费者订阅主题 mytest,消费会正常吗 ?

上图来看,依然有问题。 主题 mytest 有四个队列,但只有两个队列被分配了, 另外两个队列的消息就没有办法消费了。

要解释这个问题,我们需要重新温习负载均衡的原理。

负载均衡服务会根据消费模式为” 广播模式” 还是 “集群模式” 做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:

(1) 获取该主题下的消息消费队列集合;

(2) 查询 Broker 端获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;

这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。

(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作。

 

消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的处理队列 processQueue (消费快照)。

  1. 标红的 Entry 部分表示与分配到的消息队列集合互不包含,则需要将这些红色队列 Dropped 属性为 true , 然后从 processQueueTable 对象中移除。
  2. 绿色的 Entry 部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。
  3. 黄色的 Entry 部分表示这些队列需要添加到 processQueueTable 对象中,为每个分配的新队列创建一个消息拉取请求 pullRequest , 在消息拉取请求中保存一个处理队列 processQueue (队列消费快照),内部是红黑树(TreeMap),用来保存拉取到的消息。

最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。

通过上面的介绍 ,通过负载均衡的原理推导,原因就显而易见了。

C1 消费者被分配了队列 0、队列 1 ,但是 C1 消费者本身并没有订阅主题 mytest , 所以无法消费该主题的数据。

从本次实验来看,C1 消费者无法消费主题 TopicTest 的消息数据,C2 消费者只能部分消费主题 mytest 的消息数据。

但是因为在 Broker 端,同一个消费组内的各个消费者客户端的订阅信息相互被覆盖,所以这种消费状态非常混乱,偶尔也会切换成:C1 消费者可以部分消费主题 TopicTest 的消息数据,C2 消费者无法消费主题 mytest 的消息数据。

3 订阅主题相同,标签不同

如图,C1 消费者和 C2 消费者订阅主题 TopicTest ,但两者的标签 TAG 并不相同。

启动消费者服务之后,从控制台观察,负载均衡的效果也如预期一般正常。

笔者在 Broker 端打印埋点日志,发现主题 TopicTest 的订阅信息为 :


{"classFilterMode": false,"codeSet": [66],"expressionType": "TAG","subString": "B","subVersion": 1696901014319,"tagsSet": ["B"],"topic": "TopicTest"}

 

那么这种状态,消费正常吗 ?笔者做了一组实验,消费依然混乱:

C1 消费者无法消费 TAG 值为 A 的消息 ,C2 消费者只能消费部分 TAG 值为 B 的消息

想要理解原因,我们需要梳理消息过滤机制。

首先 ConsumeQueue 文件的格式如下 :

  1. Broker 端在接收到拉取请求后,根据请求参数定位 ConsumeQueue 文件,然后遍历 ConsumeQueue 待检索的条目, 判断条目中存储 Tag 的 hashcode 是否和订阅信息中 TAG 的 hashcode 是否相同,若不符合,则跳过,继续对比下一个, 符合条件的聚合后返回给消费者客户端。
  2. 消费者在收到过滤后的消息后,也要执行过滤机制,只不过过滤的是 TAG 字符串的值,而不是 hashcode

我们模拟下消息过滤的过程:

首先,生产者将不同的消息发送到 Broker 端,不同的 TAG 的消息会发送到保存的不同的队列中。

C1 消费者从队列 0 ,队列 1 中拉取消息时,因为 Broker 端该主题的订阅信息中 TAG 值为 B ,经过服务端过滤后, C1 消费者拉取到的消息的 TAG 值都是 B , 但消费者在收到过滤的消息后,也需要进行客户端过滤,A 并不等于 B ,所以 C1 消费者无法消费 TAG 值为 A 的消息。

C2 消费者从队列 2, 队列 3 中拉取消息,整个逻辑链路是正常的 ,但是因为负载均衡的缘故,它无法消费队列 0 ,队列 1 的消息。

4 总结

什么是消费组 ?消费同一类消息且消费逻辑一致 。RocketMQ 4.X 源码实现就是为了和消费组的定义保持一致 。

规避订阅关系不一致这个问题有两种方式:

  • 合理定义好主题和标签

当我们定义好主题和标签后,需要添加新的标签时,是否可以换一个思路:换一个新的消费组或者新建一个主题。

  • 严格规范上线流程

在上线之前,梳理好相关依赖服务,梳理好上线流程,做好上线评审,并严格按照流程执行。

最后的思考:

假如从基础架构层面来思考,将订阅关系信息中心化来设计,应该也可以实现 ,但成本较高,对于中小企业来讲,并不合算。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/103724.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EMNLP 2023 录用论文公布,速看NLP各领域最新SOTA方案

EMNLP 2023 近日公布了录用论文。 开始前以防有同学不了解这个会议&#xff0c;先简单介绍介绍&#xff1a;EMNLP 是NLP 四大顶会之一&#xff0c;ACL大家应该都很熟吧&#xff0c;EMNLP就是由 ACL 下属的SIGDAT小组主办的NLP领域顶级国际会议&#xff0c;一年举办一次。相较于…

【Vue3 Antdv】Ant Design Vue文字溢出鼠标滑上显示tooltip。不溢出,鼠标滑上不显示tooltip

组件封装代码 <template><a-tooltip mouseenter"showToolTip" v-bind"getBindValue"><template #title>{{ props.title }}</template><slot><span>{{ props.title }}</span></slot></a-tooltip> &…

DB2 HADR 配置 centos 7配置 DB2 HADR 版本 11.1,【亲测可用】全网最细

DB2 HADR 配置 centos 7配置 DB2 HADR 版本 11.1&#xff0c;【亲测可用】全网最细的男人 操作系统 linux centos7 DB2版本 11.1 主库 192.168.46.70 备库 192.168.46.71 参考文章&#xff1a;一步一个脚印 DB2 10.5 HADR 主备库配置 前置条件 机器之间时间必须同步&#xff0…

C++11新特性(lambda,可变参数模板,包装器,bind)

lambda表达式是什么&#xff1f;包装器又是什么&#xff1f;有什么作用&#xff1f;莫急&#xff0c;此篇文章将详细带你探讨它们的作用。很多同学在学习时害怕这些东西&#xff0c;其实都是方便使用的工具&#xff0c;很多情况下我们学这些新的东西觉得麻烦&#xff0c;累赘&a…

uni-app开发微信小程序的报错[渲染层错误]排查及解决

一、报错信息 [渲染层错误] Framework nner error (expect FLOW INITIALCREATION end but get FLOW CREATE-NODE) 二、原因分析及解决方案 第一种 原因&#xff1a;基础库版本的原因导致的。 解决&#xff1a; 1.修改调试基础库版本 2.详情—>本地设置—>调试基础库…

扎根嵌入式行业需要什么学历文凭?

在嵌入式行业&#xff0c;学历并不是唯一关键。我本人拥有电子工程学士学位&#xff0c;但嵌入式行业更看重实际技能和经验。视频后方有免费的嵌入式学习资料&#xff0c;入门和进阶内容都涵盖其中。嵌入式行业一般接纳各种学历&#xff0c;从专科到本科到研究生&#xff0c;甚…

CentOS 安装MySQL 详细教程

参考:https://www.cnblogs.com/dotnetcrazy/p/10871352.html 参考:https://www.cnblogs.com/qiujz/p/13390811.html 参考:https://blog.csdn.net/darry1990/article/details/130419433 一、安装 一、进入安装目录 将账户切换到root账户下&#xff0c;进入local目录下 cd /usr…

通过商品ID获取淘宝天猫商品评论数据,淘宝商品评论接口,淘宝商品评论api接口

淘宝商品评论内容数据接口可以通过以下步骤获取&#xff1a; 登录淘宝开放平台&#xff0c;进入API管理控制台。在API管理控制台中创建一个应用&#xff0c;获取到应用的App Key和Secret Key。构造请求URL&#xff0c;请求URL由App Key和Secret Key拼接而成&#xff0c;请求UR…

【每日一题】ABC311G - One More Grid Task | 单调栈 | 简单

题目内容 原题链接 给定一个 n n n 行 m m m 列的矩阵&#xff0c;问权值最大的子矩阵的权值是多少。 对于一个矩阵&#xff0c;其权值定义为矩阵中的最小值 m i n v minv minv 乘上矩阵中所有元素的和。 数据范围 1 ≤ n , m ≤ 300 1\leq n,m \leq 300 1≤n,m≤300 1 ≤…

用Python和开源NLP工具库开发一个小型聊天机器人原型

为了创建一个小型聊天机器人原型&#xff0c;我们可以使用Python和开源NLP工具库spaCy。在本示例中&#xff0c;我们将演示如何创建一个简单的问答聊天机器人&#xff0c;它可以回答一些基本问题。 首先&#xff0c;确保您已经安装了Python和spaCy&#xff0c;然后下载spaCy的…

VUE3页面截取部署后的二级目录地址

用vue3开发了一个项目&#xff0c;只能部署在根目录&#xff0c;不能加二级目录&#xff0c;后来网上找了解决方案&#xff0c;在vite.config.ts中增加base: ./,配置解决问题&#xff0c;参考下图&#xff1a; 但部署后要获取部署的二级目录地址切遇到问题&#xff0c;后来想了…

数字化教育的未来:数字孪生技术助力校园创新

随着科技的飞速发展&#xff0c;智慧校园成为教育领域的新宠。数字孪生技术&#xff0c;作为一项新兴技术&#xff0c;正日益深刻地影响着校园的运营和管理。它为学校提供了前所未有的工具和资源&#xff0c;使校园管理更加高效、智能化。本文将探讨数字孪生技术如何助力智慧校…

深度学习中的激活函数

给定一个线性变换可以把x的值映射到一条直线上&#xff0c;如下图 输出结果就是y1w1xb1 如果y1经过一个线性变换得到一个y2&#xff0c;那么x和y2的关系是什么&#xff1f; 答案&#xff0c;毫无疑问是一条直线&#xff0c;不管如何的线性变换&#xff0c;依旧是一个线性的问…

【网路安全 --- Linux,window常用命令】网络安全领域,Linux和Windows常用命令,记住这些就够了,收藏起来学习吧!!

一&#xff0c;Linux 1-1 重要文件目录 1-1-1 系统运行级别 /etc/inittab 1-1-2 开机启动配置文件 /etc/rc.local /etc/rc.d/rc[0~6].d## 当我们需要开机启动自己的脚本时&#xff0c;只需要将可执行脚本丢在 /etc/init.d 目录下&#xff0c;然后在 /etc/rc.d/rc*.d 中建…

2022年最新Python大数据之Python基础【五】(字典)

文章目录 9、字典的定义10、字典的增加11、字典的删除12、字典的修改13、字典的查询14、字典的遍历 9、字典的定义 格式&#xff1a;变量 {key1 : value1, key2: value2…}空字典定义&#xff1a; {}dict&#xff08;&#xff09; 字典中键不能重复&#xff0c;是唯一的&…

Mysql的安装配置教程(详细)

MySQL是一种流行的关系型数据库管理系统&#xff0c;用于存储和管理数据。下面是MySQL的安装和配置教程的详细步骤&#xff1a; 下载MySQL安装程序&#xff1a; 访问MySQL官方网站&#xff08;https://dev.mysql.com/downloads/mysql/&#xff09;。选择适合你操作系统的版本&a…

分享关于msvcp110dll丢失的5种不同解决方法

今天我要和大家分享的主题是&#xff1a;msvcp110dll丢失的5种不同解决方法。我们都知道&#xff0c;在电脑使用过程中&#xff0c;经常会遇到一些棘手的问题&#xff0c;比如msvcp110.dll丢失。那么&#xff0c;这个文件丢失到底是什么意思呢&#xff1f;接下来&#xff0c;我…

CentOS 7下JumpServer安装及配置(超详细版)

前言 Jumpserver是一种用于访问和管理远程设备的Web应用程序&#xff0c;通常用于对服务器进行安全访问。它基于SSH协议&#xff0c;提供了一个安全和可管理的环境来管理SSH访问。Jumpserver是基于Python开发的一款开源工具&#xff0c;其提供了强大的访问控制功能&#xff0c;…

CakePHP 3.x/4.x反序列化RCE链

最近网上公开了cakephp一些反序列化链的细节&#xff0c;但是没有公开poc&#xff0c;并且网上关于cakephp的反序列化链比较少&#xff0c;于是自己跟一下 &#xff0c;构造pop链。 CakePHP简介 CakePHP是一个运用了诸如ActiveRecord、Association Data Mapping、Front Contr…

物联网AI MicroPython传感器学习 之 HX711称重传感器

学物联网&#xff0c;来万物简单IoT物联网&#xff01;&#xff01; 一、产品简介 下图是一款量程为5kg的称重传感器&#xff0c;采用悬臂梁方式安装。传感器主体结构是一个开孔金属条&#xff0c;金属条上下表面各贴有两个应变电阻&#xff0c;当金属条受力发生变形时时&…