# 消息中间件 RocketMQ 高级功能和源码分析(七)

消息中间件 RocketMQ 高级功能和源码分析(七)

一、 消息中间件 RocketMQ 源码分析:消息存储核心类介绍

1、消息存储在 store 模块中。消息存储核心类 DefaultMessageStore.java

在这里插入图片描述

2、消息存储核心类介绍


private final MessageStoreConfig messageStoreConfig;	//消息配置属性
private final CommitLog commitLog;		//CommitLog文件存储的实现类
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;	//消息队列存储缓存表,按照消息主题分组
private final FlushConsumeQueueService flushConsumeQueueService;	//消息队列文件刷盘线程
private final CleanCommitLogService cleanCommitLogService;	//清除CommitLog文件服务
private final CleanConsumeQueueService cleanConsumeQueueService;	//清除ConsumerQueue队列文件服务
private final IndexService indexService;	//索引实现类
private final AllocateMappedFileService allocateMappedFileService;	//MappedFile分配服务
private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumerQueue、IndexFile文件
private final HAService haService;	//存储HA机制
private final ScheduleMessageService scheduleMessageService;	//消息服务调度线程
private final StoreStatsService storeStatsService;	//消息存储服务
private final TransientStorePool transientStorePool;	//消息堆外内存缓存
private final BrokerStatsManager brokerStatsManager;	//Broker状态管理器
private final MessageArrivingListener messageArrivingListener;	//消息拉取长轮询模式消息达到监听器
private final BrokerConfig brokerConfig;	//Broker配置类
private StoreCheckpoint storeCheckpoint;	//文件刷盘监测点
private final LinkedList<CommitLogDispatcher> dispatcherList;	//CommitLog文件转发请求

二、 消息中间件 RocketMQ 源码分析:消息存储流程

1、消息存储流程 示例图:

在这里插入图片描述

2、 消息存储入口:DefaultMessageStore#putMessage

//判断Broker角色如果是从节点,则无需写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("message store is slave mode, so putMessage is forbidden ");}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}//判断当前写入状态如果是正在写入,则不能继续
if (!this.runningFlags.isWriteable()) {long value = this.printTimes.getAndIncrement();return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {this.printTimes.set(0);
}
//判断消息主题长度是否超过最大限制
if (msg.getTopic().length() > Byte.MAX_VALUE) {log.warn("putMessage message topic length too long " + msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
//判断消息属性长度是否超过限制
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
//判断系统PageCache缓存去是否占用
if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}//将消息写入CommitLog文件
PutMessageResult result = this.commitLog.putMessage(msg);

3、 代码:CommitLog#putMessage

//记录消息存储时间
msg.setStoreTimestamp(beginLockTimestamp);//判断如果mappedFile如果为空或者已满,创建新的mappedFile文件
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}
//如果创建失败,直接返回
if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}//写入消息到mappedFile中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);

4、 代码:MappedFile#appendMessagesInner

//获得文件的写入指针
int currentPos = this.wrotePosition.get();//如果指针大于文件大小则直接返回
if (currentPos < this.fileSize) {//通过writeBuffer.slice()创建一个与MappedFile共享的内存区,并设置position为当前指针ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result = null;if (messageExt instanceof MessageExtBrokerInner) {//通过回调方法写入result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;
}

5、 代码:CommitLog#doAppend

//文件写入位置
long wroteOffset = fileFromOffset + byteBuffer.position();
//设置消息ID
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);//获得该消息在消息队列中的偏移量
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {queueOffset = 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);
}//获得消息属性长度
final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;if (propertiesLength > Short.MAX_VALUE) {log.warn("putMessage message properties length too long. length={}", propertiesData.length);return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}//获得消息主题大小
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;//获得消息体大小
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
//计算消息总长度
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

6、 代码:CommitLog#calMsgLength

protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE  + 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ 8 //BORNHOST+ 8 //STORETIMESTAMP+ 8 //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}

7、 代码:CommitLog#doAppend

//消息长度不能超过4M
if (msgLen > this.maxMessageSize) {CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength+ ", maxMessageSize: " + this.maxMessageSize);return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}//消息是如果没有足够的存储空间则新创建CommitLog文件
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value// Here the length of the specially set maxBlankfinal long beginTimeMills = CommitLog.this.defaultMessageStore.now();byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}//将消息存储到ByteBuffer中,返回AppendMessageResult
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() -beginTimeMills);
switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE://更新消息队列偏移量CommitLog.this.topicQueueTable.put(key, ++queueOffset);break;default:break;
}

8、 代码:CommitLog#putMessage

//释放锁
putMessageLock.unlock();
//刷盘
handleDiskFlush(result, putMessageResult, msg);
//执行HA主从同步
handleHA(result, putMessageResult, msg);

三、 消息中间件 RocketMQ 源码分析:消息存储文件介绍

1、消息存储文件结构图:

在这里插入图片描述

2、消息存储文件介绍:

  • commitLog:消息存储目录
  • config:运行期间一些配置信息
  • consumerqueue:消息消费队列存储目录
  • index:消息索引文件存储目录
  • abort:如果存在改文件寿命 Broker 非正常关闭
  • checkpoint:文件检查点,存储 CommitLog 文件最后一次刷盘时间戳、consumerquueue 最后一次刷盘时间,index 索引文件最后一次刷盘时间戳。

四、 消息中间件 RocketMQ 源码分析:存储文件内存映射-MappedFileQueue

1、消息 存储文件内存映射

RocketMQ 通过使用内存映射文件提高 IO 访问性能,无论是 CommitLog、ConsumerQueue 还是 IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

2、消息 存储文件内存映射-MappedFileQueue

在这里插入图片描述

String storePath;	//存储目录
int mappedFileSize;	// 单个文件大小
CopyOnWriteArrayList<MappedFile> mappedFiles;	//MappedFile文件集合
AllocateMappedFileService allocateMappedFileService;	//创建MapFile服务类
long flushedWhere = 0;		//当前刷盘指针
long committedWhere = 0;	//当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于等于flushWhere

3、根据存储时间查询 MappedFile


public MappedFile getMappedFileByTime(final long timestamp) {Object[] mfs = this.copyMappedFiles(0);if (null == mfs)return null;//遍历MappedFile文件数组for (int i = 0; i < mfs.length; i++) {MappedFile mappedFile = (MappedFile) mfs[i];//MappedFile文件的最后修改时间大于指定时间戳则返回该文件if (mappedFile.getLastModifiedTimestamp() >= timestamp) {return mappedFile;}}return (MappedFile) mfs[mfs.length - 1];
}

4、根据消息偏移量 offset 查找 MappedFile


public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//获得第一个MappedFile文件MappedFile firstMappedFile = this.getFirstMappedFile();//获得最后一个MappedFile文件MappedFile lastMappedFile = this.getLastMappedFile();//第一个文件和最后一个文件均不为空,则进行处理if (firstMappedFile != null && lastMappedFile != null) {if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {} else {//获得文件索引int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//根据索引返回目标文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;
}

5、获取存储文件最小偏移量


public long getMinOffset() {if (!this.mappedFiles.isEmpty()) {try {return this.mappedFiles.get(0).getFileFromOffset();} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getMinOffset has exception.", e);}}return -1;
}

6、获取存储文件最大偏移量


public long getMaxOffset() {MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}return 0;
}

7、返回存储文件当前写指针


public long getMaxWrotePosition() {MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();}return 0;
}

五、 消息中间件 RocketMQ 源码分析:存储文件内存映射-MappedFile

1、消息 存储文件内存映射-MappedFile

MappedFile 示例图:

在这里插入图片描述

2、消息 存储文件内存映射-MappedFile 文件介绍:


int OS_PAGE_SIZE = 1024 * 4;		//操作系统每页大小,默认4K
AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);	//当前JVM实例中MappedFile虚拟内存
AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);	//当前JVM实例中MappedFile对象个数
AtomicInteger wrotePosition = new AtomicInteger(0);	//当前文件的写指针
AtomicInteger committedPosition = new AtomicInteger(0);	//当前文件的提交指针
AtomicInteger flushedPosition = new AtomicInteger(0);	//刷写到磁盘指针
int fileSize;	//文件大小
FileChannel fileChannel;	//文件通道	
ByteBuffer writeBuffer = null;	//堆外内存ByteBuffer
TransientStorePool transientStorePool = null;	//堆外内存池
String fileName;	//文件名称
long fileFromOffset;	//该文件的处理偏移量
File file;	//物理文件
MappedByteBuffer mappedByteBuffer;	//物理文件对应的内存映射Buffer
volatile long storeTimestamp = 0;	//文件最后一次内容写入时间
boolean firstCreateInQueue = false;	//是否是MappedFileQueue队列中第一个文件

3、 MappedFile 初始化

  • 未开启transientStorePoolEnabletransientStorePoolEnable=truetrue表示数据先存储到堆外内存,然后通过Commit线程将数据提交到内存映射Buffer中,再通过Flush线程将内存映射Buffer中数据持久化磁盘。
private void init(final String fileName, final int fileSize) throws IOException {this.fileName = fileName;this.fileSize = fileSize;this.file = new File(fileName);this.fileFromOffset = Long.parseLong(this.file.getName());boolean ok = false;ensureDirOK(this.file.getParent());try {this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);TOTAL_MAPPED_FILES.incrementAndGet();ok = true;} catch (FileNotFoundException e) {log.error("create file channel " + this.fileName + " Failed. ", e);throw e;} catch (IOException e) {log.error("map file " + this.fileName + " Failed. ", e);throw e;} finally {if (!ok && this.fileChannel != null) {this.fileChannel.close();}}
}

4、 开启transientStorePoolEnable


public void init(final String fileName, final int fileSize,final TransientStorePool transientStorePool) throws IOException {init(fileName, fileSize);this.writeBuffer = transientStorePool.borrowBuffer();	//初始化writeBufferthis.transientStorePool = transientStorePool;
}

5、 MappedFile 提交

提交数据到 FileChannel,commitLeastPages 为本次提交最小的页数,如果待提交数据不满 commitLeastPages,则不执行本次提交操作。如果 writeBuffer 如果为空,直接返回 writePosition 指针,无需执行 commit 操作,表名 commit 操作主体是 writeBuffer。


public int commit(final int commitLeastPages) {if (writeBuffer == null) {//no need to commit data to file channel, so just regard wrotePosition as committedPosition.return this.wrotePosition.get();}//判断是否满足提交条件if (this.isAbleToCommit(commitLeastPages)) {if (this.hold()) {commit0(commitLeastPages);this.release();} else {log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());}}// 所有数据提交后,清空缓冲区if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {this.transientStorePool.returnBuffer(writeBuffer);this.writeBuffer = null;}return this.committedPosition.get();
}

6、 MappedFile#isAbleToCommit

判断是否执行 commit 操作,如果文件已满返回 true;如果 commitLeastpages 大于 0,则比较 writePosition 与上一次提交的指针 commitPosition 的差值,除以 OS_PAGE_SIZE 得到当前脏页的数量,如果大于 commitLeastPages 则返回 true,如果 commitLeastpages 小于0表示只要存在脏页就提交。


protected boolean isAbleToCommit(final int commitLeastPages) {//已经刷盘指针int flush = this.committedPosition.get();//文件写指针int write = this.wrotePosition.get();//写满刷盘if (this.isFull()) {return true;}if (commitLeastPages > 0) {//文件内容达到commitLeastPages页数,则刷盘return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;}return write > flush;
}

7、 MappedFile#commit0

具体提交的实现,首先创建 WriteBuffer 区共享缓存区,然后将新创建的 position 回退到上一次提交的位置(commitPosition),设置 limit 为 wrotePosition(当前最大有效数据指针),然后把 commitPosition 到 wrotePosition 的数据写入到 FileChannel 中,然后更新 committedPosition 指针为 wrotePosition。commit 的作用就是将 MappedFile 的 writeBuffer 中数据提交到文件通道 FileChannel 中。


protected void commit0(final int commitLeastPages) {//写指针int writePos = this.wrotePosition.get();//上次提交指针int lastCommittedPosition = this.committedPosition.get();if (writePos - this.committedPosition.get() > 0) {try {//复制共享内存区域ByteBuffer byteBuffer = writeBuffer.slice();//设置提交位置是上次提交位置byteBuffer.position(lastCommittedPosition);//最大提交数量byteBuffer.limit(writePos);//设置fileChannel位置为上次提交位置this.fileChannel.position(lastCommittedPosition);//将lastCommittedPosition到writePos的数据复制到FileChannel中this.fileChannel.write(byteBuffer);//重置提交位置this.committedPosition.set(writePos);} catch (Throwable e) {log.error("Error occurred when commit data to FileChannel.", e);}}
}

8、 MappedFile#flush

刷写磁盘,直接调用 MappedByteBuffer 或 fileChannel 的 force 方法将内存中的数据持久化到磁盘,那么 flushedPosition 应该等于 MappedByteBuffer 中的写指针;如果 writeBuffer不为空,则 flushPosition 应该等于上一次的 commit指针;因为上一次提交的数据就是进入到 MappedByteBuffer 中的数据;如果 writeBuffer 为空,数据时直接进入到 MappedByteBuffer,wrotePosition 代表的是 MappedByteBuffer 中的指针,故设置 flushPosition 为 wrotePosition。

在这里插入图片描述

public int flush(final int flushLeastPages) {//数据达到刷盘条件if (this.isAbleToFlush(flushLeastPages)) {//加锁,同步刷盘if (this.hold()) {//获得读指针int value = getReadPosition();try {//数据从writeBuffer提交数据到fileChannel再刷新到磁盘if (writeBuffer != null || this.fileChannel.position() != 0) {this.fileChannel.force(false);} else {//从mmap刷新数据到磁盘this.mappedByteBuffer.force();}} catch (Throwable e) {log.error("Error occurred when force data to disk.", e);}//更新刷盘位置this.flushedPosition.set(value);this.release();} else {log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}return this.getFlushedPosition();
}

9、 MappedFile#getReadPosition

获取当前文件最大可读指针。如果 writeBuffer 为空,则直接返回当前的写指针;如果 writeBuffer 不为空,则返回上一次提交的指针。在 MappedFile 设置中,只有提交了的数据(写入到 MappedByteBuffer 或 FileChannel 中的数据)才是安全的数据


public int getReadPosition() {//如果writeBuffer为空,刷盘的位置就是应该等于上次commit的位置,如果为空则为mmap的写指针return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

10、 MappedFile#selectMappedBuffer

查找 pos 到当前最大可读之间的数据,由于在整个写入期间都未曾改 MappedByteBuffer 的指针,如果 mappedByteBuffer.slice()方法返回的共享缓存区空间为整个 MappedFile,然后通过设置 ByteBuffer 的position 为待查找的值,读取字节长度当前可读最大长度,最终返回的 ByteBuffer的limit 为 size。整个共享缓存区的容量为(MappedFile#fileSize-pos)。故在操作 SelectMappedBufferResult 不能对包含在里面的 ByteBuffer 调用 filp 方法。


public SelectMappedBufferResult selectMappedBuffer(int pos) {//获得最大可读指针int readPosition = getReadPosition();//pos小于最大可读指针,并且大于0if (pos < readPosition && pos >= 0) {if (this.hold()) {//复制mappedByteBuffer读共享区ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//设置读指针位置byteBuffer.position(pos);//获得可读范围int size = readPosition - pos;//设置最大刻度范围ByteBuffer byteBufferNew = byteBuffer.slice();byteBufferNew.limit(size);return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);}}return null;
}

11、 MappedFile#shutdown

MappedFile 文件销毁的实现方法为 public boolean destory(long intervalForcibly),intervalForcibly 表示拒绝被销毁的最大存活时间。


public void shutdown(final long intervalForcibly) {if (this.available) {//关闭MapedFilethis.available = false;//设置当前关闭时间戳this.firstShutdownTimestamp = System.currentTimeMillis();//释放资源this.release();} else if (this.getRefCount() > 0) {if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {this.refCount.set(-1000 - this.getRefCount());this.release();}}
}

六、 消息中间件 RocketMQ 源码分析:存储文件内存映射-TransientStorePool

1、消息 存储文件内存映射-TransientStorePool 介绍:

短暂的存储池。RocketMQ 单独创建一个 MappedByteBuffer 内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由 commit 线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ 引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。

2、 TransientStorePool 示例图:

在这里插入图片描述

3、 TransientStorePool 代码:


private final int poolSize;		//availableBuffers个数
private final int fileSize;		//每隔ByteBuffer大小
private final Deque<ByteBuffer> availableBuffers;	//ByteBuffer容器。双端队列

4、 TransientStorePool 初始化


public void init() {//创建poolSize个堆外内存for (int i = 0; i < poolSize; i++) {ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);final long address = ((DirectBuffer) byteBuffer).address();Pointer pointer = new Pointer(address);//使用com.sun.jna.Library类库将该批内存锁定,避免被置换到交换区,提高存储性能LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));availableBuffers.offer(byteBuffer);}
}

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(六)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/30872.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库 | 期末复习专题(HBUT 韩洪木)

总结&#xff1a; 考研数据库系统概论题目整理_若视图的属性来自聚集函数、表达式,则该视图是可以更新的。-CSDN博客 数据库系统概论 ---知识点大全&#xff08;期末复习版&#xff09;_数据库系统概论期末复习-CSDN博客 03数据库关系代数习题_关系代数例题-CSDN博客 【数据库…

select的奇葩操作总结

摘要&#xff1a; 世界奇葩事千千万&#xff0c;select操作占一半&#xff01;最近原生开发中遇到一些问题&#xff0c;特别是select&#xff01; select原生实现二级树&#xff1a; php的twig下实现占位符号错位实现 <select name"category_id" id"input-c…

珈和科技和比昂科技达成战略合作,共创智慧农业领域新篇章

6月14日&#xff0c;四川省水稻、茶叶病虫害监测预警与绿色防控培训班在成都蒲江举办。本次培训班由四川省农业农村厅植物保护站主办&#xff0c;蒲江县农业农村局、成都比昂科技筹办。四川省农业农村厅植物保护站及四川省14个市州36个县植保站负责人进行了观摩学习。 武汉珈…

R3CTF NinjaClub复现

R3CTF NinjaClub jinjia2沙箱 题目源码 from jinja2.sandbox import SandboxedEnvironment, is_internal_attribute from jinja2.exceptions import UndefinedError from fastapi import FastAPI, Form from fastapi.responses import HTMLResponse from pydantic import Bas…

商超便利店收银系统源码推荐

细节决定成败&#xff0c;无论是做什么事情都要注重细节&#xff0c;让我们来看看关于商超便利店陈列的“细节”有哪些需要注意的地方。 首先要注意商品不要摆太高&#xff0c;放在适当位置即可&#xff01; 商超便利店内&#xff0c;销量最佳的物品摆放位置依次为与顾客视线…

卡巴斯基安全卡片

卡巴斯基委托我们制作展示各种安全场景的插图卡片&#xff0c;这些卡片用于在欧洲委员会支持下开发的互动在线培训课程。我们的设计师为这个项目创造了一种独特的风格&#xff0c;既美观又实用。卡片展示了可能出现的潜在危险情况&#xff0c;例如在购物中心、公交车站或办公室…

2004年上半年软件设计师【下午题】试题及答案

文章目录 2004年上半年软件设计师下午题--试题2004年上半年软件设计师下午题--答案2004年上半年软件设计师下午题–试题

若依 Excel导入 字段值转换 字典自动匹配转换等

Excel表格数据截图&#xff1a; 数据库结构&#xff1a;需要将数据转换为数值或char类型存储 转换结果&#xff1a; 未使用Excel注解参数的效果&#xff1a; 断点数据 使用Excel注解参数的效果&#xff1a; 断点数据 最终入库&#xff1a; 参考&#xff1a; http://doc.ru…

华为Pocket 2,夏日达人的时尚新宠!

夏天炎炎&#xff0c;适合撒欢~但时尚与便利从不缺席&#xff01;我的时尚新宠华为Pocket 2跟我一起。 高颜值的外观一定是出行拍照和搭配单品的选项&#xff0c;这款小巧精致的手机&#xff0c;外屏设计超级时尚,轻松搭配出夏日潮流风。它的外屏还支持个性化设置&#xff0c;…

0613,基本数据类型,表达式

目录 第三章&#xff08;基本数据类型&#xff09;思维导图 题目1&#xff0c;选做&#xff1a;0xCAFE的各种位运算 答案代码/补&#xff1a; 参考答案&#xff1a; 题目二&#xff0c;必做&#xff1a;判断闰年&#xff0c;下一天&#xff0c;两天时差&#xff0c;星期几…

红队实战宝典之内网渗透测试

本文源自《红队实战宝典之内网渗透测试》一书前言。 近年来&#xff0c;随着计算机网络技术的发展和应用范围的扩大&#xff0c;不同结构、不同规模的局域网和广域网迅速遍及全球。 以互联网为代表的计算机网络技术在短短几十年内经历了从0到1、从简单到复杂的飞速发展&#…

STM32单片机-PWR电源控制和WDG看门狗

STM32单片机-PWR电源控制和WDG看门狗 一、PWR简介二、低功耗模式三、修改主频&睡眠模式&停机模式&待机模式3.1 修改主频3.2 睡眠模式3.3 停机模式3.4 待机模式 四、WDG简介4.1 独立看门狗原理4.2 窗口看门狗原理4.3 IWDG和WWDG对比 五、独立看门狗&窗口看门狗5…

DrissionPage框架应用

DrissionPage框架应用 Scrapy框架可以自定义请求&#xff0c;我们经常使用的selenium,pypuppteer&#xff0c;playwight等模拟浏览器的环境执行网络的请求&#xff1b;但是以上都有被检测的风险&#xff0c;新晋浏览器防检测工具&#xff0c;不仅不需要繁琐的安装浏览器的内核&…

『大模型笔记』斯坦福大学教授李飞飞在2024年数据与人工智能峰会上的人工智能历史与未来

MAC 文章目录 一. 斯坦福大学教授李飞飞在2024年数据与人工智能峰会上的人工智能历史与未来引言过去与现在现代 AI 的进步未来的发展空间智能近期进展与未来展望文字输入制作视频机器人学习AI 与人类互动医疗健康应用结语二. 参考文献一. 斯坦福大学教授李飞飞在2024年数据与人…

全球AI视频技术竞赛加速:Runway即将推出更优更快的第三代AI视频模型|TodayAI

Runway即将在未来几天推出其更优更快的第三代AI视频模型&#xff0c;这是新一代模型中最小的一个。据公司透露&#xff0c;这款名为Gen-3的模型将带来“在真实度、一致性和动态效果上的重大提升”&#xff0c;同时在速度上也有显著的加快。 去年六月&#xff0c;Runway首次推出…

Mathtype与word字号对照+Mathtype与word字号对照

字体大小对照表如下 初号44pt 小初36pt 一号26pt 小一24pt 二号22pt 小二18pt 三号16pt 小三15pt 四号14pt 小四12pt 五号10.5pt 小五9pt 六号7.5pt 小六6.5pt 七号5.5pt 八号5pt 1 保存12pt文件 首选选择第一个公式&#xff0c;将其大小改为12pt 然后依次选择 “预置”—…

ClipArt ETC - 典雅的剪贴画网站

文章目录 ClipArt ETCClippix佛罗里达教学技术中心课堂数字内容 ClipArt ETC 网站地址&#xff1a; https://etc.usf.edu/clipart/ ClipArt ETC为学生和教师提供了超过71,500件高质量的教育剪贴画。 每个插图都有图像大小的选择以及学校项目中正确引用的完整源信息。 所有图像…

【Java】已解决java.sql.SQLRecoverableException异常

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决java.sql.SQLRecoverableException异常 在Java的数据库编程中&#xff0c;java.sql.SQLRecoverableException是一个重要的异常&#xff0c;它通常表示一个可以恢复的SQL异常。…

【扫雷游戏】C语言教程

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习…