消息中间件之RocketMQ源码分析(三)

RocketMQ中的Consumer启动流程

RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer,

DefaultMQPullConsumer

DefaultMQPullConsumer,该消费者使用时需要用户主动从Broker中Pull消息和消费消息,提交消费位点

继承关系图

在这里插入图片描述

核心属性

  • namesrvAddr:继承自ClientConfig,表示RocketMQ集群的Namesrv地址,如果是多个,则用逗号分开
    如:127.0.0.1:9876,127.0.0.2:9876
  • clientIP:使用客户端的程序所在机器的IP地址,目前支持IPV4和IPV6,同时排除了本地环会地址(127.0.xxx.xxx)
    和私有内网地址(192.168.xxx.xxx),如果在Docket中运行,获取的IP地址是容器所在的IP地址,而非宿主主机的IP地址
  • instanceName:实例名,顾名思义每个实例都需要取不一样的名字,加入要在多个机器上部署
    多个程序进程,那么每个进程的实例名必须不相同,否则程序会启动失败,因为在创建MQClient时,
    会用到IP和instancename名称来
    在这里插入图片描述

在这里插入图片描述

  • vipChannelEnabled:这是一个boolean值,表示是否开启VIP通道。VIP通道和非VIP通道的区别是使用不同的端口号进行通信
  • clientCallbackExecutorThreads:客户端回调线程数。该线程数等于Netty通信层回调线程的个数,默认值为
    Runtime.getRuntime().availableProcessors();表示当前有效的CPU个数
  • pollNameServerInterval:获取Topic路由信息间隔,单位为ms,默认为30000ms(30s)
  • heartbeatBrokerInterval:客户端和Broker心跳间隔,单位为ms,默认30000ms(30s)
  • persistCOnsumerOffsetInterval:持久化消费位点时间间隔,单位为ms,默认为5000ms(5s)
  • defaultMQPullConsumer:默认pull消费者的具体实现
  • consumerGroup:消费者组名字
  • brokerSuspendMaxTimeMills:在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值
  • consumerTimeoutMillsWhenSuspend:在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMills大,不建议修改
  • messageModel:消费模式,现在支持集群模式消费和广播模式消费
  • messageQueueListener:消息路由信息变化时回调处理监听器,一般在重新平衡时被调用
  • offsetStore:位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中(某个实例消费失败,生产者也不会重发),位点存储模块有两个实现类,分别为RemoteBrokerOffsetStore和LocalFileOffsetStore
  • allocateMessageQUeueStrategy:消费Queue分配策略管理器,默认是平均分配策略
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
  • maxReconsumeTimes:最大重试次数,可以配置

核心方法

  • registerMessageQueueListener():注册队列变化监听器,当队列发生变化是会被监听到
    在这里插入图片描述

  • pull():从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取
    在这里插入图片描述

  • pullBlockIfNotFound():长轮询方式拉取,如果没有拉取到消息,那么Broker会讲请求Hold住一段时间,
    当有消息来临时再发送pull请求
    在这里插入图片描述

  • updateConsumeOffset():更新某一个Queue的消费位点
    在这里插入图片描述

  • fetchConsumeOffset():查找某个Queue的消费位点
    在这里插入图片描述

  • sendMessageBack():如果消费发送失败,则可以讲消息重新发回Broker,这个消费者组延迟一段时间后可以再消费(也就是重试)
    在这里插入图片描述

  • fetchSubscribeMessageQueues():获取一个Topic的全部Queue信息

在这里插入图片描述

Pull启动流程

在这里插入图片描述

  • 1.最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JSUT,然后设置消费者的默认启动状态为失败

在这里插入图片描述
在这里插入图片描述

  • 2.检查消费者的配置比,如消费者组名、消费类型、Queue的分配策略等参数是否符合规范,将订阅关系数据发给Rebalance服务对象
    在这里插入图片描述
    在这里插入图片描述

  • 3.校验消费者实例名,如果时默认的名字,则更改为当前的程序进程id
    在这里插入图片描述

  • 4.获取一个MQClientInstance,如果MQClientInstance已经初始化,则直接返回初始化的实例。这是核心对象,每个ClientID缓存一个实例
    在这里插入图片描述

  • 5.设置Rebalance对象消费组、消费类型、Queue分配策略、MQClientInstance等参数
    在这里插入图片描述

  • 6.对BrokerAPI的封装类pullAPIWrapper进行初始化,同时注册消息,过滤filter
    在这里插入图片描述

  • 7.初始化位点管理器并加载位点信息,位点管理器分为本地管理和远程管理,集群消费时
    消费位点保存在Broker中,由远程管理器管理,广播消息时位点在本地,由本地管理其管理
    在这里插入图片描述

  • 8.本地注册消费者实例,如果注册成功,则表示消费者启动成功
    在这里插入图片描述

DefaultMQPushConsumer

大部分属性、方法和DefaultMQPullConsumer是一样的

核心属性和方法

  • defaultMQPushConsumerImpl:默认的Push消费者具体实现类
  • consumeFromWhere:一个枚举,表示从什么位点开始消费,
    CONSUME_FROM_LAST_OFFSET:默认从上次消费的位点开始消费,相当于断点继续
    CONSUME_FROM_TIMESTAMP:从指定时间开始消费
    CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费
  • consumeTimestamp:表示从哪一时刻开始消费,时间格式为yyyyMMDDHHmmss,默认半小时前,当consumeFromWhere=CONSUME_FROM_TIMESTAMP时,consumeTimestamp设置的值才生效
  • allocateMessageQueueStrategy:消费者订阅topic-queue策略
  • subscription:订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag
  • messageListener:消息Push回调监听器
  • consumeThreadMin:最小消费线程数,必须小于consumeThreadMax
    consumeThreadMax:最大线程数,必须大于consumeThreadMin
  • adjustThreadPoolNumsThreshold:动态调整消费线程池的线程数大小,开源版本不支持
  • consumeConcurrentlyMaxSpan:并发消息的最大位点差,,如果Pull消息的位点差超过该值,拉取变慢
  • pullThresholdForQueue:一个Queue能缓存的最大消息数,超过该值则采取拉取流控措施,默认是1000
  • pullThresholdSizeForQueue:一个Queue最大能缓存的消息字节数,单位是MB,默认是10MB
  • pullThresholdForTopic:一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施,该字段值默认是-1,该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
  • pullThreasholdSizeForTopic:一个Topic最大能缓存的消息字节数,单位是MB,默认为-1,结合pullThresholdSizeForQueue配置项生效,该配置项的优先级低于pullThresholdSizeForQueue
  • pullInterval:拉取间隔,单位为ms
  • consumeMessageBatchMaxSize:消费者每次批量消费时,最多消费多少条消息,默认是1
  • pullBatchSize:一次最大拉取多少条消息,默认32条
  • postSubscriptionWhenPull:每次拉取消息时是否更新订阅关系,默认false
  • maxReconsumeTimes:最大重试次数,默认-1,表示最大重试次数为16次
  • suspendCurrentQueueTimeMillis为段轮询场景设置的挂起时间,比如顺序消息场景
  • consumeTimeout:消费超时时间,单位为min,默认是15

Push启动流程

在这里插入图片描述

  • 1-7和Pull模式类似
  • 8.初始化消费服务并启动,之所以用户"感觉"消息是Broker主动推送给自己的,
    是因为DefaultMQPushConsumer通过Pull服务将消息
    拉取到本地,再通过Callbakc的形式,将本地消息Push给用户的消费代码,
    DefaultMQPushConsumer和DefaultMQPullConsumer
    获取消息的方式一样,本质上都是拉取。

消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是
ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService
根据用户监听器继承的不同接口初始化不同的消费服务程序

在这里插入图片描述

  • 9.启动MQClientInstance实例
    在这里插入图片描述
  • 10.更新本地订阅关系和路由信息,通过Broker检查是否支持消费者的过滤类型;
    向集群中的所有Broker发送消费者组的心跳信息
    在这里插入图片描述
  • 11.立即执行一次Rebalance
    this.mQClientFactory.rebalanceImmediately();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/659050.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue-router 实现页面路由

vue-router介绍 vue 的官方路由组件 功能包括 嵌套路由映射动态路由选择模块化、基于组件的路由配置路由参数、查询、通配符HTML5 的 history 模式 和 hash 模式 vue-router使用 结合 tabs 组件,实现页面路由 安装 vant-ui 实现底部导航栏 Tabbar-CSDN博客 重点…

图的学习

图的基本概念和术语 图的定义:图是由顶点的有穷非空集合和顶点之间的边的集合组成的,G表示,V是图G中顶点的集合,E是图G中边的集合 无向图:任意两点的边都是无向边组成的图(无向边:&#xff08…

R高级绘图 | P1 | 带边缘分布散点图 | 代码注释 + 结果解读

新系列 —— R高级绘图,准备整理所有曾经绘制过的图图和未来需要的图图们的代码!预计这个系列会囊括所有常见图形,只提供高级绘图代码,基础绘图主要在 R语言绘图 系列中进行介绍,这个系列咱们主打:需要XX图…

【Web前端实操21】商城官网_白色导航

今日份实现白色导航栏部分,也就是第三部分,效果如图中划线所示: 本次实现代码如之前的全局样式不再赘述,如有需要可以去我博客的Web前端实操19或者20自行查看。 本次主要更新mi.css和index.htm。 实现导航栏所需要的CSS样…

2015年苏州大学837复试机试C/C++

2015年苏州大学复试机试 第一题 题目 有36块砖&#xff0c;现在有36个人&#xff0c;男人能搬4块&#xff0c;女人能搬3块&#xff0c;小孩子两人搬一块&#xff0c;求一次搬完这些砖要男人&#xff0c;女人&#xff0c;小孩多少人&#xff1f; 代码 #include <iostrea…

仰暮计划|“那时候在生产队下面,集体干活,吃大锅饭,由队里分粮食,吃不饱饭是常事,队里分的粮食就那么点,想要吃饱真的太难了”

希望未来的中国越来越好&#xff0c;大家的生活也越来越好 老人是1955年在河南省洛阳市洛宁县的一个小山村里出生的&#xff0c;前半辈子为了生活&#xff0c;为了孩子而打拼&#xff0c;虽然经历了不少的苦难&#xff0c;但后半辈子也算是苦尽甘来&#xff0c;生活美满。现在就…

《Is dataset condensation a silver bullet for healthcare data sharing?》

一篇数据浓缩在医疗数据集应用中的论文。 其实就是在医疗数据集上使用了data condensation的方法&#xff0c;这里使用了DM的方式&#xff0c;并且新增了浓缩时候使用不同的网络。 1. 方法 数据浓缩DC的目的是&#xff1a; E x ∼ P D [ L ( φ θ O ( x ) , y ) ] ≃ E x ∼…

【Vue3+Vite】Vue3视图渲染技术 快速学习 第二期

文章目录 一、模版语法1.1 插值表达式和文本渲染1.1.1 插值表达式 语法1.1.2 文本渲染 语法 1.2 Attribute属性渲染1.3 事件的绑定 二、响应式基础2.1 响应式需求案例2.2 响应式实现关键字ref2.3 响应式实现关键字reactive2.4 扩展响应式关键字toRefs 和 toRef 三、条件和列表渲…

考研高数(共轭根式)

1.定义 共轭根式&#xff1a;是指两个不等于零的根式A、B&#xff0c;若它们的积AB不含根式&#xff0c;则称A、B互为共轭根式。 共轭根式的一个显著特点是通过相乘能把根号去掉&#xff0c;这是很有帮助的 2.常用的共轭根式 3.例题 1&#xff09;求极限 2&#xff09;证明…

常见分类网络的结构

VGG16 图片来自这里 MobilenetV3 small和large版本参数,图片来着这里 Resnet 图片来自这里

【Deep Dive: AI Webinar】数据合作和开源人工智能

【深入探讨人工智能】网络研讨系列总共有 17 个视频。我们按照视频内容&#xff0c;大致上分成了 3 个大类&#xff1a; 1. 人工智能的开放、风险与挑战&#xff08;4 篇&#xff09; 2. 人工智能的治理&#xff08;总共 12 篇&#xff09;&#xff0c;其中分成了几个子类&…

02、全文检索 ------ Solr(企业级的开源的搜索引擎) 的下载、安装、Solr的Web图形界面介绍

目录 Solr 的下载和安装Solr的优势&#xff1a;Lucene与Solr 安装 Solr1、下载解压2、添加环境变量3、启动 Solr Solr 所支持的子命令&#xff1a;Solr 的 Core 和 Collection 介绍Solr 的Web控制台DashBoard&#xff08;仪表盘&#xff09;Logging&#xff08;日志&#xff09…

代码随想录算法训练营29期|day34 任务以及具体任务

第八章 贪心算法 part03 1005.K次取反后最大化的数组和 class Solution {public int largestSumAfterKNegations(int[] nums, int K) {// 将数组按照绝对值大小从大到小排序&#xff0c;注意要按照绝对值的大小nums IntStream.of(nums).boxed().sorted((o1, o2) -> Math.ab…

华为1.24秋招笔试题

华为1.24秋招笔试题 1.题目1 题目详情 - 2024.1.24-华为秋招笔试-第一题-计算积分 - CodeFun2000 1.1题解 import java.util.Scanner;class Main{public static void main(String[] args){Scanner scnew Scanner(System.in);String ssc.next();char[] chs.toCharArray();in…

qt语言国际化(翻译),并实现多窗口同时翻译

一、.pro文件中添加支持的语言 在.pro文件中添加下面几句&#xff0c;支持中文和英文 TRANSLATIONS lanague_cn.ts\lanague_en.ts二、通过qt语言家更新翻译生成.ts文件 完成以后在工程目录可以看到.ts文件 三、通过linguist翻译文件 打开文件 将两个文件同时选中&#xf…

【WPF.NET开发】优化性能:图形呈现层

本文内容 图形硬件呈现层定义其他资源 呈现层为运行 WPF 应用程序的设备定义图形硬件功能和性能级别。 1、图形硬件 对呈现层级别影响最大的图形硬件功能包括&#xff1a; 视频 RAM - 图形硬件中的视频内存量决定了可用于合成图形的缓冲区大小和数量。 像素着色器 - 像素着…

【优秀案例】回本周期缩短10%!日安装量级高达5000以上!看NetMarvel如何赋能Ball Sort达成多项目标

“合成大西瓜在海外火了” 没想到&#xff0c;在国内已经过气的玩法转战到海外后&#xff0c;还能够翻红的这么彻底&#xff1f; 实际上&#xff0c;市面上很多在本土市场不温不火但转战海外赛道却盈利感人的应用不在少数&#xff0c;比如我们今天的重头戏《Ball Sort - Colo…

【云上建站】快速在云上构建个人网站4——网站备案

快速在云上构建个人网站4——网站备案 一、为网站配置域名1、使用域名的原因2、域名使用逻辑3、域名配置流程 二、域名注册1、查询域名&#xff1a;2、确认订单&#xff1a;3、实名认证域名4、域名解析配置解析域名&#xff1a;解析设置&#xff1a;访问域名&#xff1a; 一、为…

【深度学习:多关节嵌入模型】 Meta 解释的 ImageBind 多关节嵌入模型

【深度学习&#xff1a;多关节嵌入模型】 Meta 解释的 ImageBind 多关节嵌入模型 Meta 发布开源人工智能工具的历史分段任何模型DINOv2 什么是多模态学习&#xff1f;什么是嵌入&#xff1f;什么是 ImageBind&#xff1f;集成在 ImageBind 中的模式图像绑定架构特定模式编码器跨…

Flask框架开发学习笔记《6》前后端不分离基础框架

Flask框架开发学习笔记《6》前后端不分离基础框架 Flask是使用python的后端&#xff0c;由于小程序需要后端开发&#xff0c;遂学习一下后端开发。 主要包含如下文件&#xff1a; static 目录中存储了图片templates 目录中存储了 html 文件utils.py 包含了 log 函数server.p…