RocketMQ源码阅读-Message消息存储

RocketMQ源码阅读-Message消息存储

  • 1. CommitLog的作用
  • 2. CommitLog 存储消息
  • 3. 时序图
  • 4. 小结

在Broker消息接收一篇中,分析到Broker接收到消息,最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。

1. CommitLog的作用

Store all metadata downtime for recovery, data protection reliability

翻译为:存储元数据,提供在停机时恢复和数据保护的能力

CommitLog是针对 MappedFileQueue 的封装使用。

其中有一个属性:

protected final MappedFileQueue mappedFileQueue;

MappedFileQueue是用来存储消息的文件队列,对上层提供可无限使用的文件容量,有一个属性:

private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

是实际存储消息的链表,MappedFile是消息文件实体,每个 MappedFile 统一文件大小。

他们三个的关系是:

他们的比例为:CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N

CommitLog 存储在 MappedFile 有两种内容类型:

  1. MESSAGE :消息。
  2. BLANK :文件不足以存储消息时的空白占位。

2. CommitLog 存储消息

CommitLog存储消息的入口方法为CommitLog#putMessage(…):

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 事务相关final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;// 获取写入映射文件MappedFile unlockMappedFile = null;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时,进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result = mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时,获取新的映射文件,并进行插入unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步,主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult;
}

可以看到,此方法获取到映射文件,然后将消息写入文件,并刷入磁盘,其中会使用到写入锁进行并发控制。
最后进行数据同步,将主节点的数据同步到从节点。
刷新磁盘的方法为CommitLog#handleDiskFlush:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}
}

主从数据同步的方法为CommitLog#handleHA:

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点,同步到从节点if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK =request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}

对于获取文件和文件刷新落盘先不做深究,后面再深入分析。

3. 时序图

在这里插入图片描述

4. 小结

本篇主要分析了CommitLog的作用以及其进行消息存储的流程:
CommitLog主要是对MappedFileQueue的封装使用,MappedFileQueue是对消息文件的封装。
存储流程主要是:

  • 获取映射文件
  • 获取写锁
  • 写文件
  • 释放写锁
  • 刷新文件到磁盘(分为同步刷新和异步刷新)
  • 如果是Master节点,需要同步数据到从节点

Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/621789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自编C++题目——战争

预估难度 困难 题目描述 国与国以河为界&#xff0c;有一天他们两国发生了战争&#xff0c;在边疆的战士开始了厮杀。小明是一个参谋&#xff0c;他也知道两国的兵都能打个人&#xff0c;所以他想让你帮忙安排布置兵&#xff0c;以击杀所有国的兵。 打仗规则 只能打在同一…

鸿蒙应用开发尝鲜:初识HarmonyOS

初识HarmonyOS 来源:华为官方网站 : https://developer.huawei.com/ 相信大家对鸿蒙应用开发也不在陌生,很多身处互联网行业或者不了解的人们现在也一定都听说过华为鸿蒙.这里我将不再说废话,直接步入正题 鸿蒙应用开发语言 HarmonyOS应用开发采用的是ArkTS语言,ArkTS是在Typ…

高校站群内容管理系统开发语言各有优势

站群管理系统开发可以选择多种编程语言&#xff0c;具体选择哪种语言最好需要考虑多个因素&#xff0c;包括开发团队的技术栈、项目需求、性能要求、安全性等。下面列举一些常见的编程语言及其适用场景&#xff1a; PHP&#xff1a;PHP是一种广泛使用的服务器端脚本语言&#…

数据结构:堆和堆排序

数据结构&#xff1a;堆和堆排序 文章目录 数据结构&#xff1a;堆和堆排序1.二叉树的存储结构1.顺序结构2.链式结构 2.堆3.堆的实现4.堆排序&#xff08;选择排序中的一类&#xff09;1. 基本思想2.代码实现 1.二叉树的存储结构 1.顺序结构 顺序结构存储就是使用数组来表示一…

【数据结构 | 直接插入排序】

直接插入排序 基本思路直接插入排序SelectSort 基本思路 扑克牌是我们几乎每个人都可能玩过的游戏最基本的扑克玩法都是—边模牌,— 边理牌。加入我们拿到如图这样的扑克牌&#xff1a; 我们会按照如下理牌&#xff1a; 将3和4移动至5的左侧&#xff0c;再将2移动到最左侧&a…

按键开关机的锂电池充放电解决方案

一、产品概述 TP4562 是一款集成线性充电管理、同步升压转换、电池电量指示和多种保护功能的单芯片电源管理SOC&#xff0c;为锂电池的充放电提供完整的单芯片电源解决方案。 TP4562 内部集成了线性充电管理模块、同步升压放电管理模块、电量检测与 LED 指示模块、保护模块、…

使用Spark操作Hudi表详细教程

Hudi Spark使用 本篇为大家带来通过Spark shell和Spark SQL操作Hudi表的方式。 Hudi表还可以通过Spark ThriftServer操作。 软件准备 Scala 2.12Flink 1.15Spark 3.3Hudi 0.13.1 Hudi编译的时候会遇到依赖下载缓慢的情况。需要换用国内源。修改settings.xml文件&#xff0c…

[NSSCTF Round#16 Basic]RCE但是没有完全RCE

[NSSCTF Round#16 Basic]RCE但是没有完全RCE 第一关 <?php error_reporting(0); highlight_file(__file__); include(level2.php); if (isset($_GET[md5_1]) && isset($_GET[md5_2])) {if ((string)$_GET[md5_1] ! (string)$_GET[md5_2] && md5($_GET[md…

Protecting Intellectual Property of Deep NeuralNetworks with Watermarking

保护深度神经网络的知识产权与数字水印技术 ABSTRACT 深度学习是当今人工智能服务的关键组成部分&#xff0c;在视觉分析、语音识别、自然语言处理等多个任务方面表现出色&#xff0c;为人类提供了接近人类水平的能力。构建一个生产级别的深度学习模型是一项非常复杂的任务&a…

Redis常用连接工具

RedisInsight 官网地址&#xff1a; RedisInsight | The Best Redis GUI Redis Desktop Manager 官网地址&#xff1a; RedisInsight | The Best Redis GUI 样式&#xff1a; QuickRedis 官网地址&#xff1a; QuickOfficial - QuickRedis 样式&#xff1a; AnotherRed…

ssm基于spring和vue开发的web新闻流媒体平台论文

摘 要 如今的时代&#xff0c;是有史以来最好的时代&#xff0c;随着计算机的发展到现在的移动终端的发展&#xff0c;国内目前信息技术已经在世界上遥遥领先&#xff0c;让人们感觉到处于信息大爆炸的社会。信息时代的信息处理肯定不能用之前的手工处理这样的解决方法&#x…

详解SpringCloud微服务技术栈:认识微服务、服务拆分与远程调用

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位大四、研0学生&#xff0c;正在努力准备大四暑假的实习 &#x1f30c;上期文章&#xff1a;首期文章 &#x1f4da;订阅专栏&#xff1a;微服务技术全家桶 希望文章对你们有所帮助 在此之前&#xff0c;耗时半个月&#x…

数据结构之bool类

bool类 bool 是布尔类。它是最简单的一个类&#xff0c;其取值有两种&#xff0c;1和O&#xff0c;即 True 和 False。可以这样简单地理解&#xff0c;除了1和0以及 True 和 False 的情况之外&#xff0c;但凡有值&#xff08;非空&#xff09;即为真&#xff0c;但凡无值&…

linux DHCP和DNS

DHCP dhcp 动态主机配置协议 dhcp工作原理 客户端 ------------------------------------------------->dhcp服务器 客户端会发送dhcp discover广播报文&#xff0c;寻找dhcp服务器 客户端 <-------------------------------------------------dhcp服务器 服务器收…

Java中的包机制、final和super关键字

一、包机制 关于java语言当中的包机制&#xff1a; 1.包又被称为package,java中引入package这种语法机制主要是为了方便程序的管理。 不同功能的类被分门别类放到不同的软件包当中&#xff0c;查找比较方便&#xff0c;管理比较方便&#xff0c;易维护。 2.怎么定义package呢…

第 7 章 排序算法

文章目录 7.1 排序算法的介绍7.3 算法的时间复杂度7.3.1 度量一个程序(算法)执行时间的两种方法7.3.2 时间频度7.3.3 时间复杂度7.3.4 常见的时间复杂度7.3.5 平均时间复杂度和最坏时间复杂度 7.4 算法的空间复杂度简介7.4.1 基本介绍 7.5 冒泡排序7.5.1 基本介绍7.5.2 演示冒泡…

Redis的主从配置,哨兵模式,集群模式

目录 什么是主从复制&#xff1f; 主从复制的作用&#xff1f; 主从复制的流程&#xff1f; 搭建Redis的主从复制 安装Redis 环境准备 修改内核参数 安装Redis 定义systemd服务管理脚本 修改Redis配置文件&#xff08;Master节点操作&#xff09;192.168.17.25 修改Re…

【数据库】聊聊MVCC机制与BufferPool缓存机制

上一篇文章&#xff0c;介绍了隔离级别&#xff0c;MySQL默认是使用可重复读&#xff0c;但是在可重复读的级别下&#xff0c;可能会出现幻读&#xff0c;也就是读取到另一个session添加的数据&#xff0c;那么除了配合使用间隙锁的方式&#xff0c;还使用了MVCC机制解决&#…

DSL查询文档--查询结果处理

排序 elasticsearch默认是根据相关度算分&#xff08;_score&#xff09;来排序&#xff0c;但是也支持自定义方式对搜索结果排序。可以排序字段类型有&#xff1a;keyword类型、数值类型、地理坐标类型、日期类型等。 普通字段排序 keyword、数值、日期类型排序的语法基本一…

SSH远程访问与控制

ssh优点 数据传输是加密的&#xff0c;可以防止信息泄露 数据传输是压缩的&#xff0c;可以提高传输速度 作用 sshd 服务使用 SSH 协议可以用来进行远程控制&#xff0c;或在计算机之间传送文件。 ssh服务端主要包括两个服务功能 ssh远程链接和sftp服务&#xff08;文件传…