Kafka 位移

Consumer位移管理机制

Consumer的位移数据作为一条条普通的Kafka消息,提交到__consumer_offsets中。可以这么说,__consumer_offsets的主要作用是保存Kafka消费者的位移信息。使用Kafka主题来保存位移。

消息格式

位移主题就是普通的Kafka主题。也是一个内部主题,但它的消息格式却是Kafka自己定义的KV对(Key和Value分别表示消息的键值和消息体),用户不能修改,Kafka Consumer有API去提交位移,也就是向位移主题写消息。不要自己写个Producer随意向该主题发送消息。

主题消息的Key中应该保存标识Consumer的字段,也就是Consumer GroupGroup ID,标识唯一的Consumer Group,因为Consumer提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key中还应该保存 Consumer要提交位移的分区

总结:位移主题的Key中应该保存3部分内容:<Group ID,主题名,分区号>

还有2种格式:

        1. 用于保存Consumer Group信息的消息,用来注册Consumer Group

        2. tombstone消息,即墓碑消息,也称delete mark:用于删除Group过期位移甚至是删除Group的消息。

位移主题的创建

当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。

分区数是怎么设置的呢?这就要看Broker端参数offsets.topic.num.partitions的取值了。它的默认值是50,因此Kafka会自动创建一个50分区的位移主题。Broker端另一个参数offsets.topic.replication.factor 控制副本数,默认为3。所以:如果位移主题是Kafka自动创建的,那么该主题的分区数是50,副本数是3。

提交位移(Committing Offsets)

Consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。当Consumer发生故障重启之后,就能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍

从用户的角度来说,位移提交分为自动提交手动提交从Consumer端的角度来说,位移提交分为同步提交异步提交

Kafka Consumer提交位移的方式有两种:自动提交位移手动提交位移

手动提交位移

enable.auto.commit 如果值是false,则为手动提交,它能够把控位移提交的时机和频率可以使用Kafka Consumer API的consumer.commitSync等方法,当调用这些方法时,Kafka会向位移主题写入相应的消息。

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}

调用consumer.commitSync()方法的时机,是在处理完了poll()方法返回的所有消息之后。如果过早提交了位移,就可能会出现消费数据丢失的情况。它还也有一个缺陷,就是在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,影响整个应用程序的TPS。

Kafka社区为手动提交位移提供了另一个API方法:KafkaConsumer#commitAsync() ,这是一个异步操作。调用commitAsync()之后,它会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。由于它是异步的,Kafka提供了回调函数(callback),在实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用commitAsync()的方法:

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}

commitAsync是否能够替代commitSync呢?

        答案是不能。commitAsync的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过 期”或不是最新值了。因此,异步提交的重试其实没有意义,所以commitAsync是不会重试的。 

将commitSync和commitAsync组合使用

try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch(Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}

对于常规性、阶段性的手动提交,我们调用commitAsync()避免程序阻塞,而在Consumer要关闭前,我们调用commitSync()方法执行同步阻塞式的位移提交,以确保Consumer关闭前能够保存正确的位移数据。将两者结合后,既实现了异步无阻塞式的位移管理,也确保了Consumer位移的正确性。

分批处理(细粒度的位移提交)

        commitSync(Map<TopicPartition, OffsetAndMetadata>)

         commitAsync(Map<TopicPartition, OffsetAndMetadata>)

它们的参数是一个Map对象,键就 是TopicPartition,即消费的分区,而值是一个OffsetAndMetadata对象,保存的主要是位移数据。

例如:如何每处理100条消息就提交一次位移呢?以commitAsync为例,展示一段代码,实际上,commitSync的调用方法和它是一模一样的。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
// 其他操作
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record); // 处理消息offsets.put(new TopicPartition(record.topic(), record.partition()) , new OffsetAndMetadata(record.offset() + 1);if(count % 100 == 0)consumer.commitAsync(offsets, null); // 回调处理逻辑是nullcount++;}}
}

程序先是创建了一个Map对象,用于保存Consumer消费处理过程中要提交的分区位移,之后开始逐条处理消息,并构造要提交的位移值。要提交下一条消息的位移,这里构造OffsetAndMetadata对象时,使用当前消息位移加1的原因。代码的最后部分是做位移的提交。这里设置了一个计数器,每累计100条消息就统一提交一次位移。与调用无参的 commitAsync不同,这里调用了带Map对象参数的commitAsync进行细粒度的位移提交。这样,这段代码就能够实现每处理100条消息就提交一次位移,不用再受poll方法返回的消息总数的限制了。 

自动提交位移

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Consumer端有个参数叫enable.auto.commit,如果值是true,则Consumer 定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。但是没法把控Consumer端的位移管理。

 一旦设置了enable.auto.commit为true,Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息。从顺序上来说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer每5秒自动提交一次位移。现在,我们假设提交位移之后的3秒发生了Rebalance操作。在Rebalance之后,所有Consumer从上一次提交的位移处继续消费但该位移已经是3秒前的位移数据了,故在Rebalance发生前3秒消费的所有数据都要重新再消费一次。虽然能够通过减少auto.commit.interval.ms的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。 

自动提交位移问题:

自动提交位移,那么就可能存在一个问题:只要Consumer一直启动着,它就会无限期地向位移主题写入消息。

假设Consumer当前消费到了某个主题的最新一条消息,位移是100,之后该主题没有任何新消息产生,故Consumer无消息可消费了,所以位移永远保持在100。由于是自动提交位移位移主题中会不停地写入位移=100的消息。显然Kafka只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。

Kafka使用Compact策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义Compact策略中的过期呢?对于同一个Key的两条消息M1M2,如果M1的发送时间早于 M2,那么M1就是过期消息。Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起

图中位移为0、2和3的消息的Key都是K1。Compact之后,分区只需要保存位移为3的消息,因为它是最新发送的。 

Kafka提供了专门的后台线程定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据。这个后台线程叫LogCleaner

参考:Kafka 核心技术与实战 (geekbang.org)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/864352.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HCIE实验这样玩太高级了吧?实现FRR+BFD+OSPF与BGP的联动

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 晚上好&#xff0c;我的网工朋友。 今天搞个HCIE实验玩玩&#xff0c;上回分享了个张总讲解的防火墙配置实验思路&#xff0c;后来还特地搞了个视…

GPT-4o文科成绩超一本线,理科为何表现不佳?

目录 01 评测榜单 02 实际效果 什么&#xff1f;许多大模型的文科成绩竟然超过了一本线&#xff0c;还是在竞争激烈的河南省&#xff1f; 没错&#xff0c;最近有一项大模型“高考大摸底”评测引起了广泛关注。 河南高考文科今年的一本线是521分&#xff0c;根据这项评测&…

【TB作品】打地鼠游戏,ATMEGA16单片机,Proteus仿真 打地鼠游戏

11个按键LCD1602显示器9个灯蜂鸣器打地鼠小游戏就是九个灯泡&#xff0c;对应九个按键&#xff0c;灯泡有红黄蓝&#xff0c;每间隔一会儿就会亮一个灯&#xff0c;代表地鼠冒出来&#xff0c;按一下按键让灯泡灭掉代表打地鼠&#xff0c;红的三分&#xff0c;黄的两分&#xf…

一句话介绍什么是AI智能体?

什么是AI智能体&#xff1f; 一句话说就是利用各种AI的功能的api组合&#xff0c;完成你想要的结果。 例如你希望完成一个关于主题为啤酒主题的小红书文案图片&#xff0c;那么它就可以完成 前面几个步骤类似automa的组件&#xff0c;最后生成一个结果。

IT专业入门——高考假期预习指南,我来做你的引路人

目录 认识IT知识体系 什么是计算机 按规模、速度和功能分类 按照其工作模式分类 硬件 操作系统 编程语言 对学习语言的一点建议 对于学python的一点看法 网络 数据结构与算法 数据库 Web开发 Web前端 Web后端 基础预习指南 技术路线学习一览 学习资源推荐 刷…

开放式耳机哪个品牌最好?2024精选5款热门品牌,新手必看的开放式耳机指南!

最近想买开放式耳机&#xff0c;但面对众多品牌和型号&#xff0c;真的太难挑选了&#xff1f;别担心&#xff0c;作为耳机发烧友和测评专家&#xff0c;我为大家带来了几款热门开放式耳机的横向对比。从6个方面告诉大家怎么样去挑选开放式耳机&#xff0c;并且推荐了几款我觉得…

深度学习 --- stanford cs231学习笔记八(训练神经网络之dropout)

6&#xff0c;dropout 6&#xff0c;1 线性分类器中的正则化 在线性分类器中&#xff0c;我们提到过正则化&#xff0c;其目的就是为了防止过度拟合。例如&#xff0c;当我们要用一条curve去拟合一些散点的数据时&#xff0c;常常是不希望训练出来的curve过所有的点&#xff0c…

<电力行业> - 《第1课:电力行业的五大四小》

1 什么是电力行业的五大四小&#xff1f; 我们常说的电力行业的五大四小&#xff0c;指的是电力行业有实力的公司&#xff0c;分为&#xff1a;较强梯队的五大集团、较弱梯队的四小豪门。 五个实力雄厚的集团&#xff0c;分别是&#xff1a; 中国华能集团公司中国大唐集团公…

文件操作~

目录 1.为什么使用文件&#xff1f; 2.什么是文件&#xff1f; 2.1 程序文件 2.2 数据文件 2.3 文件名 3.⼆进制文件和文本文件&#xff1f; 4.文件的打开和关闭 4.1 流和标准流 4.1.1 流 4.1.2 标准流 4.2 文件指针 4.3 ⽂件的打开和关闭 5.文件的顺序读写 5.1 …

QT+winodow 代码适配调试总结(二)

已经好多年了&#xff0c; linux环境下不同版本的QT程序开发和部署&#xff0c;突然需要适配window环境程序调试&#xff0c;一堆大坑&#xff0c;还真是一个艰巨的任务&#xff0c;可是kpi下的任务计划&#xff0c;开始吧&#xff01;&#xff01; 1、首先我们自定义的动态库…

【PYTORCH,TENSORFLOW环境配置,安装,自用代码】

conda -V&#xff08;查看版本&#xff0c;这步不要也罢&#xff09; conda create -n test python3.7&#xff08;创建环境&#xff09; conda activate test&#xff08;激活&#xff09; conda env list&#xff08;查看自己的环境&#xff09; nvidia-smi&#xff08;查…

以太网电缆专家手册:掌握RJ45连接器压接的艺术与科学

在这个日新月异的数字时代&#xff0c;正确的连接方式至关重要&#xff0c;而RJ45连接器正是实现这一点的关键工具之一。无论您是在家中布置办公网络&#xff0c;还是在公司部署复杂的IT基础架构&#xff0c;或是进行任何需要设备间高效数据传输的活动&#xff0c;掌握如何正确…

【深度学习】调整加/减模型用于体育运动评估

摘要 一种基于因果关系的创新模型&#xff0c;名为调整加/减模型&#xff0c;用于精准量化个人在团队运动中的贡献。该模型基于明确的因果逻辑&#xff0c;将个体运动员的价值定义为&#xff1a;在假设情景下&#xff0c;用一名价值为零的球员替换该球员后&#xff0c;预期比赛…

获取onnx模型输入输出结构信息的3种方式:ONNX、onnxruntime、netron

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

每日Attention学习7——Frequency-Perception Module

模块出处 [link] [code] [ACM MM 23] Frequency Perception Network for Camouflaged Object Detection 模块名称 Frequency-Perception Module (FPM) 模块作用 获取频域信息&#xff0c;更好识别伪装对象 模块结构 模块代码 import torch import torch.nn as nn import to…

【Python函数编程实战】:从基础到进阶,打造代码复用利器

文章目录 &#x1f68b;前言&#x1f680;一、认识函数&#x1f308;二、函数定义❤️三、函数调用⭐四、实参与形参&#x1f4a5;1. 形式参数&#x1f6b2;2. 实际参数&#x1f525;1. 位置参数☔2. 关键字参数&#x1f3ac;3. 默认参数&#x1f525;4. 可变数量参数(不定长参…

Nomad Web 1.0.12还能这样

大家好&#xff0c;才是真的好。 前几天讲代码和开发&#xff0c;忽略了大家的真实感受&#xff0c;那就是不爱开。我也记起来我们很久没有讲Notes/Domino产品的更新&#xff0c;因为除了补丁程序外&#xff0c;确实没多少更新。 不过就在前两天&#xff0c;有一项产品得到了…

零成本、高效率:免费可视化工具的魅力所在

在如今这个数据驱动的时代&#xff0c;免费可视化工具越来越受到人们的欢迎。这些工具不仅降低了数据分析的门槛&#xff0c;还为用户提供了强大的功能和极高的灵活性&#xff0c;使得各行各业的人们都能够轻松地利用数据做出明智的决策。首先&#xff0c;免费可视化工具的零成…

【Qt】认识Qt界面Hello world小程序

一.认识Qt界面 1.左边栏 在编辑模式下&#xff0c;左边竖排的两个窗⼝叫做 "边栏" 。 ① 是项⽬⽂件管理窗⼝ ② 是打开⽂件列表窗⼝。 边栏⾥的窗⼝数⽬可以增加&#xff0c;边栏⼦窗⼝标题栏有⼀排⼩按钮&#xff0c;最右边的是关闭按钮&#xff0c;倒数第⼆个是 …

嵌入式学习——硬件(IIC、ADC)——day56

1. IIC 1.1 定义&#xff08;同步串行半双工通信总线&#xff09; IIC&#xff08;Inter-Integrated Circuit&#xff09;又称I2C&#xff0c;是是IICBus简称&#xff0c;所以中文应该叫集成电路总线。是飞利浦公司在1980年代为了让主板、嵌入式系统或手机用以连接低速周边设备…