RocketMQ-源码架构二

梳理一些比较完整,比较复杂的业务线

消息持久化设计

RocketMQ的持久化文件结构

消息持久化也就是将内存中的消息写入到本地磁盘的过程。而磁盘IO操作通常是一个很耗性能,很慢的操作,所以,对消息持久化机制的设计,是一个MQ产品提升性能的关键,甚至可以说是最为重要的核心也不为过。接下来梳理RocketMQ是如何在本地磁盘中保存消息的

RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。

存储文件主要分为三个部分:

  • CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。

  • ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。

  • IndexFile:为消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

另外,还有几个辅助的存储文件,主要记录一些描述消息的元数据:

  • checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。

  • config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。

  • abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。

整体的消息存储结构,官方做了个图进行描述:

Producer发过来的所有消息,不管是属于哪个Topic,Broker都统一存在CommitLog文件当中,然后分别构建ConsumeQueue文件和IndexFile两个索引文件,用来辅助消费者进行消息检索。这种设计最直接的好处是可以较少查找目标文件的时间,让消息以最快的速度落盘。对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入。当Topic比较多时,这样的Partition寻址就会浪费非常多的时间。所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式,在多Topic的场景下,优势就比较明显了。

在文件形式上:CommitLog文件的大小是固定的。文件名就是当前CommitLog文件当中存储的第一条消息的Offset。

ConsumeQueue文件主要是加速消费者进行消息索引。每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ConsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件中的消费进度,会保存在config/consumerOffset.json文件当中。

IndexFile文件主要是辅助消费者进行消息索引。消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了,但是如果消费者指定时间戳进行消费,或者要按照MessageId或者MessageKey来检索文件,比如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。

这是对RocketMQ存盘文件最基础的了解,但是只有这样的设计,是不足以支撑RocketMQ的三高性能的。RocketMQ如何保证ConsumeQueue、IndexFile两个索引文件与CommitLog中的消息对齐?如何保证消息断电不丢失?如何保证文件高效的写入磁盘?等等。如果你想要去抓住RocketMQ这些三高问题的核心设计,那么还是需要到源码当中去深究。

commitLog写入

消息存储的入口在: DefaultMessageStore.asyncPutMessage方法

CommitLog的asyncPutMessage方法中会给写入线程加锁,保证一次只会允许一个线程写入。写入消息的过程是串行的,一次只会允许一个线程写入。

最终进入CommitLog中的DefaultAppendMessageCallback#doAppend方法,这里就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。而是在随后调用ComitLog#submitFlushRequest方法,提交刷盘申请。刷盘完成之后,内存中的文件才真正写入到磁盘当中。

在提交刷盘申请之后,就会立即调用CommitLog#submitReplicaRequest方法,发起主从同步申请。

文件同步刷盘与异步刷盘

入口:CommitLog.submitFlushRequest

这里涉及到了对于同步刷盘与异步刷盘的不同处理机制。这里有很多极致提高性能的设计,对于我们理解和设计高并发应用场景有非常大的借鉴意义。

同步刷盘和异步刷盘是通过不同的FlushCommitLogService的子服务实现的。

//org.apache.rocketmq.store.CommitLog的构造方法
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();
} else {this.flushCommitLogService = new FlushRealTimeService();
}
​
this.commitLogService = new CommitRealTimeService();

同步刷盘采用的是GroupCommitService子线程。虽然是叫做同步刷盘,但是从源码中能看到,他实际上并不是来一条消息就刷一次盘。而是这个子线程每10毫秒执行一次doCommit方法,扫描文件的缓存。只要缓存当中有消息,就执行一次Flush操作。

而异步刷盘采用的是FlushRealTimeService子线程。这个子线程最终也是执行Flush操作,只不过他的执行时机会根据配置进行灵活调整。所以可以看到,这里异步刷盘和同步刷盘的最本质区别,实际上是进行Flush操作的频率不同。

我们经常说使用RocketMQ的同步刷盘,可以保证Broker断电时,消息不会丢失。但是可以看到,RocketMQ并不可能真正来一条消息就进行一次刷盘,这样在海量数据下,操作系统是承受不了的。而只要不是来一次消息刷一次盘,那么在Broker直接断电的情况接下,就总是会有内存中的消息没有刷入磁盘的情况,这就会造成消息丢失。所以,对于消息安全性的设计,其实是重在取舍,无法做到绝对。

同步刷盘和异步刷盘最终落地到FileChannel的force方法。这个force方法就会最终调用一次操作系统的fsync系统调用,完成文件写入。

//org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value = getReadPosition();
​try {//We only append data to fileChannel or mappedByteBuffer, never both.if (writeBuffer != null || this.fileChannel.position() != 0) {this.fileChannel.force(false);} else {this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}
​this.flushedPosition.set(value);this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}return this.getFlushedPosition();
}

另外一个CommitRealTimeService这个子线程则是用来写入堆外内存的。应用可以通过配置TransientStorePoolEnable参数开启堆外内存,如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。而CommitRealTimeService处理消息的方式则只是调用mappedFileQueue的commit方法。这个方法只是往操作系统的PagedCache里写入消息,并不主动进行刷盘操作。会由操作系统通过Dirty Page机制,在某一个时刻进行统一刷盘。例如我们在正常关闭操作系统时,经常会等待很长时间。这里面大部分的时间其实就是在做PageCache的刷盘。

//org.apache.rocketmq.store.MappedFileQueue
public boolean commit(final int commitLeastPages) {boolean result = true;MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {int offset = mappedFile.commit(commitLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.committedWhere;this.committedWhere = where;}
​return result;
}

在梳理同步刷盘与异步刷盘的具体实现时,可以看到一个小点,RocketMQ是如何让两个刷盘服务间隔执行的?RocketMQ提供了一个自己实现的CountDownLatch2工具类来提供线程阻塞功能,使用CAS驱动CountDownLatch2的countDown操作。每来一个消息就启动一次CAS,成功后,调用一次countDown。而这个CountDonwLatch2在Java.util.concurrent.CountDownLatch的基础上,实现了reset功能,这样可以进行对象重用

CommigLog主从复制

入口:CommitLog.submitReplicaRequest

主从同步时,也体现到了RocketMQ对于性能的极致追求。最为明显的,RocketMQ整体是基于Netty实现的网络请求,而在主从复制这一块,却放弃了Netty框架,转而使用更轻量级的Java的NIO来构建。

在主要的HAService中,会在启动过程中启动三个守护进程。

//HAService#start
public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();
}

这其中与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。

至于其中关于主从的同步复制与异步复制的实现流程,还是比较复杂的,有兴趣的同学可以深入去研究一下。

推荐一篇可供参考的博客 RocketMQ源码分析之主从数据复制-CSDN博客

分发ConsumeQueue和IndexFile

当CommitLog写入一条消息后,在DefaultMessageStore的start方法中,会启动一个后台线程reputMessageService。源码就定义在DefaultMessageStore中。这个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息。如果发现CommitLog中有新的消息写入,就会触发一次doDispatch。

//org.apache.rocketmq.store.DefaultMessageStore中的ReputMessageService线程类
public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}

dispatchList中包含两个关键的实现类CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex。源码就定义在DefaultMessageStore中。他们分别用来构建ConsumeQueue索引和IndexFile索引。

并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。

过期文件删除机制

入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()

在这个方法中会启动两个线程,cleanCommitLogService用来删除过期的CommitLog文件,cleanConsumeQueueService用来删除过期的ConsumeQueue和IndexFile文件。

在删除CommitLog文件时,Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。

触发过期文件删除时,有两个检查的纬度,一个是,是否到了触发删除的时间,也就是broker.conf里配置的deleteWhen属性。另外还会检查磁盘利用率,达到阈值也会触发过期文件删除。这个阈值默认是72%,可以在broker.conf文件当中定制。但是最大值为95,最小值为10。

然后在删除ConsumeQueue和IndexFile文件时,会去检查CommitLog当前的最小Offset,然后在删除时进行对齐。

需要注意的是,RocketMQ在删除过期CommitLog文件时,并不检查消息是否被消费过。 所以如果有消息长期没有被消费,是有可能直接被删除掉,造成消息丢失的。

RocketMQ整个文件管理的核心入口在DefaultMessageStore的start方法中,整体流程总结如下:

文件索引结构

了解了大部分的文件写入机制之后,最后我们来理解一下RocketMQ的索引构建方式。

1、CommitLog文件的大小是固定的,但是其中存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog中计算消息长度的方法

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}

因为消息的记录大小不固定,所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件。文件名为当前消息的偏移量。

2、ConsumeQueue文件主要是加速消费者的消息索引。他的每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中。

文件结构: 每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成,数据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。

msgTag是和消息索引放在一起的,所以消费者根据Tag过滤消息的性能是非常高的。

在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小。

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {
​if (offset + size <= this.maxPhysicOffset) {log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);return true;}
​this.byteBufferIndex.flip();//在ConsumeQueue.java当中构建一条ConsumeQueue索引的方法中,记录一个单元块的数据this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);
​final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;... ...
}

3、IndexFile文件主要是辅助消息检索。他的作用主要是用来支持根据key和timestamp检索消息。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。

文件结构: 他的文件结构由 indexHeader(固定40byte)+ slot(固定500W个,每个固定20byte) + index(最多500W*4个,每个固定20byte) 三个部分组成。

indexFile的详细结构有大厂之前面试过,参考博文: RocketMQ之底层IndexFile存储协议_rocketmq index_roykingw的博客-CSDN博客

延迟消息机制

关注重点

延迟消息是RocketMQ非常有特色的一个功能,其他MQ产品中,往往需要开发者使用一些特殊方法来变相实现延迟消息功能。而RocketMQ直接在产品中实现了这个功能,开发者只需要设定一个属性就可以快速实现。

延迟消息的核心使用方法就是在Message中设定一个MessageDelayLevel参数,对应18个延迟级别。然后Broker中会创建一个默认的Schedule_Topic主题,这个主题下有18个队列,对应18个延迟级别。消息发过来之后,会先把消息存入Schedule_Topic主题中对应的队列。然后等延迟时间到了,再转发到目标队列,推送给消费者进行消费。

源码重点

延迟消息的处理入口在scheduleMessageService这个组件中。 会在broker启动时也一起加载。

1、消息写入到系统内置的Topic中

代码见CommitLog.putMessage方法。

在CommitLog写入消息时,会判断消息的延迟级别,然后修改Message的Topic和Queue,将消息转储到系统内部的Topic中,这样消息就对消费者不可见了。而原始的目标信息,会作为消息的属性,保存到消息当中。

//should be consistent with the old version
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topicif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
​
​String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
​// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 修改消息的Topic和Queue,转储到系统的Topic中msg.setTopic(topic);msg.setQueueId(queueId);}
}

2、消息转储到目标Topic

接下来就是需要过一点时间,再将消息转回到Producer提交的Topic和Queue中,这样就可以正常往消费者推送了。

这个转储的核心服务是scheduleMessageService,他也是Broker启动过程中的一个功能组件。随DefaultMessageStore组件一起构建。这个服务只在master节点上启动,而在slave节点上会主动关闭这个服务。

//org.apache.rocketmq.store.DefaultMessageStore
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {if (this.scheduleMessageService != null) {if (brokerRole == BrokerRole.SLAVE) {this.scheduleMessageService.shutdown();} else {this.scheduleMessageService.start();}}
}

由于RocketMQ的主从节点支持切换,所以就需要考虑这个服务的幂等性。在节点切换为slave时就要关闭服务,切换为master时就要启动服务。并且,即便节点多次切换为master,服务也只启动一次。所以在ScheduleMessageService的start方法中,就通过一个CAS操作来保证服务的启动状态。

if (started.compareAndSet(false, true)) {

这个CAS操作还保证了在后面,同一时间只有一个DeliverDelayedMessageTimerTask执行。这种方式,给整个延迟消息服务提供了一个基础保证。

ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。

在executeOnTimeup方法中,就会去扫描SCHEDULE_TOPIC_XXXX这个Topic下的所有messageQueue,然后扫描这些MessageQueue对应的ConsumeQueue文件,找到没有处理过的消息,计算他们的延迟时间。如果延迟时间没有到,就等下一秒再重新扫描。如果延迟时间到了,就进行消息转储。将消息转回到原来的目标Topic下。

整个延迟消息的实现方式:

ScheduleMessageService中扫描延迟消息的主要逻辑:

//ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
public void executeOnTimeup() {//找到延迟队列对应的ConsumeQueue文件ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));
​if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}//通过计算,找到这一次扫描需要处理的的ConsumeQueue文件SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);... ...long nextOffset = this.offset;try {int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//循环过滤ConsumeQueue文件当中的每一条消息索引for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//解析每一条ConsumeQueue记录long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();
​... ...//计算延迟时间long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//延迟时间没到就等下一次扫描long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}
​... ...//时间到了就进行转储boolean deliverSuc;if (ScheduleMessageService.this.enableAsyncDeliver) {deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);} else {deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);}
​if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//计算下一次扫描时的Offset起点nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {bufferCQ.release();}//部署下一次扫描任务this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

如果清楚了ConsumeQueue文件的结构,就可以很清晰的感受到RocketMQ其实就是在Broker端,像一个普通消费者一样去进行消费,然后扩展出了延迟消息的整个扩展功能。而这,其实也是很多互联网大厂对RocketMQ进行自定义功能扩展的很好的参考。

长轮询机制

功能回顾

RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式,Push模式可以认为是一种定时的Pull机制。但是这时有一个问题,当使用Push模式时,如果RocketMQ中没有对应的数据,那难道一直进行空轮询吗?如果是这样的话,那显然会极大的浪费网络带宽以及服务器的性能,并且,当有新的消息进来时,RocketMQ也没有办法尽快通知客户端,而只能等客户端下一次来拉取消息了。针对这个问题,RocketMQ实现了一种长轮询机制 long polling。

长轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不用直接给Consumer响应(给响应也是个空的,没意义),而是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加一个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。

源码重点

Consumer请求缓存,代码入口PullMessageProcessor#processRequest方法

PullRequestHoldService服务会随着BrokerController一起启动。

生产者线:从DefaultMessageStore.doReput进入

整个流程以及源码重点:

关于零拷贝与顺序写

刷盘机制保证消息不丢失

在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容。这些写入到PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以正常修改、复制等等。但是,本质上,PageCache依然是内存状态,所以一断电就会丢失。因此,需要将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失。这个过程就称为刷盘。

Java当中使用FileOutputStream类或者BufferedWriter类,进行write操作,就是写入的Pagecache。

RocketMQ中通过fileChannel.commit方法写入消息,也是写入到Pagecache。

PageCache是源源不断产生的,而Linux操作系统显然不可能时时刻刻往硬盘写文件。所以,操作系统只会在某些特定的时刻将PageCache写入到磁盘。例如当我们正常关机时,就会完成PageCache刷盘。另外,在Linux中,对于有数据修改的PageCache,会标记为Dirty(脏页)状态。当Dirty Page的比例达到一定的阈值时,就会触发一次刷盘操作。例如在Linux操作系统中,可以通过/proc/meminfo文件查看到Page Cache的状态。

[root@192-168-65-174 ~]# cat /proc/meminfo 
MemTotal:       16266172 kB
.....
Cached:           923724 kB
.....
Dirty:                32 kB
Writeback:             0 kB
.....
Mapped:           133032 kB
.....

但是,只要操作系统的刷盘操作不是时时刻刻执行的,那么对于用户态的应用程序来说,那就避免不了非正常宕机时的数据丢失问题。因此,操作系统也提供了一个系统调用,应用程序可以自行调用这个系统调用,完成PageCache的强制刷盘。在Linux中是fsync,同样我们可以用man 2 fsync 指令查看。

RocketMQ对于何时进行刷盘,也设计了两种刷盘机制,同步刷盘和异步刷盘。只需要在broker.conf中进行配置就行。

零拷贝加速文件读写

零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层,对应着mmap和sendFile两种方式。

理解CPU拷贝和DMA拷贝

操作系统对于内存空间,是分为用户态和内核态的。用户态的应用程序无法直接操作硬件,需要通过内核空间进行操作转换,才能真正操作硬件。这其实是为了保护操作系统的安全。正因为如此,应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要在用户态和内核态之间来回的复制数据。而这些操作,原本都是需要由CPU来进行任务的分配、调度等管理步骤的,早先这些IO接口都是由CPU独立负责,所以当发生大规模的数据读写操作时,CPU的占用率会非常高。

之后,操作系统为了避免CPU完全被各种IO调用给占用,引入了DMA(直接存储器存储)。由DMA来负责这些频繁的IO操作。DMA是一套独立的指令集,不会占用CPU的计算资源。这样,CPU就不需要参与具体的数据复制的工作,只需要管理DMA的权限即可。

DMA拷贝极大的释放了CPU的性能,因此他的拷贝速度会比CPU拷贝要快很多。但是,其实DMA拷贝本身,也在不断优化。

引入DMA拷贝之后,在读写请求的过程中,CPU不再需要参与具体的工作,DMA可以独立完成数据在系统内部的复制。但是,数据复制过程中,依然需要借助数据总进线。当系统内的IO操作过多时,还是会占用过多的数据总线,造成总线冲突,最终还是会影响数据读写性能。

为了避免DMA总线冲突对性能的影响,后来又引入了Channel通道的方式。Channel,是一个完全独立的处理器,专门负责IO操作。既然是处理器,Channel就有自己的IO指令,与CPU无关,他也更适合大型的IO操作,性能更高。

这也解释了,为什么Java应用层与零拷贝相关的操作都是通过Channel的子类实现的。这其实是借鉴了操作系统中的概念。

而所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝。

再来理解下mmap文件映射机制是怎么回事

mmap机制的具体实现参见配套示例代码。主要是通过java.nio.channels.FileChannel的map方法完成映射。

以一次文件的读写操作为例,应用程序对磁盘文件的读与写,都需要经过内核态与用户态之间的状态切换,每次状态切换的过程中,就需要有大量的数据复制。

在这个过程中,总共需要进行四次数据拷贝。而磁盘与内核态之间的数据拷贝,在操作系统层面已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态之间的拷贝依然是CPU拷贝。所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的这两次拷贝。

而mmap文件映射的方式,就是在用户态不再保存文件的内容,而只保存文件的映射,包括文件的内存起始地址,文件大小等。真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制。

这个拷贝过程都是在操作系统的系统调用层面完成的,在Java应用层,其实是无法直接观测到的,但是我们可以去JDK源码当中进行间接验证。在JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的一块堆内内存,在HeapByteBuffer中,会由一个byte数组来缓存数据内容,所有的读写操作也是先操作这个byte数组。这其实就是没有使用零拷贝的普通文件读写机制。

HeapByteBuffer(int cap, int lim) {            // package-privatesuper(-1, 0, lim, cap, new byte[cap], 0);/*hb = new byte[cap];offset = 0;*/
}

而NIO把包中的另一个实现类java.nio.DirectByteBuffer则映射的是一块堆外内存。在DirectByteBuffer中,并没有一个数据结构来保存数据内容,只保存了一个内存地址。所有对数据的读写操作,都通过unsafe魔法类直接交由内核完成,这其实就是mmap的读写机制。

最后,这种mmap的映射机制由于还是需要用户态保存文件的映射信息,数据复制的过程也需要用户态的参与,这其中的变数还是非常多的。所以,mmap机制适合操作小文件,如果文件太大,映射信息也会过大,容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G 。而RocketMQ的CommitLog文件保持在1G固定大小,也是为了方便文件映射。

梳理下sendFile机制是怎么运行的

sendFile机制的具体实现参见配套示例代码。主要是通过java.nio.channels.FileChannel的transferTo方法完成

sourceReadChannel.transferTo(0,sourceFile.length(),targetWriteChannel);

早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。但是,在后期的不断改进过程中,sendfile优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实的数据内容,会交由DMA控制器,从页缓存中打包异步发送到socket中。

最后,sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的。sendfile机制非常适合大数据的复制转移。

顺序写加速文件写入磁盘

通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。

Kafka官方详细分析过顺序写的性能提升问题。Kafka官方曾说明,顺序写的性能基本能够达到内存级别。而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存。而RocketMQ很大程度上借鉴了Kafka的这种思想。

例如可以看下org.apache.rocketmq.store.CommitLog#DefaultAppendMessageCallback中的doAppend方法。在这个方法中,会以追加的方式将消息先写入到一个堆外内存byteBuffer中,然后再通过fileChannel写入到磁盘。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/211304.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蒙特霍尔问题(选择三扇门后的车与羊)及其贝叶斯定理数学解释

1. 蒙特霍尔问题 有一个美国电视游戏节目叫做“Let’s Make a Deal”&#xff0c;游戏中参赛者将面对3扇关闭的门&#xff0c;其中一扇门背后有一辆汽车&#xff0c;另外两扇门后是山羊&#xff0c;参赛者如果能猜中哪一扇门后是汽车&#xff0c;就可以得到它。 通常&#xf…

javaee实验:文件上传及拦截器的使用

目录 文件上传ModelAttribute注解实验目的实验内容实验过程项目结构编写代码结果展示 文件上传 Spring MVC 提供 MultipartFile 接口作为参数来处理文件上传。 MultipartFile 提供以下方法来获取上传的文件信息&#xff1a;  getOriginalFilename 获取上传的文件名字&#x…

飞天使-linux操作的一些技巧与知识点2

TCP 的三次握手 第一次&#xff0c;客户端与服务端建立链接&#xff0c;需要发送请求连接的消息 第二次&#xff0c;服务端接口到数据后&#xff0c;返回一个确认的操作*&#xff08;至此客户端和服务端链路建立成功&#xff09; 第三次&#xff0c;服务端还需要发送要与客户端…

【Linux】探索Linux进程状态 | 僵尸进程 | 孤儿进程

最近&#xff0c;我发现了一个超级强大的人工智能学习网站。它以通俗易懂的方式呈现复杂的概念&#xff0c;而且内容风趣幽默。我觉得它对大家可能会有所帮助&#xff0c;所以我在此分享。点击这里跳转到网站。 目录 一、进程状态1.1运行状态1.2阻塞状态1.3挂起状态 二、具体L…

React中使用react-json-view展示JSON数据

文章目录 一、前言1.1、在线demo1.2、Github仓库 二、实践2.1、安装react-json-view2.2、组件封装2.3、效果2.4、参数详解2.4.1、src(必须) &#xff1a;JSON Object2.4.2、name&#xff1a;string或false2.4.3、theme&#xff1a;string2.4.4、style&#xff1a;object2.4.5、…

[ROS2] --- service

1 service介绍 1.1 service概念 话题通信是基于订阅/发布机制的&#xff0c;无论有没有订阅者&#xff0c;发布者都会周期发布数据&#xff0c;这种模式适合持续数据的收发&#xff0c;比如传感器数据。机器人系统中还有另外一些配置性质的数据&#xff0c;并不需要周期处理&…

C#,图算法——以邻接节点表示的图最短路径的迪杰斯特拉(Dijkstra)算法C#程序

1 文本格式 using System; using System.Text; using System.Linq; using System.Collections; using System.Collections.Generic; namespace Legalsoft.Truffer.Algorithm { public class Node // : IComparable<Node> { private int vertex, weigh…

【分布式微服务专题】从单体到分布式(一、SpringCloud项目初步升级)

目录 前言阅读对象阅读导航前置知识笔记正文一、单体服务介绍二、服务拆分三、分布式微服务升级前的思考3.1 关于SpringBoot/SpringCloud的思考【有点门槛】 四、SpringCloud升级整合4.1 新建父子项目 学习总结感谢 前言 从本节课开始&#xff0c;我将自己手写一个基于SpringC…

如何轻松恢复 Windows 中删除的文件夹

我们都曾经历过这样的事&#xff0c;而且我们中的大多数人可能很快就会再次这样做。我们讨论的是在 Windows 中按“Delete”或“ShiftDelete”键意外删除重要文件夹的情况。 如果您刚刚按下删除键且未超过 30 天&#xff0c;或者尚未清空回收站&#xff0c;则可以恢复文件夹。…

操作系统学习笔记---内存管理

目录 概念 功能 内存空间的分配和回收 地址转换 逻辑地址&#xff08;相对地址&#xff09; 物理地址&#xff08;绝对地址&#xff09; 内存空间的扩充 内存共享 存储保护 方式 源程序变为可执行程序步骤 链接方式 装入方式 覆盖 交换 连续分配管理方式 单一连…

python安装与工具PyCharm

摘要&#xff1a; 周末闲来无事学习一下python&#xff01;不是你菜鸡&#xff0c;只不过是对手太强了&#xff01;所以你要不断努力&#xff0c;去追求更高的未来&#xff01;下面先了解python与环境的安装与工具的配置&#xff01; python安装&#xff1a; 官网 进入官网下载…

git 关于分支、merge、commit提交

最近开始用git终端提交代码&#xff0c;梳理了一些知识点 一 关于分支 关于分支&#xff0c;git的分支分为本地分支远程分支两种分支&#xff0c;在上传代码时&#xff0c;我们要确保当前本地分支连接了一个远程分支。 我们可以通过下面代码查看当前的本地分支&#xff1a; g…

迅为3588开发板 sudo: 无法解析主机:/DNS配置

环境申明 RK3588 ubuntu 22.04 jammy 迅为开发板 hostname 看是否有Host .&#xff0c;如果没有&#xff0c; sudo vim /etc/hostname在里面加一行&#xff0c;我这就这一个 iTOP-RK3588hosts 修改本地hosts sudo vim /etc/hosts127.0.0.1 localhost localhost iTOP-RK3…

2.postman环境变量及接口关联

一、环境变量以及全局变量 操作流程 1.点击environment 2.点击environment右侧号&#xff0c;新增环境变量 3.在变量中输入变量名以及变量值 4.回到collection页面&#xff0c;修改变量环境 5.在collection中通过{{变量名}}调用变量 变量定义 环境变量&#xff1a;环境变量…

vue 限制在指定容器内可拖拽的div

<template><div class"container" id"container"><div class"drag-box center" v-drag v-if"isShowDrag"><div>无法拖拽出容器的div浮窗</div></div></div> </template><script&g…

P11 Linux进程编程exec族函数

前言 &#x1f3ac; 个人主页&#xff1a;ChenPi &#x1f43b;推荐专栏1: 《Linux C应用编程&#xff08;概念类&#xff09;_ChenPi的博客-CSDN博客》✨✨✨ &#x1f525; 推荐专栏2: 《C_ChenPi的博客-CSDN博客》✨✨✨ &#x1f6f8;推荐专栏3: ​​​​​​《链表_C…

Java 简易版 UDP 多人聊天室

服务端 import java.io.*; import java.net.*; import java.util.ArrayList; public class Server{public static ServerSocket server_socket;public static ArrayList<Socket> socketListnew ArrayList<Socket>(); public static void main(String []args){try{…

算法通关村第五关—LRU的设计与实现(黄金)

LRU的设计与实现 一、理解LRU的原理 LeetCode146:运用你所掌握的数据结构&#xff0c;设计和实现一个LRU(最近最少使用)缓存机制 实现LRUCache类&#xff1a; LRUCache(int capacity) 以正整数作为容量capacity初始化 LRU 缓存 int get(int key) 如果关键字key存在于缓存中&a…

数据可视化|jupyter notebook运行pyecharts,无法正常显示“可视化图形”,怎么解决?

前言 本文是该专栏的第39篇,后面会持续分享python数据分析的干货知识,记得关注。 相信有些同学在本地使用jupyter notebook运行pyecharts的时候,在代码没有任何异常的情况下,无论是html还是notebook区域,都无法显示“可视化图形”,界面区域只有空白一片。遇到这种情况,…

Nginx服务优化以及防盗链

1. 隐藏版本号 以在 CentOS 中使用命令 curl -I http://192.168.66.10 显示响应报文首部信息。 查看版本号 curl -I http://192.168.66.10 1. 修改配置文件 vim /usr/local/nginx/conf/nginx.conf http {include mime.types;default_type application/octet-stream;…