Kafka消费者组重平衡(二)

文章目录

    • 概要
    • 重平衡通知机制
    • 消费组组状态
    • 消费端重平衡流程
    • Broker端重平衡流程

概要

上一篇Kafka消费者组重平衡主要介绍了重平衡相关的概念,本篇主要梳理重平衡发生的流程。

为了更好地观察,数据准备如下:
kafka版本:kafka_2.13-3.2.1
控制台创建topic (2个分区1个副本):
bin/kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 2 --topic test-rebalance

本地启动两个SpringBoot项目实例,代码如下

@KafkaListener(topics = "test-rebalance", groupId = "test-group")
public void rebalanceConsumer(ConsumerRecord<String, String> recordInfo) {int partition = recordInfo.partition();System.out.println("partition:" + partition + " value:" + recordInfo.value());
}

重平衡通知机制

Kafka Java 消费者需要定期地发送心跳请求(Heartbeat Request)到 Broker 端的协调者,以表明它还存活着。在 Kafka 0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是调用 KafkaConsumer.poll 方法的那个线程。
这样的设计存在弊端,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者“错误地”认为该消费者已“死”。自 0.10.1.0 版本开始,Kafka引入了一个单独的心跳线程来专门执行心跳请求发送,避免了这个问题。

重平衡的通知机制正是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

消费组组状态

Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那么,这 5 种状态的含义如下:

状态含义
Empty组内没有任何成员
Dead组内没有任何成员,但组的元数据已经在协调者端删除
PreparingRebalance消费组组准备开启重平衡,此时所有成员都要重新请求加入消费者组
CompletingRebalance消费者组下所有成员已经加入,各个成员正在等待分配方案
Stable消费者组的稳定状态,该状态表明重平衡已经完成,组内各成员都能够正常消费数据了

以下是各个状态的流转:

在这里插入图片描述

一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于PreparingRebalance 状态等待成员加入,之后变更到CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。

创建一个topic并逐步启动两个消费者实例:
服务端日志:

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Empty state. Created a new member id consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 3 (__consumer_offsets-12) (reason: Adding new member consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 4 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 4. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

客户端一启动日志

o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 4
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1, test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1, test-rebalance-0]

有新成员加入后broker日志

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Stable state. Created a new member id consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 4 (__consumer_offsets-12) (reason: Adding new member consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) with 2 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 5. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

接下来启动客户端二

客户端一日志

 o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-0]

客户端二日志


o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Attempt to heartbeat failed since group is rebalancing
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Revoking previously assigned partitions [test-rebalance-1, test-rebalance-0]
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions revoked: [test-rebalance-1, test-rebalance-0]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1]

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。

消费端重平衡流程

重平衡的完整流程需要消费者端和协调者组件(什么是协调者)共同参与才能完成。下面先梳理消费者端的重平衡流程。主要分为3各阶段。

第一阶段:确定组协调器
关于如何确定组协调器,参考Kafka消费者重平衡(一)

第二阶段:JoinGroup

在此阶段的消费者会向Group Coordinator 发送 JoinGroupRequest 请求,并处理响应。然后GroupCoordinator 从一个consumer group中选择第一个(通常情况下)加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。

在这里插入图片描述

第三阶段:SyncGroup

待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段

在这里插入图片描述
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

Broker端重平衡流程

下面只要从几个常见的场景梳理Broker端重平衡的流程。

新成员加入

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
具体流程如下:

在这里插入图片描述

组成员主动离组

消费者实例所在线程或进程调用 close() 方法时,就会主动通知协调者要退出组。以下时具体的流程

在这里插入图片描述

组成员崩溃离组

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/83983.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs定时任务

项目需求&#xff1a; 每5秒执行一次&#xff0c;多个定时任务错开&#xff0c;即cron表达式中斜杆前带数字&#xff0c;例如 ‘1/5 * * * * *’定时任务准时&#xff0c;延误低 搜索了nodejs的定时任务&#xff0c;其实不多&#xff0c;找到了以下三个常用的&#xff1a; n…

OpenCV中的HoughLines函数和HoughLinesP函数到底有什么区别?

一、简述 基于OpenCV进行直线检测可以使用HoughLines和HoughLinesP函数完成的。这两个函数之间的唯一区别在于,第一个函数使用标准霍夫变换,第二个函数使用概率霍夫变换(因此名称为 P)。概率版本之所以如此,是因为它仅分析点的子集并估计这些点都属于同一条线的概率。此实…

2D游戏开发和3D游戏开发有什么不同?

2D游戏开发和3D游戏开发是两种不同类型的游戏制作方法&#xff0c;它们之间有一些显著的区别&#xff1a; 1. 图形和视觉效果&#xff1a; 2D游戏开发&#xff1a; 2D游戏通常使用二维图形&#xff0c;游戏世界和角色通常在一个平面上显示。这种类型的游戏具有平面的外观&…

数据仓库模型设计V2.0

一、数仓建模的意义 数据模型就是数据组织和存储方法&#xff0c;它强调从业务、数据存取和使用角度合理存储数据。只有将数据有序的组织和存储起来之后&#xff0c;数据才能得到高性能、低成本、高效率、高质量的使用。 高性能&#xff1a;良好的数据模型能够帮助我们快速查询…

shell脚本命令

Shell命令是在类Unix操作系统中使用的命令行解释器&#xff08;shell&#xff09;中执行的命令。Shell命令可以用于执行系统命令、操作文件、进行文本处理、管理进程等。以下是一些常见的Shell命令&#xff1a; 1. ls&#xff1a;列出当前目录下的文件和文件夹。 2. cd&#x…

界面组件DevExpress WinForms v23.1亮点 - 全新升级HTML CSS模板

DevExpress WinForms拥有180组件和UI库&#xff0c;能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForms能完美构建流畅、美观且易于使用的应用程序&#xff0c;无论是Office风格的界面&#xff0c;还是分析处理大批量的业务数据&#xff0c;它都能轻松胜…

2020-2023中国高等级自动驾驶产业发展趋势研究-概念界定

1.1 概念界定 自动驾驶发展过程中&#xff0c;中国出现了诸多专注于研发L3级以上自动驾驶的公司&#xff0c;其在业界地位也越来越重要。本报告围绕“高等级自动驾驶” 展开&#xff0c;并聚焦于该技术2020-2023年在中国市场的变化趋势进行研究。 1.1.1 什么是自动驾驶 自动驾驶…

C#中的方法

引言 在C#编程语言中&#xff0c;方法是一种封装了一系列可执行代码的重要构建块。通过方法&#xff0c;我们可以将代码逻辑进行模块化和复用&#xff0c;提高代码的可读性和可维护性。本文将深入探讨C#中的方法的定义、参数传递、返回值、重载、递归等方面的知识&#xff0c;…

小型水库雨水情测报和大坝安全监测解决方案

一、建设背景 我国小型水库数量众多&#xff0c;大多由农村集体经济组织管理&#xff0c;灌溉、供水、防洪、生 态效益突出&#xff0c;是农业生产、农民生活、农村发展和区域防洪的重要基础设施&#xff0c;实施乡 村振兴战略和生态文明建设的重要支撑保障。由于小型水库工程存…

zabbix自定义监控内容案例

一、自定义监控内容 案列&#xff1a;自定义监控客户端服务器登录的人数需求&#xff1a;限制登录人数不超过 3 个&#xff0c;超过 3 个就发出报警信息 1、在客户端创建自定义key 明确需要执行的linux命令 创建zabbix监控项配置文件&#xff0c;用于自定义Key #在zabbix的…

小谈设计模式(3)—策略模式

小谈设计模式&#xff08;3&#xff09;—策略模式 专栏介绍专栏地址专栏介绍 策略模式主要角色环境&#xff08;Context&#xff09;抽象策略&#xff08;Strategy&#xff09;具体策略&#xff08;Concrete Strategy&#xff09;角色总结 核心思想封装算法定义抽象策略使用环…

Selenium Grid 的搭建方法

传统 Selenium Grid 的搭建方法 搭建一个具有 1 个 Node 的 Selenium Grid。那么通常来讲我们需要 2 台机器&#xff0c;其中一台作为 Hub&#xff0c;另外一台作为 Node&#xff0c;并要求这两台机器已经具备了 Java 执行环境。 1.通过官网下载 selenium-server-standalone-…

SpringMVC之JSON数据返回异常处理机制

目录 前言 一、JSON数据返回 1.导入依赖 2.配置spring-mvc.xml 3.使用ResponseBody注解 4.Jackson 4.1.介绍 4.2.常用注解 二、异常处理机制 1.为什么要全局异常处理 2.异常处理思路 3.SpringMVC异常分类 4.综合案例 4.1.异常处理方式一 4.2.异常处理方式二 4.3…

git提示:remote origin already exists

目录 问题场景 问题原因 问题解决 问题场景 在GitLab中新建仓库后&#xff0c;然后将本地项目提交提示&#xff1a;remote origin already exists. 问题原因 error: remote origin already exists. 错误&#xff1a;远程源点已存在&#xff08;翻译&#xff09; 出现该错误的…

AI AIgents时代-(四.)应用上手

HuggingGPT & MetaGPT . &#x1f7e2; HuggingGPT HuggingGPT是一个多模型调用的 Agent 框架&#xff0c;利用 ChatGPT 作为任务规划器&#xff0c;根据每个模型的描述来选择 HuggingFace 平台上可用的模型&#xff0c;最后根据模型的执行结果生成总结性的响应。 这个项…

软件测试 —— 答疑篇

什么是软件测试&#xff1a; 软件测试是不是就是找 bug &#xff1f; 软件测试就是证明软件不存在错误的过程 软件测试就是为了证明程序能够正确运行 刚新买来一部手机&#xff0c;我们要干什么&#xff1f; 一场考试 , 做完一遍题目之后 , 进行一遍检查 , 就是在 "…

【LeetCode热题100】--560.和为K的子数组

560.和为K的子数组 示例2的结果&#xff1a; 输入&#xff1a;nums [1,2,3] ,k3的时候 连续子数组有[1,2],[3]&#xff0c;一共有2个 利用枚举法&#xff1a; 枚举[0,…i]里所有的下标j来判断是否符合条件 class Solution {public int subarraySum(int[] nums, int k) {i…

不得不爱的AI艺术写真头像二维码生成小程序开发

最近什么最火&#xff1f;AI最火&#xff01; AI里什么最火&#xff1f;艺术写真生成和二维码美化最火。 一款小程序集合了高还原度的AI写真艺术照和二维码美化&#xff0c;你们说香还是不香&#xff1f; 并且加入了输入心愿就能生成独一无二的个性头像功能&#xff0c;直接…

uniapp 轮播列表左右滑动,滑动到中间放大

html <!-- 轮播 --><view class"heade"><swiper class"swiper" display-multiple-items3 circulartrue previous-margin1rpxnext-margin1rpx current0 change"swiperChange" ><block v-for"(item,index) in list"…

【面试经典150 | 数组】删除有序数组中的重复项 II

文章目录 写在前面Tag题目解读题目来源解题思路方法一&#xff1a;原地操作 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带一些对于本题涉及到的数据结构等…