RocketMQ Source Code Reading: Message Pull and Consumption (Broker Side)

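  • 1. What is ConsumeQueue
  • 2. Message replay
    • 2.1 Reading Messages from MappedFile into ConsumeQueue
    • 2.2 ConsumeQueue persistence
  • 3. The pull interface provided by the Broker
    • 3.1 Request header
    • 3.2 Pull-message interface
    • 3.3 Handling pull failures
  • 4. The update-consume-progress interface provided by the Broker
  • 5. The send-message-back interface provided by the Broker
  • 6. Summary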

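The previous article showed how CommitLog stores a Message into MappedFile files.
Once that is done, consumers can pull and consume the Message.
Pulling and consuming Messages is done jointly by the Broker and the Consumer. This article starts with the Broker-side code that handles Message pulls.

ReputMessageService, an inner class of DefaultMessageStore, performs the message replay (reput): it reads the Messages that CommitLog stored in MappedFile and replays them into ConsumeQueue.

RocketMQ architecture diagram:

A borrowed diagram (source: https://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/) that describes the consume logic:
image.png
In the diagram:

  • Producer: the message producer
  • CommitLog: where messages are persisted
  • ConsumeQueue: stores the position of each message inside CommitLog
  • Consumer: the message consumer

1. What is ConsumeQueue

ConsumeQueue stores the position of each message inside CommitLog.
The relationship between ConsumeQueue, MappedFileQueue, and MappedFile is as follows: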

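2. Message replay

2.1 Reading Messages from MappedFile into ConsumeQueue

After a message has been stored to file, the consuming side needs to learn that a new message has arrived. This is what the Message replay mechanism does. Message replay is performed by ReputMessageService, an inner class of DefaultMessageStore.
image.png

ReputMessageService extends ServiceThread (which implements Runnable) and overrides the run method: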

@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

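The run method loops until the service is stopped: it sleeps for 1 ms, then calls ReputMessageService#doReput().
The thread is started during broker startup: BrokerStartup#start calls BrokerController#start, which calls DefaultMessageStore#start, which calls ReputMessageService#start. That is ultimately Thread#start, which ends up running ReputMessageService#run.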
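The loop structure can be sketched in isolation. This is a minimal illustration and not RocketMQ code: PollingService, its work callback, and shutdown() are hypothetical stand-ins for ReputMessageService, doReput(), and the stop flag of ServiceThread.

```java
// Hypothetical stand-in for ReputMessageService; all names here are illustrative.
class PollingService implements Runnable {
    private volatile boolean stopped = false;
    private final Runnable work; // plays the role of doReput()

    PollingService(Runnable work) {
        this.work = work;
    }

    void shutdown() {
        stopped = true;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                Thread.sleep(1);   // pause 1 ms between rounds
                work.run();        // one replay round
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // A failed round is reported and the loop keeps going.
                System.err.println("service has exception: " + e);
            }
        }
    }
}
```

Because an exception thrown by one round is caught inside the loop, a single bad round does not terminate the thread; the next round starts 1 ms later.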

The source of the ReputMessageService class:

class ReputMessageService extends ServiceThread {

    // CommitLog physical offset from which replay starts
    private volatile long reputFromOffset = 0;

    public long getReputFromOffset() {
        return reputFromOffset;
    }

    public void setReputFromOffset(long reputFromOffset) {
        this.reputFromOffset = reputFromOffset;
    }

    @Override
    public void shutdown() {
        for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
        }

        if (this.isCommitLogAvailable()) {
            log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
                DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
        }

        super.shutdown();
    }

    // Number of bytes still waiting to be replayed
    public long behind() {
        return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
    }

    // Whether the CommitLog still has messages to replay
    private boolean isCommitLogAvailable() {
        return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
    }

    private void doReput() {
        if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
            log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
            this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        }
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }

            // Get the MappedByteBuffer of the CommitLog MappedFile that starts at reputFromOffset
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {
                try {
                    this.reputFromOffset = result.getStartOffset();

                    // Iterate over the MappedByteBuffer
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        // Build the dispatch request for the message being replayed
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        // Message length
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                        // Handle the request according to its result
                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                // Dispatch the Message that was read
                                DefaultMessageStore.this.doDispatch(dispatchRequest);

                                // Notify listeners that a new message has arrived
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }

                                // Statistics
                                this.reputFromOffset += size;
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                // Reached the end of the MappedFile
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {
                            // Read failed
                            if (size > 0) {
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                // If user open the dledger pattern or the broker is master node,
                                // it will not ignore the exception and fix the reputFromOffset variable
                                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                    DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }

    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return ReputMessageService.class.getSimpleName();
    }
}

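ReputMessageService has two main jobs:

  • Continuously write message position info into the ConsumeQueue
  • Continuously write message index entries into the IndexFile

Inside doReput, each Message that was read is handed to DefaultMessageStore#doDispatch(…):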

public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

It calls CommitLogDispatcher#dispatch in a loop. CommitLogDispatcher is an interface with three implementations:

  • CommitLogDispatcherBuildConsumeQueue (inner class of DefaultMessageStore): writes to ConsumeQueue
  • CommitLogDispatcherBuildIndex (inner class of DefaultMessageStore): writes to the index file
  • CommitLogDispatcherCalcBitMap: TODO, not analyzed yet

The dispatcherList linked list is initialized in the DefaultMessageStore constructor:

this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

As a result, CommitLogDispatcherBuildConsumeQueue#dispatch and CommitLogDispatcherBuildIndex#dispatch are called in that order.
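The ordered fan-out can be illustrated with a small sketch. It is not the RocketMQ implementation: the Dispatcher interface below takes a String instead of a DispatchRequest, and the two lambdas only record that they were called.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Simplified illustration of the dispatcher chain; all types here are hypothetical.
class DispatchChainDemo {
    // Stand-in for CommitLogDispatcher (the real one takes a DispatchRequest).
    interface Dispatcher {
        void dispatch(String request);
    }

    private final LinkedList<Dispatcher> dispatcherList = new LinkedList<>();
    final List<String> trace = new ArrayList<>();

    DispatchChainDemo() {
        // Registration order determines call order, as with addLast in the constructor above.
        dispatcherList.addLast(req -> trace.add("consumeQueue:" + req));
        dispatcherList.addLast(req -> trace.add("index:" + req));
    }

    void doDispatch(String request) {
        for (Dispatcher dispatcher : dispatcherList) {
            dispatcher.dispatch(request);
        }
    }
}
```

Every registered dispatcher sees every request, so adding a further consumer of CommitLog data only requires appending another element to the list.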
First, CommitLogDispatcherBuildConsumeQueue#dispatch:

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // Non-transactional or committed transactional message:
                // write its position info into ConsumeQueue
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
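The switch above amounts to a visibility filter on the transaction bits of sysFlag. The sketch below shows the idea; the constant values and the masking in transactionValue are assumptions about MessageSysFlag and should be checked against the RocketMQ version in use, and visibleToConsumers is a hypothetical helper name.

```java
// Sketch of the transaction-type filter; constants are assumed, not copied from the source above.
class TransactionTypeDemo {
    static final int NOT_TYPE = 0;
    static final int PREPARED_TYPE = 0x1 << 2;
    static final int COMMIT_TYPE = 0x2 << 2;
    static final int ROLLBACK_TYPE = 0x3 << 2;

    // Assumed equivalent of MessageSysFlag.getTransactionValue: keep only the two transaction bits.
    static int transactionValue(int sysFlag) {
        return sysFlag & ROLLBACK_TYPE;
    }

    // True when the message should get a ConsumeQueue entry.
    static boolean visibleToConsumers(int sysFlag) {
        switch (transactionValue(sysFlag)) {
            case NOT_TYPE:
            case COMMIT_TYPE:
                return true;
            default:
                // Prepared and rolled-back messages never reach the ConsumeQueue.
                return false;
        }
    }
}
```

Under these assumptions, prepared (half) messages stay in CommitLog but cannot be pulled, because consumers only read through ConsumeQueue.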

Its job is to write position info into ConsumeQueue for non-transactional messages and for committed transactional messages. It calls DefaultMessageStore#putMessagePositionInfo:

// Write message position info into ConsumeQueue
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

ConsumeQueue#putMessagePositionInfoWrapper:

// Wrapper around adding position info
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    // Retry the write until it succeeds (at most maxRetries times)
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        // Add the position info
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            // On success, use the message store time as the store checkpoint
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

The method that actually adds the position info is ConsumeQueue#putMessagePositionInfo:

// Add position info and return whether the write succeeded
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    // Already replayed: return success directly
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    // Write the position info into byteBuffer
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    // Compute the storage position in the consume queue and get the matching MappedFile
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // If this is the first MappedFile of the ConsumeQueue, the queue offset is not 0,
        // and nothing has been written to the MappedFile yet, fill the leading blank placeholders
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            // Fill the leading blank placeholders
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        // Validate the storage position in the consume queue
        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        // Record how far the CommitLog has been replayed into the ConsumeQueue
        this.maxPhysicOffset = offset + size;
        // Append to the mappedFile
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
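The write sequence above (putLong, putInt, putLong) implies a fixed 20-byte entry: 8 bytes of CommitLog offset, 4 bytes of message size, 8 bytes of tagsCode. A minimal sketch of that layout and of the two offset calculations follows; CqEntryDemo and its method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Sketch of one ConsumeQueue entry as written by putMessagePositionInfo; names are illustrative.
class CqEntryDemo {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tagsCode)

    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buffer.putLong(commitLogOffset);
        buffer.putInt(size);
        buffer.putLong(tagsCode);
        return buffer.array();
    }

    // Byte position of the entry with logical queue offset cqOffset.
    static long logicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    // Mirrors the "already replayed" guard at the top of putMessagePositionInfo.
    static boolean alreadyReplayed(long commitLogOffset, int size, long maxPhysicOffset) {
        return commitLogOffset + size <= maxPhysicOffset;
    }
}
```

Because every entry has the same size, the n-th message of a queue can be located with a single multiplication, which is what makes reads by queue offset cheap.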

The leading blank placeholders are filled by ConsumeQueue#fillPreBlank:

private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
    // Write one blank placeholder entry into byteBuffer
    ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    byteBuffer.putLong(0L);
    byteBuffer.putInt(Integer.MAX_VALUE);
    byteBuffer.putLong(0L);

    int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
    // Append placeholders until the target position is reached
    for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
        mappedFile.appendMessage(byteBuffer.array());
    }
}
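To get a feel for how many placeholders the loop writes, the arithmetic can be isolated as below. PreBlankDemo and blankEntries are hypothetical names, and the file size used in the usage note is an assumed value.

```java
// Sketch of the placeholder count computed by the fillPreBlank loop; names are illustrative.
class PreBlankDemo {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Number of blank entries appended before the first real entry at byte position untilWhere.
    static int blankEntries(long untilWhere, int mappedFileSize) {
        int until = (int) (untilWhere % mappedFileSize);
        int count = 0;
        for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
            count++;
        }
        return count;
    }
}
```

For example, with an assumed file size of 6,000,000 bytes, a first entry at queue offset 100 (byte position 2000) is preceded by 100 placeholders, so the entry still lands at the byte position its queue offset implies.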

Next, CommitLogDispatcherBuildIndex#dispatch:

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    // Write index info into IndexFile
    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

Its job is to write index info into IndexFile. The detailed flow is not analyzed here.

2.2 ConsumeQueue persistence

ConsumeQueue holds the position of each message inside CommitLog.
FlushConsumeQueueService is responsible for persisting ConsumeQueue. Its source:

class FlushConsumeQueueService extends ServiceThread {
    private static final int RETRY_TIMES_OVER = 3;
    // Timestamp of the last thorough flush
    private long lastFlushTimestamp = 0;

    private void doFlush(int retryTimes) {
        int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

        // retryTimes == RETRY_TIMES_OVER forces a flush. Mainly used at shutdown.
        if (retryTimes == RETRY_TIMES_OVER) {
            flushConsumeQueueLeastPages = 0;
        }

        // Once flushConsumeQueueThoroughInterval has elapsed, flush even if fewer than
        // flushConsumeQueueLeastPages pages have been written
        long logicsMsgTimestamp = 0;
        int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushConsumeQueueLeastPages = 0;
            logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
        }

        // Flush the consume queues
        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue cq : maps.values()) {
                boolean result = false;
                for (int i = 0; i < retryTimes && !result; i++) {
                    result = cq.flush(flushConsumeQueueLeastPages);
                }
            }
        }

        // Flush the store checkpoint
        if (0 == flushConsumeQueueLeastPages) {
            if (logicsMsgTimestamp > 0) {
                DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
            }
            DefaultMessageStore.this.getStoreCheckpoint().flush();
        }
    }

    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
                this.waitForRunning(interval);
                this.doFlush(1);
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        this.doFlush(RETRY_TIMES_OVER);

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushConsumeQueueService.class.getSimpleName();
    }

    @Override
    public long getJointime() {
        return 1000 * 60;
    }
}

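The flush runs once per getFlushIntervalConsumeQueue() interval (1000 ms). After the loop exits, doFlush(RETRY_TIMES_OVER) performs one final forced flush.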
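The threshold logic at the top of doFlush can be isolated into a small sketch. FlushPolicyDemo and effectiveLeastPages are hypothetical names; the current time is passed in as a parameter so the behaviour is easy to follow, and a result of 0 is taken to mean that the flush is not gated on the number of dirty pages.

```java
// Sketch of how doFlush picks its least-pages threshold; names are illustrative.
class FlushPolicyDemo {
    static final int RETRY_TIMES_OVER = 3;
    private long lastFlushTimestamp = 0;

    int effectiveLeastPages(int configuredLeastPages, int retryTimes, long thoroughIntervalMs, long nowMs) {
        int leastPages = configuredLeastPages;
        // Forced flush, used on shutdown.
        if (retryTimes == RETRY_TIMES_OVER) {
            leastPages = 0;
        }
        // Thorough flush once the interval has elapsed since the last one.
        if (nowMs >= lastFlushTimestamp + thoroughIntervalMs) {
            lastFlushTimestamp = nowMs;
            leastPages = 0;
        }
        return leastPages;
    }
}
```

The two thresholds trade write amplification against durability: frequent rounds skip queues with little dirty data, while the periodic thorough flush bounds how long an entry can stay unflushed.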

3. The pull interface provided by the Broker

3.1 Request header

The request header for pulling messages is PullMessageRequestHeader:

public class PullMessageRequestHeader implements CommandCustomHeader {
    // Consumer group
    @CFNotNull
    private String consumerGroup;
    // Topic
    @CFNotNull
    private String topic;
    // Queue id
    @CFNotNull
    private Integer queueId;
    // Start offset in the queue
    @CFNotNull
    private Long queueOffset;
    // Maximum number of messages
    @CFNotNull
    private Integer maxMsgNums;
    // System flag
    @CFNotNull
    private Integer sysFlag;
    // Consume progress offset to commit
    @CFNotNull
    private Long commitOffset;
    // Suspend timeout
    @CFNotNull
    private Long suspendTimeoutMillis;
    // Subscription expression
    @CFNullable
    private String subscription;
    // Subscription version
    @CFNotNull
    private Long subVersion;
    private String expressionType;

    @Override
    public void checkFields() throws RemotingCommandException {
    }
}
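  • sysFlag: system flag.
    • Bit 0, FLAG_COMMIT_OFFSET: marks that the request commits a consume progress offset; used together with commitOffset.
    • Bit 1, FLAG_SUSPEND: marks whether the request may be suspended; used together with suspendTimeoutMillis. When no message can be pulled, the Broker suspends the request until a message arrives. Maximum suspend time: suspendTimeoutMillis milliseconds.
    • Bit 2, FLAG_SUBSCRIPTION: marks whether the request carries a subscription expression for filtering; used together with subscription.
  • subVersion: subscription version. If the version in the request does not match, no messages can be pulled; the consumer has to fetch the subscription info again and use the latest version.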
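The three sysFlag bits described above can be sketched as plain bit operations. PullSysFlagDemo is a hypothetical class; only the bit positions come from the list above, and the build and has helpers are illustrative rather than the signatures of the real PullSysFlag.

```java
// Sketch of the sysFlag bit layout described above; helper names are illustrative.
class PullSysFlagDemo {
    static final int FLAG_COMMIT_OFFSET = 0x1;      // bit 0
    static final int FLAG_SUSPEND = 0x1 << 1;       // bit 1
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;  // bit 2

    static int build(boolean commitOffset, boolean suspend, boolean subscription) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        return flag;
    }

    static boolean has(int sysFlag, int flag) {
        return (sysFlag & flag) == flag;
    }
}
```

A request that commits an offset and carries a subscription but must not be suspended would therefore carry a sysFlag with bits 0 and 2 set.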

3.2 Pull-message interface

The pull-message interface provided by the Broker is PullMessageProcessor#processRequest(…).
The source is very long, so take it step by step:

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    log.debug("receive PullMessage request command, {}", request);

    // Check that the broker is readable
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }

    // Check that the consumer group config exists
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }

    // Check that the consumer group is allowed to consume
    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }

    final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());

    final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

    // Check that the topic config exists
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }

    // Check that the topic is readable
    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    }

    // Check that the requested queue id is within the topic's read queue range
    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }

    // Check the subscription
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    if (hasSubscriptionFlag) {
        try {
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
            );
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion()
                );
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    } else {
        // Check that the consumer group info exists
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) {
            log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        // Check that the group's message model is allowed
        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        }

        // Check that the subscription data exists
        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) {
            log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        // Check that the subscription version is valid
        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
            log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        }
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                requestHeader.getConsumerGroup());
            if (consumerFilterData == null) {
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            }
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            }
        }
    }

    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
        && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    }

    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    // Fetch the messages
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
        response.setRemark(getMessageResult.getStatus().name());
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

        // Compute the suggested brokerId to read from
        if (getMessageResult.isSuggestPullingFromSlave()) {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            case SLAVE:
                if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
                break;
        }

        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            // consume too slow ,redirect to another machine
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            }
            // consume ok
            else {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            }
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (getMessageResult.getStatus()) {
            case FOUND:
                response.setCode(ResponseCode.SUCCESS);
                break;
            case MESSAGE_WAS_REMOVING:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
                if (0 != requestHeader.getQueueOffset()) {
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);

                    // XXX: warn and notify me
                    log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                        requestHeader.getQueueOffset(),
                        getMessageResult.getNextBeginOffset(),
                        requestHeader.getTopic(),
                        requestHeader.getQueueId(),
                        requestHeader.getConsumerGroup()
                    );
                } else {
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                }
                break;
            case NO_MATCHED_MESSAGE:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case OFFSET_FOUND_NULL:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_OVERFLOW_BADLY:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                // XXX: warn and notify me
                log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                    requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
                break;
            case OFFSET_OVERFLOW_ONE:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_TOO_SMALL:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                    getMessageResult.getMinOffset(), channel.remoteAddress());
                break;
            default:
                assert false;
                break;
        }

        if (this.hasConsumeMessageHook()) {
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getConsumerGroup());
            context.setTopic(requestHeader.getTopic());
            context.setQueueId(requestHeader.getQueueId());

            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                    int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;

                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                    context.setCommercialRcvTimes(incValue);
                    context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                    context.setCommercialOwner(owner);

                    break;
                case ResponseCode.PULL_NOT_FOUND:
                    if (!brokerAllowSuspend) {
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setCommercialRcvTimes(1);
                        context.setCommercialOwner(owner);
                    }
                    break;
                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                case ResponseCode.PULL_OFFSET_MOVED:
                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                    context.setCommercialRcvTimes(1);
                    context.setCommercialOwner(owner);
                    break;
                default:
                    assert false;
                    break;
            }

            this.executeConsumeMessageHookBefore(context);
        }

        switch (response.getCode()) {
            case ResponseCode.SUCCESS:

                this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    getMessageResult.getMessageCount());

                this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    getMessageResult.getBufferTotalSize());

                this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                // Read the messages
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final long beginTimeMills = this.brokerController.getMessageStore().now();
                    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId(),
                        (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                    response.setBody(r);
                } else {
                    try {
                        FileRegion fileRegion =
                            new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                getMessageResult.release();
                                if (!future.isSuccess()) {
                                    log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                                }
                            }
                        });
                    } catch (Throwable e) {
                        log.error("transfer many message by pagecache exception", e);
                        getMessageResult.release();
                    }

                    response = null;
                }
                break;
            case ResponseCode.PULL_NOT_FOUND:
                // No message found && the broker allows suspending && the request allows suspending
                if (brokerAllowSuspend && hasSuspendFlag) {
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    response = null;
                    break;
                }

            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                break;
            case ResponseCode.PULL_OFFSET_MOVED:
                if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                    || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                    MessageQueue mq = new MessageQueue();
                    mq.setTopic(requestHeader.getTopic());
                    mq.setQueueId(requestHeader.getQueueId());
                    mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                    OffsetMovedEvent event = new OffsetMovedEvent();
                    event.setConsumerGroup(requestHeader.getConsumerGroup());
                    event.setMessageQueue(mq);
                    event.setOffsetRequest(requestHeader.getQueueOffset());
                    event.setOffsetNew(getMessageResult.getNextBeginOffset());
                    this.generateOffsetMovedEvent(event);
                    log.warn(
                        "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                        responseHeader.getSuggestWhichBrokerId());
                } else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                        responseHeader.getSuggestWhichBrokerId());
                }

                break;
            default:
                assert false;
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }

    // Persist the consume progress when the broker allows suspending,
    // the request asks to commit its offset, and the broker is not a slave
    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    return response;
}
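The first status switch in processRequest is essentially a lookup from GetMessageStatus to a response code. The sketch below restates that mapping as a pure function so it can be read at a glance; PullStatusMappingDemo and its two enums are hypothetical simplifications (the real ResponseCode values are int constants), and later adjustments such as the slave-role handling are left out.

```java
// Sketch of the status-to-response-code mapping in processRequest; types here are simplified stand-ins.
class PullStatusMappingDemo {
    enum Status {
        FOUND, MESSAGE_WAS_REMOVING, NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE, NO_MATCHED_MESSAGE,
        OFFSET_FOUND_NULL, OFFSET_OVERFLOW_BADLY, OFFSET_OVERFLOW_ONE, OFFSET_TOO_SMALL
    }

    enum Code { SUCCESS, PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED }

    static Code toResponseCode(Status status, long requestedQueueOffset) {
        switch (status) {
            case FOUND:
                return Code.SUCCESS;
            case MESSAGE_WAS_REMOVING:
            case NO_MATCHED_MESSAGE:
                return Code.PULL_RETRY_IMMEDIATELY;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
                // An empty queue is only "not found" when the consumer asked for offset 0.
                return requestedQueueOffset != 0 ? Code.PULL_OFFSET_MOVED : Code.PULL_NOT_FOUND;
            case OFFSET_FOUND_NULL:
            case OFFSET_OVERFLOW_ONE:
                return Code.PULL_NOT_FOUND;
            case OFFSET_OVERFLOW_BADLY:
            case OFFSET_TOO_SMALL:
                return Code.PULL_OFFSET_MOVED;
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }
}
```

Only PULL_NOT_FOUND can lead to the request being suspended for long polling, which is why OFFSET_OVERFLOW_ONE (the consumer has caught up with the queue) maps to it.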

processRequest calls MessageStore#getMessage(…) to fetch the messages:

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    final MessageFilter messageFilter) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so getMessage is forbidden");
        return null;
    }

    if (!this.runningFlags.isReadable()) {
        log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
        return null;
    }

    long beginTime = this.getSystemClock().now();

    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    long nextBeginOffset = offset;
    long minOffset = 0;
    long maxOffset = 0;

    GetMessageResult getResult = new GetMessageResult();

    final long maxOffsetPy = this.commitLog.getMaxOffset();

    // Look up the consume queue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        // Smallest logical offset in the consume queue
        minOffset = consumeQueue.getMinOffsetInQueue();
        // Largest logical offset in the consume queue
        maxOffset = consumeQueue.getMaxOffsetInQueue();

        // Validate the requested queue offset
        if (maxOffset == 0) {
            // The queue holds no messages
            status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        } else if (offset < minOffset) {
            // Requested offset is too small
            status = GetMessageStatus.OFFSET_TOO_SMALL;
            nextBeginOffset = nextOffsetCorrection(offset, minOffset);
        } else if (offset == maxOffset) {
            // Requested offset is exactly one position past the last entry
            status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
            nextBeginOffset = nextOffsetCorrection(offset, offset);
        } else if (offset > maxOffset) {
            // Requested offset is far beyond the end of the queue
            status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
            if (0 == minOffset) {
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else {
                nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
            }
        } else {
            // Get the mapped buffer (backed by a MappedFile) starting at offset
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if (bufferConsumeQueue != null) {
                try {
                    status = GetMessageStatus.NO_MATCHED_MESSAGE;

                    long nextPhyFileStartOffset = Long.MIN_VALUE;
                    long maxPhyOffsetPulling = 0;

                    int i = 0;
                    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                        maxPhyOffsetPulling = offsetPy;

                        if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                            if (offsetPy < nextPhyFileStartOffset)
                                continue;
                        }

                        boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                        // Stop once enough messages have been collected
                        if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                            isInDisk)) {
                            break;
                        }

                        boolean extRet = false, isTagsCodeLegal = true;
                        if (consumeQueue.isExtAddr(tagsCode)) {
                            extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                            if (extRet) {
                                tagsCode = cqExtUnit.getTagsCode();
                            } else {
                                // can't find ext content.Client will filter messages by tag also.
                                log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                    tagsCode, offsetPy, sizePy, topic, group);
                                isTagsCodeLegal = false;
                            }
                        }

                        // Check whether the message matches the subscription (by consume queue entry)
                        if (messageFilter != null
                            && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }

                            continue;
                        }

                        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                        if (null == selectResult) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                            }

                            nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                            continue;
                        }

                        if (messageFilter != null
                            && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }
                            // release...
                            selectResult.release();
                            continue;
                        }

                        this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                        getResult.addMessage(selectResult);
                        status = GetMessageStatus.FOUND;
                        nextPhyFileStartOffset = Long.MIN_VALUE;
                    }

                    if (diskFallRecorded) {
                        long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                        brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                    }

                    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long diff = maxOffsetPy - maxPhyOffsetPulling;
                    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                    getResult.setSuggestPullingFromSlave(diff > memory);
                } finally {
                    bufferConsumeQueue.release();
                }
            } else {
                status = GetMessageStatus.OFFSET_FOUND_NULL;
                nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                    + maxOffset + ", but access logic queue failed.");
            }
        }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }

    if (GetMessageStatus.FOUND == status) {
        this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
    } else {
        this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
    }
    long eclipseTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);

    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}

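The method above performs a long series of checks and then, based on consumer group (group) + topic (topic) + queue ID (queueId) + queue offset (offset) + subscription info (subscriptionData, wrapped in the messageFilter), fetches up to the requested number (maxMsgNums) of messages.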
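The offset checks at the top of getMessage can be isolated into a small sketch. The class name PullOffsetCheck, its Status enum, and the IN_RANGE value are hypothetical and exist only for illustration; the branch order and the 20-byte entry size (8-byte commit log offset + 4-byte size + 8-byte tags code, as read in the loop above) follow the source.

```java
// Hypothetical sketch: reproduces only the offset classification of getMessage,
// not the real DefaultMessageStore.
final class PullOffsetCheck {
    // One consume queue entry: long offsetPy + int sizePy + long tagsCode
    static final int UNIT_SIZE = 8 + 4 + 8;

    enum Status { NO_MESSAGE_IN_QUEUE, OFFSET_TOO_SMALL, OFFSET_OVERFLOW_ONE, OFFSET_OVERFLOW_BADLY, IN_RANGE }

    // Same branch order as the source: empty queue, too small, one past the end, far past the end.
    static Status classify(long offset, long minOffset, long maxOffset) {
        if (maxOffset == 0) {
            return Status.NO_MESSAGE_IN_QUEUE;
        } else if (offset < minOffset) {
            return Status.OFFSET_TOO_SMALL;
        } else if (offset == maxOffset) {
            return Status.OFFSET_OVERFLOW_ONE;
        } else if (offset > maxOffset) {
            return Status.OFFSET_OVERFLOW_BADLY;
        }
        return Status.IN_RANGE; // only here does the real method scan the queue
    }

    // After scanning bytesScanned bytes of entries, the next pull starts this many entries later.
    static long nextBeginOffset(long offset, int bytesScanned) {
        return offset + (bytesScanned / UNIT_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(classify(5, 10, 100));
        System.out.println(classify(100, 10, 100));
        System.out.println(classify(50, 10, 100));
        System.out.println(nextBeginOffset(50, 3 * UNIT_SIZE));
    }
}
```

Note that the loop counter in the real method advances in bytes, which is why the next offset is derived by dividing by the entry size.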

The match check calls isMatchedByConsumeQueue(…) on the MessageFilter. The DefaultMessageFilter implementation is:

public class DefaultMessageFilter implements MessageFilter {

    private SubscriptionData subscriptionData;

    public DefaultMessageFilter(final SubscriptionData subscriptionData) {
        this.subscriptionData = subscriptionData;
    }

    @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        // No tagsCode or no subscription data: let the message through
        if (null == tagsCode || null == subscriptionData) {
            return true;
        }

        // Class filter mode: no tag filtering on the broker side
        if (subscriptionData.isClassFilterMode()) {
            return true;
        }

        // Either the subscription expression matches everything,
        // or the subscription's code set contains the message's tagsCode
        return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
            || subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }

    @Override
    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
        return true;
    }
}
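The tag check above can be tried out with a minimal stand-alone sketch. TagCodeFilterSketch is a hypothetical class, and two details are assumptions not shown in the source: that the code set holds the hashCode() of each tag in a "TagA || TagB" expression, and that SUB_ALL is "*".

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of hash-based tag filtering; not the real SubscriptionData.
final class TagCodeFilterSketch {
    static final String SUB_ALL = "*"; // assumed value of SubscriptionData.SUB_ALL

    private final String subString;
    private final Set<Integer> codeSet = new HashSet<>();

    TagCodeFilterSketch(String subString) {
        this.subString = subString;
        if (!SUB_ALL.equals(subString)) {
            // Assumption: each tag in the expression contributes its hashCode
            for (String tag : subString.split("\\|\\|")) {
                String trimmed = tag.trim();
                if (!trimmed.isEmpty()) {
                    codeSet.add(trimmed.hashCode());
                }
            }
        }
    }

    // Same decision as the source: a null tagsCode passes, "*" passes, otherwise look up the code set.
    boolean isMatchedByConsumeQueue(Long tagsCode) {
        if (tagsCode == null) {
            return true;
        }
        return SUB_ALL.equals(subString) || codeSet.contains(tagsCode.intValue());
    }

    public static void main(String[] args) {
        TagCodeFilterSketch filter = new TagCodeFilterSketch("TagA || TagB");
        System.out.println(filter.isMatchedByConsumeQueue((long) "TagA".hashCode()));
        System.out.println(filter.isMatchedByConsumeQueue((long) "TagC".hashCode()));
    }
}
```

Because only hash codes are compared, two different tags with the same hash would both pass on the broker; the comment in getMessage ("Client will filter messages by tag also") suggests the consumer performs a second check.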

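3.3 Handling failed pulls

When a pull request finds no messages, the Broker suspends (holds) the request. The service class that manages held requests is PullRequestHoldService: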

public class PullRequestHoldService extends ServiceThread {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final String TOPIC_QUEUEID_SEPARATOR = "@";
    private final BrokerController brokerController;
    private final SystemClock systemClock = new SystemClock();
    // Table of held pull requests
    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);

    public PullRequestHoldService(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    // Add a pull request to the hold table
    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }

        mpr.addPullRequest(pullRequest);
    }

    // Build the unique key from topic + queue ID
    private String buildKey(final String topic, final int queueId) {
        StringBuilder sb = new StringBuilder();
        sb.append(topic);
        sb.append(TOPIC_QUEUEID_SEPARATOR);
        sb.append(queueId);
        return sb.toString();
    }

    @Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                // Wait time depends on whether long polling or short polling is enabled
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                // Check whether any held request needs to be notified
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }

    @Override
    public String getServiceName() {
        return PullRequestHoldService.class.getSimpleName();
    }

    // Iterate over the held requests and check whether any of them needs to be notified
    private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Throwable e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    }

    // Check whether any held request needs to be notified
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
        notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
    }

    // Check whether any held request needs to be notified
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
                // Requests that do not qualify for wake-up yet
                List<PullRequest> replayList = new ArrayList<PullRequest>();

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }

                    // A new matching message has arrived: wake the request, i.e. pull again
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                        // match by bit map, need eval again when properties is not null.
                        if (match && properties != null) {
                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                        }

                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }

                    // The hold time has expired: wake the request, i.e. pull again
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    // Not ready to pull again: put the request back
                    replayList.add(request);
                }

                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
    }
}

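As the code shows, this class is a background thread service that maintains held pull requests:

  • When a pull request cannot obtain any message, the request is suspended and added to this service
  • When a matching message arrives, or the hold times out, the pull logic is executed again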
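The two wake-up conditions listed above can be sketched without any broker classes. HoldTableSketch and HeldPull are hypothetical names; the sketch keeps only the offset and timeout checks, leaves out the message-filter match, and uses synchronized methods on a plain HashMap instead of the ConcurrentMap plus ManyPullRequest structure of the source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a hold table for long polling; not the real PullRequestHoldService.
final class HoldTableSketch {
    static final class HeldPull {
        final long pullFromOffset;
        final long suspendedAtMillis;
        final long timeoutMillis;

        HeldPull(long pullFromOffset, long suspendedAtMillis, long timeoutMillis) {
            this.pullFromOffset = pullFromOffset;
            this.suspendedAtMillis = suspendedAtMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // key: topic@queueId, as in the source
    private final Map<String, List<HeldPull>> table = new HashMap<>();

    synchronized void suspend(String topic, int queueId, HeldPull request) {
        table.computeIfAbsent(topic + "@" + queueId, k -> new ArrayList<>()).add(request);
    }

    // Removes and returns the requests that should pull again; the rest stay held.
    synchronized List<HeldPull> wakeUp(String topic, int queueId, long maxOffset, long nowMillis) {
        List<HeldPull> woken = new ArrayList<>();
        List<HeldPull> held = table.get(topic + "@" + queueId);
        if (held == null) {
            return woken;
        }
        for (Iterator<HeldPull> it = held.iterator(); it.hasNext(); ) {
            HeldPull request = it.next();
            boolean newMessage = maxOffset > request.pullFromOffset;
            boolean timedOut = nowMillis >= request.suspendedAtMillis + request.timeoutMillis;
            if (newMessage || timedOut) {
                it.remove();
                woken.add(request);
            }
        }
        return woken;
    }

    public static void main(String[] args) {
        HoldTableSketch table = new HoldTableSketch();
        table.suspend("TopicA", 0, new HeldPull(10, 0, 1000));
        System.out.println(table.wakeUp("TopicA", 0, 10, 500).size()); // nothing new, not timed out
        System.out.println(table.wakeUp("TopicA", 0, 11, 500).size()); // a new message arrived
    }
}
```

In the real service a woken request is re-executed through PullMessageProcessor#executeRequestWhenWakeup; here it is simply returned to the caller.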

4. The Broker's interface for updating consume progress

The periodic persistence of consume progress is scheduled in BrokerController#initialize(…):

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

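After an initial delay of 10 s, this task runs the persistence logic once every flushConsumerOffsetInterval milliseconds (5 s by default).
It calls ConsumerOffsetManager#persist, which is actually the persist method inherited from ConfigManager.
ConfigManager is an abstract class, and ConsumerOffsetManager extends it.
(Class diagram: ConsumerOffsetManager extends ConfigManager.)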
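The fixed-rate scheduling described above can be reproduced with the JDK alone. The following is a minimal sketch under stated assumptions: FixedRatePersistSketch and runDemo are hypothetical names, a counter stands in for consumerOffsetManager.persist(), and the delays are shortened to milliseconds so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the periodic persist task; not the real BrokerController.
final class FixedRatePersistSketch {
    // Schedules a stand-in persist task and returns how many runs were observed.
    static int runDemo(long initialDelayMs, long periodMs, int runsToWaitFor) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger persistCount = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(runsToWaitFor);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    persistCount.incrementAndGet(); // stand-in for consumerOffsetManager.persist()
                    latch.countDown();
                } catch (Throwable e) {
                    // Swallow and log, as the source does, so later runs still happen
                    System.err.println("schedule persist error: " + e);
                }
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return persistCount.get();
    }

    public static void main(String[] args) {
        System.out.println("persist runs observed: " + runDemo(10, 5, 3));
    }
}
```

The try/catch inside the task matters: scheduleAtFixedRate suppresses all subsequent executions once a run throws, so catching Throwable keeps the periodic flush alive after a failed write.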

The source of ConfigManager is:

public abstract class ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    // Encode the content
    public abstract String encode();

    // Load the config file
    public boolean load() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName);
            // If the file has no content, fall back to the backup file
            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
    }

    // Path of the config file
    public abstract String configFilePath();

    // Load the backup file
    private boolean loadBak() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName + ".bak");
            if (jsonString != null && jsonString.length() > 0) {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " Failed", e);
            return false;
        }

        return true;
    }

    // Decode the content
    public abstract void decode(final String jsonString);

    // Persist to file
    public synchronized void persist() {
        String jsonString = this.encode(true);
        if (jsonString != null) {
            String fileName = this.configFilePath();
            try {
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }

    // Encode the content to be stored
    public abstract String encode(final boolean prettyFormat);
}

As the code shows, ConfigManager mainly handles reading and writing the config file.
The source of ConsumerOffsetManager is:
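The load-with-backup behaviour can be sketched with java.nio.file. ConfigFileSketch and the file name consumerOffset.json are illustrative only. The source does not show what MixAll.string2File does internally, so copying the previous file to a ".bak" sibling before each write is an assumption made here so that the fallback in load has something to read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of a config file with a ".bak" fallback; not the real ConfigManager.
final class ConfigFileSketch {
    static Path backupOf(Path file) {
        return file.resolveSibling(file.getFileName() + ".bak");
    }

    static String readOrNull(Path path) {
        try {
            return Files.exists(path) ? new String(Files.readAllBytes(path), StandardCharsets.UTF_8) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Like load(): use the main file, and fall back to the backup when it is missing or empty.
    static String load(Path file) {
        String json = readOrNull(file);
        if (json == null || json.isEmpty()) {
            json = readOrNull(backupOf(file));
        }
        return json;
    }

    // Assumption: keep the previous content as a backup before overwriting.
    static void persist(Path file, String json) {
        try {
            if (Files.exists(file)) {
                Files.copy(file, backupOf(file), StandardCopyOption.REPLACE_EXISTING);
            }
            Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Persists twice, deletes the main file, then loads: the backup (first version) is returned.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("offset-sketch");
            Path file = dir.resolve("consumerOffset.json");
            persist(file, "{\"v\":1}");
            persist(file, "{\"v\":2}");
            Files.delete(file);
            return load(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```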

public class ConsumerOffsetManager extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // Consume progress table
    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    private transient BrokerController brokerController;

    public ConsumerOffsetManager() {
    }

    public ConsumerOffsetManager(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public void scanUnsubscribedTopic() {
        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
            String topicAtGroup = next.getKey();
            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
            if (arrays.length == 2) {
                String topic = arrays[0];
                String group = arrays[1];

                if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
                    && this.offsetBehindMuchThanData(topic, next.getValue())) {
                    it.remove();
                    log.warn("remove topic offset, {}", topicAtGroup);
                }
            }
        }
    }

    private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integer, Long> table) {
        Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
        boolean result = !table.isEmpty();

        while (it.hasNext() && result) {
            Entry<Integer, Long> next = it.next();
            long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, next.getKey());
            long offsetInPersist = next.getValue();
            result = offsetInPersist <= minOffsetInStore;
        }

        return result;
    }

    public Set<String> whichTopicByConsumer(final String group) {
        Set<String> topics = new HashSet<String>();

        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
            String topicAtGroup = next.getKey();
            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
            if (arrays.length == 2) {
                if (group.equals(arrays[1])) {
                    topics.add(arrays[0]);
                }
            }
        }

        return topics;
    }

    public Set<String> whichGroupByTopic(final String topic) {
        Set<String> groups = new HashSet<String>();

        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
            String topicAtGroup = next.getKey();
            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
            if (arrays.length == 2) {
                if (topic.equals(arrays[0])) {
                    groups.add(arrays[1]);
                }
            }
        }

        return groups;
    }

    // Commit consume progress
    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

    public long queryOffset(final String group, final String topic, final int queueId) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null != map) {
            Long offset = map.get(queueId);
            if (offset != null)
                return offset;
        }

        return -1;
    }

    public String encode() {
        return this.encode(false);
    }

    @Override
    public String configFilePath() {
        return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    }

    // Decode the content; the format is JSON
    @Override
    public void decode(String jsonString) {
        if (jsonString != null) {
            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
            if (obj != null) {
                this.offsetTable = obj.offsetTable;
            }
        }
    }

    public String encode(final boolean prettyFormat) {
        return RemotingSerializable.toJson(this, prettyFormat);
    }

    public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
        return offsetTable;
    }

    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
        this.offsetTable = offsetTable;
    }

    public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {

        Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
        Set<String> topicGroups = this.offsetTable.keySet();
        if (!UtilAll.isBlank(filterGroups)) {
            for (String group : filterGroups.split(",")) {
                Iterator<String> it = topicGroups.iterator();
                while (it.hasNext()) {
                    if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
                        it.remove();
                    }
                }
            }
        }

        for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
            String topicGroup = offSetEntry.getKey();
            String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
            if (topic.equals(topicGroupArr[0])) {
                for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
                    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
                    if (entry.getValue() >= minOffset) {
                        Long offset = queueMinOffset.get(entry.getKey());
                        if (offset == null) {
                            queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));
                        } else {
                            queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));
                        }
                    }
                }
            }
        }
        return queueMinOffset;
    }

    public Map<Integer, Long> queryOffset(final String group, final String topic) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        return this.offsetTable.get(key);
    }

    public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
        ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
        if (offsets != null) {
            this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
        }
    }
}

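In short, this class is the consume progress manager: it keeps each consumer group's offset per topic and queue in memory and persists the table as JSON.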
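The core data structure is a two-level map, which a short sketch makes concrete. OffsetTableSketch is a hypothetical class that keeps only commitOffset and queryOffset; it uses computeIfAbsent where the source uses a get followed by put, which is a swapped-in idiom rather than the original code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the consume progress table; not the real ConsumerOffsetManager.
final class OffsetTableSketch {
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // topic@group -> (queueId -> offset)
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

    void commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(queueId, offset);
    }

    // Returns -1 when no progress has been recorded, as the source does.
    long queryOffset(String group, String topic, int queueId) {
        ConcurrentMap<Integer, Long> map = offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + group);
        if (map != null) {
            Long offset = map.get(queueId);
            if (offset != null) {
                return offset;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        table.commitOffset("GroupA", "TopicA", 0, 42L);
        System.out.println(table.queryOffset("GroupA", "TopicA", 0));
        System.out.println(table.queryOffset("GroupB", "TopicA", 0));
    }
}
```

Keying by topic@group means two consumer groups subscribed to the same topic track their progress independently.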

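5. The Broker's interface for sending messages back

The Consumer calls this interface when it fails to consume a message. The Broker stores the sent-back message so that the next time the Consumer pulls it, the message can be read sequentially from the CommitLog and ConsumeQueue.

Most of the logic is similar to the Broker's message-receiving interface, which was analyzed in the second article, "Broker message receiving".

The method behind this interface is SendMessageProcessor#consumerSendMsgBack(…):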

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    // Initialize the response
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

    String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
    if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {

        ConsumeMessageContext context = new ConsumeMessageContext();
        context.setNamespace(namespace);
        context.setConsumerGroup(requestHeader.getGroup());
        context.setTopic(requestHeader.getOriginTopic());
        context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
        context.setCommercialRcvTimes(1);
        context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

        this.executeConsumeMessageHookAfter(context);
    }

    // Check that the consumer group exists
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
            + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
        return response;
    }

    // Check that the broker has write permission
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
        return response;
    }

    // Check that the number of retry queues is greater than 0
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    // Compute the retry topic
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    // Compute the queue ID
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

    // Compute the sysFlag
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }

    // Get the topicConfig, creating it if it does not exist
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }

    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return response;
    }

    // Look up the original message; return an error if it does not exist
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return response;
    }

    // Record the original topic in the RETRY_TOPIC extended property
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    // Do not wait for the store to complete
    msgExt.setWaitStoreMsgOK(false);

    int delayLevel = requestHeader.getDelayLevel();

    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    }

    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        // Max consume attempts exceeded: switch the topic to "%DLQ%" + group name,
        // i.e. put the message into the Dead Letter Queue
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE, 0);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }
    } else {
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }

        msgExt.setDelayTimeLevel(delayLevel);
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        switch (putMessageResult.getPutMessageStatus()) {
            case PUT_OK:
                String backTopic = msgExt.getTopic();
                String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                if (correctTopic != null) {
                    backTopic = correctTopic;
                }

                this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);

                return response;
            default:
                break;
        }

        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(putMessageResult.getPutMessageStatus().name());
        return response;
    }

    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("putMessageResult is null");
    return response;
}
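The retry-or-dead-letter decision in the method above can be condensed into a small sketch. SendBackDecisionSketch and its method names are hypothetical; the "%DLQ%" prefix comes from the comment in the source, while the "%RETRY%" prefix is an assumption about what MixAll.getRetryTopic produces.

```java
// Hypothetical sketch of the send-back routing decision; not the real SendMessageProcessor.
final class SendBackDecisionSketch {
    static final String RETRY_PREFIX = "%RETRY%"; // assumed result prefix of MixAll.getRetryTopic
    static final String DLQ_PREFIX = "%DLQ%";     // prefix named in the source comment

    // Messages that exhausted their attempts, or carry a negative delay level, go to the dead letter queue.
    static String targetTopic(String group, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return DLQ_PREFIX + group;
        }
        return RETRY_PREFIX + group;
    }

    // A requested level of 0 means "let the broker decide": 3 + number of previous attempts.
    static int effectiveDelayLevel(int requestedDelayLevel, int reconsumeTimes) {
        return requestedDelayLevel == 0 ? 3 + reconsumeTimes : requestedDelayLevel;
    }

    public static void main(String[] args) {
        System.out.println(targetTopic("GroupA", 0, 16, 0));
        System.out.println(targetTopic("GroupA", 16, 16, 0));
        System.out.println(effectiveDelayLevel(0, 2));
    }
}
```

Since the default delay level grows with reconsumeTimes, each failed attempt is retried after a longer wait than the previous one, until the limit sends the message to the dead letter queue.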

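6. Summary

The source code in this article is long and dense, and many details were deliberately skipped.
This article covered the interfaces the Broker provides for message pulling:

  1. Message replay: when the Broker receives a message, it stores it in the CommitLog and persists it to a file. Message replay takes the messages in that file and records them in the ConsumeQueue, which stores each message's position in the CommitLog.
  2. The pull interface: called by the Consumer, it does the actual message pulling.
  3. The consume progress update interface: also called by the Consumer, it updates the consume progress.
  4. The send-back interface: called by the Consumer when consuming a message fails, so that the message can be consumed again. Note that a message consumed more than the maximum number of times is put into the dead letter queue.

These are the capabilities the Broker provides during message pulling.
The next article continues with the capabilities the Consumer provides during message pulling and consumption.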
