RocketMQ 消费者源码解读:消费过程、负载原理、顺序消费原理

B站学习地址


上一遍学习了三种常见队列的消费原理,本次我们来从源码的角度来证明上篇中的理论。


1、准备


RocketMQ 版本

<!-- RocketMQ -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version>
</dependency>

消费者代码

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "my-consumer_asyn-topic", topic = "rocketmq-topic")
public class RocketmqConsumer1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();System.out.println("RocketMQ 001" + new String(body));}
}

2、源码阅读


2-1、对使用@RocketMQMessageListener的类进行增强,生成监听器ListenerContainer,并启动


在 RocketMQMessageListener 包下面有一个 Bean后置处理器,会对每个使用了 @RocketMQMessageListener 的类进行增强,生成 监听器,并启动这个监听器

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2-2、基于顺序消费和并发消费创建对应的Service,创建处理消息的的线程池


org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start 这个方法很长,这里简化一下来看看我们比较关注的几个点,详情看源码

// 默认就是 CREATE_JUST
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:// ... 省略 ...// 如果是顺序消费就创建顺序消费的 监听器 ConsumeMessageOrderlyServiceif (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} // 创建并发消费的监听器 ConsumeMessageConcurrentlyServiceelse if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}// ... 省略 ...// 把当前的消费者组和消费者存入本地的 ConcurrentHashMapboolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);// ... 省略 ...// 进行下一步的启动mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;// ... 省略 ...}// ... 省略 ...
}

上篇讲到RocketMQ消费的模式是一个线程去不停的拉消息,然后丢到一个线程池里面去消费,刚刚我们看到根据是否是顺序消费,创建不同的 service,这个线程池就是在这个地方创建的。

在这里插入图片描述

在这里插入图片描述

注: 默认情况下,consumeThreadMin = 20 、consumeThreadMax = 64


2-3、拉消息和负载均衡的开始


在这里插入图片描述


在这里插入图片描述


2-4、队列和消费者之间的负载均衡


虽然拉消息的代码在前面,但没有关系,负载和拉消息都是新开启线程去执行,我个人觉得负载均衡放在前面讲更合适一些

一个topic中的queue数量大多数时候是固定的,但消费者却不是,很多时候我们会动态的去调整消费者的数量,而在上一期的理论中得知消费组中的消费者数量如果大于queue的数量是没用的,下面通过源码来看它是如何实现的


this.rebalanceService.start();

在这里插入图片描述


循环遍历每一个消费者去负载均衡


在这里插入图片描述


consumerTable 数据的由来参看【2-4-1、consumerTable 数据的由来】


负载的核心代码 rebalanceByTopic

消费模式有集群消费和广播消费,负载均衡肯定是基于:集群消费

private boolean rebalanceByTopic(final String topic, final boolean isOrder) {boolean balanced = true;switch (messageModel) {case BROADCASTING: {// ... 省略 ...}case CLUSTERING: {// 获取当前 topic 的 queueSet<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 发起 netty请求,获取当前组下面的消费者List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);// ... 省略 ... 参数校验if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 使用策略进行分配,默认的策略是平均分配allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(), mqAll, cidAll);} catch (Throwable e) {log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);return false;}Set<MessageQueue> allocateResultSet = new HashSet<>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// 对分配的结果进行 设置boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));}break;}default:break;}return balanced;
}

  1. 如何进入rebalanceByTopic,参看【2-4-2、进入 rebalanceByTopic】
  2. 默认的平均分配策略如何执行的,参看【2-4-3、平均分配策略原理】
  3. 分配结果参看【2-4-4、重置队列和消费者之间的关系】

2-4-1、consumerTable 数据的由来

在【2-3、拉消息和负载均衡的开始】开始的第一张图中 start开始之前执行了一个 registerConsumer 方法,这个方法就是把当前消费者和其组 consumerTable

在这里插入图片描述

2-4-2、进入 rebalanceByTopic

在这里插入图片描述
在这里插入图片描述


2-4-3、平均分配策略原理

  1. 这里假设当前queue只有 1个,消费者有 2个,当前消费者是第一个
  2. 下面的 index 、mod 等其它参数都是基于这个假设来计算的
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<>();if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}// index = 1int index = cidAll.indexOf(currentCID);// mod = 2int mod = mqAll.size() % cidAll.size();// averageSize = 1int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());// startIndex = 1int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;// range = 0int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}// result 为空数组return result;}@Overridepublic String getName() {return "AVG";}
}

通过上面的计算可以得出,当消费者的数量大于队列数量的时候,返回值是 空数组


2-4-4、重置队列和消费者之间的关系

重置的操作分三步

  1. 删除进程中与当前消费者绑定的队列
  2. 删除broker中的绑定的关系
  3. 建立新的关系
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;// 删除进程中与当前消费者绑定的队列HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<>(this.processQueueTable.size());Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();removeQueueMap.put(mq, pq);// ... 删除操作 ...}// 删除broker中的绑定的关系for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {MessageQueue mq = entry.getKey();ProcessQueue pq = entry.getValue();if (this.removeUnnecessaryMessageQueue(mq, pq)) {this.processQueueTable.remove(mq);changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}}// 建立新的关系boolean allMQLocked = true;List<PullRequest> pullRequestList = new ArrayList<>();for (MessageQueue mq : mqSet) {// ... 建立新的关系 ...// 并把新的结果存入 pullRequestList 这很重要}if (!allMQLocked) {mQClientFactory.rebalanceLater(500);}// 基于新的绑定关系去获取消息this.dispatchPullRequest(pullRequestList, 500);return changed;
}

如果上一步的平均分配的结果为 空数组,那在这里就会删除所有的绑定关系,并且无法建立新的关系,也就说明当消费组中的消费者的数量大于queue的数量是无用的


dispatchPullRequest,这个方法的实现类只有如下代码

@Override
public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {for (PullRequest pullRequest : pullRequestList) {if (delay <= 0) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);} else {this.defaultMQPushConsumerImpl.executePullRequestLater(pullRequest, delay);}}
}

这个操作的最终结果就是把 pullRequest,放入 messageRequestQueue 中,delay不为空的时候,会开启一个定时认为每隔delay时间往messageRequestQueue里面塞一次, 这个点很重要


如果分配给当前消费者处理的 queue有2个,那这里就会生成两个 pullRequest


2-5、拉消息


解释完负载均衡,让我们再次回到【2-3】,现在来看看2-3提到的拉消息逻辑

在这里插入图片描述


2-5-1、固定一个线程去拉消息

在这里插入图片描述


  1. 在【2-4-4】中得出,分配给当前消费者的queue会生成一个 PullRequest,然后以500ms一次塞进 messageRequestQueue里面去
  2. take 方法是一个阻塞的方法,如果队列中没有数据,它会阻塞一直等待有数据为止
  3. public class PullRequest implements MessageRequest

2-5-2、拉消息的过程

在这里插入图片描述


pullMessage 是拉消息的核心代码,简单来说就是各种判断,组装参数去请求broker获取消息,这里要关注的几个参数

  1. CommunicationMode.ASYNC 使用异步拉取参数
  2. pullCallback 拉到数据后的回调方法

在这里插入图片描述


在真实发起netty请求之前也是一些参数的处理,流程参看下面的截图

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


成功拉到消息后就调用回调的 onSuccess 方法


2-5-3、消息回调方法 onSuccess

org.apache.rocketmq.client.consumer.PullCallback#onSuccess
PullCallback 有onSuccess和onException,在 onSuccess 中 有个 switch语句,对于正常拉到消息的状态为 FOUND,所以来着重看这个部分的代码块

public void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);switch(pullResult.getPullStatus()) {case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = 9223372036854775807L;if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {firstMsgOffset = ((MessageExt)pullResult.getMsgFoundList().get(0)).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), (long)pullResult.getMsgFoundList().size());boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 消息丢入线程池消费,分并发消费和顺序消费DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);// 继续把请求放入队列,由单线程继续去拉取消息 默认 pullInterval = 0if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {DefaultMQPushConsumerImpl.log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", new Object[]{pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset});}break;// ... 省略 ...}}

DefaultMQPushConsumerImpl.this.executePullRequestImmediately 这个方法在负载均衡的最后一步已经讲到了,其实就是把 pullRequest 存入 messageRequestQueue 中


2-5-4、并发消费

@Override
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 丢进线程池去消费this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} else {// 消息量过大,分批消费,逻辑一样}
}

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

这里可以看到并发消息,只是直接就把消息组装成一个可执行的 Runnable,然后交给线程池去执行


2-5-5、顺序消费

顺序消息可不一样,顺序消息必须要求同一个队列的消息只能单线程去消费才可以保证绝对的顺序

在这里插入图片描述


可以看到顺序消息也是直接把消息丢进了线程池,但是在进行消息处理的时候,使用队列进行加锁了,相当于这个队列只能单线程消费了,后续逻辑就都一样了,最终走到我们自己重写的 onMessage 里面

在这里插入图片描述


3、总结

看完上面的源码你最少可以回答下面几个问题

  1. RocketMQ消费的流程是怎么样的
  2. 为什么消费者大于queue的时候,消费者就没用了
  3. 顺序消费如何保证顺序的
  4. 添加消费者的时候,如何重新分配的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/787963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vs2022断点找bug出错(打上100个断点)

初步分析&#xff1a;故障出自-具体功能模块 进一步分析&#xff1a;故障出自-该功能代码流程 进一步分析&#xff1a;从该功能起点-终点&#xff0c;一路打100个断点

ICLR 2024 | 鸡生蛋蛋生鸡?再论生成数据能否帮助模型训练

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 新建了人工智能中文站https://ai.weoknow.com 每天给大家更新可用的国内可用chatGPT资源 发布在https://it.weoknow.com 更多资源欢迎关注 随着生成模型&#xff08;如 ChatGPT、扩散模型&#xff09;飞速发展&#x…

Nomad Web更新没有最快只有更快

大家好&#xff0c;才是真的好。 很长时间没介绍运行在浏览器中的Notes客户端即Nomad Web更新情况。 不用安装&#xff0c;直接使用&#xff0c;还可以完美地兼容适应各种操作系统&#xff0c;Nomad Web一定是Notes/Domino产品现在和将来重点发展的用户访问模式。 不过&…

【CKA模拟题】一文教你用StorageClass轻松创建PV

题干 For this question, please set this context (In exam, diff cluster name) kubectl config use-context kubernetes-adminkubernetesYour task involves setting up storage components in a Kubernetes cluster. Follow these steps: Step 1: Create a Storage Class…

书生 浦语 大模型趣味 Demo

目录 一. 部署 InternLM2-Chat-1.8B 模型进行智能对话 1. 环境准备 2. 下载模型参数 3. 运行Demo 二. 部署实战营 八戒-Chat-1.8B 模型 1. 下载Demo仓库 2. 启动web服务端加载八戒模型&#xff1a; 3. 将SSH远程端口映射到本地 4. 在本地浏览器打开&#xff1a;http:/…

Python抓取抖音直播间数据:技术探索与实践

目录 一、引言 二、技术准备 三、分析抖音直播间网页结构 四、编写爬虫代码 五、处理反爬虫机制 六、数据清洗与存储 七、总结 一、引言 随着互联网的快速发展&#xff0c;直播行业已成为当下的热门领域。抖音作为其中的佼佼者&#xff0c;吸引了大量的用户和主播。对于…

元宇宙虚拟空间的场景构造(二)

前言 该文章主要讲元宇宙虚拟空间的场景构造&#xff0c;基本核心技术点&#xff0c;不多说&#xff0c;直接引入正题。 场景的构造 使用引入的天空模块 this.sky new Sky(this); 在Sky模块里&#xff0c;有设置对其中的阳光进行不同时间段的光线处理。而天空又是怎么样的…

STM32 DWT数据观察触发器作为延时函数的使用

STM32 DWT数据观察触发器作为延时函数的使用 &#x1f4d1;DWT(Data Watchpoint and Trace数据观察触发器&#xff09;描述 &#x1f4dd;DWT是属于处理器内核单元中的调试组件之一&#xff0c;由四个比较器组成。它们可配置为&#xff1a;硬件监视点或对ETM或PC采样器或数据地…

dcoker 下redis设置密码

修改Docker里面Redis密码 Redis是一个开源的内存数据结构存储系统&#xff0c;常用于缓存、消息队列和数据持久化等场景。在使用Docker部署Redis时&#xff0c;默认情况下是没有设置密码的&#xff0c;这可能会导致安全隐患。因此&#xff0c;为了保证数据的安全性&…

蓝桥杯真题Day44 倒计时10天 练了六道真题 !

[蓝桥杯 2020 省 B2] 平面切分 题目描述 平面上有 N 条直线, 其中第 i 条直线是 yAi​⋅xBi​ 。请计算这些直线将平面分成了几个部分。 输入格式 第一行包含一个整数 N。 以下 N 行, 每行包含两个整数 Ai​,Bi​。 输出格式 一个整数代表答案。 代码表示 #include<…

基于SpringBoot的图书馆管理系统设计与实现

介绍 基于&#xff1a;java8 SpringBoot thymeleaf MySQL8.0.17 mybatis-plus maven Xadmin 实现图书馆管理系统 系统要实现如下的基本管理功能&#xff1a; &#xff08;1&#xff09;用户分为两类&#xff1a;管理员&#xff0c;一般用户。 &#xff08;2&#xff09…

Day57:WEB攻防-SSRF服务端请求Gopher伪协议无回显利用黑白盒挖掘业务功能点

目录 SSRF-原理&挖掘&利用&修复 SSRF无回显解决办法 SSRF漏洞挖掘 SSRF协议利用 http:// &#xff08;常用&#xff09; file:/// &#xff08;常用&#xff09; dict:// &#xff08;常用&#xff09; sftp:// ldap:// tftp:// gopher:// &#xff08;…

群晖NAS使用Docker部署大语言模型Llama 2结合内网穿透实现公网访问本地GPT聊天服务

文章目录 1. 拉取相关的Docker镜像2. 运行Ollama 镜像3. 运行Chatbot Ollama镜像4. 本地访问5. 群晖安装Cpolar6. 配置公网地址7. 公网访问8. 固定公网地址 随着ChatGPT 和open Sora 的热度剧增,大语言模型时代,开启了AI新篇章,大语言模型的应用非常广泛&#xff0c;包括聊天机…

Nginx漏洞之未授权访问和源码泄漏漏洞处理

一、漏洞描述 某次安全扫描&#xff0c;发现某平台存在资源&#xff1a;未授权访问和源码泄漏&#xff1b;攻击者可能获取到网站的配置文件、敏感数据存储位置和访问凭证等信息。这意味着攻击者可以获得对网站的完全或部分控制权&#xff0c;进而进行恶意篡改、删除或添加恶意…

6.8物联网RK3399项目开发实录-驱动开发之RTC实时时钟的使用(wulianjishu666)

90款行业常用传感器单片机程序及资料【stm32,stc89c52,arduino适用】 链接&#xff1a;https://pan.baidu.com/s/1M3u8lcznKuXfN8NRoLYtTA?pwdc53f RTC 使用 简介 AIO-3399J 开发板上有 一个集成于 RK808 上的RTC(Real Time Clock)&#xff0c;主要功能有时钟&#xff0c…

【PowerDesigner】PGSQL反向工程过程已中断

问题 反向工程过程已中断,原因是某些字符无法通过ANSI–&#xff1e;UTF-16转换进行映射。pg导入sql时报错&#xff0c;一查询是power designer 反向工程过程已中断&#xff0c;某些字符无法通过ANSI–>UTF-16转换进行映射&#xff08;会导致数据丢失&#xff09; 处理 注…

代码随想录第28天| 131.分割回文串 93.复原IP地址 78.子集

131.分割回文串 131. 分割回文串 - 力扣&#xff08;LeetCode&#xff09; 代码随想录 (programmercarl.com) 带你学透回溯算法-分割回文串&#xff08;对应力扣题目&#xff1a;131.分割回文串&#xff09;| 回溯法精讲&#xff01;_哔哩哔哩_bilibili 给你一个字符串 s&…

连接Redis不支持集群错误,ERR This instance has cluster support disabled,解决方案

1. 问题背景 调整redis的配置后&#xff0c;启动程序时&#xff0c; 会报如下错误&#xff1a; [redis://172.16.0.8xxx]: ERR This instance has cluster support disabledSuppressed: io.lettuce.core.RedisCommandExecutionException: ERR This instance has cluster supp…

苹果安卓双端短视频直播系统源码,带后台-支持二开和采集

搭建教程 1.PHP5.6-7.2 mysql 5.6 redis5.0 nginx1.15 2.宝塔就完全满足了 我刚开了台服务器&#xff0c;建议用阿里云的 我这个是腾讯云 先让服务器 自己装着 时间比较长 3.搭建前需要准备的东西 腾讯云直播、七牛存储、百度语音、腾讯地图等好多东西 七牛存储…

Micron FY24 Q2业绩强劲,凭内存实现翻盘

根据TechInsights数据显示&#xff0c;美光科技24财年第二季度业绩强劲&#xff0c;公司通过技术创新和产能优化&#xff0c;成功抓住了AI服务器和其他高性能应用带来的市场需求增长机遇。尽管短期内面临供应紧张的问题&#xff0c;但美光通过加大研发投入和产能转换力度&#…