RocketMQ源码阅读-Message拉取与消费-Consumer篇

RocketMQ源码阅读-Message拉取与消费-Consumer篇

  • 1. Consumer
  • 2. PushConsumer
  • 3. PushConsumer 订阅
    • 3.1 subscribe订阅
    • 3.2 registerMessageListener注册监听器
  • 4. PushConsumer 消息队列Rebalance
    • 4.1 Rebalance流程
    • 4.2 Rebalance策略
      • AllocateMessageQueueAveragely
      • AllocateMessageQueueByMachineRoom
      • AllocateMessageQueueAveragelyByCircle
      • AllocateMessageQueueByConfig
      • AllocateMessageQueueConsistentHash
      • AllocateMachineRoomNearby
    • 4.3 消费进度读取
  • 5. 消息拉取
  • 6. 消费消息
  • 7. 过期消息清理
  • 8. 发回失败消息到Broker
  • 9. 消费进度
    • load加载消费进度
    • readOffset读取消费进度
    • updateOffset更新消费进度
    • persistAll持久化消费进度
  • 10. 小结

上一篇分析了Broker中对于Message拉取与消费的支持。本篇继续分析Consumer中的拉取与消费消息的逻辑。

1. Consumer

RocketMQ提供了两种Consumer:

  1. PushConsumer:(接口MQPushConsumer)是最多使用的一种Consumer,名字是Push,实际是不断的Pull Broker的消息,不存在新消息时,Broker挂起请求(上一篇提到的挂起服务),知道有新的消息产生。实时性较高。
  2. PullConsumer:(接口MQPullConsumer)不是分析的重点

2. PushConsumer

PushConsumer(接口MQPushConsumer)涉及到的组件如下:

  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
  • PullMessageService:拉取消息服务,不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService。
  • ConsumeMessageService:消费消息服务,不断消费消息,并处理消费结果。
  • RemoteBrokerOffsetStore:Consumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker。
  • ProcessQueue :消息处理队列。
  • MQClientInstance :封装对 Namesrv,Broker 的 API调用,提供给 Producer、Consumer 使用。

他们之前的关系:

3. PushConsumer 订阅

查看官方的example方法。在example model中:
image.png

public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;} else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");
}

示例中新建了一个DefaultMQPushConsumer并通过subscribe方法完成消息订阅,并通过registerMessageListener方法注册了一个消息监听器。
以上便是注册一个Consumer的流程。

PushConsumer对应接口MQPushConsumer,其默认实现为DefaultMQPushConsumer,提供了消息Topic订阅的功能。
image.png

3.1 subscribe订阅

其中进行消息订阅的接口为subscribe(…),DefaultMQPushConsumer只是对接口的一层封装,其具体方法实现DefaultMQPushConsumerImpl#subscribe(…):

public void subscribe(String topic, String subExpression) throws MQClientException {try {// 创建订阅数据SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subExpression);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {// 通过心跳同步Consumer信息到Brokerthis.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {throw new MQClientException("subscription exception", e);}
}

可以看到,此方法首先创建订阅数据,然后通过心跳同步Consumer信息到Broker。

  • topic
  • subExpression:订阅表达式

其中创建订阅数据的方法为FilterAPI.buildSubscriptionData(…):

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);// 处理订阅表达式if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {subscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData;
}

此方法的主要逻辑是根据Topic和订阅表达式创建订阅数据。SubscriptionData是订阅数据实体:

public class SubscriptionData implements Comparable<SubscriptionData> {public final static String SUB_ALL = "*";private boolean classFilterMode = false;private String topic;private String subString;private Set<String> tagsSet = new HashSet<String>();private Set<Integer> codeSet = new HashSet<Integer>();private long subVersion = System.currentTimeMillis();private String expressionType = ExpressionType.TAG;@JSONField(serialize = false)private String filterClassSource;
}

其中的属性subVersion代表订阅版本,取得是当前系统时间。

  • tagsSet:订阅列表
  • codeSet:订阅列表的hash

3.2 registerMessageListener注册监听器

创建Consumer的示例代码中注册了一个监听器,其方法为DefaultMQPushConsumer#registerMessageListener(…):

@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {this.messageListener = messageListener;this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}

DefaultMQPushConsumerImpl#registerMessageListener:

public void registerMessageListener(MessageListener messageListener) {this.messageListenerInner = messageListener;
}

只是将监听器赋值给内部属性。

4. PushConsumer 消息队列Rebalance

在消息Consumer注册时,会触发Rebalance进行消息队列的重分配DefaultMQPushConsumerImpl#subscribe(…):

this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

4.1 Rebalance流程

消息队列Rebalance是由类RebalanceService实现的,作为均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue ):

public class RebalanceService extends ServiceThread {// 等待间隔,单位:毫秒private static long waitInterval =Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));private final InternalLogger log = ClientLogger.getLog();// MQClient对象private final MQClientInstance mqClientFactory;public RebalanceService(MQClientInstance mqClientFactory) {this.mqClientFactory = mqClientFactory;}@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}@Overridepublic String getServiceName() {return RebalanceService.class.getSimpleName();}
}

是一个单独的线程,默认每20s执行一次Rebalance,还有两种触发方式:

  • PushConsumer 启动时,调用 rebalanceService#wakeup(…) 触发
  • Broker 通知 Consumer 加入 或 移除时,Consumer 响应通知,调用 rebalanceService#wakeup(…) 触发

调用方法MQClientInstance#doRebalance:

public void doRebalance() {for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}
}

会遍历当前 Client 包含的 consumerTable( Consumer集合 ),执行消息队列分配。
调用方法MQConsumerInner#doRebalance,MQConsumerInner是接口,其实现为DefaultMQPushConsumerImpl,所以实际调用DefaultMQPushConsumerImpl#doRebalance:

@Override
public void doRebalance() {if (!this.pause) {this.rebalanceImpl.doRebalance(this.isConsumeOrderly());}
}

调用RebalanceImpl#doRebalance:

public void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}// 移除未订阅的topic对应的消息队列this.truncateMessageQueueNotMyTopic();
}// 移除未订阅的topic对应的消息队列
private void truncateMessageQueueNotMyTopic() {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();for (MessageQueue mq : this.processQueueTable.keySet()) {if (!subTable.containsKey(mq.getTopic())) {ProcessQueue pq = this.processQueueTable.remove(mq);if (pq != null) {pq.setDropped(true);log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);}}}
}

此方法获取所有Topic循环遍历,分配每个 topic 的消息队列,调用RebalanceImpl#rebalanceByTopic:

private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {// 获取 topic 对应的 队列 和 consumer信息Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {// 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;// 根据 队列分配策略 分配消息队列try {allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// 更新消息队列boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}
}

对广播和集群形式的messageModel,最终都会调用RebalanceImpl#updateProcessQueueTableInRebalance,更新消息处理队列,做两个操作:

  • 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
  • 增加 不在processQueueTable && 存在于mqSet 里的消息队列

源码如下:
参数mqSet :负载均衡结果后的消息队列数组

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;// 移除 在processQueueTable && 不存在于 mqSet 里的消息队列Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();// 处理不需要rebalance的消息if (mq.getTopic().equals(topic)) {if (!mqSet.contains(mq)) {pq.setDropped(true);// 移除不需要的消息队列if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {// 队列拉取超时,进行清理switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}// 增加 不在processQueueTable && 存在于mqSet 里的消息队列。List<PullRequest> pullRequestList = new ArrayList<PullRequest>();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();// 计算需要从哪里pull消息long nextOffset = this.computePullFromWhere(mq);if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}// 发起消息拉取请求this.dispatchPullRequest(pullRequestList);return changed;
}

可以看到,#rebalanceByTopic(…),

  • 广播模式( BROADCASTING ) 下,分配 Topic 对应的所有消息队列
  • 集群模式( CLUSTERING ) 下,分配 Topic 对应的部分消息队列

最终发起消息拉取请求,RebalancePushImpl#dispatchPullRequest(…):

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}
}

该方法的作用就是发起消息拉取请求。该调用是PushConsumer不断拉取消息的起点。调用了DefaultMQPushConsumerImpl#executePullRequestImmediately(…):

public void executePullRequestImmediately(final PullRequest pullRequest) {this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}

调用了PullMessageService#executePullRequestImmediately:

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}
}@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");
}

将拉取请求,放进了请求队列中。PullMessageService会启动线程,异步不断执行拉取请求。调用的方法是PullMessageService#pullMessage:

private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
}

实际上还是会调用DefaultMQPushConsumerImpl#pullMessage方法拉取消息,这个后续流程在第6小结分析。

整体时序图为(来源https://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/?github&1601):
image.png

4.2 Rebalance策略

在Rebalance时,会获取Rebalance策略,对应的接口为AllocateMessageQueueStrategy,其有6个实现类:
image.png

AllocateMessageQueueAveragely

平均分配队列策略

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 平均分配// 第几个consumerint index = cidAll.indexOf(currentCID);// 余数,即多少消息队列无法平均分配int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());// 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}@Overridepublic String getName() {return "AVG";}
}

分配示例:

**
Consumer * 2 可以整除Consumer * 3 不可整除Consumer * 5 无法都分配
消息队列[0]Consumer[0]Consumer[0]Consumer[0]
消息队列[1]Consumer[0]Consumer[0]Consumer[1]
消息队列[2]Consumer[1]Consumer[1]Consumer[2]
消息队列[3]Consumer[1]Consumer[2]Consumer[3]

AllocateMessageQueueByMachineRoom

平均分配可消费的 Broker 对应的消息队列,源码就不深究了。

AllocateMessageQueueAveragelyByCircle

环状分配消息队列

AllocateMessageQueueByConfig

按配置分配消息队列

AllocateMessageQueueConsistentHash

一致性hash队列算法分配

AllocateMachineRoomNearby

基于机房近侧优先级的分配策略代理。可以指定实际的分配策略。如果任何消费者在机房中处于活动状态,则部署在同一台机器中的代理的消息队列应仅分配给这些消费者。否则,这些消息队列可以与所有使用者共享,因为没有活的使用者来垄断它们。(引用类说明)

4.3 消费进度读取

rebalance流程中,有一步是获取消费进度RebalanceImpl#updateProcessQueueTableInRebalance:

// 计算消息队列开始消费位置
long nextOffset = this.computePullFromWhere(mq);

调用的方法为RebalancePushImpl#computePullFromWhere(…):

@Override
public long computePullFromWhere(MessageQueue mq) {long result = -1;final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();switch (consumeFromWhere) {case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:case CONSUME_FROM_MIN_OFFSET:case CONSUME_FROM_MAX_OFFSET:case CONSUME_FROM_LAST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;}// First start,no offsetelse if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {result = 0L;} else {try {result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;}case CONSUME_FROM_FIRST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {result = 0L;} else {result = -1;}break;}case CONSUME_FROM_TIMESTAMP: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {try {result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}} else {try {long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),UtilAll.YYYYMMDDHHMMSS).getTime();result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;}default:break;}return result;
}

此方法是用来计算消息队列开始消费位置,有三种选项:

  1. 一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。
  2. 一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。
  3. 一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。

5. 消息拉取

在rebalance流程的最后,分析到PullMessageService#pullMessage会调用DefaultMQPushConsumerImpl#pullMessage方法拉取消息。

PullMessageService#pullMessage源码为:

public class PullMessageService extends ServiceThread {private final InternalLogger log = ClientLogger.getLog();//拉取消息请求队列private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();//MQClient对象private final MQClientInstance mQClientFactory;// 定时器。用于延迟提交拉取请求private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "PullMessageServiceScheduledThread");}});public PullMessageService(MQClientInstance mQClientFactory) {this.mQClientFactory = mQClientFactory;}// 执行延迟拉取消息请求public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {if (!isStopped()) {this.scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {PullMessageService.this.executePullRequestImmediately(pullRequest);}}, timeDelay, TimeUnit.MILLISECONDS);} else {log.warn("PullMessageServiceScheduledThread has shutdown");}}// 执行立即拉取消息请求public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}// 执行延迟任务public void executeTaskLater(final Runnable r, final long timeDelay) {if (!isStopped()) {this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);} else {log.warn("PullMessageServiceScheduledThread has shutdown");}}public ScheduledExecutorService getScheduledExecutorService() {return scheduledExecutorService;}// 拉取消息private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}@Overridepublic void shutdown(boolean interrupt) {super.shutdown(interrupt);ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);}@Overridepublic String getServiceName() {return PullMessageService.class.getSimpleName();}}

该类完成的服务是从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService。
实际调用DefaultMQPushConsumerImpl#pullMessage方法拉取消息:

public void pullMessage(final PullRequest pullRequest) {final ProcessQueue processQueue = pullRequest.getProcessQueue();if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}// 设置队列最后拉取消息时间pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());// 判断consumer状态是否运行中。如果不是,则延迟拉取消息。try {this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);return;}if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}// 判断是否超过最大持有消息数量。默认最大值为1000long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}} else {if (processQueue.isLocked()) {if (!pullRequest.isLockedFirst()) {final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setLockedFirst(true);pullRequest.setNextOffset(offset);}} else {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);log.warn("find the consumer's subscription failed, {}", pullRequest);return;}final long beginTimestamp = System.currentTimeMillis();PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());// 提交拉取到的消息到消息处理队列boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 提交消费请求DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// 提交下次拉取消息请求if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}if (pullResult.getNextBeginOffset() < prevRequestOffset|| firstMsgOffset < prevRequestOffset) {log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",pullResult.getNextBeginOffset(),firstMsgOffset,prevRequestOffset);}break;case NO_NEW_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case NO_MATCHED_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case OFFSET_ILLEGAL:log.warn("the pull request offset illegal, {} {}",pullRequest.toString(), pullResult.toString());pullRequest.setNextOffset(pullResult.getNextBeginOffset());pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {@Overridepublic void run() {try {DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(), false);DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());log.warn("fix the pull request offset, {}", pullRequest);} catch (Throwable e) {log.error("executeTaskLater Exception", e);}}}, 10000);break;default:break;}}}@Overridepublic void onException(Throwable e) {if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("execute the pull request exception", e);}DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);}};boolean commitOffsetEnable = false;long commitOffsetValue = 0L;if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {commitOffsetEnable = true;}}String subExpression = null;boolean classFilter = false;SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (sd != null) {if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {subExpression = sd.getSubString();}classFilter = sd.isClassFilterMode();}int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter);// 核心!!拉取消息。如果拉取请求发生异常时,提交延迟拉取消息请求try {this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);}
}

拉取消息的核心方法为PullAPIWrapper#pullKernelImpl(…):

/*** 拉取消息** @param mq 消息队列* @param subExpression 订阅表达式* @param subVersion 订阅版本号* @param offset 拉取队列开始位置* @param maxNums 拉取消息数量* @param sysFlag 拉取请求系统标识* @param commitOffset 提交消费进度* @param brokerSuspendMaxTimeMillis broker挂起请求最大时间* @param timeoutMillis 请求broker超时时长* @param communicationMode 通讯模式* @param pullCallback 拉取回调* @return 拉取消息结果。只有通讯模式为同步时,才返回结果,否则返回null* @throws MQClientException 当寻找不到 broker 时,或发生其他client异常* @throws RemotingException 当远程调用发生异常时* @throws MQBrokerException 当 broker 发生异常时。只有通讯模式为同步时才会发生该异常* @throws InterruptedException 当发生中断异常时*/
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 获取BrokerFindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}// 拉取消息请求if (findBrokerResult != null) {{// check versionif (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}// 发送拉取消息请求PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

该方法首先获取Broker信息,包括Broker的地址以主从节点信息,然后执行拉取消息请求。

获取Broker消息先调用方法PullAPIWrapper#recalculatePullFromWhichNode方法,获取消息队列拉取消息对应的Broker编号。源码:

// 存储消息队列与Broer的映射,拉取消息时,通过此映射获取拉取请求对应的Broker
private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(32);private volatile boolean connectBrokerByUser = false;private volatile long defaultBrokerId = MixAll.MASTER_ID;public long recalculatePullFromWhichNode(final MessageQueue mq) {// 是否使用默认的Brokerif (this.isConnectBrokerByUser()) {return this.defaultBrokerId;}// 若消息队列映射拉取Broker存在,则返回映射Broker编号AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);if (suggest != null) {return suggest.get();}// 返回Broker的主节点编号return MixAll.MASTER_ID;
}

再调用MQClientInstance#findBrokerAddressInSubscribe方法,获取Broker的信息,源码:

// 存储Broker名字 和 Broker地址相关 Map
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =new ConcurrentHashMap<String, HashMap<Long, String>>();public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName, // broker名字final long brokerId, // broker编号final boolean onlyThisBroker //是否必须是该broker
) {String brokerAddr = null;boolean slave = false;boolean found = false;// 获得Broker信息HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {brokerAddr = map.get(brokerId);slave = brokerId != MixAll.MASTER_ID;found = brokerAddr != null;// 如果不强制获得,选择一个Brokerif (!found && !onlyThisBroker) {Entry<Long, String> entry = map.entrySet().iterator().next();brokerAddr = entry.getValue();slave = entry.getKey() != MixAll.MASTER_ID;found = true;}}// 找到Broker,返回if (found) {return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}// 找不到返回空return null;
}

可以看到,此方法获取Broker信息,包括Broker的地址以及是否为从节点等信息。

上文分析的DefaultMQPushConsumerImpl#pullMessage方法中,注册了一个PullCallback回调,来处理拉取结果:

PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND:...//省略

调用了PullAPIWrappe#processPullResult方法,源码:

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;// 更新消息队列拉取消息Broker编号的映射this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());if (PullStatus.FOUND == pullResult.getPullStatus()) {// 解析消息ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);// 根据订阅信息消息tagCode匹配合适消息List<MessageExt> msgListFilterAgain = msgList;if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);this.executeHook(filterMessageContext);}for (MessageExt msg : msgListFilterAgain) {String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null && Boolean.parseBoolean(traFlag)) {msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));}MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));}// 设置消息列表pullResultExt.setMsgFoundList(msgListFilterAgain);}// 清空消息二进制数组pullResultExt.setMessageBinary(null);return pullResult;
}

该方法实现的功能主要是:

  • 更新消息队列拉取消息 Broker 编号的映射
  • 解析消息,并根据订阅信息消息 tagCode匹配合适消息

这样消息就被拉取到了,后面就可以进行消息消费了,在这里会提交拉取到的消息到消息处理队列,在PullCallback回调中:

// 提交拉取到的消息到消息处理队列
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);

调用方法为ProcessQueue#putMessage(…):

// 消息映射读写锁
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
//消息映射 key:消息队列位置
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
// 消息数
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
/*** A subset of msgTreeMap, will only be used when orderly consume*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
// 添加消息最大队列位置
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
private volatile long lastPullTimestamp = System.currentTimeMillis();
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
private volatile boolean locked = false;
private volatile long lastLockTimestamp = System.currentTimeMillis();
// 是否正在消费
private volatile boolean consuming = false;
// Broker累计消息数量  计算公式 = queueMaxOffset - 新添加消息数组[n - 1].queueOffset
// Acc = Accumulation
// cnt = (猜测)对比度
private volatile long msgAccCnt = 0;public boolean putMessage(final List<MessageExt> msgs) {boolean dispatchToConsume = false;try {this.lockTreeMap.writeLock().lockInterruptibly();try {// 添加消息int validMsgCnt = 0;for (MessageExt msg : msgs) {MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);if (null == old) {validMsgCnt++;this.queueOffsetMax = msg.getQueueOffset();msgSize.addAndGet(msg.getBody().length);}}msgCount.addAndGet(validMsgCnt);// 计算是否正在消费if (!msgTreeMap.isEmpty() && !this.consuming) {dispatchToConsume = true;this.consuming = true;}// Broker累计消息数量if (!msgs.isEmpty()) {MessageExt messageExt = msgs.get(msgs.size() - 1);String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);if (property != null) {long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();if (accTotal > 0) {this.msgAccCnt = accTotal;}}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("putMessage exception", e);}return dispatchToConsume;
}

此方法提交消息给消费者存储,返回是否提交成功。
然后会调用ConsumeMessageConcurrentlyService#submitConsumeRequest(…)方法提交消费请求,这个下一节分析。

至此消费者就可以消费消息了。

整体时序图为(出处https://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/?github&1601):
image.png

6. 消费消息

上一节分析完了消息拉取的过程,这一节来看消息消费的过程。
上一节消息拉取后,将消息提交存储到了ProcessQueue。将有专门的线程完成对ProcessQueue的消费,这个线程是由ConsumeMessageConcurrentlyService 提供的。入口是ConsumeMessageConcurrentlyService#submitConsumeRequest(…)方法,源码如下:

// 消费线程池队列
private final BlockingQueue<Runnable> consumeRequestQueue;
// 消费线程池
private final ThreadPoolExecutor consumeExecutor;@Override
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();// 根据消息量,看是否走批量逻辑if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 提交消费请求this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {// 被拒绝,会提交延迟消息消费请求this.submitConsumeRequestLater(consumeRequest);}} else {for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {// 提交消费请求this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}// 被拒绝,会提交延迟消息消费请求this.submitConsumeRequestLater(consumeRequest);}}}
}

可以看到该方法的功能是提交立即消费消息的请求,被拒绝会提交延迟消息消费请求,拒绝的原因可能是线程池满了。
提交延迟消息的方法为ConsumeMessageConcurrentlyService#submitConsumeRequestLater:

private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {this.scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);}}, 5000, TimeUnit.MILLISECONDS);
}

实际上是提交给了scheduledExecutorService,延迟任务线程池。

提交的立即消费消息和延迟消费消息请求,都是ConsumeRequest实体进行的封装,其是ConsumeMessageConcurrentlyService的内部类,源码为:

class ConsumeRequest implements Runnable {// 消费消息列表private final List<MessageExt> msgs;// 消息处理队列private final ProcessQueue processQueue;// 消息队列private final MessageQueue messageQueue;public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {this.msgs = msgs;this.processQueue = processQueue;this.messageQueue = messageQueue;}public List<MessageExt> getMsgs() {return msgs;}public ProcessQueue getProcessQueue() {return processQueue;}@Overridepublic void run() {if (this.processQueue.isDropped()) {// 废弃队列不进行消费log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//监听器MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;// 上下文ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);// 消费结果状态ConsumeConcurrentlyStatus status = null;defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;// 消费结果ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}// 消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;// 解析消费结果if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {// 消费结果为空,设置状态为稍后重新消费log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);// 处理消费结果if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}public MessageQueue getMessageQueue() {return messageQueue;}}

该方法处理消费请求,并将请求提交给listener。这里的listener就是创建消费者时提供的。
在上述代码最后一步是处理消费结果,ConsumeMessageConcurrentlyService#processConsumeResult(…):

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {int ackIndex = context.getAckIndex();// 消息空,返回if (consumeRequest.getMsgs().isEmpty())return;// 计算从consumeRequest.msgs[0]到consumeRequest.msgs[ackIndex]的消息消费结果switch (status) {case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}// 统计成功/失败数量int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;// 统计成功/失败数量this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}// 处理消费失败的消息switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:// 广播模式,无论是否消费失败,不发回消息到Broker,只打印Logfor (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:// 发回消息失败到BrokerList<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}// 发回Broker失败的消息,直接提交延迟重新消费if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 移除消费成功消息,并更新最新消费进度long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}

此方法统计消费成功/失败的数量,并且处理消费失败的消息,消费失败时,如果是CLUSTERING模式,会将消费失败的消息发回到broker,这个逻辑在Broker篇分析过,Consumer的处理逻辑在第9小节分析。
这里继续看移除消费成功消息,并更新最新消费进度的流程。
移除消费成功消息调用方法ProcessQueue#removeMessage(…):

public long removeMessage(final List<MessageExt> msgs) {long result = -1;final long now = System.currentTimeMillis();try {this.lockTreeMap.writeLock().lockInterruptibly();this.lastConsumeTimestamp = now;try {if (!msgTreeMap.isEmpty()) {result = this.queueOffsetMax + 1;// 移除消息int removedCnt = 0;for (MessageExt msg : msgs) {MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());if (prev != null) {removedCnt--;msgSize.addAndGet(0 - msg.getBody().length);}}msgCount.addAndGet(removedCnt);if (!msgTreeMap.isEmpty()) {result = msgTreeMap.firstKey();}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (Throwable t) {log.error("removeMessage exception", t);}return result;
}

7. 过期消息清理

过期消息清理逻辑入口在ConsumeMessageConcurrentlyService#cleanExpireMsg(…):

public void start() {this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {cleanExpireMsg();}}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}

上面这段代码是ConsumeMessageConcurrentlyService启东时调用的。继续看源码:

private void cleanExpireMsg() {Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();while (it.hasNext()) {Map.Entry<MessageQueue, ProcessQueue> next = it.next();ProcessQueue pq = next.getValue();pq.cleanExpiredMsg(this.defaultMQPushConsumer);}
}

这个方法默认周期:15min。
其调用的方法为ProcessQueue#cleanExpiredMsg(…):

public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {// 顺序消费时,直接返回if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {return;}// 每次循环最多移除16条int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;for (int i = 0; i < loop; i++) {MessageExt msg = null;// 获取消息,并看是否超时,不超时,结束循环try {this.lockTreeMap.readLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {msg = msgTreeMap.firstEntry().getValue();} else {break;}} finally {this.lockTreeMap.readLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}try {// 超时消息,发回BrokerpushConsumer.sendMessageBack(msg, 3);log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());try {// 判断此时消息是否依然是第一条,若是,则进行移除this.lockTreeMap.writeLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {try {removeMessage(Collections.singletonList(msg));} catch (Exception e) {log.error("send expired msg exception", e);}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}} catch (Exception e) {log.error("send expired msg exception", e);}}
}

消息清理每15min执行一次,如果超时,会发回超时消息到broker,并从ProcessQueue移除。

8. 发回失败消息到Broker

在消息消费失败和超时时,都会发回到Broker。调用的方法为DefaultMQPushConsumerImpl#sendMessageBack(…):

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// Consumer发回消息this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {// 异常时,使用Client内置Producer发回消息log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}

方法主要是使用Consumer发回消息,异常时会使用内置的Producer发回消息。
Consumer发回消息调用方法MQClientAPIImpl#consumerSendMessageBack(…):

/*** Consumer发回消息* @param addr Broker地址* @param msg 消息* @param consumerGroup 消费分组* @param delayLevel 延迟级别* @param timeoutMillis 超时* @param maxConsumeRetryTimes 消费最大重试次数* @throws RemotingException 当远程调用发生异常时* @throws MQBrokerException 当Broker发生异常时* @throws InterruptedException 当线程中断时*/
public void consumerSendMessageBack(final String addr,final MessageExt msg,final String consumerGroup,final int delayLevel,final long timeoutMillis,final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);requestHeader.setGroup(consumerGroup);requestHeader.setOriginTopic(msg.getTopic());requestHeader.setOffset(msg.getCommitLogOffset());requestHeader.setDelayLevel(delayLevel);requestHeader.setOriginMsgId(msg.getMsgId());requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}

此方法就是构建请求,并执行发回流程。

9. 消费进度

Consumer会存储消费进度,操作的接口为OffsetStore,其有两个实现类:

  • RemoteBrokerOffsetStore :Consumer 集群模式 下,使用远程 Broker 消费进度
  • LocalFileOffsetStore :Consumer 广播模式下,使用本地 文件 消费进度

下面分别分析下其接口。

load加载消费进度

OffsetStore#load(…)是接口定义的方法,RemoteBrokerOffsetStore并没有实现是个空方法:

@Override
public void load() {
}

也就是不进行加载,实际读取消费进度时,从 Broker 获取。

只有LocalFileOffsetStore#load(…)对齐进行了实现,源码如下:

@Override
public void load() throws MQClientException {// 从本地硬盘读取消费进度OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());// 打印每个消息队列的消费进度for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",this.groupName,mq,offset.get());}}
}

可以看出本方法从本地文件加载消费进度到内存。存储消费进度的类为OffsetSerializeWrapper:

public class OffsetSerializeWrapper extends RemotingSerializable {private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =new ConcurrentHashMap<MessageQueue, AtomicLong>();public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {return offsetTable;}public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {this.offsetTable = offsetTable;}
}

可以看出这个类非常简单,只是提供了一个Map存储消费进度。

readOffset读取消费进度

readOffset读取消费进度,读取消费进度有以下几种读取类型:

  1. READ_FROM_MEMORY :从内存读取
  2. READ_FROM_STORE :从存储( Broker 或 文件 )读取
  3. MEMORY_FIRST_THEN_STORE :优先从内存读取,读取不到,从存储读取

先来看下LocalFileOffsetStore的实现,LocalFileOffsetStore#readOffset(…):

@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE:case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}case READ_FROM_STORE: {OffsetSerializeWrapper offsetSerializeWrapper;try {offsetSerializeWrapper = this.readLocalOffset();} catch (MQClientException e) {return -1;}if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);if (offset != null) {this.updateOffset(mq, offset.get(), false);return offset.get();}}}default:break;}}return -1;
}

可以看到,主要实现了从文件读取消费进度。

再来看下RemoteBrokerOffsetStore的实现,RemoteBrokerOffsetStore#readOffset(…):

@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE:case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}case READ_FROM_STORE: {try {long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);this.updateOffset(mq, offset.get(), false);return brokerOffset;}// No offset in brokercatch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}}default:break;}}return -1;
}

实现了从broker读取消费进度。

updateOffset更新消费进度

RemoteBrokerOffsetStore 与 LocalFileOffsetStore 实现相同:

@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {if (mq != null) {AtomicLong offsetOld = this.offsetTable.get(mq);if (null == offsetOld) {offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}if (null != offsetOld) {if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}}}
}

是采用原子方式更新进度。

persistAll持久化消费进度

LocalFileOffsetStore的实现,LocalFileOffsetStore#persistAll(…):

@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {AtomicLong offset = entry.getValue();offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {MixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}}
}

会将消费进度写入文件。

RemoteBrokerOffsetStore的实现,RemoteBrokerOffsetStore#persistAll(…):

@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();if (!mqs.isEmpty()) {for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();AtomicLong offset = entry.getValue();if (offset != null) {if (mqs.contains(mq)) {try {this.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} else {unusedMQ.add(mq);}}}}if (!unusedMQ.isEmpty()) {for (MessageQueue mq : unusedMQ) {this.offsetTable.remove(mq);log.info("remove unused mq, {}, {}", mq, this.groupName);}}
}

此实现持久化指定消息队列数组的消费进度到 Broker,并移除非指定消息队列。

persistAll方法会被DefaultMQPullConsumerImpl#persistConsumerOffset调用:

@Override
public void persistConsumerOffset() {try {this.makeSureStateOK();Set<MessageQueue> mqs = new HashSet<MessageQueue>();Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);this.offsetStore.persistAll(mqs);} catch (Exception e) {log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);}
}

继而被MQClientInstance#persistAllConsumerOffset方法调用:

private void persistAllConsumerOffset() {Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();impl.persistConsumerOffset();}
}

这个方法是在MQClientInstance初始化时,提交到线程池中的MQClientInstance#startScheduledTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

每5000ms执行一次。

不仅如此,在拉取消息、分配消息队列等等操作,都会进行消费进度持久化。

10. 小结

本篇主要分析了Consumer中对于Message拉取与消费的处理流程。

  1. Consumer订阅Topic并注册监听器
  2. 通过Rebalance策略进行Consuer和队列的分配,指定consumer可以消费哪些队列
  3. 进行消息的拉取
  4. 进行消息的消费
  5. 如果消息消费失败或过期,会发回到Broker
  6. 最后是对消费进度的处理,加载读取以及持久化

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/628511.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS 动态邮件查收效果

<template><view class="content"><view class="tooltip-container"><text class="tooltip">查看</text><text class="text">@</text></view></view> </template><sc…

Visual Studio调试模式下无法使用右键菜单将ppt转换到pdf

Visual Studio调试模式下无法使用右键菜单将ppt转换到pdf 症状 Visual Studio调试模式下&#xff0c;程序停在断点时&#xff0c;我临时需要将ppt转为pdf&#xff0c;遂右键单击文件&#xff0c;想直接转pdf&#xff0c;奈何光标转了几秒钟&#xff0c;毫无反应。 解决方法 …

未来科技五年人工智能行业产业发展趋势最新竞争力

人工智能&#xff08;Artificial Intelligence&#xff0c;AI&#xff09;是近年来快速发展的热门领域&#xff0c;被广泛应用于各个行业。随着技术的不断创新和突破&#xff0c;人工智能行业的竞争力也在不断提升。本文将分析未来科技五年人工智能行业产业发展趋势&#xff0c…

力扣精选算法100题——水果成篮(滑动窗口专题)

本题链接&#x1f449;水果成篮 第一步&#xff1a;了解题意 我就按照实例1来进行对这题的理解。 1代表种类类型&#xff0c;这个数组里面有2个种类类型 ps:种类1和种类2 &#xff0c;只不过种类1是有2个水果&#xff0c;种类2有一个水果&#xff0c;共计3个水果。 本题需要解…

如何区分GPT-3.5模型与GPT-4模型?

GPT 3.5 和 GPT-4 有什么区别&#xff1f; GPT-3.5 在经过大量数据训练后&#xff0c;成功地发展到可以考虑 1750 亿个参数以响应提示。这使其具备令人印象深刻的语言技能&#xff0c;以非常人性化的方式回应各种查询。然而&#xff0c;GPT-4 在更为庞大的训练数据基础上进行了…

Vue keep-alive的使用和原理解析

✨ 专栏介绍 在当今Web开发领域中&#xff0c;构建交互性强、可复用且易于维护的用户界面是至关重要的。而Vue.js作为一款现代化且流行的JavaScript框架&#xff0c;正是为了满足这些需求而诞生。它采用了MVVM架构模式&#xff0c;并通过数据驱动和组件化的方式&#xff0c;使…

linux建立基本网站

网站需求&#xff1a; 1.基于域名[www.openlab.com]可以访问网站内容为 welcome to openlab!!! 2.给该公司创建三个子界面分别显示学生信息&#xff0c;教学资料和缴费网站&#xff0c;基于[www.openlab.com/student] 网站访问学生信息 [www.openlab.com/data]网站访问教学资…

leetcode 67. 二进制求和

一、题目 二、解答 1.思路 1.1 思路1 转成2个二进制数字相加&#xff0c;之后再转回字符串 1.2 思路2 遍历字符串挨个相加&#xff1a; 补齐2个字符串到同样长度 while循环&#xff0c;如果指针>0不断循环如果a短&#xff0c;给字符串前插入&#xff08;a长度-b长度&a…

npm link 后怎么查看软连接和删除软连接的

一&#xff1a;在你的npm项目中&#xff0c;进行打包&#xff0c;形成一个dist文件 npm run build // 这是我的打包命令&#xff0c;具体可查看 package.json 文件 二&#xff1a; 打包完成后&#xff0c;运行pwd命令&#xff0c;可查看到你npm项目的路径。 pwd // 输出一…

Java开发笔记

一、参数校验 1、校验json字符串是否符合规范 &#xff08;1&#xff09;业务场景&#xff1a;接收前端传输过来的json串&#xff0c;需要将其写入数据库&#xff0c;写入之前需要校验其是否能够转换成对应实体类&#xff0c;以便后续从数据库读取   &#xff08;2&#xff0…

【Java 设计模式】创建型之工厂方法模式

文章目录 1. 定义2. 应用场景3. 代码实现4. 应用示例结语 在软件开发中&#xff0c;工厂方法模式是一种常见的创建型设计模式&#xff0c;它提供了一种将对象的实例化延迟到子类的方法。工厂方法模式通过定义一个创建对象的接口&#xff0c;但是让子类决定实例化哪个类。在本文…

C++ Primer 6.3 返回类型和return语句 知识点+练习题

C Primer 6.3 返回类型和return语句 无返回值函数有返回值的函数两个错误值是如何被返回的返回类类型的函数和调用运算符引用返回左值列表初始化返回值主函数main的返回值返回数组指针 递归练习题疑问待更新 无返回值函数 用在返回值类型为void的函数中&#xff0c;可以不写re…

若依基于jsencrypt实现前后端登录密码加密

若依虽然有加密解密功能&#xff0c;然后只有前端有&#xff0c;在用户点击保存密码的时候&#xff0c;会将密码保存到本地&#xff0c;但是为了防止密码泄露&#xff0c;所以在保存的时候&#xff0c;进行加密&#xff0c;在回显密码的时候进行解密显示&#xff0c;用户在登录…

29 旋转工具箱

效果演示 实现了一个菜单按钮的动画效果&#xff0c;当鼠标悬停在菜单按钮上时&#xff0c;菜单按钮会旋转315度&#xff0c;菜单按钮旋转的同时&#xff0c;菜单按钮旋转的8个小圆圈也会依次旋转360度&#xff0c;并且每个小圆圈的旋转方向和菜单按钮的旋转方向相反&#xff0…

数据结构期末复习(4)串 树和二叉树

串 在数据结构中&#xff0c;串是由零个或多个字符组成的有限序列。它是一种线性数据结构&#xff0c;常用于表示和处理文本、字符串等信息。 串的特点包括&#xff1a; 顺序性&#xff1a;串中的字符按照一定的先后顺序排列&#xff0c;每个字符都有一个唯一的位置。有限性&…

MATLAB - 利用非线性模型预测控制(Nonlinear MPC)来控制四旋翼飞行器

系列文章目录 前言 本示例展示了如何利用非线性模型预测控制&#xff08;MPC&#xff09;为四旋翼飞行器设计一个跟踪轨迹的控制器。 一、四旋翼模型 四旋翼飞行器有四个向上的旋翼。从四旋翼飞行器的质量中心出发&#xff0c;旋翼呈等距离的正方形排列。四旋翼飞行器动力学数…

uboot工作原理介绍

uboot其实和电脑的BIOS是一个原理&#xff0c;它主要做两件事: &#xff08;1&#xff09;初始化硬件&#xff1b; &#xff08;2&#xff09;将系统文件&#xff08;或者说是内核&#xff09;从flash中读出来加载到DDR里面执行。 给大家解释下面几个问题&#xff1a; 为什么…

zabbix6.4设置网络设备端口流量P95

P95概念&#xff1a; p95函数写法&#xff1a; 需要监控P95的设备如下&#xff1a; 先找到原来的端口接收发送速率的监控项&#xff1a; 可以看到他们归属于自动发现规则&#xff1a;端口表UP 找到自动发现规则&#xff1a; 点击创建监控项原型&#xff1a; 公式如下&#xff…

吴恩达-从人类反馈中进行强化学习RLHF

吴恩达-从人类反馈中进行强化学习RLHF https://www.bilibili.com/video/BV1R94y1P7QX?p1&vd_sourcee7939b5cb7bc219a05ee9941cd297ade 1、公开的LLM&#xff0c;Llama2&#xff0c; 使用LLM对同一个提示产生多个不同输出&#xff0c;然后人类评估这些输出。评估方法是对比…

数据结构【树+二叉树】

目录 线性表和非线性表 树的概念 树的存储表示 二叉树的概念 特殊二叉树 满二叉树 完全二叉树 二叉树的性质 二叉树的存储结构 顺序存储 链式存储 本篇我们开始进入数据结构中【树】的学习。 线性表和非线性表 逻辑结构&#xff1a;人想象出来的物理结构&#xf…