消费者Rebalance机制

优质博文:IT-BLOG-CN

一、消费者Rebalance机制

Apache Kafka中,消费者组
Consumer Group会在以下几种情况下发生重新平衡Rebalance
【1】消费者加入或离开消费者组: 当一个新的消费者加入消费者组或一个现有的消费者离开消费者组时,Kafka会触发重新平衡,以重新分配分区给消费者。
【2】消费者崩溃或失去连接: 如果Kafka检测到某个消费者崩溃或失去连接(例如,由于网络问题或消费者进程被终止),它会触发重新平衡。
【3】主题的分区数量发生变化: 如果一个主题的分区数量增加或减少,Kafka会触发重新平衡,以确保新的分区被分配给消费者组中的消费者。
【4】消费者组协调器变更: 消费者组协调器是负责管理消费者组的一个Kafka Broker。如果消费者组协调器发生变更(例如,协调器所在的Broker崩溃),也会触发重新平衡。
【5】消费者组成员发送心跳失败: 消费者需要定期向消费者组协调器发送心跳heartbeat以表明它们仍然活跃。如果心跳失败,协调器会认为该消费者已经失去连接,从而触发重新平衡。

rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance

Kafka在高峰期重平衡rebalancing会导致消费者组的停顿,影响系统的性能和稳定性。为了避免在高峰期发生重平衡,可以采取以下几种策略:
【1】优化分区分配策略: 使用RangeAssignorStickyAssignor等分区分配策略来减少重平衡的频率和影响。

RangeAssignorKafka默认的分区分配策略之一,它将分区按范围分配给消费者。

我们通过一个具体的例子来说明RangeAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么RangeAssignor会重新分配分区,以确保分区尽量按顺序和均匀地分配给剩余的消费者。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,RangeAssignor将分区按顺序重新分配给剩余的消费者,确保每个消费者分配到的分区尽量连续。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,RangeAssignor会再次按顺序和均匀地分配分区。新的分配可能如下:

C1: P0, P1
C3: P2, P3
C4: P4, P5

在这个过程中,RangeAssignor将分区重新分配,以确保每个消费者分配到的分区尽量连续和均匀。

通过这个例子,我们可以看到RangeAssignor的分配策略:
1、将分区按顺序分配给消费者。
2、当消费者组成员变化时,重新分配分区,以确保分区尽量按顺序和均匀地分配给所有消费者。
3、分区分配尽量保持连续性。
这种策略的好处是分区分配简单且稳定,减少了分区在消费者组成员变化时的重新分配范围,从而减少了重平衡的频率和影响。

以下是配置RangeAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class RangeAssignorExample {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置分区分配策略为 RangeAssignorprops.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(List.of("example-topic"));// 消费消息的逻辑// ...}
}

StickyAssignorKafka 2.4及以上版本引入的一种分区分配策略,它的目标是尽量保持分区分配的稳定性,减少重平衡的频率。

我们通过一个具体的例子来说明StickyAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么StickyAssignor会尽量保持现有的分区分配不变,并重新分配C2的分区。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,StickyAssignor尽量保持C1C3的分区分配不变,只是将C2的分区重新分配给其他消费者。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,StickyAssignor会尝试保持现有的分区分配不变,并将分区尽量均匀地分配给所有消费者。新的分配可能如下:

C1: P0, P1
C3: P4, P5
C4: P2, P3

在这个过程中,StickyAssignor保持了C1C3的分区不变,并将C2的分区重新分配给C4

通过这个例子,我们可以看到StickyAssignor的分配策略:
1、尽量保持现有的分区分配不变。
2、当消费者组成员变化时,尽量最小化分区在消费者之间的移动。
3、尽量保持分区分配的平衡性。
这种策略的好处是减少了重平衡带来的影响,提高了分区分配的稳定性,减少了因分区移动带来的数据重新加载和处理的开销。

以下是配置StickyAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class StickyAssignorExample {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置分区分配策略为 StickyAssignorprops.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(List.of("example-topic"));// 消费消息的逻辑// ...}
}

或者在配置中进行指定

group.id=my-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor

【2】增加session.timeout.msheartbeat.interval.ms:增加session.timeout.msheartbeat.interval.ms的值,这样可以减少消费者因为心跳超时而被认为失效,从而触发重平衡。

1、session.timeout.ms是消费者与Kafka broker之间的会话超时时间。如果在这个时间内Kafka broker没有收到某个消费者的心跳,broker就会认为该消费者已经失效,并触发重平衡
2、heartbeat.interval.ms是消费者发送心跳给Kafka broker的时间间隔。心跳是消费者向broker表示自己仍然活跃的方式。

session.timeout.ms=30000
heartbeat.interval.ms=3000

3、heartbeat.interval.ms的值通常要远小于session.timeout.ms的值。这样可以确保在会话超时之前,消费者有多次机会发送心跳。一般建议session.timeout.ms至少是heartbeat.interval.ms10倍,以确保有足够的时间进行多次心跳尝试。

【3】合理配置消费者组:确保消费者组中的消费者数量稳定,避免频繁地增加或减少消费者。尽量在低峰期进行消费者的添加或移除操作。

【4】优化消费者性能:提高消费者的处理能力,确保消费者能够及时处理消息,避免因为处理延迟导致的重平衡。使用异步处理或批量处理来提高消费者的吞吐量。

【5】监控和报警:实时监控Kafka集群和消费者组的状态,设置报警机制,当检测到重平衡风险时,及时采取措施。

【6】使用静态成员Static MembershipKafka 2.3及以上版本支持静态成员功能,可以通过配置group.instance.id来减少重平衡的频率。

group.instance.idKafka 2.4.0引入的一个配置项,用于为每个消费者实例指定一个唯一的标识符。当消费者组中的消费者具有唯一的group.instance.id时,Kafka可以更智能地处理消费者组成员的变化,从而减少不必要的重平衡。

静态成员:通过配置group.instance.id,消费者实例变成了“静态成员”,即使它们暂时断开连接,Kafka也会保留它们的成员身份。这与传统的动态成员(没有group.instance.id)不同,动态成员在断开连接后会被移除,从而触发重平衡。

group.id=my-consumer-group
group.instance.id=consumer-instance-1

【7】调整rebalance.timeout.ms:增加rebalance.timeout.ms的值,确保消费者有足够的时间完成重平衡过程,避免因超时导致的频繁重平衡。

消费者Rebalance分区分配策略

主要包含四种relalance策略:RangeAssignor(范围分配策略),RoundRobinAssignor(轮询分配策略),StickyAssignor(粘性分配策略),CooperativeStickyAssignor(协作粘性分配策略),之前已经讲过两个,这里聊聊剩下的两个

RoundRobinAssignor(轮询分配策略)

RoundRobinAssignor采用轮询的方式将分区分配给消费者。它会将所有分区和消费者按照字典顺序排序,然后依次将每个分区分配给下一个消费者,直到所有分区都被分配完毕。

CooperativeStickyAssignor(协作粘性分配策略)

CooperativeStickyAssignorStickyAssignor的改进版本,它引入了协作重平衡的概念,使得重平衡过程更加平滑,减少了重平衡期间的停顿时间。

二、Rebalance 过程

第一阶段:选择"组协调器"
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance

consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。

组协调器选择方式:consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer groupcoordinator

第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumer group中选择第一个加入groupconsumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

第三阶段:SYNC GROUP
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/54774.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ROS基础入门——实操教程

ROS基础入门——实操教程 前言 本教程实操为主&#xff0c;少说书。可供参考的文档中详细的记录了ROS的实操和理论&#xff0c;只是过于详细繁杂了&#xff0c;看得脑壳疼&#xff0c;于是做了这个笔记。 Ruby Rose&#xff0c;放在这里相当合理 本文初编辑于2024年10月4日 C…

MySQL--事务(详解)

目录 一、前言二、本文章目标三、什么是事务&#xff1f;四、事务的ACID特性五、为什么要使用事务六、如何使用事务6.1 查看支持使用事务的引擎6.2语法6.3 开启⼀个事务&#xff0c;执行更新后回滚6.4 开启一个事务更新后提交6.5 保存点6.6 自动/手动提交事务 七、事务的隔离性…

Spring异步线程池的问题

今天看一视频&#xff0c;提到说 Spring默认的异步线程池比较简单&#xff0c;每次执行异步任务&#xff0c;都会新建一个线程进行处理&#xff0c;不会重复利用&#xff0c;所以在用Spring框架开发的时候&#xff0c;需要自定义异步线程池。第一次听到这个说法。遂开始百度。 …

Word页眉内容自动填充为章节标题

Word页眉内容自动填充为章节标题 在写毕业论文的过程中&#xff0c;通常要求将页眉设置为章节标题&#xff0c;例如这样 通常&#xff0c;页眉内容我们都是手敲上去的&#xff0c;其实在Word中可以设置为自动引用章节标题&#xff0c;以下为设置方法&#xff0c;仅供参考&…

jmeter入门:脚本录制

1.设置代理。 网络连接-》代理-》手动设置代理. ip&#xff1a; 127.0.0.1&#xff0c; port&#xff1a;8888 2. add thread group 3. add HTTP(s) test script recorder, target controller chooses Test plan-> thread Group 4. click start. then open the browser …

利用 Python 爬虫采集 1688商品详情

1688是中国的一个大型B2B电子商务平台&#xff0c;主要用于批发和采购各种商品。对于需要从1688上获取商品详情数据、工程数据或店铺数据的用户来说&#xff0c;可以采用以下几种常见的方法&#xff1a; 官方API接口&#xff1a;如果1688提供了官方的API接口&#xff0c;那么可…

猎板PCB设计中的HDI板技术革新与实践

在设计工业控制器的HDI板时&#xff0c;需要注意以下几个关键方面&#xff1a; 布线设计&#xff1a;由于HDI板布线密度高&#xff0c;合理规划走线非常关键&#xff0c;避免交叉和重叠&#xff0c;确保信号传输的稳定性和可靠性。需要控制线宽和线距&#xff0c;根据电路板的…

C语言 | Leetcode C语言题解之第454题四数相加II

题目&#xff1a; 题解&#xff1a; struct hashTable {int key;int val;UT_hash_handle hh; };int fourSumCount(int* A, int ASize, int* B, int BSize, int* C, int CSize, int* D, int DSize) {struct hashTable* hashtable NULL;for (int i 0; i < ASize; i) {for (…

日语学习零基础生活日语口语柯桥外语学校|股票用日语怎么说?

在日语中&#xff0c;“股票”可以说&#xff1a; • 株&#xff08;かぶ&#xff09; 这是最常用的表达方式&#xff0c;直接表示“股票”。 例如&#xff1a; 株を買う - 买股票 株を売る - 卖股票 • 株式&#xff08;かぶしき&#xff09; 这个词也是“股票”的意…

模拟实现消息队列(基于SpringBoot实现)

项目代码 提要&#xff1a;此处的消息队列是仿照RabbitMQ实现&#xff08;参数之类的&#xff09;&#xff0c;实现一些基本的操作&#xff1a;创建/销毁交互机&#xff08;exchangeDeclare&#xff0c;exchangeDelete&#xff09;&#xff0c;队列&#xff08;queueDeclare&a…

【CF2021E】Digital Village(All Version)

题目 给你一张 n n n 个点 m m m 条边的无向图&#xff0c;有 p p p 个关键点。你需要选择 k k k 个点染黑&#xff0c;使得这 p p p 个关键点到这 k k k 个黑点的代价和最小。定义代价为两点之间边权最大的边的最小值。 你需要求出 k 1,2,…,n 的所有答案 E1 n,m,p&l…

WPS的JS宏实现删除某级标题下的所有内容

想要删除Word文档中&#xff0c;包含特定描述的标题下所有内容&#xff08;包含各级子标题以及正文描述&#xff09;。 例如下图中&#xff0c;想删除1.2.1.19.1业务场景下所有内容&#xff1a; 简单版&#xff1a; 删除光标停留位置的大纲级别下所有的内容。实现的JS代码如下…

在 ubantu 20.04 云服务器上基于 bochs 编译 linux0.11

安装 bochs 将下面的命令全部执行一遍&#xff1a; sudo apt-get install build-essential sudo apt-get install xorg-dev sudo apt-get install bison sudo apt-get install g 我们区官网下载一下bochs的源码&#xff1a;bochs下载 这里我下载好了bochs2.6.8 这个版本的…

【SQL】DDL语句

文章目录 1.SQL通用语法2.SQL的分类3.DDL3.1数据库操作3.2 表操作3.2.1 表操作--数据类型3.2.2 表操作--修改3.2.3 表操作--删除 SQL 全称 Structured Query Language&#xff0c;结构化查询语言。操作关系型数据库的编程语言&#xff0c;定义了一套操作关系型数据库统一标准 。…

Could not get JDBC Connection: wait millis 10000, active 500

Could not get JDBC Connection: nested exception is com,alibaba,druid.pool,GetConnectionTimeoutException: wait millis 10000, active 500 1、生产突然出现这样的问题&#xff0c;后经过各种分析查找 jmap -dump:formatb,filewar_l.hporf 10333 ‌jmap -dumpb命令用于生成…

vos3000外呼系统音质不好怎么解决

音质好坏主要取决于传输网络和经由设备的处理能力 如果 VOS 没有开启媒体转发的情况下&#xff0c;出现音质不好&#xff0c;那么排查问题时完全可以认为 VOS 是正常的&#xff0c;因为VOS没有参与语音流的处理&#xff0c;所以不涉及音质问题。可以尝试以下几个解决方案&…

OSPF的不规则区域

1.远离骨干非骨干区域 2.不连续骨干 解决方案 tunnel ---点到点GRE 在合法与非ABR间建立隧道&#xff0c;然后将其宣告于OSPF协议中&#xff1b; 缺点&#xff1a;1、周期和触发信息对中间穿越区域造成资源占用&#xff08;当同一条路由来自不同区域&#xff0c;路由器会先…

nacos源码修改持久化到postgreSQL数据库

很多业务场景&#xff0c;业务功能必须用pg数据库&#xff0c;这时候注册中心如果用mysql的话&#xff0c;显得浪费资源&#xff0c;基于此&#xff0c;nacos源码修改持久化到postgreSQL数据库是一个必然需求&#xff0c;此处我们修改为只支持pg数据库&#xff0c;2.4版本的源码…

Java基础:字符串详解

1 深入解读String类源码 1.1 String类的声明 public final class Stringimplements java.io.Serializable, Comparable<String>, CharSequence { }String类是final的&#xff0c;意味着它不能被子类继承&#xff1b;String 类实现了Serializable接口&#xff0c;意味着…

界星空科技漆包线行业称重系统

万界星空科技为漆包线行业提供的称重系统是其MES制造执行系统解决方案中的一个重要组成部分。以下是对该系统的详细介绍&#xff1a; 一、系统概述 万界星空科技漆包线行业称重系统&#xff0c;是集成在MES系统中的一个功能模块&#xff0c;专门用于漆包线生产过程中的重量检…