kafka位移提交

目录

前言:

位移提交: 

小结:

参考资料 


前言:

     Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

位移提交: 

     提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。

     因为位移提交非常灵活,你完全可以提交任何位移值,但由此产生的后果你也要一并承担。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲,位移介于 11~19 之间的消息是有可能丢失的;相反地,如果你提交的位移值是 5,那么位移介于 5~9 之间的消息就有可能被重复消费。所以,Kafka 只会“无脑”地接受你提交的位移。

     从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。 

     所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移,作为用户的你完全不必操心这些事;而手动提交,则是指你要自己提交位移,Kafka Consumer 压根不管。

   自动提交的代码示意如下:

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "2000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}

 和自动提交相反的,就是手动提交了。开启手动提交位移的方法就是设置 enable.auto.commit 为 false。但是,仅仅设置它为 false 还不够,因为你只是告诉 Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。

最简单的 API 就是 KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。下面这段代码展示了 commitSync() 的使用方法:

while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

可见,调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。如果你莽撞地过早提交了位移,就可能会出现消费数据丢失的情况。那么你可能会问,自动提交位移就不会出现消费数据丢失的情况了吗?它能恰到好处地把握时机进行位移提交吗?为了搞清楚这个问题,我们必须要深入地了解一下自动提交位移的顺序。

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

 在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。 

 为了解决上述同步提交位移的问题,我们可以选择使用异步提交位移的方式KafkaConsumer#commitAsync()

 调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法:

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null)handle(exception);});
}

 commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。

 显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:

  • 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  • 我们不希望程序总处于阻塞状态,影响 TPS。 
   try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch(Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();
}
}

这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性

 设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。

 就拿刚刚提过的那个例子来说,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例,展示一段代码,实际上,commitSync 的调用方法和它是一模一样的。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record);  // 处理消息offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);if(count % 100 == 0)consumer.commitAsync(offsets, null); // 回调处理逻辑是nullcount++;}
}

小结:

 Kafka Consumer 的位移提交,是实现 Consumer 端语义保障的重要手段。位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。在实际使用过程中,推荐你使用手动提交机制,因为它更加可控,也更加灵活。另外,建议你同时采用同步提交和异步提交两种方式,这样既不影响 TPS,又支持自动重试,改善 Consumer 应用的高可用性

参考资料 

18 | Kafka中位移提交那些事儿-极客时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/76115.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何写http mjpeg server

目的 是为了让unity ue 等三维引擎直接读取mjpeg图像进行纹理贴图 使用qt&#xff0c;opencv等等&#xff0c;因为经常要进行图像处理 opencv 使用opencv 和QImage 来转换图像 QImage Widget::Mat2QImage(cv::Mat const& src) { cv::Mat temp; // make the same cv:…

时序分解 | MATLAB实现RIME-VMD霜冰优化算法优化VMD变分模态分解信号分量可视化

时序分解 | MATLAB实现RIME-VMD霜冰优化算法优化VMD变分模态分解信号分量可视化 目录 时序分解 | MATLAB实现RIME-VMD霜冰优化算法优化VMD变分模态分解信号分量可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 RIME-VMD【23年新算法】霜冰优化算法优化VMD变分模态分…

TinTin Web3 动态精选:以太坊基金会推出 EELS、Arbitrum Stylus 上线

TinTin 快讯由 TinTinLand 开发者技术社区打造&#xff0c;旨在为开发者提供最新的 Web3 新闻、市场时讯和技术更新。TinTin 快讯将以周为单位&#xff0c; 汇集当周内的行业热点并以快讯的形式排列成文。掌握一手的技术资讯和市场动态&#xff0c;将有助于 TinTinLand 社区的开…

QML、C++ 和 JS 三者之间的交互

QML、C++ 和 JS 三者之间的交互是 Qt Quick 应用开发的核心。以下是它们之间交互的常见方式: 从 QML 调用 C++ 函数要从 QML 调用 C++ 函数,您可以使用 Qt 的 QML 注册机制,例如 qmlRegisterType,将 C++ 类注册为 QML 类型。 C++ 代码: #include <QGuiApplication>…

docker push image harbor http 镜像

前言 搭建的 harbor 仓库为 http 协议&#xff0c;在本地登录后&#xff0c;推送镜像发生如下报错&#xff1a; docker push 192.168.xx.xx/test/grafana:v10.1.1 The push refers to repository [192.168.xx.xx/test/grafana] Get "https://192.168.xx.xx/v2/": dia…

LeetCode 之 二分查找

网址&#xff1a; LeetCode 704.二分查找 算法模拟&#xff1a; Algorithm Visualizer 在线工具&#xff1a; C 在线工具 如果习惯性使用Visual Studio Code进行编译运行&#xff0c;需要C11特性的支持&#xff0c;可参考博客&#xff1a; VisualStudio Code 支持C11插件配…

如何获得一个Oracle 23c免费开发者版

获取23c开发者版 简单介绍可参考这里。 获取数据库可以参考这篇文章Introducing Oracle Database 23c Free – Developer Release或这里。 Docker Image 这是最快的方法。在OCI上创建一个计算实例&#xff0c;然后就可以拉取image使用了。 docker的安装和配置不赘述了。 …

html实现邮件模版布局-flex布局table布局-demo

邮件模版布局 flex - 布局简单方便 兼容性差 table - 优点 就是兼容性好&#xff0c;其他没有优点 注&#xff1a;使用图片需要png最好&#xff0c;使用svg图google邮箱会出现不能使用的情况 效果图 flex布局 <!DOCTYPE html> <html lang"en" xmlns:th&qu…

sql server事务隔离别 、 mysql 事务隔离级别、并发性问题

隔离级别和锁 SQL中 mysql 、Oracle 、sql server 等数据库 都是客户端和服务器架构的软件&#xff0c;对于同一个服务器来说&#xff0c;可以有若干个客户端与之连接&#xff0c;每个客户端与服务器连接上之后&#xff0c;就可以称为一个 【会话&#xff08;session&#xff0…

Linkstech多核并行仿真丨光伏发电系统模型及IEEE 39 bus模型多核并行实测

新能源场站和区域电网作为复杂且具有动态特性的大规模电力系统&#xff0c;需要实时仿真测试来验证其性能、稳定性和响应能力。在这种背景下&#xff0c;多核并行仿真运算显得尤为重要。多核并行仿真能够同时处理电力系统的复杂模型&#xff0c;加速仿真过程&#xff0c;实现接…

YOLOV7改进-具有隐式知识学习的Efficient解耦头

[解耦头][https://github.com/z1069614715/objectdetection_script/blob/master/yolo-improve/yolov7-DecoupledHead.py] 1、复制这些到yolo.py 2、到这 3、复制下半部分到yolo.py 4、替换这里 5、最后的加到上面的这里 6、添加 7、添加 8、V5大概一个点的提升 9、解…

Android Jetpack 中Hilt的使用

Hilt 是 Android 的依赖项注入库&#xff0c;可减少在项目中执行手动依赖项注入的样板代码。执行 手动依赖项注入 要求您手动构造每个类及其依赖项&#xff0c;并借助容器重复使用和管理依赖项。 Hilt 通过为项目中的每个 Android 类提供容器并自动管理其生命周期&#xff0c;…

uni-app(微信小程序)图片旋转放缩,文字绘制、海报绘制

总结一下&#xff1a; 要进行海报绘制离不开canvas&#xff0c;我们是先进行图片&#xff0c;文字的拖拽、旋转等操作 最后再对canvas进行绘制&#xff0c;完成海报绘制。 背景区域设置为 position: relative&#xff0c;方便图片在当前区域中拖动等处理。添加图片&#xff0…

MFC 更改控件的大小和位置

获取当前主窗体的位置rect CRect dlgNow;GetWindowRect(&dlgNow);获取某一个控件当前的位置 CRect rect;CButton* pBtn (CButton*)GetDlgItem(IDC_BUTTONXXX);//获取按钮控件pBtn->GetWindowRect(rect);CWnd* pWnd(CWnd*)GetDlgItem(IDC_EDITXXX);//其它控件&#xff0…

基于 Web HID API 的HID透传测试工具(纯前端)

前言 最近在搞HID透传 《STM32 USB使用记录&#xff1a;HID类设备&#xff08;后篇&#xff09;》 。 市面上的各种测试工具都或多或少存在问题&#xff0c;所以就自己写一个工具进行测试。目前来说纯前端方案编写这个工具应该是最方便的&#xff0c;这里放上相关代码。 项目…

MVVM中wpf设置控件是否可见

背景&#xff1a;某个页面基本一样&#xff0c;但是又有点不一样的地方&#xff0c;设置是否可见就可以实现页面的共用 样例Lable 步骤一&#xff1a;资源字典中添加转换器 <Window.Resources><ResourceDictionary><!--用来判断是否隐藏--><BooleanTo…

数据库实现学生管理系统

1.QT将数据库分为三个层次&#xff1a; 1> 数据库驱动层&#xff1a;QSqlDriver、QSqlDriverCreator、QSqlDriverCreatorBase、QSqlDriverPlugin 2> sql接口层&#xff1a;QSqlDatabase、QSqlQuery、QSqlRecord、QSqlError 3> 用户接口层&#xff1a;提供一些模型QSql…

linux非root安装特定版本的cuda

由于一些代码实现&#xff08;cuda写的外部扩展包&#xff09;对cuda版本要求比较高&#xff0c;因此&#xff0c;我在实验室linux系统下默认的cuda版本上&#xff0c;没办法编译扩展包。需要重新安装特定版本的cuda。 一. 首先&#xff0c;需要查看系统版本&#xff1a; lsb…

Python模板注入

概念 发生在使用模板引擎解析用户提供的输入时。模板注入漏洞可能导致攻击者能够执行恶意代码或访问未授权的数据。 模板引擎可以让&#xff08;网站&#xff09;程序实现界面与数据分离&#xff0c;业务代码与逻辑代码分离。即也拓宽了攻击面&#xff0c;注入到模板中的代码可…

LabVIEW利用人工神经网络辅助进行结冰检测

LabVIEW利用人工神经网络辅助进行结冰检测 结冰对各个领域构成重大威胁&#xff0c;包括但不限于航空航天和风力涡轮机行业。在起飞过程中&#xff0c;飞机机翼上轻微积冰会导致升力降低25%。研究报告称&#xff0c;涡轮叶片上的冰堆积可在19个月的运行时间内造成29MWh的功率损…