In-Depth Analysis of RocketMQ Source Code: The Full Flow of Message Sending, Persistence, and Consumption

1. Introduction

The previous articles dissected the broker's storage files. So how is a message handled once a producer has sent it to the broker, and what processing must it go through before a consumer can consume it? In this article we walk through the complete path of a message: the producer sends it, the broker persists it into the commitLog, and the consumer finally consumes it.

2. The Producer Sends a Message

Producer-side sending will be covered in detail in a later article; here we only look at the core sending method. In essence it resolves, from the message's queue, the broker that owns that queue, and then sends the message to that broker over Netty.
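Before diving into sendKernelImpl, here is a minimal sketch of what sending looks like from the application's point of view. It is only for orientation; the producer group, name server address, topic and tag below are placeholders, not values taken from the source.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerQuickStart {
    public static void main(String[] args) throws Exception {
        // the group name and name server address are placeholders
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message msg = new Message("DemoTopic", "TagA", "hello rocketmq".getBytes("UTF-8"));
        // send() eventually funnels into sendKernelImpl, which resolves the broker
        // address for the chosen MessageQueue and ships the message over Netty
        SendResult result = producer.send(msg);
        System.out.println(result);

        producer.shutdown();
    }
}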

private SendResult sendKernelImpl(final Message msg, //需要发送的消息final MessageQueue mq, //推送消息到哪个queuefinal CommunicationMode communicationMode,//网络模式:同步,异步,onewayfinal SendCallback sendCallback, //推送消息后的回调函数final TopicPublishInfo topicPublishInfo, //topic的路由信息,topic和queue的对应关系final long timeout //超时时间) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();//通过ma的brokerName获取到broker地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());//如果broker地址为空,需要重新从NameServer拉取broker的配置信息if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {//构建broker的网络请求地址brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);//获取消息内容byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;//进行消息压缩if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}//是否有禁止消息发送的钩子函数,如果有,便执行钩子if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}//是否有消息发送前的钩子函数,如果有,便执行钩子if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}//构建消息发送的请求头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());//生产者组requestHeader.setTopic(msg.getTopic());//topucrequestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); //默认的队列数量requestHeader.setQueueId(mq.getQueueId()); 
//发送到哪个queuerequestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis()); //消息产生的事件requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); //消息的额外属性requestHeader.setReconsumeTimes(0); //消息能够重新消费的次数,默认为0requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);requestHeader.setBname(mq.getBrokerName()); //broker的名称//如果是重试队列列明的消息if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {//设置重试的次数:RECONSUME_TIMErequestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}//设置最大的重试次数String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//核心如果是异步发送,通过网络将消息发送给broker,异步调用sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//通过网络将消息同步发送到brokersendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}

3. The Broker Receives the Producer's Message - SendMessageProcessor

Who handles the message once it reaches the broker? The request code used by the producer when sending is SEND_MESSAGE, so on the broker side the request is dispatched to SendMessageProcessor.

public class RequestCode {
    public static final int SEND_MESSAGE = 10;
}
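For context, the broker binds this request code to SendMessageProcessor while it starts up. The snippet below is a simplified sketch of that registration, modelled on BrokerController#registerProcessor; the exact set of codes and executors differs between versions.

// inside BrokerController#registerProcessor (simplified sketch)
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
// SEND_MESSAGE and related codes are bound to the processor plus a dedicated thread pool,
// so Netty hands matching RemotingCommands to SendMessageProcessor#processRequest
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);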

3.1 Handling the Request

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        // call the asynchronous handler and wait for its result
        response = asyncProcessRequest(ctx, request).get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("process SendMessage error, request : " + request.toString(), e);
    }
    return response;
}

public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
    // 1. process the request asynchronously, 2. then run the response callback
    asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getPutMessageFutureExecutor());
}

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            // parse the request header
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            // build the message trace context from the header, e.g. the broker the message belongs to and its address
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // run the before-send hooks
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // the real message-handling logic
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}
    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {//构造一个空的响应,并且回写该响应的opaque,方便请求方根据opaque来判断响应是否到达final RemotingCommand response = preSend(ctx, request, requestHeader);//获取请求头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}//获取请求体final byte[] body = request.getBody();//获取到queueIdint queueIdInt = requestHeader.getQueueId();//根据消息的topic获取到topic的配置信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());//如果queueId小于0,随机投递到一个消息if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}//这是broker里面消息的映射MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}//设置消息体,生产时间,机器,重试时间等msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();//设置集群名MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked laterorigProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;//判断是否是事务消息里面的prepare消息String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}//如果是,边带哦呦事务消息service写入消息putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {//否者调用messageStore来存储消息putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}//根据存储结果,返回结果return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

So at this point the picture is clear: if the message is the prepare (half) message of a transaction, it is written through the TransactionalMessageService component; otherwise it is written through messageStore.asyncPutMessage. Let's look at what happens in each of these two cases.

3.2 Persisting Ordinary Messages to the CommitLog - DefaultMessageStore

For an ordinary message the path is simple: DefaultMessageStore calls commitLog.asyncPutMessage to append the message to the commitLog. If DledgerCommitLog is used, the write becomes two-phased: the master broker appends the message to its commitLog and then tells the follower nodes to write the log entry; once the followers have written it successfully, the master advances the committed index, and only then does the write truly count as successful.

Readers who want more background on these components can refer to:

In-Depth Analysis of RocketMQ Source Code - Persistence Components (4): CommitLog

In-Depth Analysis of RocketMQ Source Code - High-Availability Storage Components (2): Overview of the Dledger Framework
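Conceptually, the DLedger-backed write can be pictured as the sketch below. The class and method names here are invented purely to illustrate the two-phase idea and are not the real DLedger API; see the Dledger article above for the actual implementation.

/**
 * Illustrative sketch only: the names below are made up to show the two-phase idea
 * behind a quorum-based commit log write, not the real DLedger API.
 */
class QuorumWriteSketch {
    private volatile long committedIndex = -1;

    long appendAsLeader(byte[] entryBody) throws InterruptedException {
        long index = appendToLocalLog(entryBody);   // phase 1: the leader persists the entry locally
        replicateToFollowers(index);                // ...and pushes it to the followers
        waitUntilQuorumAcked(index);                // phase 2: wait until a majority has persisted it
        committedIndex = index;                     // only now is the entry considered committed
        return index;                               // and eligible for reput/dispatch to consumers
    }

    private long appendToLocalLog(byte[] body) { /* append to the local mapped file */ return 0L; }
    private void replicateToFollowers(long index) { /* push the entry over the network */ }
    private void waitUntilQuorumAcked(long index) throws InterruptedException { /* block on follower acks */ }
}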

The asyncPutMessage method of DefaultMessageStore looks like this:

   public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {//校验存储组件的状态PutMessageStatus checkStoreStatus = this.checkStoreStatus();if (checkStoreStatus != PutMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}//校验,essahe的内容PutMessageStatus msgCheckStatus = this.checkMessage(msg);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));}long beginTime = this.getSystemClock().now();//调用commitLog的asyncPutMessage写入的消息中CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);//构建返回结果putResultFuture.thenAccept(result -> {long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().add(1);}});return putResultFuture;}

3.3 Persisting Transactional Messages to the CommitLog - TransactionalMessageServiceImpl

public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

As you can see, a transactional message is ultimately still persisted into the commitLog through DefaultMessageStore, but it is first converted into a half message: the original topic and queueId are stashed into the message properties, the topic is rewritten to RMQ_SYS_TRANS_HALF_TOPIC, the queueId is set to 0, and only then is the message written to the commitLog.

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // remember the real topic and queueId in the message properties
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
    // reset the transaction flag to TRANSACTION_NOT_TYPE
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // rewrite the topic to RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // all half messages go to queue 0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
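When the transaction is later committed, the broker reads these properties back to rebuild the original message before writing it to the real topic. The method below is a hedged sketch of that restore step, modelled on EndTransactionProcessor's behavior; the exact field handling differs slightly across versions.

// sketch of the commit-time restore (not a verbatim copy of EndTransactionProcessor)
private MessageExtBrokerInner restoreFromHalfMessage(MessageExt halfMsg) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    // read back the real topic/queueId that parseHalfMessageInner stashed into the properties
    msgInner.setTopic(halfMsg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgInner.setQueueId(Integer.parseInt(halfMsg.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgInner.setBody(halfMsg.getBody());
    msgInner.setFlag(halfMsg.getFlag());
    MessageAccessor.setProperties(msgInner, halfMsg.getProperties());
    return msgInner;
}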

So far the message has been persisted, but when does it become consumable, and when is the IndexFile built? That is what we explore next.

4. Message Re-dispatch - ReputMessageService

The broker runs a ReputMessageService thread that continuously iterates over the messages appended to the commitLog and converts them into IndexFile and consumeQueue entries.

// the reput service is simply a thread that keeps calling doReput
@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
        private void doReput() {//如果重投递的索引小于commitLog的最小的索引,便设置其从commitLog的第一条消息开始if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();}//一直循环,直到commitLog中存在消息for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}//获取commitLog中reputFromOffset后面的所有消息SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {this.reputFromOffset = result.getStartOffset();//遍历从commitLog中取出的消息内容for (int readSize = 0; readSize < result.getSize() && doNext; ) {//构建DispatchRequest进行消息重投递DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {//调用messageStore构建consumerqueue和indexFileDefaultMessageStore.this.doDispatch(dispatchRequest);//如果节点是主节点,并且消费方式采用长轮询的方式,通过messageArrivingListener通知唤醒消费者拉取消息的线程开始拉取消息if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());notifyMessageArrive4MultiQueue(dispatchRequest);}//重投递索引往后加1this.reputFromOffset += size;readSize += size;//增加统计信息if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());}} else if (size == 0) {this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}}

This method walks the messages in the commitLog and essentially does two things:

1. it builds the consumequeue and IndexFile entries (a sketch of the pluggable dispatcher hook follows this list);

2. if the broker is the master node and long polling is enabled, it wakes up the consumers' suspended pull requests through messageArrivingListener, i.e. it notifies consumers that new messages have arrived. With long polling, a consumer asks the broker for messages; if there are any, they are returned at once, and if not, the request is held for a while waiting for messages to be produced. It is exactly this messageArrivingListener call that wakes those held requests up.
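The dispatch step mentioned in point 1 is pluggable: every registered CommitLogDispatcher gets a callback for each message decoded from the commitLog. As an illustration, a custom dispatcher could be hooked in roughly like this; the logging dispatcher below is made up for demonstration, and only the CommitLogDispatcher interface itself comes from the source.

// illustrative only: a custom dispatcher sitting alongside the built-in ones
class LoggingDispatcher implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        // called by ReputMessageService#doReput for every message read from the commitLog
        System.out.printf("dispatched topic=%s queueId=%d commitLogOffset=%d%n",
            request.getTopic(), request.getQueueId(), request.getCommitLogOffset());
    }
}

// e.g. when customizing the broker:
// defaultMessageStore.getDispatcherList().addFirst(new LoggingDispatcher());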

4.2 Building the ConsumeQueue and IndexFile

public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

As you can see, CommitLogDispatcher has several implementations. Let's look at CommitLogDispatcherBuildConsumeQueue, which builds the consumequeue, and CommitLogDispatcherBuildIndex, which builds the indexFile.

4.2.1 Building the ConsumeQueue - CommitLogDispatcherBuildConsumeQueue

It simply locates the consumequeue for the message's topic and queueId and sequentially appends the corresponding index entry to it.

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        // get the transaction type
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            // ordinary messages and transaction commit messages get a consumeQueue entry,
            // which means they become immediately consumable
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            // half (prepared) and rollback messages must not get a consumequeue entry
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // locate the consumequeue by topic and queueId
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // append the index entry to that consumequeue
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}
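Each consumequeue entry is a fixed 20-byte record: an 8-byte commitLog physical offset, a 4-byte message size and an 8-byte tag hashcode. Below is a small sketch of how such an entry could be encoded; the class is illustrative, and only the 20-byte layout is taken from the source.

import java.nio.ByteBuffer;

// sketch of the fixed 20-byte ConsumeQueue entry: offset + size + tag hashcode
public final class ConsumeQueueEntrySketch {
    public static final int CQ_STORE_UNIT_SIZE = 20;

    public static ByteBuffer encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset); // 8 bytes: physical offset into the commitLog
        buf.putInt(msgSize);          // 4 bytes: message length, needed to read the message back
        buf.putLong(tagsCode);        // 8 bytes: tag hashcode, used for server-side tag filtering
        buf.flip();
        return buf;
    }
}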

4.2.2 Building the IndexFile - CommitLogDispatcherBuildIndex

As described earlier, the IndexFile is pure index data: the hash of the message key selects a hash slot, the slot points to an index entry, and the entry contains the commitLog physical offset, from which the concrete message can be located.

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        // if the message index is enabled, build an index entry and store it in the IndexFile
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}
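The IndexFile is essentially a hash table persisted on disk: a 40-byte header, then a slot table, then the index entries. The key's hash picks a slot, the slot stores the position of the latest entry for that hash, and each 20-byte entry records the commitLog physical offset plus a link to the previous entry sharing the slot. The arithmetic below is a sketch assuming the common defaults (5,000,000 slots, 4-byte slots, 20-byte entries); the constants are configurable in the real store.

// sketch of how an IndexFile locates a hash slot and an index entry (defaults assumed)
public final class IndexFileLayoutSketch {
    static final int HEADER_SIZE = 40;        // file header
    static final int HASH_SLOT_SIZE = 4;      // each slot holds the position of the latest entry for that hash
    static final int INDEX_ENTRY_SIZE = 20;   // keyHash(4) + phyOffset(8) + timeDiff(4) + prevIndex(4)
    static final int HASH_SLOT_NUM = 5_000_000;

    static int keyHash(String key) {
        int h = Math.abs(key.hashCode());
        return h < 0 ? 0 : h;   // Math.abs(Integer.MIN_VALUE) is still negative, so clamp like the real code
    }

    static long slotFileOffset(String key) {
        int slotPos = keyHash(key) % HASH_SLOT_NUM;
        return HEADER_SIZE + (long) slotPos * HASH_SLOT_SIZE;
    }

    static long entryFileOffset(int indexCount) {
        // the indexCount-th entry sits right after the header and the whole slot table
        return HEADER_SIZE + (long) HASH_SLOT_NUM * HASH_SLOT_SIZE + (long) indexCount * INDEX_ENTRY_SIZE;
    }
}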

4.3 Waking Up the Long-Polling Consumer Threads - NotifyMessageArrivingListener

It first looks up the suspended pull requests of the consumers subscribed to this topic and queue, then checks, using the tag hashcode and bit map, whether the newly arrived messages match the consumer's subscription; if they do, the pull request is handed to PullMessageProcessor, which pushes the messages to the consumer.

    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {//获取到对应的key topic@queueIdString key = this.buildKey(topic, queueId);//从pullRequestTable获取到请求这个topic和queueId的消费者请求ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();//遍历消费请求for (PullRequest request : requestList) {//当前这个queueID最大的consumeoffset是多好long newestOffset = maxOffset;if (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}//如果还有消息未拉取if (newestOffset > request.getPullFromThisOffset()) {//判断这批消息是包含consumer订阅的这个tagboolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {//如果这批消息的tag是符合consumer消费的妖气,便开始让消费者拉取这批消息this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}
    public void executeRequestWhenWakeup(final Channel channel,final RemotingCommand request) throws RemotingCommandException {Runnable run = new Runnable() {@Overridepublic void run() {try {//根据通过netty将这部分消息推送给消费者final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);if (response != null) {response.setOpaque(request.getOpaque());response.markResponseType();try {channel.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {log.error("processRequestWrapper response to {} failed",future.channel().remoteAddress(), future.cause());log.error(request.toString());log.error(response.toString());}}});} catch (Throwable e) {log.error("processRequestWrapper process request over, but response failed", e);log.error(request.toString());log.error(response.toString());}}} catch (RemotingCommandException e1) {log.error("excuteRequestWhenWakeup run", e1);}}};//构建好推送任务过后,交给线程池执行this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));}

4.4 The Broker Handles the Consumer's Pull Request - PullMessageProcessor

PullMessageProcessor serves the consumers' pull requests. Its processRequest method is too long to list in full, so here is roughly what it does.

1. It checks the consumer group's subscription relationship.

2. Using the consumer's topic, queueId and consume offset it locates the entry in the consumequeue, and from the commitLog physical offset stored in that entry it reads the actual message out of the commitLog (see the sketch after this list).

3. If no message is found, the pull request is held on the broker and retried after a short wait (about one second per check) instead of returning immediately.

4. In long-polling mode the consume offset is not committed automatically here.
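Step 2 is worth spelling out: the consume offset is a logical sequence number, and because every consumequeue entry has a fixed size of 20 bytes, turning it into a commitLog read is simple arithmetic. The sketch below illustrates the idea over a single in-memory buffer; the real ConsumeQueue spans multiple mapped files.

import java.nio.ByteBuffer;

// hedged sketch of step 2: turning a logical consume offset into a commitLog read
public final class LocateMessageSketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    /** Returns {commitLogOffset, msgSize, tagsCode} for the given logical offset of a consume queue. */
    static long[] locate(ByteBuffer consumeQueueFile, long consumeOffset) {
        // every entry is 20 bytes, so the byte position is simply offset * 20
        consumeQueueFile.position((int) (consumeOffset * CQ_STORE_UNIT_SIZE));
        long commitLogOffset = consumeQueueFile.getLong(); // 8 bytes: physical offset in the commitLog
        int msgSize = consumeQueueFile.getInt();           // 4 bytes: message length
        long tagsCode = consumeQueueFile.getLong();        // 8 bytes: tag hashcode, used for filtering
        // the broker then reads msgSize bytes from the commitLog starting at commitLogOffset
        return new long[] {commitLogOffset, msgSize, tagsCode};
    }
}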

5. The Consumer Consumes Messages

As mentioned earlier, a consumer can receive messages in push, pull, or long-polling mode, and long polling is the one used most often, so let's look at how long polling pulls messages.

5.1 Pulling Messages with Long Polling - DefaultMQPushConsumerImpl

The main steps are:

1. Get the processQueue of the pullRequest being handled; every pullRequest caches the messages it pulls in a local buffer, and that buffer is the processQueue;

2. If the messages cached for this queue exceed 1000 entries or 100 MiB in total size, delay this pull by 50 ms (flow control);

3. In clustering mode, read the consume offset from the in-memory offset store; this value is carried along with the pull request as the offset to commit;

4. Send the network request and pull messages from the broker by messageQueue and consume offset;

5. After the messages come back, filter them by tag;

6. Call ConsumeMessageService.submitConsumeRequest to wrap the messages into consume requests and put them onto the blocking queue, where they wait for the business code to consume them;

7. If pullInterval is greater than 0, wait that long before pulling from the broker again; otherwise immediately pull the next batch.

At this point the consumer has pulled messages from the broker and handed them to the blocking queue; the business code only needs to take them from there and run its own logic. The thresholds and intervals above correspond to consumer configuration, as sketched below.
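Here is a minimal usage sketch of a push consumer. The group name, name server address, topic and tags are placeholders, and the values passed to the setters are simply the defaults referenced above (1000 messages, 100 MiB, batch size 1).

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumerQuickStart {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoTopic", "TagA || TagB");

        consumer.setPullThresholdForQueue(1000);    // flow control: max cached messages per queue
        consumer.setPullThresholdSizeForQueue(100); // flow control: max cached size per queue, in MiB
        consumer.setPullInterval(0);                // 0 = pull the next batch immediately
        consumer.setConsumeMessageBatchMaxSize(1);  // hand messages to the listener one at a time

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.println("consume: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}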

public void pullMessage(final PullRequest pullRequest) {//获取pullRequest正在处理的queue信息,其实每个pullRequest都会拉取消息到本地缓存起来,而缓存的位置就是processqueuefinal ProcessQueue processQueue = pullRequest.getProcessQueue();if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}//更新pull消息的最新拉取消息的事件pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return;}if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);//如果当前请求对应的缓存的消息数量大于了1000条,便让该请求过50ms再请求if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}//如果缓存大小超过了100m,也等50ms再请求if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}//如果不是顺序消息的话,超过了consumeConcurrentlyMaxSpan会进行流控if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}} else {//如果是顺序消息,需要判断当前processqueue已经被加锁if (processQueue.isLocked()) {if (!pullRequest.isPreviouslyLocked()) {long offset = -1L;try {//计算出当前consumer应该从哪个consumeoffset开始消费offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());} catch (Exception e) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);return;}boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. 
pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setPreviouslyLocked(true);pullRequest.setNextOffset(offset);}} else {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}//构建topic获取到订阅关系final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return;}final long beginTimestamp = System.currentTimeMillis();//这是需要执行的回调函数PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {//拉取到消息过后根据tag过滤消息pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND://更新上一次拉取消息的地方long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());//设置拉取消息的时间long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {//获取在broker的consumequeue中的offsetfirstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());//将消息放入到consumer的本地缓存processqueue中boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());//将消息提交到阻塞队列中,等待业务方消费消息DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//如果pullInterval这个参数大于0,便会间隔一会儿,再去broker拉取消息,否则立刻再次拉取下一批次的消息if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}if (pullResult.getNextBeginOffset() < prevRequestOffset|| firstMsgOffset < prevRequestOffset) {log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",pullResult.getNextBeginOffset(),firstMsgOffset,prevRequestOffset);}break;case NO_NEW_MSG:case NO_MATCHED_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case OFFSET_ILLEGAL:log.warn("the pull request offset illegal, {} {}",pullRequest.toString(), pullResult.toString());pullRequest.setNextOffset(pullResult.getNextBeginOffset());pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {@Overridepublic void 
run() {try {DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(), false);DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());log.warn("fix the pull request offset, {}", pullRequest);} catch (Throwable e) {log.error("executeTaskLater Exception", e);}}}, 10000);break;default:break;}}}@Overridepublic void onException(Throwable e) {if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("execute the pull request exception", e);}if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);} else {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}};boolean commitOffsetEnable = false;long commitOffsetValue = 0L;//如果是集群模式if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {//从内存中读取消费偏移量,因为每个消费者消费的信息是不一样的,所以这个信息只能保存在每个消费者实例本地commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {commitOffsetEnable = true;}}String subExpression = null;boolean classFilter = false;//获取订阅信息SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (sd != null) {if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {subExpression = sd.getSubString();}classFilter = sd.isClassFilterMode();}int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter);//发送网络请求,根据messaqueue和消费的consumeoffset从broker中拉取消息try {this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}

5.2 Consuming Messages from the Local Cache - ConsumeMessageService

ConsumeMessageService has two implementations: concurrent consumption and orderly consumption.

5.2.1 Concurrent Consumption from the Local Cache - ConsumeMessageConcurrentlyService

1. Split a batch of messages into individual messages, wrap them into consume requests and hand them to the thread pool

If the batch contains no more than consumeMessageBatchMaxSize messages (1 by default), a ConsumeRequest is built and submitted to the thread pool, which runs the consumption logic asynchronously; if consumption fails, that batch is retried. As mentioned in the earlier discussion of duplicate consumption, it is best to take one message at a time from the local queue, so that a failure only retries that single message rather than re-consuming a whole batch.

   public void submitConsumeRequest(final List<MessageExt> msgs,  //消息内容final ProcessQueue processQueue, //具体的消息内容便在processQueue中,里面有一个map用来存储消息,并且总的消息数量不能超过1000条,大小不能超过300mfinal MessageQueue messageQueue,//具体从broker哪个topic哪个queueId中开始消费的final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {//如果一次消息的数量小于1条(consumeMessageBatchMaxSize可以由它设置),便构建消息消费请求ConsumeRequest,并请交个线程池异步执行消费逻辑、//前面讲重复消费的时候说过这个参数,最好是每次从本地队列中取出一条消息,这样消费失败,只会重试本条消息,而不会导致重复消费ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} else {//如果超过一条,便将消息进行切割成一条的大小for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}//并且提交消费请求ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}//如果消费失败,过一段时间再重试this.submitConsumeRequestLater(consumeRequest);}}}}
2. The Consumer Runs the Consumption Logic - ConsumeMessageConcurrentlyService$ConsumeRequest

ConsumeRequest is a Runnable executed by the consume thread pool; each task processes one submitted consume request, with the following main steps:

1. If a consume-before hook is registered, run its before method (a sketch of such a hook follows this list);

2. Call messageListener's consumeMessage method, which contains our real business consumption logic;

3. If our consumption logic runs longer than the configured consumeTimeout (measured in minutes), the result is marked as a consume timeout;

4. Finally run the follow-up logic, which mainly reports back the consumption progress (the consumeoffset).
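The before/after hooks in steps 1 and 4 are a public extension point. Below is a hedged sketch of such a hook; the class is made up for illustration, and the commented registration line (going through getDefaultMQPushConsumerImpl()) is an assumption about typical wiring, not code from this article's source.

import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;

// illustrative hook that times each consume call
public class TimingConsumeHook implements ConsumeMessageHook {
    private final ThreadLocal<Long> start = new ThreadLocal<>();

    @Override
    public String hookName() {
        return "TimingConsumeHook";
    }

    @Override
    public void consumeMessageBefore(ConsumeMessageContext context) {
        start.set(System.currentTimeMillis()); // runs before listener.consumeMessage(...)
    }

    @Override
    public void consumeMessageAfter(ConsumeMessageContext context) {
        // runs after the listener returns; the context carries the consume status string
        System.out.printf("consumed %d msg(s) in %d ms, status=%s%n",
            context.getMsgList().size(), System.currentTimeMillis() - start.get(), context.getStatus());
    }
}

// registration (assumed wiring):
// consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new TimingConsumeHook());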

 public void run() {if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//我们写代码需要实现messageListener来实现自己的业务逻辑MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;//如果拥有执行消费者前置钩子函数,便执行钩子函数if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//我们代码真正的消费逻辑在这里执行status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;}//获取到消费时间long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}//如果消费时间大于1分钟,便单反回消费超时} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}//执行消费者的后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);if (!processQueue.isDropped()) {//这里会执行后续逻辑,主要是返回消费进度consumeoffsetConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue 
is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
3. Maintaining the Local Consumption Progress

This step counts how many messages were consumed successfully and maintains the consumption progress locally.

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;//这里是consumer对消费者消费成功和失败批次消息的统计switch (status) {case CONSUME_SUCCESS://如果消费成功,初始时,将已经消费的index设置为0if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}//将消费成功的批次加1int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}//这里是返回消费成功的消息switch (this.defaultMQPushConsumer.getMessageModel()) {//如果是广播模式,打印消息,case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;//如果是集群模式,统计消费成功和失败的消息,再次重试case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}//获取到消费成功的offsetlong offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {//维护存储在本地的consumeoffsetthis.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}
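The key to the final offset update is that ProcessQueue keeps the in-flight messages in an ordered map keyed by queue offset, so removing a consumed batch yields the smallest offset that is still unconsumed, and that is the only offset which is safe to report. The class below is a simplified model of that idea, not the real ProcessQueue (which also tracks sizes, locks and a consuming snapshot).

import java.util.List;
import java.util.TreeMap;

// simplified model of how ProcessQueue derives the committable offset
class ProcessQueueSketch {
    private final TreeMap<Long, byte[]> msgTreeMap = new TreeMap<>(); // queueOffset -> message body
    private long queueOffsetMax = -1;

    synchronized void putMessage(long queueOffset, byte[] body) {
        msgTreeMap.put(queueOffset, body);
        queueOffsetMax = Math.max(queueOffsetMax, queueOffset);
    }

    /** Remove a consumed batch and return the offset that may safely be committed. */
    synchronized long removeMessage(List<Long> consumedOffsets) {
        for (Long offset : consumedOffsets) {
            msgTreeMap.remove(offset);
        }
        // if anything is still in flight, only the smallest remaining offset is safe to commit;
        // otherwise everything up to queueOffsetMax has been consumed
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }
}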

5.2.2 Orderly Consumption from the Local Cache - ConsumeMessageOrderlyService

1. Submitting the consume request
public void submitConsumeRequest(final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) {
    if (dispathToConsume) {
        // build a consume request directly and submit it to the consume thread pool
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
2. The Consumer Runs the Consumption Logic - ConsumeMessageOrderlyService$ConsumeRequest

This logic is almost identical to concurrent consumption; the difference is that it takes locks so that, with multiple threads, messages of the same queue are never consumed concurrently and their order is never broken.
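On the business side, ordered consumption simply means registering a MessageListenerOrderly instead of the concurrent listener. A minimal sketch follows; the consumer setup is the same as in the push-consumer example earlier and is omitted here.

import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderlyListenerSketch implements MessageListenerOrderly {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // invoked by ConsumeMessageOrderlyService while the queue lock is held,
        // so messages of one MessageQueue are processed strictly in order
        for (MessageExt msg : msgs) {
            System.out.println("orderly consume: " + new String(msg.getBody()));
        }
        // returning SUSPEND_CURRENT_QUEUE_A_MOMENT would pause this queue and retry later
        return ConsumeOrderlyStatus.SUCCESS;
    }
}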

  public void run() {if (this.processQueue.isDropped()) {log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}//顺序消息为了防止并发导致的消费顺序问题,所以会加锁final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {//如果是广播模式 或者加锁成功 或者锁没有过期 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {final long beginTime = System.currentTimeMillis();for (boolean continueConsume = true; continueConsume; ) {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);break;}//如果是集群模式 && 到这里锁已经过期,便稍后再试if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& !this.processQueue.isLocked()) {log.warn("the message queue not locked, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}//如果抢锁时间超过了1分钟,便提交消费请求到消费队列中,稍后再试long interval = System.currentTimeMillis() - beginTime;if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}//舒心消费也是一次只能消费一条消息final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();//顺序消费会从消息本地缓存中取出一条消息,注意这个动作也是加锁的List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());if (!msgs.isEmpty()) {final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;//执行消费前置钩子函数if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMap<String, String>());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;boolean hasException = false;try {this.processQueue.getConsumeLock().lock();if (this.processQueue.isDropped()) {log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. 
{}",this.messageQueue);break;}//调用我们的业务逻辑代码status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;} finally {this.processQueue.getConsumeLock().unlock();}if (null == status|| ConsumeOrderlyStatus.ROLLBACK == status|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}//如果消费超过60分钟,便返回消费超时} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//执行消费完成的逻辑,比如维护消费者的消费进度continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {continueConsume = false;}}} else {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);}}}}
3. Maintaining the consumption progress

This logic is the same as in concurrent consumption: the locally maintained consumption progress is updated.

6. Summary

At this point the whole journey of a message, from production to storage to consumption, should be clear.

On the producer side, the producer uses its load-balancing strategy (round robin, consistent hashing, etc.) to choose a messagequeue, and thus the broker it lives on, and then sends the message to that broker over Netty.

When the broker receives the message it persists it straight into the commitLog; a separate thread then walks the persisted commitLog to build the IndexFile and consumequeue, and wakes up the consumer pull requests blocked by long polling so that the messages are sent to the consumer over Netty.

When the consumer receives the messages it first stores them in a local map and submits a consume request to the consume thread pool; the consume threads take messages from the map, call our overridden consumeMessage method to run the business logic, and finally update the consumption progress.
