深度解析RocketMq源码-消费者索引ConsumeQueue

1.绪论

rocketmq的broker中关于消息持久化的组件主要包含三个,分别是:持久化消息到文件中的组件commitLog;根据消息key索引commitLog日志的indexFile;消费者根据topic和queueId查询commitLog日志的consumeQueue。前面已经介绍commitLog和indexFile,这节我们就来学习一下consumeQueue。

2. consumeQueue

4.1.1 consumeQueue的中每条消息的组成

消费者在消息的时候,只知道它在哪个topic的哪个queue里面消费哪个tags的拿条消息,那我们怎么由此定位到一条消息呢?所以我们需要为commitLog建立一条索引。

其实每个topic+queueId都会建立一个ConsumeQueue文件,而这个映射关系存储在broker中consumeQueueTable中,我们查询消息的时候,过consumeQueueTable根据queueId和topic快速定位到我们需要的ConsumeQueue,然后我们再根据消费者所提交的consumeQueueOffset*每条consumequeue索引的大小便能找到我们所以要的consume索引文件的位置,再根据里面存储的commitLog的物理偏移量便能在commitLog中定位到具体的消息的位置。

commitLog的存储结构可以如下所示:

└── TopicTest
    ├── 0
            └── 00000000000000000000
    ├── 1
            └── 00000000000000000000
    ├── 2
            └── 00000000000000000000
    └── 3
            └── 00000000000000000000

可以通过下图来说明,consumequeue是如何组成的,并且和commitLog的关系:

3.consumeQueue中消息的创建和获取

consumequeue其实底层和commitLog是一样的,其实由多个mappedFile来构成的,这里我们就不在讨论consumequeue的具体存储逻辑。有兴趣的小伙伴可以看这篇文章:

深度解析RocketMq源码-持久化组件(四) CommitLog_rocketmq一主一备commitlog-CSDN博客

接下来我们主要看一看consumequeue是什么时候创建的,并且在消费者知道它需要消费的topic和queueId过后,如何找到它具体要消费的哪条消息的,这其实也是rocketmq的核心之一。

3.1. consumequeue建立消息

其实consumequeue是一个采用mappedFile持久化数据的组件,它写入数据其实发分两步:

1.根据topic和queueId在topic和queueid与consumequeue的映射表(consumeQueueTable)中找到Consumequeue。

2.构造一条consumequeue记录,包括8字节的8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCode,然后顺序写入到consumequeue中。

3.1.1 根据topic和queueId找到需要写入的consumequeue

  public void putMessagePositionInfo(DispatchRequest dispatchRequest) {//根据topic和queueId找到对应的consumequeueConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());//根据consumequeue顺序写入consumemequeue的索引数据cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));}
 //根据queueId和topic检索到这个topic和queueId是属于哪个topic的public ConsumeQueue findConsumeQueue(String topic, int queueId) {//根据consumeQueueTable和queueId获取到具体的ConsumeQueueConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}ConsumeQueue logic = map.get(queueId);if (null == logic) {//如果consumeQueue为空,便新建一个ConsumequeueConsumeQueue newLogic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),this);//并且设置到consumeQueueTable中ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {if (MixAll.isLmq(topic)) {lmqConsumeQueueNum.getAndIncrement();}logic = newLogic;}}return logic;}

3.1.2 向consumequeue中写入一条记录

//commitLog的真正组成8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCodeprivate boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {if (offset + size <= this.maxPhysicOffset) {log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);return true;}this.byteBufferIndex.flip();//consumqueue的每条数据占20个字节this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);//8字节的commotLog的物理偏移量this.byteBufferIndex.putLong(offset);//4字节消息大小this.byteBufferIndex.putInt(size);//8字节的tagsCodethis.byteBufferIndex.putLong(tagsCode);//写入到哪个offsetfinal long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;//获取到offset的最后一条数据MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {//将跟新mappedFilde1写指针为expectLogicOffsetthis.minLogicOffset = expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}if (cqOffset != 0) {//long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();if (expectLogicOffset < currentLogicOffset) {log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}//实际的物理地址为offset+size大小this.maxPhysicOffset = offset + size;//将数据写入到bytebuffer中return mappedFile.appendMessage(this.byteBufferIndex.array());}return false;}

3.2 根据topic和messagequeue检索消息

先根据topic和queueId找到以及consumeoffset查询一条消息分三步:

1.根据topic和queueId查询到对应的consumequeue;

2.根据consumeoffset在consumequeue中找到一条consumequeue的记录,里面包含一个属性就是实际消息在commitLog中的物理偏移量和大小;

3.根据物理偏移量和消息大小在commitLog中获取到实际消息内容。

    //根据queueId和topic检索到这个topic和queueId是属于哪个topic的public ConsumeQueue findConsumeQueue(String topic, int queueId) {//根据consumeQueueTable和queueId获取到具体的ConsumeQueueConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}ConsumeQueue logic = map.get(queueId);if (null == logic) {//如果consumeQueue为空,便新建一个ConsumequeueConsumeQueue newLogic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),this);//并且设置到consumeQueueTable中ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {if (MixAll.isLmq(topic)) {lmqConsumeQueueNum.getAndIncrement();}logic = newLogic;}}return logic;}

再根据consumeoffset在consumequeue中获取到具体consumequeue的索引数据。

public boolean get(final long address, final CqExtUnit cqExtUnit) {if (!isExtAddr(address)) {return false;}final int mappedFileSize = this.mappedFileSize;final long realOffset = unDecorate(address);MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0);if (mappedFile == null) {return false;}int pos = (int) (realOffset % mappedFileSize);SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos);if (bufferResult == null) {log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset);return false;}boolean ret = false;try {ret = cqExtUnit.read(bufferResult.getByteBuffer());} finally {bufferResult.release();}return ret;}

4.总结

至此,我们已经大概了解消息在进入到broker过后做了什么。在生产者推送消息到broker过后,为了保证数据的能够快速的持久化,是直接按照到达顺序写入到commitLog中的,然后就会给主线程返回生产消息成功的通知。但是消费者需要根据topic和queueId获取到一条消息,并且需要根据消息的key检索一条消息。为了满足上述两个需求,rocketmq会启动一个线程,扫描commitLog,如果有新的消息写入,便会构建IndexFile和consumequeue两个文件,其实相当于两个索引文件。这一步骤在我们后面章节会详细介绍。

其实先持久化文件,然后启动线程对消息做其他处理,这一思想的本质就是为了增大吞吐量。在其他框架中也会应用到这种思想,比如elasticsearch中,在写消息的时候,会同时写入到transLog和memory buffer中后便会返回成功,后续单独启动线程根据memory buffer中的数据来进行其他操作,比如分词,建立倒排索引等,可以看出translog其实就类似于rokcetmq的commitLog。所以万变不离其中,只要有一份持久化数据过后,便可以跟客户端返回成功了,然后再单独的启动线程根据这份持久化数据做定制化处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/37167.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用QGIS进行研究区域制图实战

目录 前言 一、QGIS的版本和数据介绍 1、关于QGIS版本 2、需要准备的数据 二、准备制图 1、制作全国区位图 2、矢量和遥感信息的编辑 三、出图编辑 1、设置主题信息 2、打印布局制作 3、美化地图 总结 前言 俗话说“一图胜千言”&#xff0c;在地理信息的领域中&…

Feign:简化微服务通信的利器

介绍 1.1 什么是 Feign&#xff1f; Feign 是一个声明式、模板化的 HTTP 客户端&#xff0c;它简化了编写 Web 服务客户端的过程。它的主要目的是使 HTTP API 客户端的开发变得更加简单和直观。Feign 的设计理念是将 HTTP 客户端的细节隐藏在背后&#xff0c;使开发者可以专注…

ai智能语音机器人在电销里发挥怎样的作用

得益于语音识别技术的的进步&#xff0c;人工智能发展越来越成熟。相信作为企业的管理者&#xff0c;都遇到过这样的事&#xff1a;一个电销新人刚刚入行&#xff0c;需求经过一两个月的学习培训才能成为一名合格的销售人员。在这段学习的期间&#xff0c;企业投入的成本是没有…

使用 Selenium 获取 Web 页面信息的全指南

目录 为什么使用 Selenium 获取页面信息Selenium 基础设置获取页面标题获取当前 URL获取页面源代码获取元素的文本获取元素的属性获取 Cookie截图示例代码总结 正文 1. 为什么使用 Selenium 获取页面信息 在 Web 自动化测试和数据抓取中&#xff0c;获取页面信息是一个基本…

来聊聊nacos

先关注下下方公众号呗&#xff1a; 第1部分&#xff1a;引言 微服务的挑战 尽管微服务架构带来了许多好处&#xff0c;如敏捷性、可扩展性和容错性&#xff0c;但它也带来了一些挑战&#xff0c;特别是在服务发现、配置管理、服务间通信和运维管理方面。这些挑战需要有效的解…

opencv 图像的缩放(放大,缩小),翻转与旋转

目录 opencv 图像的缩放(放大&#xff0c;缩小)&#xff0c;翻转&#xff0c;旋转1、图像的缩放&#xff0c;旋转过程中为什么需要插值&#xff1a;2、常见的插值算法包括&#xff1a;3、图像的缩放&#xff0c;翻转&#xff0c;旋转&#xff1a;&#xff08;1&#xff09;图像…

混凝土搅拌站中的智能化系统应用

随着科技的飞速发展&#xff0c;混凝土搅拌站已经进入了现代化、智能化的新时代。现代自动化、智能化技术的应用&#xff0c;使得混凝土搅拌站更加高效、准确、可靠&#xff0c;同时也提高了生产效率和质量。本文将带你深入探索混凝土搅拌站中运用到现代自动化、智能化的方方面…

Java代码基础算法练习-删除有序数组中的重复项-2024.05.07

任务描述&#xff1a; 有一批同学需要计算各自的出生年月是否闰年。请使用算法计算出他们的出生年份是否闰年。 解决思路&#xff1a; 如果要一次性输出结果&#xff0c;就是先输入数字n&#xff0c;确定首先循环几次&#xff0c;在每次循环中进行闰年判断操作&#xff0c;每次…

国外的Claude3.5 Sonnet Artifacts和国内的CodeFlying孰强孰弱?

在Claude 3.5 Sonnet发布后&#xff0c;最受大家关注的问题应该就是它在编写代码能力上的变化。 要知道在Claude3.0发布以来的这几个月就因为它的编写代码能力而一直受到人们的诟病。 那Anthropic这次终于是不负众望&#xff0c;在Claude 3.5 Sonnet中更新了一个叫做Artifact…

【STM32】SysTick系统滴答定时器

1.SysTick简介 CM4内核的处理和CM3一样&#xff0c;内部都包含了一个SysTick定时器&#xff0c;SysTick 是一个24 位的倒计数定时器&#xff0c;当计到0 时 &#xff0c;将 从RELOAD 寄存器中自动重装载定时初值。只要不把它在SysTick 控制及状态寄存器中的使能位清除&#xf…

使用阿里云效API操作流水线

使用阿里云效&#xff08;Alibaba Cloud DevOps&#xff09;API操作流水线时&#xff0c;需要注意以下几个方面&#xff1a; 认证与授权 确保你已经获取了正确的访问凭证&#xff08;AccessKey ID 和 AccessKey Secret&#xff09;&#xff0c;并且这些凭证具有足够的权限来执行…

优维“统一开放平台”:开放、开发、集成、客制化

基于丰富完善的产品体系&#xff0c;优维重磅推出了统一开放平台。这款由优维自主设计与研发&#xff0c;集数据开发、能力开放、能力集成、客制化为一体的统一开放平台&#xff0c;具备应用市场、应用开发、连接能力、采控平台、API集市、开发者工具等功能模块&#xff0c;可为…

探索MySQL的执行奥秘:从查询执行到数据存储与优化的深入解析

MySQL是一个功能强大且广泛应用的关系数据库管理系统。理解MySQL的执行机制、优化策略以及数据存储方式&#xff0c;对于数据库开发和管理至关重要。本文将详细解析这些内容&#xff0c;通过具体实例和实用建议&#xff0c;帮助读者深入掌握MySQL的高级特性。 一、MySQL的执行…

【RNN练习】LSTM-火灾温度预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 前期准备工作 import torch.nn.functional as F import numpy as np import pandas as pd import torch from torch import nn1. 导入数据 data pd.read_cs…

Linux中信号的机制

在操作系统中,信号是一种软件中断,用于通知进程某个事件已经发生。信号可以分为两大类:同步信号和异步信号。 同步信号(Synchronous Signals) 同步信号通常是由于进程执行中的异常情况引起的,比如违反内存访问规则(段错误),执行非法指令(非法指令),或其他导致核心…

LLM大模型实战 —— DB-GPT阿里云部署指南

简介&#xff1a; DB-GPT 是一个实验性的开源应用&#xff0c;它基于FastChat&#xff0c;并使用vicuna-13b作为基础模型, 模型与数据全部本地化部署, 绝对保障数据的隐私安全。 同时此GPT项目可以直接本地部署连接到私有数据库, 进行私有数据处理&#xff0c; 目前已支持SQL生…

慌慌张张,匆匆忙忙,又是学习的一天

今天学进程 进程的状态 &#xff08;本科的考点我记得哈哈&#xff09; 什么是线程 线程的状态 线程和进程的区别 一个共享 一个私有 独立 多线程的优缺点 线程的分类 内核支持线程 用户级线程 组合方式线程 协程coroutine 进程 分配资源的最小单位 线程 是cpu调度的最小…

Spring AI之后,阿里推出Spring Cloud Alibaba AI,接入体验篇——Java也能方便用 AI

阿里推出Spring Cloud Alibaba AI&#xff0c;接入体验篇——Java也能方便用 AI 1.Spring AI2.Spring Cloud Alibaba AI3. 接入体验 1.Spring AI Spring AI 是 Spring 官方社区项目&#xff0c;旨在简化 Java AI 应用程序开发&#xff0c;让 Java 开发者像使用 Spring 开发普通…

NSSCTF-Web题目18(反序列化)

目录 [NISACTF 2022]babyserialize 1、题目 2、知识点 3、思路 [SWPUCTF 2022 新生赛]ez_ez_unserialize 4、题目 5、知识点 6、思路 [NISACTF 2022]babyserialize 1、题目 2、知识点 反序列化、绕过过滤、命令执行 3、思路 <?php include "waf.php";…

基于Vue,mysql,JavaEE的简单投票与投票管理系统

项目介绍 ​ 本项目&#xff0c;基于Vue2.6,mysql,JavaEE 实现简单的投票与投票管理系统 项目地址 VotingSystem: 投票系统1.0 管理员和普通用户 (gitee.com) 有问题请评论私聊哦 项目分类 数据库 创建投票人&#xff0c;被投票人&#xff0c;投票关系&#xff08;追踪谁…