RocketMq架构和源码解析

NameServer作为注册中心,提供路由注册、路由踢出、路由发现功能,舍弃强一致,保证高可用,集群中各个节点不会实时通讯,其中一个节点下线之后,会提供另外一个节点保证路由功能。

Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:

Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue

 NameServer架构

 启动入口:

org.apache.rocketmq.namesrv.NamesrvController#initialize

 public boolean initialize() {loadConfig();initiateNetworkComponents();initiateThreadExecutors();registerProcessor();startScheduleService();initiateSslContext();initiateRpcHooks();return true;}

路由注册

Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。

Broker服务发送心跳包,对外提供Topic配置

org.apache.rocketmq.broker.BrokerController#start

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

    public void start() throws Exception {this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {isIsolated = true;}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}startBasicService();if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);this.registerBrokerAll(true, false, true);}scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {if (System.currentTimeMillis() < shouldStartTime) {BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info("Skip register for broker is isolated");return;}BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));if (this.brokerConfig.isEnableSlaveActingMaster()) {scheduleSendHeartbeat();scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {BrokerController.this.syncBrokerMemberGroup();} catch (Throwable e) {BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);}}}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));}if (this.brokerConfig.isEnableControllerMode()) {scheduleSendHeartbeat();}if (brokerConfig.isSkipPreOnline()) {startServiceWithoutCondition();}}public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean enableActingMaster,final boolean compressed,final Long heartbeatTimeoutMillis,final BrokerIdentity brokerIdentity) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setEnableActingMaster(enableActingMaster);requestHeader.setCompressed(false);if (heartbeatTimeoutMillis != null) {requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);}RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {@Overridepublic void run2() {try {RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {registerBrokerResultList.add(result);}LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);} catch (Exception e) {LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);}} catch (InterruptedException ignore) {}}return registerBrokerResultList;}

NameServer注册broker信息,集群名称、broker名称、ha注册中心地址、超时时间

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

    public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);boolean registerFirst = false;BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}boolean isOldVersionBroker = enableActingMaster == null;brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);brokerData.setZoneName(zoneName);Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();boolean isMinBrokerIdChanged = false;long prevMinBrokerId = 0;if (!brokerAddrsMap.isEmpty()) {prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());}if (brokerId < prevMinBrokerId) {isMinBrokerIdChanged = true;}//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTablebrokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());//If Local brokerId stateVersion bigger than the registering one,String oldBrokerAddr = brokerAddrsMap.get(brokerId);if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));if (null != oldBrokerInfo) {long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();if (oldStateVersion > newStateVersion) {log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);//Remove the rejected brokerAddr from brokerLiveTable.brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));return result;}}}if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);return null;}String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));boolean isMaster = MixAll.MASTER_ID == brokerId;boolean isPrimeSlave = !isOldVersionBroker && !isMaster&& brokerId == Collections.min(brokerAddrsMap.keySet());if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,topicConfigWrapper.getDataVersion(), brokerName,entry.getValue().getTopicName())) {final TopicConfig topicConfig = entry.getValue();if (isPrimeSlave) {// Wipe write perm for prime slavetopicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));}this.createAndUpdateQueueData(brokerName, topicConfig);}}}if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();//the topicQueueMappingInfoMap should never be null, but can be emptyfor (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());}//Note asset brokerName equal entry.getValue().getBname()//here use the mappingDetail.bnametopicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());}}}BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,new BrokerLiveInfo(System.currentTimeMillis(),timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);}if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddrInfo);} else {this.filterServerTable.put(brokerAddrInfo, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);if (masterLiveInfo != null) {result.setHaServerAddr(masterLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {notifyMinBrokerIdChanged(brokerAddrsMap, null,this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());}} catch (Exception e) {log.error("registerBroker Exception", e);} finally {this.lock.writeLock().unlock();}return result;}

路由剔除:NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。

路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/170763.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vatee万腾的科技征途:Vatee独特探索的数字化力量

在数字化时代的浪潮中&#xff0c;Vatee万腾以其独特的科技征途成为引领者。公司在数字化领域的探索之路不仅是技术的创新&#xff0c;更是一种对未知的勇敢涉足&#xff0c;是对新时代的深刻洞察和积极实践。 Vatee万腾通过独特的探索&#xff0c;展示了在数字化征途上的创新力…

数据结构与算法之二叉树: LeetCode 101. 对称二叉树 (Typescript版)

对称二叉树 https://leetcode.cn/problems/symmetric-tree/ 描述 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 示例 1 1/ | \2 | 2/ \ | / \3 4 | 4 3中间一条线是对称轴 输入&#xff1a;root [1,2,2,3,4,4,3] 输出&#xff1a;true示例 …

Matplotlib子图的创建_Python数据分析与可视化

Matplotlib子图的创建 plt.axes创建子图fig.add_axes()创建子图 plt.axes创建子图 前面已经介绍过plt.axes函数&#xff0c;这个函数默认配置是创建一个标准的坐标轴&#xff0c;填满整张图。 它还有一个可选的参数&#xff0c;由图形坐标系统的四个值构成。这四个值表示为坐…

Spine深入学习 —— 数据

atlas数据的处理 作用 图集&#xff0c;描述了spine使用的图片信息。 结构 page 页块 页块包含了页图像名称, 以及加载和渲染图像的相关信息。 page1.pngsize: 640, 480format: RGBA8888filter: Linear, Linearrepeat: nonepma: truename: 首行为该页中的图像名称. 图片位…

Python武器库开发-前端篇之CSS盒模型(三十一)

前端篇之CSS盒模型(三十一) CSS盒模型是指网页中的每个元素可以看做是一个矩形盒子&#xff0c;该盒子有四个主要部分组成&#xff1a;content、padding、border和margin。其中&#xff1a; content&#xff1a;指盒子中的内容区域&#xff0c;可以包含文本、图像、视频、其他…

RedLock底层源码分析

RedLock底层源码分析 一、Redlock红锁算法 https://redis.io/docs/manual/patterns/distributed-locks/官网说明 1、为什么要学习这个&#xff1f;怎么产生的&#xff1f; ​ 一个很直接的问题&#xff0c;当我使用redis锁的那台机器挂了&#xff0c;出现了单点故障了&#…

游戏开发引擎Cocos Creator和Unity如何对接广告-AdSet聚合广告平台

在游戏开发方面&#xff0c;游戏引擎的选择对开发过程和最终的产品质量有着重大的影响&#xff0c;Unity和Cocos是目前全球两大商用、通用交互内容开发工具&#xff0c;这两款引擎受到广泛关注&#xff0c;本文将从多个维度对两者进行比较&#xff0c;为开发者提供正确的选择建…

Rust10 Building a Multithreaded Web Server [End]

Rust学习笔记 Rust编程语言入门教程课程笔记 参考教材: The Rust Programming Language (by Steve Klabnik and Carol Nichols, with contributions from the Rust Community) Lecture 20: Final Project: Building a Multithreaded Web Server src/main.rs use std::fs; …

84基于matlab的数字图像处理

基于matlab的数字图像处理&#xff0c;数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运行。 84matlab数字图像处理图像增强 (xiaohongshu.com)https://www.xiaohongshu.com/explore/656219d80000000032034dea

python小数据分析小结及算法实践集锦

在缺乏大量历史数据的新兴技术和产业中&#xff0c;商业分析可能会面临一些挑战。然而&#xff0c;有一些技术和方法可以帮助分析者在数据不充分的情况下进行科学化商业分析&#xff0c;并为决策提供支持。 1. 当面对缺乏大量历史数据的新兴技术和产业时所采常用的技术和方法 …

二进制数据转换成十六进制表示 binascii.hexlify()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 二进制数据转换成十六进制表示 binascii.hexlify() 选择题 binascii.hexlify()参数的数据类型可以是&#xff1f; import binascii number 11 byte_data number.to_bytes() hex_data bin…

Android : Java中创建线程的几种方式_简单应用

主方法 MainTest.java package com.example.mythread;import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;public class MainTest {public static void main(String[] data){ // 以下的方…

C#面试问题整理

sqlserver中视图和表的区别 在 SQL Server 中&#xff0c;视图&#xff08;View&#xff09;和表&#xff08;Table&#xff09;是不同的对象&#xff0c;它们有以下几点区别&#xff1a; 数据存储方式&#xff1a;表是一种实际存储数据的数据库对象&#xff0c;它包含列和行&…

Day45:300.最长递增子序列、674. 最长连续递增序列、718. 最长重复子数组

文章目录 300.最长递增子序列思路代码实现 674. 最长连续递增序列思路代码实现 718. 最长重复子数组思路代码实现 300.最长递增子序列 题目链接 思路 单个字符都是一个长为1的子序列&#xff0c;直接初始化dp为1。先固定一个元素位置i&#xff0c;判断0-i范围内到i的最长子序…

数字图像处理-Matlab实验

实验一 图像增强 实验内容: 对于给定的低对比度测试图像,利用灰度图像直方图均衡化算法进行图像视觉效果增强。 对于给定的低照度彩色测试图像,结合颜色空间转换和灰度图像直方图均衡化算法进行图像视觉效果增强。 实验数据: Test1_1.jpg: Test1_2.jpg: 实验步骤: %% …

谈谈Redis持久化

目录 前言 RDB AOF 总结 前言 我们都知道Redis 是基于内存的数据库&#xff0c;一旦服务器的进程退出&#xff0c;数据库数据就会随之丢失&#xff0c;这不是我们想看到的&#xff0c;为了避免这个问题&#xff0c;Redis 为我们提供了俩种持久化方案&#xff0c;将数据保存…

Linux加强篇006-存储结构与管理硬盘

目录 前言 1. 从“/”开始 2. 物理设备命名规则 3. 文件系统与数据资料 4. 挂载硬件设备 5. 添加硬盘设备 6. 添加交换分区 7. 磁盘容量配额 8. VDO虚拟数据优化 9. 软硬方式链接 前言 悟已往之不谏&#xff0c;知来者之可追。实迷途其未远&#xff0c;觉今是而昨非…

C#,《小白学程序》第二十四课:大数的阶乘(BigInteger Factorial)算法与源程序

1 文本格式 /// <summary> /// 《小白学程序》第二十四课&#xff1a;大数&#xff08;BigInteger&#xff09;的阶乘 /// 用于大数的阶乘算法&#xff08;原始算法&#xff09; /// </summary> /// <param name"a"></param> /// <retur…

【LeetCode】121. 买卖股票的最佳时机

121. 买卖股票的最佳时机 难度&#xff1a;简单 题目 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日子 卖出该股票。设计一个算法来计算你所能获…

做直播服务器要什么样的配置呢?

现在直播行业越来越火爆&#xff0c;大大小小的平台或者企业都选择通过直播卖货的方式出售产品&#xff0c;直播的内容还有观看直播的人数等等都影响了服务器的配置需求&#xff0c;今天小编就给大家讲一讲吧&#xff01; 1、内存&#xff1a;直播服务器需要足够的内存才能支持…