Broker的关机恢复机制
概述
Broker关机恢复是指恢复CommitLog、Consume Queue、Index File等数据文件。Broker关机分为正常调用命令关机和异常被迫进程终止关机两种情况。恢复过程的设计目标是使正常停止的进程实现零数据丢失,异常停止的进程实现最少量的数据丢失,与关机恢复相关的主要文件有两个:abort和checkpoint.
abort文件
abort是一个空文件,标记当前Broker是否正常关机,Broker进程正常启动的时候,创建该文件。Broker进程正常停止后,该文件就会删除;如果异常退出,则文件依旧存在,创建和删除的过程如图
- abort文件创建流程
- aboirt文件删除流程
Checkpoint文件
checkpoint是检查点文件,保存Broker最后正常存储各种数据的时间,在重启Broker时,恢复程序知道从什么时候恢复数据。检查点逻辑由StoreCheckpoint类实现。
在StoreCheckpoint类中保存了3个时间,更新过程如图.
- physicMsgTimestamp:最后一条已存储CommitLog的消息的存储时间
- logicsMsgTimestamp:最后一条已存储Consume Queue的消息的存储时间
- indexMsgTimestamp:最后一条已存储IndexFile的消息的存储时间
- physicMsgTimestamp和logicsMsgTimestamp的更新都是在数据存储成功后进行的,过程比较简单。而indexMsgTimestamp的逻辑是在Index File刷盘时被更新的,Index File刷盘方法IndexService.flush()。
从上述代码可以看到,在IndexFile刷盘后,已刷盘文件文件的最后存储消息时间被赋值给indexMsgTimestamp,并对Checkpoint文件进行刷盘。
注:IndexFile的刷盘设计和CommitLog、Consume Queue刷盘的方式不同,容易被忽略
Broker关机恢复流程
Broker在启动时会初始化abort、checkpoint两个文件。正常关闭进程时会删除abort文件,将checkpoint文件刷盘;异常关闭时,通常来不及删除abort文件。由此,在重新启动Broker时会根据abort判断是否需要异常停止进程,而后恢复数据。Broker启动时,会启动存储服务DefaultMessageStore.存储服务在初始化时执行load方法加载全部数据,这里主要分析数据加载流程。Broker关机的恢复过程可以分为以下几步.
- 第一步:Broker异常退出检查。如果abort文件存在,说明上次是异常退出的。
- 第二步:加载延迟消息的位点信息。ScheduleMessageService服务通过继承和重写ConfigManager,调用load()方法从磁盘加载延迟位点文件的内容,并根据配置项messageDelayLevel初始化延迟级别
- 第三步:加载全部CommitLog文件(#1部分)。通过读取CommitLog目录下的所有文件,依次加载每个CommitLog为MappedFile,并且设置写指针、已刷盘指针、已提交指针,使所有指针都指向该文件的最末位.CommitLog文件加载代码如图。如果文件大小已配置的大小不一致,恢复时
就直接被忽略,所以,在重启时不要修改mappedFileSizeCommitLog(默认是1G)参数的值,否则数据无法恢复
- 第四步:加载全部Consume Queue文件及数据(如图#2、#3)。调用loadConsumeQueue方法,读取./consumequeue/Topic/queueId/目录,加载全部Topic、queueId作为ConsumeQueue对象,再调用load()方法初始化每一个ConsumeQueue
- 第五步:初始化Checkpoint文件为StoreCheckpoint对象,并且初始化三个数据:physicMsgTimestamp、logicsMsgTimestsamp和indexMsgTimestamp.
初始化StoreCheckpoint对象
在StoreCheckpoint构造方法中初始化三个时间戳
- 第六步:加载IndexFile索引(#4部分)。加载./index目录下的全部索引文件,如果上次进程异常退出并且索引文件操作的最后时间戳大于Checkpoint中保存的时间,则说明当前文件有部分数据可能存在错误,须立即销毁文件
- 第七步:恢复全部数据(#5部分)lastExitOK=True,表示上次进程正常退出。全部恢复数据主要恢复ConsumeQueue、CommitLog、内存中的consumeQueueTable,并纠正Consume Queue中的最新位点值。
recoverCOnsumeQueue()方法通过循环所有Topic对应的ConsumeQueue,依次调用ConsumeQUeue.recover()方法执行数据恢复
recoverNormally()方法在Broker正常关闭后重启执行CommitLog恢复(#5,2)对于CommitLog恢复数据,这里有一个小技巧,正常恢复是从倒数第三个文件开始直到最后一个文件。正常恢复是假定数据都是正常的,大部分场景都关心最新的消息,所以恢复最新的三个文件到内存中,消息量大小为3GB,当然,如果恢复文件个数做成可配置的就更好了
recoverAbnormally()方法在Broker异常关闭后重启时执行CommitLog恢复(#5.3)CommitLog异常恢复是从最后一个文件开始反向恢复到第一个文件。因为当进程异常停止后最容易出错的是最新的某些文件。所以异常恢复时,RocketMQ从最后一个文件开始,倒序找第一个正常的文件开始恢复。
CommitLog.isMappedFileMatchedRecover()方法判断文件是否正常,整个方法的重点在于,只要文件的最后消息的存储时间都小于在Checkpoint保存的对应时间,那么该文件并未损坏。
CommitLog恢复完毕,会将该文件中的消息重新分发,创建ConsumeQueue和IndexFile。分发全部消息还是部分消息时根据duplicationEnable的值(默认为False)来判断的
recoverTopicQueueTable():纠正Consume Queue中最小消费位点和恢复ComitLog内存中的TopicTable(#5.4)