4、Rocketmq之存储原理

CommitLog ~ MappedFileQueue ~ MappedFile集合
在这里插入图片描述

正常情况下,RocketMQ支持消息体字节数最多为1个G。注意该消息体并不单单是消息体body。如果生产的消息其字节数超过1个G则该消息是无法被落盘处理的。因为没有一个MapperFile文件可以承载该消息所有的字节数。

1.AllocateMappedFileService

参考文章

异步初始化CommitLog文件时优先初始化nextFilePath、nextNextFilePath两个文件。

同时创建nextFilePath【/data/rocketmq/commitlog/00000000000000000000】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】两个文件是如何使用的呢?

  1. 优先返回nextFilePath,并添加到MappedFileQueue集合属性mappedFiles中。此时队列requestQueue为空。requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】。
  2. 如果消息体的长度没有达到当前MapperFile中字节缓冲区capacity的大小,则不会创建新的MapperFile文件。
  3. 如果步骤2不成立,则创建新的nextFilePath【/data/rocketmq/commitlog/00000000000000000050】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】对应的MapperFile文件。但是由于requestTable集合不为空即存在nextFilePath对应的MapperFile文件【/data/rocketmq/commitlog/00000000000000000050】则删除并返回当前集合元素。
  4. 此时requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】。MappedFileQueue中集合属性mappedFiles中存在00000000000000000000、00000000000000000050两个MappedFile文件。

如果真实发送的消息字节数没有超过当前字节缓冲区剩余空间则优先当前MapperFile文件处理。否则创建新的MapperFile文件。

public class AllocateMappedFileService extends ServiceThread {private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static int waitTimeOut = 1000 * 5;private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<>();private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<>();private volatile boolean hasException = false;private DefaultMessageStore messageStore;public AllocateMappedFileService(DefaultMessageStore messageStore) {this.messageStore = messageStore;}// nextFilePath:CommitLog文件路径 /data/rocketmq/commitlog/00000000000000000000public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {int canSubmitRequests = 2;...AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;...boolean offerOK = this.requestQueue.offer(nextReq);AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;...boolean offerOK = this.requestQueue.offer(nextNextReq);...// 每次只是返回nextFilePath对应的MappedFile。此时 requestQueue 队列为空,requestTable集合中只是存在 nextNextFilePath 对应的MappedFile文件// 如果AllocateRequest result = this.requestTable.get(nextFilePath);messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");// 阻塞等待 线程AllocateMappedFileService 初始化MapperFile文件。默认时间为5秒boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);messageStore.getPerfCounter().endTick("WAIT_MAPFILE_TIME_MS");if (!waitOK) {log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {this.requestTable.remove(nextFilePath);// 返回nextFilePath对应的MappedFile文件,并添加到MappedFileQueue中集合属性mappedFiles中return result.getMappedFile();}}...public void run() {// 初始化 MapperFile文件 任务while (!this.isStopped() && this.mmapOperation()) {}}/***  通过 putRequestAndReturnMappedFile 生成的文件名异步创建本地文件*/private boolean mmapOperation() {boolean isSuccess = false;AllocateRequest req = null;try {req = this.requestQueue.take();//移除并返回元素,否则阻塞等待AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());...if (req.getMappedFile() == null) {long beginTime = System.currentTimeMillis();//创建对应对应大小、对应磁盘地址的本地文件。并且建立磁盘 & 内核映射关系MappedFile mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());;...// pre write mappedFile 预热处理if (mappedFile.getFileSize() >= mappedFileSizeCommitLog && warmMapedFileEnable) {FlushDiskType flushDiskType = this.messageStore.getMessageStoreConfig().getFlushDiskType();MessageStoreConfig messageStoreConfig = this.messageStore.getMessageStoreConfig();int flushLeastPagesWhenWarmMapedFile = messageStoreConfig.getFlushLeastPagesWhenWarmMapedFile();mappedFile.warmMappedFile(flushDiskType,flushLeastPagesWhenWarmMapedFile);}req.setMappedFile(mappedFile);this.hasException = false;isSuccess = true;}} finally {if (req != null && isSuccess)req.getCountDownLatch().countDown();//初始化完毕释放锁}return true;}static class AllocateRequest implements Comparable<AllocateRequest> {// Full file pathprivate String filePath;private int fileSize;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile MappedFile mappedFile = null;public AllocateRequest(String filePath, int fileSize) {this.filePath = filePath;this.fileSize = fileSize;}...public CountDownLatch getCountDownLatch() {return countDownLatch;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}public MappedFile getMappedFile() {return mappedFile;}public void setMappedFile(MappedFile mappedFile) {this.mappedFile = mappedFile;}}
}

2.DefaultMappedFile

public class DefaultMappedFile extends AbstractMappedFile {public DefaultMappedFile(final String fileName, final int mappedFileSizeCommitLog) throws IOException {init(fileName, mappedFileSizeCommitLog);}private void init(final String fileName, final int mappedFileSizeCommitLog) throws IOException {this.fileName = fileName;this.mappedFileSizeCommitLog = mappedFileSizeCommitLog;this.file = new File(fileName);this.fileFromOffset = Long.parseLong(this.file.getName());boolean ok = false;UtilAll.ensureDirOK(this.file.getParent());try {this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, mappedFileSizeCommitLog);TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(mappedFileSizeCommitLog);TOTAL_MAPPED_FILES.incrementAndGet();ok = true;}finally {if (!ok && this.fileChannel != null) {this.fileChannel.close();}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/36888.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【力扣每日一题】617. 合并二叉树 dfs bfs 8.14打卡

文章目录 题目思路代码 题目 617. 合并二叉树 难度&#xff1a; 简单 描述&#xff1a; 给你两棵二叉树&#xff1a; root1 和 root2 。 想象一下&#xff0c;当你将其中一棵覆盖到另一棵之上时&#xff0c;两棵树上的一些节点将会重叠&#xff08;而另一些不会&#xff0…

阿里云ACP知识点

前言&#xff1a;记录ACP错题 1、在创建阿里云ECS时&#xff0c;每台服务器必须要包含_______用来存储操作系统和核心配置。 系统盘&#xff08;不是实例&#xff0c;实例是一个虚拟的计算环境&#xff0c;由CPU、内存、系统盘和运行的操作系统组成&#xff1b;ESC实例作为云…

【量化课程】08_1.机器学习量化策略基础实战

文章目录 1. 常用机器学习模型1.1 回归模型1.2 分类模型1.2.1 SVC介绍1.2.2 SVC在量化策略中的应用 2. 机器学习量化策略实现的基本步骤3. 策略实现 1. 常用机器学习模型 1.1 回归模型 线性回归多层感知器回归自适应提升树回归随机森林回归 1.2 分类模型 线性分类支持向量机…

Android布局【FrameLayout】

文章目录 常见属性说明项目结构主要代码 常见属性 android:foreground&#xff1a;设置前景android:foregroundGravity&#xff1a;设置前景位置 说明 FrameLayout的其他属性与前面学的差不多&#xff0c;只不过需要特别注意上面两个即可 项目结构 主要代码 activity_main…

计算机视觉中的特征检测和描述

一、说明 这篇文章是关于计算机视觉中特征检测和描述概念的简要理解。在其中&#xff0c;我们探讨了它们的定义、常用技术、简单的 python 实现和一些限制。 二、什么是特征检测和描述&#xff1f; 特征检测和描述是计算机视觉中的基本概念&#xff0c;在图像识别、对象跟踪和图…

Beats:使用 Filebeat 将 golang 应用程序记录到 Elasticsearch - 8.x

毫无疑问&#xff0c;日志记录是任何应用程序最重要的方面之一。 当事情出错时&#xff08;而且确实会出错&#xff09;&#xff0c;我们需要知道发生了什么。 为了实现这一目标&#xff0c;我们可以设置 Filebeat 从我们的 golang 应用程序收集日志&#xff0c;然后将它们发送…

微信小程序备案流程

微信小程序备案流程 &#x1f4d4; 千寻简笔记介绍 千寻简笔记已开源&#xff0c;Gitee与GitHub搜索chihiro-notes&#xff0c;包含笔记源文件.md&#xff0c;以及PDF版本方便阅读&#xff0c;且是用了精美主题&#xff0c;阅读体验更佳&#xff0c;如果文章对你有帮助请帮我…

Android布局【TableLayout】

文章目录 说明常见属性子控件设置属性 项目结构主要代码 说明 TableLayout也称为表格布局 常见属性 android:collapseColumns&#xff1a;设置需要被隐藏的列的序列号&#xff0c;从0开始android:stretchColumns&#xff1a;设置允许被拉伸的列的列序号&#xff0c;从0开始&…

Python中使用隧道爬虫ip提升数据爬取效率

作为专业爬虫程序员&#xff0c;我们经常面临需要爬取大量数据的任务。然而&#xff0c;有些网站可能会对频繁的请求进行限制&#xff0c;这就需要我们使用隧道爬虫ip来绕过这些限制&#xff0c;提高数据爬取效率。本文将分享如何在Python中使用隧道爬虫ip实现API请求与响应的技…

(十八)大数据实战——Hive的metastore元数据服务安装

前言 Hive的metastore服务作用是为Hive CLI或者Hiveserver2提供元数据访问接口。Hive的metastore 是Hive元数据的存储和管理组件&#xff0c;它负责管理 Hive 表、分区、列等元数据信息。元数据是描述数据的数据&#xff0c;它包含了关于表结构、存储位置、数据类型等信息。本…

Android Jetpack Compose 中的分页与缓存展示

Android Jetpack Compose 中的分页与缓存展示 在几乎任何类型的移动项目中&#xff0c;移动开发人员在某个时候都会处理分页数据。如果数据列表太大&#xff0c;无法一次从服务器检索完毕&#xff0c;这就是必需的。因此&#xff0c;我们的后端同事为我们提供了一个端点&#…

ArcGIS Pro应用—暨基础入门、制图、空间分析、影像分析、三维建模、空间统计分析与建模、python融合、案例应用全流程科研能力提升教程

详情点击链接&#xff1a;ArcGIS Pro应用—暨基础入门、制图、空间分析、影像分析、三维建模、空间统计分析与建模、python融合、案例应用全流程科研能力提升教程 第一&#xff1a;GIS及ArcGIS Pro 1.GIS基本原理及常用软件 2.ArcGIS Pro 安装与配置 3.ArcGIS Pro 3.0 的新…

小白到运维工程师自学之路 第七十三集 (kubernetes应用部署)

一、安装部署 1、以Deployment YAML方式创建Nginx服务 这个yaml文件在网上可以下载 cat nginx-deployment.yaml apiVersion: apps/v1 #apiVersion是当前配置格式的版本 kind: Deployment #kind是要创建的资源类型&#xff0c;这里是Deploymnet metadata: #metadata是该资源…

Photoshop多图片与多窗口下排列操作方法

首先&#xff0c;在Photoshop中打开6张图片&#xff0c;在“窗口”菜单下切换窗口排列状态&#xff1a; 在 “窗口”菜单下对窗口进行排列&#xff0c;分别呈现如下&#xff1a; &#xff08;一&#xff09;. 点击“窗口” -> “排列”->"全部垂直拼贴": &am…

本地oracle登录账号锁定处理,the account is locked

1.打开cmd命令窗口 2.打开sqlplus: sqlplus /nolog(加/nolog是不登录服务器的意思&#xff0c;不加就需要输账号密码) 3.切换到管理员&#xff1a;conn / as sysdba; 第2步第3步可以合并&#xff0c;直接使用sysdba登录&#xff1a;sqlplus / as sysdba; 4.解锁账号&#x…

如何从cpu改为gpu,pytorch,cuda

1.cmd输入nvcc -V 2.得到 cuda版本后&#xff0c;去pytorch官网 3.根据自己的cuda进行选择 4.复制上述链接&#xff0c;进入cmd 5.cmd中输入activate XXX,这里的"XXX"指代自己在工程中用到的环境 6.进入后&#xff0c;将刚才链接粘贴&#xff0c;回车等待下载结束 …

《高性能MySQL》——查询性能优化(笔记)

文章目录 六、查询性能优化6.1 查询为什么会慢6.2 慢查询基础&#xff1a;优化数据访问6.2.1 是否向数据库请求了不需要的数据查询不需要的记录多表关联时返回全部列总是取出全部列重复查询相同的数据 6.2.2 MySQL 是否在扫描额外的记录响应时间扫描的行数与返回的行数扫描的行…

新增守护进程管理、支持添加MySQL远程数据库,支持PHP版本切换,1Panel开源面板v1.5.0发布

2023年8月14日&#xff0c;现代化、开源的Linux服务器运维管理面板1Panel正式发布v1.5.0版本。 在这个版本中&#xff0c;1Panel新增了守护进程管理功能&#xff1b;支持添加MySQL远程数据库&#xff1b;支持添加FTP/S和WebDAV的SFTP服务&#xff1b;支持PHP版本切换。此外&am…

jupyter打开ipynb后,还没有运行cell,反复报错

今天遇到了一个比较奇怪的问题&#xff1a; 这个原因是当前目录下有一个code.py的文件&#xff0c;一旦打开ipynb&#xff0c;就是先执行code.py&#xff0c;而且遇到报错&#xff0c;还会反复执行&#xff0c;导致内核崩溃。

创建一个 React+Typescript 项目

接下来 我们来一起探索一下用TypeScript 来编写react 这也是一个非常好的趋势&#xff0c;目前也非常多人使用 那么 我们就先从创建项目开始 首先 我们先找一个 或者 之前创建一个目录 用来放我们的项目 然后 在这个目录下直接输入 例如 这里 我想创建一个叫 tsReApp 的项目…