10、RocketMQ的Comsumer的消息队列的分配

前置知识:RocketMQ的topic存在多个队列,而多个topic分配在同一消费组里面,消费组里面存在多个消费者,当消费者注入到消费组时要进行消费者与多个队列之间的分配,而这种分配被称之为Rebalance机制,该机制的本意就是为了提升并行的消费能力。

RocketMQ当中保证的机制就是一个队列最多分给一个消费者,而一个消费者可以消费多个队列,当某个消费组下的消费者数量大于队列数时就会导致存在消费者分配不到任何队列。
在这里插入图片描述
之所以保证一个队列最多给一个消费者消费,就是需要保证消息的顺序性和可靠性,同时一个消费者可以消费多个队列增加消费者的并发消费能力和负载均衡性。

执行RebalanceService进行重分配

在DefaultMQPushConsumerImpl执行run方法的最后会去执行rebalanceImmediately()方法主动进行重平衡。

// 新的Consumer服务启动的时候,主动调用rebalanceImmediately唤醒
// 负载均衡服务rebalanceService,进行重分配。
public void rebalanceImmediately() {this.rebalanceService.wakeup();
}

在DefaultMQPushConsumerImpl执行run方法时会去启动CreateMQClientInstance客户端通信实例,这时就会去执行this.rebalanceService.start()方法启动重分配服务,然后就执行rebalanceService服务的run方法,每隔20s执行一次重分配。

@Override
public void run() {log.info(this.getServiceName() + " service started");// 服务没停止一直运行while (!this.isStopped()) {// 等待运行,默认休息20s,可以通过rocketmq.client.rebalance.waitInterval来配置this.waitForRunning(waitInterval);// 执行该方法进行负载均衡this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");
}

Broker心跳处理或topic的新增与删除,当新的Consumer被注册进来,MQClientInstance内部的服务也会定时30s发送心跳信息给broker,当发送给Broker之后处理Code为HEART_BEAT,根据Broker启动时候注册处理器方法registerProcessor(),最终处理逻辑由ClientManageProcessor的processRequest方法去处理,最终循环遍历处理consumerDataSet集合,如果consumer信息发生改变,(两个条件判断改变isNotifyConsumerIdsChangedEnable为true,存在updateChannel更新连接,存在updateSubscription更新订阅)broker会发送Code为NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端,要求进行重分配操作。


public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {// 心跳处理case RequestCode.HEART_BEAT:return this.heartBeat(ctx, request);case RequestCode.UNREGISTER_CLIENT:return this.unregisterClient(ctx, request);case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null;
}

无论是那种方法都会去执行RebalanceService的run方法,实现重分配逻辑。

public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);// 执行该方法进行负载均衡this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");
}

doRebalance()方法执行重分配。

public void doRebalance(final boolean isOrder) {// 获取当前消费者的订阅信息集合Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {// 不为空,编辑订阅信息for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {// 根据topic重新分配this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();
}

获取当前消费者的订阅信息集合,然后遍历订阅信息集合,获取订阅的topic,调用rebalanceByTopic方法对该topic进行重分配。

rebalanceByTopic方法

private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {// 广播模式case BROADCASTING: {// 根据topic获取MessageQueue集合Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}// 集群模式case CLUSTERING: {// 获取Topic下的所有的队列Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 获取同一个消费者组的所有实例List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}// 负载均衡分配MessageQueueif (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();// 备份所有队列mqAll.addAll(mqSet);// 两个队列进行排序Collections.sort(mqAll);Collections.sort(cidAll);// 获取分配策略AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 进行分配,存在6种不同的策略allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}
}

广播模式下没有重分配之说,每个Consumer都会去消费所有消息。

集群模式下获取同一消费组下的消费者,查找分配策略AllocateMessageQueueStrategy,执行allocate方,进行重分配。

AllocateMessageQueueStrategy是RocketMQ消费者之间消息分配的策略算法接口,本身来说RocketMQ提供6个内置实现,同时我们也可以通过实现该接口来定义自己的策略。
在这里插入图片描述

AllocateMachineRoomNearby表示机房就近分配

public List<MessageQueue>  allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 参数校验if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}// 将消息队列根据机房分组Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();for (MessageQueue mq : mqAll) {String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);if (StringUtils.isNoneEmpty(brokerMachineRoom)) {if (mr2Mq.get(brokerMachineRoom) == null) {mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());}mr2Mq.get(brokerMachineRoom).add(mq);} else {throw new IllegalArgumentException("Machine room is null for mq " + mq);}}// 将消息者根据机房分组Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();for (String cid : cidAll) {String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);if (StringUtils.isNoneEmpty(consumerMachineRoom)) {if (mr2c.get(consumerMachineRoom) == null) {mr2c.put(consumerMachineRoom, new ArrayList<String>());}mr2c.get(consumerMachineRoom).add(cid);} else {throw new IllegalArgumentException("Machine room is null for consumer id " + cid);}}List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();// 获取当前消费者的机房,然后就当前消费者分配到就近机房String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));}//2.allocate the rest mq to each machine room if there are no consumer alive in that machine roomfor (Entry<String, List<MessageQueue>> machineRoomEntry : mr2Mq.entrySet()) {if (!mr2c.containsKey(machineRoomEntry.getKey())) { // no alive consumer in the corresponding machine room, so all consumers share these queuesallocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, machineRoomEntry.getValue(), cidAll));}}return allocateResults;
}

首先将消费者和队列按照机房进行分组,然后得到当前消费者的机房信息,如果消费者和队列属于同一个机房就对其进行分配,具体使用何种策略根据传递进来的allocateMessageQueueStrategy确定,如果没有对应上的消费者,那么消费队列就有所有的消费者分配,具体策略也由传入的allocateMessageQueueStrategy确定。
在这里插入图片描述

AllcateMessageQueueAveragely表示平均分配

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 检验参数if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}// currentCID在cidAll中的索引位置int index = cidAll.indexOf(currentCID);// 计算平均分配后的余数,大于0表示不能被整除int mod = mqAll.size() % cidAll.size();// 计算当前消费者分配的队列数int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);// 按顺序分配for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;
}

简单理解就是消息队列数与消费者数量进行相除,余数为0则正好平均分配,余数不为0则每个消费者最少分配除数个数的数量而余数只有排在前面的消费者能够分配到。
在这里插入图片描述
AllocateMessageQueueAveragelyByCircle表示环形平均分配

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();//检查参数if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}// 获取下标int index = cidAll.indexOf(currentCID);// 依次分配(轮询)for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {result.add(mqAll.get(i));}}return result;
}

简单的理解就是依次去分配,直到分配完所有消息队列(归根结底就是轮询)。
在这里插入图片描述

AllocateMessageQueueAveragelyByCircle表示环形平均分配
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {return this.messageQueueList;
}

就是给我们一些扩容能够在调用setMessageQueueList方法来自定义需要消费的消息队列集合。

AllocateMessageQueueByMachineRoom表示机房平均分配
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 检查配置if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}// 获取下标int currentIndex = cidAll.indexOf(currentCID);if (currentIndex < 0) {return result;}List<MessageQueue> premqAll = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) {String[] temp = mq.getBrokerName().split("@");if (temp.length == 2 && consumeridcs.contains(temp[0])) {premqAll.add(mq);}}// 平均分配的对立int mod = premqAll.size() / cidAll.size();int rem = premqAll.size() % cidAll.size();int startIndex = mod * currentIndex;int endIndex = startIndex + mod;for (int i = startIndex; i < endIndex; i++) {result.add(premqAll.get(i));}if (rem > currentIndex) {result.add(premqAll.get(currentIndex + mod * cidAll.size()));}return result;
}

消息者只要绑定对应组里面的broker,这种策略要求brokerName的命名必须按照“机房名@brokerName”,消费者在分配队列的时候,首先会按照机房名称过来出所有的MesgeQueue,然后按照对应策略进行分配。同时AllocateMessageQueueByMachineRoom 更关注机房的划分和分配,而 AllocateMachineRoomNearby 则更关注就近部署和网络延迟的优化。

AllocateMessageQueueConsisterntHash表示一致性哈希分配

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 检查配置if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));}final ConsistentHashRouter<ClientNode> router; //for building hash ringif (customHashFunction != null) {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);} else {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);}List<MessageQueue> results = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) {ClientNode clientNode = router.routeNode(mq.toString());if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}}return results;}

这个队列策略将Consumer的哈希值与Queue的哈希值作为Node节点都存放在hash环上,通过逆时针方向,距离Queue最近的那个Consumer就是该Queue要分配的Consumer,其中存在virtualNodeCnt对象,virtualNodeCnt表示物理节点的虚拟线程。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/216489.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux命令大全(全网最细讲解)

文章目录 一、基础知识&#xff08;1&#xff09; Linux系统的文件结构&#xff08;2&#xff09; Linux系统命令行的含义&#xff08;3&#xff09;命令的组成二、基础操作&#xff08;1&#xff09; 关闭系统&#xff08;2&#xff09; 关闭重启&#xff08;3&#xff09; 帮…

节日问候:在 Metaverse 中一起庆祝节日!

冬季即将来临&#xff0c;节日的脚步也越来越近&#xff0c;是时候通过 The Sandbox 中的最新活动——“节日问候”来迎接节日气氛了&#xff01;为期 43 天的庆祝活动从 12 月 11 日开始&#xff0c;到 1 月 22 日结束&#xff0c;将带领玩家穿越一个充满 60 种体验的冬季仙境…

d2l绘图不显示的问题

之前试了各种方法都不行 在pycharm中还是不行&#xff0c;但是在anaconda中的命令行是可以的 anaconda prompt conda activaye py39 #进入f盘 F: #运行文件 python F:\python_code\softmax.py

FreeRTOS的三处栈空间设置分析

1、汇编启动代码中设置栈 这个栈空间只有300字节&#xff0c;是用于汇编启动代码早期&#xff0c;以及调用C语言的main函数&#xff08;创建任务等&#xff09;在创建好任务&#xff0c;启动调取器后&#xff0c;这个栈空间就被抛弃掉&#xff0c;后续不会使用到等调度器开启后…

深入理解Dubbo-5.服务注册源码分析

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码、Kafka原理、分布式技术原理&#x1f525;如果感觉博主的文章还不错的话&#xff…

Self-Distillation from the Last Mini-Batch for Consistency Regularization中文版

Self-Distillation from the Last Mini-Batch for Consistency Regularization 从上一个小批量自发蒸馏&#xff0c;实现一致性正则化 摘要 知识蒸馏&#xff08;Knowledge distillation&#xff0c;KD&#xff09;展示了强大的潜力&#xff0c;作为一种强有力的正则化策略&a…

鸿蒙Stage模型开发—创建你的第一个ArkTS应用

Stage模型开发概述 基本概念 下图展示了Stage模型中的基本概念。 图1 Stage模型概念图 UIAbility组件和ExtensionAbility组件 Stage模型提供UIAbility和ExtensionAbility两种类型的组件&#xff0c;这两种组件都有具体的类承载&#xff0c;支持面向对象的开发方式。UIAbility…

关于代码质量度量和分析的一些总结

最近团队做CMMI3认证&#xff0c;这期间涉及到了代码质量度量。花了点时间做了总结&#xff0c;分享给大家。 先看一张整体的图&#xff0c;然后逐个指标展开说明。 一、单元测试覆盖率 单元测试覆盖率&#xff08;Coverage&#xff09;是一个度量单元测试覆盖了多少代码的指标…

CTF V8 pwn入门(一)

仍然是因为某些原因&#xff0c;需要学学浏览器pwn 环境 depot_tools建议直接去gitlab里下&#xff0c;github上这个我用魔法都没下下来 下完之后执行 echo export PATH$PATH:"/root/depot_tools" >> ~/.bashrc路径换成自己的就ok了 然后是ninja git clo…

《opencv实用探索·十七》calcBackProject直方图反向投影

在了解反向投影前需要先了解下直方图的概念&#xff0c;可以看我上一章内容&#xff1a;opencv直方图计算calcHist函数解析 直方图反向投影是一种图像处理技术&#xff0c;通常用于目标检测和跟踪。通过计算反向投影&#xff0c;可以将图像中与给定模式&#xff08;目标对象&a…

c++ map

unordered_map #include <iostream> #include <string> #include <unordered_map>int main() {// 创建包含三个字符串的&#xff08;映射到字符串的&#xff09;unordered_mapstd::unordered_map<std::string, std::string> u {{"red", &qu…

《opencv实用探索·十八》Camshift进行目标追踪流程

CamShift&#xff08;Continuously Adaptive Mean Shift&#xff09;是一种用于目标跟踪的方法&#xff0c;它是均值漂移&#xff08;Mean Shift&#xff09;的扩展&#xff0c;支持对目标的旋转跟踪&#xff0c;能够对目标的大小和形状进行自适应调整。 cv::CamShift和cv::me…

焦炭冶金工艺3D可视化仿真展示更直观、形象

冶金行业作为重要的工业领域&#xff0c;其岗位实践培训一直面临着诸多挑战&#xff0c;随着web3d开发和VR虚拟仿真技术的不断创新和应用&#xff0c;冶金3D虚拟仿真实践教学平台应运而生&#xff0c;为钢铁生产培训带来了崭新的变革。 冶金3D虚拟仿真实践教学平台采用了先进的…

【CANN训练营】高阶笔记

Ascend C Tilling计算 Tilling基本概念介绍 大多数情况下&#xff0c;Local Memory的存储&#xff0c;无法完全容纳算子的输入与输出的所有数据&#xff0c;需要每次搬运一部分输入数柜进行计算然后搬出&#xff0c;再敲运下一部分输入数据进行计算&#xff0c;直到得到完愁的…

GoEasy使用手册

GoEasy官网 登录 - GoEasy 即时通讯聊天案例 GoEasy - GoEasy (gitee.com) 注意事项 接口使用人数上限为15&#xff0c;超出之后会请求超时返回408状态码&#xff0c;可以新建一个应用用来更换common Key 创建应用 ​ 添加应用名称&#xff0c;其余默认&#xff0c;点击…

算法通关村第十三关—数学与数学基础问题(青铜)

数学与数学基础问题 一、统计专题 1.1 符号统计 LeetCode1822给定一个数组&#xff0c;求所有元素的乘积的符号&#xff0c;如果最终答案是负的返回-1&#xff0c;如果最终答案是正的返回1&#xff0c;如果答案是0返回0。  题目比较简单&#xff0c;正数对结果完全没影响&…

CVE-2021-4145:类型混淆导致释放任意 file 结构体

前言 影响版本&#xff1a; v5.13.4 之前 测试版本&#xff1a;v5.13.3 &#xff08;感谢 bsauce 大佬提供的测试环境&#xff09; 漏洞发生在 fsconfig 处理时调用的cgroup1_parse_param 函数中&#xff0c;patch&#xff1a; diff --git a/kernel/cgroup/cgroup-v1.c b/k…

软路由R4S+iStoreOS如何实现公网远程桌面本地电脑

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;数据结构、Cpolar杂谈 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. 简介1.1 软路由的定义1.2 使用软路由的好处1.3 常用组网 二. 配置远程桌面公网地址三. 家中使用…

mysql原理--B+树索引的使用

1.索引的代价 在介绍如何更好的使用索引之前先要了解一下使用这玩意儿的代价&#xff0c;它在空间和时间上都会拖后腿&#xff1a; (1). 空间上的代价 这个是显而易见的&#xff0c;每建立一个索引都要为它建立一棵 B 树&#xff0c;每一棵 B 树的每一个节点都是一个数据页&…

抖去推--短视频剪辑、矩阵无人直播saas营销工具一站式开发

抖去推是一款短视频剪辑和矩阵无人直播SAAS营销工具一站式开发平台。它提供了以下功能和特点&#xff1a; 1. 短视频剪辑&#xff1a;抖去推提供了一系列的剪辑工具&#xff0c;包括自动剪辑、特效制作、配音配乐等&#xff0c;可以帮助用户轻松制作出高质量的短视频。 2. 矩阵…