【RocketMQ】RocketMq之IndexFile深入研究

一:RocketMq 整体文件存储介绍

存储⽂件主要分为三个部分:
  • CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。
  • ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
  • IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。

这篇文章主要介绍IndexFile的研究,以rocketmq5.3.0版本作为研究。

二:IndexFile的文件结构

文件整理格式,如下图2-1所示

                                                图2-1 IndexFile 文件结构图


IndexFile 文件格式

  • 文件名:以时间戳命名(例如 20240301120000000),表示该文件索引的消息的时间范围。

  • 文件大小:默认为 400MB,可通过 maxIndexSize 配置调整。

  • 存储路径:默认在 ~/store/index 目录下。

每个 IndexFile 文件由三部分组成:
1. 文件头部(Header)
2. 哈希槽(Hash Slot)区域
3. 索引条目(Index Entry)区域


1. 文件头部(Header)

字段名

长度(字节)

说明

beginTimestamp

8

索引文件覆盖的最小时间戳(消息存储时间)

endTimestamp

8

索引文件覆盖的最大时间戳(消息存储时间)

beginPhyOffset

8

索引文件对应的最小物理偏移量(CommitLog 中的起始位置)

endPhyOffset

8

索引文件对应的最大物理偏移量(CommitLog 中的结束位置)

hashSlotCount

4

哈希槽数量(固定为 5,000,000)

indexCount

4

当前已写入的索引条目数量


2. 哈希槽(Hash Slot)区域

  • 哈希槽数量:固定为 500 万个(5,000,000),每个哈希槽占 4 字节

  • 哈希函数:对消息的 Key(如 UNIQ_KEYKEYS)进行哈希计算,得到槽位索引:
    slotPos = abs(hash(key)) % 5000000

每个哈希槽存储的是 索引条目区域 的起始位置(索引条目链表的头节点)。


3. 索引条目(Index Entry)区域

每个索引条目占 20 字节,包含以下字段:

字段名

长度(字节)

说明

keyHash

4

消息 Key 的哈希值(用于快速比对)

phyOffset

8

消息在 CommitLog 中的物理偏移量

timeDiff

4

消息存储时间与文件头部 beginTimestamp 的时间差(秒级)

slotValue

4

下一个索引条目的位置(用于解决哈希冲突的链表结构)


 三:IndexFile 写入和查询流程

IndexFile 写入流程:

+---------------------+
| Producer 发送消息     |
+---------------------+|v
+---------------------+
| 提取消息的 Key        | --> 如 UNIQ_KEY 或 KEYS 属性
+---------------------+|v
+---------------------+
| 检查 IndexFile 容量   | --> 是否已满?(indexCount >= indexNum)
+---------------------+| 是v
+---------------------+
| 返回 false,写入失败   |
+---------------------+| 否v
+---------------------+
| 计算 Key 的哈希值     | --> `keyHash = indexKeyHashMethod(key)`
+---------------------+|v
+---------------------+
| 计算哈希槽位置         | --> `slotPos = keyHash % hashSlotNum`
+---------------------+|v
+---------------------+
| 计算哈希槽绝对位置      | --> `absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize`
+---------------------+|v
+---------------------+
| 读取哈希槽的当前值      | --> `slotValue = mappedByteBuffer.getInt(absSlotPos)`
+---------------------+|v
+---------------------+
| 校验 slotValue 有效性  | --> 是否无效?(slotValue <= invalidIndex || slotValue > indexCount)
+---------------------+| 是v
+---------------------+
| 将 slotValue 设为无效   | --> `slotValue = invalidIndex`
+---------------------+| 否v
+---------------------+
| 计算时间差 (timeDiff)  | --> `timeDiff = (storeTimestamp - beginTimestamp) / 1000`
+---------------------+|v
+---------------------+
| 处理 timeDiff 边界值   | --> 确保 `0 <= timeDiff <= Integer.MAX_VALUE`
+---------------------+|v
+---------------------+
| 计算索引条目绝对位置     | --> `absIndexPos = IndexHeader.INDEX_HEADER_SIZE + hashSlotNum * hashSlotSize + indexCount * indexSize`
+---------------------+|v
+---------------------+
| 写入索引条目内容        |
| - keyHash            |
| - phyOffset          |
| - timeDiff           |
| - slotValue (nextIndex)|
+---------------------+|v
+---------------------+
| 更新哈希槽指向新条目     | --> `mappedByteBuffer.putInt(absSlotPos, indexCount)`
+---------------------+|v
+---------------------+
| 更新 IndexFile 头部信息 |
| - 若 indexCount <= 1,更新 beginPhyOffset 和 beginTimestamp |
| - 若 slotValue 无效,增加 hashSlotCount |
| - 增加 indexCount      |
| - 更新 endPhyOffset 和 endTimestamp |
+---------------------+|v
+---------------------+
| 返回 true,写入成功     |
+---------------------+|v
+---------------------+
| IndexFile 是否已满?   | -- 是 --> 创建新 IndexFile
| (文件大小 ≥ 400MB)    |
+---------------------+  

源码入口:org.apache.rocketmq.store.index.IndexFile#putKey

IndexFile 查询流程:

+---------------------+
| Consumer 根据 Key 查询 |
+---------------------+|v
+---------------------+
| 计算 Key 的哈希值     | --> `keyHash = Math.abs(key.hashCode())`
+---------------------+|v
+---------------------+
| 计算哈希槽位置         | --> `slotPos = keyHash % 5,000,000`
+---------------------+|v
+---------------------+
| 读取哈希槽的链表头位置   | --> `slotValue = mappedByteBuffer.getInt(slotPos * 4)`
+---------------------+|v
+---------------------+  
| 遍历链表条目           |
| while (slotValue > 0)|
+---------------------+|v
+---------------------+
| 读取索引条目:         |
| - keyHashRead       |
| - phyOffset         |
| - timeDiff          |
| - nextIndex         |
+---------------------+|v
+---------------------+
| 检查时间范围是否匹配?   | --> `storeTime = beginTimestamp + timeDiff * 1000`
| (storeTime ∈ [begin, end]?)|
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 比对 keyHashRead 和 keyHash |
| (是否相等?)          |
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 从 CommitLog 读取实际 Key |
| (检查 Key 是否一致?)    |
+---------------------+| 否|------------------> 跳过,继续下一个条目| 是v
+---------------------+
| 返回 phyOffset       | --> 添加到结果列表
+---------------------+|v
+---------------------+
| slotValue = nextIndex| --> 继续遍历下一个条目
+---------------------+|v
+---------------------+
| 遍历结束,返回结果列表   |
+---------------------+

源码入口:org.apache.rocketmq.store.index.IndexService#queryOffset

四:IndexFile解决hash冲突问题思想

RocketMQ 的 IndexFile 通过 链地址法(Chaining) 解决哈希冲突问题,其核心思想是将哈希到同一槽位的多个索引条目组织成链表结构,并通过哈希槽(Hash Slot)与索引条目(Index Entry)的关联实现高效写入和查询。以下是具体实现思想及关键设计:


1. 哈希冲突的背景

  • 哈希冲突:不同 Key 经过哈希函数计算后可能得到相同的哈希值,导致被分配到同一个哈希槽。

  • 问题:若不处理冲突,后续 Key 的索引会覆盖已有数据,导致查询结果错误。


2. 解决冲突的核心思想:链地址法

RocketMQ 的 IndexFile 采用 单链表 结构管理同一哈希槽下的所有冲突条目,具体流程如下:

(1) 写入时的链表插入

  • 新条目插入链表头部
    当新 Key 的哈希值与某槽位已有条目冲突时,新条目会被插入链表头部,并更新哈希槽指针指向新条目。

    // 新条目的 nextIndex 指向原头节点
    this.mappedByteBuffer.putInt(absIndexPos + 16, slotValue);
    // 更新哈希槽指针为新条目位置
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
     
    • 优势:插入时间复杂度为 O(1),无需遍历链表。

(2) 查询时的链表遍历

  • 遍历链表比对 Key
    查询时,从哈希槽指向的链表头节点开始,依次遍历所有条目,通过两次比对(哈希值 + 实际 Key)过滤冲突。

    while (nextIndexToRead > 0) {// 1. 读取条目内容int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);// 2. 比对哈希值if (keyHashRead == keyHash) {// 3. 从 CommitLog 读取实际 Key 比对String keyStored = readKeyFromCommitLog(phyOffsetRead);if (key.equals(keyStored)) {phyOffsets.add(phyOffsetRead);}}// 4. 移动到下一个节点nextIndexToRead = prevIndexRead;
    }

3. 关键设计优化

(1) 哈希槽数量固定

  • 默认 500 万个哈希槽

    private static final int HASH_SLOT_NUM = 5000000; // 默认槽数
    • 目的:通过大量槽位减少哈希冲突的概率,使冲突链表尽可能短。

    • 权衡:槽数过多会占用更多内存,但查询效率更高。

(2) 时间范围过滤

  • 索引条目存储时间差(timeDiff)
    每个索引条目记录消息存储时间与 IndexFile 起始时间的差值(秒级),查询时快速过滤掉不满足时间范围的条目。

    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff * 1000L;
    if (timeRead < begin || timeRead > end) {continue; // 跳过不符合时间条件的条目
    }
    • 优势:减少无效条目的遍历,提升查询性能。

(3) 文件滚动(Rolling)

  • 按时间或大小滚动
    IndexFile 文件默认大小上限为 400MB,或时间跨度超过阈值时,创建新文件。

    • 目的:避免单个文件过大导致链表过长,同时支持按时间范围快速定位文件。

4. 示例场景

写入冲突场景

  • Key1: Ea#20231001123456 → 哈希值 19583063 → 槽位 18332292

  • Key2: FB#20231001123456 → 哈希值 19583063 → 槽位 18332292(冲突)

  • 处理流程

    1. Key1 写入槽位 18332292,链表头指向 Key1。

    2. Key2 写入时,插入链表头部,槽位指针更新为 Key2,Key2 的 nextIndex 指向 Key1。

查询冲突场景

  • 查询 Key: Ea#20231001123456

    1. 哈希计算定位到槽位 18332292。

    2. 遍历链表:

      • 先读取 Key2(哈希值匹配但 Key 不匹配,跳过)。

      • 再读取 Key1(哈希值 + Key 均匹配,返回 phyOffset)。

hash冲突代码调试示例

 public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message msg = new Message("Ea", "TagA" , ("消息1").getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setKeys("20231001123456");producer.sendOneway(msg);Message msg2 = new Message("FB", "TagA" , ("消息3").getBytes(RemotingHelper.DEFAULT_CHARSET));msg2.setKeys("20231001123456");producer.sendOneway(msg2);producer.shutdown();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/70021.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

注解与反射基础

注解 概述 注解&#xff08;Annotation&#xff09;&#xff0c;从jdk5.0引入。 作用 不是程序本身&#xff0c;可以对程序作出解释&#xff08;这一点和注释没什么区别&#xff09;可以被其他程序读取 格式 注释是以“注释名”在代码中存在的&#xff0c;还可以添加一些…

SliverAppBar的功能和用法

文章目录 1 概念介绍2 使用方法3 示例代码 我们在上一章回中介绍了SliverGrid组件相关的内容&#xff0c;本章回中将介绍SliverAppBar组件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1 概念介绍 我们在本章回中介绍的SliverAppBar和普通的AppBar类似&#xff0c;它们的…

BFS(广度优先搜索)——搜索算法

BFS&#xff0c;也就是广度&#xff08;宽度&#xff09;优先搜索&#xff0c;二叉树的层序遍历就是一个BFS的过程。而前、中、后序遍历则是DFS&#xff08;深度优先搜索&#xff09;。从字面意思也很好理解&#xff0c;DFS就是一条路走到黑&#xff0c;BFS则是一层一层地展开。…

数据库 - Sqlserver - SQLEXPRESS、由Windows认证改为SQL Server Express认证进行连接 (sa登录)

本文讲SqlServer Express版本在登录的时候&#xff0c; 如何由Windows认证&#xff0c;修改为Sql Server Express认证。 目录 1&#xff0c;SqlServer Express的Windows认证 2&#xff0c;修改为混合认证 3&#xff0c;启用sa 用户 4&#xff0c;用sa 用户登录 下面是详细…

GWO优化SVM回归预测matlab

灰狼优化算法&#xff08;Grey Wolf Optimizer&#xff0c;简称 GWO&#xff09;&#xff0c;是由澳大利亚格里菲斯大学的 Mirjalii 等人于 2014 年提出的群智能优化算法。该算法的设计灵感源自灰狼群体的捕食行为&#xff0c;核心思想是对灰狼社会的结构与行为模式进行模仿。 …

elasticsearch8.15 高可用集群搭建(含认证Kibana)

文章目录 1.资源配置2.系统参数优化3.JDK17安装4.下载&安装ES 8.155.生成ES的证书(用于ES节点之间进行安全数据传输)6.修改ES 相关配置文件7.创建es用户并启动8.配置ES的账号和密码(用于ES服务端和客户端)9.下载和安装Kibana10.编辑Kibana配置文件11.启动Kiabana12.访问Kia…

地址查询API接口:高效查询地址信息,提升数据处理效率

地址查询各省市区API接口 地址查询是我们日常生活中经常遇到的一个需求&#xff0c;无论是在物流配送、地图导航还是社交网络等应用中&#xff0c;都需要通过地址来获取地理位置信息。为了满足这个需求&#xff0c;我们可以使用地址查询API接口来高效查询地址信息&#xff0c;提…

3、C#基于.net framework的应用开发实战编程 - 实现(三、三) - 编程手把手系列文章...

三、 实现&#xff1b; 三&#xff0e;三、编写应用程序&#xff1b; 此文主要是实现应用的主要编码工作。 1、 分层&#xff1b; 此例子主要分为UI、Helper、DAL等层。UI负责便签的界面显示&#xff1b;Helper主要是链接UI和数据库操作的中间层&#xff1b;DAL为对数据库的操…

vscode软件操作界面UI布局@各个功能区域划分及其名称称呼

文章目录 abstract检查用户界面的主要区域官方文档关于UI的介绍 abstract 检查 Visual Studio Code 用户界面 - Training | Microsoft Learn 本质上&#xff0c;Visual Studio Code 是一个代码编辑器&#xff0c;其用户界面和布局与许多其他代码编辑器相似。 界面左侧是用于访…

类和对象(下)——类型转化 static成员 内部类 匿名对象 拷贝对象优化

一、类型转换 1.1 类型转化特点 C支持内置类型隐式类型转换为类类型对象&#xff0c;需要有相关内置类型为参数的构造函数。构造函数前面加explicit就不再支持隐式类型转换。类类型的对象之间也可以隐式转换&#xff0c;需要相应的构造函数支持 内置类型转换为类类型对象&#…

基于场景图的零样本目标导航

参考论文&#xff1a;SG-Nav&#xff1a;Online 3D Scene Graph Prompting for LLM-based Zero-shot Object Navigation 0 前言 基于现成的视觉基础模型VFMs和大语言模型LLM构建了无需任何训练的零样本物体巡航框架SG-Nav。 通过VLMs将机器人对场景的观测构建为在线的3D场景图…

开屏广告-跳过神器

给大家介绍一款超实用的软件——SKIP&#xff0c;它堪称李跳跳的最佳平替&#xff01;这款软件已经在Github开源免费&#xff0c;完全无需担心内置源问题&#xff0c;也无需导入任何规则。安装完成后&#xff0c;即可直接使用&#xff0c;非常便捷&#xff01; 首次打开软件时…

大模型本地化部署(Ollama + Open-WebUI)

文章目录 环境准备下载Ollama模型下载下载Open-WebUI 本地化部署的Web图形化界面本地模型联网查询安装 Docker安装 SearXNG本地模型联网查询 环境准备 下载Ollama 下载地址&#xff1a;Ollama网址 安装完成后&#xff0c;命令行里执行命令 ollama -v查看是否安装成功。安装成…

自制虚拟机(C/C++)(三、做成标准GUI Windows软件,扩展指令集,直接支持img软盘)

开源地址:VMwork 要使终端不弹出&#xff0c; #pragma comment(linker, "/subsystem:windows /ENTRY:mainCRTStartup") 还要实现jmp near 0x01类似的 本次的main.cpp #include <graphics.h> #include <conio.h> #include <windows.h> #includ…

【游戏设计原理】97 - 空间感知

一、游戏空间的类型 将游戏设计中的空间设计单独提取出来&#xff0c;可以根据其结构、功能和玩家的交互方式划分为以下几种主要类型。这些类型可以单独存在&#xff0c;也可以组合使用&#xff0c;以创造更加复杂和有趣的游戏体验。 1. 线性空间 定义&#xff1a;空间设计是…

基于开源AI智能名片2 + 1链动模式S2B2C商城小程序视角下的个人IP人设构建研究

摘要&#xff1a;本文深入探讨在开源AI智能名片2 1链动模式S2B2C商城小程序的应用场景下&#xff0c;个人IP人设构建的理论与实践。通过剖析个人IP人设定义中的“诉求”“特质”“可感知”三要素&#xff0c;结合该小程序特点&#xff0c;阐述其对个人IP打造的影响与推动作用&…

数据库和数据表的创建、修改、与删除

1.标识符命名规则 数据库名、表名不得超过30个字符&#xff0c;变量名限制为29个 必须只能包含A-Z,a-z,0-9,_共63个字符 数据库名、表名、字段名等对象名中间不能包含空格 同一个MySQL软件中&#xff0c;数据库不能同名&#xff1b;同一个库中&#xff0c;表不能重名&#…

算法日记10:SC62求和(单调栈)(共享求解)

一、题目 二、题解&#xff1a; 1、首先&#xff0c;我们看到题目的第一个想法&#xff0c;就是把样例答案如何求解给列出来&#xff0c;图例如下 2、通过分析样例&#xff0c;可以很清晰的发现每一个数字都有其管辖的区间&#xff0c;因此我们可以想到能否找到一个数字它所管…

Revit二次开发 自适应族添加放样融合

大多数博客给出的方案都是如何在有自适应族的情况下进行修改定位点或是将数据传入自适应族,如何直接在族文件中创建自适应模型并将点转换为自适应点,连接自适应点成为自适应路径这种方式没有文章介绍. 下面的代码中给出了如何在自适应族文件中创建参照点并转换为自适应点连接…

基于VMware的ubuntu与vscode建立ssh连接

1.首先安装openssh服务 sudo apt update sudo apt install openssh-server -y 2.启动并检查ssh服务状态 到这里可以按q退出 之后输入命令 &#xff1a; ip a 红色挡住的部分就是我们要的地址&#xff0c;这里就不展示了哈 3.配置vscode 打开vscode 搜索并安装&#xff1a;…