[RocketMQ] Broker CommitLogDispatcher 异步构建ConsumeQueue和IndexFile源码解析 (十四)

  1. CommitLogDispatcherBuildConsumeQueue: 异步构建ConsumerQueue。
  2. CommitLogDispatcherBuildIndex: 异步构建IndexFile。

    文章目录

        • 1.CommitLogDispatcherBuildConsumeQueue构建ConsumeQueue
          • 1.1 putMessagePositionInfo写入消息位置信息
          • 1.2 findConsumeQueue查找ConsumeQueue
            • 1.2.1 创建ConsumeQueue
          • 1.3 putMessagePositionInfoWrapper追加消息索引
            • 1.3.1 putMessagePositionInfo写入消息位置信息
            • 1.3.2 MappedFile#appendMessage追加消息
        • 2.CommitLogDispatcherBuildIndex构建IndexFile
          • 2.1 buildIndex构建Index索引
            • 2.1.1 retryGetAndCreateIndexFile获取IndexFile
            • 2.1.2 getAndCreateLastIndexFile获取最新索引文件
            • 2.1.3 创建IndexFile
            • 2.1.4 buildKey构建Key
            • 2.1.5 putKey构建Index索引
        • 3.总结

1.CommitLogDispatcherBuildConsumeQueue构建ConsumeQueue

CommitLogDispatcherBuildConsumeQueue用于接收分发请求并构建ConsumeQueue。

对于非事务消息或者是事务commit消息, 调用DefaultMessageStore#putMessagePositionInfo方法写入消息位置信息到consumeQueue, 如果是事务prepared消息和事务rollback消息, 则不出理。

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {/*** DefaultMessageStore的方法** @param request 分派消息请求*/@Overridepublic void dispatch(DispatchRequest request) {//从该消息的消息系统flag中获取事务状态final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {//如果不是事务消息或者是事务commit消息,则进行处理case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE://写入消息位置信息到consumeQueueDefaultMessageStore.this.putMessagePositionInfo(request);break;//如果是事务prepared消息或者是事务rollback消息,则不进行处理case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}
}

1.1 putMessagePositionInfo写入消息位置信息

  1. 首先调用findConsumeQueue方法根据topic和队列id确定需要写入的ConsumeQueue。
  2. 然后调用ConsumeQueue#putMessagePositionInfoWrapper方法将消息信息追加到ConsumeQueue索引文件中。
/*** DefaultMessageStore的方法* 写入消息位置信息** @param dispatchRequest 分派消息请求*/
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {/** 根据topic和队列id确定ConsumeQueue*/ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());/** 将消息信息追加到ConsumeQueue索引文件中*/cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}

1.2 findConsumeQueue查找ConsumeQueue

根据topic和队列id确定需要写入的ConsumeQueue, 查找的目标是consumeQueueTable缓存集合。ConsumerQueue文件是延迟加载的。需要到该ConsumeQueue的时候才会新建。

/*** DefaultMessageStore* <p>* 根据topic和队列id查找ConsumeQueue*/
public ConsumeQueue findConsumeQueue(String topic, int queueId) {//从consumeQueueTable中获取该topic所有的队列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);//如果没有保存该topic的喜喜,那么存入一个空的mapif (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 从map中根据queueId 获取对应的 消费队列ConsumeQueue logic = map.get(queueId);//如果ConsumeQueue为null,那么新建,所以说ConsumeQueue是延迟创建的if (null == logic) {//新建ConsumeQueueConsumeQueue newLogic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//单个文件大小,默认为可存储30W数据的大小,每条数据20Bytethis.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),this);//存入map中,如果已存在则取旧的ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {// light message queue(LMQ)if (MixAll.isLmq(topic)) {lmqConsumeQueueNum.getAndIncrement();}logic = newLogic;}}return logic;
}
1.2.1 创建ConsumeQueue

创建ConsumerQueue, 初始化各种属性, 会初始化20个字节的堆外内存, 用于临时存储单个索引, 可以重复使用。

public ConsumeQueue(final String topic,final int queueId,final String storePath,final int mappedFileSize,final DefaultMessageStore defaultMessageStore) {//各种属性this.storePath = storePath;//单个文件大小,默认为可存储30W数据的大小,每条数据20Bytethis.mappedFileSize = mappedFileSize;this.defaultMessageStore = defaultMessageStore;this.topic = topic;this.queueId = queueId;//queue的路径 $HOME/store/consumequeue/{topic}/{queueId}/{fileName}String queueDir = this.storePath+ File.separator + topic+ File.separator + queueId;//创建mappedFileQueue,内部保存在该queueId下面的所有的consumeQueue文件集合mappedFiles相当于一个文件夹this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);//分配20个字节的堆外内存,用于临时存储单个索引,这段内存可循环使用this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);//是否启用消息队列的扩展存储,默认falseif (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {this.consumeQueueExt = new ConsumeQueueExt(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt());}
}

ConsumeQueue文件可以看成是基于topic的commitlog索引文件, ConsumeQueue文件的组织方式: topic/queue/file三层组织结构, $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

在这里插入图片描述

1.3 putMessagePositionInfoWrapper追加消息索引

构建消息索引信息并且存入找到的ConsumeQueue文件中。支持重试, 最多30次。

/*** ConsumeQueue的方法* <p>* 将消息信息追加到ConsumeQueue索引文件中*/
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {//最大重试次数30final int maxRetries = 30;//检查ConsumeQueue文件是否可写boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();//如果文件可写,并且重试次数小于30次,那么写入ConsumeQueue索引for (int i = 0; i < maxRetries && canWrite; i++) {//获取tagCodelong tagsCode = request.getTagsCode();//如果支持扩展信息写入,默认falseif (isExtWriteEnable()) {ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();cqExtUnit.setFilterBitMap(request.getBitMap());cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());cqExtUnit.setTagsCode(request.getTagsCode());long extAddr = this.consumeQueueExt.put(cqExtUnit);if (isExtAddr(extAddr)) {tagsCode = extAddr;} else {log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,topic, queueId, request.getCommitLogOffset());}}/** 写入消息位置信息到ConsumeQueue中*/boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());if (result) {if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {//修改StoreCheckpoint中的physicMsgTimestamp:最新commitlog文件的刷盘时间戳,单位毫秒this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());}this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());if (multiQueue) {multiDispatchLmqQueue(request, maxRetries);}return;} else {// XXX: warn and notify melog.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()+ " failed, retry " + i + " times");try {Thread.sleep(1000);} catch (InterruptedException e) {log.warn("", e);}}}// XXX: warn and notify melog.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
1.3.1 putMessagePositionInfo写入消息位置信息

将消息位置信息写入到ConsumeQueue文件中:

  1. 如果消息偏移量+消息大小小于等于ConsumeQueue已处理的最大物理偏移量, 说明该消息已经被写过了, 返回true。
  2. 将消息信息offset、size、tagsCode按照顺序存入临时缓冲区byteBufferIndex中。
  3. 调用getLastMappedFile方法, 根据偏移量获取将要写入的最新ConsumeQueue文件的MappedFile。
  4. 进行一系列校验, 例如是否需要重设索引信息, 是否存在写入错误等等。
  5. 更新消息最大物理偏移量maxPhysicOffset = 消息在CommitLog中的物理偏移量 + 消息的大小。
  6. 调用MappedFile#appendMessage方法将临时缓冲区中的索引信息追加到mappedFile的mappedByteBuffer中, 并且更新wrotePosition的位置信息。

8B的offset+4B的size+8BtagsCode, offset: 消息在CommitLog中的物理偏移量。size: 消息大小。tagsCode: 延迟消息就是消息投递时间, 其他消息就是消息的tags的hashCode。

/*** 写入消息位置信息到ConsumeQueue中** @param offset   消息在CommitLog中的物理偏移量* @param size     消息大小* @param tagsCode 消息tagsCode,延迟消息就是消息投递时间,其他消息就是消息的tags的hashCode* @param cqOffset 消息在消息消费队列的偏移量*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {//如果消息偏移量+消息大小 小于等于ConsumeQueue已处理的最大物理偏移量//说明该消息已经被写过了,直接返回trueif (offset + size <= this.maxPhysicOffset) {log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);return true;}/** 将消息信息offset、size、tagsCode按照顺序存入临时缓冲区byteBufferIndex中*///position指针移到缓冲区头部this.byteBufferIndex.flip();//缓冲区的限制20Bthis.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);//存入8个字节长度的offset,消息在CommitLog中的物理偏移量this.byteBufferIndex.putLong(offset);//存入4个字节长度的size,消息大小this.byteBufferIndex.putInt(size);//存入8个字节长度的tagsCode,延迟消息就是消息投递时间,其他消息就是消息的tags的hashCodethis.byteBufferIndex.putLong(tagsCode);//已存在索引数据的最大预计偏移量final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;/** 根据偏移量获取将要写入的最新ConsumeQueue文件的MappedFile,可能会新建ConsumeQueue文件*/MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {//如果mappedFile是第一个创建的消费队列,并且消息在消费队列的偏移量不为0,并且消费队列写入指针为0//那么表示消费索引数据错误,需要重设索引信息if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {//设置最小偏移量为预计偏移量this.minLogicOffset = expectLogicOffset;//设置刷盘最新位置,提交的最新位置this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);//对该ConsumeQueue文件expectLogicOffset之前的位置填充前导0this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}//如果消息在消费队列的偏移量不为0,即此前有数据if (cqOffset != 0) {//获取当前ConsumeQueue文件最新已写入物理偏移量long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();//最新已写入物理偏移量大于预期偏移量,那么表示重复构建消费队列if (expectLogicOffset < currentLogicOffset) {log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}//如果不相等,表示存在写入错误,正常情况下,两个值应该相等,因为一个索引条目固定大小20Bif (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}//更新消息最大物理偏移量 = 消息在CommitLog中的物理偏移量 + 消息的大小this.maxPhysicOffset = offset + size;/** 将临时缓冲区中的索引信息追加到mappedFile的mappedByteBuffer中,并且更新wrotePosition的位置信息,到此构建ComsumeQueue完毕*/return mappedFile.appendMessage(this.byteBufferIndex.array());}return false;
}
1.3.2 MappedFile#appendMessage追加消息

该方法用于将数据追加到MappedFile, 追加到对应的mappedByteBuffer中, 基于mmap技术仅仅是将数据写入pageCache中, 没有立即刷盘, 依靠操作系统判断刷盘, 保证写入的高可用。

/*** MappedFile的方法* <p>* 追加消息** @param data 追加的数据*/
public boolean appendMessage(final byte[] data) {//获取写入位置int currentPos = this.wrotePosition.get();//如果当前位置加上消息大小小于等于文件大小,那么将消息写入mappedByteBufferif ((currentPos + data.length) <= this.fileSize) {try {//消息写入mappedByteBuffer即可,并没有执行刷盘ByteBuffer buf = this.mappedByteBuffer.slice();buf.position(currentPos);buf.put(data);} catch (Throwable e) {log.error("Error occurred when append message to mappedFile.", e);}//更新写入位置this.wrotePosition.addAndGet(data.length);return true;}return false;
}

2.CommitLogDispatcherBuildIndex构建IndexFile

接收分发请求并构建IndexFile。判断是否支持消息Index, 调用IndexService#buildIndex方法构建, 不存在则不构建, Index文件是否存在都不影响RocketMQ的正常运行, 提高根据keys或者时间范围查询消息的效率。

/*** DefaultMessageStore的方法* 写入消息位置信息到IndexFile** @param request 分派消息请求*/
@Override
public void dispatch(DispatchRequest request) {//是否支持IndexFile,默认trueif (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {//构建IndexDefaultMessageStore.this.indexService.buildIndex(request);}
}

2.1 buildIndex构建Index索引

  1. 通过retryGetAndCreateIndexFile方法获取或创建最新索引文件IndexFile, 最多重试3次。
  2. 判断当前消息在commitlog中的偏移量小于该文件的结束索引在commitlog中的偏移量, 表示已为该消息构建Index索引, 直接返回。如果该消息是事务回滚消息, 同样直接返回, 不创建索引。
  3. 获取客户端生成的uniqId, 也叫msgId, 代表客户端生成的唯一一条消息, 如果uniqId不为null的话, 调用putKey()为uniqId创建索引。
  4. 获取客户端传递的keys, 如果keys不是空, 那么调用putKey方法为keys中的每一个key构建索引。
/*** IndexService的方法* <p>* 构建Index索引*/
public void buildIndex(DispatchRequest req) {/** 获取或创建最新索引文件,支持重试最多3次*/IndexFile indexFile = retryGetAndCreateIndexFile();if (indexFile != null) {//获取结束物理索引long endPhyOffset = indexFile.getEndPhyOffset();DispatchRequest msg = req;//获取topic和keysString topic = msg.getTopic();String keys = msg.getKeys();//如果消息在commitlog中的偏移量小于该文件的结束索引在commitlog中的偏移量,那么表示已为该消息之后的消息构建Index索引//此时直接返回,不需要创建索引if (msg.getCommitLogOffset() < endPhyOffset) {return;}//获取该消息的事务类型final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:break;//如果是事务回滚消息,则直接返回,不需要创建索引case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:return;}//获取客户端生成的uniqId,也被称为msgId,从逻辑上代表客户端生成的唯一一条消息//如果uniqId不为null,那么为uniqId构建索引if (req.getUniqKey() != null) {indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));if (indexFile == null) {log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());return;}}//获取客户端传递的keys//如果keys不为空,那么为keys中的每一个key构建索引if (keys != null && keys.length() > 0) {//按照空格拆分keyString[] keyset = keys.split(MessageConst.KEY_SEPARATOR);//为keys中的每一个key构建索引for (int i = 0; i < keyset.length; i++) {String key = keyset[i];if (key.length() > 0) {indexFile = putKey(indexFile, msg, buildKey(topic, key));if (indexFile == null) {log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());return;}}}}} else {log.error("build index error, stop building index");}
}
2.1.1 retryGetAndCreateIndexFile获取IndexFile

该方法用于获取或创建索引文件, 支持重试, 最多循环3次, 循环中调用getAndCreateLastIndexFile方法获取最新索引文件, 如果文件写满了或者没有文件, 则自动创建文件。

/*** IndexService的方法* <p>* 获取或创建索引文件,支持重试*/
public IndexFile retryGetAndCreateIndexFile() {IndexFile indexFile = null;//循环尝试,尝试创建索引文件的最大次数为3for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {//获取最新的索引文件,如果文件写满了或者还没有文件则会自动创建新的索引文件indexFile = this.getAndCreateLastIndexFile();//如果获取的indexFile不为null,那么退出循环if (null != indexFile)break;try {log.info("Tried to create index file " + times + " times");Thread.sleep(1000);} catch (InterruptedException e) {log.error("Interrupted", e);}}//标记indexFile异常if (null == indexFile) {this.defaultMessageStore.getAccessRights().makeIndexFileError();log.error("Mark index file cannot build flag");}return indexFile;
}
2.1.2 getAndCreateLastIndexFile获取最新索引文件

获取最新IndexFile, 如果文件写满了或者还没有文件则会自动创建新的索引文件。

  1. 获取读锁。
    1. 如果indexFileList不为空, 重试获取最后的indexFile, 否则创建一个新的。
    2. 如果最后一个indexFile没写满, 赋值给indexFile。
    3. 如果最后一个IndexFile写满了, 创建新文件, 获取目前最后一个文件的endPhyOffset, endTimestamp等信息。
  2. 如果上一步没有获取到indexFile, 创建新的IndexFile。
    1. 获取完整文件名 $HOME/store/index${fileName}。fileName是以创建时的时间戳命名的。
    2. 调用IndexFile的构造器创建新的IndexFile。
    3. 获取写锁, 将新建的IndexFile加入indexFileList, 释放写锁。
    4. 创建新文件后, 尝试将上一个文件刷盘。
  3. 返回获取的indexFile。
/*** IndexService的方法* <p>* 获取最新的索引文件,如果文件写满了或者还没有文件则会自动创建新的索引文件*/
public IndexFile getAndCreateLastIndexFile() {IndexFile indexFile = null;IndexFile prevIndexFile = null;long lastUpdateEndPhyOffset = 0;long lastUpdateIndexTimestamp = 0;/** 尝试获取最新IndexFile*/{//尝试获取读锁this.readWriteLock.readLock().lock();//如果indexFileList不为空if (!this.indexFileList.isEmpty()) {//尝试获取最后一个IndexFileIndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);if (!tmp.isWriteFull()) {//如果最后一个IndexFile没写满,则赋值给indexFileindexFile = tmp;} else {//如果最后一个IndexFile写满了,则创建新文件//获取目前最后一个文件的endPhyOffsetlastUpdateEndPhyOffset = tmp.getEndPhyOffset();//获取目前最后一个文件的endTimestamplastUpdateIndexTimestamp = tmp.getEndTimestamp();//赋值给prevIndexFileprevIndexFile = tmp;}}this.readWriteLock.readLock().unlock();}/** 尝试创建一个新的IndexFile*/if (indexFile == null) {try {//获取完整文件名$HOME/store/index${fileName},fileName是以创建时的时间戳命名的,精确到毫秒String fileName =this.storePath + File.separator+ UtilAll.timeMillisToHumanString(System.currentTimeMillis());//创建IndexFileindexFile =new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,lastUpdateIndexTimestamp);//获取写锁this.readWriteLock.writeLock().lock();//加入到indexFileList集合中this.indexFileList.add(indexFile);} catch (Exception e) {log.error("getLastIndexFile exception ", e);} finally {//释放写锁this.readWriteLock.writeLock().unlock();}/** 创建了新的文件之后,尝试将上一个文件刷盘*/if (indexFile != null) {final IndexFile flushThisFile = prevIndexFile;/** 新开一个线程,异步的对上一个IndexFile文件刷盘*/Thread flushThread = new Thread(new Runnable() {@Overridepublic void run() {IndexService.this.flush(flushThisFile);}}, "FlushIndexFileThread");flushThread.setDaemon(true);flushThread.start();}}return indexFile;
}
2.1.3 创建IndexFile

第一次构建Index或者之前的IndexFile写满了的时候, 创建新的IndexFile。IndexFile文件大小约为
40B 头数据indexHeader + 500w * 4B hashslot + 2000w * 20B index = 420000040B: 400M大小。

/*** 创建IndexFile** @param fileName     文件名* @param hashSlotNum  哈希槽数量,默认5000000* @param indexNum     索引数量默认,默认5000000 * 4* @param endPhyOffset 上一个文件的endPhyOffset* @param endTimestamp 上一个文件的endTimestamp* @throws IOException*/
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final long endTimestamp) throws IOException {//文件大小,默认约400M左右//40B 头数据 + 500w * 4B hashslot + 2000w * 20B indexint fileTotalSize =IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);//构建mappedFilethis.mappedFile = new MappedFile(fileName, fileTotalSize);this.fileChannel = this.mappedFile.getFileChannel();this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();this.hashSlotNum = hashSlotNum;this.indexNum = indexNum;//生成DirectByteBuffer,对该buffer写操作会被反映到文件里面ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//获取indexHeaderthis.indexHeader = new IndexHeader(byteBuffer);//设置新文件的起始物理索引和结束物理索引都为上一个文件的结束物理索引if (endPhyOffset > 0) {this.indexHeader.setBeginPhyOffset(endPhyOffset);this.indexHeader.setEndPhyOffset(endPhyOffset);}//设置新文件的起始时间戳和结束时间戳都为上一个文件的结束时间戳if (endTimestamp > 0) {this.indexHeader.setBeginTimestamp(endTimestamp);this.indexHeader.setEndTimestamp(endTimestamp);}
}
2.1.4 buildKey构建Key

构建Index索引的key, RocketMQ将会为uniqId和keys中的每个key构建索引。

UniqKey将会转换为topic#UniqKey, 而keys则会先通过空格拆分, 然后将每个key转换为topic#key。

/*** IndexService的方法* 构建key*/
private String buildKey(final String topic, final String key) {//拼接return topic + "#" + key;
}
2.1.5 putKey构建Index索引

IndexFile文件大约400M, 一个IndexFile文件可以保存2000W个索引, IndexFile底层是HashMap结构, 故RocketMQ的索引文件底层实现为hash索引。

putKey方法循环调用indexFile#putKey方法构建Index索引, 每次构建失败都将调用retryGetAndCreateIndexFile方法尝试获取或创建最新索引文件然后再尝试构建。

  1. 判断如果当前文件的index索引数量小于2000w, 则表明当前文件还可以继续构建索引。
  2. 计算Key的哈希值keyHash, 通过 哈希值keyHash & hash槽数量hashSlotNum获取key对应的hash槽的下标slotPos, 计算该消息的绝对hash槽偏移量 absSlotPos = 40B + slotPos * 4B。
  3. 计算当前消息在commitlog中的消息存储时间与该Index文件起始时间差timeDiff, 计算该消息的索引存放位置的绝对偏移量absIndexPos = 40B + 500w * 4B + indexCount * 20B。
  4. 在absIndexPos位置顺序存放Index索引数据, 大小为20B, 存入4B的当前消息的Key的哈希值, 存入8B的当前消息在commitlog中的物理偏移量, 存入4B的当前消息在commitlog中的消息存储时间与该Index文件起始时间差, 存入4B的slotValue, 可更新当前hash槽的值为最新的IndexFile的索引条目计数的编号。
  5. 在absSlotPos位置更新当前hash槽的值为最新的IndexFile的索引条目计数的编号, 为当前索引存入的编号。
  6. 判断如果索引数量小于等于1, 说明时该文件第一次存入索引, 初始化beginPhyOffset和beginTimestamp。
  7. 判断如果slotValue为0, 则为一个新槽, hashSlotCount + 1。
  8. 索引条目计数indexCount自增1, 设置新的endPhyOffset和endTimestamp。
/*** IndexService的方法* <p>* 构建Index索引** @param indexFile indexFile* @param msg       消息* @param idxKey    key*/
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {//循环尝试构建Index索引for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");//构建失败,则尝试获取或创建最新索引文件,支持重试indexFile = retryGetAndCreateIndexFile();if (null == indexFile) {return null;}//再次尝试构建Index索引ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());}return indexFile;
}/*** IndexFile的方法* <p>* 构建Index索引** @param key            key* @param phyOffset      当前消息在commitlog中的物理偏移量* @param storeTimestamp 当前消息在commitlog中的消息存储时间* @return*/
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {//如果当前文件的index索引数量小于2000w,则表明当前文件还可以继续构建索引if (this.indexHeader.getIndexCount() < this.indexNum) {//计算Key的哈希值int keyHash = indexKeyHashMethod(key);//通过 哈希值 & hash槽数量 的方式获取当前key对应的hash槽下标位置,hashSlotNum默认为5000wint slotPos = keyHash % this.hashSlotNum;//计算该消息的绝对hash槽偏移量 absSlotPos = 40B + slotPos * 4Bint absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;FileLock fileLock = null;try {// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,// false);//获取当前hash槽的值,一个hash槽大小为4Bint slotValue = this.mappedByteBuffer.getInt(absSlotPos);//如果值不为0说明这个hash key已经存在,即存在hash冲突if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}//当前消息在commitlog中的消息存储时间与该Index文件起始时间差long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//获取该消息的索引存放位置的绝对偏移量 absIndexPos = 40B + 500w * 4B + indexCount * 20Bint absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//存入4B的当前消息的Key的哈希值this.mappedByteBuffer.putInt(absIndexPos, keyHash);//存入8B的当前消息在commitlog中的物理偏移量this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//存入4B的当前消息在commitlog中的消息存储时间与该Index文件起始时间差this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//存入4B的slotValue,即前面读出来的 slotValue,可能是0,也可能不是0,而是上一个发生hash冲突的索引条目的编号this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//更新当前hash槽的值为最新的IndexFile的索引条目计数的编号,也就是当前索引存入的编号this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());/** 从存入的数据可以看出来:* IndexFile采用用slotValue字段将所有冲突的索引用链表的方式串起来了,而哈希槽SlotTable并不保存真正的索引数据,* 而是保存每个槽位对应的单向链表的头,即可以看作是头插法插入数据*///如果索引数量小于等于1,说明时该文件第一次存入索引,那么初始化beginPhyOffset和beginTimestampif (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}//如果slotValue为0,那么表示采用了一个新的哈希槽,此时hashSlotCount自增1if (invalidIndex == slotValue) {this.indexHeader.incHashSlotCount();}//因为存入了新的索引,那么索引条目计数indexCount自增1this.indexHeader.incIndexCount();//设置endPhyOffset和endTimestampthis.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);} finally {if (fileLock != null) {try {fileLock.release();} catch (IOException e) {log.error("Failed to release the lock", e);}}}} else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;
}

3.总结

在这里插入图片描述

在这里插入图片描述

IndexFile的构成包括40B的Header头信息, 4 x 500wB的Slot信息, 20 x 2000wB的Index信息

  1. Header: java为IndexHeader, 8B的的beginTimestamp, 8B的endTimestamp, 8B的beginPhyOffset, 8B的endPhyOffset, 4B的hashSlotCount 哈希槽计数, 4B的indexCount 索引条目计数。

  2. slot Table并不保存真正的索引数据, 存储的是每个槽位对应的单向链表的头。即最新消息的索引条目计数的编号indexCount。

  3. 索引信息: 4B的Key Hash, Key的哈希值。8B的CommitLog Offset, 当前消息在commitlog中的物理偏移量。4B的Timestamp, 当前消息在commitlog中的消息存储时间与该Index文件起始时间差。4B的NextIndex offset, 链表的下一个索引的Index位置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/4877.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

go初识iris框架(二) - get,post请求和数据格式

继初步了解iris后 文章目录 获取url路径获取数据get请求post请求获取JSON数据格式JSON返回值获取XML数据格式XML返回值 获取url路径 package mainimport "github.com/kataras/iris/v12"func main(){app : iris.New()app.Get("/hello",func(ctx iris.Conte…

Redis报错-CROSSSLOT keys in request don‘t hash in the same slot

背景 问题涉及&#xff1a;spring security、spring session、redis 问题描述 springbootspringsecurityspringsessionantd 登录功能的时候&#xff0c;在源码中使用到了redis的rename命令&#xff08;如下图所示&#xff09; 在这里就会报错 CROSSSLOT keys in request d…

基于小波哈尔法(WHM)的一维非线性IVP测试问题的求解(Matlab代码实现)

&#x1f4a5;1 概述 小波哈尔法&#xff08;WHM&#xff09;是一种求解一维非线性初值问题&#xff08;IVP&#xff09;的数值方法。它基于小波分析的思想&#xff0c;通过将原始问题转化为小波空间中的线性问题&#xff0c;然后进行求解。以下是一维非线性IVP测试问题的求解…

守护数智未来,开源网安受邀参加2023OWASP北京论坛

2023年7月14日&#xff0c;OWASP中国与网安加社区联合举办的“2023OWASP中国北京安全技术论坛”在北京圆满召开&#xff0c;开源网安受邀参加本次论坛并分享“软件供应链安全治理实践”。 本次“2023OWASP中国北京安全技术论坛”是OWASP中国北京地区年度重要活动之一&#xff…

Vue+axios 使用CancelToken多次发送请求取消前面所有正在pendding的请求

需求&#xff1a; 项目中 折线图数据是循环调用的&#xff0c;此时勾选一个设备&#xff0c; 会出现多条线。 原因 折线图数据一进来接口循环在调用&#xff0c;勾选设备时&#xff0c;循环调用的接口有的处于pedding状态 &#xff0c;有的还在加载中&#xff0c;这就导致勾…

【PDFBox】PDFBox操作PDF文档之读取指定页面文本内容、读取所有页面文本内容、根据模板文件生成PDF文档

这篇文章&#xff0c;主要介绍PDFBox操作PDF文档之读取指定页面文本内容、读取所有页面文本内容、根据模板文件生成PDF文档。 目录 一、PDFBox操作文本 1.1、读取所有页面文本内容 1.2、读取指定页面文本内容 1.3、写入文本内容 1.4、替换文本内容 &#xff08;1&#xf…

如何在 Endless OS 上安装 ONLYOFFICE 桌面编辑器

ONLYOFFICE 桌面编辑器是一款基于依据 AGPL v.3 许可进行分发的开源办公套件。使用这款应用&#xff0c;您无需保持网络连接状态即可处理存储在计算机上的文档。本指南会向您介绍&#xff0c;如何在 Endless OS 上安装 ONLYOFFICE 桌面编辑器。 ONLYOFFICE 桌面版是什么 ONLYO…

Spring Boot进阶(55):SpringBoot之集成MongoDB及实战使用 | 超级详细,建议收藏

1. 前言&#x1f525; 前几期我们有介绍Mysql、Redis等数据库介绍及实战演示&#xff0c;对基本的数据存放有很好的共性&#xff0c;但是如果说遇到大面积的xml、Json、bson等格式文档数据存放&#xff0c;以上数据库并非是最优选择&#xff0c;最优选择是Mongodb数据库。 那么…

如何将jar 包下载到自定义maven仓库

下载命令 mvn install:install-file -Dfileartifactid-version.jar -DgroupIdgroupid -DartifactIdartifactid -Dversionversion -Dpackagingjar -DlocalRepositoryPath. -DcreateChecksumtrue参数解释 在上述命令中&#xff0c;需要替换以下参数&#xff1a; artifactid-vers…

HTTP原理解析-超详细

作者&#xff1a;20岁爱吃必胜客&#xff08;坤制作人&#xff09;&#xff0c;近十年开发经验, 跨域学习者&#xff0c;目前于海外某世界知名高校就读计算机相关专业。荣誉&#xff1a;阿里云博客专家认证、腾讯开发者社区优质创作者&#xff0c;在CTF省赛校赛多次取得好成绩。…

vue3+vue-router4:报错Uncaught (in promise) Error: Invalid navigation guard

报错图示&#xff1a; Error: Invalid navigation guard Uncaught (in promise) Error: Invalid navigation guard 错误影响描述&#xff1a; 配置开发、测试、生产时候&#xff0c;因为是公众号&#xff0c;所以想在开发环境下免鉴权&#xff0c;不走微信获取openid接口&a…

potplayer放大画面,画面拖拽。备份

放大画面&#xff1a; 按住alt和鼠标左键&#xff0c;就可以拖动放大后的画面了 窗口化示图

【DC-DC】APS54083 降压恒流驱动器大功率深度调光 舞台 RGB 汽车照明 台灯驱动芯片

产品描述 APS54083 是一款 PWM 工作模式,高效率、外围简单、外置功率 MOS 管&#xff0c;适用于 5-220V 输入高精度降压 LED 恒流驱动芯片。输出最大功率150W最大电流 6A。APS54083 可实现线性调光和 PWM 调光&#xff0c;线性调光脚有效电压范围 0.5-2.5V.PWM 调光频率范围 1…

学习babylon.js --- [3] 开启https

babylonjs提供WebVR功能&#xff0c;但是使用这个功能得用https&#xff0c;本文讲述如何使用自签名证书来开启https&#xff0c;基于第二篇文章中搭建的工程。 一 生成自签名证书 首先要安装openssl&#xff0c;这个去网上搜下就行了。安装完之后在终端下输入openssl回车可以…

MongoDB

MongoDB概述 MongoDB是一个基于分布式文件存储的数据库。由C语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。 MongoDB是一个介于关系数据库和非关系数据库之间的产品&#xff0c;是非关系数据库当中功能最丰富&#xff0c;最像关系数据库的。 它支持的数据结构非…

状态模式:游戏、工作流引擎中常用的状态机是如何实现的?

从今天起&#xff0c;我们开始学习状态模式。在实际的软件开发中&#xff0c;状态模式并不是很常用&#xff0c;但是在能够用到的场景里&#xff0c;它可以发挥很大的作用。从这一点上来看&#xff0c;它有点像我们之前讲到的组合模式。 可以简短的回顾一下组合模式&#xff1a…

uniapp安卓签名证书生成,签名证书的SHA1,SHA256,MD5获取

uniapp安卓证书生成有两种方式&#xff0c;一种是去dcloud开发者中心生成证书&#xff0c;另一种是安装jre环境&#xff0c;自己生成证书 第一种 dcloud生成证书 去该项目对应的应用处&#xff0c;生成证书需要等几分钟&#xff0c;生成后可以查看证书信息 第二种 自己生成…

如何下载SRA存放在AWS的原始数据

通常&#xff0c;我们都是利用prefetch从NCBI上获取数据&#xff0c;然后用fasterp-dump/fastq-dump 转成fastq。但遗憾的SRA的数据是原数据的有损压缩&#xff0c;比如说我19年参与发表的文章里单细胞数据上传的是3个文件&#xff0c;但是当时的faster-dump/fastq-dump只能拆出…

【ArcGIS Pro二次开发】(46):要素类从上到下、从左到右排序

要素类经过编辑之后&#xff0c;【OBJECTID】字段会变得不规律。应部分网友要求&#xff0c;做了这个从上到下、从左到右排序的工具。 不过后来在ArcGIS Pro中发现了一个【排序】工具&#xff0c;已经可以完美实现这个功能需求&#xff0c;发现自己做了个白工。 不过做了不能白…

Ghost Buster Pro for mac(快速清理卸载的应用残存文件)

Ghost Buster Pro for mac可从您已卸载的应用程序中查找并删除文件。该应用程序速度快如闪电&#xff0c;可立即释放内存。 许多应用程序都安装在计算机上&#xff0c;但它们通常只会在您的计算机上停留很短的时间。每个应用程序都会创建文件&#xff0c;但删除应用程序不会删…