RocketMQ源码阅读-Message消息存储
- 1. CommitLog的作用
- 2. CommitLog 存储消息
- 3. 时序图
- 4. 小结
在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。
1. CommitLog的作用
Store all metadata downtime for recovery, data protection reliability
翻译为:存储元数据,提供在停机时恢复和数据保护的能力
CommitLog是针对 MappedFileQueue 的封装使用。
其中有一个属性:
protected final MappedFileQueue mappedFileQueue;
MappedFileQueue是用来存储消息的文件队列,对上层提供可无限使用的文件容量,有一个属性:
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
是实际存储消息的链表,MappedFile是消息文件实体,每个 MappedFile 统一文件大小。
他们三个的关系是:
他们的比例为:CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N
CommitLog 存储在 MappedFile 有两种内容类型:
- MESSAGE :消息。
- BLANK :文件不足以存储消息时的空白占位。
2. CommitLog 存储消息
CommitLog存储消息的入口方法为CommitLog#putMessage(…):
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 事务相关final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;// 获取写入映射文件MappedFile unlockMappedFile = null;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时,进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result = mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时,获取新的映射文件,并进行插入unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步,主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult;
}
可以看到,此方法获取到映射文件,然后将消息写入文件,并刷入磁盘,其中会使用到写入锁进行并发控制。
最后进行数据同步,将主节点的数据同步到从节点。
刷新磁盘的方法为CommitLog#handleDiskFlush:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}
}
主从数据同步的方法为CommitLog#handleHA:
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点,同步到从节点if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK =request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}
对于获取文件和文件刷新落盘先不做深究,后面再深入分析。
3. 时序图
4. 小结
本篇主要分析了CommitLog的作用以及其进行消息存储的流程:
CommitLog主要是对MappedFileQueue的封装使用,MappedFileQueue是对消息文件的封装。
存储流程主要是:
- 获取映射文件
- 获取写锁
- 写文件
- 释放写锁
- 刷新文件到磁盘(分为同步刷新和异步刷新)
- 如果是Master节点,需要同步数据到从节点
Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。