rocketmq 初探(五)

大家好,我是烤鸭:

    上一篇简单介绍部分 NettyRequestProcessor (AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor),这一篇介绍其他的。

PullMessageProcessor、QueryMessageProcessor、ReplyMessageProcessor、SendMessageProcessor

NettyRequestProcessor

PullMessageProcessor,一个将近300行的代码,差点没给我送走了,把一些无关的判断直接…了,为了省篇幅。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();final PullMessageRequestHeader requestHeader =(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);response.setOpaque(request.getOpaque());log.debug("receive PullMessage request command, {}", request);// 当前节点是否有读权限if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));return response;}// 获取订阅关系进行判断SubscriptionGroupConfig subscriptionGroupConfig =        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());// ... 是否为空,(里边的customer)是否可以消费// ...// 获取topic 配置进行判断TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// ... 是否为空,是否读权限,请求的queueId<0 或者 <topic的readQueueNumsSubscriptionData subscriptionData = null;ConsumerFilterData consumerFilterData = null;// 有订阅关系,解析订阅信息(获取消费标记)if (hasSubscriptionFlag) {try {subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());// expressionType 不是 tag类型的,根据request创建了一个consumerFilterData(里边有topic、customerGroup)if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),requestHeader.getExpressionType(), requestHeader.getSubVersion());assert consumerFilterData != null;}} catch (Exception e) {// ...}} else {// 获取 consumer 组信息ConsumerGroupInfo consumerGroupInfo =           this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());// ... 判断非空,判断订阅组的配置是非广播且消息类型是广播消息,返回错误// 根据topic获取订阅信息subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());// ... 判断非空,判断时间戳// ... expressionType 不是 tag类型的,从 filter获取 consumerFilterData(里边有topic、customerGroup)// ... expressionType 不是 tag类型的,broker配置也不支持属性过滤,返回错误if (!ExpressionType.isTagType(subscriptionData.getExpressionType())&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());return response;}MessageFilter messageFilter;// 根据是否支持filter的重试,构建不同的 messageFilterif (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());} else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());}// 从commitlog中获取message,为空的话,response设置error msgfinal GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);if (getMessageResult != null) {response.setRemark(getMessageResult.getStatus().name());responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());responseHeader.setMinOffset(getMessageResult.getMinOffset());responseHeader.setMaxOffset(getMessageResult.getMaxOffset());// 消费过慢,可以从 salve节点拉取消息(配置的从节点brokerId)if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());} else {responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:// 从节点要有读权限if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break;}if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {// consume too slow ,redirect to another machineif (getMessageResult.isSuggestPullingFromSlave()) {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());}// consume okelse {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());}} else {responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}switch (getMessageResult.getStatus()) {case FOUND:response.setCode(ResponseCode.SUCCESS);break;// ... // 节点的queue中没有message,queue的offset是否为0case NO_MESSAGE_IN_QUEUE:if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify me} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;default:assert false;break;}// 是否有消费消息的钩子if (this.hasConsumeMessageHook()) {ConsumeMessageContext context = new ConsumeMessageContext();context.setConsumerGroup(requestHeader.getConsumerGroup());context.setTopic(requestHeader.getTopic());context.setQueueId(requestHeader.getQueueId());String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);switch (response.getCode()) {// ... 不同code 构建不同的 context}// 回调钩子this.executeConsumeMessageHookBefore(context);}switch (response.getCode()) {case ResponseCode.SUCCESS:// group.getNums + 1(消费数量)this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getMessageCount());// group.getSize + 1(buffer总和)
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getBufferTotalSize());// broker..getNums + 1(消费数量)
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());// 默认是消息从内存中获取if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {final long beginTimeMills = this.brokerController.getMessageStore().now();// 从byteBuffer中获取,记录时间final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());// getLatency + 1(延迟消息数量) this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(),(int) (this.brokerController.getMessageStore().now() - beginTimeMills));response.setBody(r);} else {// 不记录刷盘时间,netty response 写回try {FileRegion fileRegion =new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {getMessageResult.release();if (!future.isSuccess()) {log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());}}});} catch (Throwable e) {log.error("transfer many message by pagecache exception", e);getMessageResult.release();}response = null;}break;case ResponseCode.PULL_NOT_FOUND:// 允许延迟,超时延迟拉取if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}// 立即重试case ResponseCode.PULL_RETRY_IMMEDIATELY:break;case ResponseCode.PULL_OFFSET_MOVED:// 主节点或者从节点开启了offset检测if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {// 记录 warn 日志MessageQueue mq = new MessageQueue();mq.setTopic(requestHeader.getTopic());mq.setQueueId(requestHeader.getQueueId());mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());OffsetMovedEvent event = new OffsetMovedEvent();event.setConsumerGroup(requestHeader.getConsumerGroup());event.setMessageQueue(mq);event.setOffsetRequest(requestHeader.getQueueOffset());event.setOffsetNew(getMessageResult.getNextBeginOffset());this.generateOffsetMovedEvent(event);log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),responseHeader.getSuggestWhichBrokerId());} else {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),responseHeader.getSuggestWhichBrokerId());}break;default:assert false;}} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store getMessage return null");}boolean storeOffsetEnable = brokerAllowSuspend;storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;storeOffsetEnable = storeOffsetEnable&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;// 更新offsetTable(Map:key(group@topic),value(queueId,offset))if (storeOffsetEnable) {        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());}return response;
}

QueryMessageProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// 后台接口switch (request.getCode()) {// 条件查询msg(topic、key、maxNum、时间戳)case RequestCode.QUERY_MESSAGE:return this.queryMessage(ctx, request);// 根据offset获取commitlogcase RequestCode.VIEW_MESSAGE_BY_ID:return this.viewMessageById(ctx, request);default:break;}return null;
}

ReplyMessageProcessor

    @Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {SendMessageContext mqtraceContext = null;SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return null;}// 构建 traceContextmqtraceContext = buildMsgContext(ctx, requestHeader);// 执行hook之前,完善 sendMessageContext 对象this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// belowRemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);// do nothingthis.executeSendMessageHookAfter(response, mqtraceContext);return response;}
private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx,final RemotingCommand request,final SendMessageContext sendMessageContext,final SendMessageRequestHeader requestHeader) {final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();response.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("receive SendReplyMessage request command, {}", request);final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() < startTimstamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));return response;}response.setCode(-1);// 校验topicConfig(如果是重试消息,topicConfig为空则创建)和queueId(大于topicConfig中的读写数量,返回错误)super.msgCheck(ctx, requestHeader, response);if (response.getCode() != -1) {return response;}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();}// 构建channel msgMessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());// 完善header,同步到broker的clientPushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);// 保存返回值if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);}return response;
}

SendMessageProcessor

    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消费者的回复消息case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {// 和batch相比,多个同步和异步刷盘还有事务的判断return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}

asyncConsumerSendMsgBack

private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final ConsumerSendMsgBackRequestHeader requestHeader =(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());// 回调钩子之后处理,完善context对象if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);this.executeConsumeMessageHookAfter(context);}SubscriptionGroupConfig subscriptionGroupConfig =// 订阅组关系和一些校验(非空、写权限) this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));return CompletableFuture.completedFuture(response);}if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");return CompletableFuture.completedFuture(response);}// 无需重试,返回成功if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return CompletableFuture.completedFuture(response);}// 需要重试,构建新topicString newTopic = MixAll.getRetryTopic(requestHeader.getGroup());int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();int topicSysFlag = 0;if (requestHeader.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}// 创建topicConfigTopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);// 校验(非空、写权限)if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return CompletableFuture.completedFuture(response);}if (!PermName.isWriteable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));return CompletableFuture.completedFuture(response);}// 根据offset获取msgMessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());if (null == msgExt) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("look message by offset failed, " + requestHeader.getOffset());return CompletableFuture.completedFuture(response);}// retryTopic 为空的话,给默认的final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (null == retryTopic) {MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());}msgExt.setWaitStoreMsgOK(false);// 延迟等级int delayLevel = requestHeader.getDelayLevel();// 客户端版本<3.4.9,最大重试次数取request的int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {Integer times = requestHeader.getMaxReconsumeTimes();if (times != null) {maxReconsumeTimes = times;}}// 重试次数 > 最大次数 或 非延时if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {// 进入前缀为死信的topicnewTopic = MixAll.getDLQTopic(requestHeader.getGroup());queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return CompletableFuture.completedFuture(response);}} else {// 延时为0的时候,延迟级别=次数+3(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h )if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}// 构建store msgMessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);String originMsgId = MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));// 存入commitlog,并更新最后操作时间CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);return putMessageResult.thenApply((r) -> {if (r != null) {switch (r.getPutMessageStatus()) {case PUT_OK:String backTopic = msgExt.getTopic();String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (correctTopic != null) {backTopic = correctTopic;}// 消费次数 + 1this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;default:break;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(r.getPutMessageStatus().name());return response;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("putMessageResult is null");return response;});
}

asyncSendBatchMessage

private CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}int queueIdInt = requestHeader.getQueueId();// 获取topic配置TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("message topic length too long " + requestHeader.getTopic().length());return CompletableFuture.completedFuture(response);}// 构建批量messageMessageExtBatch messageExtBatch = new MessageExtBatch();messageExtBatch.setTopic(requestHeader.getTopic());messageExtBatch.setQueueId(queueIdInt);int sysFlag = requestHeader.getSysFlag();if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;}messageExtBatch.setSysFlag(sysFlag);messageExtBatch.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));messageExtBatch.setBody(request.getBody());messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());messageExtBatch.setBornHost(ctx.channel().remoteAddress());messageExtBatch.setStoreHost(this.getStoreHost());messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);// 刷盘CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);// 处理刷盘结果return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}

handlePutMessageResultFuture —> handlePutMessageResult

private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,RemotingCommand request, MessageExt msg,SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,int queueIdInt) {if (putMessageResult == null) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store putMessage return null");return response;}boolean sendOK = false;switch (putMessageResult.getPutMessageStatus()) {// Successcase PUT_OK:sendOK = true;response.setCode(ResponseCode.SUCCESS);break;// ...// Failedcase CREATE_MAPEDFILE_FAILED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("create mapped file failed, server is busy or broken.");break;// ...    default:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR DEFAULT");break;}String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);// 发送成功if (sendOK) {// 计数,topic刷盘数量this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);// 计数,topic刷盘数据大小this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());// 计数,broker刷盘数量this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());response.setRemark(null);responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());responseHeader.setQueueId(queueIdInt);responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());// response写回doResponse(ctx, request, response);// 有钩子的话,构建 sendMessageContext 对象if (hasSendMessageHook()) {sendMessageContext.setMsgId(responseHeader.getMsgId());sendMessageContext.setQueueId(responseHeader.getQueueId());sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}return null;} else {// 发送失败,有钩子的话,构建 sendMessageContext 对象,状态是发送失败if (hasSendMessageHook()) {int wroteSize = request.getBody().length;int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}}return response;
}

小结

PullMessageProcessor

先看下方法的流转,broker接收到消息会触发监听,同时通知 PullMessageProcessor.processRequest() 对请求进行处理

DefaultMessageStore#ReputMessageService.doReput() —> NotifyMessageArrivingListener.arriving() —>

pullRequestHoldService.notifyMessageArriving() —> PullMessageProcessor.executeRequestWhenWakeup()

获取订阅关系和topic配置,必要的判断。

获取消费标记,产生 subscriptionData 和 consumerFilterData。做一些版本和tag类型的判断。

根据messageFilter的配置,初始化消息过滤。

消费过慢,可以从 salve节点拉取消息(配置的从节点brokerId)。从节点要有权限。

是否有消费消息的钩子,回调钩子。

拉取成功:更新组的消息数量和buffer的大小。默认从内存中获取消息,记录时间并netty返回。

拉取失败:允许延迟的话,调用延迟拉取。

offset移动:记录warn日志。

更新offsetTable(Map:key(group@topic),value(queueId,offset))。

QueryMessageProcessor

后台接口,条件查询msg(topic、key、maxNum、时间戳)。

后台接口,根据offset获取commitlog。

ReplyMessageProcessor

DefaultMQProducerImpl.sendMessage(SEND_REPLY_MESSAGE/SEND_REPLY_MESSAGE_V2) —>

ReplyMessageProcessor.processRequest()

构建 traceContext

执行hook之前,完善 sendMessageContext 对象

校验topicConfig(如果是重试消息,topicConfig为空则创建)和queueId(大于topicConfig中的读写数量,返回错误)

建channel msg

完善header,同步到broker的client

保存返回值

SendMessageProcessor

DefaultMQProducerImpl.sendMessage(SEND_MESSAGE/SEND_MESSAGE_V2/SEND_BATCH_MESSAGE) / DefaultMQPullConsumerImpl/DefaultMQPushConsumerImpl.consumerSendMessageBack ——>

SendMessageProcessor.processRequest ()

消费的回复消息:

做校验和判断,定义重试topic

重试次数 > 最大次数 或 非延时,创建死信的topic

延时为0的时候,延迟级别=次数+3

构建store msg 存入 commitlog,更新消费成功数量 +1

正常异步发送消息单条/批量:

单条:同步/异步刷盘、事务的话构建半消息

批量:刷盘,计数,topic刷盘数量/数据大小,broker刷盘数量。构建回调钩子的sendMessageContext,并标记状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/412509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 装饰器初探

Python 装饰器初探 在谈及Python的时候&#xff0c;装饰器一直就是道绕不过去的坎。面试的时候&#xff0c;也经常会被问及装饰器的相关知识。总感觉自己的理解很浅显&#xff0c;不够深刻。是时候做出改变&#xff0c;对Python的装饰器做个全面的了解了。 1. 函数装饰器 直接上…

[css] 解释下 CSS sprites的原理和优缺点分别是什么

[css] 解释下 CSS sprites的原理和优缺点分别是什么 我来说下我的观点 原理&#xff1a; 多张图合并成一张图优点&解决的问题hover效果&#xff0c;如果是多个图片&#xff0c;网络正常的情况下首次会闪烁一下。如果是断网情况下&#xff0c;就没图片了。sprites 就很好的…

《自律100天,穿越人生盲点》读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《自律100天&#xff0c;穿越人生盲点》&#xff0c;读书笔记。 第一章 “自律100天”的华丽开启 第一节 “自律100天”的底层逻辑 习惯没办法用金钱换&#xff0c;只能用时间。 训练延迟满足(增强自控、培养耐心、减少短期诱惑…

递推数列

题目描述 给定a0,a1,以及anpa(n-1) qa(n-2)中的p,q。这里n > 2。 求第k个数对10000的模。 输入描述: 输入包括5个整数&#xff1a;a0、a1、p、q、k。 输出描述: 第k个数a(k)对10000的模。 分析 循环求出ak即可 #include <iostream>using namespace std;int main(){in…

[css] 请描述margin边界叠加是什么及解决方案

[css] 请描述margin边界叠加是什么及解决方案 1&#xff0c;使用padding代替&#xff0c;但是父盒子要减去相应的高度 2&#xff0c;使用boder&#xff08;透明&#xff09;代替&#xff08;不推荐&#xff0c;不符合书写规范&#xff0c;如果父盒子子盒子时有颜色的不好处理&…

从线上慢sql看explain关键字

大家好&#xff0c;我是烤鸭&#xff1a; 最近有点忙的头晕&#xff0c;又懒又累&#xff0c;正好线上遇到慢sql的问题&#xff0c;就说下 MySQL Explain 关键字的解析和使用示例。 explain 关键字说明 使用explain关键字可以模拟优化器执行sql查询语句&#xff0c;从而得…

[css] style标签写在body前和body后的区别是什么?

[css] style标签写在body前和body后的区别是什么&#xff1f; 渲染机制的区别&#xff0c;在body前是已经把样式浏览一遍&#xff0c;到了对应标签直接&#xff0c;渲染样式。显示块。 在body后&#xff0c;是浏览器已经把标签浏览了&#xff0c;但基于没有样式&#xff0c;显…

自然语言处理的一些链接

Word2Vec Tutorial - The Skip-Gram ModelVisualizing A Neural Machine Translation Model (Mechanics of Seq2seq Models With Attention) 转载于:https://www.cnblogs.com/linyihai/p/10200351.html

《Java并发编程实践-第一部分》-读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《Java并发编程实战-第一部分》-读书笔记。 第一章&#xff1a;介绍 1.1 并发历史&#xff1a; 多个程序在各自的进程中执行&#xff0c;由系统分配资源&#xff0c;如&#xff1a;内存、文件句柄、安全证书。进程间通信方式&#x…

[css] 说说你对css盒子模型的理解

[css] 说说你对css盒子模型的理解 css盒模型由两个盒子组成&#xff0c;外在的控制是否换行的盒子&#xff0c;以及内在的控制元素内容的盒子。比如&#xff1a;display: inline-block, 则它的外在的盒子就是inline也就是不占据一行&#xff0c;而block则表示内部的元素具有块状…

go语言基础之格式化输出

1、fmt包的格式化输出输入 格式说明 格式 含义 %% 一个%字面量 %b 一个二进制整数值(基数为2)&#xff0c;或者是一个(高级的)用科学计数法表示的指数为2的浮点数 %c 字符型。可以把输入的数字按照ASCII码相应转换为对应的字符 %d 一个十进制数值(基数为10) %e 以科…

2021 年终总结

2021 年终总结 大家好&#xff0c;我是烤鸭&#xff0c;这是一篇无关技术的记录&#xff0c;总结一下这一年干了什么。 年初的目标 减肥 炒股回本 PMP拿证 多赚钱 多学习新技术 坚持写博客(一周一篇) 多看书 没达成的目标 减肥这个事&#xff0c;拖了好久&#xff0c…

[css] ::before和:after中单冒号和双冒号的区别是什么,这两个伪元素有什么作用?

[css] ::before和:after中单冒号和双冒号的区别是什么&#xff0c;这两个伪元素有什么作用&#xff1f; 区别&#xff1a;伪元素在css1中已经存在当时用单冒号&#xff0c;css3时做了修订用双冒号 ::before ::after表示伪元素用来区别伪类。作用&#xff1a;在元素前面&#x…

[css] css常用的布局方式有哪些?

[css] css常用的布局方式有哪些&#xff1f; 1&#xff1a;圣杯布局 2&#xff1a;双飞翼 3&#xff1a;flex个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

记一次线上cpu飙升100%的排查过程

大家好&#xff0c;我是烤鸭&#xff1a; 最近没怎么写技术文章&#xff0c;还是得回归下初心&#xff0c;正好前几天出现个线上问题&#xff0c;记录下排查过程。 问题描述 某个时间点&#xff0c;接收到接口响应慢报警。 过一会收到服务器cpu可用率低(<10%)报警。 去c…

[css] 对比下px、em、rem有什么不同?

[css] 对比下px、em、rem有什么不同&#xff1f; px是css中的逻辑像素&#xff0c;和移动端的物理像素之间会有一个比值dpr em是指相对于父元素的大小 rem中的r就是root&#xff0c;也就是相对于root元素的大小&#xff08;html标签&#xff09;个人简介 我是歌谣&#xff0c…

Node.js(爱前端) 一

一 Node.js 简介 1.1 官网 https://nodejs.org/en/ 官网介绍&#xff1a; Node.js是一个构建在 Chrome 浏览器V8引擎上的 JavaScript 运行环境。 Node.js 使用了事件驱动、非阻塞I/O模型&#xff0c;这些都使它轻量、好用。 Node.js 的包生态&#xff08;npm&#xff09;&#…

记一次线上服务假死排查过程

大家好&#xff0c;我是烤鸭&#xff1a; 最近线上问题有点多啊&#xff0c;分享一个服务假死的排查过程。 问题描述 9点10分&#xff0c;收到进程无响应报警(一共6台机器&#xff0c;有1台出现)&#xff0c;后来又有1台出现。 排查思路 首先确认是否误报或者网络抖动&…

[css] 简述下你理解的优雅降级和渐进增强

[css] 简述下你理解的优雅降级和渐进增强 优雅降级&#xff0c;先做好一个完善的具备完整体验的版本&#xff0c;再向下做兼容。 渐进增强&#xff0c;先做好一个可以基本正常使用的版本&#xff0c;再慢慢丰富体验和内容。个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前…

vue小记

1.vue绑定属性&#xff0c;点击事件 1.<!-- 完整语法 --> <a v-bind:href"url">...</a><!-- 缩写 --> <a :href"url">...</a>2.<!-- 完整语法 --> <a v-on:click"doSomething">...</a>&l…