前言
RocketMQ架构上主要分为四部分, Broker、Producer、Consumer、NameServer,其他三个都会与NameServer进行通信。
Producer:
**消息发布的角色,可集群部署。**通过NameServer集群获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等。(Producer只会将消息发送到Master节点,因此只需与Master节点建立连接)。
Consumer:
**消息消费的角色,可集群部署。**通过NameServer集群获得Topic的路由信息,连接到对应的Broker上拉取和消费消息。(Master和Slave都可以拉取消息,因此Consumer会与Master和Slave都建立连接)。
Broker:
主要负责消息的存储、投递和查询以及服务高可用保证。
介绍
NameServer 是专为 RocketMQ 设计的轻量级名称服务,也是一个简单的Topic路由注册中心。具有简单、可集群横向扩展、无状态,节点之间互不通信等特点。
NameServer 通常以集群的方式部署,各实例间相互不进行信息通讯,只是互为备份,达到高可用的效果。
NameServer具有两大功能:Broker管理和topic路由信息管理。
Broker管理:
**NameServer接受Broker的注册请求,处理请求数据作为路由信息的基础数据。**对broker进行心跳检测机制,检测是否还存活(120s);
Topic路由信息管理:
每个NameServer都保存整个Broker集群的路由信息,用于Producer和Conumser查询的路由信息,从而进行消息的投递和消费。
NameServer 仅仅处理其他模块的请求,而不会主动向其他模块发起请求。它其实本质上就是一个 NettyServer,它主要有 3 个模块:Topic 路由管理模块(RouteInfoManager
)、通信模块(DefaultRequestProcessor
、ClusterTestRequestProcessor
)、KV 数据存储模块(KVConfigManager
)。
RouteInfoManager
**RouteInfoManager 中存储 5 个 HashMap
,这就是 NameServer 中主要存储的数据。它们仅存在于内存中,并不会持久化。**其中数据内容如下:
- **topicQueueTable:保存 Topic 的队列信息,也是真正的路由信息。**队列信息中包含了其所在的 Broker 名称和读写队列数量。
- brokerAddrTable:保存 Broker 信息,包含其名称、集群名称、主备 Broker 地址。
- clusterAddrTable:保存 Cluster信息,包含每个集群中所有的 Broker 名称列表。
- brokerLiveTable:Broker 状态信息,包含当前所有存活的 Broker,和它们最后一次上报心跳的时间。
- filterServerTable:Broker 上的 FilterServer 列表,用于类模式消息过滤,该机制在 4.4 版本后被废弃。
RequestProcessor 继承了 AsyncNettyRequestProcessor。作为 NameServer 的请求处理器,根据不同种类的请求做不同类型的处理。 其中
KV_CONFIG
类型的请求用于 KVConfig 模块,当前不会用到。其他请求类型由 Broker 和 Producer、Consumer 发起。 KVConfigManager 内部保存了一个二级
HashMap
:configTable
,并且会将该对象进行持久化。
心跳机制
Broker
- 每隔 30s 向 NameServer 集群的每台机器都发送心跳包,包含自身 Topic 队列的路由信息。
- 当有 Topic 改动(创建/更新),Broker 会立即发送 Topic 增量信息到 NameServer,同时触发 NameServer 的数据版本号发生变更(+1)。
NameServer
- 将路由信息保存在内存中。它只被其他模块调用(被 Broker 上传,被客户端拉取),不会主动调用其他模块。
- 启动一个定时任务线程,每隔 10s 扫描 brokerAddrTable 中所有的 Broker 上次发送心跳时间,如果超过 120s 没有收到心跳,则从存活 Broker 表中移除该 Broker。
Client
- 生产者第一次发送消息时,向 NameServer 拉取该 Topic 的路由信息。
- 消费者启动过程中会向 NameServer 请求 Topic 路由信息。
- 每隔 30s 向 NameServer 发送请求,获取它们要生产/消费的 Topic 的路由信息。
启动流程
- 由启动脚本调用
NamesrvStartup#main
函数触发启动流程 NamesrvStartup#createNamesrvController
函数中先解析命令行参数,然后初始化 NameServer 和 Netty remote server 配置,最后创建NamesrvController
的实例。NamesrvStartup#start
初始化NamesrvController
;调用NamesrvController#start()
方法,启动 Netty remoting server;最后注册关闭钩子函数,在 JVM 线程关闭之前,关闭 Netty remoting server 和处理线程池,关闭定时任务线程。
NamesrvController
实例是 NameServer 的核心控制器,它的初始化方法 initialize()
先加载 KVConfig manager,然后初始化 Netty remoting server。最后添加 2 个定时任务:一个每 10s 打印一次 KV 配置,一个每 10s 扫描 Broker 列表,移除掉线的 Broker。
为什么选择重新开发一个NameServer?
RocketMQ的架构设计决定了只需一个轻量级的元数据服务器,只需保持最终一致,所以AP模式直接pass。
NameServer互相独立,彼此没有通信关系,由于Broker向每个NameServer注册自己的路由信息,所以每个NameServer都保存一份完整的路由信息。
单台NameServer挂掉,Broker仍然可以向其它NameServer同步路由信息,不影响其他NameServer,所以Producer,Consumer仍然可以动态感知Broker的路由的信息。
源码分析
NameServer的路由数据来源是broker注册提供,然后内部加工处理,而路由的数据的使用者是producer和consumer。
1、路由数据结构
RouteInfoManager是NameServer核心逻辑类,其代码作用就是维护路由信息管理,提供路由注册/查询等核心功能。
由于路由信息都是保存在NameServer应用内存里,其本质就是维护HashMap,而为了防止并发操作,添加了ReentrantReadWriteLock读写锁。
RouteInfoManager源码结构如下:
QueueData 属性解析:
/*** 队列信息*/
public class QueueData implements Comparable<QueueData> {// 队列所属的Broker名称private String brokerName;// 读队列数量 默认:16private int readQueueNums;// 写队列数量 默认:16private int writeQueueNums;//todo Topic的读写权限(2是写 4是读 6是读写)private int perm;/** 同步复制还是异步复制--对应TopicConfig.topicSysFlag* {@link org.apache.rocketmq.common.sysflag.TopicSysFlag}*/private int topicSynFlag;...省略...}
map: topicQueueTable 数据格式demo(json):
{"TopicTest":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]
}
BrokerData 属性解析:
/*** broker的数据:Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0 表示Master,非0表示Slave。*/
public class BrokerData implements Comparable<BrokerData> {// broker所属集群private String cluster;// brokerNameprivate String brokerName;// 同一个brokerName下可以有一个Master和多个Slave,所以brokerAddrs是一个集合// brokerld=O表示 Master,大于 O表示从 Slaveprivate HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;// 用于查找broker地址private final Random random = new Random();...省略...}
map: brokerAddrTable 数据格式demo(json):
{"broker-a":{"brokerAddrs":{"0":"172.16.62.75:10911"},"brokerName":"broker-a","cluster":"DefaultCluster"}
}
BrokerLiveInfo 属性解析:
/*** 存放存活的Broker信息,当前存活的 Broker,该信息不是实时的,NameServer 每10S扫描一次所有的 broker,根据心跳包的时间得知 broker的状态,* 该机制也是导致当一个 Broker 进程假死后,消息生产者无法立即感知,可能继续向其发送消息,导致失败(非高可用)*/
class BrokerLiveInfo {//最后一次更新时间private long lastUpdateTimestamp;//版本号信息private DataVersion dataVersion;//Netty的Channelprivate Channel channel;//HA Broker的地址 是Slave从Master拉取数据时链接的地址,由brokerIp2+HA端口构成private String haServerAddr;...省略...}
map: brokerLiveTable 数据格式demo(json):
{"172.16.62.75:10911":{"channel":{"active":true,"inputShutdown":false,"open":true,"outputShutdown":false,"registered":true,"writable":true},"dataVersion":{"counter":2,"timestamp":1630907813571},"haServerAddr":"172.16.62.75:10912","lastUpdateTimestamp":1630907814074}
}
brokerAddrTable -Map 数据格式demo(json)
{"DefaultCluster":["broker-a"]}
从RouteInfoManager维护的HashMap数据结构和QueueData、BrokerData、BrokerLiveInfo类属性得知,NameServer维护的信息既简单但极其重要。
2、路由注册
路由注册流程:
roker主动注册的几种情况:
- 启动时向集群中所有的NameServer注册
- 定时30s向集群中所有NameServer发送心跳包注册
- 当broker中topic信息发送变更(新增/修改/删除)发送心跳包注册。
NameServer中注册的核心处理逻辑是RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(final String clusterName, final String brokerAddr,final String brokerName, final long brokerId,final String haServerAddr,//TopicConfigSerializeWrapper比较复杂的数据结构,主要包含了broker上所有的topic信息final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList, final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {this.lock.writeLock().lockInterruptibly(); // 锁//1: 此处维护 clusterAddrTable 集群元数据Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (null == brokerNames) {brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}brokerNames.add(brokerName);//2:此处维护 brokerAddrTable broker元数据boolean registerFirst = false;//是否第一次注册(如果Topic配置信息发生变更或者该broker为第一次注册)BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}//3: 此处维护 topicQueueTable 主题队列数据,数据更新操作方法在:createAndUpdateQueueDataString oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst = registerFirst || (null == oldAddr);if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) { //小知识点:只处理主节点请求,因为备节点的topic信息是同步主节点的// 如果Topic配置信息发生变更或者该broker为第一次注册if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//4: 此处维护:brokerLiveTable数据,关键点:BrokerLiveInfo构造器第一个参数:System.currentTimeMillis(),用于存活判断BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//5-维护:filterServerTable 数据if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}//返回值(如果当前broker为slave节点)则将haServerAddr、masterAddr等信息设置到result返回值中if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;
}
从源码中分析,Broker注册的路由信息对于NameServer来说,其实就是维护clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable、filterServerTable。
3、路由删除
**路由删除有两种方式,一种是broker主动上报删除,一种是NameServer主动删除。**从根本上来说,是删除clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable、filterServerTable相关信息.
1)Broker主动上报删除
Broker在正常被关闭的情况下,会执行unregisterBroker指令。向NameServer发送取消注册请求,之后执行删除信息操作(加写锁)。
1-1)直接删除 brokerLiveTable 信息,无需判断时间
1-2)删除 filterServerTable 信息
1-3)维护删除 brokerAddrTable 信息
1-4)维护删除 clusterAddrTable 信息
1-5)根据 BrokerName,从 topicQueueTable
中移除该 Broker 的队列。也就是维护删除 topicQueueTable 信息
2)NameServer主动删除:
NameServer定时(10s)扫描brokerLiveTable,检测上次心跳包与当前系统时间的时间差,如果时间戳大于120s,则需要移除该Broker信息。
2-1)判断是否有时间戳大于120s的,若是有,则执行移除操作
2-2)移除前,加读锁,查询需要删除的broker信息。
2-3)根据broker信息brokerAddrFound(加写锁),删除相关信息brokerLiveTable、filterServerTable、brokerAddrTable、clusterAddrTable、topicQueueTable 信息。
4、路由发现
当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由生产端和消费端定时拉取主题最新的路由,所以路由信息非实时的。NameServer 收到客户端获取路由信息请求后,调用 DefaultRequestProcessor#getRouteInfoByTopic()
方法,返回 Topic 路由信息。
源码中,实际上是调用 RouteInfoManager#pickupTopicRouteData()
方法从topicQueueTable、brokerAddrTable、filterServerTable这些Map中查询数据,组装给TopicRouteData,然后返回给客户端使用。
如果该主题为顺序消息,从 KVConfig 中获取顺序消息相关的配置,填充进 TopicRouteData
对象。之后将 TopicRouteData
对象编码,并返回给客户端。
RouteInfoManager#pickupTopicRouteData
public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<String>();List<BrokerData> brokerDataList = new LinkedList<BrokerData>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();topicRouteData.setFilterServerTable(filterServerMap);try {try {this.lock.readLock().lockInterruptibly();List<QueueData> queueDataList = this.topicQueueTable.get(topic);if (queueDataList != null) {topicRouteData.setQueueDatas(queueDataList);foundQueueData = true;Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {QueueData qd = it.next();brokerNameSet.add(qd.getBrokerName());}// 处理构建:BrokerData数据for (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);foundBrokerData = true;for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr, filterServerList);}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;
}public class TopicRouteData extends RemotingSerializable {//topic排序的配置,和"ORDER_TOPIC_CONFIG"这个NameSpace有关,参照DefaultRequestProcessor#getRouteInfoByTopic,后续可讲解此小知识点private String orderTopicConf;// topic 队列元数据private List<QueueData> queueDatas;// topic分布的 broker元数据private List<BrokerData> brokerDatas;// broker上过滤服务器地址列表private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;...省略...
}
为什么不用ConcurrentHashMap存储呢?
首先ConcurrentHashMap如果是1.8之前,初始化容量之后,是不能扩容的。其容量默认是16,且锁的粒度是segment,那么最大并发度也就是容量大小。
如果是1.8之后的版本是支持扩容,且在加锁方面有优化,调整为synchronized+CAS,但是同时会要求jdk版本。
最主要是RocketMQ 中的路由信息比较多。
疑问
1、假设Broker异常宕机,导致生产者发送消息失败怎么解决?
假设Broker异常宕机,NameServer至少等120s才将该Broker从路由信息中剔除,在Broker故障期间,消息生产者Producer根据topic获取到的路由信息包含已经宕机的Broker,会导致消息在短时间内发送失败。
那么生产者是否也有相关的尝试策略?
之后再做补充…
总结
NameServer作为RocketMQ的“大脑”,保存着集群MQ的路由信息,具体就是记录维护Topic、Broker的信息,及监控Broker的运行状态,为client提供路由能力。
而从源代码的角度总结:NameServer就是维护了多个HashMap,Broker的注册,Client的查询都是围绕其Map操作,当然为了解决并发问题添加了ReentrantReadWriteLock(读写锁)。