kafka的位移

文章目录

    • 概要
    • 消费位移
    • __consumer_offsets主题
    • 位移提交

概要

本文主要总结kafka的位移是如何管理的,在broker端如何通过命令行查看到位移信息,并从代码层面总结了位移的提交方式。

消费位移

对于 Kafka 中的分区而言,它的每条消息都有唯一offset ,用来表示消息在分区中对应位置;对于消费者来说,它也有 offset 的概念,消费者使用 offse 来表示消费到分区中某个消息所在的位置。可通过命令行在查看到一个群组,在topic中两者当前的位置
bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group kafka-boot

[root@node1 kafka_2.13-3.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe  --group kafka-bootConsumer group 'kafka-boot' has no active members.GROUP           TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
kafka-boot      test-error-topic 0          26              26              0               -               -               -
kafka-boot      normal-test      0          23              24              1               -               -               -

这里对offse 做些区分 对于消息在分区中的位置 CURRENT-OFFSET称为“偏移量” 或消息位移;对于消费者消费到的位置,LOG-END-OFFSET称为“位移 ,有时候也会更明确地称之为“消费位移“。

生产者位移跟消费者位移的关系可以用下图来说明:
在这里插入图片描述

总结几个需要注意的点:

  • 分区副本有两种类型
    领导者副本:生产者跟消费者的请求都只会经过领导者副本
    跟随者副本:首领之外的副本,不处理客户端请求,从领导者副本那里通过拉取的方式同步消息
  • 消费位移存储在Zookeeper或Kafka中,新消费者客户端,偏移量存储咋Kafka内部主题 __consumer_offsets
  • 消费者提交的位移是当前消费消息位移的下一个位置,即:lastConsumeedOffset+1

__consumer_offsets主题

Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为提交位移(Committing Offsets)。

老版本 Consumer 的位移是提交到 ZooKeeper 中保存的。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。

但是,ZooKeeper 其实并不适用于这种高频的写操作,Kafka 社区自 0.8.2.x 版本开始推出了全新的位移管理
机制,将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,
__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。这种方式能够满足高频的写操作。

两个相关参数:
offsets.topic.num.partitions : 设置 __consumer_offsets主题的分区数,默认是50个分区
offsets.topic.replication.factor : 设置__consumer_offsets主题的副本数,默认是3(下载安装的包中此值可能为1 )

当Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题

一共有50个分区,那么消费者将位移提交到了哪个分区呢?

通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker
就是这个consumer group的coordinator
公式:Math.abs(groupID.hashCode()) % numPartitions

Kafka 1.0.2及以后提供了kafka_consumer_groups.sh脚本供用户查看consumer信息

1. 创建一个topic,分区数设置为1,副本数设置为1

[root@node1 kafka_2.13-3.2.1]# bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic test-offset --partitions 1 --replication-factor 1
Created topic test-offset.[root@node1 kafka_2.13-3.2.1]# bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic test-offset
Topic: test-offset      TopicId: in6gxQ5OQS6x9R8V3oJ7AQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824Topic: test-offset      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

2. 向主题test-offset中发送消息

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test-offset
>hello

3. 创建一个消费组,并从头开始消费

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092  --from-beginning --consumer-property group.id=testOffsetGroup   --topic test-offset
hello

4. 用代码根据上面的公式计算消费组testOffsetGroup提交位移的分区数

@Test
void getCommitOffsetPartitionTest() {String groupId = "testOffsetGroup";// 运行结果为16System.out.println(Math.abs(groupId.hashCode() % 50));
}
  1. 将kafka配置文件consumer.properties中设置exclude.internal.topics=false,并重启服务
    6. 查看主题__consumer_offsets第16分区上的信息,可以看到消费组testOffsetGroup提交的位移确实保存在了16分区上

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 16 --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896116191, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896121189, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896126188, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896131188, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896133573, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896162124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896167124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896172123, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896177124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896178781, expireTimestamp=None)

从上面也可看出__consumer_offsets topic的每一日志项的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

客户端提交消费位移是使用OffsetCommitRequest 请求实现的,其结构如下
在这里插入图片描述

__consumer_offsets这个主题中的消息格式为KV对,key为[Group, Topic, Partition],value可以简单理解为记录了偏移量;这样的记录方式,使得broker端不需要关系group下有多少个消费者,新增消费者或者减少消费者发生重平衡时,都能准确地定位到对应地分区应该从哪个位置开始消费。

位移提交

鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,Kafka,特别是KafkaConsumer API,提供了多种提交位移的方法。从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

自动提交

自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移
两个重要的参数

  • enable.auto.commit设置是否自动提交位移,默认是true
  • auto.commit.interval.ms:设置自动提交为true时,该参数生效,标识多久提交一次位移,默认5s,
public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");// 设置偏移量自动提交。自动提交是默认值。这里做示例。configs.put("enable.auto.commit", "true");// 偏移量自动提交的时间间隔configs.put("auto.commit.interval.ms", "2000");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);consumer.subscribe(Collections.singleton("tp_demo_01"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println(record.topic()+ "\t" + record.partition()+ "\t" + record.offset()+ "\t" + record.key()+ "\t" + record.value());}}}

设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但是会出现消息重复消费

在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

手动同步提交

开启手动提交位移的方法就是设置enable.auto.commit 为 false。但是,仅仅设置它为 false 还不够,因为你只是告诉
Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。

public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);consumer.subscribe(Collections.singleton("tp_demo_01"));while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}}
}

调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束,这样就会影响TPS。

鉴于此问题,还有另外一个提交方式

手动异步提交

public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null) {handle(exception);}});}
}

commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

是手动提交,需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:

  1. 利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。这些问题都是短暂的,自动重试通常都会成功。
  2. 不希望程序总处于阻塞状态,影响 TPS。
public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);consumer.subscribe(Collections.singleton("tp_demo_01"));try {while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));consumer.commitAsync();process(records); // 处理消息consumer.commitAsync(); // 异步提交}} catch (Exception e) {handle(e); // 处理异常} finally {try {consumer.commitSync();// 最后一次提交使用同步阻塞式提交} finally {consumer.close();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/41630.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0基础学习VR全景平台篇 第86篇:智慧眼-为什么要设置分组选择?

一、功能说明 分组选择&#xff0c;也就是给全景的每个分组去设置其所属的行政区划&#xff0c;设置后只有属于同行政区划的成员才可进入其场景进行相关操作&#xff0c;更便于实现城市的精细化管理。 二、后台编辑界面 分组名称&#xff1a;场景的分组名称。 对应分类&…

网络安全--linux下Nginx安装以及docker验证标签漏洞

目录 一、Nginx安装 二、docker验证标签漏洞 一、Nginx安装 1.首先创建Nginx的目录并进入&#xff1a; mkdir /soft && mkdir /soft/nginx/cd /soft/nginx/ 2.下载Nginx的安装包&#xff0c;可以通过FTP工具上传离线环境包&#xff0c;也可通过wget命令在线获取安装包…

【数据结构与算法】队列

文章目录 一&#xff1a;队列1.1 队列的概念1.2 队列的介绍1.3 队列示意图 二&#xff1a;数组模拟队列2.1 介绍2.2 思路2.3 代码实现2.3.1 定义队列基本信息2.3.2 初始化队列2.3.3 判断队列是否满&#xff0c;是否为空2.3.4 添加数据到队列2.3.5 获取队列数据&#xff0c;出队…

图数据库_Neo4j和SpringBoot整合使用_创建节点_删除节点_创建关系_使用CQL操作图谱---Neo4j图数据库工作笔记0009

首先需要引入依赖 springboot提供了一个spring data neo4j来操作 neo4j 可以看到它的架构 这个是下载下来的jar包来看看 有很多cypher对吧 可以看到就是通过封装的驱动来操作graph database 然后开始弄一下 首先添加依赖

【实用黑科技】如何 把b站的缓存视频弄到本地——数据恢复软件WinHex 和 音视频转码程序FFmpeg

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;效率…

Baumer工业相机堡盟工业相机如何通过BGAPISDK设置相机的固定帧率(C#)

Baumer工业相机堡盟工业相机如何通过BGAPI SDK设置相机的固定帧率&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机的固定帧率功能的技术背景CameraExplorer如何查看相机固定帧率功能在BGAPI SDK里通过函数设置相机固定帧率 Baumer工业相机通过BGAPI SDK设置相机固定帧…

蓝牙资讯|中国智能家居前景广阔,蓝牙Mesh照明持续火爆

据俄罗斯卫星通讯社报道&#xff0c;中国已成为全球最大的智能家居消费国&#xff0c;占全球50%—60%的市场份额。未来&#xff0c;随着人工智能技术的发展以及智能家居生态的不断进步&#xff0c;智能家居在中国的渗透率将加速提升。德国斯塔蒂斯塔调查公司数据显示&#xff0…

win10系统docker创建ubuntu容器解决开发环境问题

一、win10系统使用docker的原因 最近啊&#xff0c;在学习人工智能-深度学习&#xff0c;用的win10系统进行开发&#xff0c;老是出现一些莫名其妙的问题&#xff0c;无法解决&#xff0c;每天都在为环境问题搞得伤透了脑筋。 说到底还是要使用Linux系统进行开发比较合适。 …

【MT32F006】MT32F006之HT1628驱动LED

本文最后修改时间&#xff1a;2023年03月30日 一、本节简介 本文介绍如何使用MT32F006连接HT1628芯片驱动LED。 二、实验平台 库版本&#xff1a;V1.0.0 编译软件&#xff1a;MDK5.37 硬件平台&#xff1a;MT32F006开发板&#xff08;主芯片MT32F006&#xff09; 仿真器&a…

LeetCode算法心得——限制条件下元素之间的最小绝对差(TreeSet)

大家好&#xff0c;我是晴天学长&#xff0c;今天用到了Java一个非常实用的类TreeSet&#xff0c;能解决一些看起来棘手的问题。 1 &#xff09;限制条件下元素之间的最小绝对差 2) .算法思路 初始化变量&#xff1a;n为列表nums的大小。 min为整型最大值&#xff0c;用于记录…

python3 0学习笔记之基本知识

0基础学习笔记之基础知识 &#x1f4da; 基础内容1. 条件语句 if - elif - else2. 错误铺捉try - except(一种保险策略&#xff09;3. 四种开发模式4. 函数&#xff1a;def用来定义函数的5. 最大值最小值函数&#xff0c;max &#xff0c;min6. is 严格的相等&#xff0c;is no…

机器学习:基本介绍

机器学习介绍 Hnad-crafted rules Hand-crafted rules&#xff0c;叫做人设定的规则。那假设今天要设计一个机器人&#xff0c;可以帮忙打开或关掉音乐&#xff0c;那做法可能是这样&#xff1a; 设立一条规则&#xff0c;就是写一段程序。如果输入的句子里面看到**“turn of…

C#__使用Type类反射数据的基本用法

// 简单介绍 // 元数据&#xff08;metadata&#xff09;&#xff1a;与程序及其类型有关的数据。 // 反射&#xff1a;一个运行的程序查看本身元数据或其他程序集中的元数据的行为 // Assembly类&#xff1a;允许访问给定程序集的元数据&#xff0c;包含了可以加载和执行程序…

【C# 基础精讲】文件读取和写入

文件读取和写入是计算机程序中常见的操作&#xff0c;用于从文件中读取数据或将数据写入文件。在C#中&#xff0c;使用System.IO命名空间中的类来进行文件读写操作。本文将详细介绍如何在C#中进行文件读取和写入&#xff0c;包括读取文本文件、写入文本文件、读取二进制文件和写…

选择大型语言模型自定义技术

推荐&#xff1a;使用 NSDT场景编辑器 助你快速搭建可二次编辑器的3D应用场景 企业需要自定义模型来根据其特定用例和领域知识定制语言处理功能。自定义LLM使企业能够在特定的行业或组织环境中更高效&#xff0c;更准确地生成和理解文本。 自定义模型使企业能够创建符合其品牌…

BDA初级分析——认识SQL,认识基础语法

一、认识SQL SQL作为实用技能&#xff0c;热度高、应用广泛 在对数据分析人员的调查中SQL长期作为热度排名第-一的编程语言超过Python和R SQL&#xff1a;易学易用&#xff0c;高效强大的语言 SQL&#xff1a;Structured Query Language 结构化查询语言 SQL&#xff1a;易学…

多维时序 | MATLAB实现WOA-CNN-BiGRU-Attention多变量时间序列预测

多维时序 | MATLAB实现WOA-CNN-BiGRU-Attention多变量时间序列预测 目录 多维时序 | MATLAB实现WOA-CNN-BiGRU-Attention多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | MATLAB实现WOA-CNN-BiGRU-Attention多变量时间序列预测 1.程…

java 向上取整 java对小数取整

取整方法 Math.floor(double a) 向下取整 Math.ceil(double a) 向上取整 Math.round(double a) 四舍五入 0.5向下取整 Math.rint(double a) 就近取整 1.6接近2&#xff0c;所以就取2 1.4接近1&#xff0c;所以就取1 1.5跟1和2都很接近&#xff0c;这时候就取偶数 (int) 类型强转…

MongoDB:数据库初步应用

一.连接MongoDB 1.MongoDBCompass连接数据库 连接路径:mongodb://用户名:密码localhost:27017/ 2.创建数据库(集合) MongoDB中数据库被称为集合. MongoDBCompass连接后,点击红色框加号创建集合,点击蓝色框加号创建文档(数据表) 文档中的数据结构(相当于表中的列)设计不用管…

一个DW的计算

一个DW的计算 1- 题目: 已知一个DW1.1 要求: 从DW中取出指定的位的值1.1.1 分析1.1.2 实现1.1.3 简化实现1.1.4 验证 2- 题目: 已知一个DW2.1 要求: 从DW中的指定的P和S,取出指定的位的值2.1.1 分析2.1.2 实现 1- 题目: 已知一个DW 有图中所示一行信息&#xff0c;表示一个DW(…