RocketMq详解:六、RocketMq的负载均衡机制

上一章:《SpringBoot+Aop实现RocketMq的幂等》


文章目录

  • 1.背景
    • 1.1 什么是负载均衡
    • 1.2 负载均衡的意义
  • 2.RocketMQ消息消费
    • 2.1 消息的流转过程
    • 2.2 Consumer消费消息的流程
  • 3.RocketMq的负载均衡策略
    • 3.1 Broker负载均衡
    • 3.2 Producer发送消息负载均衡
    • 3.3 消费端的负载均衡
      • 3.3.1 Rebalance组件
      • 3.3.2 Rebalance触发时机
      • 3.3.3 负载均衡流程
      • 3.3.4 Queue分配算法
      • 3.3.5 负载均衡对消费的影响
      • 3.3.6 RocketMQ 5.0 消息级别负载均衡
  • 4.RocketMQ指定机器消费设计思路

1.背景

在RocketMQ中,它的负载均衡主要可以分为

  • Consumer订阅消息的负载均衡
  • Broker端的消息分发策略
  • Producer端的发送消息的负载均衡

其中在消费者端还有一个重量级的组件:Rebalance负载均衡组件,他负责相对均匀的给消费者分配需要拉取的队列信息。

在了解负载均衡组件之前,我们先看看什么是负载均衡以及为什么要使用负载均衡

1.1 什么是负载均衡

负载均衡在分布式服务里会频繁的出现,其主要是用来在多个资源 (一般是服务器)中分配负载,达到最优化资源使用,避免单台服务器过载

RocketMQ中的负载均衡,指的是如何将消息队列(Message Queue)均匀地分配给消费者组中的各个消费者实例

通过负载均衡机制,可以避免某些消费者实例处理过多消息队列而过载,或者某些实例没有消息可处理,从而提高系统的整体处理能力和资源利用率

1.2 负载均衡的意义

在这里插入图片描述

上图是 RocketMQ 的消息储存模型:消息是按照队列的方式分区有序储存的

RocketMQ 的队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间都可以无限水平扩展

对比传统的消息队列如 RabbitMQ 是很大的优势。尤其是在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好
在这里插入图片描述

消费者消费某个 topic 的消息等同于消费这个 topic 上所有队列的消息(上图中 Consumer A1 消费队列 1,Consumer A2 消费队列 2、3)。

所以,要保证每个消费者的负载尽量均衡,也就是要给这些消费者分配相同数量的队列,并保证在异常情况下(如客户端宕机)队列可以在不同消费者之间迁移

在具体了解RocketMQ的负载均衡策略之前,我们先了解一些RocketMq的整个消费逻辑,以便我们后期可以更好的理解

2.RocketMQ消息消费

2.1 消息的流转过程

RocketMQ 支持两种消费模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费:同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。
在这里插入图片描述

广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次
在这里插入图片描述

上面提到了两个名词:消费者组、消息队列
这两个名词在:《RocketMQ 介绍及基本概念》这篇文章中已进行说明,下面在简单概述一下

  • 消费者组

消费者组是 RocketMQ 中负载均衡的基本单位

一个主题(Topic)的消息队列可以被多个消费者组订阅,每个消费者组中的消费者实例会共享消息队列。

如果一个主题有 8 个队列,而一个消费者组有 4 个消费者实例,那么负载均衡机制会将这 8 个队列均匀分配给这 4 个消费者实例。

  • 消息队列

消息队列是 RocketMQ 中消息存储和消费的基本单位

一个主题可以有多个消息队列,这些队列中的消息会被消费者组中的消费者实例消费。通过将消息队列均匀地分配给各个消费者实例,RocketMQ 实现了负载均衡。

2.2 Consumer消费消息的流程

consumer消息消费过程:
在这里插入图片描述

因为广播模式所有的Consumer都会收到全量消息,所以RocketMQ的负载均衡只针对于Consumer集群消费的模式

3.RocketMq的负载均衡策略

3.1 Broker负载均衡

Broker是以group(消费者组)为单位提供服务。

一个group里面分master和slave,master和slave存储的数据一样,slave从master同步数据(同步双写或异步复制看配置)。

通过nameserver暴露给客户端后,只是客户端关心(注册或发送)一个个的topic路由信息。

路由信息中会细化为message queue的路由信息。而message queue会分布在不同的broker group。所以对于客户端来说,分布在不同broker group的message queue为成为一个服务集群,但客户端会把请求分摊到不同的queue。

而由于压力分摊到了不同的queue,不同的queue实际上分布在不同的Broker group,也就是说压力会分摊到不同的broker进程,这样消息的存储和转发均起到了负载均衡的作用。

Broker一旦需要横向扩展,只需要增加broker group,然后把对应的topic建上,客户端的message queue集合即会变大,这样对于broker的负载则由更多的broker group来进行分担。

并且由于每个group下面的topic的配置都是独立的,也就说可以让group1下面的那个topic的queue数量是4,其他group下的topic queue数量是2,这样group1则得到更大的负载。

  • commit log

虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在SetCommitLog的文件ter,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。

不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写

具体如下图:
在这里插入图片描述

3.2 Producer发送消息负载均衡

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
在这里插入图片描述

注: 另外多个队列可以部署在一台机器上,也可以分别部署在多台不同的机器上

上图所示的若干队列可以部署在一台机器上,也可以分别部署在不同的机器上,发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。另外也可以自定义方式选择发往哪个队列。

RocketMQ的顺序消息发送的时候,就要求我们自己实现队列选择器,根据消息唯一标识选择对应的队列进行发送。

3.3 消费端的负载均衡

3.3.1 Rebalance组件

消费端的负载均衡主要依赖于Rebalance组件,将 Broker 端中多个队列按照某种算法分配给同一个消费组中的不同消费者

Rebalance即再均衡,指的是将一个Topic下的多个Queue在同一个Consumer Group中的多个 Consumer间进行重新分配的过程,它能够提升消息的并行消费能力

RocketMQ 5.0以前是按照队列粒度进行负载均衡的,5.0以后提供了按消息粒度进行负载均衡。

对于4.x/3.x的版本,包括DefaultPushConsumer、DefaultPullConsumer、LitePullConsumer等,默认且仅能使用队列粒度负载均衡策略

  • 队列粒度负载均衡策略

队列粒度负载均衡策略中,同一消费者组内的多个消费者将按照队列粒度消费消息,每个队列只能被其中一个消费者消费

队列粒度负载均衡是在每个消费者端进行的,并不是由某个节点统一进行负载均衡之后将分配结果通知到每个消费者。

费者增加或者减少会影响消息队列的分配消,所以Broker需要感知消费者的上下线情况

消费者在启动时会向所有的Broker发送心跳包进行注册,通知Broker消费者上线,下线的时候也会向Broker发送取消注册的请求

Broker会维护消费者信息的注册信息,在消费者发生变更时会通知消费者进行负载均衡

由于负载均衡是每个客户端独立进行计算,那么何时触发呢?

3.3.2 Rebalance触发时机

  • 消费端启动时,立即进行负载均衡

消费者在启动时会进行一次负载均衡,为自己分配消息队列。

  • 消费端定时任务每隔 20 秒触发负载均衡

消费者本身也会定时执行负载均衡,默认是20s执行一次。
在这里插入图片描述

  • 消费者上下线,Broker 端通知消费者触发负载均衡

如果有消费者向Broker发送UNREGISTER_CLIENT取消注册请求,并且开启了允许通知变更,会触发变更事件。

变更事件同上,Broker会通知该消费者组下的所有消费者进行一次负载均衡。

比如我们动态添加了Consumer进行消费,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。

例如,一个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时其中一个消费者分配2个队列,给另一个分配3个队列,从而提升消息我们增加一个消费者,那么就可以给的并行消费能力。 如下图:
在这里插入图片描述

  • 消费者所订阅Topic的队列数量发生变化

比如我们动态调整了Topic对应的队列数量,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。

例如一个Topic下5个队列,有2个消费者的情况下,那么就可以给其中一个消费者分配2个队列,给另一个分配3个队列,假设我们调整到Topic下有7个队列,还是2个消费者的情况下,那么就可以给其中一消费者分配4个队列,给另一个分配3个队列;从而提升消息的并行消费能力。如下图:
在这里插入图片描述
像Consumer Group扩容或缩容、Consumer与NameServer间发生网络异常、Consumer发生宕机等都会导致消费者组中消费者的数量发生变化。

需要注意的是,由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列,等于是多余的消费者什么都不做,白白浪费

3.3.3 负载均衡流程

1、发送心跳
消费者启动后,"它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包消息消费分组名称、订阅关系集合、消息通信模式和客户端实例编号等信息

Broker 端在收到消费者的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 channelinfoTable 中,为之后做消费端的负载均衡提供可以依据的元数据信息。

2、启动负载均衡服务
负载均衡核心代码:
负载均衡服务执行逻辑在doRebalance函数,里面会对每个消费者组执行负载均衡操作。

/* group */
private ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();public void doRebalance() {//每个消费者组都有负载均衡for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}
}

由于每个消费者组可能会消费很多topic,每个topic都有自己的不同队列,最终是按topic的维度进行负载均衡。

 public boolean doRebalance(boolean isOrder) {boolean balanced = true;Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {Iterator var4 = subTable.entrySet().iterator();while(var4.hasNext()) {Map.Entry<String, SubscriptionData> entry = (Map.Entry)var4.next();String topic = (String)entry.getKey();try {if (!this.clientRebalance(topic) && this.tryQueryAssignment(topic)) {balanced = this.getRebalanceResultFromBroker(topic, isOrder);} else {balanced = this.rebalanceByTopic(topic, isOrder);}} catch (Throwable var8) {Throwable e = var8;if (!topic.startsWith("%RETRY%")) {log.warn("rebalance Exception", e);balanced = false;}}}}this.truncateMessageQueueNotMyTopic();return balanced;}

里面最核心的便是:
在这里插入图片描述

这段代码的逻辑如下:
1.clientRebalance(topic):首先调用clientRebalance方法,尝试对指定主题(topic)进行客户端再平衡。如果这个方法返回false,意味着客户端再平衡没有成功或不需要进行。

2.tryQueryAssignment(topic):接着调用tryQueryAssignment方法,尝试从代理服务器(broker)查询最新的订阅分配信息。这个方法通常会在客户端再平衡失败或者不需要进行时被调用,目的是检查是否可以从代理服务器获取新的分配信息。

3.如果上述两个条件都满足(即clientRebalance返回false且tryQueryAssignment成功),则会调用getRebalanceResultFromBroker(topic, isOrder)方法,从代理服务器获取再平衡的结果,并将结果赋值给balanced变量。这通常意味着需要根据来自代理服务器的信息来更新本地的消费队列分配。

4.否则,如果上述两个条件不同时满足,则调用rebalanceByTopic(topic, isOrder)方法,通过其他方式(可能是基于当前已有的分配信息)来进行再平衡,并将结果赋值给balanced变量。

总结来说,这段代码是在判断是否应该从代理服务器获取最新的分配信息来完成再平衡,还是基于现有的分配信息自行处理再平衡。

选择哪条路径取决于clientRebalancetryQueryAssignment方法的执行结果。这种设计允许RocketMQ灵活地应对不同的网络状况和系统状态,以确保消息能够高效、公平地分发给各个消费者。

最终最终负载均衡逻辑处理的实现在:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic。

    private boolean rebalanceByTopic(String topic, boolean isOrder) {boolean balanced = true;Set mqSet;switch (this.messageModel) {//广播模式case BROADCASTING:mqSet = (Set)this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});}balanced = mqSet.equals(this.getWorkingMessageQueue(topic));} else {this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);}break;//集群模式case CLUSTERING:mqSet = (Set)this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);if (null == mqSet && !topic.startsWith("%RETRY%")) {this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);} catch (Throwable var11) {Throwable e = var11;log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);return false;}Set<MessageQueue> allocateResultSet = new HashSet();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});this.messageQueueChanged(topic, mqSet, allocateResultSet);}balanced = allocateResultSet.equals(this.getWorkingMessageQueue(topic));}}return balanced;}

我们一起来看一下这段代码:
负载均衡服务会根据消费模式为”广播模式”还是“集群模式”做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:
在这里插入图片描述

(1) 获取该主题下的消息消费队列集合;

(2) 查询 Broker 端获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;
在这里插入图片描述

这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。

(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作
在这里插入图片描述

消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的消费快照。

标红的部分表示与分配到的消息队列集合互不包含,则需要将这些红色队列 Dropped 属性为 true , 然后从 processQueueTable 对象中移除。

绿色的部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。

黄色的部分表示这些队列需要添加到 processQueueTable 对象中,创建这些队列的消费快照。最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。

3.3.4 Queue分配算法

一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息。

那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费,也是有算法策略的。

负载均衡策略顶层接口:

/*** Strategy Algorithm for message allocating between consumers*/
public interface AllocateMessageQueueStrategy {/*** Allocating by consumer id* 给消费者id分配消费队列*/List<MessageQueue> allocate(final String consumerGroup, //消费者组final String currentCID, //当前消费者idfinal List<MessageQueue> mqAll, //所有的队列final List<String> cidAll //所有的消费者);}

他默认共有7种负载均衡策略实现:
在这里插入图片描述

常见的有四种策略,分别是:平均分配策略环形平均策略一致性hash策略同机房策略

这些策略是通过在创建Consumer时的构造器传进去的。

(1)、平均分配策略 (默认)
该算法是根据
$[avg = QueueCount/ ConsumerCount] $的计算结果进行分配的,如果能够整除,则按顺序将avg个Queue逐个分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配
在这里插入图片描述

(2)、环形分配策略
环形平均算法是指,根据消费者的顺序,依次由Queue队列组成的环形图逐个分配,该方法不需要提前计算,如下图:
在这里插入图片描述

(3)、一致性哈希分配策略

该算法会将consumer的hash值作为Node节点存放到hash环上,然后将queue的hash值也放到hash环 上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer。
在这里插入图片描述

一致性哈希算法可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance,所以它适合用在Consume数量变化较频繁的场景,如下图:
在这里插入图片描述

但是一致性哈希算法也存在不足,就是分配效率较低,容易导致分配不均的情况。即每个消费者消费的队列数,有可能相差很大,这样就会造成个别消费者压力过大。

我们可以引入虚拟桶,让queue在hash环中尽可能分配均匀

在负载均衡的分配策略中,一致性哈希算法数见不鲜,感兴趣的同学可以移步至:《负载均衡的常见几种算法》
(4)、机房分配策略

该算法会根据queue的部署机房位置consumer的位置,过滤出当前consumer相同机房的queue。

然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配。如下图:
在这里插入图片描述

上面我们讲了那么多好处,但是没有什么事情都是能够十分完美的兼容所有,下面我们来辩证的讨论一下负载均衡对消费的影响

3.3.5 负载均衡对消费的影响

Rebalance的在提升消费能力的同时,也带来一些问题

a、消费暂停: 在只有一个Consumer时,其负责消费所有队列;在新增了一个Consumer后会触发 Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。

b、消费重复: Consumer在消费新分配给自己的队列时,必须接着之前Consumer 提交的消费进度的offset 继续消费。然而默认情况下,offset是异步提交的,这个异步性导致提交到Broker的offset与Consumer实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。

C、消费突刺: 由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息
在这里插入图片描述

上图是真实的一个线上case,这两个时间点在进行应用发布,根据我们上文的分析某个消费者下线后同组的其他消费者感知这一变化需要一定时间,导致有秒级的消费延迟产生。在发布结束后消费者快速处理堆积的消息,可以发现消费速度有一个明显的上涨。

这个例子展示了下线时由于负载均衡带来了短暂的消息处理延迟,新的消费者会从服务端获取消费位点继续之前的消费进度。如果消费者异常宕机或者没有调用 shutdown 优雅下线,没有上传自己的最新消费位点,会使得新分配的消费者重复消费。

当某个客户端触发负载均衡时,就会出现:

  • 对于新分配的队列可能会重复消费,这也是官方要求消费要做好幂等的原因
  • 对于不再负责的队列会短时间消费停止,如果原本的消费 TPS 很高或者正好出现生产高峰就会造成消费毛刺。

为了避免这些影响,则需要我们在使用时注意:

  • 1.避免频繁上下线,为了避免负载均衡的影响应该尽量减少客户端的上下线,同时做好消费幂等;
  • 2.同时在有应用重启或下线前要调用 shutdown 方法,这样服务端在收到客户端的下线请求后会通知客户端及时触发负载均衡,减少消费延迟;
  • 3.选择合适的负载均衡策略;
    • 需要根据业务需要灵活选择负载均衡策略:
    • 需要保证客户端的负载尽可能的均衡:选择默认的平均分配策略;
    • 需要降低应用重启带来的消费延迟:选择一致性哈希的分配策略。
  • 4.RocketMQ 的负载均衡是每个客户端独立进行计算,所以务必要保证每个客户端的负载均衡算法和订阅语句一致:
    • 负载均衡策略不一致会导致多个客户端分配到相同队列或有客户端分不到队列;
    • 订阅语句不一致会导致有消息未能消费。

3.3.6 RocketMQ 5.0 消息级别负载均衡

为了彻底解决客户端负载均衡导致的重复消费和消费延迟问题,RocketMQ 5.0 提出了消息级别的负载均衡机制

同一个队列的消息可以由多个消费者消费,服务端会确保消息不重不漏的被客户端消费到:

消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的

消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时

因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费

在 4.x 的客户端中,顺序消费的实现强依赖于队列的分配

RocketMQ 5.0 在消息维度的负载均衡的基础上也实现了顺序消费的语意:不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费
在这里插入图片描述

如上图所述,队列 Queue1 中有 4 条顺序消息,这 4 条消息属于同一消息组 G1,存储顺序由 M1 到 M4。

在消费过程中,前面的消息 M1、M2 被 消费者Consumer A1 处理时,只要消费状态没有提交,消费者 A2 是无法并行消费后续的 M3、M4 消息的,必须等前面的消息提交消费状态后才能消费后面的消息。

4.RocketMQ指定机器消费设计思路

日常测试环境当中会存在多台consumer进行消费,但实际开发当中某台consumer新上了功能后希望消息只由该机器进行消费进行逻辑覆盖,这个时候consumerGroup的集群模式就会给我们造成困扰,因为消费负载均衡的原因不确定消息具体由哪台consumer进行消费。当然我们可以通过介入consumer的负载均衡机制来实现指定机器消费。

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费if (!cidAll.contains(currentCID)) {return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62175.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

主打极致性价比,AMD RX 8600/8800显卡定了

*以下内容仅为网络爆料及传闻&#xff0c;一切以官方消息为准。 这谁能想到&#xff0c;率先掏出下一代桌面独立显卡的不是老大哥 NVIDIA&#xff0c;也不是 AMD&#xff0c;反而是三家中存在感最弱的 Intel&#xff01; 就在 12 月 3 日&#xff0c;Intel 正式发布了自家第二…

npm, yarn, pnpm之间的区别

前言 在现代化的开发中&#xff0c;一个人可能同时开发多个项目&#xff0c;安装的项目越来越多&#xff0c;所随之安装的依赖包也越来越臃肿&#xff0c;而且有时候所安装的速度也很慢&#xff0c;甚至会安装失败。 因此我们就需要去了解一下&#xff0c;我们的包管理器&#…

C语言连接数据库

文章目录 一、初始化数据库二、创建数据库连接三、执行增删改查语句1、增删改2、查 四、执行增删改查语句 接下来我简单的介绍一下怎么用C语言连接数据库。 初始化数据库创建数据库连接执行增删改查语句关闭数据库连接 一、初始化数据库 // 数据库初始化 MYSQL mysql; MYSQL* r…

优化LabVIEW数据运算效率的方法

在LabVIEW中进行大量数据运算时&#xff0c;提升计算效率并减少时间占用是开发过程中常遇到的挑战。为此&#xff0c;可以从多个角度着手优化&#xff0c;包括合理选择数据结构与算法、并行处理、多线程技术、硬件加速、内存管理和界面优化等。通过采用这些策略&#xff0c;可以…

计算机的错误计算(一百七十六)

摘要 利用某一大语言模型计算 的值&#xff0c;输出为 0 . 例1. 在某一大语言模型下&#xff0c;计算 的值。其中sin中值取弧度。结果保留16位有效数字。 直接贴图吧&#xff1a; 点评&#xff1a; &#xff08;1&#xff09;以上为一个大模型给的答案。从其回答可知&…

数据结构与算法——1204—递归分治法

1、斐波那契数列优化 使用滚动变量&#xff0c;保存当前计算结果和前两项值 (1)RAB (2)更新计算对象&#xff0c;AB&#xff0c;BR #include<iostream> using namespace std;int fun(int n) {if (n 0)return 0;if (n 1 || n 2)return 1;int num11;int num21;int su…

openstack内部rpc消息通信源码分析

我们知道openstack内部消息队列基于AMQP协议&#xff0c;默认使用的rabbitmq 消息队列。谈到rabbitmq&#xff0c;大家或许并不陌生&#xff0c;但或许会对oslo message有些陌生。openstack内部并不是直接使用rabbitmq&#xff0c;而是使用了oslo.message 。oslo.message 后端的…

Postman自定义脚本Pre-request-script以及Test

这两个都是我们进行自定义script脚本的地方&#xff0c;分别是在请求执行的前后运行。 我们举两个可能经常运用到的场景。 (一)请求A先执行&#xff0c;请求B使用请求A响应结果作为参数。如果我们不用自定义脚本&#xff0c;可能得先执行请求A&#xff0c;然后手动复制响应结果…

总结的一些MySql面试题

目录 一&#xff1a;基础篇 二&#xff1a;索引原理和SQL优化 三&#xff1a;事务原理 四&#xff1a;缓存策略 一&#xff1a;基础篇 1&#xff1a;定义&#xff1a;按照数据结构来组织、存储和管理数据的仓库&#xff1b;是一个长期存储在计算机内的、有组织的、可共享 的…

116. UE5 GAS RPG 实现击杀掉落战利品功能

这一篇&#xff0c;我们实现敌人被击败后&#xff0c;掉落战利品的功能。首先&#xff0c;我们将创建一个新的结构体&#xff0c;用于定义掉落体的内容&#xff0c;方便我们设置掉落物。然后&#xff0c;我们实现敌人死亡时的掉落函数&#xff0c;并在蓝图里实现对应的逻辑&…

Excel技巧:如何批量调整excel表格中的图片?

插入到excel表格中的图片大小不一&#xff0c;如何做到每张图片都完美的与单元格大小相同&#xff1f;并且能够根据单元格来改变大小&#xff1f;今天分享&#xff0c;excel表格里的图片如何批量调整大小。 方法如下&#xff1a; 点击表格中的一个图片&#xff0c;然后按住Ct…

智能合约

06-智能合约 0 啥是智能合约&#xff1f; 定义 智能合约&#xff0c;又称加密合约&#xff0c;在一定条件下可直接控制数字货币或资产在各方之间转移的一种计算机程序。 角色 区块链网络可视为一个分布式存储服务&#xff0c;因为它存储了所有交易和智能合约的状态 智能合约还…

智慧油客:从初识、再识OceanBase,到全栈上线

今天&#xff0c;我们邀请了智慧油客的研发总监黄普友&#xff0c;为我们讲述智慧油客与 OceanBase 初识、熟悉和结缘的故事。 智慧油客自2016年诞生以来&#xff0c;秉持新零售的思维&#xff0c;成功从过去二十年间以“以销售产品为中心”的传统思维模式&#xff0c;转向“以…

【深度学习】手机SIM卡托缺陷检测【附链接】

一、手机SIM卡托用途 SIM卡托是用于固定和保护SIM卡的部件&#xff0c;通过连接SIM卡与手机主板的方式&#xff0c;允许设备访问移动网络&#xff0c;用户可以通过SIM卡进行通话、发送短信和使用数据服务。 二、手机SIM卡托不良影响 SIM卡接触不良&#xff0c;造成信号中断&…

消防物证管理系统|DW-S404实现消防物证智能化管理

一、系统概述 智慧消防物证管理系统DW-S404系统旨在借助现代信息技术&#xff0c;达成消防物证管理的高效化、安全化及智能化管理目标。该系统运用物联网、大数据、云计算等先进技术&#xff0c;实现对消防物证从产生到销毁的全生命周期跟踪与监控&#xff0c;从而增强物证管理…

Odoo :一款免费且开源的食品生鲜领域ERP管理系统

文 / 贝思纳斯 Odoo金牌合作伙伴 引言 提供业财人资税的精益化管理&#xff0c;实现研产供销的融通、食品安全的追踪与溯源&#xff0c;达成渠道的扁平化以及直面消费者的 D2C 等数字化解决方案&#xff0c;以此提升运营效率与核心竞争力&#xff0c;支撑高质量的变速扩张。…

如何部署vue项目到Github Pages

1.创建vue项目 npm create vitelatest my-vue-app -- --template vue 2.创建github仓库 3.连接仓库 在项目根目录右键选择open git base here&#xff0c;如果没有安装git请先安装git。 初始化仓库 $ git init $ git add . $ git commit -m "init"将项目与仓库连…

Dubbo应用篇

文章目录 一、Dubbo简介二、SSM项目整合Dubbo1.生产者方配置2.消费者方配置 三、Spring Boot 项目整合Dubbo1.生产者方配置2.消费者方配置 四、应用案例五、Dubbo配置的优先级别1. 方法级配置&#xff08;Highest Priority&#xff09;2. 接口级配置3. 消费者/提供者级配置4. 全…

ubuntu的matlab使用心得

1.读取视频 v VideoReader(2222.mp4);出问题&#xff0c;报错&#xff1a; matlab 错误使用 VideoReader/initReader (第 734 行) 由于出现意外错误而无法读取文件。原因: Unable to initialize the video properties 出错 audiovideo.internal.IVideoReader (第 136 行) init…

消息中间件-Kafka1-实现原理

消息中间件-Kafka 一、kafka简介 1、概念 Kafka是最初由Linkedin公司开发&#xff0c;是一个分布式、支持分区&#xff08;partition&#xff09;、多副本的&#xff08;replica&#xff09;&#xff0c;基于zookeeper协调的分布式消息系统&#xff0c;它的最大的特性就是可以…