4、Rocketmq之存储原理

CommitLog ~ MappedFileQueue ~ MappedFile集合
在这里插入图片描述

正常情况下,RocketMQ支持消息体字节数最多为1个G。注意该消息体并不单单是消息体body。如果生产的消息其字节数超过1个G则该消息是无法被落盘处理的。因为没有一个MapperFile文件可以承载该消息所有的字节数。

1.AllocateMappedFileService

参考文章

异步初始化CommitLog文件时优先初始化nextFilePath、nextNextFilePath两个文件。

同时创建nextFilePath【/data/rocketmq/commitlog/00000000000000000000】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】两个文件是如何使用的呢?

  1. 优先返回nextFilePath,并添加到MappedFileQueue集合属性mappedFiles中。此时队列requestQueue为空。requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】。
  2. 如果消息体的长度没有达到当前MapperFile中字节缓冲区capacity的大小,则不会创建新的MapperFile文件。
  3. 如果步骤2不成立,则创建新的nextFilePath【/data/rocketmq/commitlog/00000000000000000050】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】对应的MapperFile文件。但是由于requestTable集合不为空即存在nextFilePath对应的MapperFile文件【/data/rocketmq/commitlog/00000000000000000050】则删除并返回当前集合元素。
  4. 此时requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】。MappedFileQueue中集合属性mappedFiles中存在00000000000000000000、00000000000000000050两个MappedFile文件。

如果真实发送的消息字节数没有超过当前字节缓冲区剩余空间则优先当前MapperFile文件处理。否则创建新的MapperFile文件。

public class AllocateMappedFileService extends ServiceThread {private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static int waitTimeOut = 1000 * 5;private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<>();private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<>();private volatile boolean hasException = false;private DefaultMessageStore messageStore;public AllocateMappedFileService(DefaultMessageStore messageStore) {this.messageStore = messageStore;}// nextFilePath:CommitLog文件路径 /data/rocketmq/commitlog/00000000000000000000public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {int canSubmitRequests = 2;...AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;...boolean offerOK = this.requestQueue.offer(nextReq);AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;...boolean offerOK = this.requestQueue.offer(nextNextReq);...// 每次只是返回nextFilePath对应的MappedFile。此时 requestQueue 队列为空,requestTable集合中只是存在 nextNextFilePath 对应的MappedFile文件// 如果AllocateRequest result = this.requestTable.get(nextFilePath);messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");// 阻塞等待 线程AllocateMappedFileService 初始化MapperFile文件。默认时间为5秒boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);messageStore.getPerfCounter().endTick("WAIT_MAPFILE_TIME_MS");if (!waitOK) {log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {this.requestTable.remove(nextFilePath);// 返回nextFilePath对应的MappedFile文件,并添加到MappedFileQueue中集合属性mappedFiles中return result.getMappedFile();}}...public void run() {// 初始化 MapperFile文件 任务while (!this.isStopped() && this.mmapOperation()) {}}/***  通过 putRequestAndReturnMappedFile 生成的文件名异步创建本地文件*/private boolean mmapOperation() {boolean isSuccess = false;AllocateRequest req = null;try {req = this.requestQueue.take();//移除并返回元素,否则阻塞等待AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());...if (req.getMappedFile() == null) {long beginTime = System.currentTimeMillis();//创建对应对应大小、对应磁盘地址的本地文件。并且建立磁盘 & 内核映射关系MappedFile mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());;...// pre write mappedFile 预热处理if (mappedFile.getFileSize() >= mappedFileSizeCommitLog && warmMapedFileEnable) {FlushDiskType flushDiskType = this.messageStore.getMessageStoreConfig().getFlushDiskType();MessageStoreConfig messageStoreConfig = this.messageStore.getMessageStoreConfig();int flushLeastPagesWhenWarmMapedFile = messageStoreConfig.getFlushLeastPagesWhenWarmMapedFile();mappedFile.warmMappedFile(flushDiskType,flushLeastPagesWhenWarmMapedFile);}req.setMappedFile(mappedFile);this.hasException = false;isSuccess = true;}} finally {if (req != null && isSuccess)req.getCountDownLatch().countDown();//初始化完毕释放锁}return true;}static class AllocateRequest implements Comparable<AllocateRequest> {// Full file pathprivate String filePath;private int fileSize;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile MappedFile mappedFile = null;public AllocateRequest(String filePath, int fileSize) {this.filePath = filePath;this.fileSize = fileSize;}...public CountDownLatch getCountDownLatch() {return countDownLatch;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}public MappedFile getMappedFile() {return mappedFile;}public void setMappedFile(MappedFile mappedFile) {this.mappedFile = mappedFile;}}
}

2.DefaultMappedFile

public class DefaultMappedFile extends AbstractMappedFile {public DefaultMappedFile(final String fileName, final int mappedFileSizeCommitLog) throws IOException {init(fileName, mappedFileSizeCommitLog);}private void init(final String fileName, final int mappedFileSizeCommitLog) throws IOException {this.fileName = fileName;this.mappedFileSizeCommitLog = mappedFileSizeCommitLog;this.file = new File(fileName);this.fileFromOffset = Long.parseLong(this.file.getName());boolean ok = false;UtilAll.ensureDirOK(this.file.getParent());try {this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, mappedFileSizeCommitLog);TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(mappedFileSizeCommitLog);TOTAL_MAPPED_FILES.incrementAndGet();ok = true;}finally {if (!ok && this.fileChannel != null) {this.fileChannel.close();}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/36888.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【力扣每日一题】617. 合并二叉树 dfs bfs 8.14打卡

文章目录 题目思路代码 题目 617. 合并二叉树 难度&#xff1a; 简单 描述&#xff1a; 给你两棵二叉树&#xff1a; root1 和 root2 。 想象一下&#xff0c;当你将其中一棵覆盖到另一棵之上时&#xff0c;两棵树上的一些节点将会重叠&#xff08;而另一些不会&#xff0…

阿里云ACP知识点

前言&#xff1a;记录ACP错题 1、在创建阿里云ECS时&#xff0c;每台服务器必须要包含_______用来存储操作系统和核心配置。 系统盘&#xff08;不是实例&#xff0c;实例是一个虚拟的计算环境&#xff0c;由CPU、内存、系统盘和运行的操作系统组成&#xff1b;ESC实例作为云…

C++11 异步与通信之 std::async

概念简介 std::async 异步运行一个函数&#xff0c;将返回值保存在std::future中。 含有2个策略参数&#xff1a; launch::deferred 延迟执行&#xff0c;当调用wait()和get()时&#xff0c;任务才会被运行&#xff0c;且不创建线程&#xff1b;launch::async : 创建线程并执…

react项目做的h5页面加载缓慢优化(3s优化到0.6s)

打包到生产环境时去掉SOURCEMAP 禁用生成 Source Map 是一种权衡&#xff0c;可以根据项目的实际需求和优化目标来决定是否禁用。如果您对调试需求不是特别强烈&#xff0c;可以考虑在生产构建中禁用 Source Map 以获取更好的性能。但如果需要保留调试能力&#xff0c;可以在生…

【量化课程】08_1.机器学习量化策略基础实战

文章目录 1. 常用机器学习模型1.1 回归模型1.2 分类模型1.2.1 SVC介绍1.2.2 SVC在量化策略中的应用 2. 机器学习量化策略实现的基本步骤3. 策略实现 1. 常用机器学习模型 1.1 回归模型 线性回归多层感知器回归自适应提升树回归随机森林回归 1.2 分类模型 线性分类支持向量机…

Android布局【FrameLayout】

文章目录 常见属性说明项目结构主要代码 常见属性 android:foreground&#xff1a;设置前景android:foregroundGravity&#xff1a;设置前景位置 说明 FrameLayout的其他属性与前面学的差不多&#xff0c;只不过需要特别注意上面两个即可 项目结构 主要代码 activity_main…

【leetcode】第2章 链表

203. 移除链表元素 方法&#xff1a;添加一个虚拟节点&#xff0c;这不用考虑头节点删除情况 public ListNode removeElements(ListNode head, int val) {// 虚拟节点&#xff0c;指向头节点ListNode dummy new ListNode(0);dummy.next head;ListNode p dummy;// 找到被删…

数据结构:树状数组

老规矩&#xff0c;推荐一篇原理讲解清晰的博客&#xff01;&#xff08;树状数组(详细分析应用)&#xff0c;看不懂打死我!_树形数组_鲜果维他命的博客-CSDN博客&#xff09; 相对于线段树&#xff0c;树状数组的优点就是代码简洁&#xff0c;容易修改。单缺点就是优点问题只…

计算机视觉中的特征检测和描述

一、说明 这篇文章是关于计算机视觉中特征检测和描述概念的简要理解。在其中&#xff0c;我们探讨了它们的定义、常用技术、简单的 python 实现和一些限制。 二、什么是特征检测和描述&#xff1f; 特征检测和描述是计算机视觉中的基本概念&#xff0c;在图像识别、对象跟踪和图…

Beats:使用 Filebeat 将 golang 应用程序记录到 Elasticsearch - 8.x

毫无疑问&#xff0c;日志记录是任何应用程序最重要的方面之一。 当事情出错时&#xff08;而且确实会出错&#xff09;&#xff0c;我们需要知道发生了什么。 为了实现这一目标&#xff0c;我们可以设置 Filebeat 从我们的 golang 应用程序收集日志&#xff0c;然后将它们发送…

Maven教程_编程入门自学教程_菜鸟教程-免费教程分享

教程简介 Maven 是一款基于 Java 平台的项目管理和整合工具&#xff0c;它将项目的开发和管理过程抽象成一个项目对象模型&#xff08;POM&#xff09;。开发人员只需要做一些简单的配置&#xff0c;Maven 就可以自动完成项目的编译、测试、打包、发布以及部署等工作。Maven 是…

微信小程序备案流程

微信小程序备案流程 &#x1f4d4; 千寻简笔记介绍 千寻简笔记已开源&#xff0c;Gitee与GitHub搜索chihiro-notes&#xff0c;包含笔记源文件.md&#xff0c;以及PDF版本方便阅读&#xff0c;且是用了精美主题&#xff0c;阅读体验更佳&#xff0c;如果文章对你有帮助请帮我…

二、异常日志

二、异常日志 &#xff08;一&#xff09;、错误码 错误码的制定原则&#xff1a;快速溯源、沟通标准化错误码不体现版本号和错误等级信息全部正常&#xff0c;但不得不填充错误码时返回五个零&#xff1a;00000错误码为字符串类型&#xff0c;共 5 位&#xff0c;分成两个部分…

win10 anaconda pytorch avalanche-lib 实验步骤记录

conda create --name test_python3.10 conda activate test_python3.10 配置conda国内源(北外) conda install pytorch torchvision torchaudio cpuonly -c pytorch pip3 install avalanche-lib -i https://pypi.tuna.tsinghua.edu.cn/simple conda install jupyter jupyte…

[tidb] tiup升级tidb的版本到 v7.1.1

备份 为了避免数据丢失&#xff0c;升级前需要备份当前tidb集群的数据&#xff0c;参考 TiDB 备份与恢复概述 | PingCAP 文档中心 说明 由于新版本的tidb的tiflash需要cpui支持avx2&#xff0c;所有升级前先验证当前升级的服务器是否支持avx2。升级的文档可以参考 使用 TiUP…

Android布局【TableLayout】

文章目录 说明常见属性子控件设置属性 项目结构主要代码 说明 TableLayout也称为表格布局 常见属性 android:collapseColumns&#xff1a;设置需要被隐藏的列的序列号&#xff0c;从0开始android:stretchColumns&#xff1a;设置允许被拉伸的列的列序号&#xff0c;从0开始&…

docker私有镜像仓库搭建

1、下载registry镜像 docker pull registry:2.52、生成登录私有仓库的用户名以及密码 mkdir -p /opt/registry/auth/ docker run --entrypoint htpasswd registry:2.5 -Bbn username userpwd >> /opt/registry/auth/htpasswd3、创建配置文件 mkdir -p /opt/registry/…

Git - 配置代理 和 取消代理配置

一. 配置代理 (使git走网路代理) git config --global http.proxy socks5://127.0.0.1:1080 git config --global https.proxy socks5://127.0.0.1:1080 其中 1080 是 SOCKS 代理的端口&#xff0c;一般默认 1080&#xff0c;可以在代理工具的设置中查看 地址记录&#xff1a…

Python中使用隧道爬虫ip提升数据爬取效率

作为专业爬虫程序员&#xff0c;我们经常面临需要爬取大量数据的任务。然而&#xff0c;有些网站可能会对频繁的请求进行限制&#xff0c;这就需要我们使用隧道爬虫ip来绕过这些限制&#xff0c;提高数据爬取效率。本文将分享如何在Python中使用隧道爬虫ip实现API请求与响应的技…

(十八)大数据实战——Hive的metastore元数据服务安装

前言 Hive的metastore服务作用是为Hive CLI或者Hiveserver2提供元数据访问接口。Hive的metastore 是Hive元数据的存储和管理组件&#xff0c;它负责管理 Hive 表、分区、列等元数据信息。元数据是描述数据的数据&#xff0c;它包含了关于表结构、存储位置、数据类型等信息。本…