深度解析RocketMq源码-IndexFile

1.绪论

在工作中,我们经常需要根据msgKey查询到某条日志。但是,通过前面对commitLog分析,producer将消息推送到broker过后,其实broker是直接消息到达broker的先后顺序写入到commitLog中的。我们如果想根据msgKey检索一条消息无疑大海捞针,所以们需要像数集一样建立一个目录,我们其实可以想到的是构建一个Map,key存储msgKey,value存储msg在commitLog中的物理偏移量。而这个目录其实就是indexFile。

2.indexFile的组成和原理

indexFile主要由两部分组成,分别是indexFile文件头和index的文件内容。

2.1 indexFile文件头 - IndexHeader 

indexHeader占据40个字节,其中最重要的是他记录了整个索引文件的最开始插入的索引的时间和最后一条数据插入的时间,主要是为了支持根据时间进行范围搜索。以及第一条和最后一条日志的索引位置。还有一个就是已经插入了多少条索引IndexCount。

public class IndexHeader {
//index文件头占4个字节public static final int INDEX_HEADER_SIZE = 40;private static int beginTimestampIndex = 0;private static int endTimestampIndex = 8;private static int beginPhyoffsetIndex = 16;private static int endPhyoffsetIndex = 24;private static int hashSlotcountIndex = 32;private static int indexCountIndex = 36;private final ByteBuffer byteBuffer;//开始的时间戳private final AtomicLong beginTimestamp = new AtomicLong(0);//结束时间戳private final AtomicLong endTimestamp = new AtomicLong(0);//开始的物理偏移量private final AtomicLong beginPhyOffset = new AtomicLong(0);//结束的物理偏移量private final AtomicLong endPhyOffset = new AtomicLong(0);//hash槽的数量private final AtomicInteger hashSlotCount = new AtomicInteger(0);//index的数量private final AtomicInteger indexCount = new AtomicInteger(1);
}

2.2 indexFile的组成

idnexFile的内容包括:

1. 40个字节的indexFile头

2. 4* 500w个字节hash槽,每个槽记录的其实是:根据key取hash值%槽数在当前hash槽的索引的序号(也即当前有多少条索引)

3. 20*2000w个自己的索引数,每条索引20个字节,包含4个字节索引key的hash值+8个字节的物理偏移量+4个字节的当前索引的插入时间距离该索引文件第一条索引的插入时间的差值+4个字节的上一个在当前hash槽的索引的序号。

我们可以画图来描述一下:

e5b98bd4baf04c1c9393127f0a93ebca.png

可以看出idnexFile是采用链地址法解决hash冲突的,每个索引存储有上一条拥有相同hash值索引的index值,相当于通过链表将这些hash冲突的索引串起来。

public class IndexFile {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);//一个hash槽的大小为4个字节private static int hashSlotSize = 4;//一条索引的大小为20字节private static int indexSize = 20;private static int invalidIndex = 0;//hash槽的数量private final int hashSlotNum;//index的总数量private final int indexNum;//index也是存储在mappedFile中的private final MappedFile mappedFile;private final MappedByteBuffer mappedByteBuffer;//index文件的头private final IndexHeader indexHeader;public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final long endTimestamp) throws IOException {//文件总大小 = 头部所占40个字节 + hash槽数量(默认为500w) * 4个字节 + index数量 * 20个字节int fileTotalSize =IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);//新建mappedFilethis.mappedFile = new MappedFile(fileName, fileTotalSize);//获取到与文件建立映射关系的bufferthis.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();//hash槽数量this.hashSlotNum = hashSlotNum;//索引文件的数量this.indexNum = indexNum;ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//index文件的头部this.indexHeader = new IndexHeader(byteBuffer);if (endPhyOffset > 0) {//够级整个索引文件的开始的物理偏移量和结束的偏移量this.indexHeader.setBeginPhyOffset(endPhyOffset);this.indexHeader.setEndPhyOffset(endPhyOffset);}if (endTimestamp > 0) {//够级整个索引文件的开始时间戳和结束时间戳this.indexHeader.setBeginTimestamp(endTimestamp);this.indexHeader.setEndTimestamp(endTimestamp);}}
}

3.向indexFile插入一条索引数据

主要的步骤如下:

1.获取msgKey的hash值;

2.通过hash值对总的hash槽数取模得到对应第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量;

5.分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号(也即当前hash槽中存储的值);

6.将最新一条的索引序号写入到hash槽中。

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {if (this.indexHeader.getIndexCount() < this.indexNum) {//1.获取msgKey的hash值int keyHash = indexKeyHashMethod(key);//2.通过hash值对总的hash槽数取模得到对应第几个槽int slotPos = keyHash % this.hashSlotNum;//3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//获取到上一个hash槽的所指向的索引序号int slotValue = this.mappedByteBuffer.getInt(absSlotPos);if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}//获取当前索引与第一条索引的差值long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号this.mappedByteBuffer.putInt(absIndexPos, keyHash);this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//将最新一条的索引序号写入到hash槽中this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//更新idnex中的最后一条索引的时间戳和物理偏移量if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}if (invalidIndex == slotValue) {this.indexHeader.incHashSlotCount();}   //增加indexheader索引序号this.indexHeader.incIndexCount();this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);}} else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;}

4.从indexFile中读取一条索引数据

1.获取索引key的hash值;

2.hash值对槽总数取模获得第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.从槽中读取到该槽所指向的最新的一条索引序号;

5.40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量;

6.如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环;

7.如果不匹配,便根据链表寻找到拥有相同hash值并且时间匹配的日志;

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,final long begin, final long end) {if (this.mappedFile.hold()) {//获取索引key的hash值int keyHash = indexKeyHashMethod(key);//hash值对槽总数取模获得第几个槽int slotPos = keyHash % this.hashSlotNum;//40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//从槽中读取到该槽所指向的最新的一条索引序号int slotValue = this.mappedByteBuffer.getInt(absSlotPos);if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() <= 1) {} else {for (int nextIndexToRead = slotValue; ; ) {if (phyOffsets.size() >= maxNum) {break;}// 40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ nextIndexToRead * indexSize;//获取索引的hash值int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);//获取到该索引的物理偏移量long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);//获取到时间戳差值long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//获取到拥有相同槽数的上一条索引序号int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);if (timeDiff < 0) {break;}timeDiff *= 1000L;long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}//如果上一条索引非法,证明已经到达链表头部,跳出循环,证明该条索引就是需要寻找的索引if (prevIndexRead <= invalidIndex|| prevIndexRead > this.indexHeader.getIndexCount()|| prevIndexRead == nextIndexToRead || timeRead < begin) {break;}nextIndexToRead = prevIndexRead;}}} catch (Exception e) {log.error("selectPhyOffset exception ", e);} finally {this.mappedFile.release();}}}

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/36691.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Zookeeper:基于Zookeeper的分布式锁

一、Zookeeper分布式锁原理 二、Zookeeper JavaAPI操作 1、Curator介绍 Curator是Apache Zookeeper的Java客户端。常见的Zookeeper Java API&#xff1a; 原生Java API。ZkClient。Curator。 Curator项目目标是简化Zookeeper客户端的使用。Curator最初是Netfix研发的&#xf…

C++11的可变参数模板

可变参数模板 什么是可变参数模板的可变参数展开参数包emplace系列函数引例emplace系列函数 什么是可变参数 printf和scanf中就涉及可变参数 这里三个点就代表可变参数&#xff0c;意思就是不管你传多少个参数&#xff0c;都可以接收 printf("%d",x); printf("…

狼牙山短视频:成都柏煜文化传媒有限公司

狼牙山短视频&#xff1a;记录自然与历史的交融 随着短视频的兴起&#xff0c;我们得以在短短几分钟内&#xff0c;跨越千山万水&#xff0c;领略世界各地的风情。成都柏煜文化传媒有限公司 而今天&#xff0c;我想带大家走进一个独特的地方——狼牙山&#xff0c;通过一系列短…

Transformer教程之Transformer的历史背景

在现代人工智能领域&#xff0c;Transformer模型已经成为一种不可或缺的技术&#xff0c;它在自然语言处理&#xff08;NLP&#xff09;和计算机视觉等多个领域取得了巨大的成功。本文将带你回顾Transformer的历史背景&#xff0c;了解它是如何从最初的构想到今天的广泛应用的。…

Web渗透:文件包含漏洞

Ⅱ.远程文件包含 远程文件包含漏洞&#xff08;Remote File Inclusion, RFI&#xff09;是一种Web应用程序漏洞&#xff0c;允许攻击者通过URL从远程服务器包含并执行文件&#xff1b;RFI漏洞通常出现在动态包含文件的功能中&#xff0c;且用户输入未经适当验证和过滤。接着我…

生产者发送数据,kafka服务器接收数据异常的问题记录

现象&#xff1a; 某个客户要求审计日志用kafka的方式传输给他们&#xff0c;使用了第三方的librdkafka库来开发。 往客户提供的kafka服务器上的一个topic发送数据&#xff0c;这个topic有三个分区&#xff0c;客户反馈接收到的数据和发送端发送的实际数量对不上&#xff0c;他…

使用VMware创建Ubuntu 24.04【一】

相关链接下载地址 VMware https://www.vmware.com/content/vmware/vmware-published-sites/cn/products/workstation-pro/workstation-pro-evaluation.html.html.html Ubuntu 24.04 LTS https://cn.ubuntu.com/download/desktop 虚拟机创建 1、打开VNware软件&#xff0c;点…

5.9k!一款清新好用的后台管理系统!【送源码】

今天给大家分享的开源项目是一个优雅清新后台管理系统——Soybean Admin。 简介 官方是这样介绍这个项目的&#xff1a; Soybean Admin 使用的是Vue3作为前端框架&#xff0c;TypeScript作为开发语言&#xff0c;同时还整合了NaiveUI组件库&#xff0c;使得系统具有高可用性和…

基于YOLOv5+pyqt5的口罩佩戴检测系统(PyQT页面+YOLOv5模型+数据集)

简介 在各种工作环境和公共场所,确保人们正确佩戴口罩对个人防护和公共卫生至关重要,尤其是在医疗设施、制造业车间和拥挤的公共交通中。为了满足这一需求,我们开发了一种基于YOLOv5目标检测模型的口罩佩戴检测系统。本项目不仅实现了高精度的口罩佩戴检测,还设计了一个可…

学习提示词工程

去年 11 月 8 日&#xff0c;新加坡政府科技局&#xff08;GovTech&#xff09;组织举办了首届 GPT-4 提示工程&#xff08;Prompt Engineering&#xff09;竞赛。数据科学家 Sheila Teo 最终夺冠&#xff0c;成为最终的提示女王&#xff08;Prompt Queen&#xff09;。之后&am…

Swagger2及常用校验注释说明

Api(value "后台用户管理") RestController RequestMapping("bossuser") public class BossUserController {ApiOperation(value "测试接口")PostMapping("test")public String testUser(Valid RequestBody TestUser user) {LOG.inf…

机器学习之集成学习

一&#xff1a;概念 顾名思义集成学习就是用多个其他的算法结合起来使用 对于“其他算法”有同类和同质的区别&#xff0c;同质指的是所用的算法都是同一类型的&#xff0c;比如决策树和神经网络&#xff0c;这种也叫基学习器。反之亦然&#xff0c;但一般使用的是同质的。 …

6种高效便捷的移动硬盘加密软件,总有一款适合你

想要给自己移动硬盘内的文件/文件夹加密来保护数据隐私&#xff0c;防止重要信息泄露&#xff1f;使用电脑文件夹加密工具可以轻松帮您解决&#xff01;面对市面上众多的加密工具&#xff0c;如何选择成为一大难题。本文将为您提供一份详细的挑选指南&#xff0c;帮助您选择最合…

Java程序员接单的十条“野路子”,分分钟收入20K!

Java程序员除了主业工作外&#xff0c;也要适当扩展兼职接单这条路。毕竟Java接单可以说是Java程序员进行技术变现的最佳方式之一。 因为Java程序员兼职接单的难度相对更低&#xff0c;单量也比较可观&#xff0c;最重要的是性价比也很顶&#xff0c;且听我一一道来&#xff1a…

2024年6月24日 (周一) 叶子游戏新闻

图吧工具箱: 全名图拉丁吧硬件检测工具箱,是开源、免费、绿色、纯净的硬件检测工具合集,专为图钉及所有DIY爱好者制作,包含常用硬件测试和检测工具,月工JS必备! 土豆录屏: 免费、无录制时长限制、无水印的录屏软件 高手在民间 粉丝玩家打造精美《黄金树幽影》巨大插画虽然不是专…

大数据------额外软件、插件及技术------Linux(完整知识点汇总)

Linxu 不同领域的主流操作系统 桌面操作系统 WindowsMAac OSLinux 服务器端操作系统 UNIX&#xff08;付费&#xff09;LinuxWindows Server&#xff08;付费&#xff09; 移动设备操作系统 Android&#xff08;基于Linux开源&#xff09;IOS&#xff08;不开源&#xff09; 嵌…

Three.js鼠标拖动设置骨骼姿态

实现 根据SkinnedMesh生成Mesh 作为射线检测的目标&#xff08;射线检测SkinnedMesh存在不足 无法应用骨骼形变的顶点 &#xff09;点击模型 获取点击位置对应的骨骼拖拽鼠标设置骨骼旋转角度&#xff08;使用TransformControl选中点击的骨骼 设置轴为XYZE 并隐藏控件 主动触发…

PostgreSQL计算 queryid 原理

数据库版本 PG 16.1 queryid 是什么 queryid 是将 sql 规范化 (normalization) 后&#xff0c;通过哈希函数计算出来的 64 位整数。 以 SELECT id, data FROM tbl_a WHERE id < 300 ORDER BY data; 这条 SQL 为例。当我们在 PG 中执行这条 sql 时&#xff0c;内核在语义…

【STM32-DAP 仿真器】

STM32-DAP 仿真器 ■ STM32-DAP仿真器介绍■ STM32-DAP仿真特点■ STM32-DAP仿真器实物图■ STM32-DAP高速 DAP 仿真器实物图■ STM32-DAP高速无线调试器 实物图■ STM32-DAP高速无线调试器示意图■ STM32-DAP高速无线调试器接线图■ STM32-DAP高速无线调试器接收端示意图 ■ S…

vcruntime140_1.dll是什么东东?vcruntime140_1.dll缺失的8个解决方法

当电脑出现找不到vcruntime140_1.dll,或vcruntime140_1.dll丢失无法打开软件怎么办&#xff1f;小编今天在本文详细为大家介绍解决方法与介绍vcruntime140_1.dll究竟是什么等vcruntime140_1.dll的问题。 一、vcruntime140_1.dll文件是什么 文件概述定义与功能 vcruntime140_…