RocketMQ 中的 ProducerManager 组件剖析

一、引言

在分布式系统的消息传递领域,RocketMQ 以其高性能、高可用性和强大的扩展性脱颖而出。ProducerManager 作为 RocketMQ 中的一个关键组件,在消息生产环节发挥着至关重要的作用。它负责管理消息生产者(Producer)的生命周期、配置和操作,为系统的稳定运行和高效消息传递提供了坚实的基础。

二、ProducerManager 的核心功能

2.1 核心属性

//网络连接过期超时时间private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;//获取可用网络连接重试次数 默认3次private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;//生产组-》网络连接-》客户端信息private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =new ConcurrentHashMap<>();//每个生产者网络客户端id到网络连接的映射关系private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();//正数计数器private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();

主要数据结构为groupChannelTable,里面进行存放的是key为生产者组的名字,value为ConcurrentHashMap<Channel, ClientChannelInfo>,这个Map中Channel为与客户端通信的channel,value为ClientChannelInfo 是客户端的信息,主要属性为:

// 消费者客户端网络连接信息
public class ClientChannelInfo {private final Channel channel;//消费者客户端网络连接idprivate final String clientId;//编程语言private final LanguageCode language;//版本语言private final int version;//最后更新时间戳private volatile long lastUpdateTimestamp = System.currentTimeMillis();//.....省略代码
}

2.2 核心方法

  1. 自动扫描方法,会每隔一段时间进行针对groupChannelTable中的数据进行扫描,将Map中Channel最后更新时间,超过2分钟没有进行更新的连接从groupChannelTable中进行移除。

代码如下:

    /*** 扫描生产者过期的网络连接*/public void scanNotActiveChannel() {for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {final String group = entry.getKey();final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();while (it.hasNext()) {Entry<Channel, ClientChannelInfo> item = it.next();// final Integer id = item.getKey();final ClientChannelInfo info = item.getValue();long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();if (diff > CHANNEL_EXPIRED_TIMEOUT) {it.remove();clientChannelTable.remove(info.getClientId());log.warn("SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);RemotingUtil.closeChannel(info.getChannel());}}}}

 2.处理生产者连接关闭的事件

  //处理生产者连接关闭的事件public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {if (channel != null) {for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {final String group = entry.getKey();final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =entry.getValue();final ClientChannelInfo clientChannelInfo =clientChannelInfoTable.remove(channel);if (clientChannelInfo != null) {clientChannelTable.remove(clientChannelInfo.getClientId());log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",clientChannelInfo.toString(), remoteAddr, group);}}}}

3.生产者的注册与下线

注册方法:

    //注册生产者public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {ClientChannelInfo clientChannelInfoFound = null;ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);if (null == channelTable) {channelTable = new ConcurrentHashMap<>();this.groupChannelTable.put(group, channelTable);}clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());if (null == clientChannelInfoFound) {channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());log.info("new producer connected, group: {} channel: {}", group,clientChannelInfo.toString());}if (clientChannelInfoFound != null) {clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());}}

下线方法

   //生产者的下线public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);if (null != channelTable && !channelTable.isEmpty()) {ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());clientChannelTable.remove(clientChannelInfo.getClientId());if (old != null) {log.info("unregister a producer[{}] from groupChannelTable {}", group,clientChannelInfo.toString());}if (channelTable.isEmpty()) {this.groupChannelTable.remove(group);log.info("unregister a producer group[{}] from groupChannelTable", group);}}}

4. 根据生产者的groupId获取可用的连接

//获取当前生产者组可用的连接public Channel getAvailableChannel(String groupId) {if (groupId == null) {return null;}List<Channel> channelList;ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);if (channelClientChannelInfoHashMap != null) {channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet());} else {log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);return null;}int size = channelList.size();if (0 == size) {log.warn("Channel list is empty. groupId={}", groupId);return null;}Channel lastActiveChannel = null;//轮询算法,依次获取生产者组的每一个生产者的连接地址int index = positiveAtomicCounter.incrementAndGet() % size;Channel channel = channelList.get(index);int count = 0;boolean isOk = channel.isActive() && channel.isWritable();while (count++ < GET_AVAILABLE_CHANNEL_RETRY_COUNT) {if (isOk) {return channel;}if (channel.isActive()) {lastActiveChannel = channel;}index = (++index) % size;channel = channelList.get(index);isOk = channel.isActive() && channel.isWritable();}return lastActiveChannel;}

三、总结

总的来看,ProducerManager主要是通过类中的方法对groupChannelTable集合中的属性进行操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/75555.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s进阶之路:本地集群环境搭建

概述 文章将带领大家搭建一个 master 节点&#xff0c;两个 node 节点的 k8s 集群&#xff0c;容器基于 docker&#xff0c;k8s 版本 v1.32。 一、系统安装 安装之前请大家使用虚拟机将 ubuntu24.04 系统安装完毕&#xff0c;我是基于 mac m1 的系统进行安装的&#xff0c;所…

深度学习数据集划分比例多少合适

在机器学习和深度学习中&#xff0c;测试集的划分比例需要根据数据量、任务类型和领域需求灵活调整。 1. 常规划分比例 通用场景 训练集 : 验证集 : 测试集 60% : 20% : 20% 适用于大多数中等规模数据集&#xff08;如数万到数十万样本&#xff09;&#xff0c;平衡了训练数…

【TS学习】(15)分布式条件特性

在 TypeScript 中&#xff0c;分布式条件类型&#xff08;Distributive Conditional Types&#xff09; 是一种特殊的行为&#xff0c;发生在条件类型作用于裸类型参数&#xff08;Naked Type Parameter&#xff09; 时。这种特性使得条件类型可以“分布”到联合类型的每个成员…

NSSCTF [HGAME 2023 week1]simple_shellcode

3488.[HGAME 2023 week1]simple_shellcode 手写read函数shellcode和orw [HGAME 2023 week1]simple_shellcode (1) motalymotaly-VMware-Virtual-Platform:~/桌面$ file vuln vuln: ELF 64-bit LSB pie executable, x86-64, version 1 (SYSV), dynamically linked, interpret…

PostgreSQL的扩展(extensions)-常用的扩展-pg_dirtyread

PostgreSQL的扩展&#xff08;extensions&#xff09;-常用的扩展-pg_dirtyread pg_dirtyread 是 PostgreSQL 的一个特殊扩展&#xff0c;它允许读取已被删除但尚未被 VACUUM 清理的数据行&#xff0c;是数据恢复的重要工具。 原理&#xff1a; pg_dirtyread 通过直接访问表的…

linux3 mkdir rmdir rm cp touch ls -d /*/

Linux 系统的初始目录结构遵循 FHS&#xff08;Filesystem Hierarchy Standard&#xff0c;文件系统层次标准&#xff09;&#xff0c;定义了每个目录的核心功能和存储内容。以下是 Linux 系统初始安装后的主要目录及其作用&#xff1a; 1. 核心系统目录 目录用途典型内容示例…

Bazel中的Symbol, Rule, Macro, Target, Provider, Aspect 等概念

学习Bazel &#xff0c;就要学习Bazel 的规则定义&#xff0c; 弄清各个概念是重要的一个步骤。 在 Bazel 规则定义中&#xff0c;Symbol、Rule 和 Macro 是常见的概念。除此之外&#xff0c;Bazel 还有 Target、Provider、Aspect Repository、Package、 Workspace、 Configura…

深入探究 Hive 中的 MAP 类型:特点、创建与应用

摘要 在大数据处理领域,Hive 作为一个基于 Hadoop 的数据仓库基础设施,提供了方便的数据存储和分析功能。Hive 中的 MAP 类型是一种强大的数据类型,它允许用户以键值对的形式存储和操作数据。本文将深入探讨 Hive 中 MAP 类型的特点,详细介绍如何创建含有 MAP 类型字段的表…

基于Java的区域化智慧养老系统(源码+lw+部署文档+讲解),源码可白嫖!

摘 要 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;区域化智慧养老系统当然不能排除在外。区域化智慧养老系统是在实际应用和软件工程的开发原理之上&#xff0c;运用Java语言、JSP技术以及…

关于JVM和OS中的指令重排以及JIT优化

关于JVM和OS中的指令重排以及JIT优化 前言&#xff1a; 这东西应该很重要才对&#xff0c;可是大多数博客都是以讹传讹&#xff0c;全是错误&#xff0c;尤其是JVM会对字节码进行重排都出来了&#xff0c;明明自己测一测就出来的东西&#xff0c;写出来误人子弟… 研究了两天&…

VS2022远程调试Linux程序

一、 1、VS2022安装参考 VS Studio2022安装教程&#xff08;保姆级教程&#xff09;_visual studio 2022-CSDN博客 注意&#xff1a;勾选的时候&#xff0c;要勾选下方的选项&#xff0c;才能调试Linux环境下运行的程序&#xff01; 2、VS2022远程调试Linux程序测试 原文参…

WPF设计学习记录滴滴滴4

<Button x:Name"btn"Content"退出"Width" 100"Height"25"Click"btn_Click" IsDefault"True"/> <Button x:Name"btn" <!-- 控件标识&#xff1a;定义按钮的实例名称为"btn&…

JVM 有哪些垃圾回收器

垃圾收集算法 标记-复制算法(Copying): 将可用内存按容量划分为两个区域,每次只使用其中的一块。当这一块的内存用完了,就将还存活着的对象复制到另外一块上面, 然后再把已使用过的内存空间一次清理掉。 标记-清除算法(Mark-Sweep): 算法分为“标记” 和“清除”两个…

React DndKit 实现类似slack 类别、频道拖动调整位置功能

一周调试终于实现了类 slack 类别、频道拖动调整位置功能。 历经四个版本迭代。 实现了类似slack 类别、频道拖动调整功能 从vue->react &#xff1b;更喜欢React的生态及编程风格&#xff0c;新项目用React来重构了。 1.zustand全局状态 2.DndKit 拖动 功能视频&…

新浪财经股票每天10点自动爬取

老规矩还是先分好三步&#xff0c;获取数据&#xff0c;解析数据&#xff0c;存储数据 因为股票是实时的&#xff0c;所以要加个cookie值&#xff0c;最好分线程或者爬取数据时等待爬取&#xff0c;不然会封ip 废话不多数&#xff0c;直接上代码 import matplotlib import r…

使用Android 原生LocationManager获取经纬度

一、常用方案 1、使用LocationManager GPS和网络定位 缺点&#xff1a;个别设备,室内或者地下停车场获取不到gps定位,故需要和网络定位相结合使用 2、使用Google Play服务 这种方案需要Android手机中有安装谷歌服务,然后导入谷歌的第三方库&#xff1a; 例如&#xff1a;i…

验证码实现

验证码案例 学了Spring MVC &#xff0c;配置 相关章节&#xff0c; 现可以尝试写一个前后端交互的验证码 文章目录 验证码案例前言一、验证码是什么&#xff1f;二、需求1.引入依赖2.导入前端页面3.约定前后段交互接口 三、代码解析Controllermodelapplication.xml 四丶结果五…

查询当前用户的购物车和清空购物车

业务需求&#xff1a; 在小程序用户端购物车页面能查到当前用户的所有菜品或者套餐 代码实现 controller层 GetMapping("/list")public Result<List<ShoppingCart>> list(){List<ShoppingCart> list shoppingCartService.shopShoppingCart();r…

(多看) CExercise_05_1函数_1.2计算base的exponent次幂

题目&#xff1a; 键盘录入两个整数&#xff1a;底(base)和幂指数(exponent)&#xff0c;计算base的exponent次幂&#xff0c;并打印输出对应的结果。&#xff08;注意底和幂指数都可能是负数&#xff09; 提示&#xff1a;求幂运算时&#xff0c;基础的思路就是先无脑把指数转…