全网最细RocketMQ源码四:消息存储

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
在这里插入图片描述
在这里插入图片描述

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));CompletableFuture<PutMessageResult> putMessageResult = null;Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 使用defaultMessageStore.aysncPutMessage存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {// 内存页大小:4kpublic static final int OS_PAGE_SIZE = 1024 * 4;protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 当前进程下 所有的 mappedFile占用的总虚拟内存大小private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);// 当前进程下 所有的 mappedFile个数private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);// 当前mappedFile数据写入点protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);// 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)private final AtomicInteger flushedPosition = new AtomicInteger(0);// 文件大小protected int fileSize;// 文件通道protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/protected ByteBuffer writeBuffer = null;protected TransientStorePool transientStorePool = null;// 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )private String fileName;// 文件名转longprivate long fileFromOffset;// 文件对象private File file;// 内存映射缓冲区,访问虚拟内存private MappedByteBuffer mappedByteBuffer;// 该文件下 保存的第一条 msg 的存储时间private volatile long storeTimestamp = 0;// 当前文件如果是 目录内 有效文件的 首文件的话,该值为trueprivate boolean firstCreateInQueue = false;
  • 构造方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage(byte[] data)
    在这里插入图片描述

  • flush
    在这里插入图片描述

MappedFileQueue

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);private static final int DELETE_FILES_BATCH_MAX = 10;// mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)private final String storePath;// 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)private final int mappedFileSize;// list,目录下的每个 mappedFile 都加入该listprivate final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。private final AllocateMappedFileService allocateMappedFileService;// 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)private long flushedWhere = 0;private long committedWhere = 0;// 当前目录下最后一条msg存储时间private volatile long storeTimestamp = 0;
  • load方法
    在这里插入图片描述

  • getLastMappedFile

 /*** 参数1:startOffset ,文件起始偏移量* 参数2:needCreate ,当 list 为空时,是否创建mappedFile*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {// 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾// 两种情况会创建:// 1. list内没有mappedFile// 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {// 情况1  list内没有mappedFile// createOffset 取值 必须是 mappedFileSize 的倍数 或者 0createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..// 上一个文件名 转long + mappedFileSizecreateOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。// 获取待创建文件的 绝对路径(下次即将要创建的文件名)String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);// 获取 下下次 要创建的文件的 绝对路径String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,// 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。// 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}
  • flush
 /*** @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)* @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘*/public boolean flush(final int flushLeastPages) {boolean result = true;// 获取当前正在刷盘的文件 (正在顺序写的mappedFile)MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 获取mappedFile 最后一条消息的存储时间long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用mf 刷盘 ,返回mf的最新的落盘位点int offset = mappedFile.flush(flushLeastPages);// mf起始偏移量 + mf最新的落盘位点long where = mappedFile.getFileFromOffset() + offset;// true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘result = where == this.flushedWhere;// 将最新的目录刷盘位点 赋值给 flushedWherethis.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result;}

CommitLog

ConsumeQueue

DefaultMessageStore

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625537.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【现代密码学】笔记5--伪随机置换(分组加密)《introduction to modern cryphtography》

【现代密码学】笔记5--伪随机置换&#xff08;分组加密&#xff09;《introduction to modern cryphtography》 写在最前面5 伪随机排列实践构造&#xff08;块密码/分组密码&#xff09; 写在最前面 主要在 哈工大密码学课程 张宇老师课件 的基础上学习记录笔记。 内容补充&…

pve虚拟机的改名和修改ID

PVE的虚拟机名字在web界面是无法修改id和名字的。要注意id和名字不能重。 在使用备份时就发现虚拟机是以虚拟机id作为维一标识&#xff0c;如果有多台pve节点&#xff0c;但共用同一个nfs目录备份或使用同一个pbs进行备份时就必须保障id的唯一性。这时可以使用这个方法来进行补…

MySQL使用通配符进行数据搜索以及过滤

目录 1.什么是通配符&#xff1f; 2.通配符之→百分号(%) 3.通配符之→下划线(_) 4.通配符使用注意事项 *本文涉及概念来源于图灵程序设计丛书&#xff0c;数据库系列——《MySQL必知必会》 1.什么是通配符&#xff1f; 通配符(wildcard) &#xff1a;用来匹配值的一部分…

返利机器人详细解读,纯属个人观点

返利机器人详细解读&#xff0c;纯属个人观点 大家好&#xff0c;我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天&#xff0c;我将为你详细解读返利机器人的发展历程、争议焦点以及…

scrollTop与offsetTop解决小分辨率区域块向上滚动效果效果,结合animation与@keyframes实现标题左右闪动更换颜色效果。

scrollTop 是一个属性&#xff0c;它表示元素的滚动内容垂直滚动条的位置。对于可滚动元素&#xff0c;scrollTop 属性返回垂直滚动条滚动的像素数&#xff0c;即元素顶部被隐藏的像素数。 offsetTop 是一个属性&#xff0c;用于获取一个元素相对于其父元素的垂直偏移量&…

【Mybatis-Plus】mybatisplus更新时,实体字段为空,数据库不更新的解决方案

一、背景描述 项目技术栈&#xff1a;jdk (1.8) spring boot (2.1.0) mybatis-plus (3.5.1) 数据库&#xff1a; MySQL 字段类型&#xff1a;varchar 和 Integer 从前端传过来的数据实体字段&#xff0c; convertType 和 step 设为null时&#xff0c;使用mybatis-plus 的…

移动端开发进阶之蓝牙通讯(二)

移动端开发进阶之蓝牙通讯&#xff08;二&#xff09; 蓝牙广播是一种无线通讯技术&#xff0c;通过无线电波传输数据&#xff1b; 在蓝牙低功耗&#xff08;BLE&#xff09;协议中&#xff0c;广播通信是其重要组成部分&#xff0c;主要有两类使用场景&#xff1a; 单一方向的…

力扣2085-统计出现过一次的公共字符串

统计出现过一次的公共字符串 题目链接 解题思路&#xff1a; 显然我们需要统计每个字符串数组中每个字符串出现的字数 使用哈希表key表示字符产&#xff0c;val用来记录该字符串出现的次数 最后遍历map1&#xff0c;要找到每个字符串只出现一次&#xff0c;并且在两个字符串数…

用Python实现给图片去黑边

图片去黑边&#xff08;只考虑了去水平方向上的黑边&#xff09;的核心算法是要找到图片顶部或顶部的黑边位置&#xff0c;即两个纵坐标值&#xff0c; 主要用到了canny边缘计算、 houghlines直线检测、easyocr识别等算法。 给图片去黑边的实现逻辑为&#xff1a; 先进行canny边…

腾讯云服务器租用价格表_2024新版报价

腾讯云服务器租用价格表&#xff1a;轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月&#xff0c;云服务器CVM S5实例2核2G配置280.8元一年…

127.0.0.1和0.0.0.0的区别

在网络开发中&#xff0c;经常会涉及到两个特殊的IP地址&#xff1a;127.0.0.1和0.0.0.0。这两者之间有一些关键的区别&#xff0c;本文将深入介绍它们的作用和用途。 127.0.0.1 127.0.0.1 是本地回环地址&#xff0c;通常称为 “localhost”。作用是让网络应用程序能够与本地…

【android】rk3588-android-bt

文章目录 蓝牙框架HCI接口蓝牙VENDORLIBvendorlib是什么 代码层面解读vendorlib1、 vendorlib实现&#xff0c;协议栈调用2、协议栈实现&#xff0c;vendorlib调用&#xff08;回调函数&#xff09;2.1、 init函数2.2、BT_VND_OP_POWER_CTRL对应处理2.3、BT_VND_OP_USERIAL_OPE…

5.1 内容管理模块 - 课程预览、提交审核

内容管理模块 - 课程预览、提交审核 文章目录 内容管理模块 - 课程预览、提交审核一、课程预览1.1 需求分析1.2 freemarker 模板引擎1.2.1 Maven 坐标1.2.2 freemaker 相关配置信息1.2.3 添加模板 1.3 测试静态页面1.3.1 部署Nginx1.3.2 解决端口问题被占用问题1.3.3 配置host文…

紫光展锐T770安卓核心板_展锐T770 5G核心板规格参数

紫光展锐T770安卓核心板是一款高性能的5G安卓智能模块&#xff0c;拥有先进的6nm制程工艺和强大的性能。板载8GB Ram 256GBROM的内存单元&#xff0c;支持4K H.265/ H.264视频编解码&#xff0c;搭载Android 13以上操作系统&#xff0c;功能丰富。除了支持5G NSA和SA双模式向下…

数学建模 | 运筹学的 LINGO 软件(附 LINGO代码)

===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== 运筹学的 LINGO 软件 1 简介2 LINGO 快速入门3 LINGO 中的集

我的NPI项目之设备系统启动(三) -- CDT的一个实例

上面说了这么多&#xff0c;这里就添加一个CDT的使用实例和简单的代码解析。 首先生成cdt_configure.xml配置文件&#xff0c;然后执行如下命令&#xff1a; python cdt_generator.py cdt_configure.xml CDT.bin; 就可以生成对应的CDT.bin文件。同时也会生成, 我们会利用ha…

『 C++ 』AVL树详解 ( 万字 )

&#x1f988;STL容器类型 在STL的容器中,分为几种容器: 序列式容器&#xff08;Sequence Containers&#xff09;: 这些容器以线性顺序存储元素&#xff0c;保留了元素的插入顺序。 支持随机访问&#xff0c;因此可以使用索引或迭代器快速访问任何位置的元素。 主要的序列式…

03 SpringMVC响应数据之接收Cookie和请求头+原生API+共享域对象操作

下载postman,测试传json数据 1. 接收cookie 用CookieValue注解将cookie值绑定到控制器中的handler参数。 Controller类中的一个handler GetMapping("/CookieTest") public void handle(CookieValue("cookie的id(name)") String cookie) { //... }2. 接收…

leetcode-2085.统计出现过一次的公共字符串

题目链接&#xff1a;2085. 统计出现过一次的公共字符串 - 力扣&#xff08;LeetCode&#xff09; 解题思路&#xff1a; 1、暴力破解 首先想到的是暴力破解&#xff0c;用两个循环遍历列表&#xff0c;然后将单词出现的情况都记录在一个字典里面。最后遍历字典找到满足条件…

DBA技术栈(三):MySQL 性能影响因素

文章目录 前言一、影响MySQL性能的因素1.1 商业上的需求1.2 应用架构规划1.3 查询语句使用方式1.4 Schema的设计1.5 硬件环境 总结 前言 大部分人都一致认为一个数据库应用系统&#xff08;这里的数据库应用系统概指所有使用数据库的系统&#xff09;的性能瓶颈最容易出现在数…