【RocketMQ每日一问】RocketMQ重平衡逻辑是怎么样的?

  1. 消费者启动时,订阅相应的topic并加入到消费者组。
  2. 消费者将消费进度信息存储到Broker中,包括当前消费到的消息的offset、队列信息等。
  3. 消费者定时从Broker中获取topic的路由信息(包括消息队列、broker信息等),并更新本地缓存。
  4. 当消费者组内新增或删除消费者时,触发重平衡事件,即重新分配消息队列给消费者。
  5. 重平衡事件由消费组内最先触发的消费者发起,通知Broker开始重平衡。Broker接到重平衡请求后向消费者组内的其他消费者发送通知。
  6. 消费者接收到重平衡通知后开始重新计算自己所分配到的队列,根据拉取到的消息队列的分配策略重新分配队列。
  7. 消费者重新分配完队列后将分配结果发送给Broker,并更新本地缓存的路由信息。
  8. Broker接收到所有消费者发送的分配结果后,按照分配结果更新自己记录的消费组的消费进度信息,包括消费进度、offset等信息。
  9. 当消费者消费完消息队列中的所有消息后,将消费进度信息更新到Broker中,以便下次重平衡时可以根据消费进度信息进行分配。

以下是重平衡代码:

private boolean rebalanceByTopic(final String topic, final boolean isOrder) {boolean balanced = true;switch (messageModel) {//广播模式不需要重平衡case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);}balanced = mqSet.equals(getWorkingMessageQueue(topic));} else {this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {//内存中拿出所有queueSet<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//GET_CONSUMER_LIST_BY_GROUP这个请求去ns拿出所有的客户端List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);//将两个list进行排序Collections.sort(mqAll);Collections.sort(cidAll);//重平衡策略AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {//策略进行重平衡allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);return false;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//对比queue是否有变化,并且更新到processQueueTable中boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));}break;}default:break;}return balanced;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610591.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

地表温度计算(二)--Landsat8 Collection2 level2数据集

目录 一、GEE Landsat8 Collection2 level2数据集二、操作步骤1.导入研究区矢量2.计算LST3.导出数据4.导入Arcgis查看三、总结之前做了一个劈窗算法反演,不过该反演在干旱区适用,在其他区域效果不佳。然后又尝试了一些单窗算法、ENVI中热辐射传输方法,但是这些方法都需要获取…

Linux内存管理:(七)页面回收机制

文章说明&#xff1a; Linux内核版本&#xff1a;5.0 架构&#xff1a;ARM64 参考资料及图片来源&#xff1a;《奔跑吧Linux内核》 Linux 5.0内核源码注释仓库地址&#xff1a; zhangzihengya/LinuxSourceCode_v5.0_study (github.com) 1. 触发页面回收 Linux内核中触发页…

Linq练习

准备类以及数据 class Student {public int StudentId { get; set; }public string Name { get; set; }public Course Course { get; set; }public Student(int studentId, string name, Course course){StudentId studentId;Name name;Course course;}public void PrintIn…

深入MySQL——10

查询为何如此之慢 分为两种情况一种是查询后长时间不返回的&#xff0c;还有一种是查询很慢的 我们先来说第一种情况 长时间不返回 这种情况下就是锁阻塞导致不能返回&#xff0c;可以通过show processlist来查看语句处于什么状态&#xff0c;一般情况下会出现这几种状态&a…

Vue3:vue-cli项目创建及vue.config.js配置

一、node.js检测或安装&#xff1a; node -v node.js官方 二、vue-cli安装&#xff1a; npm install -g vue/cli # OR yarn global add vue/cli/*如果安装的时候报错&#xff0c;可以尝试一下方法 删除C:\Users**\AppData\Roaming下的npm和npm-cache文件夹 删除项目下的node…

从传统部署到无服务器计算:AI应用在AWS平台上的革新与飞跃

文章目录 《快速构建AI应用–AWS无服务器AI应用实战》内容简介作者简介目录 随着人工智能技术的不断发展&#xff0c;越来越多的企业开始将人工智能应用于各个业务场景&#xff0c;以提高效率、降低成本并创造新的商业模式。然而&#xff0c;传统的人工智能解决方案往往需要大量…

从零开始C++精讲:第一篇——C++入门

文章目录 前言一、C关键字二、命名空间2.1引子2.2命名空间定义2.3命名空间的使用 三、C输入和输出3.1输出3.2输入 四、缺省参数4.1全缺省4.2半缺省 五、函数重载5.1重载概念 六、引用6.1定义6.2引用的使用示例6.2.1引用作参数6.2.1引用作返回值 6.3传值、传引用效率比较6.4常引…

超维空间M1无人机使用说明书——01、ROS机载电脑使用说明——远程连接

引言&#xff1a;远程连接通常采用两种方式&#xff0c;一种是通过可视化软件&#xff0c;如VNC、Nomachine等&#xff0c;另外一种是使用SSH。各有优缺点&#xff0c;两种远程登录方式的优缺点做一个简单的对比&#xff1a; 1、SSH优缺点 优点:1、消耗网络资源 2、运行稳定 …

前端面试题集合六(高频)

1、vue实现双向数据绑定原理是什么&#xff1f; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>…

java SSM问卷调查系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM问卷调查管理系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代 码和数据库&#xff0c;系统主要采…

每天刷两道题——第十一天

1.1滑动窗口最大值 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值 。 输入&#xff1a;nums [1,3,-1,-3,5,3,6,7], k 3 输出&…

面试宝典之微服务框架面试题

S1、集群与分布式有啥区别&#xff1f; &#xff08;1&#xff09;相同点&#xff1a; 分布式和集群都是需要有很多节点服务器通过网络协同工作完成整体的任务目标。 &#xff08;2&#xff09;不同点&#xff1a; 分布式是指将业务系统进行拆分&#xff0c;即分布式的每一个…

Java微服务架构实践指南

Java 微服务架构是一种在软件开发中越来越受欢迎的架构风格&#xff0c;它将复杂的单体应用程序拆分成一组小型、独立的服务。每个服务都可以独立开发、部署和扩展&#xff0c;并通过轻量级通信机制相互协作。以下是 Java 微服务架构的实践指南&#xff1a; 拆分服务&#xff…

SpringBoot 注解超全详解(整合超详细版本)

使用注解的优势&#xff1a; 采用纯java代码&#xff0c;不在需要配置繁杂的xml文件 在配置中也可享受面向对象带来的好处 类型安全对重构可以提供良好的支持 减少复杂配置文件的同时亦能享受到springIoC容器提供的功能 1注解详解&#xff08;配备了完善的释义&#xff09…

力扣433. 最小基因变化

广度优先搜索 思路&#xff1a; 经过分析可知&#xff0c;基因 A 突变到基因 B&#xff0c;需要满足以下条件&#xff1a; 序列 A 与序列 B 只有一个字符不同&#xff1b;变化字符在集合中&#xff1b;突变后的基因 B 一定在 bank 中&#xff1b;尝试搜索所有合法突变的基因集…

Pycharm中如何配置python环境(conda)

首先在pycharm中点击 "File" > "Settings" 再次点击如下操作&#xff1a; 点击Python Interpreter的最右侧按钮&#xff0c;点击Show All... 找到python文件 最后点击OK

若依项目的table列表中对每一个字段增加排序按钮(单体版和前后端分离版)

一、目标&#xff1a;每一个字段都添加上下箭头用来排序 只需要更改前端代码&#xff0c;不需要更改后端代码&#xff0c;后面会讲解原理 二、单体版实现方式&#xff1a; 1.在options中添加sortable:true 2.在需要排序的字段中添加sortable:true 三、前后端分离版 1.el-tab…

Open CASCADE学习|非线性方程组

非线性方程组是一组包含非线性数学表达式的方程&#xff0c;即方程中含有未知数的非线性项。解这类方程组通常比解线性方程组更为复杂和困难。 非线性方程组在很多领域都有应用&#xff0c;例如物理学、工程学、经济学等。解决非线性方程组的方法有很多种&#xff0c;包括数值…

Unity中打印信息的两种方式

不继承MonoBehaviour的普通C#类中打印信息&#xff1a; 使用Debug类的方法&#xff1a; Unity提供了Debug类&#xff0c;其中包含了一些用于打印信息的静态方法。以下是常用的几种方法&#xff1a; Debug.Log(message)&#xff1a;打印普通信息。Debug.LogWarning(message)&a…

面试题-DAG 有向无环图

有向无环图用于解决前后依赖问题&#xff0c;在Apollo中用于各个组件的依赖管理。 在算法面试中&#xff0c;有很多相关题目 比如排课问题&#xff0c;有先修课比如启动问题&#xff0c;需要先启动1&#xff0c;才能启动2 概念 顶点&#xff1a; 图中的一个点&#xff0c;比…