rocketmq基本架构

在这里插入图片描述

简介

Name server

  • 负责broker注册、心跳,路由等功能,类似Kafka的ZK
  • name server节点之间不互相通信,broker需要和所有name server进行通信。扩容name server需要重启broker,不然broker不会和name server建立连接
  • producer和consumer也都是和name server建立长连接,获取路由信息,拿到对应的broker信息,与broker建立长连接,然后发送/消费消息

路由发现

Pull的模式。当topic路由信息发生变化时,name server不回主动推送给客户端,而是客户端定时拉取。默认客户端每30秒会拉取一次最新的路由信息

扩展:

1)push模型:实时性好,但是需要维护一个长链接,消耗服务端资源。client数量不多,实时性要求高,server数据变化比较频繁的场景适合此种模式

2)pull模型:实时性差

3)long
polling模型:长轮询模式。客户端定时发送拉取请求,服务端会hold住连接一段时间,在此期间的数据变动通过此连接推送。超过hold时间后才断开连接。兼顾以上两种方式

Broker

  • broker每30s给name server发送一次心跳,name server每120s检查一次所有的broker心跳时间,超过阈值踢出broker
  • broker节点集群是主从集群,master负责处理读写请求,slave负责对master中的数据进行备份。master和slave有相同的broker name,但broker id不同,broker id为0的是master,非0的是slave。每个broker与name server集群中的所有节点建立长连接,定时注册topic信息到所有name server

源码分析

NameServer

NameServer的启动过程分析

NameServer服务器相关的源码在namesrv模块下,目录结构如下:
在这里插入图片描述

NamesrvStartup类就是Name Server服务器启动的启动类,NamesrvStartup类中有一个main启动类,main方法调用main0,main0主要流程代码
在这里插入图片描述

main0 方法的主要作用就是创建Name Server服务器的控制器,并且启动Name Server服务器的控制器。NamesrvController类的作用就是为Name Server服务的启动提供具体的逻辑实现,主要包括配置信息的加载、远程通信服务器的创建和加载、默认处理器的注册以及心跳检测机器监控Broker的健康状态等。Name Server服务器的控制器的创建方法为createNamesrvController方法,createNamesrvController方法的主要流程代码如下:

//代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController
public static NamesrvController createNamesrvController(String[] args){//设置rocketMQ的版本信息,REMOTING_VERSION_KEY的值为:rocketmq.remoting.version,CURRENT_VERSION的值为:V4_7_0System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//构建命令行,添加帮助命令和Name Server的提示命令,将createNamesrvController方法的args参数进行解析//代码省略//nameServer 服务器配置类和netty 服务器配置类final NamesrvConfig namesrvConfig = new NamesrvConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();//设置netty服务器的监听端口nettyServerConfig.setListenPort(9876);// 判断上述构建的命令行是否有configFile(缩写为C)配置文件,如果有的话,则读取configFile配置文件的配置信息,// 并将转为NamesrvConfig和NettyServerConfig的配置信息// 代码省略// 如果构建的命令行存在字符'p',就打印所有的配置信息病区退出方法// 代码省略//首先将构建的命令行转换为Properties,然后将通过反射的方式将Properties的属性转换为namesrvConfig的配置项和配置值。MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);//打印nameServer 服务器配置类和 netty 服务器配置类的配置信息MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);//将namesrvConfig和nettyServerConfig作为参数创建nameServer 服务器的控制器final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);//将所有的配置保存在内存中(Properties)controller.getConfiguration().registerConfig(properties);return controller;
}

createNamesrvController方法主要做了几件事,读取和解析配置信息,包括Name Server服务的配置信息、Netty 服务器的配置信息、打印读取或者解析的配置信息、保存配置信息到本地文件中,以及根据namesrvConfig配置和nettyServerConfig配置作为参数创建nameServer 服务器的控制器。创建好Name server控制器以后,就可以启动它了。启动Name Server的方法的主流程如下:

//代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#start
public static NamesrvController start(final NamesrvController controller){//初始化nameserver 服务器,如果初始化失败则退出boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//添加关闭的钩子,进行内存清理、对象销毁等惭怍Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));//启动controller.start();
}

start方法没什么逻辑,主要作用就是进行初始化工作,然后进行启动Name Server控制器,接下来看看进行了哪些初始化工作以及如何启动Name Server的,初始化initialize方法的主要流程如下:

//代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#initialize
public boolean initialize() {// key-value 配置加载this.kvConfigManager.load();// //创建netty远程服务器,用来进行网络传输以及通信this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//远程服务器线程池this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));//注册处理器this.registerProcessor();//每10秒扫描不活跃的brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);//每10秒打印配置信息(key-value)this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);//省略部分代码return true;}

initialize方法的主要逻辑如下:

加载配置文件。读取文件名为"user.home/namesrv/kvConfig.json"(其中user.home为用户的目录),然后将读取的文件内容转为KVConfigSerializeWrapper类,最后将所有的key-value保存在如下map中:

//用来保存不同命名空间的key-value private final HashMap<String/* Namespace /,
HashMap<String/
Key /, String/ Value */>> configTable = new
HashMap<String, HashMap<String, String>>();

  • 创建Netty服务器。Name Server 用netty与生产者、消费者以及Boker进行通信。
  • 注册处理器。这里主要注册的是默认的处理器DefaultRequestProcessor,注册的逻辑主要是初始化DefaultRequestProcessor并保存着,待需要使用的时候直接使用。处理器的作用就是处理生产者、消费者以及Broker服务器的不同请求,比如获取生产者和消费者获取所有的路由信息,Broker服务器注册路由信息等。处理器DefaultRequestProcessor处理不同的请求将会在下面进行讲述。
  • 执行定时任务。主要有两个定时任务,一个是每十秒扫描不活跃的Broker。并且将过期的Broker清理掉。另外一个是每十秒打印key-valu的配置信息。

上面就是initialize方法的主要逻辑,特别需要注意每10秒扫描不活跃的broker的定时任务:

//NamesrvController.this.routeInfoManager.scanNotActiveBroker();
//代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBrokerpublic void scanNotActiveBroker() {//所有存活的BrokerIterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();//遍历Brokerwhile (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();//最后更新时间加上broker过期时间(120秒)小于当前时间,则关闭与broker的远程连接。并且将缓存在map中的broker信息删除if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();//将过期的Channel连接清理掉。以及删除缓存的Brokerthis.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}

scanNotActiveBroker方法的逻辑主要是遍历缓存在brokerLiveTable的Broker,将Broker最后更新时间加上120秒的结果是否小于当前时间,如果小于当前时间,说明Broker已经过期,可能是已经下线了,所以可以清除Broker信息,并且关闭Name Server 服务器与Broker服务器连接,这样被清除的Broker就不会与Name Server服务器进行远程通信了。brokerLiveTable的结果如下:

//保存broker地址与broker存活信息的对应关系
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

brokerLiveTable缓存着以brokerAddr为key(Broker 地址),以BrokerLiveInfo为value的结果,BrokerLiveInfo是Broker存活对象,主要有如下几个属性:

class BrokerLiveInfo {//最后更新时间private long lastUpdateTimestamp;//版本信息private DataVersion dataVersion;//连接private Channel channel;//高可用服务器地址private String haServerAddr;//省略代码
}

从BrokerLiveInfo中删除了过期的Broker后,还需要做清理Name Server服务器与Broker服务器的连接,onChannelDestroy方法主要是清理缓存在如下map的信息:

////代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager//保存broker地址与broker存活信息的对应关系
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//保存broker地址与过滤服务器的对应关系,Filter Server 与消息过滤有关系
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//保存broker 名字与 broker元数据的关系
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//保存集群名字与集群下所有broker名字对应的关系
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//保存topic与topic下所有队列元数据的对应关系private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

在扫描过期的broker时,首先找到不活跃的broker,然后onChannelDestroy方法清理与该不活跃broker有关的缓存,清理的主要流程如下:

  • 清理不活跃的broker存活信息。首先遍历brokerLiveTable找到不活跃的broker,然后删除brokerLiveTable中的与该不活跃的broker有关的缓存信息。
  • 清理与消息过滤有关的缓存。找到不活跃的broker存活信息,删除filterServerTable中的与该broker地址有关的消息过滤的服务信息。
  • 清理与不活跃broker的元素居。brokerAddrTable保存着broker名字与broker元素居对应的信息,BrokerData类保存着cluster、brokerName、brokerId与broker name。遍历brokerAddrTable找到与该不活跃broker的名字相等的broker元素进行删除。
  • 清理集群下对应的不活跃broker名字。clusterAddrTable保存集群名字与集群下所有broker名字对应的关系,遍历clusterAddrTable的所有key,从clusterAddrTable中找到与不活跃broker名字相等的元素,然后删除。
  • 清理与该不活跃broker的topic对应队列数据。topicQueueTable保存topic与topic下所有队列元数据的对应关系,QueueData保存着brokerName、readQueueNums(可读队列数量)、writeQueueNums(可写队列数量)等。遍历topicQueueTable的key,找到与不活跃broker名字相同的QueueData进行删除。

初始化nameserver 服务器以后,接下来就可以启动nameserver 服务器:

//代码位置:org.apache.rocketmq.namesrv.NamesrvController#start
public void start() throws Exception {//启动远程服务器(netty 服务器)this.remotingServer.start();//启动文件监听线程if (this.fileWatchService != null) {this.fileWatchService.start();}
}

start方法做了两件事,第一件就是启动netty服务器,netty服务器主要负责与Broker、生产者与消费者之间的通信,处理Broker、生产者与消费者的不同请求。根据nettyConfig配置,设置启动的配置和各种处理器,然后采用netty服务器启动的模板启动服务器,具体的代码就不分析了,有兴趣的可以看看netty启动代码模板是怎么样的。第二件事就是启动文件监听线程,监听tts相关文件是否发生变化。

Name Server 服务器启动流程的源代码分析到此为止了,在这里总结下Name Server 服务器启动流程主要做了什么事:

  • 加载和读取配置。设置Name Server 服务器启动的配置NamesrvConfig和启动Netty服务器启动的配置NettyServerConfig。
  • 初始化相关的组件。netty服务类、远程服务线程池、处理器以及定时任务的初始化。
  • 启动Netty服务器。Netty服务器用来与broker、生产者、消费者进行通信、处理与它们之间的各种请求,并且对请求的响应结果进行处理。

Broker管理和路由信息的管理

Name Server 服务器的作用主要有两个:Broker管理和路由信息管理。

Broker管理

在上面分析的Name Server 服务器的启动过程中,也有一个与Broker管理相关的分析,那就是启动一个定时线程池每十秒去扫描不活跃的Broker。将不活跃的Broker清理掉。除了在Name Server 服务器启动时启动定时任务去扫描不活跃的Broker外,Name Server 服务器启动以后,通过netty服务器接收Broker、生产者、消费者的不同请求,将接收到请求会交给在Name Server服务器启动时注册的处理器DefaultRequestProcessor类的processRequest方法处理。processRequest方法根据请求的不同类型,将请求交给不同的方法进行处理。有关Broker管理的请求主要有注册Broker、注销Broker,processRequest方法处理注册Broker、注销Broker请求的代吗如下:

//代码位置:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequestpublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)  {switch (request.getCode()) {//省略无关代码//注册Brokercase RequestCode.REGISTER_BROKER:Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}//注销Brokercase RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);//省略无关代码}}
Broker注册

Broker 服务器启动时,会向Name Server 服务器发送Broker 相关的信息,如集群的名字、Broker地址、Broker名字、topic相关信息等,注册Broker主要的代码比较长,接下来会分成好几部分进行讲解。如下:

//代码位置:org.apache.rocketmq.namesrv.processor.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();this.lock.writeLock().lockInterruptibly();//根据集群的名字获取所有的broker名字Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (null == brokerNames) {brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}//名字保存在broker名字中brokerNames.add(brokerName);//省略代码
}

registerBroker方法根据集群的名字获取该集群下所有的Broker名字的Set,如果不存在就创建并添加进clusterAddrTable中,clusterAddrTable保存着集群名字与该集群下所有的Broker名字对应关系,最后将broker名字保存在set中。

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略代码boolean registerFirst = false;//获取broker 元数据BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}//获取所有的broker地址Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst = registerFirst || (null == oldAddr);//省略代码}

上述代码主要做了两件事:

  • 缓存broker元数据信息。首先根据broker名字从brokerAddrTable中获取Broker元数据brokerData,如果brokerData不存在,说明是第一次注册,创建Broker元数据并添加进brokerAddrTable中,brokerAddrTable保存着Broker名字与Broker元数据对应的信息。
  • 从Broker元数据brokerData中获取该元数据中的所有Broker地址信息brokerAddrsMap。brokerAddrsMap保存着brokerId与所有Broker名字对应信息。遍历brokerAddrsMap中的所有broker地址,查找与参数brokerAddr相同但是与参数borkerId不同的进行删除,保证一个broker名字对应着BrokerId,最后将参数brokerId与参数brokerAddr保存到brokerData元数据的brokerAddrsMap中进行缓存。
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略代码//如果topic的配置不空并且是broker masterif (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {//如果topic配置改变或者是第一次注册if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {//获取所有的topic配置ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {//遍历topic配置,创建并更新队列元素for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//省略代码}

如果参数topicConfigWrapper不等于空,并且brokerId等于0时,判断topic是否改变,如果topic改变或者是第一次注册,获取所有的topic配置,并创建和更新队列元数据。QueueData保存着队列元数据,如Broker名字、写队列数量、读队列数量,如果队列缓存中不存在该队列元数据,则添加,否则遍历缓存map找到该队列元数据进行删除,如果是新添加的则添加进队列缓存中。

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略代码//创建broker存活对象,并进行保存BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//如果过滤服务地址不为空,则缓存到filterServerTableif (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}//如果不是broker master,获取高可用服务器地址以及master地址if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}return result;}

最后代码片段,主要做了三件事,首先创建了Broker存活对象BrokerLiveInfo,添加到brokerLiveTable中缓存,在Name Server 启动时,供定时线程任务每十秒进行扫描。以确保非正常的Broker被清理掉。然后是判断参数filterServerList是否为空,如果不为空,则添加到filterServerTable缓存,filterServerTable保存着与消息过滤相关的过滤服务。最后,判断该注册的Broker不是Broker master,则设置高可用服务器地址以及master地址。到此为止,Broker注册的代码就分析完成了,总而言之,Broker注册就是Broker将相关的元数据信息,如Broker名字,Broker地址、topic信息发送给Name Server服务器,Name Server接收到以后将这些元数据缓存起来,以供后续能够快速找到这些元数据,生产者和消费者也可以通过Name Server服务器获取到Broke相关的信息,这样,生产者和消费者就可以和Broker服务器进行通信了,生产者发送消息给Broker服务器,消费者从Broker服务器消费消息。

Broker注销

Broker注销的过程刚好跟Broker注册的过程相反,Broker注册是将Broker相关信息和Topic配置信息缓存起来,以供生产者和消费者使用。而Broker注销则是将Broker注销缓存的Broker信息从缓存中删除,Broker注销unregisterBroker方法主要代码流程如下:

//代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker
public void unregisterBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {this.lock.writeLock().lockInterruptibly();//将缓存的broker存活对象删除BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);//将所有的过滤服务删除this.filterServerTable.remove(brokerAddr);boolean removeBrokerName = false;//删除broker元数据if (null != brokerData) {String addr = brokerData.getBrokerAddrs().remove(brokerId);if (brokerData.getBrokerAddrs().isEmpty()) {this.brokerAddrTable.remove(brokerName);removeBrokerName = true;}}//如果删除broker元数据成功if (removeBrokerName) {Set<String> nameSet = this.clusterAddrTable.get(clusterName);if (nameSet != null) {boolean removed = nameSet.remove(brokerName);if (nameSet.isEmpty()) {this.clusterAddrTable.remove(clusterName);}}//根据brokerName删除topic配置信息this.removeTopicByBrokerName(brokerName);}this.lock.writeLock().unlock();
}

unregisterBroker方法的参数有集群名字、broker地址、broker名字、brokerId,主要逻辑为:

  • 根据broker地址删除broker存活对象。
  • 根据broker地址删除所有消息过滤服务。
  • 删除broker元数据。
  • 如果删除元数据成功,则根据集群名字删除该集群的所有broker名字,以及根据根据- brokerName删除topic配置信息。
路由信息的管理

处理器DefaultRequestProcessor类的processRequest方法除了处理Broker注册和Broker注销的请求外,还处路由信息管理有关的请求,接收到生产者和消费者的路由信息相关的请求,会交给处理器DefaultRequestProcessor类的processRequest方法处理,processRequest方法则会根据不同的请求类型将请求交给RouteInfoManager类的不同方法处理。RouteInfoManager类用map进行缓存路由相关信息,map如下:

//topic与队列数据对应映射关系
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broker 名字与broker 元数据对应映射关系
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//保存cluster的所有broker name
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//broker 地址 与 BrokerLiveInfo存活对象的对应映射关系
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//broker 地址 的所有过滤服务
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

RouteInfoManager类利用上面几个map缓存了Broker信息,topic相关信息、集群信息、消息过滤服务信息等,如果这些缓存的信息有变化,就是网这些map新增或删除缓存。这就是Name Server服务的路由信息管理。processRequest方法是如何处理路由信息管理的,具体实现可以阅读具体的代码,无非就是将不同的请求委托给RouteInfoManager的不同方法,RouteInfoManager的不同实现了上面缓存信息的管理。

Broker

Broker 主要负责消息的存储,投递和查询以及保证服务的高可用。Broker负责接收生产者发送的消息并存储、同时为消费者消费消息提供支持。为了实现这些功能,Broker包含几个重要的子模块:

通信模块:负责处理来自客户端(生产者、消费者)的请求。
客户端管理模块:负责管理客户端(生产者、消费者)和维护消费者的Topic订阅信息。
存储模块:提供存储消息和查询消息的能力,方便Broker将消息存储到硬盘。
高可用服务(HA Service):提供数据冗余的能力,保证数据存储到多个服务器上,将Master Broker的数据同步到Slavew Broker上。
索引服务(Index service):对投递到Broker的消息建立索引,提供快速查询消息的能力。
在这里插入图片描述

broker启动过程分析

在Name Server启动以后,Broker就可以开始启动了,启动过程将所有路由信息都注册到Name server服务器上,生产者就可以发送消息到Broker,消费者也可以从Broker消费消息。接下来就来看看Broker的具体启动过程。

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#main
public static void main(String[] args) {start(createBrokerController(args));
}

BrokerStartup类是Broker的启动类,在BrokerStartup类的main方法中,首先创建用createBrokerController方法创建Broker控制器(BrokerController类),Broker控制器主要负责Broker启动过程的具体的相关逻辑实现。创建好Broker 控制器以后,就可以启动Broker 控制器了,所以下面将从两个部分分析Broker的启动过程:

  • 创建Broker控制器
  • 初始化配置信息
  • 创建并初始化Broker控制
  • 注册Broker关闭的钩子
  • 启动Broker控制器
创建Broker控制器

Broker在启动的时候,会初始化一些配置,如Broker配置、netty服务端配置、netty客户端配置、消息存储配置,为Broker启动提供配置准备。

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerControllerpublic static BrokerController createBrokerController(String[] args) {/**省略代码注释:1、设置RocketMQ的版本2、设置netty接收和发送请求的buffer大小3、构建命令行:将命令行进行解析封装**///broker配置、netty服务端配置、netty客户端配置final BrokerConfig brokerConfig = new BrokerConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));//设置netty监听接口nettyServerConfig.setListenPort(10911);//消息存储配置final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();//如果broker的角色是slave,设置命中消息在内存的最大比例if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}//省略代码}

createBrokerController方法创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类,BrokerConfig类是Broker配置类。

  • BrokerConfig:属性主要包括Broker相关的配置属性,如Broker名字、Broker Id、Broker连接的Name server地址、集群名字等。
  • NettyServerConfig:Broker netty服务端配置类,Broker netty服务端主要用来接收客户端的请求,NettyServerConfig类主要属性包括监听接口、服务工作线程数、接收和发送请求的buffer大小等。
  • NettyClientConfig:netty客户端配置类,用于生产者、消费者这些客户端与Broker进行通信相关配置,配置属性主要包括客户端工作线程数、客户端回调线程数、连接超时时间、连接不活跃时间间隔、连接最大闲置时间等。
  • MessageStoreConfig:消息存储配置类,配置属性包括存储路径、commitlog文件存储目录、CommitLog文件的大小、CommitLog刷盘的时间间隔等。
初始化配置信息

创建完这些配置类以后,接下来会为这些配置类的一些配置属性设置值,先看看如下代码:

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerControllerpublic static BrokerController createBrokerController(String[] args) {//省略代码//如果命令中包含字母c,则读取配置文件,将配置文件的内容设置到配置类中if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);//读取配置文件的中namesrv地址properties2SystemEnv(properties);//将配置文件中的配置项映射到配置类中去MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);//设置配置broker配置文件BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}//设置broker配置类MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);//省略代码}

上述主要的代码逻辑为如果命令行中存在命令参数为‘c’(c是configFile的缩写),那么就读取configFile文件的内容,将configFile配置文件的配置项映射到BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类中。接下来createBrokerController方法做一些判断必要配置的合法性,如下代码所示:

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {//省略代码//如果broker配置文件的rocketmqHome属性值为null,直接结束程序if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//如果name server服务器的地址不为nullString namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {//namesrvAddr是以";"分割的多个地址String[] addrArray = namesrvAddr.split(";");//每个地址是ip:port的形式,检测下是否形如ip:port的形式for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}//设置BrokerId,broker master 的BrokerId设置为0,broker slave 设置为大于0的值switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE://如果小于等于0,退出程序if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}//省略代码}

首先会判断下RocketmqHome的值是否为空,RocketmqHome是Borker相关配置保存的文件目录,如果为空则直接退出程序,启动Broker失败;然后判断下Name server 地址是否为空,如果不为空则解析以“;”分割的name server地址,检测下地址的合法性,如果不合法则直接退出程序;最后判断下Broker的角色,如果是master,BrokerId设置为0,如果是SLAVE,则BrokerId设置为大于0的数,否则直接退出程序,Broker启动失败。
createBrokerController方法进行必要配置参数的判断以后,将进行日志的设置、以及打印配置信息,主要代码如下:

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {//省略代码//注释:日志设置//printConfigItem 打印配置信息if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {//printImportantConfig 打印重要配置信息InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}//打印配置信息log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);//代码省略}

createBrokerController方法的以上代码逻辑打印配置信息,先判断命令行参数是否包含字母‘p’(printConfigItem的缩写),如果包含字母‘p’,则打印配置信息,否则判断下命令行是否包含字母‘m’,则打印被@ImportantField注解的配置属性,也就是重要的配置属性。最后,不管命令行中是否存在字母‘p’或者字母‘m’,都打印配置信息。

以上就是初始化配置信息的全部代码,初始化配置信息主要是创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类,并为这些配置类设置配置的值,同时根据命令行参数判断打印配置信息。

初始化Broker控制器
//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {//省略代码//创建BrokerController(broker 控制器)final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard//将所有的配置信息保存在内存controller.getConfiguration().registerConfig(properties);//初始化broker控制器boolean initResult = controller.initialize();//如果初始化失败,则退出if (!initResult) {controller.shutdown();System.exit(-3);}//省略代码
}

创建并初始化Broker控制的代码比较简单,创建以配置类作为参数的BrokerController对象,并将所有的配置信息保存在内容中,方便在其他地方使用;创建完Broker控制器对象以后,对控制器进行初始化,当初始化失败以后,则直接退出程序。

initialize方法主要是加载一些保存在本地的一些配置数据,总结起来做了如下几方面的事情:

  • 加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置
  • 创建消息相关的组件,并加载消息数据
  • 创建netty服务器
  • 创建一系列线程
  • 注册处理器
  • 启动一系列定时任务
  • 初始化事务组件
  • 初始化acl组件
  • 注册RpcHook
加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//加载topic配置 topics.jsonboolean result = this.topicConfigManager.load();//加载消费者位移数据 consumerOffset.jsonresult = result && this.consumerOffsetManager.load();//加载订阅组数据 subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();//加载消费者过滤 consumerFilter.jsonresult = result && this.consumerFilterManager.load();//省略代码
}

load方法是抽象类ConfigManager的方法,该方法读取文件的内容解码成对应的配置对象,如果文件中的内容为空,就读取备份文件中的内容进行解码。读取的文件都是保存在user.home/store/config/下,user.home是用户目录,不同人的电脑user.home一般不同。topicConfigManager.load()读取topics.json文件,如果该文件的内容为空,那么就读取topics.json.bak文件内容,topics.json保存的是topic数据;同理,consumerOffsetManager.load()方法读取consumerOffset.json和consumerOffset.json.bak文件,保存的是消费者位移数据;subscriptionGroupManager.load()方法读取subscriptionGroup.json和subscriptionGroup.json.bak文件,保存订阅组数据(消费者分组数据)、consumerFilterManager.load()方法读取的是consumerFilter.json和consumerFilter.json.bak的内容,保存的是消费者过滤数据。

创建消息相关的组件,并加载消息数据
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//如果上述都加载成功if (result) {try {//创建消息存储器this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,                                this.messageArrivingListener,this.brokerConfig);//如果开启了容灾、主从自动切换,添加DLedger角色改变处理器if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}//broker 相关统计this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load plugin//加载消息存储插件MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}//加载消息文件result = result && this.messageStore.load();//省略代码
}

如果加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置成功以后,就创建消息相关的组件,并加载消息数据,这个过程创建了消息存储器、DLedger角色改变处理器、Broker统计相关组件以及消息存储插件,然后加载消息文件中的数据。接下来具体看看加载消息文件中的messageStore.load()方法:

//代码位置:org.apache.rocketmq.store.DefaultMessageStore#load
public boolean load() {boolean result = true;try {//判断abort是否存在boolean lastExitOK = !this.isTempFileExist();log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");//加载定时消费服务器if (null != scheduleMessageService) {//读取delayOffset.json文件result = result && this.scheduleMessageService.load();}// load Commit Log//加载 Commit log 文件result = result && this.commitLog.load();// load Consume Queue//加载消费者队列 文件consumequeueresult = result && this.loadConsumeQueue();if (result) {//加载检查点文件checkpointthis.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载索引文件this.indexService.load(lastExitOK);//数据恢复this.recover(lastExitOK);log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());}} catch (Exception e) {log.error("load exception", e);result = false;}if (!result) {this.allocateMappedFileService.shutdown();}return result;
}

load方法主要逻辑就是加载各种数据文件,主要有以下几方面进行加载数据:

  • isTempFileExist方法判断abort是否存在,如果不存在,说明Broker是正常关闭的,否则就是异常关闭。
  • scheduleMessageService.load()方法读取user.home/store/config/下的delayOffset.json文件的内容,该文件内容保存延迟消息的位移数据。
  • commitLog.load()加载 CommitLog 文件, CommitLog 文件保存的是消息内容
  • loadConsumeQueue()方法加载consumequeue目录下的内容,ConsumeQueue(消息消费队列)是消费消息的索引,消费者通过ConsumeQueue可以快速找到查找待消费的消息,consumequeue目录下的文件组织方式是:topic/queueId/fileName,所以就可以快速找待消费的消息在哪一个Commit log 文件中。
  • indexService.load(lastExitOK)加载索引文件,加载的是user.home/store/index/目录下文件,文件名fileName是以创建时的时间戳命名的,所以可以通过时间区间来快速查询消息,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故底层实现为hash索引。
  • recover(lastExitOK)方法将CommitLog 文件的内容加载到内存中以及topic队列。
创建netty服务器
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码if (result) {//创建netty远程服务器this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);//省略代码}//省略代码
}

创建netty服务器的时候创建了两个,一个是普通的,一个是快速的,remotingServer用来与生产者、消费者进行通信。当isSendMessageWithVIPChannel=true的时候会选择port-2的fastRemotingServer进行的消息的处理,为了防止某些很重要的业务阻塞,就再开启了一个remotingServer进行处理,但是现在默认是不开启的,fastRemotingServer主要是为了兼容老版本的RocketMQ.。

创建一系列线程池
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//代码省略//发送消息线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//拉取消息线程池//this.pullMessageExecutor //回复消息线程池//this.replyMessageExecutor //查询消息线程池//this.queryMessageExecutor //broker 管理线程池//this.adminBrokerExecutor//客户端管理线程池//this.clientManageExecutor //心跳线程池//this.heartbeatExecutor //事务线程池// this.endTransactionExecutor //消费者管理线程池this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));//代码省略
}

创建的线程池对象有发送消息线程池、拉取消息线程池、回复消息线程池、查询消息线程池、broker 管理线程池、客户端管理线程池、心跳线程池、事务线程池、消费者管理线程池。

注册请求处理器
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//注册处理器this.registerProcessor();//省略代码
}

registerProcessor()方法如下:

//源代码位置:org.apache.rocketmq.broker.BrokerController#registerProcessor
public void registerProcessor() {//发送消息处理器SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);//远程服务注册发送消息处理器this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);//注册拉消息处理器//注册回复消息处理器//注册查询消息处理器//注册客户端管理处理器//注册消费者管理处理器//注册事务处理器//注册broker处理器AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

registerProcessor方法注册了发送消息处理器、远程服务注册发送消息处理器、拉消息处理器、回复消息处理器、查询消息处理器、客户端管理处理器、消费者管理处理器、事务处理器、broker处理器。registerProcessor注册方法也很简单,就是以RequestCode作为key,以Pair<处理器,线程池>作为Value保存在名字为processorTable的HashMap中。每个请求都是在线程池中处理的,这样可以提高处理请求的性能。对于每个传入的请求,根据RequestCode就可以在processorTable查找处理器来处理请求。每个处理器都有有一个processRequest方法进行处理请求。

启动一系列定时任务

Broker初始化方法initialize中,会启动一系列的后台定时线程任务,这些后台任务包括都是由scheduledExecutorService线程池执行的,scheduledExecutorService是单线程线程池( Executors.newSingleThreadScheduledExecutor()),只用单线程线程池执行后台定时任务有一个好处就是减少线程过多,反而导致线程为了抢占CPU加剧了竞争。这一些后台定时线程任务如下:

  • 每24小时打印昨天产生了多少消息,消费了多少消息
  • 每五秒保存消费者位移到文件中
  • 每10秒保存消费者过滤到文件中
  • 每3分钟定时检测消费的进度
  • 每秒打印队列的大小以及队列头部元素存在的时间
  • 每分钟打印已存储在CommitLog中但尚未分派到消费队列的字节数
  • 每两分钟定时获取获取name server 地址
  • 每分钟定时打印slave 数据同步落后多少
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码final long period = 1000 * 60 * 60 * 24;//每24小时打印昨天产生了多少消息,消费了多少消息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//省略代码
}

每24小时打印昨天产生了多少消息,消费了多少消息的定时任务比较简单,就是将昨天消息的生产和消费的数量统计出来,然后把这两个指标打印出来。

//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//每五秒保存消费者位移this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//每10秒保存消费者过滤this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);//省略代码
}

每五秒保存消费者位移和每10秒保存消费者过滤定时任务都是保存在文件中,每五秒保存消费者位移定时任务将消费者位移保存在consumerOffset.json文件中,每10秒保存消费者过滤定时任务将消费者过滤保存在consumerFilter.json文件中。

//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//每3分钟定时检测消费的进度this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//省略代码
}

每3分钟定时检测消费进度的定时任务的作用是检测消费者的消费进度,当消费者消费消息的进度落后大于配置的最大落后阈值时,就停止消费者消费,具体的实现看protectBroker的源码:

//源代码位置:org.apache.rocketmq.broker.BrokerController#protectBroker
public void protectBroker() {
//是否开启慢消费检测开关,默认未开启
if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
//遍历统计项
final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<String, MomentStatsItem> next = it.next();
final long fallBehindBytes = next.getValue().getValue().get();
//消费者消费消息的进度落后消费者落后阈值
if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
final String[] split = next.getValue().getStatsKey().split(“@”);
final String group = split[2];
LOG_PROTECTION.info(“[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it”, group, fallBehindBytes);
//设置消费者消费的标志,关闭消费
this.subscriptionGroupManager.disableConsume(group);
}
}
}
}
protectBroker方法首先判别是否开启慢消费检测开关,如果开启了,就进行遍历统计项,判断消费者消费消息的进度落后消费者落后阈值的时候,就停止该消费者停止消费来保护broker,如果消费者消费比较慢,那么在Broker的消费会越来越多,积压在Broker上,所以停止慢消费者消费消息,让其他消费者消费,减少消息的积压。

//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//代码省略//每秒打印队列的大小以及队列头部元素存在的时间this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//代码省略
}

每秒打印队列的大小以及队列头部元素存在的时间定时任务,会打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小,以及打印队列头部元素存在的时间,这个时间等于当前时间减去头部元素创建的时间,就是该元素创建到现在已经花费了多长时间。具体的代码如下:

//源代码位置:org.apache.rocketmq.broker.BrokerController#headSlowTimeMills
public long headSlowTimeMills(BlockingQueue<Runnable> q) {long slowTimeMills = 0;//队列的头final Runnable peek = q.peek();if (peek != null) {RequestTask rt = BrokerFastFailure.castRunnable(peek);//当前时间减去创建时间slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();}if (slowTimeMills < 0) {slowTimeMills = 0;}return slowTimeMills;
}
初始化事务消息
//源码位置:org.apache.rocketmq.broker.BrokerController#initialTransaction
private void initialTransaction() {//加载transactionalMessageService,利用spithis.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}//创建transactionalMessage检查监听器this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);//创建事务消息检查服务this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);}

initialTransaction方法主要创建与事务消息相关的类,创建transactionalMessageService(事务消息服务)、transactionalMessageCheckListener(事务消息检查监听器)、transactionalMessageCheckService(事务消息检查服务)。transactionalMessageService用于处理事务消息,transactionalMessageCheckListener主要用来回查消息监听,transactionalMessageCheckService用于检查超时的 Half 消息是否需要回查。RocketMQ发送事务消息是将消费先写入到事务相关的topic的中,这个消息就称为半消息,当本地事务成功执行,那么半消息会还原为原来的消息,然后再进行保存。initialTransaction在创建transactionalMessageService和transactionalMessageCheckListener都使用了ServiceProvider.loadClass方法,这个方法就是采用SPI原理,SPI原理就是利用反射加载META-INF/service目录下的某个接口的所有实现,只要实现接口,然后META-INF/service目录下添加文件名为全类名的文件,这样SPI就可以加载具体的实现类,具有可拓展性。

初始化acl组件
//源码位置:org.apache.rocketmq.broker.BrokerController#initialAcl
private void initialAcl() {if (!this.brokerConfig.isAclEnable()) {log.info("The broker dose not enable acl");return;}//利用SPI加载权限相关的校验器List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);if (accessValidators == null || accessValidators.isEmpty()) {log.info("The broker dose not load the AccessValidator");return;}//将所有的权限校验器进行缓存以及注册for (AccessValidator accessValidator: accessValidators) {final AccessValidator validator = accessValidator;accessValidatorMap.put(validator.getClass(),validator);this.registerServerRPCHook(new RPCHook() {@Overridepublic void doBeforeRequest(String remoteAddr, RemotingCommand request) {//Do not catch the exceptionvalidator.validate(validator.parse(request, remoteAddr));}@Overridepublic void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {}});}}

initialAcl方法主要是加载权限相关校验器,RocketMQ的相关的管理的权限验证和安全就交给这里的加载的校验器了。initialAcl方法也利用SPI原理加载接口的具体实现类,将所有加载的校验器缓存在map中,然后再注册RPC钩子,在请求之前调用校验器的validate的方法。

注册RpcHook
//源码位置:org.apache.rocketmq.broker.BrokerController#initialRpcHooks
private void initialRpcHooks() {//利用SPI加载钩子List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);if (rpcHooks == null || rpcHooks.isEmpty()) {return;}//注册钩子for (RPCHook rpcHook: rpcHooks) {this.registerServerRPCHook(rpcHook);}
}

initialRpcHooks方法加RPC钩子,利用SPI原理加载具体的钩子实现,然后将所有的钩子进行注册,钩子的注册是将钩子保存在List中。

以上分析就是创建Broker控制器的全过程,这个过程首先进行一些必要的初始化配置,如Broker配置、网络通信Neety配置以及存储相关配置等。然后在创建并初始化Broker控制器,创建并初始化Broker控制器的过程中,又进行了多个步骤,如加载topic配置、消费者位移数据、启动一系列后台定时任务、创建事务消息相关组件等。

Broker控制器的启动

//源码位置:org.apache.rocketmq.broker.BrokerController#start
public static BrokerController start(BrokerController controller) {try {//Broker控制器启动controller.start();//打印Broker成功的消息String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

controller.start()方法主要是启动各种组件:

  • 启动消息消息存储器
  • netty服务的启动
  • 文件监听器启动
  • broker 对外api启动
  • 长轮询拉取消息服务启动
  • 客户端长连接服务启动
  • 过滤服务管理启动
  • broker 相关统计启动
  • broker 快速失败启动
//源码位置:org.apache.rocketmq.broker.BrokerController#start
public void start() throws Exception {if (this.messageStore != null) {//启动消息消息存储this.messageStore.start();}if (this.remotingServer != null) {//netty服务的启动this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}//文件改变监听启动if (this.fileWatchService != null) {this.fileWatchService.start();}//broker 对外api启动if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}//保持长轮询请求的服务启动if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}//客户端长连接服务启动if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}//过滤服务管理启动if (this.filterServerManager != null) {this.filterServerManager.start();}//如果没有采用主从切换(多副本)if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}//定时注册brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);//broker 相关统计启动if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}//broker 快速失败启动if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

启动过程还有很多细节没有分析,放到下个文章吧吧吧吧吧

文章大量参考:https://www.zhihu.com/column/c_1437729921845690368

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国产编辑器EverEdit - 大纲视图

1 大纲视图 1.1 应用场景 在编辑较长代码文件时&#xff0c;使用大纲视图可以方便的检视当前文件的变量、函数等信息&#xff0c;方便在不同函数间跳转&#xff0c;对整个文档的全貌了然于胸。   在编辑XML文档时&#xff0c;通过展示XML文件的层次结构、节点布局&#xff0…

Linux中的基本指令(一)

一、Linux中指令的存在意义 Linux中&#xff0c;通过输入指令来让操作系统执行&#xff0c;以此达到控制操作系统的目的&#xff0c;类似于Windows中的双击&#xff0c;右键新建文件&#xff0c;新建文件夹等 1.补&#xff1a;关于屏幕的几个操作指令 ①清屏指令 clear 回…

2025/1/21 学习Vue的第四天

睡觉。 --------------------------------------------------------------------------------------------------------------------------------- 11.Object.defineProperty 1.在我们之前学习JS的时候&#xff0c;普通得定义一个对象与属性。 <!DOCTYPE html> <h…

Go Map 源码分析(一)

Go语言中的map是通过哈希表实现的&#xff0c;其底层结构和实现机制如下&#xff1a; 一、hash 结构 hmap结构体&#xff1a;是map的头部结构&#xff0c;主要字段及含义如下&#xff1a; count&#xff1a;表示当前哈希表中的元素数量&#xff0c;与len()函数相对应。flags…

Linux-C/C++--深入探究文件 I/O (上)(文件的管理、函数返回错误、exit()、_Exit()、_exit())

经过上一章内容的学习&#xff0c;相信各位读者对 Linux 系统应用编程中的基础文件 I/O 操作有了一定的认识和理解了&#xff0c;能够独立完成一些简单地文件 I/O 编程问题&#xff0c;如果你的工作中仅仅只是涉及到一些简单文件读写操作相关的问题&#xff0c;其实上一章的知识…

【机器学习实战中阶】音乐流派分类-自动化分类不同音乐风格

音乐流派分类 – 自动化分类不同音乐风格 在本教程中,我们将开发一个深度学习项目,用于自动化地从音频文件中分类不同的音乐流派。我们将使用音频文件的频率域和时间域低级特征来分类这些音频文件。 对于这个项目,我们需要一个具有相似大小和相似频率范围的音频曲目数据集…

Walrus Learn to Earn计划正式启动!探索去中心化存储的无限可能

本期 Learn to Earn 活动将带领开发者和区块链爱好者深入探索 Walrus 的技术核心与实际应用&#xff0c;解锁分布式存储的无限可能。参与者不仅能提升技能&#xff0c;还能通过完成任务赢取丰厚奖励&#xff01;&#x1f30a; 什么是 Walrus&#xff1f; 数据主权如今正成为越…

git 常用命令 git archive

git archive 是 Git 中用于创建一个包含指定提交或分支中所有文件的归档文件&#xff08;如 .tar 或 .zip&#xff09;的命令。这个命令非常适合用于分发项目快照、备份代码库或导出特定版本的文件。 git archive --formatzip --outputproject.zip HEAD …

Excel 技巧15 - 在Excel中抠图头像,换背景色(★★)

本文讲了如何在Excel中抠图头像&#xff0c;换背景色。 1&#xff0c;如何在Excel中抠图头像&#xff0c;换背景色 大家都知道在PS中可以很容易抠图头像&#xff0c;换背景色&#xff0c;其实Excel中也可以抠简单的图&#xff0c;换背景色。 ※所用头像图片为百度搜索&#x…

持续升级《在线写python》小程序的功能,文章页增加一键复制功能,并自动去掉html标签

增加复制按钮后的界面是这样的 代码如下&#xff1a; <template><view><x-header></x-header><view class"" v-if"article_info"><view class"kuai bgf"><view class"ac fs26"><img sr…

FPGA与ASIC:深度解析与职业选择

IC&#xff08;集成电路&#xff09;行业涵盖广泛&#xff0c;涉及数字、模拟等不同研究方向&#xff0c;以及设计、制造、封测等不同产业环节。其中&#xff0c;FPGA&#xff08;现场可编程门阵列&#xff09;和ASIC&#xff08;专用集成电路&#xff09;是两种重要的芯片类型…

【Linux】Linux入门(三)权限

目录 前提权限概念whoami指令 Linux权限管理文件访问者的分类&#xff08;人&#xff09;file指令权限信息权限的表示方法 chmod指令 更改权限chown指令 修改文件&#xff0c;文件夹所属用户和用户组 权限掩码umask&#xff08;权限掩码&#xff09; 粘滞位 前提 请先看下面这…

蓝桥与力扣刷题(73 矩阵置零)

题目&#xff1a;给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1],[1,1,1]] 输出&#xff1a;[[1,0,1],[0,0,0],[1,0,1]]示例 2&…

Node.js接收文件分片数据并进行合并处理

前言&#xff1a;上一篇文章讲了如何进行文件的分片&#xff1a;Vue3使用多线程处理文件分片任务&#xff0c;那么本篇文章主要看一下后端怎么接收前端上传来的分片并进行合并处理。 目录&#xff1a; 一、文件结构二、主要依赖1. express2. multer3. fs (文件系统模块)4. pat…

大数据,Hadoop,HDFS的简单介绍

大数据 海量数据&#xff0c;具有高增长率、数据类型多样化、一定时间内无法使用常规软件工具进行捕捉、管理和处理的数据集 合 大数据的特征: 4V Volume : 巨大的数据量 Variety : 数据类型多样化 结构化的数据 : 即具有固定格式和有限长度的数据 半结构化的数据 : 是…

深度强化学习:PPO

深度强化学习算法&#xff1a;PPO 1. Importance Sampling 先说一下什么是采样&#xff1a;对于一个随机变量&#xff0c;我们通常用概率密度函数来描述该变量的概率分布特性。具体来说&#xff0c;给定随机变量的一个取值&#xff0c;可以根据概率密度函数来计算该值对应的概…

Flink底层架构与运行流程

这张图展示了Flink程序的架构和运行流程。 主要组件及功能&#xff1a; Flink Program&#xff08;Flink程序&#xff09;&#xff1a; 包含Program code&#xff08;程序代码&#xff09;&#xff0c;这是用户编写的业务逻辑代码。经过Optimizer / Graph Builder&#xff08…

嵌入式知识点总结 C/C++ 专题提升(一)-关键字

针对于嵌入式软件杂乱的知识点总结起来&#xff0c;提供给读者学习复习对下述内容的强化。 目录 1.C语言宏中"#“和"##"的用法 1.1.(#)字符串化操作符 1.2.(##)符号连接操作符 2.关键字volatile有什么含意?并举出三个不同的例子? 2.1.并行设备的硬件寄存…

mysql精简单机版,免登录,可复制,不启动服务与本机mysql无冲突

突然有了个需要在本地使用的mysql需求&#xff0c;要求不用安装,随拷随用,不影响其他mysql服务,占用空间小.基于这种需求做了个精简版的mysql 首先下载mysql的zip安装包 > windows 64位 > https://repo.huaweicloud.com/mysql/Downloads/MySQL-5.7/mysql-5.7.36-winx64…

俄语画外音的特点

随着全球媒体消费的增加&#xff0c;语音服务呈指数级增长。作为视听翻译和本地化的一个关键方面&#xff0c;画外音在确保来自不同语言和文化背景的观众能够以一种真实和可访问的方式参与内容方面发挥着重要作用。说到俄语&#xff0c;画外音有其独特的特点、挑战和复杂性&…