在分布式消息中间件的领域中,RocketMQ 以其高性能、高可靠性和强大的功能占据着重要的地位。而 Broker 作为 RocketMQ 的核心组件之一,其控制器的启动过程涉及到众多关键环节和复杂的逻辑。理解这个过程对于深入掌握 RocketMQ 的运行机制以及在实际应用中更好地部署和优化它至关重要。今天,我们就一起深入探讨 RocketMQ 的 Broker 控制器是如何启动的。
//源码位置:org.apache.rocketmq.broker.BrokerController#start
public static BrokerController start(BrokerController controller) {try {//Broker控制器启动controller.start();//打印Broker成功的消息String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
start方法的逻辑比较简单,首先启动Broker控制器,然后打印成功启动Broker控制器的日志。Broker控制器启动的逻辑主要在controller.start()中,接下来,分析下controller.start()方法的作用,controller.start()方法主要是启动各种组件:
● 启动消息消息存储器
● netty服务的启动
● 文件监听器启动
● broker 对外api启动
● 长轮询拉取消息服务启动
● 客户端长连接服务启动
● 过滤服务管理启动
● broker 相关统计启动
● broker 快速失败启动
//源码位置:org.apache.rocketmq.broker.BrokerController#start
public void start() throws Exception {if (this.messageStore != null) {//启动消息消息存储this.messageStore.start();}if (this.remotingServer != null) {//netty服务的启动this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}//文件改变监听启动if (this.fileWatchService != null) {this.fileWatchService.start();}//broker 对外api启动if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}//长轮询拉取消息服务启动if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}//客户端长连接服务启动if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}//过滤服务管理启动if (this.filterServerManager != null) {this.filterServerManager.start();}//如果没有采用主从切换(多副本)if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}//定时注册brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);//broker 相关统计启动if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}//broker 快速失败启动if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}
总体而言,Broker的启动过程还是比较复杂的,启动过程可以分为两个部分,创建Broker控制器和启动Broker控制器。创建Broker控制器的过程中。初始化配置信息、创建各种组件、创建和启动一些后台线程服务、以及初始化各种组件;启动Broker控制的过程就是各种组件的启动,另外还启动定时注册Broker的任务。从宏观的角度大体分析了Broker的启动过程,还有很多细节没有进行深入,这些细节的深入将在后续的源码分析中体现。
一、存储设计
试想一下,如果让你来设计消息的存储,你会如何设计呢?
- 首先我们肯定需要有块内存缓冲区,用来接收消息
- 但是内存毕竟有限,当消息大量堆积的时候,全放在内存肯定是不合适的,所以我们肯定需要将消息从内存写到文件中。
- 如果所有消息全都存放到一个文件中,消息检索会很耗时,过期消息的清理也会很麻烦,所以消息肯定要进行多文件存储。
其实 RocketMQ 大概就是按照上面这样去实现的:
每个Broker都对应有一个MessageStore,专门用来存储发送到它的消息,不过MessageStore本身不是文件,只是存储的一个抽象,MessageStore 中保存着一个 CommitLog,CommitLog 维护了一个 MappedFileQueue,而MappedFileQueue 中又维护了多个 MappedFile,每个MappedFile都会映射到文件系统中一个文件,这些文件才是真正的存储消息的地方,MappedFile的文件名为它记录的第一条消息的全局物理偏移量。
说的我都有点晕了。画成图,大概就是下面这个样子:
二、源码分析
2.1 消息接收
Broker对于每个请求 Code,都注册了对应的处理类,其中用于接收消息的处理类为:SendMessageProcessor
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
SendMessageProcessor 实现了 NettyRequestProcessor 接口,并在接口方法processRequest()中处理接收到的请求,SendMessageProcessor在processRequest()中调用了sendMessage()方法来进行消息处理
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.consumerSendMsgBack(ctx, request);default:......RemotingCommand response;if (requestHeader.isBatch()) {response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);}......return response;}
}
2.2 消息存储
接收到消息请求后,就要处理请求了,上面调用了sendMessage()来处理消息。
SendMessageProcessor 中 sendMessage()中主要分为下面几步:
1:根据收到请求,封装成内部消息结构:MessageExtBrokerInner。
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
......
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);
......
2:调用 Broker 中 的MessageStore的putMessage()方法,将消息放入MessageStore中。
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
前面我们已经提过,每个Broker都有一个MessageStore实例,MessageStore本身是一个接口,定义了一些用来存储消息的接口协议,MessageStore默认的实现类为DefaultMessageStore,Broker在其初始化方法initialize()中便会初始化好DefaultMessageStore。
DefaultMessageStore 中 putMessage()逻辑又分为下面几步:
1:检查当前Broker是否可以存储消息,比如 MessageStore 被关闭、Broker 状态为 Slave 都会拒绝存储。
if (this.shutdown) {log.warn("message store has shutdown, so putMessage is forbidden");return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("message store is slave mode, so putMessage is forbidden ");}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}......
2:检查消息合法性,比如消息的Topic长度和内容长度是否超出限制。这种情况下也会拒绝存储。
if (msg.getTopic().length() > Byte.MAX_VALUE) {log.warn("putMessage message topic length too long " + msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
3:如果消息通过了上面的重重考验,便会被提交给 MessageStore 中的 CommitLog,进行下一步处理。
PutMessageResult result = this.commitLog.putMessage(msg);
消息到了CommitLog后,便要开始进入存储逻辑了。我们来看看CommitLog中是如何处理消息的。
CommitLog 中的 PutMessage()大概步骤如下:
1:获取写锁,保证同一时刻只处理一条消息的存储操作。
putMessageLock.lock();
2:从CommitLog的Message 中获取最新的MappedFile,追加消息。
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
前面介绍到,CommitLog 中保存了一个MappedFileQueue,MappedFileQueue 初始化的时候配置了消息文件MappedFile的存储路径以及单个MappedFile文件的大小,当某个消息文件写满后,便会生成一个新的MappedFile继续写入消息,所以MappedFileQueue中会按照消息写入时间顺序,维护多个MappedFile。
3:消息追加结束后,释放写锁
putMessageLock.unlock();
上面这几步中,我们重点要关注的便是第2步,即将消息追加到当前最新的MappedFile中。
上面追加消息调用的是MappedFile.appendMessage()方法,此方法最终调用到MappedFile.appendMessagesInner()中:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {assert messageExt != null;assert cb != null;// 获取当前文件的写入位置int currentPos = this.wrotePosition.get();// 如果当前文件未写满,则进入追加逻辑if (currentPos < this.fileSize) {ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result = null;if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}
我们来分析下appendMessagesInner()中追加的的逻辑:
1:获取MappedFile中的 writeBuffer,如果 writeBuffer 为空,则获取mappedByteBuffer。
在MessageStore 初始化的时候,会初始化一个Buffer缓存池:TransientStorePool,TransientStorePool在初始化时会初始化若干DirectBuffer,放入一个Deque中,默认池子容量为5。MappedFile的writeBuffer就是从这个池子中获取的。
而 mappedByteBuffer 类型为 MappedByteBuffer,前面说到每个MappedFile都会映射到文件系统中的一个文件,mappedByteBuffer 即为该文件在内存中的映射。
当追加消息到MappedFile中,会优先追加到 writeBuffer中。
2:调用 cb.doAppend()追加消息,调用该方法时,传入了下面几个参数
this.getFileFromOffset():MappedFile的全局消息物理偏移量(即MappedFile中第一个消息全局物理偏移量,也是MappedFile的文件名)。
byteBuffer:即MappedFile的内存缓冲区,也即是 1 中的writeBuffer或mappedByteBuffer。
this.fileSize - currentPos:fileSize为单个文件的额定大小,默认为1GB,currentPos为当前文件中已经写到什么位置,两个相减即为当前文件剩余容量。
(MessageExtBrokerInner) messageExt:这个没什么好说的,就是内部封装好的消息
cb 从哪来的呢?
前面CommitLog在调用appendMessagesInner()时,传入的 cb 为:this.appendMessageCallback,它的类型为 DefaultAppendMessageCallback,实现了AppendMessageCallback接口。所以我们接下来就要看看DefaultAppendMessageCallback中对于doAppend()的实现即可。
doAppend()主要逻辑如下:
1:计算消息存储的各个属性,如消息长度,消息在消息队列中的长度等。
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
......
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
2:判断消息追加后是否超过单个MappedFile大小,如果超出,则返回状态码:AppendMessageStatus.END_OF_FILE
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value// Here the length of the specially set maxBlankfinal long beginTimeMills = CommitLog.this.defaultMessageStore.now();byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
此时 CommitLog 会新创建一个MappedFile,重新追加消息。
switch (result.getStatus()) {......case END_OF_FILE:unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;
}
3:序列化消息内容,存储到内存缓存区中
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)this.msgStoreItemMemory.put(propertiesData);final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
4:返回追加成功的结果
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
三、总结
总结一下,RocketMQ 会创建多个MappedFile用来存储文件,每个MappedFile大小固定,有自己的内存缓冲区和对应的系统文件,所有的MappedFile由CommitLog中的MappedFileQueue统一维护。
本篇文章主要讲解了消息从接收到存储到内存中的过程,但是事情到这还没结束,因为消息最终是要存放到文件中的,下一篇文章就要来说说RocketMQ的文件刷盘策略。