分布式 - 消息队列Kafka:Kafka 消费者的消费位移

文章目录

      • 01. Kafka 分区位移
      • 02. Kafka 消费位移
      • 03. kafka 消费位移的作用
      • 04. Kafka 消费位移的提交
      • 05. kafka 消费位移的存储位置
      • 06. Kafka 消费位移与消费者提交的位移
      • 07. kafka 消费位移的提交时机
      • 08. Kafka 维护消费状态跟踪的方法

01. Kafka 分区位移

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。

每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、2、…、9。

02. Kafka 消费位移

对于kafka中的消费者而言,也有一个offset的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

消费位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。消费者会定期地将已经消费的消息的位移提交到Kafka集群中,以便在下一次启动时从上次消费的位置继续消费。

每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。注意,这和分区位移完全不是一个概念。“分区位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器嘛。另外每个消费者有着自己的消费者位移。

03. kafka 消费位移的作用

消费者位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。它的作用主要有以下几个方面:

① 消费者可以通过记录偏移量来实现断点续传。当消费者下线或者重启时,它可以通过记录的偏移量来恢复之前的消费状态,从而避免重复消费已经处理过的消息。

② Kafka 通过偏移量来保证消息的顺序性。在同一个分区中,消息的顺序是有序的,消费者可以通过记录偏移量来保证消费的顺序性。

③ Kafka 还可以通过偏移量来实现消息的回溯。消费者可以通过指定偏移量来重新消费之前的消息,这在某些场景下非常有用,比如重新处理之前出现的错误。

总之,Kafka 消息偏移量是非常重要的一个概念,它可以帮助消费者实现断点续传、保证消息的顺序性以及实现消息的回溯等功能。

04. Kafka 消费位移的提交

消费者可以通过订阅一个或多个主题来拉取消息。当消费者调用 poll() 方法时,它会从 Kafka 集群中拉取一批消息,这些消息会被缓存在消费者的本地缓存中,等待消费者进一步处理。在消费者处理完这批消息后,它可以再次调用 poll() 方法来拉取下一批消息。如果消费者在处理消息时发生了错误,那么这批消息将会被重新拉取,直到消费者成功地处理它们为止。

因此每次调用poll()方法,它总是会返回还没有被消费者读取过的记录,这意味着我们可以追踪哪些记录是被群组里的哪个消费者读取过的。要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题 __consumer_offsets 中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

05. kafka 消费位移的存储位置

消费者默认将 offset 保存在Kafka一个内置的 topic 中,该 topic 为 __consumer_offsets。

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --list
__consumer_offsets

消费者会向一个叫作 __consumer_offset 的内置主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置继续读取消息。

消费 offset 案例:

① __consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

② 创建主题 haha

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic haha
Created topic test1.

③ 启动生产者生产数据:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic haha
>hello,haha!
>你好,haha!
>

④ 启动消费者消费数据:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic haha --group group-haha --from-beginning
hello,haha!
你好,haha!

⑤ 查看消费者消费主题__consumer_offsets:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning# key是[消费者组,消费的主题,消费的分区],value中已经消费的消息在当前分区的offset+1
[group-haha,haha,2]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)

06. Kafka 消费位移与消费者提交的位移

如下图,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,不过当前消费者需要提交的消费位移并不是 x,而是 x+1,它表示下一条需要拉取的消息的位置。
在这里插入图片描述
如果使用自动提交或不指定提交的偏移量,那么将默认提交poll()返回的最后一个位置之后的偏移量,即提交比客户端从poll()返回的最后一个位置大1的偏移量。在进行手动提交或需要提交特定的偏移量时,一定要记住这一点。

07. kafka 消费位移的提交时机

当前一次 poll() 操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。
在这里插入图片描述
① 如果最后一次提交的偏移量大于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会丢失:

如图,如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。

② 如果最后一次提交的偏移量小于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理:

如图,如果消费完所有拉取到的消息之后才进行位移提交,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。

08. Kafka 维护消费状态跟踪的方法

在Kafka中,消费者组可以通过消费者偏移量(consumer offset)来跟踪它们在分区中消费的消息。消费者偏移量是一个整数,表示消费者已经成功读取的消息的位置。当消费者读取消息时,它会将偏移量保存在内存中,以便在下一次读取消息时能够从正确的位置开始读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/39962.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sql server 存储过程 set ansi_nulls set quoted_identifier,out 、output

SQL-92 标准要求在对空值(NULL) 进行等于 () 或不等于 (<>) 比较时取值为 FALSE。 当 SET ANSI_NULLS 为 ON 时&#xff0c;即使 column_name 中包含空值&#xff0c;使用 WHERE column_name NULL 的 SELECT 语句仍返回零行。即使 column_name 中包含非空值&#xff0c…

5G无人露天矿山解决方案

1、5G无人露天矿山解决方案背景 ①2010.10&#xff0c;国家安监总局《金属非金属地下矿山安全避险“六大系统”安装使用和监督检查暂行规定》 ②2016.03&#xff0c;国家发改委《能源技术革命创新行动计划&#xff08;2016-2030&#xff09;》&#xff0c;2025 年重点煤矿区采…

每天一道leetcode:1192. 查找集群内的关键连接(图论困难tarjan算法)

今日份题目&#xff1a; 力扣数据中心有 n 台服务器&#xff0c;分别按从 0 到 n-1 的方式进行了编号。它们之间以 服务器到服务器 的形式相互连接组成了一个内部集群&#xff0c;连接是无向的。用 connections 表示集群网络&#xff0c;connections[i] [a, b] 表示服务器 a …

Quivr 基于GPT和开源LLMs构建本地知识库 (更新篇)

一、前言 自从大模型被炒的越来越火之后&#xff0c;似乎国内涌现出很多希望基于大模型构建本地知识库的需求&#xff0c;大概在5月底的时候&#xff0c;当时Quivr发布了第一个0.0.1版本&#xff0c;第一个版本仅仅只是使用LangChain技术结合OpenAI的GPT模型实现了一个最基本的…

升级STM32电机PID速度闭环编程:从F1到F4的移植技巧与实例解析

引言&#xff1a; 在嵌入式系统开发中&#xff0c;STM32系列微控制器广泛应用于各种应用领域。而对于直流有刷电机的控制&#xff0c;PID速度闭环是一种常用的控制方式。本文将以此为例&#xff0c;探讨如何从STM32F1系列移植到STM32F4系列&#xff0c;并详细介绍HAL库在不同型…

Python学习笔记_基础篇(十)_socket编程

本章内容 1、socket 2、IO多路复用 3、socketserver Socket socket起源于Unix&#xff0c;而Unix/Linux基本哲学之一就是“一切皆文件”&#xff0c;对于文件用【打开】【读写】【关闭】模式来操作。socket就是该模式的一个实现&#xff0c;socket即是一种特殊的文件&…

Linux安装Docker

一、Docker系统版本介绍 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到任何流行的 Linux 或 Windows 操作系统的机器上&#xff0c;也可以实现虚拟化。 容器是完全使用沙箱机制&#xff0c;相…

诚迈科技荣膺小米“最佳供应商奖”

近日&#xff0c;诚迈科技受邀参加小米战略合作伙伴HBR总结会。诚迈科技以尽职尽责的合作态度、精益求精的交付质量荣膺小米公司颁发的最佳供应商奖&#xff0c;其性能测试团队荣获优秀团队奖。 诚迈科技与小米在手机终端方向一直保持着密切的合作关系&#xff0c;涉及系统框架…

【Java基础】Java对象的生命周期

【Java基础】Java对象的生命周期 一、概述 一个类通过编译器将一个Java文件编译为Class字节码文件&#xff0c;然后通过JVM中的解释器编译成不同操作系统的机器码。虽然操作系统不同&#xff0c;但是基于解释器的虚拟机是相同的。java类的生命周期就是指一个class文件加载到类…

python控制obs实现无缝切换场景!obs-websocket-py

前言 最近一直在研究孪生数字人wav2lip。目前成果可直接输入高清嘴型&#xff0c;2070显卡1分钟音频2.6分钟输出。在直播逻辑上可以做到1比1.3这样&#xff0c;所以现在开始研究直播。在逻辑上涉及到了无缝切换&#xff0c;看到csdn上有一篇文章还要vip解锁。。。那自己研究吧…

尚硅谷MySQL笔记 3-9

我不会记录的特别详细 大体框架 基本的Select语句运算符排序与分页多表查询单行函数聚合函数子查询 第三章 基本的SELECT语句 SQL分类 这个分类有很多种&#xff0c;大致了解下即可 DDL&#xff08;Data Definition Languages、数据定义语言&#xff09;&#xff0c;定义了…

项目难点:解决IOS调用起软键盘之后页面样式布局错乱问题

需求背景 &#xff1a; 开发了一个问卷系统重构项目&#xff0c;刚开始开发的为 PC 端&#xff0c;其中最头疼的一点无非就是 IE 浏览器的兼容适配性问题&#xff1b; 再之后项目经理要求开发移动端&#xff0c;简单的说就是写 H5 页面&#xff0c;到时候会内嵌在 App 应用、办…

multiple definition of......first defined here

一、背景 环境&#xff1a; 银河麒麟–ARM–GCC7.4.0 写了一个动态库&#xff0c;依赖opencv和freeImage等第三方库&#xff0c;用cmake进行编译。原本在centos6-x86-gcc7.5.0上面进行编译非常的顺利&#xff0c;但是拿到麒麟arm上面编译就提示了这个错误&#xff1a;这个报错…

Ruby软件外包开发语言特点

Ruby 是一种动态、开放源代码的编程语言&#xff0c;它注重简洁性和开发人员的幸福感。在许多方面都具有优点&#xff0c;但由于其动态类型和解释执行的特性&#xff0c;它可能不适合某些对性能和类型安全性要求较高的场景。下面和大家分享 Ruby 语言的一些主要特点以及适用的场…

【C语言】动态通讯录 -- 详解

⚪前言 前面详细介绍了静态版通讯录【C语言】静态通讯录 -- 详解_炫酷的伊莉娜的博客-CSDN博客&#xff0c;但是静态版通讯录的空间是无法被改变的&#xff0c;而且空间利用率也不高。为了解决静态通讯录这一缺点&#xff0c;这时就要有一个能够随着存入联系人数量的增加而增大…

Ansys Zemax | 手机镜头设计 - 第 1 部分:光学设计

本文是 3 篇系列文章的一部分&#xff0c;该系列文章将讨论智能手机镜头模组设计的挑战&#xff0c;从概念、设计到制造和结构变形的分析。本文是三部分系列的第一部分&#xff0c;将专注于OpticStudio中镜头模组的设计、分析和可制造性评估。&#xff08;联系我们获取文章附件…

安防监控视频云存储平台EasyNVR通道频繁离线的原因排查与解决

安防视频监控汇聚EasyNVR视频集中存储平台&#xff0c;是基于RTSP/Onvif协议的安防视频平台&#xff0c;可支持将接入的视频流进行全平台、全终端分发&#xff0c;分发的视频流包括RTSP、RTMP、HTTP-FLV、WS-FLV、HLS、WebRTC等格式。为了满足用户的集成与二次开发需求&#xf…

企业计算机服务器遭到了locked勒索病毒攻击如何解决,勒索病毒解密

网络技术的不断发展&#xff0c;也为网络安全埋下了隐患&#xff0c;近期&#xff0c;我们收到很多企业的求助&#xff0c;企业的计算机服务器遭到了locked勒索病毒的攻击&#xff0c;导致企业的财务系统内的所有数据被加密无法读取&#xff0c;严重影响了企业的正常运行。最近…

如何通过观测云的RUM找到前端加载的瓶颈--可观测性入门篇

声明与保证 本文写作于2023年6月&#xff0c;性能优化的评价标准和优化方式仅适用于当前观测云控制台&#xff0c;当然随着产品迭代及技术更新&#xff0c;本文也会应要求适当更新。 创建、修订时间创建修改人版本2023/6/24观测云***v1.0.0 1.网站性能评价的发展史&#xff…

打开vim的语法高亮

在一个Ubuntu中自带的vim版本是8.2.4919&#xff0c;默认就是开始了语法高亮的&#xff0c;打开一个Java文件效果如下&#xff1a; 它不仅仅对Java文件有语法高亮&#xff0c;对很多的文件都有&#xff0c;比如vim的配置文件也有语法高亮&#xff0c;有语法高亮时读起来会容易…