RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

前言

SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。


源码版本:4.9.3

源码架构图

核心数据结构

主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// Map<消费组名称,订阅组配置>private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);// 内存数据版本号private final DataVersion dataVersion = new DataVersion();
}

深入看下SubscriptionGroupConfig 的数据结构。

public class SubscriptionGroupConfig {// 消费组名称private String groupName;// 是否开启消费private boolean consumeEnable = true;// 是否允许消费最早消息private boolean consumeFromMinEnable = true;// 是否允许广播消费private boolean consumeBroadcastEnable = true;// 重试队列数private int retryQueueNums = 1;// 重试最大次数private int retryMaxTimes = 16;// brokerIdprivate long brokerId = MixAll.MASTER_ID;// 当产生慢消费时,选择第几个brokerprivate long whichBrokerWhenConsumeSlowly = 1;// 是否通知消费者ids变化private boolean notifyConsumerIdsChangedEnable = true;
}

核心数据行为

数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {public SubscriptionGroupManager() {this.init();}public SubscriptionGroupManager(BrokerController brokerController) {this.brokerController = brokerController;this.init();}private void init() {{// 初始化系统消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化过滤服务消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化自测消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化http代理消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PULL消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PERMISSION消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_OWNER消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);}}// 更新订阅配置,且更新内存数据版本号public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);if (old != null) {log.info("update subscription group config, old: {} new: {}", old, config);} else {log.info("create new subscription group, {}", config);}this.dataVersion.nextVersion();this.persist();}// 失效消费组public void disableConsume(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {old.setConsumeEnable(false);this.dataVersion.nextVersion();}}// 查找指定消费组的订阅配置public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}this.dataVersion.nextVersion();this.persist();}}return subscriptionGroupConfig;}// 将内存数据结构编码成字符串@Overridepublic String encode() {return this.encode(false);}// 获取配置文件路径@Overridepublic String configFilePath() {return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 从字符串中恢复数据,写回内存数据结构@Overridepublic void decode(String jsonString) {if (jsonString != null) {SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);this.dataVersion.assignNewOne(obj.dataVersion);this.printLoadDataWhenFirstBoot(obj);}}}// 将内存数据结构编码成字符串public String encode(final boolean prettyFormat) {return RemotingSerializable.toJson(this, prettyFormat);}// 当第一次启动时,打印加载数据时的日志private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionGroupConfig> next = it.next();log.info("load exist subscription group, {}", next.getValue().toString());}}public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {return subscriptionGroupTable;}public DataVersion getDataVersion() {return dataVersion;}// 删除指定消费组的订阅配置public void deleteSubscriptionGroupConfig(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);if (old != null) {log.info("delete subscription group OK, subscription group:{}", old);this.dataVersion.nextVersion();this.persist();} else {log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/227811.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【PostgreSQL】从零开始:(二)PostgreSQL下载与安装

【PostgreSQL】从零开始:&#xff08;二&#xff09;PostgreSQL下载与安装 Winodws环境下载与安装PostgreSQL下载PostgreSQL安装PostgreSQL1.登录数据库2.查看下我们已有的数据库 Liunx环境下载与安装PostgreSQL使用YUM下载安装PostgreSQL1.下载PostgreSQL安装包2.安装PostgreS…

【漏洞复现】CVE-2023-47261 Dokmee ECM信息泄露致远程命令执行

漏洞描述 Dokmee ECM是一款国外企业内容管理 (ECM) 软件。每个公司的办公室每个角落都存放着文档、记录和档案。Dokmee 一系列解决方案可以帮助您高效地组织、保护和管理这些文件。支持的文件:PDF、TIFF、Word、Excel、Auto-CAD 绘图、电子邮件等。Dokmee 可以帮助您立即实现…

c#_sqlserver_三层架构winform学生信息管理及选课系统

基本功能包括管理员登录、注册学生账号、删除学生信息、查找学生信息、发布课程、修改课程、删除课程等。 教师端 登录&#xff1a;管理员登陆&#xff0c;拥有相应账号即可登录&#xff08;后台注册&#xff09;。注册学生账号&#xff1a;管理员可给学生分配学号&#xff0…

加权准确率WA,未加权平均召回率UAR和未加权UF1

加权准确率WA&#xff0c;未加权平均召回率UAR和未加权UF1 1.加权准确率WA&#xff0c;未加权平均召回率UAR和未加权UF12.参考链接 1.加权准确率WA&#xff0c;未加权平均召回率UAR和未加权UF1 from sklearn.metrics import classification_report from sklearn.metrics impor…

2018年AMC8数学竞赛真题的典型考点和详细解析

从战争中学习战争最有效。前几天&#xff0c;六分成长分析了2023年、2022年、2020、2019年的AMC8的典型考题、考点和详细答案解析。今天继续为大家分享2018年的AMC8的五道典型考题。 欢迎您查看历史文章了解之前各年的真题解析&#xff0c;本系列会持续更新&#xff0c;直到大家…

【2.5w字吐血总结 | 新手必看】全网最详细MySQL笔记

写在前面 鉴于全网MySQL知识点的总结分散难懂、良莠不齐&#xff0c;为了避免初学者少走弯路&#xff0c;更好更快地掌握MySQL知识&#xff0c;博主特地将自己所学的笔记分享出来。 如果想深度理解掌握MySQL&#xff0c;欢迎订阅专栏&#xff1a;MySQL进阶之路【秋说】&#…

王世军:铁笔翰墨染丹青 九峰冠华传千古

鸡是十二生肖中一员&#xff0c;在民间过年时常被剪成窗花&#xff0c;贴于窗户大门上。为表达人们对鸡的喜爱&#xff0c;将正月初一定为“鸡日”&#xff0c;鸡谐音“吉”&#xff0c;意为大吉大利&#xff0c;讨个好彩头。鸡又为“五德之君”&#xff0c;鸡的五德谓之文、武…

【改进YOLOv8】生猪胖瘦评价分级系统:可重参化EfficientRepBiPAN优化Neck

1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 研究背景与意义&#xff1a; 随着计算机视觉和深度学习的快速发展&#xff0c;目标检测成为了计算机视觉领域的一个重要研究方向。目标检测的目标是在图像或视频中准确地识别和定…

swing快速入门(十五)

注释很详细&#xff0c;直接上代码 上一篇 新增内容 1.文件对话框&#xff08;保存文件&#xff09; 2.文件对话框&#xff08;打开文件&#xff09; import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener;public class swing_tes…

Linux 中使用 docker 安装 Elasticsearch 及 Kibana

Linux 中使用 docker 安装 Elasticsearch 及 Kibana 安装 Elasticsearch 和 Kibana安装分词插件 ik_smart 安装 Elasticsearch 和 Kibana 查看当前运行的镜像及本地已经下载的镜像&#xff0c;确认之前没有安装过 ES 和 Kibana 镜像 docker ps docker images从远程镜像仓库拉…

京东大数据-10月京东咖啡机市场销售数据分析-销售额增长41%,德龙等海外头部品牌店铺数据分析

如今&#xff0c;咖啡已经成为了人们日常生活中流行的生活饮品之一&#xff0c;消费量较大。随着咖啡的受众人群越来越多&#xff0c;消费者们对咖啡品质的要求也愈来愈高&#xff0c;而咖啡品质除了受咖啡豆质量影响外&#xff0c;还受制作过程中煮泡时间、水温和物料数量等因…

【学习笔记】Linux(基础知识)

第1章 Linux概况 1.1 Linux起源 四个重要的支柱: ①Unix操作系统; ②Minix操作系统; ③GNU计划; ④Internet网络。 1. Unix操作系统 UNIX的诞生 1971年,用汇编语言首先开发成功16位UNIX系统 1973年,用C语言重写了UNIX系统 创始人:Ken Thompson & Dennis Ritch…

KSP实战-使用ksp AutoService为SPI自动生成配置文件

AutoService AutoService KSP annotation processor 简介 AutoService是自动为Service Provider Interface&#xff08;SPI&#xff09;生成 META-INF/services 配置的高性能KSP注解处理器插件&#xff1b; 效果如图&#xff1a; 背过Java面试题的应该都知道SPI&#xff0c…

【c语言】【visual studio】动态内存管理,malloc,calloc,realloc详解。

引言&#xff1a;随着大一期末的到来&#xff0c;想必许多学生都学到内存的动态管理这一部分了&#xff0c;看望这篇博客后&#xff0c;希望能解除你心中对这一章节的疑惑。 (・∀・(・∀・(・∀・*) 1.malloc详解 malloc的头文件是#include <sdtlib.h>,malloc - C Ref…

【C语言】——认识指针变量和地址,以及指针变量类型的意义

&#x1f3a5; 岁月失语唯石能言的个人主页 &#x1f525;个人栏专&#xff1a;秒懂C语言 ⭐若在许我少年时&#xff0c;一两黄金一两风 目录 前言 一、指针变量和地址 1.1 取地址操作符&#xff08;&&#xff09; 1.2 指针变量和解引用操作符&#xff…

Linux上使用HTTP协议进行数据获取的实战示例

嗨&#xff0c;Linux爱好者们&#xff0c;今天我们要一起探讨一下如何在Linux上进行HTTP协议的数据获取。这不是一项简单的任务&#xff0c;但放心&#xff0c;我会以最简单的语言&#xff0c;结合实例来给大家讲解。 首先&#xff0c;我们需要一个工具&#xff0c;那就是curl…

初识Dubbo学习,一文掌握Dubbo基础知识文集(2)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

springMVC-@RequestMapping

基本介绍 RequestMapping注解可以指定控制器/处理器的某个方法的请求的url, 示例 &#xff08;结合springMVC基本原理理解&#xff09; Controller public class UserHandler {RequestMapping(value "/login")public String login() {System.out.println("登…

springCloud项目打包如何把jar放到指定目录下

springCloud项目打包如何把jar发放到指定目录下 maven-antrun-plugin springCloud微服务打包jar&#xff0c;模块过多&#xff1b;我的项目模块结构如下&#xff1a; 我把实体类相关的单独抽离一个模块在service-api下服务单独写在service某块下&#xff0c; 每个模块的jar都…

如何使用jQuery获取当前网址路径

如何使用jQuery获取当前网址路径概述在前端开发中&#xff0c;经常需要获取当前网址的路径&#xff0c;通过使用jQuery库&#xff0c;我们可以轻松地实现这个功能。本文将逐步介绍如何使用jQuery获取当前网址路径。流程步骤以下是实现该功能的步骤&#xff1a;步骤 描述…