KAFKA消费者-进阶用法

Apache Kafka 是一个分布式流处理平台,用于构建实时流数据管道和应用程序。在 Kafka 中,消费者(Consumer)用于从 Kafka 主题(Topic)中读取消息并进行处理。本文将介绍 Kafka 消费者的进阶用法,包括手动提交偏移量、消费者组、重新平衡等功能。

一. 创建 Kafka 消费者

首先,我们需要创建 Kafka 消费者,并配置消费者属性。

而创建消费在方式有俩种,一种是注解,另一种则是通过new KafkaConsumer()的方式来获取到消费者实例;

1.注解:

@KafkaListener(topic = "your-topic")public void handle(ConsumerRecord consumerRecord) {System.out.println("消费者消费消息:" + consumerRecord);System.out.println(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));System.out.println("消费内容:" + consumerRecord.value());}@KafkaListener(topics = {"your-topic1", "your-topic2"})public void handleCMDB(ConsumerRecord consumerRecord) {System.out.println("消费者消费消息:" + consumerRecord);System.out.println(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));logger.info(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));System.out.println("消费内容:" + consumerRecord.value());}

 使用注解通过topic=指定需要监听的通道,同时可以使用topics监听多个通道,通过consumerRecord.value()即可获取到通道中的数据值;而kafka的配置需要在yml配置文件中指定

2.new KafkaConsumer()实例:

使用创建实例的方式需要指定properties配置,否则获取到的消费者实例为空,没有任何意义,而创建消费者来进行消费的方法有俩种,一种方式必须指定groupid,一种方式是默认会指定groupid不需要手动指定,以下是代码示例:

// 方式1:需要指定gruopid
public String useMsg(){Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourKafkaServers");props.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultConsumerGroup"); // 默认组props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafka_test_topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} catch (WakeupException e) {// Ignore exception for shutdown} finally {consumer.close();}return "消费成功";}// 方式2:无需手动指定gruopid
public String useOtherMsg(){Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourKafkaServers");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 使用TopicPartition指定消费者主题和分区TopicPartition topicPartition = new TopicPartition("yourtopic", 0);consumer.assign(Arrays.asList(topicPartition));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("消费消息!!!!");System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}} catch (WakeupException e) {// Ignore exception for shutdown} finally {consumer.close();}return "无group消费者消费数据";}

二. 手动提交偏移量

手动提交偏移量可以确保消息被成功处理后再提交偏移量,避免消息丢失或重复消费。

设置偏移量有一种简单粗暴的方式:

consumer.seek(topicPartition, 0);

直接将偏移量设置为0,即从队列最开始的地方读取数据

第二种方式根据时间戳来设置偏移量,代码如下:

TopicPartition topicPartition = new TopicPartition("yourtopic", 0);consumer.assign(Arrays.asList(topicPartition));// 指定偏移量开始时间戳,默认为当天00:00的时间戳long timestampToSearch = LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toEpochSecond() * 1000;if (kafkaConfig.getLastRunTime() != null) {// 默认十秒提前量容错long advanceTime = kafkaConfig.getAdvanceTime() != null ? kafkaConfig.getAdvanceTime() : 10000;timestampToSearch = kafkaConfig.getLastRunTime().getTime() - advanceTime;}logger.info("设置当前偏移量开始时间戳:" + timestampToSearch);// 查找每个分区在指定时间的偏移量Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();timestampsToSearch.put(topicPartition, timestampToSearch);Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);// 将消费者指针移动到指定时间的偏移量for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {TopicPartition partition = entry.getKey();OffsetAndTimestamp offsetAndTimestamp = entry.getValue();if (offsetAndTimestamp != null) {consumer.seek(partition, offsetAndTimestamp.offset());}}

这里的getLastRunTime即为获取到上次从队列中退出时的时间,advanceTime为容错时间,即退出时间需要往前推迟多久,这样可能会读取到重复的一俩条数据,但并不会丢失数据,有利有弊且利大于弊;有细心读者会发现这里的队列设置偏移量是使用的不需要指定groupid的消费者,那么指定groupid的消费者该如何设置偏移量呢?别急,下面就给你们端上来:
 

consumer.subscribe(Collections.singletonList("yourtopic"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 分区被撤销时的处理,这里可以清空任何与这些分区相关的信息}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 分区被分配时的处理TopicPartition topicPartition = new TopicPartition("yourtopic", 0);// 指定偏移量开始时间戳,默认为当天00:00的时间戳long timestampToSearch = LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toEpochSecond() * 1000;if (kafkaConfig.getLastRunTime() != null) {// 默认十秒提前量容错long advanceTime = kafkaConfig.getAdvanceTime() != null ? kafkaConfig.getAdvanceTime() : 10000;timestampToSearch = kafkaConfig.getLastRunTime().getTime() - advanceTime;}logger.info("设置当前偏移量开始时间戳:" + timestampToSearch);Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(topicPartition, timestampToSearch);Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {OffsetAndTimestamp offsetAndTimestamp = entry.getValue();if (offsetAndTimestamp != null) {consumer.seek(entry.getKey(), offsetAndTimestamp.offset());}}}});

通过内部类,重写了ConsumerRebalanceListener 接口的实现来处理分区的重新分配情况,同时根据时间戳重新设置偏移量,这也是所谓的重新平衡,以实现从指定时间点开始消费消息的功能

三. 消费者组和重新平衡

消费者组允许多个消费者共同消费一个主题,并通过重新平衡(Rebalance)来分配分区给不同的消费者。

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);consumer.subscribe(Collections.singletonList(TOPIC_NAME));consumer.poll(0); // 触发消费者加入消费者组
consumer.seekToBeginning(consumer.assignment());consumeMessages(consumer);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/841484.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux(六)

Linux&#xff08;六&#xff09; 自定义头文件自定义头文件中写什么如何引入头文件条件编译条件编译作用 gcc工作原理Make 工作管理器什么是Make什么是Makefile/makefileMakefile假目标Makefile中的变量自定义变量预定义变量自动变量 Makefile中变量展开方式递归展开方式简单展…

正运动机器视觉运动控制一体机应用例程

机器视觉运动控制一体机应用例程-多目标形状匹配-正运动技术 (zmotion.com.cn) 机器视觉运动控制一体机应用例程&#xff08;二&#xff09; 颜色识别-正运动技术 (zmotion.com.cn) 机器视觉运动控制一体机应用例程&#xff08;三&#xff09; 基于BLOB分析的多圆定位-正运动…

2024攻防演练利器之必修高危漏洞合集

随着网络安全的发展和攻防演练工作的推进&#xff0c;红蓝双方的技术水平皆在实践中得到了很大的提升&#xff0c;但是数字化快速发展也导致了企业的影子资产增多&#xff0c;企业很多老旧系统依旧存在历史漏洞&#xff0c;与此同时&#xff0c;在攻防演练期间&#xff0c;往往…

TalkingData 数据统计详解

一、引言 在现代数据驱动的商业环境中&#xff0c;准确、及时的数据统计与分析对于企业的决策具有至关重要的作用。TalkingData 是中国领先的独立第三方数据智能服务平台&#xff0c;专注于提供专业的数据统计和分析解决方案。本文将详细介绍 TalkingData 的基本概念、主要功能…

利用EAS自动生成数据模型和sql脚本

EAS适用于敏捷开发中小系统,这节主要讲解EAS对应的模型和数据库脚本输出应用。 在这个应用程序中,用户可自定义实体模型和枚举模型,只要选择相应的实体或者枚举进行右击添加即可。 解决方案参数设定,在解决方案的设定中可设置项目名称、通用语言,命名空间和输出位置。 连…

C语言系列文章 | 函数 (共 10209 字)

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 目录 函数的概念库函数自…

设计软件有哪些?建模和造型工具篇(1),渲染100邀请码1a12

之前我们介绍的都是渲染软件&#xff0c;但对于设计师来说建模和造型是在渲染之前&#xff0c;所以从现在开始&#xff0c;我们会介绍一批建模和造型工具。 1、ZBrush ZBrush是由Pixologic公司开发的数字雕刻和绘画软件&#xff0c;专为艺术家和设计师而设计。它结合了3D建模…

抖店如何打造出爆品?学好这几招,轻松打爆新品流量

大家好&#xff0c;我是电商花花。 近年来&#xff0c;抖店商家越来越多&#xff0c;而选品&#xff0c;爆品就是我们商家竞争的核心了&#xff0c;谁能选出好的新品&#xff0c;打造出爆品&#xff0c;谁的会赚的多&#xff0c;销量多。 做抖音小店想出单&#xff0c;想赚钱…

转置卷积简明教程

转置卷积层也被&#xff08;错误地&#xff09;称为反卷积层。反卷积层反转了标准卷积层的操作&#xff0c;即如果对通过标准卷积层生成的输出进行反卷积&#xff0c;则会返回原始输入。转置卷积层与反卷积层相似&#xff0c;因为两者生成的空间维度相同。转置卷积不是通过值反…

Java+Spring Boot +MySQL + MyBatis Plus一款数字化管理平台源码:云MES系统

JavaSpring Boot MySQL MyBatis Plus一款数字化管理平台源码&#xff1a;云MES系统 MES是为企业提供制造全过程的信息化产品&#xff0c;支持企业智能制造。MES可实现与企业的ERP、PDM等其他信息化系统进行无缝连接&#xff0c;也可与现场生产设备进行连接、数据采集&#xff…

【WEB前端2024】开源智体世界:乔布斯3D纪念馆-第29课-会员制展厅

【WEB前端2024】开源智体世界&#xff1a;乔布斯3D纪念馆-第29课-会员制展厅 使用dtns.network德塔世界&#xff08;开源的智体世界引擎&#xff09;&#xff0c;策划和设计《乔布斯超大型的开源3D纪念馆》的系列教程。dtns.network是一款主要由JavaScript编写的智体世界引擎&…

docker 常用服务的持久化安装

安装mongo docker pull docker.mirrors.sjtug.sjtu.edu.cn/library/mongo mkdir -p /root/mongo/data docker run -itd --name mongo -v /root/mongo/data:/data/db -p 27017:27017 mongo安装普罗米修斯 # 下载普罗米 docker pull docker.mirrors.sjtug.sjtu.edu.cn/prom/pro…

亲测有效,通过接口实现完美身份证号有效性验证+身份证与姓名匹配查询身份实名认证接口(实时)

最近发现一个限时认证的接口分享给大家&#xff0c;有需要的拿去试下吧. 附上部分密钥f478186edba9854f205a130aa888733d227a8f82f98d84b9【剩余约125450次&#xff0c;无时间限制】 b6131281611f6e1fc86c8662f549bdd683a68517203ba312【剩余约1300次&#xff0c;无时段限制】 …

MySQL技术点合集

目录 1. MySQL目录 2. 验证是否首次登陆方法 3. 在Liunx中使用命令来输入sql语句方法 4. 获取修改密码 5. 关闭密码策略 6. 忘记MySQL密码找回 7. 旋转90度横向查看表 8. 添加一个远程连接的用户 1. MySQL目录 /usr/bin/mysql相关命令vim /etc/my.cnfmysql配置文件ls /…

$subcribe的使用

$subcribe的使用 只要是store都有$subscribe函数&#xff0c;是订阅的意思&#xff0c;可以监测到store中数据的变化 使用$subscribe函数可以实现刷新不丢失&#xff0c;将数据保存到浏览器的本地存储中&#xff0c;每次进入页面都使用localStorage的数据填充页面

手把手教学,一站式教你实现服务器(Ubuntu)Anaconda多用户共享

背景&#xff1a;书接上回&#xff0c;一站式安装Ubuntu及配置服务器手把手教学&#xff0c;一站式安装ubuntu及配置服务器-CSDN博客 在安装及配置好服务器后&#xff0c;因为课题组可能涉及多个用户共用一台服务器&#xff0c;为了防止服务器上代码误删和Anaconda环境管理混乱…

⌈ 传知代码 ⌋ 实现沉浸式交互故事体验

&#x1f49b;前情提要&#x1f49b; 本文是传知代码平台中的相关前沿知识与技术的分享~ 接下来我们即将进入一个全新的空间&#xff0c;对技术有一个全新的视角~ 本文所涉及所有资源均在传知代码平台可获取 以下的内容一定会让你对AI 赋能时代有一个颠覆性的认识哦&#x…

springboot相关知识集锦----2

一、spring简化配置具体是如何简化的&#xff1f; springboot通过自动配置&#xff0c;让开发者无需手动配置大量繁琐的配置项。它基于项目的依赖关系&#xff0c;自动配置合适的bean和参数&#xff0c;从而极大地简化了spring应用初始化过程。 二、springboot是通过什么实现的…

批量漏洞挖掘思路小结

漏洞挖掘是指对应用程序中未知漏洞的探索&#xff0c;通过综合应用各种技术和工具&#xff0c;尽可能地找出其中的潜在漏洞。一般情况下漏洞挖掘针对单一的应用系统&#xff0c;通过端口扫描、目录扫描、文件扫描等方式对其安全性进行评估&#xff0c;而本文主要针对Nday和1day…

如何计算YOLOv8的推理速度FPS指标?

要计算YOLO系列模型的推理速度,可以使用FPS(每秒帧数)作为指标。以下是计算YOLO推理速度的步骤: 首先,确定用于推理的图像数量(例如,N张图像)。 记录推理过程中的起始时间。 对于每个图像,将其输入模型进行推理,并记录推理结束时间。 计算总共花费的时间,即推理结束…