kafka:java client使用总结塈seek() VS commitSync()的区别(三)

最近一段日子接触了kafka这个消息系统,主要为了我的开源中间件项目simplemq增加kafka支持(基于kafka-client【java】),如今总算完成,本文是对这个过程中对kafka消息系统的使用总结

线程安全

关于线程安全,kafka-client的代码注释有明确说明,

KafkaProducer是线程安全的

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
– from Java Comment of org.apache.kafka.clients.producer.KafkaProducer

也就是说在工程实践中,KafkaProducer实例可以使用单例模式。不需要为了发送一条消息而频繁创建KafkaProducer实例。

KafkaConsumer不是线程安全的

Multi-threaded Processing
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
making the call. It is the responsibility of the user to ensure that multi-threaded access
is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
– from Java Comment of org.apache.kafka.clients.consumerKafkaConsumer

在工程实践中,如果希望对订阅的主题单独管理,那么对于订阅的每一个主题(topic)必须创建一个单独的KafkaConsumer实例负责接收消息。并且要注意对KafkaConsumer实例的多数方法也只能在消息接收线程中。

分区

KafkaConsumer.poll()方法返回拉取的消息对象迭代对象(Iterable),迭代元素类型为ConsumerRecord,从ConsumerRecord返回的字段可知包括了key,value,offset,partition,partition即为分区。
也就是说,如果topic有多个分区,那么每次摘取的一批消息可能是来自不同分区的。所以不能想当然认为每一批消息都是一个分区的。
每批次拉取的消息同一个分区的消息的消息偏移值都是连续的。即[33,34,35]这样的连续数字,
不同的分区的偏移值没有相关性

手动提交

创建KafkaConsumer实例时如果不指定enable.auto.commit参数为true,默认KafkaConsumer是自动提交的。
自动提交模式没啥好说的,不会存在重复消费和遗漏消息的问题。
如果要使用手动提交模式,调用方就要自己维护分区的偏移,以确保不会出现重复消费和遗漏消息问题。
本节讲述手动提交模式下,设计需要注意的问题

团进团出

团进团出是旅游行业的一个术语,即要求一个旅行团,整团出发入境时是多少人,返程出境时要一个不少的回来
在这里的意思就是手动提交模式下每次KafkaConsumer.poll()方法每次拉取一批消息(数量不等),处理完消息后,就要对这批消息进行手动提交处理。提交完成后,才能继续拉取下一批消息。不能在上一批消息还没有完成提交的时候,就调用KafkaConsumer.poll()方法拉取下一批消息。

所以如果你的项目中消息处理是异步的,那么一定要同步等待当前这批消息被处理完,才能再次执行KafkaConsumer.poll()方法拉取消息。

前面说过如果主题有多个分区,每批拉取的消息可能是来自不同分区的。
为方便举例,我们以如下格式表示收到的一条消息

0-100-true

消息由-号三段数字字母代表,

  • 第一段数字代表分区,
  • 第二段数字为偏移,
  • 最后的true/false代表该消息是否正确处理并提交确认,
    为true的需要提交,
    false则是因为各种原因处理失败不需要提交,希望下一轮拉取消息继续处理。

完整提交

如下面的分区0,如果一批消息中同一个分区的所有消息都被正确处理需要提交,那么它就是完整提交

[0-100-true,0-101-true,0-103-true]

如下调用 KafkaConsumer.commitSync方法就可以了。

/** 分区完整提交,提交偏移为最后一个偏移+1 */
// 分区0
TopicPartition topicPartition = new TopicPartition(topic_name, 0);
long lastTrueOffset = 103;
/** 提交的偏移指向最后偏移量的下一条记录 */
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastTrueOffset+1);
consumer.commitSync(Collections.singletonMap(topicPartition, offsetAndMetadata));

不完整提交

如下面的分区1,如果一批消息中同一个分区的消息有部分消息标记为false不能提交,那么它就是不完整提交。

[1-41-true,1-42-false,1-43-false,1-44-true]

对于不完整提交,我们只能从将第一个false之前的记录下次循环不用再处理,第一个false及之后的消息只能留给下次循环拉取消息再处理。如下使用seek()方法修改分区偏移

/** * 分区不完整提交:* 记录本轮第一个标记为false的记录之后所有提交标记为true的偏移 * 下一轮拉取消息从第一个标记为false的偏移开始*/
// 分区1
TopicPartition topicPartition = new TopicPartition(topic_name, 1);
long firstFalseOffset =42;
consumer.seek(topicPartition,firstFalseOffset);

在不完整提交的状态下,下次执行poll()方法拉取的消息中包含上一批消息为标记为true的消息,所以还需要有机制记录上一轮拉取的消息中不完整提交中标记为true的消息,这些消息不需要再被处理,否则就会出现重复消费问题。

重复消费问题

即使如上面所说在程序中有机制记录上次不完整提交中标记为true的消息,在下次循环拉取消息后,对上次已经标记为true的消息不再被重复处理,还是无法完全避免重复消费问题。因为这只是解决当前消费者实例在当前消费循环中的重复消费问题。
在消息循环结束前最后一次拉取消息如果是不完整提交,如果这些不完整提交的数据没有持久化保存,那么在下次创建的消费者实例还是会有已经被确认消费的消息被重复消费的情况。
所以如果要完全解决重复消费问题,需要应用层对不完全提交的消息进行额外处理:

  1. 将确认为false的消息存储到缓冲区或持久化存储中:在处理确认为false的消息时,你可以将这些消息存储到缓冲区或持久化存储中,例如内存队列、数据库或文件系统。这样,下次启动消费者时,可以从缓冲区或存储中加载这些消息,并进行再次处理。
  2. 使用定时任务重新处理消息:你可以设置一个定时任务,定期检查确认为false的消息,并重新进行处理。定时任务可以根据需要从缓冲区或持久化存储中获取这些消息,并重新发送给消费者进行处理。

seek() VS commitSync()

seek()方法和commitSync()方法的作用都是通过更新分区的偏移值,控制拉取消息的位置,但这两个方法肯定是有区别的否则不可能设计两个方法干同样的事儿。

commitAsync()commitSync()方法作用是一样的,区别在于commitAsync()是异步提交
事实上我通过输出日志的方式发现commitAsync()执行结束调用OffsetCommitCallback对象时所在线程与commitAsync()执行在同一线程,也就是说commitAsync()可能也是同步提交

我通过反复的实验,对它们的差别有了初步的判断。但并不太确定。
于是,关于seek()方法和commitSync()方法的区别我问了bito机器人,这是它的回答,证实了我的想法,与我的实验结论是一致的。
在这里插入图片描述

我在机器人回答的基础上再做一些示例补充就是如下完整的说明:
commitSync() 方法管理的是消费者下次启动时获取消息的偏移量。当调用 commitSync() 方法时,消费者会将当前消费的最新偏移量提交给Kafka,并在下次启动时从该偏移量处继续消费。

比如:本次poll拉取了 100,101,102三条消息,commitSync提交偏移101,那么下次一轮执行poll拉取消息会从偏移103开始,此刻如果中止拉取消息,下次再重新启动消费者时拉取偏移为101。

seek() 方法更直接,它会修改当前消费者实例下次循环拉取消息的偏移量。如果你在消费者实例中调用 seek() 方法来设置偏移量,并在之后中止拉取消息,下次再启动消费者实例时,它会从你设置的偏移量处开始拉取消息。

还以上例,本次poll拉取了 100,101,102三条消息,seek修改偏移101,那么下次一轮执行poll拉取消息会从偏移101开始,如果此刻中止拉取消息,下次再重新启动消费者时拉取偏移为100,因为我们没有执行commitSync将偏移量持久化。

因此, commitSync() 方法影响的是下次消费者启动时的偏移量,而 seek() 方法影响的是当前消费者实例下次循环拉取消息的偏移量,并不会影响下次再启动消费者实例时的偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/27827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT 使用单例模式

目录 1. 单例模式介绍 2.单例模式实现 1. 单例模式介绍 有些时候我们在做 qt 项目的时候,要用到很多类. 例如我们用到的类有 A,B,C,D. 其中,A 是 B,C,D 中都需要用到的类,A 类非常的抢手. 但是,A 类非常的占内存,定义一个 A 对象需要 500M 内存,假如在 B,C,D 中都定义一个 A 类…

面试之快速学习C++11- constexpr以及constexpr和const区别

学习地址: http://c.biancheng.net/view/3730.html 10.constexpr:验证是否为常量表达式 常量表达式: 指的就是由多个(≥1)常量组成的表达式,换句话说,如果表达式中的成员都是常量,…

模型训练之train.py代码解析

题目 作者:安静到无声 个人主页 from __future__ import absolute_import from __future__ import division from __future__ import print_function这段代码使用了Python 2.x的__future__模块来导入Python 3.x的一些特性。在Python 2.x中,使用print语句来输出内容,而在Py…

精准定位:私域流量运营方法解析

随着市场竞争的不断加剧,企业越来越意识到私域流量运营的重要性。与传统的广告推广相比,私域流量运营可以更加精准地定位目标用户,提高用户参与度和忠诚度,从而实现更高的转化率和销售增长。在本文中,我们将深入探讨私…

Modbus TCP转Profibus DP网关modbus tcp报文解析

捷米JM-DPM-TCP网关。在Profibus总线侧作为主站,在以太网侧作为ModbusTcp服务器功能, 下面是介绍捷米JM-DPM-TCP主站网关组态工具的配置方法 2, Profibus主站组态工具安装 执行资料光盘中的安装文件setup64.exe或setup.exe安装组态工具。安装过程中一直…

vscode 设置滑条颜色

1. 默认的滑条是灰黑色的,很难看的清 2. 左下角,打开VS Code 设置功能 3. 输入命令 workbench color,回车 4. 找到工作台:自定义颜色设置,打开设置文件 setting.json 5. 打开配置文件 6. 添加颜色配置 "workben…

C 语言的 getchar() 函数和 putchar() 函数

getchar() 函数和 putchar() 函数是一对字符输入和输出函数. getchar() 作用&#xff1a;get a character from stdin 原型&#xff1a;int getchar( void ); Required Header&#xff1a;<stdio.h> Compatibility&#xff1a;ANSI Return value&#xff1a;return…

暑假刷题第23天--8/7

D-游游的k-好数组_牛客周赛 Round 6 (nowcoder.com)&#xff08;关键--a[1]a[k1]&#xff09; #include<iostream> #include<algorithm> using namespace std; const int N100005; int a[N]; typedef pair<int,int>PII; PII b[N]; void solve(){int n,k,x;…

unraid docker桥接模式打不开页面,主机模式正常

unraid 80x86版filebrowser&#xff0c;一次掉电后&#xff0c;重启出现权限问题&#xff0c;而且filebrowser的核显驱动不支持amd的VA-API 因为用不上核显驱动&#xff0c;解压缩功能也用不上&#xff0c;官方版本的filebrowser还小巧一些&#xff0c;18m左右 安装的时候总是…

C语言每日一题:14《数据结构》复制带随机指针的链表

题目一&#xff1a; 题目链接&#xff1a; 思路一&#xff1a; 找相对位置暴力求解的方法&#xff1a; 1.复制一个新的链表出来遍历老的节点给新的节点赋值&#xff0c;random这个时候不去值。 2.两个链表同时遍历&#xff0c;遍历老链表的时候去寻找相对位置&#xff0c;在遍…

HarmonyOS应用开发的新机遇与挑战

HarmonyOS 4已经于2023年8月4日在HDC2023大会上正式官宣。对广大HarmonyOS开发者而言&#xff0c;这次一次盛大的大会。截至目前&#xff0c;鸿蒙生态设备已达7亿台&#xff0c;HarmonyOS开发者人数超过220万。鸿蒙生态充满着新机遇&#xff0c;也必将带来新的挑战。 HarmonyO…

Java volatile关键字分析

每个线程创建时&#xff0c;JVM会为其创建一份私有的工作内存&#xff08;栈空间&#xff09;&#xff0c;不同线程的工作内存之间不能直接互相访问 JMM规定所有的变量都存在主内存&#xff0c;主内存是共享内存区域&#xff0c;所有线程都可以访问 线程对变量进行读写&#xf…

Rocketmq Filter 消息过滤(TAGS、SQL92)原理详解 源码解析

1. 背景 1.1 Rocketmq 支持的过滤方式 Rocketmq 作为金融级的业务消息中间件&#xff0c;拥有强大的消息过滤能力。其支持多种消息过滤方式&#xff1a; 表达式过滤&#xff1a;通过设置过滤表达式的方式进行过滤 TAG&#xff1a;根据消息的 tag 进行过滤。SQL92&#xff1a…

windows服务器自动重启?

在使用电脑过程中&#xff0c;很多玩家都遇到过系统无故自动重启的情况&#xff0c;实际上引起计算机重启的原因有多种&#xff0c;其中硬件方面原因大致有以下几条。 1、电源是引起系统自动重启的最大嫌疑之一。 劣质的电源不能提供足够的电量&#xff0c;当系统中的设备增多…

【雕爷学编程】MicroPython动手做(33)——物联网之天气预报

天气&#xff08;自然现象&#xff09; 是指某一个地区距离地表较近的大气层在短时间内的具体状态。而天气现象则是指发生在大气中的各种自然现象&#xff0c;即某瞬时内大气中各种气象要素&#xff08;如气温、气压、湿度、风、云、雾、雨、闪、雪、霜、雷、雹、霾等&#xff…

【docker】设置 docker 国内镜像报错问题,解决方案

一、报错&#xff1a; [rootlocalhost ~]# systemctl restart docker Job for docker.service failed because the control process exited with error code. See "systemctl status docker.service" and "journalctl -xe" for details.二、原因&#xf…

24届近5年江南大学自动化考研院校分析

今天给大家带来的是江南大学控制考研分析 满满干货&#xff5e;还不快快点赞收藏 一、江南大学 学校简介 江南大学&#xff08;Jiangnan University&#xff09;是国家“双一流”建设高校&#xff0c;“211工程”、“985工程优势学科创新平台”重点建设高校&#xff0c;入选…

Windows新版文件资源管理器经常在后台弹出的临时解决方案

禁用组策略自动刷新 运行gpedit.msc找到计算机配置->管理模板->系统->组策略找到 “关闭组策略的后台刷新”启用 参考 https://answers.microsoft.com/en-us/windows/forum/all/windows-11-most-recently-opened-explorer-window/26e097bd-1eba-4462-99bd-61597b5…

C#打开文件对话框、保存文件对话框、字体以及颜色对话框

打开文件对话框 //创建打开文件的对象OpenFileDialog openFileDialog new OpenFileDialog();openFileDialog.Title "请选择要打开的文件";//设置对话框标题openFileDialog.Multiselect true; //设置对话框可以多选openFileDialog.InitialDirectory "C:\Use…