全网最细RocketMQ源码四:消息存储

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
在这里插入图片描述
在这里插入图片描述

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));CompletableFuture<PutMessageResult> putMessageResult = null;Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 使用defaultMessageStore.aysncPutMessage存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {// 内存页大小:4kpublic static final int OS_PAGE_SIZE = 1024 * 4;protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 当前进程下 所有的 mappedFile占用的总虚拟内存大小private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);// 当前进程下 所有的 mappedFile个数private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);// 当前mappedFile数据写入点protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);// 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)private final AtomicInteger flushedPosition = new AtomicInteger(0);// 文件大小protected int fileSize;// 文件通道protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/protected ByteBuffer writeBuffer = null;protected TransientStorePool transientStorePool = null;// 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )private String fileName;// 文件名转longprivate long fileFromOffset;// 文件对象private File file;// 内存映射缓冲区,访问虚拟内存private MappedByteBuffer mappedByteBuffer;// 该文件下 保存的第一条 msg 的存储时间private volatile long storeTimestamp = 0;// 当前文件如果是 目录内 有效文件的 首文件的话,该值为trueprivate boolean firstCreateInQueue = false;
  • 构造方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage(byte[] data)
    在这里插入图片描述

  • flush
    在这里插入图片描述

MappedFileQueue

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);private static final int DELETE_FILES_BATCH_MAX = 10;// mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)private final String storePath;// 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)private final int mappedFileSize;// list,目录下的每个 mappedFile 都加入该listprivate final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。private final AllocateMappedFileService allocateMappedFileService;// 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)private long flushedWhere = 0;private long committedWhere = 0;// 当前目录下最后一条msg存储时间private volatile long storeTimestamp = 0;
  • load方法
    在这里插入图片描述

  • getLastMappedFile

 /*** 参数1:startOffset ,文件起始偏移量* 参数2:needCreate ,当 list 为空时,是否创建mappedFile*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {// 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾// 两种情况会创建:// 1. list内没有mappedFile// 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {// 情况1  list内没有mappedFile// createOffset 取值 必须是 mappedFileSize 的倍数 或者 0createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..// 上一个文件名 转long + mappedFileSizecreateOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。// 获取待创建文件的 绝对路径(下次即将要创建的文件名)String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);// 获取 下下次 要创建的文件的 绝对路径String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,// 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。// 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}
  • flush
 /*** @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)* @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘*/public boolean flush(final int flushLeastPages) {boolean result = true;// 获取当前正在刷盘的文件 (正在顺序写的mappedFile)MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 获取mappedFile 最后一条消息的存储时间long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用mf 刷盘 ,返回mf的最新的落盘位点int offset = mappedFile.flush(flushLeastPages);// mf起始偏移量 + mf最新的落盘位点long where = mappedFile.getFileFromOffset() + offset;// true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘result = where == this.flushedWhere;// 将最新的目录刷盘位点 赋值给 flushedWherethis.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result;}

CommitLog

ConsumeQueue

DefaultMessageStore

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625537.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【现代密码学】笔记5--伪随机置换(分组加密)《introduction to modern cryphtography》

【现代密码学】笔记5--伪随机置换&#xff08;分组加密&#xff09;《introduction to modern cryphtography》 写在最前面5 伪随机排列实践构造&#xff08;块密码/分组密码&#xff09; 写在最前面 主要在 哈工大密码学课程 张宇老师课件 的基础上学习记录笔记。 内容补充&…

pve虚拟机的改名和修改ID

PVE的虚拟机名字在web界面是无法修改id和名字的。要注意id和名字不能重。 在使用备份时就发现虚拟机是以虚拟机id作为维一标识&#xff0c;如果有多台pve节点&#xff0c;但共用同一个nfs目录备份或使用同一个pbs进行备份时就必须保障id的唯一性。这时可以使用这个方法来进行补…

MySQL使用通配符进行数据搜索以及过滤

目录 1.什么是通配符&#xff1f; 2.通配符之→百分号(%) 3.通配符之→下划线(_) 4.通配符使用注意事项 *本文涉及概念来源于图灵程序设计丛书&#xff0c;数据库系列——《MySQL必知必会》 1.什么是通配符&#xff1f; 通配符(wildcard) &#xff1a;用来匹配值的一部分…

scrollTop与offsetTop解决小分辨率区域块向上滚动效果效果,结合animation与@keyframes实现标题左右闪动更换颜色效果。

scrollTop 是一个属性&#xff0c;它表示元素的滚动内容垂直滚动条的位置。对于可滚动元素&#xff0c;scrollTop 属性返回垂直滚动条滚动的像素数&#xff0c;即元素顶部被隐藏的像素数。 offsetTop 是一个属性&#xff0c;用于获取一个元素相对于其父元素的垂直偏移量&…

移动端开发进阶之蓝牙通讯(二)

移动端开发进阶之蓝牙通讯&#xff08;二&#xff09; 蓝牙广播是一种无线通讯技术&#xff0c;通过无线电波传输数据&#xff1b; 在蓝牙低功耗&#xff08;BLE&#xff09;协议中&#xff0c;广播通信是其重要组成部分&#xff0c;主要有两类使用场景&#xff1a; 单一方向的…

腾讯云服务器租用价格表_2024新版报价

腾讯云服务器租用价格表&#xff1a;轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月&#xff0c;云服务器CVM S5实例2核2G配置280.8元一年…

【android】rk3588-android-bt

文章目录 蓝牙框架HCI接口蓝牙VENDORLIBvendorlib是什么 代码层面解读vendorlib1、 vendorlib实现&#xff0c;协议栈调用2、协议栈实现&#xff0c;vendorlib调用&#xff08;回调函数&#xff09;2.1、 init函数2.2、BT_VND_OP_POWER_CTRL对应处理2.3、BT_VND_OP_USERIAL_OPE…

5.1 内容管理模块 - 课程预览、提交审核

内容管理模块 - 课程预览、提交审核 文章目录 内容管理模块 - 课程预览、提交审核一、课程预览1.1 需求分析1.2 freemarker 模板引擎1.2.1 Maven 坐标1.2.2 freemaker 相关配置信息1.2.3 添加模板 1.3 测试静态页面1.3.1 部署Nginx1.3.2 解决端口问题被占用问题1.3.3 配置host文…

紫光展锐T770安卓核心板_展锐T770 5G核心板规格参数

紫光展锐T770安卓核心板是一款高性能的5G安卓智能模块&#xff0c;拥有先进的6nm制程工艺和强大的性能。板载8GB Ram 256GBROM的内存单元&#xff0c;支持4K H.265/ H.264视频编解码&#xff0c;搭载Android 13以上操作系统&#xff0c;功能丰富。除了支持5G NSA和SA双模式向下…

我的NPI项目之设备系统启动(三) -- CDT的一个实例

上面说了这么多&#xff0c;这里就添加一个CDT的使用实例和简单的代码解析。 首先生成cdt_configure.xml配置文件&#xff0c;然后执行如下命令&#xff1a; python cdt_generator.py cdt_configure.xml CDT.bin; 就可以生成对应的CDT.bin文件。同时也会生成, 我们会利用ha…

『 C++ 』AVL树详解 ( 万字 )

&#x1f988;STL容器类型 在STL的容器中,分为几种容器: 序列式容器&#xff08;Sequence Containers&#xff09;: 这些容器以线性顺序存储元素&#xff0c;保留了元素的插入顺序。 支持随机访问&#xff0c;因此可以使用索引或迭代器快速访问任何位置的元素。 主要的序列式…

DBA技术栈(三):MySQL 性能影响因素

文章目录 前言一、影响MySQL性能的因素1.1 商业上的需求1.2 应用架构规划1.3 查询语句使用方式1.4 Schema的设计1.5 硬件环境 总结 前言 大部分人都一致认为一个数据库应用系统&#xff08;这里的数据库应用系统概指所有使用数据库的系统&#xff09;的性能瓶颈最容易出现在数…

MOSS 混元 巅峰对话!2024大模型发展都在这里

引言 2023 年&#xff0c;各大厂商争先投入 LLM 研发&#xff0c;一年内&#xff0c;在国内累计就有 200 余个大模型正式发布。尽管很多大模型并不完善&#xff0c;但行业内的研究专家及产业领袖都在为大模型的突破甚至 AGI 的发展&#xff0c;做着不懈探索。 但同时&#xff0…

基于Java (spring-boot)的停车场管理系统

一、项目介绍 基于Java (spring-boot)的停车场管理系统、预订车位系统、停车缴费系统功能&#xff1a; 登录、注册、后台首页、用户信息管理、车辆信息管理、新增车辆、车位费用设置、停泊车辆查询、车辆进出管理、登录日志查询、个人中心、预定停车位、缴费信息。 适用人群&…

Windows系统缺失api-ms-win-crt-runtime-l1-1-0.dll的修复方法

“在Windows操作系统环境下&#xff0c;用户经常遇到丢失api-ms-win-crt-runtime-l1-1-0.dll文件的问题&#xff0c;这一现象引发了广泛的关注与困扰。该dll文件作为Microsoft Visual C Redistributable Package的重要组成部分&#xff0c;对于系统内许多应用程序的正常运行起着…

C++ λ表达式

λ表达式提供了函数对象的另一种编程机制。 在 C 11 和更高版本中&#xff0c;Lambda 表达式&#xff08;通常称为 Lambda&#xff09;是一种在被调用的位置或作为参数传递给函数的位置定义匿名函数对象&#xff08;闭包&#xff09;的简便方法。 Lambda 通常用于封装传递给算法…

论文笔记(三十九)Learning Human-to-Robot Handovers from Point Clouds

Learning Human-to-Robot Handovers from Point Clouds 文章概括摘要1. 介绍2. 相关工作3. 背景3.1. 强化学习3.2. 移交模拟基准 4. 方法4.1. Handover Environment4.2. 感知4.3. 基于视觉的控制4.4. 师生两阶段培训 (Two-Stage Teacher-Student Training) 5. 实验5.1. 模拟评估…

CSS实现平行四边形

1、为什么实现平行四边形 在日常开发过程中&#xff0c;有些时候我们可以会遇到一种情况&#xff0c;如可视化大屏中要求我们横线实现对应的进度条&#xff0c;但进度条的内容是由无数个平行四边形组装类似于进度条的形式&#xff0c;那么我们就需要使用CSS来进行对应的实现。 …

OPT(erlang)打造一套缓存系统(一)

缓存的设计 这个简易缓存存储的是键/值对&#xff0c;其中键与键之间不得重复&#xff0c;并且每个键只能映射到一个值。这个设计背后的核心思想是为写人缓存的每一个值都分配一个独立的存储进程再将对应的键映射至该进程。你可能会对这种为每个值分配一个进程的设计感到惊讶&…

20240114-寻找零损失或一损失的玩家

题目要求 给定一个整数数组 matches&#xff0c;其中 matches[i] [winneri, Loseri] 表示玩家 Winneri 在一场比赛中击败了玩家 Loseri。 返回大小为 2 的列表答案&#xff0c;其中&#xff1a; answer[0] 是所有未输掉任何比赛的玩家的列表。answer[1] 是仅输掉一场比赛的…