Kafka 消费者启动后与服务器的交互流程

Kafka 消费者启动后与服务器的交互流程涉及多个关键步骤,主要包括初始化、查找组协调器、加入消费者组、分区分配、心跳维持、拉取数据和提交偏移量等。以下是详细的流程说明:

1. 初始化消费者

  • 创建消费者实例:应用程序通过调用KafkaConsumer的构造函数,传入配置参数创建消费者实例。
  • 配置参数:包括bootstrap.servers(Kafka集群地址)、group.id(消费者组ID)、key.deserializervalue.deserializer等。

2. 订阅主题

  • 调用subscribe方法:消费者通过调用subscribe方法订阅一个或多个主题,也可以使用正则表达式来匹配多个主题。

3. 查找组协调器

  • 消费者发送FindCoordinator请求:消费者向Kafka集群中的任意Broker发送FindCoordinator请求,请求中包含消费者组ID。
  • Broker服务器接收请求:Broker根据消费者组ID计算出组协调器所在的Broker节点,并返回该节点的地址信息。
    - 计算组协调器算法:
/*** 表示 内部主题 __consumer_offsets 的分区数量,默认初始化值是50(顺带一提__consumer_offsets 副本因子默认值是3)* 初始值为 -1,表示尚未设置。* 使用 volatile 关键字确保在多线程环境中对该变量的修改是可见的。*/private volatile int numPartitions = -1;/*** 内部主题 __consumer_offsets 的各个分区分布在各个Broker服务器上,算出当前消费者组的协调器在哪个服务器* 消费者组协调器所在brokerId = 消费者组id的哈希值 % 50 */coordinator_broker_id = Utils.abs(groupId.hashCode()) % numPartitionspublic static int abs(int n) {return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);}
  • 消费者连接组协调器
    • 消费者根据FindCoordinator响应中的地址信息,连接到组协调器。

4. 加入消费者组

  • Kafka消费者加入消费者组的过程主要涉及JoinGroup和SyncGroup两个关键步骤。这个过程确保消费者能够以协调的方式加入消费者组,并且分区能够被合理地分配给消费者组内的消费者。以下是JoinGroup和SyncGroup的具体流程:
  • 阶段一:JoinGroup阶段
    • (1)发送JoinGroup请求:当消费者启动并调用poll方法时,如果它尚未加入消费者组,或者需要重新加入(例如,因为再平衡),它会向组协调器发送JoinGroup请求。这个请求包含消费者的group.id、订阅的主题列表以及消费者使用的分区分配策略。

    • (2)等待响应:组协调器收到JoinGroup请求后,会等待一段时间,以允许其他消费者也发送他们的JoinGroup请求。这个等待时间是为了收集同一消费者组内所有消费者的信息。

    • (3)选择Leader:对于同一个消费者组的第一次JoinGroup请求,协调器会选择第一个消费者作为Leader。Leader负责为组内的所有消费者分配分区。Leader的选择基于消费者的JoinGroup请求顺序。

    • (4)分区分配策略:Leader消费者收到协调器的响应后,会根据提供的分区分配策略(如RangeRoundRobin等)和所有消费者的订阅信息来决定分区的分配方案。

  • 阶段二:SyncGroup阶段
    • (1)发送SyncGroup请求:Leader消费者将分区分配方案通过SyncGroup请求发送给组协调器。随后,组内的其他消费者也发送SyncGroup请求,但不包含分区分配方案。

    • (2)协调器广播分区分配方案:组协调器接收到SyncGroup请求后,将leader消费者的分区分配方案广播给消费者组内的所有消费者。

5. 开始消费

  • 消费者接收分区分配:每个消费者接收到SyncGroup响应后,会知道自己被分配到了哪些分区。
  • 初始化分区消费:消费者根据分配到的分区,初始化分区消费的相关资源,如设置分区的偏移量。
  • 拉取数据并消费:消费者开始从分配给它的分区拉取数据并进行消费。

6. 心跳维持和再平衡

  • 发送心跳:消费者会定期向组协调器发送心跳,以表明它仍然活跃。
  • 处理再平衡:如果有新的消费者加入或现有消费者离开消费者组,协调器会触发再平衡过程,重新分配分区。

7. 拉取数据

  • 发送Fetch请求:消费者向分配给它的分区的Leader Broker发送Fetch请求,请求包含拉取数据的偏移量。
  • 接收数据:Broker返回包含消息的响应,消费者处理这些消息。

8. 提交偏移量

  • 自动提交:如果启用了自动提交(enable.auto.commit=true),消费者会定期自动提交消费的偏移量。
  • 手动提交:如果使用手动提交,消费者需要调用commitSynccommitAsync方法提交偏移量。

9. 处理再平衡

  • 再平衡触发:当消费者组成员发生变化,协调器会触发再平衡。
      1. 消费者加入消费者组
        当新的消费者加入现有的消费者组时,会触发再平衡。新消费者可能是新启动的实例,或者是之前失败后重新加入的实例。
      1. 消费者离开消费者组
      • 主动离开:消费者调用close方法或者主动离开消费者组时,会触发再平衡。
      • 被动离开:如果消费者因为网络问题、崩溃或者长时间未发送心跳而被组协调器认为已经离开,也会触发再平衡。
      1. 订阅主题的分区数变化
        如果消费者组订阅的主题新增了分区,那么为了将新增的分区分配给消费者,也会触发再平衡。
      1. 消费者订阅模式变化
        如果消费者组内的任何消费者更改了其订阅模式(例如,通过subscribe方法订阅了新的主题或者取消订阅了某些主题),这也会触发再平衡。
      1. 组协调器变更
        如果负责管理消费者组的组协调器(Group Coordinator)发生变化(例如,因为原协调器所在的Broker宕机),新的协调器在接管消费者组管理职责时,会触发再平衡。
      1. 主题元数据变化
        消费者定期从Broker获取订阅主题的元数据(如分区信息)。如果检测到元数据变化,可能会触发再平衡,尽管这种情况较少见。
  • 暂停拉取:在再平衡期间,消费者会暂停拉取数据。
  • 重新分配分区:协调器重新分配分区,并通知消费者新的分区分配情况。
  • 恢复拉取:再平衡完成后,消费者恢复拉取数据。

示例代码

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题// 消费者加入消费者组并开始消费的过程是在第一次调用poll方法时触发的
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}// 提交偏移量
consumer.commitSync();

在上述代码中,消费者通过调用subscribe方法订阅了主题test-topic,然后通过调用poll方法触发了加入消费者组的完整流程,包括查找组协调器、加入消费者组、分区分配、拉取数据和提交偏移量等步骤。

总结

Kafka消费者启动后与服务器的交互流程是一个复杂的过程,涉及与组协调器的多次交互。这个流程确保了消费者能够正确地加入消费者组,分区能够被合理地分配给消费者组内的消费者,并且在消费者组成员变化时能够进行适当的再平衡,同时保证了消费者能够从正确的位置继续消费数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/50300.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP表单必需字段

在PHP中处理表单时&#xff0c;确保必填字段被正确填写是非常重要的。这通常涉及到在客户端&#xff08;使用HTML5&#xff09;和服务器端&#xff08;使用PHP&#xff09;进行验证。以下是一个关于PHP表单必需字段的详细教程&#xff0c;包括如何在客户端和服务器端进行验证。…

【计算机网络】TCP和UDP的封装以及案例

TCP和UDP的封装以及案例 背景知识TCP实现UDP实现封装Network用NetWork再次实现TCP和UDP小知识点 背景知识 TCP&#xff1a;传输控制协议&#xff08;Transmission Control Protocol&#xff09; UDP&#xff1a;用户数据报协议 &#xff08;User Datagram Protocol&#xff09…

Spring Bean的初始化过程

在Spring框架中&#xff0c;Bean是应用程序的基本构建块。每个Bean都是由Spring IoC容器管理的对象实例&#xff0c;用于封装业务逻辑或资源。理解Spring Bean的初始化过程对于有效地使用和配置Spring框架至关重要。本文将详细解释Spring Bean的生命周期&#xff0c;包括其创建…

AI的欺骗游戏:揭示多模态大型语言模型的易受骗性

人工智能咨询培训老师叶梓 转载标明出处 多模态大型语言模型&#xff08;MLLMs&#xff09;在处理包含欺骗性信息的提示时容易生成幻觉式响应。尤其是在生成长响应时&#xff0c;仍然是一个未被充分研究的问题。来自 Apple 公司的研究团队提出了MAD-Bench&#xff0c;一个包含8…

Spring的优缺点?

Spring的优缺点 直接回答相关的Spring的特点&#xff1a; IOC AOP 事务 简化开发&#xff1a; 容易集成JDBCTemplateRestTemplate&#xff08;接口远程调用&#xff09;邮件发送相关异步消息请求支持 更加深入就讲源码了 优点&#xff1a; 方便解耦&#xff0c;简化开发…

网站打不开怎么办,收藏以备不时之需

DNS设置示范教程 部分地区有使用移动网络的小伙伴们吐槽无法访问部分网站的情况&#xff0c;同样的网站&#xff0c;使用电信和联通的用户就能正常访问。 这其实有很大几率是由于运营商的网络问题导致的&#xff0c;容易出现网站打不开的结果。 要解决移动网络无法访问的情况…

docker 部署mysql nginx redis

设置镜像 sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-EOF {"registry-mirrors": ["https://tddq0ov6.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload 重启 sudo systemctl restart docker mysql mkdir /docker/my…

[React]如何提高大数据量场景下的Table性能?

[React]如何提高大数据量场景下的Table性能&#xff1f; 两个方向&#xff1a;虚拟列表&#xff0c;发布订阅 虚拟列表 虚拟列表实际上只对可视区域的数据项进行渲染 可视区域&#xff08;visibleHeight&#xff09;: 根据屏幕可视区域动态计算或自定义固定高度数据渲染项&…

python_合并同一个文件夹下的excel文件

python_合并同一个文件夹下的excel文件 import os import glob import pandas as pddef merge_excel_sheets(input_folder, output_file):# 创建一个空的 DataFrame 用于存储所有数据combined_data pd.DataFrame()# 获取指定文件夹内所有的 Excel 文件excel_files glob.glob…

el-select下拉数据量太大,改成滚动加载数据

问题描述&#xff1a;当接口返回下拉数据量特别大的时候&#xff0c; 页面会卡顿&#xff0c; 下面采用下拉加载指定数据的方式来优化。 <template><el-selectv-model"value"filterableplaceholder"Select"v-focus"loadData(loadNumber)&qu…

(面试必看!)一些和多线程相关的面试考点

文章导读 引言考点1. CAS 指令&#xff08;重点&#xff09;一、什么是CAS二、CAS 的优点三、CAS 的缺点四、ABA问题五、相关面试题 考点2. 信号量&#xff08;semaphore&#xff09;一、基本概念二、信号量的主要操作三、信号量的应用四、相关面试题 考点3、CountDownLatch 类…

DHCP笔记

DHCP---动态主机配置协议 作用&#xff1a;为终端动态提供IP地址&#xff0c;子网掩码&#xff0c;网关&#xff0c;DNS网址等信息 具体流程 报文抓包 在DHCP服务器分配iP地址之间会进行广播发送arp报文&#xff0c;接收IP地址的设备也会发送&#xff0c;防止其他设备已经使用…

网络编程 - 粘包与拆包第一弹 - 深入理解TCP粘包与拆包问题

作者&#xff1a;逍遥Sean 简介&#xff1a;一个主修Java的Web网站\游戏服务器后端开发者 主页&#xff1a;https://blog.csdn.net/Ureliable 觉得博主文章不错的话&#xff0c;可以三连支持一下~ 如有疑问和建议&#xff0c;请私信或评论留言&#xff01; 前言 在网络编程中&a…

Unity3D 二进制序列化器详解

前言 在Unity3D开发中&#xff0c;二进制序列化是一种重要的数据持久化和网络传输技术。通过二进制序列化&#xff0c;游戏对象或数据结构可以被转换成二进制格式&#xff0c;进而高效地存储于文件中或通过网络传输。本文将详细介绍Unity3D中的二进制序列化技术&#xff0c;包…

如何利用 NLP 技术提高机器翻译中对文化特定词汇和习语的理解与翻译准确性?

要利用 NLP 技术提高机器翻译中对文化特定词汇和习语的理解与翻译准确性&#xff0c;可以采用以下方法&#xff1a; 数据收集与预处理&#xff1a;收集与文化特定词汇和习语相关的大量平行语料&#xff0c;确保数据集中包含丰富的文化特定内容。进行数据预处理&#xff0c;包括…

【手撕数据结构】栈和队列高频面试题

目录 括号匹配问题用队列实现栈用栈实现队列 括号匹配问题 给定一个只包括 ‘(’&#xff0c;‘)’&#xff0c;‘{’&#xff0c;‘}’&#xff0c;‘[’&#xff0c;‘]’ 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 1.左括号必须用相同类…

卓码软件测评:软件功能测试和非功能测试详情介绍

随着信息技术的不断发展&#xff0c;软件在我们日常生活与工作中扮演着越来越重要的角色。然而&#xff0c;软件质量的好坏直接关系到使用者的体验和企业的声誉。在软件开发过程中&#xff0c;功能测试和非功能测试作为保证软件质量的重要手段&#xff0c;受到了越来越多的关注…

【过题记录】 7.28 (树上dp,背包,换根,基环树)

[ZJOI2007] 时态同步 分析&#xff1a; 不难发现&#xff0c;中断点就是叶子节点&#xff0c; 首先&#xff0c;所有叶子节点的高度肯定就等于最深的那个叶子节点的深度。 且不可能去调整最深的叶子结点的深度了。 这样经过一遍dfs之后我们可以计算出每个叶子需要增加的高度。…

古文:文天祥《正气歌》

原文 正气歌 【作者】文天祥 【朝代】宋 余囚北庭&#xff0c;坐一土室。室广八尺&#xff0c;深可四寻。单扉低小&#xff0c;白间短窄&#xff0c;污下而幽暗。当此夏日&#xff0c;诸气萃然&#xff1a;雨潦四集&#xff0c;浮动床几&#xff0c;时则为水气&#xff1b;涂泥…

内容营销专家刘鑫炜:极狐车自燃风波自救,堪称品牌危机公关范本

近日&#xff0c;极狐电车自燃事件在社交媒体上迅速发酵&#xff0c;尤其是厂家在事故现场的第一反应——先抠车标、覆盖黑布的行为&#xff0c;更是引发了公众的广泛质疑与愤慨。这一突发事件不仅考验着极狐汽车的产品安全性能&#xff0c;更对其品牌危机公关能力提出了严峻挑…