目录
前言
准备
消息载体CommitLog
文件持久化位置
源码解析
broker消息对象MessageExtBrokerInner
异步存储message
CommitLog的真相
创建MappedFile文件
加入异步刷盘队列
Message异步存储MappedByteBuffer
总结
前言
在面试中我们经常会听到这样的回答,生产者将message发送给broker服务,然后消费者从broker中获取消息并消费,为了保证message在broker服务中不丢失,mq会对消息数据进行持久化到磁盘中。那么message到达broker服务后是如何进行存储并持久化到磁盘中的呢?这就是本篇要学习的内容。
准备
源码地址:https://github.com/apache/rocketmq
目前最新版本为:5.2.0
那么我们在idea上切换分支为 release-5.2.0
消息载体CommitLog
该对象是broker服务接收到message后进行存储的数据对象,一般就把存储消息的文件就称为commitLog文件也就是最终存储磁盘上的数据文件。
大致的message流向如图:
根据源码可以知道,一个commitLog文件最大存储1G数据,文件写满了,则会写入下一个文件中
文件持久化位置
commitlog文件的持久化存放的位置是通过broker.conf配置文件中storePathCommitLog配置
storePathCommitLog = /Users/leonsh/rocketmqnamesrv/store/commitlog
最后生成的文件为这样
文件命名
查看上面图片可知文件的名称是一串数字20个0组成,因为文件名称是按照偏移量offset来命名的,
因为这是第一个文件所以offset为0,补全20位,所以文件名称为20个0
,以此类推第二个文件名称则为00000000001073741824
上面说过一个commitlog文件最大存储1G,而1G=1024*1024*1024=1073741824bit,这就是第二个文件的偏移量
源码解析
前面说到Producer发送message到broker后,broker会对接收的message请求进行处理
//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:87
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)
上面的方法名中顾名思义就是处理请求的,并且所在的文件命名SendMessageProcessor也说明了该类的作用。那么我们就从该方法深入源码中
看方法引用位置我们会发现许多地方调用了该方法,先抛开前面broker如何接收的,反正最后消息会到达这里,从该方法开始就是broker处理message的核心流程,也是本篇学习的重点
broker消息对象MessageExtBrokerInner
MessageExtBrokerInner该对象就是用来后续对message处理的封装
//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
//获取请求对象中的消息体
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//初始化消息对象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
- requestHeader 该对象就是在上一篇中讲到的发送message的消息请求头
- 从请求头中获取设置的队列id,如果没有设置,则会从对应的topic中随机获取一个randomQueueId()
- 从请求头中获取topic名称,通过名称再去获取broker中存储的topic对应的数据对象,深入源码会发现,broker中存储topic数据也是使用的map,ConcurrentMap<String, TopicConfig> topicConfigTable;
- 最后就是创建MessageExtBrokerInner对象并设值
异步存储message
//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {//事务消息asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {//普通消息asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
或许大家和博主开始一样都有一个疑惑,我们生产者发送的是同步消息,为何到了broker却是异常存储呢?
1.其实生产者发送同步消息和broker异步存储都是相互独立互不干扰的,broker异步存储只是为了提高mq接收消息的写入性能和吞吐量。broker异步存储会将写入内存的message进行异步刷盘。
2.就算broker是异步存储,但也不会立即返回结果给生产者,需要等待broker异步刷盘成功才会返回结果给生产者,通过broker提供的CompletableFuture机制实现。
什么,看完解释还是有点懵,有点抽象,我们继续向下深入源码,一步一步解开疑惑,我相信看完后面的解析同样会豁然开朗的!
CommitLog的真相
//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
到这里,本文开头提到的commitLog对象终于出现了,查看该源码可知,commitlog对象中定义了一个MappedFileQueue对象,这个对象又是做什么的,我们继续深入源码
//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:942
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
深入该方法,大概意思就是从MappedFileQueue对象中的CopyOnWriteArrayList<MappedFile> mappedFiles;集合中取出里面的最后一个MappedFile对象,至此赢来大结局MappedFile对象才是最终映射到磁盘文件的,而CommitLog可以理解为MappedFile对象的外层封装。但落到磁盘上的文件我们依然称为commitLog文件
扩展:
CopyOnWriteArrayList 是 Java 中的一种线程安全的 List 实现,属于 java.util.concurrent 包
读操作:不需要加锁,直接操作底层数组,底层数组在写操作时是一个副本,读操作不会影响正在进行的写操作,能够保证高效的并发读性能。
写操作:会创建底层数组的一个新的副本,对这个副本进行修改, 修改完成后,新的副本会替换原来的数组
创建MappedFile文件
//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:1001
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noiseif (isCloseReadAhead()) {setFileReadMode(mappedFile, LibC.MADV_RANDOM);}
}
因为broker是启动后首次存储数据,所以上面获取出来的mappedFile一定为空则进入if代码块
因此偏移量也是初始值0
生成MappedFile文件路径名称
//源码位置
//包名:org.apache.rocketmq.store
//文件:MappedFileQueue
//行数:345
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset+ this.mappedFileSize);
- this.storePath:该字段就是前面在broker.conf文件中配置的文件地址
- File.separator:分隔符
- UtilAll.offset2FileName(createOffset):生成20位数字组成的文件名称,当前createOffset=0。
为何会生成两个地址nextFilePath与nextNextFilePath呢?
因为mq在生成当前需要使用的文件时同时生成下一个使用的文件,当第一个文件存储满后,直接使用下一个文件,减小了创建文件的开销,提高mq的性能。所以会同时生成2个文件。
那么问题来了,为何本文开头生成的文件怎么只有一个?
我们查看源码提交记录可知,nextNextFilePath第二个文件是2021年9月才新增的
查看rocketMq在github上各个版本的发布时间,2021年9月并没有发布新版本,但是2021年10月发布了rocketmq-all-4.9.2
那么由此可得,rocketMq同时创建2个文件从版本4.9.2开始支持,之前的版本都只会创建1个文件
因为博主的broker服务是通过docker镜像启动的,但是查看镜像版本显示的确为最新版本
其实这只是rocketMq镜像的版本,而我们看的是镜像中使用rocketMq框架版本
执行命令查看镜像的详细信息
docker inspect apacherocketmq/rocketmq:latest
由此可得博主的rocketMq版本为:4.6.0,所以只会创建一个commitLog文件
加入异步刷盘队列
//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:62
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//...
//加入队列触发异步刷盘操作
boolean offerOK = this.requestQueue.offer(nextReq);
//...
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
//...
boolean offerOK = this.requestQueue.offer(nextNextReq);
- AllocateRequest:就是message异步存储请求最后的封装
- this.requestTable:也是一个map对象 ConcurrentMap<String, AllocateRequest> requestTable;key为文件的路径,value则为AllocateRequest
- this.requestQueue:这是一个队列PriorityBlockingQueue<AllocateRequest> requestQueue;队列元素为AllocateRequest
PriorityBlockingQueue是如何做到异步刷盘的呢?
该队列就是为broker实现异步存储的核心,可能大家对这个队列比较陌生
它是Java 中 java.util.concurrent 包提供的一个线程安全的优先级队列。它基于优先级堆实现,能够保证元素按照自然顺序或者指定的比较器顺序进行排序
因为它是一个队列,那么我们首先就会想到生产者和消费者,那么就起到了异步解耦的作用
他有两个非常重要的方法:
- offer(): 将一个元素插入到队列中
- take(): 从队列中获取并移除元素 由于 PriorityBlockingQueue 是一个阻塞队列,如果队列为空,take 方法会一直阻塞直到有元素可用
总结:由上面我们知道offer()一般用于生产者调用,而take()则是消费者调用,当队列为空时消费者线程会一直阻塞,只要队列中存入对象,消费者就会感知到并消费。可以理解为消费者和生产者共享PriorityBlockingQueue对象
//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:99
AllocateRequest result = this.requestTable.get(nextFilePath);
//...
//阻塞等待刷盘结果
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
上面源码的作用就是等待异步刷盘结果
- 第一段就是取出之前存入的第一个请求对象AllocateRequest
- 第二段则是判断异步刷盘是否完成,成功则返回,还没有处理完则一直阻塞,直到达到超时时间waitTimeOut
result.getCountDownLatch().await为何能做到阻塞等待结果呢?
进入AllocateRequest对象中可知,操作的是这个对象CountDownLatch countDownLatch = new CountDownLatch(1)
CountDownLatch或许大家不太熟悉,但ReentrantLock大家并不陌生吧,面试中经常问到,他们同属于java并发包JUC( java.util.concurrent )下的对象.
概念:它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。它是通过一个计数器实现的,该计数器初始化为一个给定的值。每当一个线程完成了它的一项操作后,这个计数器就递减。当计数器的值到达零时,等待在这个计数器上的线程将被唤醒并继续执行
总结:通过源码我们看到AllocateRequest被创建时里面属性CountDownLatch中的计数器默认就是1,所以需要一直等待被修改为0时才会继续执行后续逻辑,那就是等待异步刷盘完成。
Message异步存储MappedByteBuffer
//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:155
AllocateRequest req = null;
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
该源码就是对之前加入队列的AllocateRequest取出来,并执行后续的存储操作,可以说就是消费者消费的地方,我们可以结合源码上下文代码可以知道,所在的类的顶级继承类是Runnable,而上面代码所在方法就是被重写的run()方法调用,可以认为消费者是在单独的一个线程中执行的。
获取缓冲区
//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:607
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
被操作的对象是MappedByteBuffer
MappedByteBuffer是什么?
是 Java NIO(New Input/Output)中的一个类,它允许将文件直接映射到内存中,从而提高文件的读写效率。RocketMQ 使用 MappedByteBuffer 来管理 CommitLog 文件,以实现高效的消息存储和检索。通过将文件映射到内存,RocketMQ 可以直接操作内存数据,而无需频繁的磁盘 I/O 操作。
MappedByteBuffer也是mmap的一种实现方式
什么是mmap?
mmap(内存映射文件)是一种将文件内容映射到进程的地址空间的技术。这样一来,文件内容就可以像访问内存一样被读写,从而显著提高 I/O 操作的效率。
调用mappedByteBuffer.slice()方法的作用是什么?
用于创建一个新的缓冲区,该缓冲区与原始缓冲区共享相同的底层内存,但具有独立的位置、限制和标记。这在需要操作内存映射文件的某一部分时非常有用,而不影响整个映射文件的其他部分。
MappedByteBuffer有两大特点:
- 延迟写入:数据写入 MappedByteBuffer 时,实际上是写入了内存中的映射区域,操作系统会在合适的时候将这些数据同步到磁盘,而不是立即进行磁盘 I/O 操作。
- 强制刷新:为了确保数据的一致性和持久性,MappedByteBuffer 提供了 force() 方法,可以将内存中的修改强制刷新到磁盘
//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:611
byteBuffer.put((int) i, (byte) 0);
//...
mappedByteBuffer.force();
总结:那么在RocketMQ 中,MappedFile 类通过使用 MappedByteBuffer 来管理 CommitLog 文件,并且使用 slice() 方法来创建子缓冲区进行局部操作,通过延迟写入减少了频繁的磁盘 I/O 操作,定期调用 force() 方法,将内存中的数据同步到磁盘,减少数据丢失的风险。这样可以提高性能和灵活性,特别是在处理大量消息时。
内存数据的刷盘过程本篇就不在深究,只要知道是通过MappedByteBuffer对延迟写入配置相关策略,并在设定的时期将内存数据写入磁盘文件中就可以了
基于上面所有内容重新修改一版简易的流程图如下
总结
本篇涉及到的知识面比较广,在broker存储message中出现了许多我们在日常开发中并不常见但功能强大的对象,比如PriorityBlockingQueue、CountDownLatch、MappedByteBuffer,RocketMq正是合理的运用了他们,从而造就了rocketMq本身这款优秀的消息队列框架,这也是我们读源码所要学习的。下一篇我们将学习RocketMq的“大脑”NameServer!