RocketMQ中的Consumer启动流程
RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer,
DefaultMQPullConsumer
DefaultMQPullConsumer,该消费者使用时需要用户主动从Broker中Pull消息和消费消息,提交消费位点
继承关系图
核心属性
- namesrvAddr:继承自ClientConfig,表示RocketMQ集群的Namesrv地址,如果是多个,则用逗号分开
如:127.0.0.1:9876,127.0.0.2:9876 - clientIP:使用客户端的程序所在机器的IP地址,目前支持IPV4和IPV6,同时排除了本地环会地址(127.0.xxx.xxx)
和私有内网地址(192.168.xxx.xxx),如果在Docket中运行,获取的IP地址是容器所在的IP地址,而非宿主主机的IP地址 - instanceName:实例名,顾名思义每个实例都需要取不一样的名字,加入要在多个机器上部署
多个程序进程,那么每个进程的实例名必须不相同,否则程序会启动失败,因为在创建MQClient时,
会用到IP和instancename名称来
- vipChannelEnabled:这是一个boolean值,表示是否开启VIP通道。VIP通道和非VIP通道的区别是使用不同的端口号进行通信
- clientCallbackExecutorThreads:客户端回调线程数。该线程数等于Netty通信层回调线程的个数,默认值为
Runtime.getRuntime().availableProcessors();表示当前有效的CPU个数 - pollNameServerInterval:获取Topic路由信息间隔,单位为ms,默认为30000ms(30s)
- heartbeatBrokerInterval:客户端和Broker心跳间隔,单位为ms,默认30000ms(30s)
- persistCOnsumerOffsetInterval:持久化消费位点时间间隔,单位为ms,默认为5000ms(5s)
- defaultMQPullConsumer:默认pull消费者的具体实现
- consumerGroup:消费者组名字
- brokerSuspendMaxTimeMills:在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值
- consumerTimeoutMillsWhenSuspend:在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMills大,不建议修改
- messageModel:消费模式,现在支持集群模式消费和广播模式消费
- messageQueueListener:消息路由信息变化时回调处理监听器,一般在重新平衡时被调用
- offsetStore:位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中(某个实例消费失败,生产者也不会重发),位点存储模块有两个实现类,分别为RemoteBrokerOffsetStore和LocalFileOffsetStore
- allocateMessageQUeueStrategy:消费Queue分配策略管理器,默认是平均分配策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); - maxReconsumeTimes:最大重试次数,可以配置
核心方法
-
registerMessageQueueListener():注册队列变化监听器,当队列发生变化是会被监听到
-
pull():从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取
-
pullBlockIfNotFound():长轮询方式拉取,如果没有拉取到消息,那么Broker会讲请求Hold住一段时间,
当有消息来临时再发送pull请求
-
updateConsumeOffset():更新某一个Queue的消费位点
-
fetchConsumeOffset():查找某个Queue的消费位点
-
sendMessageBack():如果消费发送失败,则可以讲消息重新发回Broker,这个消费者组延迟一段时间后可以再消费(也就是重试)
-
fetchSubscribeMessageQueues():获取一个Topic的全部Queue信息
Pull启动流程
- 1.最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JSUT,然后设置消费者的默认启动状态为失败
-
2.检查消费者的配置比,如消费者组名、消费类型、Queue的分配策略等参数是否符合规范,将订阅关系数据发给Rebalance服务对象
-
3.校验消费者实例名,如果时默认的名字,则更改为当前的程序进程id
-
4.获取一个MQClientInstance,如果MQClientInstance已经初始化,则直接返回初始化的实例。这是核心对象,每个ClientID缓存一个实例
-
5.设置Rebalance对象消费组、消费类型、Queue分配策略、MQClientInstance等参数
-
6.对BrokerAPI的封装类pullAPIWrapper进行初始化,同时注册消息,过滤filter
-
7.初始化位点管理器并加载位点信息,位点管理器分为本地管理和远程管理,集群消费时
消费位点保存在Broker中,由远程管理器管理,广播消息时位点在本地,由本地管理其管理
-
8.本地注册消费者实例,如果注册成功,则表示消费者启动成功
DefaultMQPushConsumer
大部分属性、方法和DefaultMQPullConsumer是一样的
核心属性和方法
- defaultMQPushConsumerImpl:默认的Push消费者具体实现类
- consumeFromWhere:一个枚举,表示从什么位点开始消费,
CONSUME_FROM_LAST_OFFSET:默认从上次消费的位点开始消费,相当于断点继续
CONSUME_FROM_TIMESTAMP:从指定时间开始消费
CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费 - consumeTimestamp:表示从哪一时刻开始消费,时间格式为yyyyMMDDHHmmss,默认半小时前,当consumeFromWhere=CONSUME_FROM_TIMESTAMP时,consumeTimestamp设置的值才生效
- allocateMessageQueueStrategy:消费者订阅topic-queue策略
- subscription:订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag
- messageListener:消息Push回调监听器
- consumeThreadMin:最小消费线程数,必须小于consumeThreadMax
consumeThreadMax:最大线程数,必须大于consumeThreadMin - adjustThreadPoolNumsThreshold:动态调整消费线程池的线程数大小,开源版本不支持
- consumeConcurrentlyMaxSpan:并发消息的最大位点差,,如果Pull消息的位点差超过该值,拉取变慢
- pullThresholdForQueue:一个Queue能缓存的最大消息数,超过该值则采取拉取流控措施,默认是1000
- pullThresholdSizeForQueue:一个Queue最大能缓存的消息字节数,单位是MB,默认是10MB
- pullThresholdForTopic:一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施,该字段值默认是-1,该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
- pullThreasholdSizeForTopic:一个Topic最大能缓存的消息字节数,单位是MB,默认为-1,结合pullThresholdSizeForQueue配置项生效,该配置项的优先级低于pullThresholdSizeForQueue
- pullInterval:拉取间隔,单位为ms
- consumeMessageBatchMaxSize:消费者每次批量消费时,最多消费多少条消息,默认是1
- pullBatchSize:一次最大拉取多少条消息,默认32条
- postSubscriptionWhenPull:每次拉取消息时是否更新订阅关系,默认false
- maxReconsumeTimes:最大重试次数,默认-1,表示最大重试次数为16次
- suspendCurrentQueueTimeMillis为段轮询场景设置的挂起时间,比如顺序消息场景
- consumeTimeout:消费超时时间,单位为min,默认是15
Push启动流程
- 1-7和Pull模式类似
- 8.初始化消费服务并启动,之所以用户"感觉"消息是Broker主动推送给自己的,
是因为DefaultMQPushConsumer通过Pull服务将消息
拉取到本地,再通过Callbakc的形式,将本地消息Push给用户的消费代码,
DefaultMQPushConsumer和DefaultMQPullConsumer
获取消息的方式一样,本质上都是拉取。
消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是
ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService
根据用户监听器继承的不同接口初始化不同的消费服务程序
- 9.启动MQClientInstance实例
- 10.更新本地订阅关系和路由信息,通过Broker检查是否支持消费者的过滤类型;
向集群中的所有Broker发送消费者组的心跳信息
- 11.立即执行一次Rebalance
this.mQClientFactory.rebalanceImmediately();