rocketmq存储结构_RocketMQ消息存储

存储架构

RMQ存储架构

上图即为RocketMQ的消息存储整体架构,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog,1G)来存储。

Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的     起始offset,log大小和MessageTag的hashCode。

每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。

Kafka存储架构

rocketMQ的设计理念很大程度借鉴了kafka,所以有必要介绍下kafka的存储结构设计:

存储特点: 和RocketMQ类似,每个Topic有多个partition(queue),kafka的每个partition都是一个独立的物理文件,消息直接从里面读写。

根据之前阿里中间件团队的测试,一旦kafka中Topic的partitoin数量过多,队列文件会过多,会给磁盘的IO读写造成很大的压力,造成tps迅速下降。

所以RocketMQ进行了上述这样设计,consumerQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。

ps:上一行加粗理解:consumerQueue存储少量数据,即使数量很多,但是数据量不大,文件可以控制得非常小,绝大部分的访问还是Page Cache的访问,而不是磁盘访问。正式部署也可以将CommitLog和consumerQueue放在不同的物理SSD,避免多类文件进行IO竞争。

RMQ存储设计优缺点

优点:

队列轻量化,单个队列数据量非常少。对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。

缺点:

写虽然完全是顺序写,但是读却变成了完全的随机读。

读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。

要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度。

缺点克服:

随机读,尽可能让读命中page cache,减少IO读操作,所以内存越大越好。如果系统中堆积的消息过多,读数据要访问磁盘会不会由于随机读导致系统性能急剧下降,答案是否定的。

访问page cache 时,即使只访问1k的消息,系统也会提前预读出更多数据,在下次读时,就可能命中内存。

随机访问Commit Log磁盘数据,系统IO调度算法设置为NOOP方式,会在一定程度上将完全的随机读变成顺序跳跃方式,而顺序跳跃方式读较完全的随机读性能会高5倍以上。

另外4k的消息在完全随机访问情况下,仍然可以达到8K次每秒以上的读性能。

由于Consume Queue存储数据量极少,而且是顺序读,在PAGECACHE预读作用下,Consume Queue的读性能几乎与内存一致,即使堆积情况下。所以可认为Consume Queue完全不会阻碍读性能。

Commit Log中存储了所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使数据丢失,仍然可以恢复出来。

RMQ存储底层实现

MappedByteBuffer

RocketMQ中的文件读写主要就是通过MappedByteBuffer进行操作,来进行文件映射。利用了nio中的FileChannel模型,可以直接将物理文件映射到缓冲区,提高读写速度。

这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销。

这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了。

page cache

刚刚提到的缓冲区,也就是之前说到的page cache。

通俗的说:pageCache是系统读写磁盘时为了提高性能将部分文件缓存到内存中,下面是详细解释:

page cache:这里所提及到的page cache,在我看来是linux中vfs虚拟文件系统层的cache层,一般pageCache默认是4K大小,它被操作系统的内存管理模块所管理,文件被映射到内存,一般都是被mmap()函数映射上去的。

mmap()函数会返回一个指针,指向逻辑地址空间中的逻辑地址,逻辑地址通过MMU映射到page cache上。

上图中,整个OS有3.7G的物理内存,用掉了2.7G,应当还剩下1G空闲的内存,但OS给出的却是175M。

因为OS发现系统的物理内存有大量剩余时,为了提高IO的性能,就会使用多余的内存当做文件缓存,也就是图上的buff / cache,广义我们说的Page Cache就是这些内存的子集。

pageCache缺点:

内核把可用的内存分配给Page Cache后,free的内存相对就会变少,如果程序有新的内存分配需求或者缺页中断,恰好free的内存不够,内核还需要花费一点时间将热度低的Page Cache的内存回收掉,对性能非常苛刻的系统会产生毛刺。

RMQ发送、消费逻辑

发送逻辑

发送时,Producer不直接与Consume Queue打交道。上文提到过,RMQ所有的消息都会存放在Commit Log中,为了使消息存储不发生混乱,对Commit Log进行写之前就会上锁。

消息持久被锁串行化后,对Commit Log就是顺序写,也就是常说的Append操作。配合上Page Cache,RMQ在写Commit Log时效率会非常高。

Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据,不停的轮询,将当前的consumeQueue中的offSet和commitLog中的offSet进行对比,将多出来的offSet进行解析,然后put到consumeQueue中的MapedFile中。

ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。而IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法(ps:这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程)。

消费逻辑

消费时,Consumer不直接与Commit Log打交道,而是从Consume Queue中去拉取数据。拉取的顺序从旧到新,在文件表示每一个Consume Queue都是顺序读,充分利用了Page Cache。光拉取Consume Queue是没有数据的,里面只有一个对Commit Log的引用,所以再次拉取Commit Log。

但整个RMQ只有一个Commit Log,虽然是随机读,但整体还是有序地读,只要那整块区域还在Page Cache的范围内,还是可以充分利用Page Cache。(dstat命令)

对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Noop”(此时块存储采用SSD的话),随机读的性能也会有所提升。

刷盘方式

同步刷盘

在消息真正落盘后,才返回成功给Producer,只要磁盘没有损坏,消息就不会丢。一般只用于金融场景。

异步刷盘

读写文件充分利用了Page Cache,即写入Page Cache就返回成功给Producer,RMQ中有两种方式进行异步刷盘,整体原理是一样的。

RMQ文件存储模型层

RocketMQ文件存储模型层次结构如上图所示,根据类别和作用从概念模型上大致可以划分为5层,下面将从各个层次分别进行分析和阐述:

(1)RocketMQ业务处理器层

Broker端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操作(根据解析RemotingCommand中的RequestCode来区分具体的业务操作类型,进而执行不同的业务处理流程),比如前置的检查和校验步骤、构造MessageExtBrokerInner对象、decode反序列化、构造Response返回对象等。

(2)RocketMQ数据存储组件层

该层主要是RocketMQ的存储核心类—DefaultMessageStore,其为RocketMQ消息数据文件的访问入口,通过该类的“putMessage()”和“getMessage()”方法完成对CommitLog消息存储的日志数据文件进行读写操作(具体的读写访问操作还是依赖下一层中CommitLog对象模型提供的方法);另外,在该组件初始化时候,还会启动很多存储相关的后台服务线程,包括AllocateMappedFileService(MappedFile预分配服务线程)、ReputMessageService(回放存储消息服务线程)、HAService(Broker主从同步高可用服务线程)、StoreStatsService(消息存储统计服务线程)、IndexService(索引文件服务线程)等。

(3)RocketMQ存储逻辑对象层

该层主要包含了RocketMQ数据文件存储直接相关的三个模型类IndexFile、ConsumerQueue和CommitLog。IndexFile为索引数据文件提供访问服务,ConsumerQueue为逻辑消息队列提供访问服务,CommitLog则为消息存储的日志数据文件提供访问服务。这三个模型类也是构成了RocketMQ存储层的整体结构(对于这三个模型类的深入分析将放在后续篇幅中)。

(4)封装的文件内存映射层

RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel两种方式完成数据文件的读写。其中,采用MappedByteBuffer这种内存映射磁盘文件的方式完成对大文件的读写,在RocketMQ中将该类封装成MappedFile类。这里限制的问题在上面已经讲过;对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个IndexFile文件大小约为400M、单个ConsumerQueue文件大小约5.72M、单个CommitLog文件大小为1G),其中每个分隔文件的文件名为前面所有文件的字节大小数+1,即为文件的起始偏移量,从而实现了整个大文件的串联。这里,每一种类的单个文件均由MappedFile类提供读写操作服务(其中,MappedFile类提供了顺序写/随机读、内存数据刷盘、内存清理等和文件相关的服务)。

(5)磁盘存储层

主要指的是部署RocketMQ服务器所用的磁盘。这里,需要考虑不同磁盘类型(如SSD或者普通的HDD)特性以及磁盘的性能参数(如IOPS、吞吐量和访问时延等指标)对顺序写/随机读操作带来的影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/467804.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MIPI白皮书

#mipi 5G中优势#5G手机中的mipi#mipi#背景2020年 是5G元年,很多行业因5g重新定义。显示行业也不例外,针对5G,mipi联盟发表了对应的白皮书 描述了现有的MIPI规范如何能够支持第一代5G智能手机以及其他新兴的5G移动平台,包括互联/自…

Python 如何调用 Java

引用了这个文章,请打开链接 http://www.cnblogs.com/junrong624/p/5278457.html 日后待补写 。。。转载于:https://www.cnblogs.com/houzhizhe/p/7456843.html

redis rdb aof区别_Redis 持久化之 RDB 与 AOF 详解

走过路过不要错过点击蓝字关注我们文章出处:https://www.cnblogs.com/jojop/p/13941195.htmlRedis 持久化我们知道Redis的数据是全部存储在内存中的,如果机器突然GG,那么数据就会全部丢失,因此需要有持久化机制来保证数据不会因为…

块设备驱动初探

前言研究IO也很久了,一直无法串联bio和块设备驱动,只知道bio经过IO调度算法传递到块设备驱动,怎么过去的,IO调度算法在哪里发挥作用,一直没有完全搞明白,查看了很多资料,终于对块设备驱动有所理…

Java打war包or打jar包

//一个jar包可以包含多个entry,这样就能实现下面功能1.I/O 读文件流步骤 File filenew File(filePath);InputStreamReader read new InputStreamReader(new FileInputStream(file));BufferedReader bufferedReader new BufferedReader(read);String lineTxt…

Linux 块设备,Block Layer层架构演变

前言Block Layer层在整个I/O中负责承上启下,上接文件系统,下接块驱动。我不想直接讨论代码,希望从一个架构的演变来初探一下Block Layer层。一、1.0版本首先我们来了解几个重要的数据结构1.1 biobio代表了一次I/0请求,代表一个块设…

回溯 皇后 算法笔记_算法笔记-回溯法

(1)0-1背包问题思路:构造一个二叉树,每个商品都有两种状态,要或者不要。如果要就在这个节点的左枝挂子节点,如果不要就在右节点挂子节点。如果全部商品都分配完状态之后就回溯,回溯到一个还有其他选择的节点&#xff0…

Quartz集群

前言 前面说到过项目使用到了Quartz,当项目部署到多节点后,同样的调度任务会被重复执行,这时候就需要用到集群了。 集群配置 quartz.properties # # Configure Main Scheduler Properties # org.quartz.scheduler.instanceName me #ID设置为…

matalotlib(2)

文章目录注释文字Tex公式区域填充极坐标注释 import matplotlib.pyplot as plt import numpy as np xnp.arange(-10,11,1) yx*x plt.plot(x,y)plt.annotate(this is the bottom,xy(0,1),xytext(0,20),arrowpropsdict(facecolorr,frac0.2)) plt.show()文字 import matplotlib…

去华为吗?

昨晚的这条朋友圈很多人给我回复,支持去华为的人很多,但是也有几个反对的,一个说,怕是有命赚钱,没命花钱吧。还有一个说,自己拿到了华为offer,但是拒绝了,去了一个做开关电源的公司做…

plsql存过声明游标_plsql--游标用法

1.游标概念在 PL/SQL 块中执行 SELECT、INSERT、DELETE 和 UPDATE 语句时,ORACLE 会在内存中为其分配上下文区(Context Area),即缓冲区。游标是指向该区的一个指针,或是命名一个工作区(Work Area),或是一种结构化数据类型。它为应…

objectid.go源码阅读

/*http://docs.mongodb.org/manual/reference/object-id/ObjectId 按照字节顺序,一次代表:ObjectId is a 12-byte BSON type, constructed using:4个字节代表1970年元月一日到现在毫秒数 UNIX时间戳a 4-byte value representing the seconds since the …

实例

文章目录函数积分图散点条形图球员能力值函数积分图 import matplotlib.pyplot as plt import numpy as np from matplotlib.pyplot import Polygon def func(x):return -(x-2)*(x-8)40 xnp.linspace(0,10) yfunc(x) axplt.subplot() plt.plot(x,y,r,linewidth2)a2 b9 ax.set_…

闲来无事,拆个示波器玩玩。

首先要解释一下何为混合域示波器,既然说到这个话题就不得不说一下示波器进化史了,接下来大概讲一下示波器进化简史。第一代示波器——模拟示波器(ART-analog real time oscilloscope )纯模拟机器,使用示波管显示X-Y扫描成像显示波形&#xff…

r roc函数_R绘制ROC曲线 | Public Library of Bioinformatics

ROC曲线,受试者工作特征曲线 (receiver operating characteristic curve,简称ROC曲线),又称为感受性曲线(sensitivity curve)。ROC曲线是根据一系列不同的二分类方式(分界值或决定阈),以真阳性率(灵敏度)为纵坐标,假阳…

机器算法1)

SKLEARN Scikit-learn与特征工程 “数据决定了机器学习的上限,而算法只是尽可能逼近这个上限”,这句话很好的阐述了数据在机器学习中的重要性。大部分直接拿过来的数据都是特征不明显的、没有经过处理的或者说是存在很多无用的数据,那么需要…

老罗直播——只要给你一个机会,你就伸双手去接!

昨天,4月1日,罗永浩在抖音上直播卖货。一时间舆论纷纷,有吐槽老罗状况频出的,也有感叹老罗为了挣钱能屈能伸的。总之,有人讨论,有人关注,这个事件已经成功了一大半。老罗与抖音签约费是6000万&a…

segmenter.go

//Go中文分词package segoimport ("bufio""fmt""log""math""os""strconv""strings""unicode""unicode/utf8")const (minTokenFrequency 2 // 仅从字典文件中读取大于等于此频率的…

我在MTK平台下调试音频ALSA

#前言前言我就随便写了,因为是项目的需要,我需要在我们的MTK8167S平台上面调试音频。包括录音和播放。#硬件原理图因为是我们公司的项目,我就不能把完整的原理图给出来。因为两个MIC不涉及机密,跟MTK的公版是一样的。可以给出来大…

java 左边补0_java 数字左补齐0

NumberFormat nf NumberFormat.getInstance();//设置是否使用分组nf.setGroupingUsed(false);//设置最大整数位数nf.setMaximumIntegerDigits(2);nf.setMinimumIntegerDigits(2);//可以左补齐两位数的数字//以下是查询当前天数的所有日期String nowDaygetNowYMD();String curD…