RocketMq架构和源码解析

NameServer作为注册中心,提供路由注册、路由踢出、路由发现功能,舍弃强一致,保证高可用,集群中各个节点不会实时通讯,其中一个节点下线之后,会提供另外一个节点保证路由功能。

Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:

Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue

 NameServer架构

 启动入口:

org.apache.rocketmq.namesrv.NamesrvController#initialize

 public boolean initialize() {loadConfig();initiateNetworkComponents();initiateThreadExecutors();registerProcessor();startScheduleService();initiateSslContext();initiateRpcHooks();return true;}

路由注册

Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。

Broker服务发送心跳包,对外提供Topic配置

org.apache.rocketmq.broker.BrokerController#start

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

    public void start() throws Exception {this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {isIsolated = true;}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}startBasicService();if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);this.registerBrokerAll(true, false, true);}scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {if (System.currentTimeMillis() < shouldStartTime) {BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info("Skip register for broker is isolated");return;}BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));if (this.brokerConfig.isEnableSlaveActingMaster()) {scheduleSendHeartbeat();scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run2() {try {BrokerController.this.syncBrokerMemberGroup();} catch (Throwable e) {BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);}}}, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));}if (this.brokerConfig.isEnableControllerMode()) {scheduleSendHeartbeat();}if (brokerConfig.isSkipPreOnline()) {startServiceWithoutCondition();}}public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean enableActingMaster,final boolean compressed,final Long heartbeatTimeoutMillis,final BrokerIdentity brokerIdentity) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setEnableActingMaster(enableActingMaster);requestHeader.setCompressed(false);if (heartbeatTimeoutMillis != null) {requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);}RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {@Overridepublic void run2() {try {RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {registerBrokerResultList.add(result);}LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);} catch (Exception e) {LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);}} catch (InterruptedException ignore) {}}return registerBrokerResultList;}

NameServer注册broker信息,集群名称、broker名称、ha注册中心地址、超时时间

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

    public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);boolean registerFirst = false;BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}boolean isOldVersionBroker = enableActingMaster == null;brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);brokerData.setZoneName(zoneName);Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();boolean isMinBrokerIdChanged = false;long prevMinBrokerId = 0;if (!brokerAddrsMap.isEmpty()) {prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());}if (brokerId < prevMinBrokerId) {isMinBrokerIdChanged = true;}//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTablebrokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());//If Local brokerId stateVersion bigger than the registering one,String oldBrokerAddr = brokerAddrsMap.get(brokerId);if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));if (null != oldBrokerInfo) {long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();if (oldStateVersion > newStateVersion) {log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);//Remove the rejected brokerAddr from brokerLiveTable.brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));return result;}}}if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);return null;}String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));boolean isMaster = MixAll.MASTER_ID == brokerId;boolean isPrimeSlave = !isOldVersionBroker && !isMaster&& brokerId == Collections.min(brokerAddrsMap.keySet());if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,topicConfigWrapper.getDataVersion(), brokerName,entry.getValue().getTopicName())) {final TopicConfig topicConfig = entry.getValue();if (isPrimeSlave) {// Wipe write perm for prime slavetopicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));}this.createAndUpdateQueueData(brokerName, topicConfig);}}}if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();//the topicQueueMappingInfoMap should never be null, but can be emptyfor (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());}//Note asset brokerName equal entry.getValue().getBname()//here use the mappingDetail.bnametopicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());}}}BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,new BrokerLiveInfo(System.currentTimeMillis(),timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);}if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddrInfo);} else {this.filterServerTable.put(brokerAddrInfo, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);if (masterLiveInfo != null) {result.setHaServerAddr(masterLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {notifyMinBrokerIdChanged(brokerAddrsMap, null,this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());}} catch (Exception e) {log.error("registerBroker Exception", e);} finally {this.lock.writeLock().unlock();}return result;}

路由剔除:NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。

路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/170763.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vatee万腾的科技征途:Vatee独特探索的数字化力量

在数字化时代的浪潮中&#xff0c;Vatee万腾以其独特的科技征途成为引领者。公司在数字化领域的探索之路不仅是技术的创新&#xff0c;更是一种对未知的勇敢涉足&#xff0c;是对新时代的深刻洞察和积极实践。 Vatee万腾通过独特的探索&#xff0c;展示了在数字化征途上的创新力…

Matplotlib子图的创建_Python数据分析与可视化

Matplotlib子图的创建 plt.axes创建子图fig.add_axes()创建子图 plt.axes创建子图 前面已经介绍过plt.axes函数&#xff0c;这个函数默认配置是创建一个标准的坐标轴&#xff0c;填满整张图。 它还有一个可选的参数&#xff0c;由图形坐标系统的四个值构成。这四个值表示为坐…

Spine深入学习 —— 数据

atlas数据的处理 作用 图集&#xff0c;描述了spine使用的图片信息。 结构 page 页块 页块包含了页图像名称, 以及加载和渲染图像的相关信息。 page1.pngsize: 640, 480format: RGBA8888filter: Linear, Linearrepeat: nonepma: truename: 首行为该页中的图像名称. 图片位…

Python武器库开发-前端篇之CSS盒模型(三十一)

前端篇之CSS盒模型(三十一) CSS盒模型是指网页中的每个元素可以看做是一个矩形盒子&#xff0c;该盒子有四个主要部分组成&#xff1a;content、padding、border和margin。其中&#xff1a; content&#xff1a;指盒子中的内容区域&#xff0c;可以包含文本、图像、视频、其他…

RedLock底层源码分析

RedLock底层源码分析 一、Redlock红锁算法 https://redis.io/docs/manual/patterns/distributed-locks/官网说明 1、为什么要学习这个&#xff1f;怎么产生的&#xff1f; ​ 一个很直接的问题&#xff0c;当我使用redis锁的那台机器挂了&#xff0c;出现了单点故障了&#…

游戏开发引擎Cocos Creator和Unity如何对接广告-AdSet聚合广告平台

在游戏开发方面&#xff0c;游戏引擎的选择对开发过程和最终的产品质量有着重大的影响&#xff0c;Unity和Cocos是目前全球两大商用、通用交互内容开发工具&#xff0c;这两款引擎受到广泛关注&#xff0c;本文将从多个维度对两者进行比较&#xff0c;为开发者提供正确的选择建…

84基于matlab的数字图像处理

基于matlab的数字图像处理&#xff0c;数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运行。 84matlab数字图像处理图像增强 (xiaohongshu.com)https://www.xiaohongshu.com/explore/656219d80000000032034dea

二进制数据转换成十六进制表示 binascii.hexlify()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 二进制数据转换成十六进制表示 binascii.hexlify() 选择题 binascii.hexlify()参数的数据类型可以是&#xff1f; import binascii number 11 byte_data number.to_bytes() hex_data bin…

Day45:300.最长递增子序列、674. 最长连续递增序列、718. 最长重复子数组

文章目录 300.最长递增子序列思路代码实现 674. 最长连续递增序列思路代码实现 718. 最长重复子数组思路代码实现 300.最长递增子序列 题目链接 思路 单个字符都是一个长为1的子序列&#xff0c;直接初始化dp为1。先固定一个元素位置i&#xff0c;判断0-i范围内到i的最长子序…

数字图像处理-Matlab实验

实验一 图像增强 实验内容: 对于给定的低对比度测试图像,利用灰度图像直方图均衡化算法进行图像视觉效果增强。 对于给定的低照度彩色测试图像,结合颜色空间转换和灰度图像直方图均衡化算法进行图像视觉效果增强。 实验数据: Test1_1.jpg: Test1_2.jpg: 实验步骤: %% …

Linux加强篇006-存储结构与管理硬盘

目录 前言 1. 从“/”开始 2. 物理设备命名规则 3. 文件系统与数据资料 4. 挂载硬件设备 5. 添加硬盘设备 6. 添加交换分区 7. 磁盘容量配额 8. VDO虚拟数据优化 9. 软硬方式链接 前言 悟已往之不谏&#xff0c;知来者之可追。实迷途其未远&#xff0c;觉今是而昨非…

C#,《小白学程序》第二十四课:大数的阶乘(BigInteger Factorial)算法与源程序

1 文本格式 /// <summary> /// 《小白学程序》第二十四课&#xff1a;大数&#xff08;BigInteger&#xff09;的阶乘 /// 用于大数的阶乘算法&#xff08;原始算法&#xff09; /// </summary> /// <param name"a"></param> /// <retur…

黑马点评-Feed流的实现方案,基于推拉结合模式实现笔记推送

Feed流实现方案 我们关注了博主之后,当用户发布了动态后我们应该把这些数据推送给粉丝,关注推送也叫作Feed(投喂)流,通过无限下拉刷新获取新的信息 传统的模式内容检索: 粉丝需要主动通过搜索引擎或者是其他方式去查找想看的内容新型Feed流的效果: 系统分析用户到底想看什么,…

【UGUI】中Content Size Fitter)组件-使 UI 元素适应其内容的大小

官方文档&#xff1a;使 UI 元素适应其内容的大小 - Unity 手册 必备组件&#xff1a;Content Size Fitter 通常&#xff0c;在使用矩形变换定位 UI 元素时&#xff0c;应手动指定其位置和大小&#xff08;可选择性地包括使用父矩形变换进行拉伸的行为&#xff09;。 但是&a…

突破技术障碍:软件工程师如何应对项目中的难题?

在软件开发项目中&#xff0c;工程师常常会遇到各种技术难题。这些难题可能涉及到复杂的算法、不兼容的系统、难以预见的软件行为&#xff0c;或者其他许多方面。 以下是一些策略和方法&#xff0c;可以帮助软件工程师有效地应对这些挑战&#xff1a; 1、理解问题&#xff1a;…

count=0语句的位置

简洁一点的代码&#xff1a; 像count0这种语句要注意放好位置&#xff0c;尤其是在循环里。

SAP Smartform小结

SAP系统做打印单据用的, 感觉很不好用, 特别是要嵌入韩文时必须使用嵌入的word编辑器,运行速度简直不可忍受. 见过一些Adobe interactive form的示例, 看着相当不错, 不过据说需要花money额外买licence, 哪有smartform这种免费东西来得实惠. 一般打印需求,会要求有标题抬头,打…

TikTok 将开源“云中和”边缘加速器

“从某种意义上说&#xff0c;我们正在努力破解云的骨干网&#xff0c;以造福于我们&#xff0c;”TikTok产品管理基础设施经理Vikram Siwach指出&#xff0c;他解释了该公司即将开源的“全球服务加速器”的好处&#xff0c;这是一个可编程的边缘平台&#xff0c;可将应用程序需…

Linux常用命令——bg命令

在线Linux命令查询工具 bg 用于将作业放到后台运行 补充说明 bg命令用于将作业放到后台运行&#xff0c;使前台可以执行其他任务。该命令的运行效果与在指令后面添加符号&amp;的效果是相同的&#xff0c;都是将其放到系统后台执行。 在Linux系统中执行某些操作时候&…

【通讯协议】gRPC和Webhook

RPC&#xff08;Remote procedure Call&#xff09;之所以被称为“远程”&#xff0c;是因为在微服务架构下&#xff0c;当服务部署到不同的服务器上时&#xff0c;它可以实现远程服务之间的通信。从用户的角度来看&#xff0c;它的作用就像本地函数调用。 下图说明了gRPC的整…