从零开始读RocketMq源码(三)Broker存储Message流程解析

目录

前言

准备

消息载体CommitLog

文件持久化位置 

源码解析

broker消息对象MessageExtBrokerInner

异步存储message

CommitLog的真相

创建MappedFile文件

加入异步刷盘队列

Message异步存储MappedByteBuffer

总结


前言

在面试中我们经常会听到这样的回答,生产者将message发送给broker服务,然后消费者从broker中获取消息并消费,为了保证message在broker服务中不丢失,mq会对消息数据进行持久化到磁盘中。那么message到达broker服务后是如何进行存储并持久化到磁盘中的呢?这就是本篇要学习的内容。

准备

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

消息载体CommitLog

该对象是broker服务接收到message后进行存储的数据对象,一般就把存储消息的文件就称为commitLog文件也就是最终存储磁盘上的数据文件。

大致的message流向如图:

根据源码可以知道,一个commitLog文件最大存储1G数据,文件写满了,则会写入下一个文件中

文件持久化位置 

commitlog文件的持久化存放的位置是通过broker.conf配置文件中storePathCommitLog配置

storePathCommitLog = /Users/leonsh/rocketmqnamesrv/store/commitlog

最后生成的文件为这样

文件命名

查看上面图片可知文件的名称是一串数字20个0组成,因为文件名称是按照偏移量offset来命名的,

因为这是第一个文件所以offset为0,补全20位,所以文件名称为20个0

,以此类推第二个文件名称则为00000000001073741824

上面说过一个commitlog文件最大存储1G,而1G=1024*1024*1024=1073741824bit,这就是第二个文件的偏移量

源码解析

前面说到Producer发送message到broker后,broker会对接收的message请求进行处理

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:87
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)

上面的方法名中顾名思义就是处理请求的,并且所在的文件命名SendMessageProcessor也说明了该类的作用。那么我们就从该方法深入源码中

看方法引用位置我们会发现许多地方调用了该方法,先抛开前面broker如何接收的,反正最后消息会到达这里,从该方法开始就是broker处理message的核心流程也是本篇学习的重点

broker消息对象MessageExtBrokerInner

MessageExtBrokerInner该对象就是用来后续对message处理的封装

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
//获取请求对象中的消息体
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
//初始化消息对象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
  • requestHeader 该对象就是在上一篇中讲到的发送message的消息请求头
  • 从请求头中获取设置的队列id,如果没有设置,则会从对应的topic中随机获取一个randomQueueId()
  • 从请求头中获取topic名称,通过名称再去获取broker中存储的topic对应的数据对象,深入源码会发现,broker中存储topic数据也是使用的map,ConcurrentMap<String, TopicConfig> topicConfigTable
  • 最后就是创建MessageExtBrokerInner对象并设值

异步存储message

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {//事务消息asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {//普通消息asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

或许大家和博主开始一样都有一个疑惑,我们生产者发送的是同步消息,为何到了broker却是异常存储呢?

1.其实生产者发送同步消息和broker异步存储都是相互独立互不干扰的,broker异步存储只是为了提高mq接收消息的写入性能吞吐量broker异步存储会将写入内存的message进行异步刷盘。

2.就算broker是异步存储但也不会立即返回结果给生产者,需要等待broker异步刷盘成功才会返回结果给生产者,通过broker提供的CompletableFuture机制实现。

什么,看完解释还是有点懵,有点抽象,我们继续向下深入源码,一步一步解开疑惑,我相信看完后面的解析同样会豁然开朗的!

CommitLog的真相

//源码位置
//包名:org.apache.rocketmq.broker.processor
//文件:SendMessageProcessor
//行数:255
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

到这里,本文开头提到的commitLog对象终于出现了,查看该源码可知,commitlog对象中定义了一个MappedFileQueue对象这个对象又是做什么的,我们继续深入源码

//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:942
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

深入该方法,大概意思就是从MappedFileQueue对象中的CopyOnWriteArrayList<MappedFile> mappedFiles集合中取出里面的最后一个MappedFile对象,至此赢来大结局MappedFile对象才是最终映射到磁盘文件的,而CommitLog可以理解为MappedFile对象的外层封装。但落到磁盘上的文件我们依然称为commitLog文件

扩展:

CopyOnWriteArrayList 是 Java 中的一种线程安全的 List 实现,属于 java.util.concurrent 包

读操作:不需要加锁,直接操作底层数组,底层数组在写操作时是一个副本,读操作不会影响正在进行的写操作,能够保证高效的并发读性能。

写操作:会创建底层数组的一个新的副本,对这个副本进行修改, 修改完成后,新的副本会替换原来的数组

创建MappedFile文件

//源码位置
//包名:org.apache.rocketmq.store
//文件:CommitLog
//行数:1001
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noiseif (isCloseReadAhead()) {setFileReadMode(mappedFile, LibC.MADV_RANDOM);}
}

因为broker是启动后首次存储数据,所以上面获取出来的mappedFile一定为空则进入if代码块

因此偏移量也是初始值0

生成MappedFile文件路径名称

//源码位置
//包名:org.apache.rocketmq.store
//文件:MappedFileQueue
//行数:345
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset+ this.mappedFileSize);
  • this.storePath:该字段就是前面在broker.conf文件中配置的文件地址
  • File.separator:分隔符
  • UtilAll.offset2FileName(createOffset):生成20位数字组成的文件名称,当前createOffset=0。

为何会生成两个地址nextFilePathnextNextFilePath呢?

因为mq在生成当前需要使用的文件时同时生成下一个使用的文件,当第一个文件存储满后,直接使用下一个文件,减小了创建文件的开销,提高mq的性能。所以会同时生成2个文件

那么问题来了,为何本文开头生成的文件怎么只有一个?

我们查看源码提交记录可知,nextNextFilePath第二个文件是2021年9月才新增的

查看rocketMq在github上各个版本的发布时间,2021年9月并没有发布新版本,但是2021年10月发布了rocketmq-all-4.9.2

那么由此可得,rocketMq同时创建2个文件从版本4.9.2开始支持,之前的版本都只会创建1个文件

因为博主的broker服务是通过docker镜像启动的,但是查看镜像版本显示的确为最新版本

其实这只是rocketMq镜像的版本,而我们看的是镜像中使用rocketMq框架版本

执行命令查看镜像的详细信息

docker inspect apacherocketmq/rocketmq:latest

由此可得博主的rocketMq版本为:4.6.0,所以只会创建一个commitLog文件

加入异步刷盘队列

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:62
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//...
//加入队列触发异步刷盘操作
boolean offerOK = this.requestQueue.offer(nextReq);
//...
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
//...
boolean offerOK = this.requestQueue.offer(nextNextReq);
  • AllocateRequest:就是message异步存储请求最后的封装
  • this.requestTable:也是一个map对象 ConcurrentMap<String, AllocateRequest> requestTable;key为文件的路径,value则为AllocateRequest
  • this.requestQueue这是一个队列PriorityBlockingQueue<AllocateRequest> requestQueue队列元素为AllocateRequest

PriorityBlockingQueue是如何做到异步刷盘的呢?

该队列就是为broker实现异步存储核心,可能大家对这个队列比较陌生

它是Java 中 java.util.concurrent 包提供的一个线程安全的优先级队列。它基于优先级堆实现,能够保证元素按照自然顺序或者指定的比较器顺序进行排序

因为它是一个队列那么我们首先就会想到生产者消费者,那么就起到了异步解耦的作用

他有两个非常重要的方法:

  • offer(): 将一个元素插入到队列中
  • take(): 从队列中获取并移除元素 由于 PriorityBlockingQueue 是一个阻塞队列,如果队列为空,take 方法会一直阻塞直到有元素可用

总结:由上面我们知道offer()一般用于生产者调用,而take()则是消费者调用,当队列为空时消费者线程会一直阻塞,只要队列中存入对象,消费者就会感知到并消费。可以理解为消费者和生产者共享PriorityBlockingQueue对象

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:99
AllocateRequest result = this.requestTable.get(nextFilePath);
//...
//阻塞等待刷盘结果
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);

上面源码的作用就是等待异步刷盘结果

  • 第一段就是取出之前存入的第一个请求对象AllocateRequest
  • 第二段则是判断异步刷盘是否完成,成功则返回,还没有处理完则一直阻塞,直到达到超时时间waitTimeOut

result.getCountDownLatch().await为何能做到阻塞等待结果呢?

进入AllocateRequest对象中可知,操作的是这个对象CountDownLatch countDownLatch = new CountDownLatch(1)

CountDownLatch或许大家不太熟悉,但ReentrantLock大家并不陌生吧,面试中经常问到,他们同属于java并发包JUC( java.util.concurrent 下的对象.

概念:它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。它是通过一个计数器实现的,该计数器初始化为一个给定的值。每当一个线程完成了它的一项操作后,这个计数器就递减。当计数器的值到达零时,等待在这个计数器上的线程将被唤醒并继续执行

总结:通过源码我们看到AllocateRequest被创建时里面属性CountDownLatch中计数器默认就是1所以需要一直等待被修改为0时才会继续执行后续逻辑,那就是等待异步刷盘完成。

Message异步存储MappedByteBuffer

//源码位置
//包名:org.apache.rocketmq.store
//文件:AllocateMappedFileService
//行数:155
AllocateRequest req = null;
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());

该源码就是对之前加入队列的AllocateRequest取出来,并执行后续的存储操作,可以说就是消费者消费的地方,我们可以结合源码上下文代码可以知道,所在的类的顶级继承类是Runnable,而上面代码所在方法就是被重写的run()方法调用,可以认为消费者是在单独的一个线程中执行的。

获取缓冲区

//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:607
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();

被操作的对象是MappedByteBuffer

MappedByteBuffer是什么?

是 Java NIO(New Input/Output)中的一个类,它允许将文件直接映射到内存中,从而提高文件的读写效率。RocketMQ 使用 MappedByteBuffer 来管理 CommitLog 文件,以实现高效的消息存储和检索通过将文件映射到内存,RocketMQ 可以直接操作内存数据,而无需频繁的磁盘 I/O 操作。

MappedByteBuffer也是mmap的一种实现方式

什么是mmap?

mmap(内存映射文件)是一种将文件内容映射到进程的地址空间的技术。这样一来,文件内容就可以像访问内存一样被读写,从而显著提高 I/O 操作的效率。

调用mappedByteBuffer.slice()方法的作用是什么?

用于创建一个新的缓冲区,该缓冲区与原始缓冲区共享相同的底层内存,但具有独立的位置、限制和标记。这在需要操作内存映射文件的某一部分时非常有用,而不影响整个映射文件的其他部分。

MappedByteBuffer有两大特点:

  • 延迟写入:数据写入 MappedByteBuffer 时,实际上是写入了内存中的映射区域,操作系统会在合适的时候将这些数据同步到磁盘,而不是立即进行磁盘 I/O 操作。
  • 强制刷新:为了确保数据的一致性和持久性,MappedByteBuffer 提供了 force() 方法,可以将内存中的修改强制刷新到磁盘
//源码位置
//包名:org.apache.rocketmq.store.logfile
//文件:DefaultMappedFile
//行数:611
byteBuffer.put((int) i, (byte) 0);
//...
mappedByteBuffer.force();

总结:那么在RocketMQ 中,MappedFile 类通过使用 MappedByteBuffer 来管理 CommitLog 文件,并且使用 slice() 方法来创建子缓冲区进行局部操作,通过延迟写入减少了频繁的磁盘 I/O 操作,定期调用 force() 方法,将内存中的数据同步到磁盘,减少数据丢失的风险。这样可以提高性能和灵活性,特别是在处理大量消息时。


内存数据的刷盘过程本篇就不在深究,只要知道是通过MappedByteBuffer对延迟写入配置相关策略,并在设定的时期将内存数据写入磁盘文件中就可以了


基于上面所有内容重新修改一版简易的流程图如下

总结

本篇涉及到的知识面比较广,在broker存储message中出现了许多我们在日常开发中并不常见但功能强大的对象,比如PriorityBlockingQueueCountDownLatchMappedByteBufferRocketMq正是合理的运用了他们,从而造就了rocketMq本身这款优秀的消息队列框架,这也是我们读源码所要学习的。下一篇我们将学习RocketMq的“大脑”NameServer!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/45005.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国产化趋势下源代码数据防泄密的信创沙盒的方案分享

随着国产化的大力推进&#xff0c;越来越多的企事业单位在逐步替换Windows、Linux等操作系统的使用。那么什是国产化了&#xff1f;国产化是指在产品或服务中采用国内自主研发的技术和标注&#xff0c;替代过去依赖的他国的产品和服务&#xff0c;国产化又被称之为“信创”&…

GitLab CI/CD实现项目自动化部署

1 GitLab CI/CD介绍 GitLab CI/CD 是 GitLab 中集成的一套用于软件开发的持续集成&#xff08;Continuous Integration&#xff09;、持续交付&#xff08;Continuous Delivery&#xff09;和持续部署&#xff08;Continuous Deployment&#xff09;工具。这套系统允许开发团队…

vue里实现点击按钮回到页面顶部功能,博客必备!

效果 步骤 1-标签结构 动态绑定样式style&#xff0c;监听点击事件&#xff0c;后续控制opacity透明度。和滚动距离 <div class"toTop" :style"dynamicStyles" click"toTop"><!--<i class"fa fa-arrow-up"></i>…

超简单的通配证书签发工具,免费,无需安装任何插件到本地

常见的acme.sh 或者 lego等工具需要配置&#xff0c;安装不灵活&#xff0c;续签需要配置计划任务&#xff0c;签发单域名证书或者通配证书需要不同的指令和配置&#xff0c;繁琐&#xff0c;如果自己程序想要对接签发证书的api有的不支持&#xff0c;有的用起来繁琐。 最近发…

【VIVADO SDK调试遇到DataAbortHandler】

问题 SDK调试遇到DataAbortHandler问题。 运行后不显示结果&#xff0c;debug模式下发现进入DataAbortHandler异常函数。程序中存在大数组。 原因:SDK默认的堆栈为1024bytes,需要将堆栈调大。 修改方法&#xff1a; 解决:对application中src下的lscript.ld双击&#xff0c;…

Linux 程序卡死的特殊处理

一、前言 Linux环境。 我们在日常编写的程序中&#xff0c;可能会出现一些细节问题&#xff0c;导致程序卡死&#xff0c;即程序没法正常运行&#xff0c;界面卡住&#xff0c;也不会闪退... 当这种问题出现在客户现场&#xff0c;那就是大问题了。。。 当我们暂时还无法排…

如何定量选择孔销基准?-DTAS来帮你!

在当今快速发展的工程领域&#xff0c;公差仿真的作用日渐重要&#xff0c;在公差仿真中&#xff0c;基准体系的选择对于最终结果更是至关重要。基准体系不同可能导致仿真过程中的参数计算、误差分析以及最终的工程设计都有所不同。基准体系作为评估和比较的参照&#xff0c;直…

Suricata引擎二次开发之命中规则定位

二开背景 suricata是一款高性能的开源网络入侵检测防御引擎&#xff0c;旨在检测、预防和应对网络中的恶意活动和攻击。suricata引擎使用多线程技术&#xff0c;能够快速、准确地分析网络流量并识别潜在的安全威胁&#xff0c;是众多IDS和IPS厂商的底层规则检测模块。 前段时间…

强制升级最新系统,微软全面淘汰Win10和部分11用户

说出来可能不信&#xff0c;距离 Windows 11 正式发布已过去整整三年时间&#xff0c;按理说现在怎么也得人均 Win 11 水平了吧&#xff1f; 然而事实却是&#xff0c;三年时间过去 Win 11 占有率仅仅突破到 29%&#xff0c;也就跳起来摸 Win 10 屁股的程度。 2024 年 6 月 Wi…

【Linux】磁盘性能压测-FIO工具

一、FIO工具介绍 fio&#xff08;Flexible I/O Tester&#xff09;是一个用于评估计算机系统中 I/O 性能的强大工具。 官网&#xff1a;fio - fio - Flexible IO Tester 注意事项&#xff01; 1、不要指定文件系统名称&#xff08;如/dev/mapper/centos-root)&#xff0c;避…

react启用mobx @decorators装饰器语法

react如果没有经过配置&#xff0c;直接使用decorators装饰器语法会报错&#xff1a; Support for the experimental syntax ‘decorators’ isn’t currently enabled 因为react默认是不支持装饰器语法&#xff0c;需要做一些配置来启用装饰器语法。 step1: 在 tsconfig.js…

【学术会议征稿】第三届能源互联网及电力系统国际学术会议(ICEIPS 2024)

第三届能源互联网及电力系统国际学术会议&#xff08;ICEIPS 2024&#xff09; 2024 3rd International Conference on Energy Internet and Power Systems 能源互联网是实现新一代电力系统智能互动、开放共享的重要支撑技术之一&#xff0c;也是提升能源调度效率&#xff0…

Jetson-AGX-Orin 非docker环境源码编译安装CyberRT

Jetson-AGX-Orin 非docker环境源码编译安装CyberRT 1、安装依赖 sudo apt update sudo apt-get install g gdb gcc cmake sudo apt install libpoco-dev uuid-dev libncurses5-dev python3-dev python3-pip python3 -m pip install protobuf3.14.02、下载CyberRT源码 git cl…

python+pygame实现五子棋人机对战之三

上回讲过&#xff1a; pythonpygame实现五子棋人机对战之一 pythonpygame实现五子棋人机对战之二 界面已经有了&#xff0c;并且可以支持鼠标操作选择菜单和人机对战开始下棋了&#xff0c;那电脑是如何应手落子呢&#xff1f;以下内容是通用的类&#xff0c;全部放在utils.…

全球高端销量第一 凯迪仕智能锁建博会获重磅大奖再次遥遥领先

2024年7月11日&#xff0c;第26届中国广州建博会圆满落幕。Kaadas凯迪仕第11年受邀参展&#xff0c;凭借超吸睛的赛博风展馆和重磅旗舰传奇大师K70系列智能锁震撼亮相&#xff0c;吸引抖音网红云集打卡直播以及众多主流及行业媒体聚集报道。在大家居建装行业全球第一展的舞台上…

问题清除指南|Dell OptiPlex 7070 升级 win11 开启 TPM 2.0 教程

前言&#xff1a;最近想把实验室台式机的系统从 Windows 10 升级到 Windows 11&#xff0c;遇到一点小问题&#xff0c;在此记录一下解决办法。 ⚠️ 注&#xff1a;本教程仅在 Dell OptiPlex 7070 台式机系统中测试有效&#xff0c;并不保证其余型号机器适用此教程。 参考链接…

中国科学院地理所牛书丽团队《Global Change Biology 》最新成果!

本文首发于“生态学者”微信公众号&#xff01; 在全球气候变化的背景下&#xff0c;干旱地区的扩张对生态系统的氮循环产生了深远影响。氮同位素&#xff08;δ15N&#xff09;的天然丰度&#xff0c;尤其是土壤中的δ15N&#xff0c;是评估陆地生态系统氮循环动态和氮限制的关…

【ARMv8/v9 GIC 系列 1.7 -- GIC PPI | SPI | SGI | LPI 中断使能配置概述】

请阅读【ARM GICv3/v4 实战学习 】 文章目录 GIC 各种中断使能配置PPIs(每个处理器私有中断)SPIs(共享外设中断)SGIs(软件生成的中断)LPIs(局部中断)GIC 各种中断使能配置 在ARM GICv3和GICv4架构中,不同类型的中断(如PPIs、SPIs、SGIs和LPIs)可以通过不同的方式进…

分享:2024好的ai文章生成器下载资源 tzqsbic

在当今数字化的时代&#xff0c;ai技术的发展日新月异&#xff0c;为我们的生活和工作带来了诸多便利。其中&#xff0c;ai文章生成器作为一项创新的工具&#xff0c;给当代人们带来了很多好处&#xff0c;尤其是对于很多创作者&#xff0c;不仅能解决创作困难&#xff0c;而且…

【开发工具】webStrom2024版-永久使用

1、解压文件 2、安装步骤 先执行unistall-current-user.vbs&#xff0c;确保当前环境变量下没有历史使用记录。再执行install-current-user.vbs。运行的时候&#xff0c;会有第一个弹窗&#xff0c;点击确定&#xff0c;稍微等待一会&#xff0c;会出现 Done 的弹窗&#xff0…