8、Broker进一步了解

1、Broker消息分发服务以及构建ConsumeQueue和IndexFile与消息清除

前面分析如何进行刷盘,本章分析Broker的消息分发以及构建ConsumerQueue和IndexFile,两者构建是为了能够提高效率,减少消息查找时间以及减少网络带宽与存储空间。

ConsumeQueue 可以快速定位消息,IndexFile 可以快速定位 ConsumeQueue 中的位置,并随时实时更新,以保证消息的消费速度。

ConsumeQueue 和 IndexFile 采用了压缩存储和分片存储的方式,消费者可以仅仅消费需要的消息,从而节省存储空间。

ConsumeQueue 可以快速定位消息,减少消息传递所消耗的网络带宽,提高消息传递效率。

消息分发

在执行Broker的启动方法会去启动消息分发服务,简单理解就是为消息建立一种索引更加快速有效的消费查询消息。
broker处理消息分发的类是ReputMessageService,启动一个线程不断地将commitLong分到到对应的consumerQueue与indexFile,消息分发流程处理完毕;


public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 休眠1s,进行分发ConsumeQueue和IndexFileThread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

消息每隔1s进行消息的分发,尝试去构建ConsumerQueue索引与

IndexFile索引。
private void doReput() {if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();}// 如果分发偏移量小于commitlog的最大物理偏移量,那么循环分发for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}// 从commitLog中获取需要分发的消息SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {// CommitLog日志分发到ConsumeQueue和IndexFileDefaultMessageStore.this.doDispatch(dispatchRequest);// 长轮询:如果有消息到了主节点,并且开启了长轮询。(消息拉取逻辑后面单独讲)if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {//messageArrivingListener实例是NotifyMessageArrivingListener//通知被hold住的消费者拉取消息的请求DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());notifyMessageArrive4MultiQueue(dispatchRequest);}this.reputFromOffset += size;readSize += size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());}} else if (size == 0) {// 如果等于0,表示读取到MappedFile文件尾// 获取下一个文件的起始索引this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}
}

通过getData()方法来获取需要进行分发的消息,根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer。
依次进行消息的分发调用doDispatch方法,把消息写入ConsumerQueue与IndexFile中。
如果当前节点是主节点且开启长轮询,则调用messageArrivingListener.arriving方法进行消息的投递,具体长轮询的方案后期介绍。
构建ConsumerQueue与IndexFile


public void doDispatch(DispatchRequest req) {//dispatchList中包含CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex,// 分别用来构建ConsumeQueue文件和IndexFile文件。for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}

调用dispatch进行消息的分发,其中包括三个实现类。

CommitLogDispatcherBuildConsumeQueue类:写ConsumeQueue文件,构建ConsumeQueue索引。

CommitLogDispatcherBuildIndex类:写IndexFile文件,构建IndexFile索引。

CommitLogDispatcherCalcBitMap类:构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。

消息清理

当消息到达一定程度就会被RocketMQ丢弃,而在Broker启动会去添加定时删除文件的任务,60s之后执行第一个后面每隔10s执行一次。

// 过期文件删除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 清除方法DefaultMessageStore.this.cleanFilesPeriodically();}
}private void cleanFilesPeriodically() {// 删除CommitLog文件this.cleanCommitLogService.run();// 清除ConsumerQueue文件和IndexFile文件this.cleanConsumeQueueService.run();
}

清除的主要逻辑:首先清除CommitLog文件,然后在清除ConsumerQueue文件和IndexFile文件。

CommitLog文件的清除
// 定期删除CommitLog文件的地方
public void run() {try {// 删除文件this.deleteExpiredFiles();// 删除挂起文件this.redeleteHangedFile();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}
}private void deleteExpiredFiles() {int deleteCount = 0;long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();//K2 Broker触发文件删除的三个条件boolean timeup = this.isTimeToDelete(); // deleteWhen是否达到,配置项boolean spacefull = this.isSpaceToDelete(); // 判断磁盘是否达到删除,diskmax配置项boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; // 手动删除次数// 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。if (timeup || spacefull || manualDelete) {if (manualDelete)this.manualDeleteFileSeveralTimes--;boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",fileReservedTime,timeup,spacefull,manualDeleteFileSeveralTimes,cleanAtOnce);fileReservedTime *= 60 * 60 * 1000;deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce);if (deleteCount > 0) {} else if (spacefull) {log.warn("disk space will be full soon, but delete file failed.");}}
}

首先删除文件需要满足的要求是其一deleteWhen是否达到默认是04(可配置),其二判断磁盘是否达到删除默认75(可配置),其三手动删除。

触发过期⽂件删除时,有两个检查的纬度,⼀个是,是否到了触发删除的时间,就是broker.conf⾥配置的deleteWhen属性。另外还会检查磁盘利⽤率,达到阈值也会触发过期⽂件删除。这个阈值默认是75%,可以在broker.conf⽂件当中定制。但是最⼤值为95,最⼩值为10。

注意:观察源码可得 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。

如果删除的时候我们的文件正在被使用这就导致删除不会生效此时就需要redeleteHangedFile方法区删除挂起文件。

ConsumerQueueIndexFile清理
public void run() {try {// 删除过期的filethis.deleteExpiredFiles();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}
}private void deleteExpiredFiles() {// 间隔100int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();if (minOffset > this.lastPhysicalMinOffset) {this.lastPhysicalMinOffset = minOffset;ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue logic : maps.values()) {// 进行consumerQueue删除int deleteCount = logic.deleteExpiredFile(minOffset);if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {try {Thread.sleep(deleteLogicsFilesInterval);} catch (InterruptedException ignored) {}}}}// 进行IndexFile删除DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);}
}

在删除ConsumeQueue和IndexFile⽂件时,会去检查CommitLog当前的最⼩Offset,然后在删除时进⾏对⻬。

2、Broker端整体流程以及文件索引结构

至此我们的Producer发送消息以及Broker如何处理存储消息以及介绍完了,首先我们需要去整理一下整个流程,然后后面再来分析Consumer端的消费,以及各种消息类型的案例源码等。

发送端流程总结

生产者发送消息写入内存推送给Broker进行刷盘入CommitLog文件,同时处理分发将消息索引存放到ConsumerQueue与IndexFile文件当中。

如果是主从架构就会去进行主从之间的同步。

接下来当消息达到一定条件就会触发删除机制,程序注册了一个定时任务首次60s执行后面每隔10s去执行一次清理工作。注意:删除文件并没有判断是否被消费过了只要达到要求就会删除继而导致消息的丢失,所以我们得去避免消息的积压。
在这里插入图片描述
索引结构

CommitLog⽂件的⼤⼩是固定的,但是其中每个存储的每个消息单元长度是不固定的,在源码CommitLog类的calMsgLength方法有计算消息长度的方法。

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}

正因为消息的记录⼤⼩不固定,所以RocketMQ在每次存CommitLog⽂件时,都会去检查当前CommitLog⽂件空间是否⾜够,如果不够的话,就重新创建⼀个CommitLog⽂件,⽂件名为当前消息的偏移量。

ConsumeQueue⽂件主要是加速消费者的消息索引。他的每个⽂件夹对应RocketMQ中的⼀个MessageQueue,⽂件夹下的⽂件记录了每个MessageQueue中的消息在CommitLog⽂件当中的偏移量。这样,消费者通过ComsumeQueue⽂件,就可以快速找到CommitLog⽂件中感兴趣的消息记录。⽽消费者在ConsumeQueue⽂件当中的消费进度会保存在config/consumerOffset.json⽂件当中。

ConsumerQueue=30w固定大小20byte的数据块组成(8字节msgPhyOffset表示起始位置+4字节msgSize文件占用长度+8字节magTagCode消息的tag的Hash值)。

IndexFile⽂件主要是辅助消息检索。他的作⽤主要是⽤来⽀持根据key和timestamp检索消息。它的⽂件名⽐较特殊不以消息偏移量命名⽽是⽤的时间命名,但是其实它也是⼀个固定⼤⼩的⽂件。

IndexFile=固定40字节indexHeader+固定20字节的个数500w的slot+固定20字节最多个数500W*4的index。

indexHeader表示文件头,slots表示一系列的槽位,index表示真正的索引,槽位里面存放index,槽位紧挨着indexHeader而每个槽位里面最多存放4个index。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/202402.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mac电池最大充电限制工具 AlDente Pro中文 for Mac

Pro 版特有功能 热保护&#xff1a;在电池温度较高时为电池充电会导致电池老化更快。启用热保护后&#xff0c;当电池温度过高时&#xff0c;充电将自动停止。 航行模式&#xff1a;通常情况下&#xff0c;即使激活了最大电池充电&#xff0c;您的 MacBooks 电池也会始终稍微充…

7.上传project到服务器及拉取服务器project到本地、更新代码冲突解决

1.上传project到SVN服务器 1.在eclipse中&#xff0c;从show view里调出SVN资源库视图 2.在SVN资源库窗口的空白位置右键选择新建资源库位置 3.填好服务器的地址 4.资源库导入成功,SVN资源库视图下出现导入的资源库 5.新建project 6.写好project的初始版本 7.右键project --&…

激光雷达生成的图像检测关键点用来辅助里程计的方案

文章&#xff1a;LiDAR-Generated Images Derived Keypoints Assisted Point Cloud Registration Scheme in Odometry Estimation 作者&#xff1a;Haizhou Zhang , Xianjia Yu, Sier Ha , Tomi Westerlund 编辑&#xff1a;点云PCL 欢迎各位加入知识星球&#xff0c;获取PDF…

数据结构与算法编程题41

线性表中各结点的检索概率不等时&#xff0c;可用如下策略提高顺序检索的效率&#xff1a; 若找到指定的结点&#xff0c;则将该结点和其前驱结点&#xff08;若存在&#xff09;交换&#xff0c;使得经常被检索 的结点尽量位于表的前端。试设计在顺序结构的线性表上实现上述策…

DCGAN生成网络模型

DCGAN&#xff08;Deep Convolutional Generative Adversarial Network&#xff09;是一种生成对抗网络&#xff08;GAN&#xff09;的变体&#xff0c;专门设计用于生成图像。它结合了卷积神经网络&#xff08;CNN&#xff09;和生成对抗网络的概念&#xff0c;旨在生成具有高…

mysql基础之DQL基本单表查询

学习DQL之前先知道sql语句的执行顺序 from->join->on->where->group by->count(字段)->having->select->distinct->order by->limit null无法和任何值进行比较&#xff08;不相等&#xff09;&#xff0c;包括null和null也不相等 1.DQL简单查询…

免费好用的5个AI写作工具,如何更好的使用AI写作工具

人工智能&#xff08;AI&#xff09;作为当今科技领域的热门话题&#xff0c;正在以惊人的速度改变我们生活的方方面面。从智能助手到自动驾驶汽车&#xff0c;AI的应用已经渗透到我们日常的方方面面。 1. 什么是AI人工智能&#xff1f; 什么是AI人工智能&#xff1f;简而言之…

CCF编程能力等级认证GESP—C++1级—20230318

CCF编程能力等级认证GESP—C1级—20230318 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;判断题&#xff08;每题 2 分&#xff0c;共 20 分&#xff09;编程题 (每题 25 分&#xff0c;共 50 分)每月天数长方形面积 答案及解析单选题判断题编程题1编程题2 单选…

会声会影2024软件还包含了视频教学以及模板素材

会声会影2024中文版是一款加拿大公司Corel发布的视频编软件。会声会影2024官方版支持视频合并、剪辑、屏幕录制、光盘制作、添加特效、字幕和配音等功能&#xff0c;用户可以快速上手。会声会影2024软件还包含了视频教学以及模板素材&#xff0c;让用户剪辑视频更加的轻松。 会…

基于springboot+vue篮球联盟管理系统源码

&#x1f345; 简介&#xff1a;500精品计算机源码学习 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 文末获取源码 目录 一、以下学习内容欢迎交流&#xff1a; 二、文档资料截图&#xff1a; 三、项目技术栈 四、项目运行图 背景&#xff1a; 篮球运…

对比分析:黑盒测试 VS 白盒测试

一、引言 在软件开发过程中&#xff0c;测试是确保产品质量的关键环节。其中&#xff0c;黑盒测试和白盒测试是两种常见的测试方法。本文将详细解析这两种测试方法的定义、特点&#xff0c;同时通过具体示例进行对比分析。 二、黑盒测试 黑盒测试&#xff0c;又称功能测试&…

社区分享|简米Ping++基于MeterSphere开展异地测试协作

上海简米网络科技有限公司&#xff08;以下简称为“简米”&#xff09;是国内开放银行服务商&#xff0c;高新技术企业&#xff0c;中国支付清算协会会员单位。自2014年成立至今&#xff0c;简米长年聚焦金融科技领域&#xff0c;通过与银行、清算组织等金融机构合作&#xff0…

java基础进阶之数组排序-可能有你不知道的哦!!

1、使用Arrays类的sort方法 1.1、默认升序 java中Arrays类提供了sort方法来进行快速排序&#xff0c;默认是升序的。 Arrays.sort(数组名) private static void ArrSort1(int[] arr) {Arrays.sort(arr);System.out.println("快速排序-默认升序:"Arrays.toString(arr…

【PyTorch】多项式回归

文章目录 1. 模型与代码实现1.1. 模型1.2. 代码实现1.2.1. 完整代码1.2.2. 输出结果 2. Q&A2.1. 欠拟合与过拟合 1. 模型与代码实现 1.1. 模型 将多项式特征值预处理为线性模型的特征值。即 y w 0 w 1 x w 2 x 2 ⋯ w n x n y w_0w_1xw_2x^2\dotsw_nx^n yw0​w1​…

开关电源超强总结

什么是Power Supply? 开关电源的元件构成 三种基本的非隔离开关电源 三种基本的隔离开关电源 反激变换器&#xff08;Flyback&#xff09;工作原理 &#xff08;电流连续模式&#xff09; 反激变换器&#xff08;Flyback&#xff09;工作原理 &#xff08;电流断续模式&#x…

信息化系列——企业信息化建设(3)

期待已久的对策&#xff0c;马上”出炉“&#xff0c;第一次看的朋友&#xff0c;建议现在主页看看&#xff08;1&#xff09;和&#xff08;2&#xff09;&#xff0c;那咱们就废话少说了&#xff0c;开始今天的正题。 企业信息化建设对策 1、增强企业信息化意识 企业管理者…

【Python】Python读Excel文件生成xml文件

目录 ​前言 正文 1.Python基础学习 2.Python读取Excel表格 2.1安装xlrd模块 2.2使用介绍 2.2.1常用单元格中的数据类型 2.2.2 导入模块 2.2.3打开Excel文件读取数据 2.2.4常用函数 2.2.5代码测试 2.2.6 Python操作Excel官方网址 3.Python创建xml文件 3.1 xml语法…

PACS源码,医学影像传输系统源码,全院级应用,支持放射、超声、内窥镜、病理等影像科室,且具备多种图像处理及三维重建功能

​三维智能PACS系统源码&#xff0c;医学影像采集传输系统源码 PACS系统以大型关系型数据库作为数据和图像的存储管理工具&#xff0c;以医疗影像的采集、传输、存储和诊断为核心&#xff0c;集影像采集传输与存储管理、影像诊断查询与报告管理、综合信息管理等综合应用于一体的…

接口测试:轻松掌握基础知识,快速提升测试技能!

1.client端和server端 开始接口测试之前&#xff0c;首先搞清楚client端与server端是什么&#xff0c;区别。 web前端&#xff0c;顾名思义&#xff0c;指用户可以直观操作和看到的界面&#xff0c;包括web页面的结构&#xff0c;web的外观视觉表现及web层面的交互实…

顶级设计师力荐的界面设计软件,设计新选择

即时设计 作为专业的在线协作UI设计软件&#xff0c;即时设计可以实现视觉效果、交互效果、体验效果一站成型&#xff0c;为你的目标用户创造流畅体验。 轻松绘制原型&#xff1a;借助社区设计资源和原型模板的即时设计&#xff0c;开始敏捷高效的工作。与产品经理分解用户需…