RocketMQ Source Code Reading: Message Pulling and Consumption - the Consumer Side
- 1. Consumer
- 2. PushConsumer
- 3. PushConsumer Subscription
- 3.1 subscribe
- 3.2 registerMessageListener
- 4. PushConsumer Message Queue Rebalance
- 4.1 Rebalance Flow
- 4.2 Rebalance Strategies
- AllocateMessageQueueAveragely
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
- AllocateMessageQueueConsistentHash
- AllocateMachineRoomNearby
- 4.3 Reading the Consumption Progress
- 5. Message Pulling
- 6. Message Consumption
- 7. Expired Message Cleanup
- 8. Sending Failed Messages Back to the Broker
- 9. Consumption Progress
- load: loading the consumption progress
- readOffset: reading the consumption progress
- updateOffset: updating the consumption progress
- persistAll: persisting the consumption progress
- 10. Summary
The previous article analyzed how the Broker supports message pulling and consumption. This article continues with the pulling and consumption logic on the Consumer side.
1. Consumer
RocketMQ provides two kinds of Consumer:
- PushConsumer (interface MQPushConsumer): the most widely used Consumer. Although it is named "Push", it actually keeps pulling messages from the Broker; when there is no new message, the Broker suspends the request (the suspension/long-polling service described in the previous article) until a new message arrives, which gives good real-time behavior.
- PullConsumer (interface MQPullConsumer): not the focus of this analysis.
2. PushConsumer
PushConsumer (interface MQPushConsumer) involves the following components:
- RebalanceService: the queue-balancing service, responsible for allocating the message queues (MessageQueue) the current Consumer may consume. Whenever a Consumer joins or leaves, the queues are reallocated.
- PullMessageService: the pull service, which keeps pulling messages from the Broker and submits consume tasks to ConsumeMessageService.
- ConsumeMessageService: the consume service, which keeps consuming messages and handling the consume results.
- RemoteBrokerOffsetStore: consumption-progress management; it reads the consumption progress from the Broker and syncs the progress back to the Broker.
- ProcessQueue: the message processing queue.
- MQClientInstance: wraps the API calls to the Namesrv and Brokers, used by both Producer and Consumer.
The relationship between them:
3. PushConsumer Subscription
Let's start from the official example, found in the example module:
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        AtomicLong consumeTimes = new AtomicLong(0);

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit(false);
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            this.consumeTimes.incrementAndGet();
            if ((this.consumeTimes.get() % 2) == 0) {
                return ConsumeOrderlyStatus.SUCCESS;
            } else if ((this.consumeTimes.get() % 3) == 0) {
                return ConsumeOrderlyStatus.ROLLBACK;
            } else if ((this.consumeTimes.get() % 4) == 0) {
                return ConsumeOrderlyStatus.COMMIT;
            } else if ((this.consumeTimes.get() % 5) == 0) {
                context.setSuspendCurrentQueueTimeMillis(3000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
The example creates a DefaultMQPushConsumer, subscribes to a topic via subscribe, and registers a message listener via registerMessageListener.
That is the whole flow of registering a Consumer.
PushConsumer corresponds to the interface MQPushConsumer, whose default implementation is DefaultMQPushConsumer; it provides topic subscription.
3.1 subscribe
Subscription goes through subscribe(...). DefaultMQPushConsumer is only a thin wrapper around the interface; the real work is in DefaultMQPushConsumerImpl#subscribe(...):
public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        // build the subscription data
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
            topic, subExpression);
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            // sync the Consumer information to all Brokers through a heartbeat
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
The method first builds the subscription data and then syncs the Consumer information to the Brokers through a heartbeat.
- topic: the topic to subscribe to
- subExpression: the subscription (tag) expression
The subscription data is built by FilterAPI.buildSubscriptionData(...):
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);// 处理订阅表达式if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {subscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData;
}
The main logic is to build the subscription data from the topic and the subscription expression. SubscriptionData is the subscription entity:
public class SubscriptionData implements Comparable<SubscriptionData> {
    public final static String SUB_ALL = "*";
    private boolean classFilterMode = false;
    private String topic;
    private String subString;
    private Set<String> tagsSet = new HashSet<String>();
    private Set<Integer> codeSet = new HashSet<Integer>();
    private long subVersion = System.currentTimeMillis();
    private String expressionType = ExpressionType.TAG;
    @JSONField(serialize = false)
    private String filterClassSource;
}
The subVersion field is the subscription version, taken from the current system time.
- tagsSet: the set of subscribed tags
- codeSet: the hash codes of the subscribed tags (see the sketch below)
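As a quick illustration of what this parsing produces (a minimal sketch, not from the original article; the package paths and the three-argument signature are assumed to match the 4.x source quoted above):
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;

public class SubscriptionDataDemo {
    public static void main(String[] args) throws Exception {
        SubscriptionData data = FilterAPI.buildSubscriptionData(
            "please_rename_unique_group_name_3", "TopicTest", "TagA || TagC || TagD");
        System.out.println(data.getSubString()); // TagA || TagC || TagD
        System.out.println(data.getTagsSet());   // the three trimmed tags
        System.out.println(data.getCodeSet());   // their hashCode() values
    }
}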
3.2 registerMessageListener
The example registers a listener through DefaultMQPushConsumer#registerMessageListener(...):
@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
which calls DefaultMQPushConsumerImpl#registerMessageListener:
public void registerMessageListener(MessageListener messageListener) {
    this.messageListenerInner = messageListener;
}
It simply stores the listener in an internal field.
4. PushConsumer Message Queue Rebalance
When a Consumer registers its subscription, a Rebalance will later redistribute the message queues. From DefaultMQPushConsumerImpl#subscribe(...):
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
4.1 Rebalance Flow
The message-queue Rebalance is implemented by RebalanceService, the queue-balancing service responsible for allocating the message queues (MessageQueue) the current Consumer may consume:
public class RebalanceService extends ServiceThread {// 等待间隔,单位:毫秒private static long waitInterval =Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));private final InternalLogger log = ClientLogger.getLog();// MQClient对象private final MQClientInstance mqClientFactory;public RebalanceService(MQClientInstance mqClientFactory) {this.mqClientFactory = mqClientFactory;}@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}@Overridepublic String getServiceName() {return RebalanceService.class.getSimpleName();}
}
RebalanceService is a standalone thread that performs a Rebalance every 20 s by default (see the sketch after this list for overriding the interval). There are two additional triggers:
- when a PushConsumer starts, it calls rebalanceService#wakeup(...)
- when the Broker notifies Consumers that one has joined or left, the Consumer reacts by calling rebalanceService#wakeup(...)
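For reference, here is a minimal sketch of changing that interval (assuming you control the JVM startup of the consumer process); the property name comes straight from the source above:
public class RebalanceIntervalConfig {
    public static void main(String[] args) {
        // Must be set before RebalanceService is loaded, since waitInterval is read in a static initializer.
        // Equivalent to passing -Drocketmq.client.rebalance.waitInterval=10000 on the command line.
        System.setProperty("rocketmq.client.rebalance.waitInterval", "10000"); // 10s instead of 20s
        // ... create and start the consumer afterwards
    }
}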
The thread calls MQClientInstance#doRebalance:
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
It iterates over the consumerTable (the set of Consumers) held by the current client and performs queue allocation for each one.
The call goes to MQConsumerInner#doRebalance. MQConsumerInner is an interface; for the push consumer the implementation is DefaultMQPushConsumerImpl, so DefaultMQPushConsumerImpl#doRebalance is what actually runs:
@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}
which calls RebalanceImpl#doRebalance:
public void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}// 移除未订阅的topic对应的消息队列this.truncateMessageQueueNotMyTopic();
}// 移除未订阅的topic对应的消息队列
private void truncateMessageQueueNotMyTopic() {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();for (MessageQueue mq : this.processQueueTable.keySet()) {if (!subTable.containsKey(mq.getTopic())) {ProcessQueue pq = this.processQueueTable.remove(mq);if (pq != null) {pq.setDropped(true);log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);}}}
}
This method iterates over all subscribed topics and allocates the message queues of each topic by calling RebalanceImpl#rebalanceByTopic:
private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {// 获取 topic 对应的 队列 和 consumer信息Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {// 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;// 根据 队列分配策略 分配消息队列try {allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// 更新消息队列boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}
}
For both the broadcasting and the clustering messageModel, the flow ends up in RebalanceImpl#updateProcessQueueTableInRebalance, which updates the processing-queue table in two steps:
- remove message queues that are in processQueueTable but not in mqSet
- add message queues that are in mqSet but not in processQueueTable
The source is as follows.
Parameter mqSet: the message queues assigned to this Consumer by the allocation.
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;// 移除 在processQueueTable && 不存在于 mqSet 里的消息队列Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();// 处理不需要rebalance的消息if (mq.getTopic().equals(topic)) {if (!mqSet.contains(mq)) {pq.setDropped(true);// 移除不需要的消息队列if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {// 队列拉取超时,进行清理switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}// 增加 不在processQueueTable && 存在于mqSet 里的消息队列。List<PullRequest> pullRequestList = new ArrayList<PullRequest>();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();// 计算需要从哪里pull消息long nextOffset = this.computePullFromWhere(mq);if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}// 发起消息拉取请求this.dispatchPullRequest(pullRequestList);return changed;
}
So, in #rebalanceByTopic(...):
- in broadcasting mode (BROADCASTING), all message queues of the topic are assigned to the Consumer
- in clustering mode (CLUSTERING), only a subset of the topic's message queues is assigned
Finally, pull requests are issued for the newly added queues, in RebalancePushImpl#dispatchPullRequest(...):
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}
This method kicks off message pulling and is the starting point of the PushConsumer's continuous pull loop. It calls DefaultMQPushConsumerImpl#executePullRequestImmediately(...):
public void executePullRequestImmediately(final PullRequest pullRequest) {
    this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
which calls PullMessageService#executePullRequestImmediately:
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}
}@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");
}
The pull request is put into a request queue. PullMessageService runs its own thread and keeps taking requests from that queue, handling each one in PullMessageService#pullMessage:
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
which in turn calls DefaultMQPushConsumerImpl#pullMessage to pull the messages; that flow is analyzed in Section 5.
The overall sequence diagram (source: https://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/?github&1601):
4.2 Rebalance Strategies
During a Rebalance, an allocation strategy is applied. The corresponding interface is AllocateMessageQueueStrategy, which has six implementations:
AllocateMessageQueueAveragely
Allocates the queues evenly:
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 平均分配// 第几个consumerint index = cidAll.indexOf(currentCID);// 余数,即多少消息队列无法平均分配int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());// 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}@Overridepublic String getName() {return "AVG";}
}
Allocation examples:
| | 2 Consumers (divides evenly) | 3 Consumers (does not divide evenly) | 5 Consumers (more consumers than queues) |
|---|---|---|---|
| Queue[0] | Consumer[0] | Consumer[0] | Consumer[0] |
| Queue[1] | Consumer[0] | Consumer[0] | Consumer[1] |
| Queue[2] | Consumer[1] | Consumer[1] | Consumer[2] |
| Queue[3] | Consumer[1] | Consumer[2] | Consumer[3] |
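The "3 Consumers" column can be reproduced with a standalone sketch (my own illustration; MessageQueue construction and package names assumed to follow the 4.x client):
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageQueue;

public class AveragelyAllocationDemo {
    public static void main(String[] args) {
        List<MessageQueue> mqAll = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            mqAll.add(new MessageQueue("TopicTest", "broker-a", i)); // 4 queues: q0..q3
        }
        List<String> cidAll = Arrays.asList("CID-0", "CID-1", "CID-2"); // 3 consumers

        AllocateMessageQueueAveragely strategy = new AllocateMessageQueueAveragely();
        for (String cid : cidAll) {
            // Every client sorts the same input, so every client computes the same result.
            List<MessageQueue> mine = strategy.allocate("please_rename_unique_group_name_3", cid, mqAll, cidAll);
            System.out.println(cid + " -> " + mine);
            // Expected: CID-0 gets q0,q1; CID-1 gets q2; CID-2 gets q3 (the middle column above).
        }
    }
}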
AllocateMessageQueueByMachineRoom
Evenly allocates the message queues of the Brokers that can be consumed; we will not dig into the source here.
AllocateMessageQueueAveragelyByCircle
Allocates the message queues in a circular (round-robin) fashion.
AllocateMessageQueueByConfig
Allocates the message queues according to a fixed configuration.
AllocateMessageQueueConsistentHash
Allocates the queues with a consistent-hashing algorithm.
AllocateMachineRoomNearby
A proxy strategy that prefers the nearby machine room and delegates to an actual strategy. If any consumer is alive in a machine room, the message queues of brokers deployed in that same machine room are allocated only to those consumers; otherwise those queues are shared among all consumers, since there is no live consumer to monopolize them. (Paraphrased from the class Javadoc.)
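The strategy is pluggable on the consumer. As a hedged example (class and setter names as in the 4.x client), switching to circular allocation looks like this:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;

public class StrategyConfigDemo {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        // Replace the default AllocateMessageQueueAveragely; must be set before consumer.start().
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
    }
}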
4.3 Reading the Consumption Progress
One step of the rebalance flow reads the consumption progress, in RebalanceImpl#updateProcessQueueTableInRebalance:
// compute the offset from which this message queue starts to be consumed
long nextOffset = this.computePullFromWhere(mq);
The call goes to RebalancePushImpl#computePullFromWhere(...):
@Override
public long computePullFromWhere(MessageQueue mq) {long result = -1;final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();switch (consumeFromWhere) {case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:case CONSUME_FROM_MIN_OFFSET:case CONSUME_FROM_MAX_OFFSET:case CONSUME_FROM_LAST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;}// First start,no offsetelse if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {result = 0L;} else {try {result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;}case CONSUME_FROM_FIRST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {result = 0L;} else {result = -1;}break;}case CONSUME_FROM_TIMESTAMP: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {try {result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}} else {try {long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),UtilAll.YYYYMMDDHHMMSS).getTime();result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;}default:break;}return result;
}
This method computes the position from which a message queue starts to be consumed. There are three options (a configuration sketch follows the list):
- a brand-new consumer group starts from the last offset of the queue; subsequent starts resume from the stored progress
- a brand-new consumer group starts from the first offset of the queue; subsequent starts resume from the stored progress
- a brand-new consumer group starts from a given timestamp; subsequent starts resume from the stored progress
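These options map to the ConsumeFromWhere values handled above. A hedged configuration sketch (setter names as in the 4.x client; the timestamp format follows UtilAll.YYYYMMDDHHMMSS used in the source above):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class ConsumeFromWhereDemo {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        // Only effective the first time this consumer group starts; afterwards the stored offset wins.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeTimestamp("20230101000000"); // yyyyMMddHHmmss, used by CONSUME_FROM_TIMESTAMP
    }
}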
5. Message Pulling
At the end of the rebalance flow we saw that PullMessageService#pullMessage calls DefaultMQPushConsumerImpl#pullMessage to pull messages.
The full source of PullMessageService:
public class PullMessageService extends ServiceThread {private final InternalLogger log = ClientLogger.getLog();//拉取消息请求队列private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();//MQClient对象private final MQClientInstance mQClientFactory;// 定时器。用于延迟提交拉取请求private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "PullMessageServiceScheduledThread");}});public PullMessageService(MQClientInstance mQClientFactory) {this.mQClientFactory = mQClientFactory;}// 执行延迟拉取消息请求public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {if (!isStopped()) {this.scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {PullMessageService.this.executePullRequestImmediately(pullRequest);}}, timeDelay, TimeUnit.MILLISECONDS);} else {log.warn("PullMessageServiceScheduledThread has shutdown");}}// 执行立即拉取消息请求public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}// 执行延迟任务public void executeTaskLater(final Runnable r, final long timeDelay) {if (!isStopped()) {this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);} else {log.warn("PullMessageServiceScheduledThread has shutdown");}}public ScheduledExecutorService getScheduledExecutorService() {return scheduledExecutorService;}// 拉取消息private void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}@Overridepublic void shutdown(boolean interrupt) {super.shutdown(interrupt);ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);}@Overridepublic String getServiceName() {return PullMessageService.class.getSimpleName();}}
This service pulls messages from the Broker and submits consume tasks to the ConsumeMessageService.
The actual pulling happens in DefaultMQPushConsumerImpl#pullMessage:
public void pullMessage(final PullRequest pullRequest) {final ProcessQueue processQueue = pullRequest.getProcessQueue();if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}// 设置队列最后拉取消息时间pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());// 判断consumer状态是否运行中。如果不是,则延迟拉取消息。try {this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);return;}if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}// 判断是否超过最大持有消息数量。默认最大值为1000long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}} else {if (processQueue.isLocked()) {if (!pullRequest.isLockedFirst()) {final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. 
pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setLockedFirst(true);pullRequest.setNextOffset(offset);}} else {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);log.warn("find the consumer's subscription failed, {}", pullRequest);return;}final long beginTimestamp = System.currentTimeMillis();PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());// 提交拉取到的消息到消息处理队列boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 提交消费请求DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// 提交下次拉取消息请求if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}if (pullResult.getNextBeginOffset() < prevRequestOffset|| firstMsgOffset < prevRequestOffset) {log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",pullResult.getNextBeginOffset(),firstMsgOffset,prevRequestOffset);}break;case NO_NEW_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case NO_MATCHED_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case OFFSET_ILLEGAL:log.warn("the pull request offset illegal, {} {}",pullRequest.toString(), pullResult.toString());pullRequest.setNextOffset(pullResult.getNextBeginOffset());pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {@Overridepublic void run() {try {DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(), 
false);DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());log.warn("fix the pull request offset, {}", pullRequest);} catch (Throwable e) {log.error("executeTaskLater Exception", e);}}}, 10000);break;default:break;}}}@Overridepublic void onException(Throwable e) {if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("execute the pull request exception", e);}DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);}};boolean commitOffsetEnable = false;long commitOffsetValue = 0L;if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {commitOffsetEnable = true;}}String subExpression = null;boolean classFilter = false;SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (sd != null) {if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {subExpression = sd.getSubString();}classFilter = sd.isClassFilterMode();}int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter);// 核心!!拉取消息。如果拉取请求发生异常时,提交延迟拉取消息请求try {this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);}
}
The core pulling method is PullAPIWrapper#pullKernelImpl(...):
/*** 拉取消息** @param mq 消息队列* @param subExpression 订阅表达式* @param subVersion 订阅版本号* @param offset 拉取队列开始位置* @param maxNums 拉取消息数量* @param sysFlag 拉取请求系统标识* @param commitOffset 提交消费进度* @param brokerSuspendMaxTimeMillis broker挂起请求最大时间* @param timeoutMillis 请求broker超时时长* @param communicationMode 通讯模式* @param pullCallback 拉取回调* @return 拉取消息结果。只有通讯模式为同步时,才返回结果,否则返回null* @throws MQClientException 当寻找不到 broker 时,或发生其他client异常* @throws RemotingException 当远程调用发生异常时* @throws MQBrokerException 当 broker 发生异常时。只有通讯模式为同步时才会发生该异常* @throws InterruptedException 当发生中断异常时*/
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 获取BrokerFindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}// 拉取消息请求if (findBrokerResult != null) {{// check versionif (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}// 发送拉取消息请求PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
The method first looks up the Broker information, including its address and whether it is a master or a slave, and then issues the pull request.
Looking up the Broker starts with PullAPIWrapper#recalculatePullFromWhichNode, which returns the Broker id this message queue should be pulled from:
// maps each message queue to the Broker id it should be pulled from
private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
    new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

private volatile boolean connectBrokerByUser = false;
private volatile long defaultBrokerId = MixAll.MASTER_ID;

public long recalculatePullFromWhichNode(final MessageQueue mq) {
    // use the user-specified default Broker if configured
    if (this.isConnectBrokerByUser()) {
        return this.defaultBrokerId;
    }
    // if a mapping exists for this queue, return the suggested Broker id
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }
    // otherwise pull from the master
    return MixAll.MASTER_ID;
}
Then MQClientInstance#findBrokerAddressInSubscribe returns the Broker information:
// maps each Broker name to its brokerId -> address table
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =new ConcurrentHashMap<String, HashMap<Long, String>>();public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName, // broker名字final long brokerId, // broker编号final boolean onlyThisBroker //是否必须是该broker
) {String brokerAddr = null;boolean slave = false;boolean found = false;// 获得Broker信息HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {brokerAddr = map.get(brokerId);slave = brokerId != MixAll.MASTER_ID;found = brokerAddr != null;// 如果不强制获得,选择一个Brokerif (!found && !onlyThisBroker) {Entry<Long, String> entry = map.entrySet().iterator().next();brokerAddr = entry.getValue();slave = entry.getKey() != MixAll.MASTER_ID;found = true;}}// 找到Broker,返回if (found) {return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}// 找不到返回空return null;
}
As shown, this method returns the Broker information, including its address and whether it is a slave.
The DefaultMQPushConsumerImpl#pullMessage method analyzed above registers a PullCallback to handle the pull result:
PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND:...//省略
The callback calls PullAPIWrapper#processPullResult:
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;// 更新消息队列拉取消息Broker编号的映射this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());if (PullStatus.FOUND == pullResult.getPullStatus()) {// 解析消息ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);// 根据订阅信息消息tagCode匹配合适消息List<MessageExt> msgListFilterAgain = msgList;if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);this.executeHook(filterMessageContext);}for (MessageExt msg : msgListFilterAgain) {String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null && Boolean.parseBoolean(traFlag)) {msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));}MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));}// 设置消息列表pullResultExt.setMsgFoundList(msgListFilterAgain);}// 清空消息二进制数组pullResultExt.setMessageBinary(null);return pullResult;
}
The method mainly:
- updates the mapping from message queue to the Broker id to pull from
- decodes the messages and filters them again by tag according to the subscription
At this point the messages have been pulled and are ready to be consumed. Inside the PullCallback, the pulled messages are submitted to the message processing queue and a consume request is submitted:
// submit the pulled messages to the message processing queue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// submit a consume request
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
The storage call is ProcessQueue#putMessage(...):
// read-write lock guarding the message map
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
// message map, key: queue offset of the message
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
// number of cached messages
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
/*** A subset of msgTreeMap, will only be used when orderly consume*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
// largest queue offset among the messages added so far
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
private volatile long lastPullTimestamp = System.currentTimeMillis();
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
private volatile boolean locked = false;
private volatile long lastLockTimestamp = System.currentTimeMillis();
// whether the queue is currently being consumed
private volatile boolean consuming = false;
// messages accumulated on the Broker = queueMaxOffset - newMessages[n - 1].queueOffset
// Acc = Accumulation
// cnt = count (the original comment only guesses at the meaning)
private volatile long msgAccCnt = 0;public boolean putMessage(final List<MessageExt> msgs) {boolean dispatchToConsume = false;try {this.lockTreeMap.writeLock().lockInterruptibly();try {// 添加消息int validMsgCnt = 0;for (MessageExt msg : msgs) {MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);if (null == old) {validMsgCnt++;this.queueOffsetMax = msg.getQueueOffset();msgSize.addAndGet(msg.getBody().length);}}msgCount.addAndGet(validMsgCnt);// 计算是否正在消费if (!msgTreeMap.isEmpty() && !this.consuming) {dispatchToConsume = true;this.consuming = true;}// Broker累计消息数量if (!msgs.isEmpty()) {MessageExt messageExt = msgs.get(msgs.size() - 1);String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);if (property != null) {long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();if (accTotal > 0) {this.msgAccCnt = accTotal;}}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("putMessage exception", e);}return dispatchToConsume;
}
The method stores the messages in the consumer-side processing queue and returns dispatchToConsume, i.e., whether a consume task should be dispatched for this queue.
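To make the role of msgTreeMap concrete, here is a small self-contained sketch (plain Java, not RocketMQ code) of the same idea: an offset-ordered buffer whose smallest remaining offset bounds the committable progress:
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Toy model of ProcessQueue: messages are buffered ordered by queue offset, and the smallest
// offset still in the buffer is as far as the consume progress may safely advance.
public class OffsetOrderedBuffer {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = 0L;

    public synchronized boolean put(long queueOffset, String body) {
        boolean wasEmpty = msgTreeMap.isEmpty();
        msgTreeMap.put(queueOffset, body);
        queueOffsetMax = Math.max(queueOffsetMax, queueOffset);
        return wasEmpty; // analogous to dispatchToConsume: true when the buffer goes from empty to non-empty
    }

    // Returns the next offset that is safe to commit after removing consumed messages.
    public synchronized long remove(List<Long> consumedOffsets) {
        for (Long offset : consumedOffsets) {
            msgTreeMap.remove(offset);
        }
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }

    public static void main(String[] args) {
        OffsetOrderedBuffer buffer = new OffsetOrderedBuffer();
        for (long i = 10; i < 14; i++) {
            buffer.put(i, "msg-" + i);
        }
        // Offsets 10 and 12 finish first; progress can only advance to 11, the smallest un-consumed offset.
        System.out.println(buffer.remove(Arrays.asList(10L, 12L))); // prints 11
    }
}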
Then ConsumeMessageConcurrentlyService#submitConsumeRequest(...) is called to submit a consume request, which is analyzed in the next section.
At this point the consumer is ready to consume the messages.
The overall sequence diagram (source: https://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/?github&1601):
6. Message Consumption
The previous section covered message pulling; this one looks at how the messages are consumed.
After pulling, the messages were stored into the ProcessQueue. A dedicated thread pool, provided by ConsumeMessageConcurrentlyService, consumes them. The entry point is ConsumeMessageConcurrentlyService#submitConsumeRequest(...):
// queue of the consume thread pool
private final BlockingQueue<Runnable> consumeRequestQueue;
// consume thread pool
private final ThreadPoolExecutor consumeExecutor;@Override
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();// 根据消息量,看是否走批量逻辑if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 提交消费请求this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {// 被拒绝,会提交延迟消息消费请求this.submitConsumeRequestLater(consumeRequest);}} else {for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {// 提交消费请求this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}// 被拒绝,会提交延迟消息消费请求this.submitConsumeRequestLater(consumeRequest);}}}
}
The method submits requests to consume the messages immediately, splitting the list into batches of at most consumeMessageBatchMaxSize; if the thread pool rejects a task (for example because it is saturated), a delayed consume request is submitted instead.
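The batching boundary is consumeMessageBatchMaxSize (1 by default on DefaultMQPushConsumer). A minimal sketch of the same splitting logic, independent of the RocketMQ classes:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSplitDemo {
    // Splits msgs into sublists of at most batchSize, mirroring submitConsumeRequest's loop.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int total = 0; total < msgs.size(); ) {
            List<T> batch = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize && total < msgs.size(); i++, total++) {
                batch.add(msgs.get(total));
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> msgs = Arrays.asList(1, 2, 3, 4, 5);
        // With consumeMessageBatchMaxSize = 2 this would submit three ConsumeRequests: [1, 2], [3, 4], [5].
        System.out.println(split(msgs, 2));
    }
}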
The delayed submission is ConsumeMessageConcurrentlyService#submitConsumeRequestLater:
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
It simply re-submits the request to scheduledExecutorService, a delayed-task thread pool, 5 seconds later.
Both the immediate and the delayed submissions are wrapped in a ConsumeRequest, an inner class of ConsumeMessageConcurrentlyService:
class ConsumeRequest implements Runnable {// 消费消息列表private final List<MessageExt> msgs;// 消息处理队列private final ProcessQueue processQueue;// 消息队列private final MessageQueue messageQueue;public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {this.msgs = msgs;this.processQueue = processQueue;this.messageQueue = messageQueue;}public List<MessageExt> getMsgs() {return msgs;}public ProcessQueue getProcessQueue() {return processQueue;}@Overridepublic void run() {if (this.processQueue.isDropped()) {// 废弃队列不进行消费log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//监听器MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;// 上下文ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);// 消费结果状态ConsumeConcurrentlyStatus status = null;defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;// 消费结果ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}// 消费消息status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;// 解析消费结果if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {// 消费结果为空,设置状态为稍后重新消费log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == 
status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);// 处理消费结果if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}public MessageQueue getMessageQueue() {return messageQueue;}}
Its run() method handles the consume request and hands the messages to the listener, which is the one registered when the consumer was created.
The last step above is processing the consume result, in ConsumeMessageConcurrentlyService#processConsumeResult(...):
public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {int ackIndex = context.getAckIndex();// 消息空,返回if (consumeRequest.getMsgs().isEmpty())return;// 计算从consumeRequest.msgs[0]到consumeRequest.msgs[ackIndex]的消息消费结果switch (status) {case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}// 统计成功/失败数量int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;// 统计成功/失败数量this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}// 处理消费失败的消息switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:// 广播模式,无论是否消费失败,不发回消息到Broker,只打印Logfor (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:// 发回消息失败到BrokerList<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}// 发回Broker失败的消息,直接提交延迟重新消费if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 移除消费成功消息,并更新最新消费进度long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}
The method updates the success/failure statistics and handles the messages that failed to be consumed: in CLUSTERING mode they are sent back to the Broker (the Broker side of this was analyzed in the Broker article; the Consumer side is analyzed in Section 8).
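From the listener's point of view, returning RECONSUME_LATER (or returning null / throwing) is what triggers this send-back path. A hedged listener sketch; process() is a hypothetical business handler, not part of RocketMQ:
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class RetryAwareListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                process(msg); // hypothetical business logic
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // the offset may advance past this batch
        } catch (Exception e) {
            // In CLUSTERING mode the whole batch is sent back to the Broker and retried later.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    private void process(MessageExt msg) {
        // placeholder for real business logic
    }
}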
Here we continue with removing the successfully consumed messages and updating the consumption progress.
Removing consumed messages is done by ProcessQueue#removeMessage(...):
public long removeMessage(final List<MessageExt> msgs) {long result = -1;final long now = System.currentTimeMillis();try {this.lockTreeMap.writeLock().lockInterruptibly();this.lastConsumeTimestamp = now;try {if (!msgTreeMap.isEmpty()) {result = this.queueOffsetMax + 1;// 移除消息int removedCnt = 0;for (MessageExt msg : msgs) {MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());if (prev != null) {removedCnt--;msgSize.addAndGet(0 - msg.getBody().length);}}msgCount.addAndGet(removedCnt);if (!msgTreeMap.isEmpty()) {result = msgTreeMap.firstKey();}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (Throwable t) {log.error("removeMessage exception", t);}return result;
}
7. Expired Message Cleanup
The entry point of expired-message cleanup is ConsumeMessageConcurrentlyService#cleanExpireMsg(...), scheduled as follows:
public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            cleanExpireMsg();
        }
    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
The code above runs when ConsumeMessageConcurrentlyService starts. The task itself:
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
        this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}
By default this task runs every 15 minutes (the default consumeTimeout).
It calls ProcessQueue#cleanExpiredMsg(...):
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {// 顺序消费时,直接返回if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {return;}// 每次循环最多移除16条int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;for (int i = 0; i < loop; i++) {MessageExt msg = null;// 获取消息,并看是否超时,不超时,结束循环try {this.lockTreeMap.readLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {msg = msgTreeMap.firstEntry().getValue();} else {break;}} finally {this.lockTreeMap.readLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}try {// 超时消息,发回BrokerpushConsumer.sendMessageBack(msg, 3);log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());try {// 判断此时消息是否依然是第一条,若是,则进行移除this.lockTreeMap.writeLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {try {removeMessage(Collections.singletonList(msg));} catch (Exception e) {log.error("send expired msg exception", e);}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}} catch (Exception e) {log.error("send expired msg exception", e);}}
}
In short, the cleanup runs every consumeTimeout period (15 minutes by default); any cached message held longer than the timeout is sent back to the Broker and removed from the ProcessQueue.
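The timeout is configurable on the consumer. A hedged sketch (setter name as in the 4.x client, matching the getConsumeTimeout() call above):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class ConsumeTimeoutConfig {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        // In minutes. Messages held longer than this are treated as expired by cleanExpireMsg and
        // sent back to the Broker; the value is also the period of the cleanup task itself.
        consumer.setConsumeTimeout(15);
    }
}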
8. Sending Failed Messages Back to the Broker
When consumption fails or times out, messages are sent back to the Broker through DefaultMQPushConsumerImpl#sendMessageBack(...):
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// Consumer发回消息this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {// 异常时,使用Client内置Producer发回消息log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}
The method sends the message back via the Consumer channel; if that throws, it falls back to the client's built-in Producer and republishes the message to the group's retry topic.
The Consumer-side send-back is MQClientAPIImpl#consumerSendMessageBack(...):
/*** Consumer发回消息* @param addr Broker地址* @param msg 消息* @param consumerGroup 消费分组* @param delayLevel 延迟级别* @param timeoutMillis 超时* @param maxConsumeRetryTimes 消费最大重试次数* @throws RemotingException 当远程调用发生异常时* @throws MQBrokerException 当Broker发生异常时* @throws InterruptedException 当线程中断时*/
public void consumerSendMessageBack(final String addr,final MessageExt msg,final String consumerGroup,final int delayLevel,final long timeoutMillis,final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);requestHeader.setGroup(consumerGroup);requestHeader.setOriginTopic(msg.getTopic());requestHeader.setOffset(msg.getCommitLogOffset());requestHeader.setDelayLevel(delayLevel);requestHeader.setOriginMsgId(msg.getMsgId());requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}
The method builds a CONSUMER_SEND_MSG_BACK request and sends it to the Broker synchronously.
9. Consumption Progress
The Consumer stores its consumption progress through the OffsetStore interface, which has two implementations:
- RemoteBrokerOffsetStore: used in clustering mode; the progress is kept on the Broker
- LocalFileOffsetStore: used in broadcasting mode; the progress is kept in a local file
Let's go through the interface methods (a small sketch of how the implementation is chosen follows).
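Which implementation is used follows the messageModel unless an OffsetStore is set explicitly. A hedged sketch (package path of MessageModel assumed from the 4.x source tree):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class OffsetStoreChoiceDemo {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        // CLUSTERING (the default) -> RemoteBrokerOffsetStore; BROADCASTING -> LocalFileOffsetStore.
        consumer.setMessageModel(MessageModel.BROADCASTING);
    }
}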
load: loading the consumption progress
OffsetStore#load() is declared by the interface, but RemoteBrokerOffsetStore leaves it empty:
@Override
public void load() {
}
In other words, nothing is loaded up front; the progress is fetched from the Broker when it is actually read.
Only LocalFileOffsetStore#load() provides a real implementation:
@Override
public void load() throws MQClientException {
    // read the consumption progress from the local disk
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
        // log the progress of each message queue
        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",
                this.groupName,
                mq,
                offset.get());
        }
    }
}
The method loads the consumption progress from a local file into memory. The progress is wrapped by OffsetSerializeWrapper:
public class OffsetSerializeWrapper extends RemotingSerializable {
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
        return offsetTable;
    }

    public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
        this.offsetTable = offsetTable;
    }
}
The class is trivial: a single map from message queue to offset.
readOffset: reading the consumption progress
readOffset reads the consumption progress. There are three read types:
- READ_FROM_MEMORY: read from memory
- READ_FROM_STORE: read from the store (Broker or file)
- MEMORY_FIRST_THEN_STORE: read from memory first, fall back to the store
First, the LocalFileOffsetStore implementation, LocalFileOffsetStore#readOffset(...):
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE:case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}case READ_FROM_STORE: {OffsetSerializeWrapper offsetSerializeWrapper;try {offsetSerializeWrapper = this.readLocalOffset();} catch (MQClientException e) {return -1;}if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);if (offset != null) {this.updateOffset(mq, offset.get(), false);return offset.get();}}}default:break;}}return -1;
}
It mainly reads the consumption progress from the local file.
Next, the RemoteBrokerOffsetStore implementation, RemoteBrokerOffsetStore#readOffset(...):
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE:case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}case READ_FROM_STORE: {try {long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);this.updateOffset(mq, offset.get(), false);return brokerOffset;}// No offset in brokercatch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}}default:break;}}return -1;
}
It reads the consumption progress from the Broker.
updateOffset: updating the consumption progress
RemoteBrokerOffsetStore and LocalFileOffsetStore share the same implementation:
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }
        if (null != offsetOld) {
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}
The offset is updated atomically; with increaseOnly set, it is only allowed to move forward.
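MixAll.compareAndIncreaseOnly only moves the offset forward. A minimal sketch of that idea (plain Java, not the RocketMQ source):
import java.util.concurrent.atomic.AtomicLong;

public class IncreaseOnlyDemo {
    // CAS loop: update target to value only if value is larger than the current one.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long current = target.get();
        while (value > current) {
            if (target.compareAndSet(current, value)) {
                return true;
            }
            current = target.get();
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicLong offset = new AtomicLong(100);
        System.out.println(compareAndIncreaseOnly(offset, 120) + " " + offset.get()); // true 120
        System.out.println(compareAndIncreaseOnly(offset, 90) + " " + offset.get());  // false 120 (never goes backwards)
    }
}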
persistAll: persisting the consumption progress
The LocalFileOffsetStore implementation, LocalFileOffsetStore#persistAll(...):
@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {AtomicLong offset = entry.getValue();offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {MixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}}
}
writes the consumption progress to the local file.
The RemoteBrokerOffsetStore implementation, RemoteBrokerOffsetStore#persistAll(...):
@Override
public void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();if (!mqs.isEmpty()) {for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();AtomicLong offset = entry.getValue();if (offset != null) {if (mqs.contains(mq)) {try {this.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} else {unusedMQ.add(mq);}}}}if (!unusedMQ.isEmpty()) {for (MessageQueue mq : unusedMQ) {this.offsetTable.remove(mq);log.info("remove unused mq, {}, {}", mq, this.groupName);}}
}
persists the progress of the specified message queues to the Broker and drops the queues that are no longer assigned to this consumer.
persistAll is invoked from DefaultMQPullConsumerImpl#persistConsumerOffset (the quoted code is the pull consumer's; DefaultMQPushConsumerImpl has an equivalent implementation):
@Override
public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}
which in turn is invoked from MQClientInstance#persistAllConsumerOffset:
private void persistAllConsumerOffset() {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        impl.persistConsumerOffset();
    }
}
This method is scheduled when MQClientInstance starts, in MQClientInstance#startScheduledTask:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
By default it runs every 5,000 ms (persistConsumerOffsetInterval), after an initial delay of 10 s.
Beyond this scheduled task, the consumption progress is also persisted during other operations, such as pulling messages and reallocating message queues.
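The persistence interval itself is configurable on the client. A hedged sketch (setter name as in ClientConfig, which DefaultMQPushConsumer extends):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class PersistIntervalConfig {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        // ClientConfig.persistConsumerOffsetInterval, in milliseconds; 5000 is the default.
        consumer.setPersistConsumerOffsetInterval(10 * 1000);
    }
}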
10. Summary
This article analyzed how the Consumer pulls and consumes messages:
- the Consumer subscribes to a Topic and registers a message listener
- a Rebalance strategy assigns message queues to Consumers, deciding which queues each Consumer may consume
- messages are pulled from the Broker
- messages are consumed
- messages that fail to be consumed or expire are sent back to the Broker
- finally, the consumption progress is loaded, read, updated, and persisted