Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator

在前面介绍了Kafka中Rebalance操作的相关方案和原理。

在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端的GroupCoordinator的交互,ConsumerCoordinator继承了AbstractCoordinator抽象类。

下面我们先来介绍AbstractCoordinator的核心字段,如图所示。

在这里插入图片描述- heartbeat:心跳任务的辅助类,其中记录了两次发送心跳消息的间隔(interval字段)、最近发送心跳的时间(lastHeartbeatSend字段)、最后收到心跳响应的时间(lastHeartbeatReceive字段)、过期时间(timeout字段)、心跳任务重置时间(lastSessionReset字段),同时还提供了计算下次发送心跳的时间(timeToNextHeartbeat()方法)、检测是否过期的方法(sessionTimeoutExpired()方法)。

  • heartbeatTask:HeartbeatTask是一个定时任务,负责定时发送心跳请求和心跳响应的处理,会被添加到前面介绍的ConsumerNetworkClient.delayedTasks定时任务队列中。
  • groupld:当前消费者所属的Consumer Group的Id。
  • client:ConsumerNetworkClient对象,负责网络通信和执行定时任务。
  • needsJoinPrepare:标记是否需要执行发送JoinGroupRequest请求前的准备操作。
  • rejoinNeeded:此字段是否重新发送JoinGroupRequest请求的条件之一。

下面先简单了解修改其值的地方和含义,如图所示。

在这里插入图片描述
上图①处是收到正常的JoinGroupResponse响应,将rejoinNeeded设置为false,防止重复发送JoinGroupRequest请求。

②、③、④三处分别是收到异常的SyncGroupResponse或HeartbeatResponse或消费者离开Consumer Group时执行的操作,它们都会将rejoinNeeded设置为true,表示可以重新发送JoinGroupRequest。

  • coordinator:Node类型,记录服务端GroupCoordinator所在的Node节点。
  • memberld:服务端GroupCoordinator返回的分配给消费者的唯一Id。
  • generation:服务端GroupCoordinator返回的年代信息,用来区分两次Rebalance操作。由于网络延迟等问题,在执行Rebalance操作时可能收到上次Rebalance过程的请求,避免这种干扰,每次Rebalance操作都会递增generation的值。

下面是ConsumerCoordinator的核心字段。

  • assignors:PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分区分配。此字段的值通过partition.assignment.strategy参数配置,可以配置多个。

  • metadata:记录了Kafka集群的元数据。

  • subscriptions:SubscriptionState对象,参考SubscriptionState小节。

  • autoCommitEnabled:是否开启了自动提交offset。

  • autoCommitTask:自动提交offset的定时任务。

  • interceptors:ConsumerInterceptor集合。

  • excludeInternalTopics:标识是否排除内部Topic。

  • metadataSnapshot:用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面几件事。

    • 如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscription集合和groupSubscription集合中。

    • 如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照底层是使用HashMap记录每个Topic中Partition的个数。将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。

    • 使用metadataSnapshot字段记录变化后的新快照。

  • assignmentSnapshot:也是用来存储Metadata的快照信息,不过是用来检测Partition分配的过程中有没有发生分区数量变化。具体是在Leader消费者开始分区分配操作前,使用此字段记录Metadata快照;收到SyncGroupResponse后,会比较此字段记录的快照与当前Metadata是否发生变化。如果发生变化,则要重新进行分区分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/630568.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GO——与PHP的并发对比

背景 go比php可支持的并发数更高,为什么 目标 分析点: 系统的并发瓶颈go语言的并发瓶颈php语言的并发瓶颈 系统并发 参考:https://juejin.cn/post/6844904025553534990 提到并发,我们这里指的是web服务web系统的第一层&…

Apache JMeter 3.1压力测试监控服务器数据(cpu、内存、磁盘io等)

Apache JMeter 3.1压力测试 Apache JMeter 3.1压力测试监控cpu、内存情况1.下载Apache JMeter 3.11.1 添加线程组1.2 添加http请求1.3 增加http请求头设置1.4 添加csv配置1.5 添加测试结果监控配置 2. 监控插件下载3. 服务端插件下载并启动3.1 下载3.2 解压并启动3.3 增加服务器…

渗透测试之Kali如何利用CVE-2019-0708漏洞渗透Win7

环境: 1.攻击者IP:192.168.1.10 系统: KALI2022(vmware 16.0) 2.靶机IP:192.168.1.8 系统:Windows 7 6.1.7601 Service Pack 1 Build 7601 已开启远程协助RDP服务开启了3389端口 问题描述: KALI 如何利用CVE-2019-0708漏洞渗透Win7 解决方案: 1.打开kali,msf搜索…

【每周AI简讯】GPT-5将有指数级提升,GPT Store正式上线

AI7 - Chat中文版最强人工智能 OpenAI的CEO奥特曼表示GPT-5将有指数级提升 GPT奥特曼参加Y-Combinator W24启动会上表示,我们已经非常接近AGI。GPT-5将具有更好的推理能力、更高的准确性和视频支持。 GPT Store正式上线 OpenAI正式推出GPT store,目前…

​一套uni-app + .net医院线上预约挂号系统源码(公众号+小程序预约挂号)

线上预约挂号系统构建了医院和患者的连接,通过改善患者院内的就医服务流程,以微信公众号、支付宝小程序为患者服务入口,为居民提供导诊、预约、支付、报告查询等线上线下一体化的就医服务,缩短患者就诊环节,提高医疗机…

springboot第50集:File类,IO流,网络编程,反射机制周刊

image.png FileReader、FileWriter的使用 FileInputStream、FileOutputStream的使用 image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png image.png 服务器内存优化是一个复杂的过程,通常需要综合考虑…

深入了解 Pytest Markers:提升测试用例的组织和控制能力

​从这篇开始,逐一解决fixture是啥?mark是啥?参数request是啥?钩子函数是啥?parametrize参数化是啥?这些问题。本片先介绍一下mark是啥?以及如何使用 Markers有啥用? 当使用 Pytest…

ZooKeeper 实战(五) Curator实现分布式锁

文章目录 ZooKeeper 实战(五) Curator实现分布式锁1.简介1.1.分布式锁概念1.2.Curator 分布式锁的实现方式1.3.分布式锁接口 2.准备工作3.分布式可重入锁3.1.锁对象3.2.非重入式抢占锁测试代码输出日志 3.3.重入式抢占锁测试代码输出日志 4.分布式非可重入锁4.1.锁对象4.2.重入…

SAP PI之Rest adapter

一,简介 REST风格接口是以http为传输协议,以xml或json或text为有效负载。下图展示了REST到XI再返回的一个过程,一个REST接口包含的信息有:服务URL、URL中带的参数、http方法(post/get/put等)、http头部、body部分的有效载荷。而X…

Sentinel限流、熔断

1、限流 单个服务节点限流 sentinel 提供了两种不同的隔离机制:信号量隔离和线程池隔离,它们的主要区别如下: 信号量隔离(Semaphore Isolation): 原理:信号量隔离基于计数器(或称令…

域名群站开源系统分享开源域名授权系统

一、需要自己安装PHP和MYSQL服务器环境。 二、务必设置伪静态规则,否则将无法访问文章栏目页面。 三、启用伪静态功能,请在站点设置中选择使用thinkphp的伪静态规则。 四、在域名的根目录下找到”data/config.php”文件,填入数据库的账号和…

配置zabbix监控平台

目录 内容纯手敲,难免有误,若发现请私信我。 配置zabbix监控平台 一、进入官网 ​编辑​ 二、配置zabbix-server(服务端) 1.下载zabbix的yum源 2.安装Zabbix服务器、前端、代理 3.安装Zabbix前端 4.编辑文件/etc/yum.rep…

openssl3.2 - quic服务的运行

文章目录 openssl3.2 - quic服务的运行概述笔记运行openssl编译好的quic服务程序todo - 如果自己编译quic服务工程补充 - 超过30秒不连接uqic服务会退出END openssl3.2 - quic服务的运行 概述 在看 官方 guide目录下的工程. 都是客户端程序, 其中有quic客户端, 需要运行quic服…

【算法Hot100系列】旋转图像

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航 檀越剑指大厂系列:全面总结 jav…

软件测试|python如何去除文件后缀名?

简介 在Python中,我们常常需要操作文件,包括文件的读取、写入、重命名等操作。在文件操作中,我们经常会遇到需要去除文件后缀的问题。那么,Python如何去除文件后缀呢?本文我们将介绍如何使用Python来去除文件后缀。 …

大模型学习与实践笔记(六)

一、finetune 简介 两种微调模式:增量预训练 与指令跟随 1.增量预训练 2.指令微调 二、LoRA 与 QLoRA 介绍 三、XTuner 介绍 四、低显存玩转LLM的方法

Linux网络之PXE高效批量装机、Kickstart全自动化安装

一. PXE网络装机简介和相关知识 1. 常见的三种系统安装方式和相关文件 ① 三种系统安装方式 u启动安装:在U盘中下载相关的安装系统及镜像文件,u盘插机安装 光驱安装:将带有所需系统的光盘放进电脑服务器中,按照官方引导装机 …

春节假期出游一些很实用的手机技巧!这样玩,就很哇塞~

随着春节的脚步越来越近,无论是准备出游还是回家,你蠢蠢欲动的心是否已经拦不住了?华为 nova 12系列这些很哇塞的玩法你必须知道!这个新年让你旅行出圈有秘籍! 出发前智慧播报航班信息不错过。智慧播报的功能就很实…

AI大模型学习笔记之二:什么是 AI 大模型的训练和推理?

在人工智能(AI)的领域中,我们经常听到训练(Training) 和 推理(Inference) 这两个词汇,它们是构建强大 AI 模型的关键步骤。我们通过类比人类的学习过程来理解这两个概念,可以更加自然而生动地理…

Servlet中访问网页常遇到的问题

网页出现404 出现这一种情况是浏览器访问的资源不存在 第一种情况通常是路径出错请检查你的路径是否一致 第二种情况确认你的webapp是否被正确加载 smart tomcat由于只加载一个webapp 如果加载失败 就会直接启动失败 拷贝war方式到Tomcat要加载多个webapp如果失败只有日志 查…