rocketmq-pull模式-消费重平衡和拉取PullTaskImpl线程

1、观察consumer的线程模型

使用arthas分析
在这里插入图片描述

  • MQClientFactoryScheduledThread 定时任务线程 (和push模式一致)
    定时任务线程,包含如下任务:
    每2分钟更新nameServer列表
    每30秒更新topic的路由信息
    每30秒检查broker的存活,发送心跳请求
    每5秒持久化消费队列的offset。如果是广播模式,持久化在本地;如果是集群模式,反馈给broker
    每分钟调整线程池大小(实际上并没有作用。因为最终执行是空方法)

  • PullMessageService 这个线程是专门给push模式使用的。在pull模式下,没有作用

  • RebalanceService 重平衡线程。每20秒执行一次
    pull模式和push模式都用该线程。区别在于messageQueue变更后,处理有区别。
    详细看
    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#messageQueueChanged
    后面的类图也会红色标记出来

  • PullMsgThread-lite_pull_consumer_testN线程
    pull模式下,每个队列messageQueue会包装成一个PullTaskImpl任务,从broker中拉取msg。放到线程池中死循环执行。这是线程名称。下文有描述

整体工作流程图如下:

在这里插入图片描述

步骤分析

步骤1、启动后触发重平衡线程RebalanceService

重平衡任务org.apache.rocketmq.client.impl.consumer.RebalanceService#run()
重平衡线程获取到topic对应的Set<> mqSet,当前消费者组group的所有消费者List<> cidAll
cid就是消费端的唯一标识。格式如下:“ip@pid#时间戳”,比如127.01.01.01@1723#2926328724786400
关于重平衡的前期部分,已经在push模式有描述,参考文章rocketmq-push模式-消费侧重平衡-类流程图分析
不再详述

步骤2、messageQueue有更新,调用MessageQueueListener

监听器类org.apache.rocketmq.client.consumer.MessageQueueListener是专门给pull模式使用的

public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {MessageModel messageModel = defaultLitePullConsumer.getMessageModel();switch (messageModel) {case BROADCASTING:updateAssignedMessageQueue(topic, mqAll);updatePullTask(topic, mqAll);break;case CLUSTERING:updateAssignedMessageQueue(topic, mqDivided);updatePullTask(topic, mqDivided);break;default:break;}}

监听变化后,会做两件事updateAssignedMessageQueue和updatePullTask

步骤3、更新updateAssignedMessageQueue

org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#assignedMessageQueue
该对象是pull模式专用。

public class AssignedMessageQueue {private final ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;private RebalanceImpl rebalanceImpl;public AssignedMessageQueue() {assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueState>();}public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {this.rebalanceImpl = rebalanceImpl;}

从该类来看,最重要的字段是assignedMessageQueueState,存储的是订阅的所有topic的所有MessageQueue,做map存储。
MessageQueueState的信息如下

 private class MessageQueueState {private MessageQueue messageQueue;private ProcessQueue processQueue;private volatile boolean paused = false;private volatile long pullOffset = -1;private volatile long consumeOffset = -1;private volatile long seekOffset = -1;private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {this.messageQueue = messageQueue;this.processQueue = processQueue;}

pullOffset 、consumeOffset 、seekOffset 这几个字段有比较大的作用,消费过程中,有多处会设置这几个值。较复杂,后续再做统一的分析

updateAssignedMessageQueue()方法,主要的目的是对assignedMessageQueueState做处理,重平衡后做新增add或者移除操作

步骤4、更新updatePullTask

private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();while (it.hasNext()) {Map.Entry<MessageQueue, PullTaskImpl> next = it.next();if (next.getKey().getTopic().equals(topic)) {if (!mqNewSet.contains(next.getKey())) {next.getValue().setCancelled(true);it.remove();}}}startPullTask(mqNewSet);}

用到的队列MessageQueue,taskTable会维护对应的PullTaskImpl线程任务。
org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#taskTable

针对该topic,如果当前内存的taskTable中的MessageQueue已经不再有效,就会将PullTaskImpl的状态设置成Cancel,使得对应线程终止运行。

startPullTask()

 private void startPullTask(Collection<MessageQueue> mqSet) {for (MessageQueue messageQueue : mqSet) {if (!this.taskTable.containsKey(messageQueue)) {PullTaskImpl pullTask = new PullTaskImpl(messageQueue);this.taskTable.put(messageQueue, pullTask);this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);}}}

是将当前队列,包装成PullTaskImpl的线程任务,提交给线程池scheduledThreadPoolExecutor。在arthas上,线程名称就是PullMsgThread-lite_pull_consumer_testN。核心线程数是20

步骤5、PullTaskImpl线程执行

对队列做流控策略。检查cachedMessageCount、cachedMessageSizeInMiB、processQueue.getMaxSpan()
如果不满足要求,则延时再做任务

long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);}return;}if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);}return;}if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);}return;}

步骤6、同步请求broker,拉取消息

try {SubscriptionData subscriptionData;String topic = this.messageQueue.getTopic();if (subscriptionType == SubscriptionType.SUBSCRIBE) {subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);} else {String subExpression4Assign = topicToSubExpression.get(topic);subExpression4Assign = subExpression4Assign == null ? SubscriptionData.SUB_ALL : subExpression4Assign;subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression4Assign);}PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());if (this.isCancelled() || processQueue.isDropped()) {return;}switch (pullResult.getPullStatus()) {case FOUND:final Object objLock = messageQueueLock.fetchLockObject(messageQueue);synchronized (objLock) {if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {processQueue.putMessage(pullResult.getMsgFoundList());submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));}}break;case OFFSET_ILLEGAL:log.warn("The pull request offset illegal, {}", pullResult.toString());break;default:break;}updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);} catch (InterruptedException interruptedException) {

关键代码是PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
该方法是同步调用,默认一次拉取10条消息

步骤7、将消息msg放到consumeRequestCache中

private void submitConsumeRequest(ConsumeRequest consumeRequest) {try {consumeRequestCache.put(consumeRequest);} catch (InterruptedException e) {log.error("Submit consumeRequest error", e);}}

org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#consumeRequestCache

步骤8、任务PullTaskImpl循环执行

PullTaskImpl的run方法,最后是将this,又重新投到线程池中,实现循环执行。

 if (!this.isCancelled()) {scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);} else {log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);}

查看官方消费的例子,调用poll方法,从consumeRequestCache上获取msg

public class LitePullConsumerSubscribe {public static volatile boolean running = true;public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static void main(String[] args) throws Exception {DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");litePullConsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);litePullConsumer.subscribe("TopicTest", "*");litePullConsumer.start();try {while (running) {List<MessageExt> messageExts = litePullConsumer.poll();System.out.println(new Date());System.out.printf("%s%n", messageExts);}} finally {litePullConsumer.shutdown();}}
}

关键代码是litePullConsumer.poll();死循环执行。
poll方法,从consumeRequestCache上获取msg

public synchronized List<MessageExt> poll(long timeout) {try {checkServiceState();if (timeout < 0) {throw new IllegalArgumentException("Timeout must not be negative");}if (defaultLitePullConsumer.isAutoCommit()) {maybeAutoCommit();}long endTime = System.currentTimeMillis() + timeout;ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);if (endTime - System.currentTimeMillis() > 0) {while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);if (endTime - System.currentTimeMillis() <= 0) {break;}}}

总结

后续有空再总结。主要是比较push和pull模式。

后续分析

offset在消费拉取过程中,是如何使用的?和broker是如何交互的?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/66110.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ungoogled Chromium127 编译指南 MacOS 篇(二)- 项目要求

1. 引言 在开始编译 Ungoogled Chromium 之前&#xff0c;我们需要确保系统满足所有必要的硬件和软件要求。由于浏览器编译是一个资源密集型的任务&#xff0c;合适的硬件配置和完整的软件环境至关重要。本文将详细介绍编译 Ungoogled Chromium 所需的各项要求。 2. 硬件要求…

51单片机——共阴数码管实验

数码管中有8位数字&#xff0c;从右往左分别为LED1、LED2、...、LED8&#xff0c;如下图所示 如何实现点亮单个数字&#xff0c;用下图中的ABC来实现 P2.2管脚控制A&#xff0c;P2.3管脚控制B&#xff0c;P2.4管脚控制C //定义数码管位选管脚 sbit LSAP2^2; sbit LSBP2^3; s…

调试:用电脑开发移动端网页,然后用手机真机调试

一、背景 电脑开发移动端&#xff0c;然后想真机调试... 二、实现 2.1、电脑和手机链接相同局域网 2.2、pnpm run dev 启动项目 2.3、浏览器访问 localhost:3001/login 2.4、Windowsr 输入cmd&#xff0c;在cmd输入 ipconfig 2.5、浏览器访问 ip地址加/login 2.6、手机端…

Kbuild学习知识点

1.Kbuild本质&#xff1a;一个可扩展、可配置的Makefile框架&#xff0c;递归式Makefile&#xff0c;菜单式配置。 2.Kbuild构成&#xff1a; Makefile:顶层目录下的Makefile.config:内核的配置文件arch/S(ARCH)/Makefile:跟平台架构相关的Makefilescripts/Makefile.*:通用编…

《Vue3实战教程》39:Vue3无障碍访问

如果您有疑问&#xff0c;请观看视频教程《Vue3实战教程》 无障碍访问​ Web 无障碍访问 (也称为 a11y) 是指创建可供任何人使用的网站的做法——无论是身患某种障碍、通过慢速的网络连接访问、使用老旧或损坏的硬件&#xff0c;还是仅处于某种不方便的环境。例如&#xff0c;…

抢先体验:人大金仓数据库管理系统KingbaseES V9 最新版本 CentOS 7.9 部署体验

一、简介 KingbaseES 是中国人大金仓信息技术股份有限公司自主研发的一款通用关系型数据库管理系统&#xff08;RDBMS&#xff09;。 作为国产数据库的杰出代表&#xff0c;它专为中国市场设计&#xff0c;广泛应用于政府、金融、能源、电信等关键行业&#xff0c;以高安全性…

基于 GitHub API 的 Issue 和 PR 自动化解决方案

文章目录 摘要引言优化 Issue 和 PR 管理的方法工具选择流程优化 自动化 Issue 和 PR 管理代码逻辑详解获取 Issue 数据为 Issue 添加标签将 Issue 分配给开发者主逻辑 实际运行效果进一步扩展QA 环节总结参考资料 摘要 在开源项目中&#xff0c;Issue 和 Pull Request&#x…

趣味编程:心形曲线

目录 1.序言 2.代码展示 3.代码详解 3.1 头文件包含 3.2 绘制坐标轴函数 3.3 main 函数主体部分 4. 小结 1.序言 2025年的第一篇博客就用这个笛卡尔心形图开篇吧&#xff0c;寓意着新年大家能够有心有所属&#xff0c;祝诸位程序猿 / 程序媛 能够早点遇到自己的另一半。…

安装和配置 Apache 及 PHP

安装和配置 Apache 及 PHP # 1. 停止当前 Apache 服务 sudo apachectl stop# 2. 清除现有的 Apache 配置和文件 sudo rm -rf /etc/apache2 sudo rm -rf /usr/sbin/httpd sudo rm -rf /Library/WebServer# 3. 使用 Homebrew 安装 Apache brew install httpd# 4. 启动 Apache su…

解决uniapp H5页面限制输入框只能输数字问题

工作记录 最最近在做 uniapp 开发的移动端 H5 页面&#xff0c;有个需求是金额输入框只能输入数字&#xff0c;不能输入小数点和其他字符&#xff0c;经过各种尝试&#xff0c;发现其他字符可以通过正则过滤掉&#xff0c;但是输入小数点的话&#xff0c;因为没有触发 input 和…

group by 执行顺序

后面也会持续更新&#xff0c;学到新东西会在其中补充。 建议按顺序食用&#xff0c;欢迎批评或者交流&#xff01; 缺什么东西欢迎评论&#xff01;我都会及时修改的&#xff01; 感谢各位大佬写的文章让我学到很多东西&#xff01;只是在各位大佬的基础加了我自己的思路&a…

通过爬虫方式实现视频号助手发布视频

1、将真实的cookie贴到解压后目录中cookie.txt文件里,修改python代码里的user_agent和video_path, cover_path等变量的值,最后运行python脚本即可; 2、运行之前根据import提示安装一些常见依赖,比如requests等; 3、2025年1月份最新版; 代码如下: import json import…

Docker入门常用命令总结

1.从远程仓库拉取一个纯净的镜像 docker pull docker .io/centos 2.创建并进入容器&#xff08;左外右内&#xff09; docker run --name xxx -dit 镜像id&#xff08;镜像名称:Tag&#xff09; /bin/bash 【参数必须放在镜像ID之前】 -i 让Docker分配一个伪终端&#xff0c;并…

初学stm32 --- FSMC驱动LCD屏

目录 FSMC简介 FSMC框图介绍 FSMC通信引脚介绍 FSMC_NWE 的作用 FSMC_NWE 的时序关系 FSMC_NOE 的含义 FSMC_NOE 的典型用途 FSMC_NOE 的时序关系 使用FSMC驱动LCD FSMC时序介绍 时序特性中的 OE ILI9341重点时序&#xff1a; FSMC地址映射 HADDR与FSMC_A关系 LCD的…

CSS系列(47)-- Animation Timeline详解

前端技术探索系列&#xff1a;CSS Animation Timeline详解 ⏱️ 致读者&#xff1a;探索动画时间线的艺术 &#x1f44b; 前端开发者们&#xff0c; 今天我们将深入探讨 CSS Animation Timeline&#xff0c;这个强大的动画控制特性。 基础概念 &#x1f680; 时间线定义 …

Nginx - 整合lua 实现对POST请求的参数拦截校验(不使用Openresty)

文章目录 概述步骤 1: 安装 Nginx 和 Lua 模块步骤 2: 创建 Lua 脚本用于参数校验步骤 3: 配置 Nginx 使用 Lua 脚本写法二&#xff1a; 状态码写法三 &#xff1a; 返回自定义JSON复杂的正则校验 步骤 4: 测试和验证ngx.HTTP_* 枚举值 概述 一个不使用 OpenResty 的 Nginx 集…

GRAPE——RLAIF微调VLA模型:通过偏好对齐提升机器人策略的泛化能力(含24年具身模型汇总)

前言 24年具身前沿模型大汇总 过去的这两年&#xff0c;工作之余&#xff0c;我狂写大模型与具身的文章&#xff0c;加之具身大火&#xff0c;每周都有各种朋友通过CSDN私我及我司「七月在线」寻求帮助/指导(当然&#xff0c;也欢迎各大开发团队与我司合作共同交付&#xff09…

Appium 2.0:移动自动化测试的革新之旅

关注开源优测不迷路 大数据测试过程、策略及挑战 测试框架原理&#xff0c;构建成功的基石 在自动化测试工作之前&#xff0c;你应该知道的10条建议 在自动化测试中&#xff0c;重要的不是工具 在移动应用开发的领域中&#xff0c;Appium 作为一款强大的自动化测试工具&#xf…

Mysql SQL 超实用的7个日期算术运算实例(10k)

文章目录 前言1. 加上或减去若干天、若干月或若干年基本语法使用场景注意事项运用实例分析说明2. 确定两个日期相差多少天基本语法使用场景注意事项运用实例分析说明3. 确定两个日期之间有多少个工作日基本语法使用场景注意事项运用实例分析说明4. 确定两个日期相隔多少个月或多…

VSCode设置ctrl或alt+mouse(left)跳转

总结&#xff1a; &#xff08;1&#xff09;VSCode初次远程连接服务器时&#xff0c;需要在服务器上下载 python 拓展&#xff0c;然后选择对应的环境 &#xff08;2&#xff09;VSCode设置ctrl或altmouse(left)跳转到定义