【Kafka】Kafka再平衡机制及相关参数

背景

Kafka作为一款基于发布订阅模式的消息队列,生产者将消息发送到Kafka集群(Brokers)中,消费者(Consumer Group )拉取消息进行消费,实现了异步机制。Kafka中,消费者通常以消费者组的方式进行消费,消费特点为:

  • 每个分区只能被一个消费组中的一个消费者所消费。
  • 消费组中一个消费者可以消费多个分区。
  • 多个消费组,每个消费组都可以消费topic中的所有数据,且消费位移之间互不影响。

一、Kafka的再平衡机制

在Kafka中,如果消费者数量、分区数变更或者消费者订阅的topic发生变化,也就需要再进行消费者消费分区的重新分配,这也就是所谓的再平衡。

1.1 再平衡定义

再平衡是指的是Consumer Group 下的 Consumer 所订阅的Topic发生变化时 发生的一种分区重分配机制。

也就是说,再平衡也就是一种协议,它规定了如何让消费组下的所有消费者来分配 Topic 中的每一个分区。

举个栗子:一个 Topic 有 100 个分区,一个消费者组内有有 20 个消费者,在协调者的控制下让消费者组内的每一个消费者分配到 5 个分区,这个分区分配的过程就是再平衡。

1.2 再平衡触发条件

一般来说,触发Kafka再平衡的条件一般是以下三种:

  • 主题分区发生改变,Kafka 目前只支持分区增加,当出现分区数增加的时候就会触发再平衡。

  • Consumer Group 中Consumer 个数发生变化(新增或者减少),导致其所消费的分区需要分配到组内其他的Consumer 上。这里的减少有很大可能是被动的,就是某个消费者出现崩溃掉线了。

  • Consumer 所订阅的Topic发生了新增分区的行为(Kafka目前只支持新增分区),那么新增的分区就会分配给当前的Consumer ,此时就会触发再平衡。

  • Consumer 订阅的topic发生变化,比如订阅的Topic采用的是正则表达式的形式。如 test-* 此时如果有新建了一个topic test-user,那么这个Topic的所有分区也是会自动分配给当前的Consumer 的,此时就会发生再平衡。

简洁一点,触发再平衡的条件就是:

  • Consumer Group 成员数变更。
  • Consumer Group 订阅的主题的分区数发生变更。
  • Consumer Group 的订阅主题数发生变更。

再平衡有什么危害呢,首先我们要知道,再平衡的过程中,消费者是无法从 Kafka集群中消费消息的,这对 Kafka的 系统吞吐量(TPS)影响极大,而如果 Kafka 集群内节点较多,那么再平衡可能比较耗时。数分钟到数小时都有可能,而这段时间,Kafka 是处于不可用状态。所以在实际环境中,应该尽量避免。

1.3 再平衡通知机制

那么发生再平衡的时候Kafka集群是如何通知到消费者的呢,答案就是通过消费者与Kafka集群之间的心跳机制。Kafka 消费者需要定期地发送心跳请求(Heartbeat Request) 到 Broker 端的协调者(Coordinator ),以证明消费者还活着。
   在 Kafka 0.10.0.1版本之前,发送心跳请求是在消费者主线程中完成的。这样就有很多问题,最大的问题在于,消息处理逻辑也是在这个线程中完后的。因此,一旦消息处理消耗很长的时间,心跳请求将无法及时发送到协调者那里,使协调者误以为该消费者死掉。
   Kafka 0.10.0.1 版本之后,Kafka 就提供了一个专门的线程去发送心跳,避免了这个问题。

再平衡就是通过这个心跳线程去通知其他消费者触发再平衡机制。当协调者开启新一轮的再平衡之后,它会将"REBALANCE_IN_PROGREESS"封装到心跳线程的响应信息中,返回给消费者实例,当消费者收到响应信息中含有 “REBALANCE_IN_PROGREESS” 信息,就知道再平衡开始了,这就是再平衡的通知机制。

二、再平衡流程

从再平衡的定义和触发再平衡条件中我们可以看出,再平衡主要是由Kafka集群和Kafka消费端一起完成的,更精确的来说,是Kafka的Broker端的Coordinator 和Consumer端一起完成的。

2.1 消费端再平衡流程

在消费者端,再平衡主要分为两个步骤:

  • 重新加入消费者组中。
  • 等待领导消费者(Leader Consumer) 分配方案。

这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。

当组内成员加入组时,消费者会向协调者发送 JoinGroup 请求,在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。

通常情况下,第一个发送 JoinGroup 请求的成员会自动成为领导者领导消费者的任务是收集所有成员的订阅信息,然后根据这些信息,指定具体的分区消费分配方案。

选出领导者之后,协调者会把消费者的订阅信息封装在 JoinGroup 请求的响应中,然后发送给领导者,由领导者统一做出分区消费分配方案后,在进行下一步,发送 SyncGroup 请求。
在这里插入图片描述
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。当然,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式发给所有成员,这样组内的成员都知道自己该消费哪些分区的数据了。
在这里插入图片描述

因此,JoinGroup 请求的主要作用是将组成员的订阅信息发送给领导者消费者,待领导者制定好分配方案后,再平衡流程进入到 SyncGroup 请求阶段。而SyncGroup 请求的主要目的就是让协调者把领导者的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入 Stable 状态,即开始正常的消费工作。

2.2 Broker端的再平衡流程

Broker端的再平衡主要是Coordinator 处理再平衡的流程。从触发再平衡的条件来看,与Coordinator 相关的主要是新成员加入消费者组、消费者组成员主动离开、消费者组成员崩溃离组、组成员提交位移。

再平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前 Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个再平衡过程。严格来说,这套状态机属于非常底层的设计,Kafka 官网并没有提及过。目前,Kafka 为消费者组定义了 5 中状态,分别是:EmptyDeadPreparingRebalanceCompletingReblanceStable。每种状态对应的含义如下:

状态含义
Empty组内没有任何成员,但消费者可能存在已提交的位移数据,而且这些位移尚未过期。
Dead同样是在组内没有成员,但组内元数据信息已经在协调者端被移除。协调者组件保存着当前向它注册过的所有组信息,所谓的元数据信息类似于这个注册信息。
PreparingRebalance消费者组开启再平衡,此时所有成员都要重新加入消费者组。
CompletingRebalance消费者组小所有成员已经加入,各个成员已在等待分配方案。该状态在老一点版本中称为AwaitingSync,它和CompletingReblance是等价的。
Stable消费者组的稳定状态。该状态表名再平衡已经完成,组内各成员能够正常消费数据了。

一个消费者组最开始是 Empty状态,当再平衡开启后,它会被置为 PreparingRebalance 状态等待成员加入,之后变更为 CompletingReblance 状态等待分配方案,最后流转为 Stable,完成再平衡过程。
在这里插入图片描述

2.2.1 新成员加入消费者组

新成员加入消费者组导致触发再平衡主要指的当消费者组处于 Stable 状态后,有新成员加入。如果对全新启动一个消费者组,Kafka 是有一些自己的优化,流程会有些许的不同。我们这里要讨论的是,消费者组稳定之后有新成员加入的情形。

当协调者收到新的 JoinkGroup 请求后,它会通过心跳响应的方式通知组内现有的所有成员,强制它们开启新一轮的再平衡。具体的过程和之前的客户端再平衡流程是一样的。现在,用一张时序图说明协调者一端是如何处理新成员入组的。

在这里插入图片描述

2.2.2 消费者组成员主动离组

消费者实例所在线程或进程调用 Close() 方法主动通知协调者它要退出。这个场景就涉及到第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
在这里插入图片描述

2.2.3 消费者组成员崩溃离组

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为主动发起的离组,协调者能马上感知并处理。但是崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。处理崩溃离组的流程如下:
在这里插入图片描述

2.2.4 协调者对组内成员移交位移处理

正常情况下,每个组成员都会定期汇报位移给协调者。当再平衡开启时,协调者会给予成员一端缓冲时间,要求每个成员必须在这段时间内快速上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup请求发送。
在这里插入图片描述

三、再平衡相关参数

  • session.timeout.ms:该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置较小,可以更早发现消费者崩溃的信息,从而更快的开启再平衡,避免消费滞后,但同时这也会频繁的再平衡,需要根据实际业务来衡量。
  • max.poll.interval.ms:该参数表示消费者处理消息逻辑的最大时间,对于某些业务来说,处理消息可能需要很长时间,比如需要 1 分钟,那么该参数就需要设置成大于 1 分钟的值,否则就会被 Coordinator 剔除消息组然后再平衡。
  • heartbeat.interval.ms:该参数是消费端与Coordinator的心跳时间,该参数跟 session.timeout.ms 紧密相关,前面也说过,只要在 session.timeout.ms时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔时间就是 session.timeout.ms,因此,该参数值必须小于 session.timeout.ms,以保持 session.timeout.ms时间内有心跳。每个consumer 都会根据 heartbeat.interval.ms参数指定的时间周期性地向Coordinator 发送 hearbeat,Coordinator 会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS标识,这样各个consumer就知道已经发生了rebalance,同时Coordinator 也知道了各个consumer的存活情况。

3.1 heartbeat.interval.ms 与 session.timeout.ms 的对比

session.timeout.ms是指:Coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。

举个栗子:session.timeout.ms=10heartbeat.interval.ms=3

session.timeout.ms指定了一个阈值—10秒,在这个阈值内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。而heartbeat.interval.ms主要是告诉Consumer要每3秒给Coordinator发一个心跳包,heartbeat.interval.ms越小,发的心跳包越多,它是会影响发TCP包的数量的。

如果Coordinator在一个heartbeat.interval.ms周期内未收到Consumer的心跳,就把该Consumer移出group,这有点说不过去。就好像Consumer犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能Consumer出现了一次长时间GC,影响了心跳包的到达,说不定下一个heartbeat就正常了。

因此,heartbeat.interval.ms肯定是要小于session.timeout.ms的,如果Consumer Group发生了rebalance,通过心跳包里面的REBALANCE_IN_PROGRESS,Consumer就能及时知道发生了rebalance,从而更新Consumer可消费的分区。而如果超过了session.timeout.ms,Coordinator都认为Consumer挂了,那也当然不用把 rebalance信息告诉该Consumer了。

3.2 session.timeout.ms 和 max.poll.interval.ms

在kafka0.10.1之后的版本中,将session.timeout.msmax.poll.interval.ms 解耦了。

也就是说:创建Kafka消费者实例后,消费者不停地执行consumer.poll拉取消息这个过程中,其实背后是有2个线程的,即一个Kafka Consumer实例包含2个线程:一个是heartbeat 线程,另一个是processing线程

processing线程可理解为调用consumer.poll方法执行消息处理逻辑的线程,而heartbeat线程是一个后台线程,对程序员是"隐藏不见"的。如果消息处理逻辑很复杂,比如说需要处理5min,那么 max.poll.interval.ms可设置成比5min大一点的值。而heartbeat 线程则和上面提到的参数 heartbeat.interval.ms有关,heartbeat线程 每隔heartbeat.interval.ms向Coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms时间内向 Coordinator发送过心跳包,那么Coordinator就认为当前的Kafka Consumer是活着的。

在kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的。

如果一条消息处理时长要5min,而session.timeout.ms=3000ms,那么等 Kafka Consumer处理完消息,Coordinator早就将Consumer 移出group了,因为只有一个线程,在消息处理过程中就无法向Coordinator发送心跳包,超过3000ms未发送心跳包,Coordinator就将该Consumer移出group了。

而将二者分开,一个processing线程负责执行消息处理逻辑,一个heartbeat线程负责发送心跳包,那么,就算一条消息需要处理5min,只要heartbeat线程在session.timeout.ms向Coordinator发送了心跳包,那Consumer可以继续处理消息,而不用担心被移出group了。另一个好处是:如果Consumer出了问题,那么在session.timeout.ms内就能检测出来,而不用等到max.poll.interval.ms 时长后才能检测出来。

TODO:后续根据sarama源码来看Kafka的再平衡过程。

参考

1、kafka 中参数:session.timeout.ms 和 heartbeat.interval.ms的区别
2、sarama 源码解析–Kafka的重平衡
3、kafka学习(五):消费者分区策略(再平衡机制)
4、Kafka【再平衡】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/80344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决方案| anyRTC远程检修应用场景

背景 在这个科技飞速发展的时代,各行各业都要求高效运转。然而,当出现问题时,我们却常常因为无法及时解决而感到困扰,传统解决问题的方式是邀请技术人员现场解决问题,如果技术人员解决不了,还要邀请专家从…

编程参考 - std::exchange和std::swap的区别

这两个功能是C standard library中的Standard template library中的一部分。容易混淆,我们来看下它们的区别。 exchange: 这个函数是一个返回原先值的set函数。 std::exchange is a setter returning the old value. int z std::exchange(x, y); Af…

C语言指针详解(3)———指针题目,你确定你学会指针了?进来看看吧!(几十个指针小题+超详解)

你确定你学会指针了? 你确定你明白数组名了? 如果你觉得你学的还不错,就进来看看吧,相信你看完之后一定能收获更多。 数组名的理解一定要弄清楚 数组名是数组首元素的地址 但是有2个例外: sizeof(数组名),这…

【MySQL多表查询以及事务、索引】

1. 多表查询 1.1 概述 1.1.1 数据准备 #建议:创建新的数据库 create database db04; use db04;-- 部门表 create table tb_dept (id int unsigned primary key auto_increment comment 主键ID,name varchar(10) not null unique comment 部门名称…

Jmeter 实现 mqtt 协议压力测试

1. 下载jmeter,解压 https://jmeter.apache.org/download_jmeter.cgi 以 5.4.3 为例,下载地址: https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.4.3.zip linux下解压: unzip apache-jmeter-5.4.3.zip 2. 下载m…

proxy相对于object.defineproperty有哪些优点?

Proxy相对于Object.defineProperty的优点有1: 功能更强大:Proxy提供了更灵活的方式来操作对象,可以捕获和定制对象的各种操作,包括获取属性、设置属性、删除属性等。捕获多种操作:Proxy可以捕获多种不同的操作&#x…

在线海报图片设计器、图片编辑器源码/仿照稿定设计源码

在线海报设计系统素材设计源码是一个漂亮且功能强大的在线海报图片设计器,仿照稿定设计而成。该系统适用于多种场景,包括海报图片生成、电商分享图、文章长图、视频/公众号封面等。用户无需下载软件,即可轻松实现创意,迅速完成排版…

使用 React Native 针对 Android 进行开发

🎬 岸边的风:个人主页 🔥 个人专栏 :《 VUE 》 《 javaScript 》 ⛺️ 生活的理想,就是为了理想的生活 ! 目录 概述 通过安装所需工具开始使用 React Native 创建新的 React Native 项目 本指南将有助于开始使用 Windows 上的…

性能测试工具有哪些?原理是什么?怎么选择适合的工具?

前言 本篇文章主要简单总结下性能测试工具的原理以及如何选型。性能测试和功能测试不同,性能测试的执行是基本功能的重复和并发,需要模拟多用户,在性能测试执行时需要监控指标参数,同时性能测试的结果不是那么显而易见&#xff0…

43.228.64.X游戏盾在业务高峰时间段,出现用户大量掉线问题,可能是什么原因导致

服务器负载过高:业务高峰期间,服务器可能无法处理大量的用户请求,导致性能下降,甚至引起服务器崩溃,从而导致用户掉线。 网络带宽不足:当大量用户同时访问游戏盾时,网络带宽可能不足以支持所有…

Smart Community(1)之设计规范

通过前面大数据开发相关知识的学习,准备做一个项目进行练习---我给他起了一个响亮的名字:基于HadoopHA的智慧社区服务平台 设计规范: 做一个项目之前肯定要先规定一些开发过程中的设计规范 (一)数据埋点规范&#xf…

linux jenkins2.414.1-1.1版本安装

文章目录 前言一、rpm文件下载二、安装jenkins2.1.升级jdk1.82.2安装jenkins2.3 启动服务2.4 使用密码登录2.5 修改插件源2.6 汉化插件安装演示 总结 前言 之前也安装过jenkins,但是那个版本是2.1的,太老了很多插件都不支持,现在安装目前为止…

XSS笔记

一、xss漏洞通常是通过php的输出函数将javascript代码输出到html页面中&#xff0c;通过用户本地浏览器执行的&#xff0c;所以xss漏洞关键就是寻找参数未过滤的输出函数。 二、XSS的攻击方式 1、 反射型XSS&#xff1a;<非持久化> 攻击者事先制作好攻击链接, 需要欺骗用…

RocketMq(一)安装部署

一、linux单机部署&#xff1a; 1、到apache官网下载 | RocketMQ (apache.org)下载binary zip包&#xff0c;如我下载的4.9.6版本。 上传到建好的/usr/local/rocketmq目录下。 2、解压zip包 unzip rocketmq-all-4.9.6-bin-release.zip 3、进入解压后的文件夹,启动 Name Serv…

vue之$emit返回值

使用场景&#xff1a;子组件调用父组件的方法并获取到父组件方法的返回值 方法(一): 回调函数: 1.子组件方法&#xff1a; ……methods:{ShowMore(step){this.$emit("ChildClick",step,(res)>{//回调函数的方法体.处理自己的业务.});}}…… 2、父组件方法 ……

java23种设计模式与追MM

1、FACTORY—追MM少不了请吃饭了&#xff0c;麦当劳的鸡翅和肯德基的鸡翅都是MM爱吃的东西&#xff0c;虽然口味有所不同&#xff0c;但不管你带MM去麦当劳或肯德基&#xff0c;只管向服务员说“来四个鸡翅”就行了。麦当劳和肯德基就是生产鸡翅的Factory 工厂模式&#xff1a…

02目标检测-传统检测方法

目录 一、目标学习的检测方法变迁及对比 二、 基于传统手工特征的检测算法的定义 三、传统主要手工特征与算法 Haar特征与 人脸检测算法 - Viola-Jones(了解) HOG特征与 SVM 算法(了解)&#xff08;行人检测、opencv实现&#xff09; SIFT特征与SIFT算法(了解) DPM&#…

redis--windows配置--redis基础

写在前面&#xff1a; 文章目录 win安装配置密码配置服务服务已经存在 可视化工具运行类型基础类型 帮助文档命令通用命令string命令hashlistsetsortedset win安装 下载地址 然后一路next就可以了。 记得添加到环境变量 配置密码 在目录打开配置文件 搜索requirepass …

Fluent实现大量气泡的随机分布案例

1、问题介绍 气液两相管流在石油、化工、能源、制冷等工业领域中大量存在。气泡在生存发展过程中往往会导致噪声和引起管道振动&#xff0c;自来水管路中如有空气时往往会产生啸叫声和管道剧烈振动。掌握流动过程中气泡的生成、发展及其破裂等动力规律是控制气液两相流气动噪声…

C++qt day8

1.用代码实现简单的图形化界面&#xff08;并将工程文件注释&#xff09; 头文件 #ifndef MYWIDGET_H #define MYWIDGET_H //防止头文件冲突#include <QWidget> //父类的头文件class MyWidget : public QWidget //自定义自己的界面类&#xff0c;公共继承…