【大数据之Kafka】十二、Kafka之offset位移及漏消费和重复消费

1 offset的默认维护位置

  Kafka0.9版本之前, consumer默认将offset保存在Zookeeper中。从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为 consumer_offsets。
在这里插入图片描述
  consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间, kafka 内部会对这个 topic 进行 compact,也就是每个group.id+topic+分区号就保留最新数据。

消费offset案例:

(1) consumer_offsets 作为Kafka 中的 topic,那就可以通过消费者进行消费。
  在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false, 默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false,并分发。

(2)hadoop102用命令行方式,创建一个新的topic。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --create --topic newtopic --partitions 2 --replication-factor 2

(3)hadoop102启动生产者往 newtopic 生产数据。

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic newtopic

(4)hadoop104启动消费者消费 newtopic 数据。注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic newtopic --group test

(5)hadoop103查看消费者消费主题 consumer_offsets。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

2 自动提交offset
  Kafka提供了自动提交offset的功能。

低层原理:
  生产者向对应的分区发送数据,之后消费者不断地主动拉取分区中的数据,每5soffset自动提交到_consumer_offsets。
在这里插入图片描述
在这里插入图片描述
消费者自动提交offset

package com.study.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerAutoOffset {public static void main(String[] args) {//0.配置Properties properties = new Properties();//连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//配置反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意取)properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//是否自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交offset的时间周期1000ms,默认5sproperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//1.创建消费者对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2.订阅主题,注册要消费的主题,可以有多个ArrayList<String> topics = new ArrayList<>();topics.add("first1");kafkaConsumer.subscribe(topics);//3.消费,拉取数据,打印while (true){//设置1s消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

  依次启动CustomConsumerAutoOffset和CustomProducerCallback,观察CustomConsumerAutoOffset能不能接受到数据,能接受则说明自动提交offset功能是ok的。

3 手动提交offset

  虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API,供用户自己掌握提交时间。

底层原理:
  生产者向相应分区发送数据,消费者不断地从分区中拉取数据,拉取之后的offset得根据判断进行提交。
  手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。

相同点:将本次提交的一批数据最高的偏移量提交。
不同点:同步提交阻塞当前线程,一直到提交成功才会拉取数据,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

(1)commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
(2)commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
在这里插入图片描述
(1)手动同步提交offset
  由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

package com.study.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {//0.配置Properties properties = new Properties();//连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//配置反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意取)properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//是否自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//1.创建消费者对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2.订阅主题,注册要消费的主题,可以有多个ArrayList<String> topics = new ArrayList<>();topics.add("first1");kafkaConsumer.subscribe(topics);//3.消费,拉取数据,打印while (true){//设置1s消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 同步提交offset kafkaConsumer.commitSync();}}
}

  依次启动CustomConsumerByHandSync和CustomProducerCallback,观察CustomConsumerByHandSync能不能接受到数据,能接受则说明手动提交offset功能是ok的。

(2)手动异步提交offset
  虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

package com.study.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerByHandAsync {public static void main(String[] args) {//0.配置Properties properties = new Properties();//连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//配置反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意取)properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//是否自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//1.创建消费者对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2.订阅主题,注册要消费的主题,可以有多个ArrayList<String> topics = new ArrayList<>();topics.add("first1");kafkaConsumer.subscribe(topics);//3.消费,拉取数据,打印while (true){//设置1s消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 异步提交 offsetkafkaConsumer.commitAsync();}}
}

  依次启动CustomConsumerByHandAsync和CustomProducerCallback,观察CustomConsumerByHandAsync能不能接受到数据,能接受则说明手动提交offset功能是ok的。

4 指定offset消费

auto.offset.reset = earliest | latest | none,默认是 latest。
  当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),使用auto.offset.reset

(1)earliest:自动将偏移量重置为最早的偏移量,即从最开始的地方进行消费–from-beginning。

(2)latest(默认值):自动将偏移量重置为最新偏移量,即从最后的地方进行消费。

(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
在这里插入图片描述
注意:每次执行完,需要修改消费者组名。

package com.study.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;public class CustomConsumerSeek {public static void main(String[] args) {//0.配置Properties properties = new Properties();//连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//配置反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意取)properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//1.创建消费者对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2.订阅主题,注册要消费的主题,可以有多个ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//制定位置进行消费Set<TopicPartition> assignment = new HashSet<>();//保证分区分配方案已经制定完毕while (assignment.size() == 0){kafkaConsumer.poll(Duration.ofSeconds(1));//获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}//遍历所有分区,并制定offset从300的位置开始消费for (TopicPartition topicPartition : assignment) {kafkaConsumer.seek(topicPartition,300);}//3.消费,拉取数据,打印while (true){//设置1s消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

  依次启动CustomConsumerSeek和CustomProducerCallback,观察CustomConsumerSeek能不能接受到数据,能接受则说明指定位置offset消费功能是ok的。

4.5 指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据。

package com.study.kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class CustomConsumerForTime {public static void main(String[] args) {//0.配置Properties properties = new Properties();//连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//配置反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意取)properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");//1.创建消费者对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2.订阅主题,注册要消费的主题,可以有多个ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//制定位置进行消费Set<TopicPartition> assignment = new HashSet<>();//保证分区分配方案已经制定完毕while (assignment.size() == 0){kafkaConsumer.poll(Duration.ofSeconds(1));//获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}//把时间转换为对应的offsetHashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();//封装集合存储,每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);}//获取从1天前开始消费的每个分区的offsetMap<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);//遍历每个分区,对每个分区设置消费时间for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);//根据时间制定开始消费的位置if (offsetAndTimestamp != null){kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}}//3.消费,拉取数据,打印while (true){//设置1s消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

  依次启动CustomConsumerForTime和CustomProducerCallback,观察CustomConsumerForTime能不能接受到数据,能接受则说明指定时间offset消费功能是ok的。

6 漏消费和重复消费

6.1 重复消费

  自动提交offset引起,已经消费了数据,但是offset没有提交。
例如:提交offset后的2s,消费者挂了。再次启动消费者,则从上一次提交的offset出继续消费,导致重复消费。
在这里插入图片描述
解决:消费者事务

6.2 漏消费

  先提交offset后消费、设置offset为手动提交,当offset被提交时,数据还在内存中国没有落盘,此时刚好消费者线程被kill掉。因为offset已经提交,但是数据没有处理,导致这部分内存中的数据消失。
在这里插入图片描述
解决:消费者事务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/77775.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网络教程】记一次使用Docker手动搭建BT宝塔面板的全过程(包含问题解决如:宝塔面板无法开启防火墙,ssh,nginx等)

文章目录 准备安装安装宝塔面板开启ssh和修改ssh的密码导出镜像问题解决宝塔面板无法开启防火墙无法启动ssh设置密码nginx安装失败设置开机启动相关服务准备 演示的系统环境:Ubuntu 22.04.3 LTS更新安装/升级docker到最新版本升级docker相关命令如下# 更新软件包列表并自动升级…

TypeScript断言

什么是断言&#xff1f; 一个编译时语法&#xff0c;用于告诉编译器用户比编译器更加确定变量的类型&#xff0c;进而解除编译错误&#xff0c;类型断言有点类似于其他语言的类型转换&#xff0c;但它没有运行时的影响&#xff0c;只是在编译阶段起作用。所以&#xff0c;即使通…

用Navicat备份Mysql演示系统数据库的时候出:Too Many Connections

今天用Navicat进行数据备份的时候&#xff0c;发现由于数据库连接数目过多导致连接锁定&#xff0c;这种情况在多人协同开发的场景中很常见。当然我这里也因为多个应用使用了数据库连接&#xff0c;所以出现了Too Many Connections。 可能是超过最大连接数了。 1、进入Navicat…

【nosql】redis之高可用(主从复制、哨兵、集群)搭建

redis群集有三种模式 redis群集有三种模式&#xff0c;分别是主从同步/复制、哨兵模式、Cluster集群&#xff0c;下面会讲解一下三种模式的工作方式&#xff0c;以及如何搭建cluster群集 ●主从复制&#xff1a;主从复制是高可用Redis的基础&#xff0c;哨兵和集群都是在主从…

学习笔记|定时器|STC中断|定时器时间计算|STC32G单片机视频开发教程(冲哥)|第十一集:定时器的作用和意义

文章目录 1.定时器的作用和意义定时器中断定时器是定时器和计数器的统称。 2.STC32G单片机定时器使用原理2.1 先设置功能为定时器/计数器(本质都是加法计数器)2.2、在定时器模式下&#xff0c;设置不分频或者12分频∶Tips&#xff1a;选择不分频还是12分频2.3、定时器的工作模式…

【腾讯云Cloud Studio实战训练营】戏说cloud studio

文章目录 前言产品概述项目体验登录空间模板模板项目体验 总结 前言 在奇幻世界中&#xff0c;存在着一片神秘的云海&#xff0c;被人们称为腾讯云云端开发环境 Cloud Studio。这片云海是一座巨大的浮岛&#xff0c;上面漂浮着一个集成式开发环境&#xff08;Integrated Devel…

自动化测试入门知识 —— 数据驱动测试

一、什么是数据驱动测试&#xff1f; 数据驱动测试是一种测试方法&#xff0c;它的核心思想是通过不同的测试数据来验证同一个测试逻辑。通常情况下&#xff0c;测试用例中的输入数据和预期结果会被提取出来&#xff0c;以便可以通过不同的测试数据进行重复执行。 数据驱动测…

分享一个复合故障数据集

复合故障数据集 1.本数据集采集了轴承从正常状态到故障状态的振动信号&#xff0c; 包含失效的原因&#xff1a;内圈磨损&#xff0c;保持架断裂&#xff0c;外圈磨损和外圈裂损。其中有单一类型故障、单一故障组合的复合故障等多种失效形式&#xff0c;可用于诊断滚动轴承早期…

飞行动力学 - 第22节-动稳定性与运动方程 之 基础点摘要

飞行动力学 - 第22节-动稳定性与运动方程 之 基础点摘要 1. 稳定性定义2. 动稳定性示意图3. 数值仿真结构框图4. 运动响应类型5. 参考资料 1. 稳定性定义 飞机在平衡状态&#xff08;静止或匀速运动&#xff09;受到扰动&#xff1a; 有回到初始状态的趋势&#xff1b;静稳定…

微信怎么定时发圈?

定时发圈的妙用 在合适的时间点发布新的产品、促销活动&#xff0c;不仅能够及时提醒用户品牌的存在&#xff0c;还可以引发用户的兴趣&#xff0c;增加品牌的曝光率。 选择最佳的发朋友圈时间段&#xff0c;以确保推广内容得到最大的曝光和关注&#xff0c;提高广告投放的效果…

录音怎么转换成mp3格式?支持二十多种格式

录音怎么转换成mp3格式&#xff1f;在我们的日常生活和工作中&#xff0c;录音是一项非常有用的工具&#xff0c;随着手机以及录音设备越来越普及化&#xff0c;让录音这件事情变得非常的简单&#xff0c;录音可以帮助我们解决非常多的事情。例如通过录音&#xff0c;我们可以记…

Redis十大数据类型

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; Java从入门到精通 ✨特色专栏&#xf…

无涯教程-JavaScript - CUMIPMT函数

描述 CUMIPMT函数返回start_period和end_period之间的贷款累计利息。 语法 CUMIPMT (rate, nper, pv, start_period, end_period, type)争论 Argument描述Required/OptionalRateThe interest rate.RequiredNperThe total number of payment periods.RequiredPvThe present …

SpringBoot原理-自动配置-概述

自动配置 SpringBoot的自动配置就是当Spring容器启动后&#xff0c;一些配置类、bean对象就会自动存入IOC容器中&#xff0c;不需要我们手动去声明&#xff0c;从而简化了开发&#xff0c;省去了繁琐的配置操作。启动一个SpringBoot项目后&#xff0c;观察如下

【C++杂货铺】优先级队列的使用指南与模拟实现

文章目录 一、priority_queue的介绍二、priority_queue的使用2.1 数组中的第k个最大元素 三、priority_queue模拟实现3.1 仿函数3.2 成员变量3.3 成员函数3.3.1 构造函数3.3.2 AdjustDown3.3.3 push3.3.4 AdjustUp3.3.5 pop3.3.6 empty3.3.7 size 四、结语 一、priority_queue的…

PBR纹理的10种贴图

PBR 是基于物理的渲染的首字母缩写。它试图通过模拟材料如何吸收和反射光&#xff0c;以模仿现实世界中的光流的方式产生视觉效果。最近的游戏引擎由于其逼真的效果而越来越多地使用 PBR 纹理。对于实时渲染&#xff0c;它们被认为是真实世界场景的最佳近似值。 推荐&#xff…

JAVA 从入门到起飞 day8 面向对象01

1.面向对象的介绍 老师的讲解&#xff1a; 面向&#xff1a;就相当于拿找 对象&#xff1a;能干活的东西 面向对象编程&#xff1a;拿东西过来做对应的事 我的理解: 让我们通过一个比喻来了解 JAVA 中的面向对象思想。 想象一下你正在建一座房子&#xff1a; 1. **类&#…

Jmeter系列进阶-获取图片验证码(4)

安装工具 通过ocrserver工具识别图片验证码&#xff0c;解压后 .exe双击启动即可。 jmeter中使用 &#xff08;1&#xff09;HTTP请求获取验证码 &#xff08;2&#xff09;在获取验证码图片的接口下面添加监听器》保存响应到文件&#xff1b;如下图&#xff1a; &#x…

Qt/C++音视频开发51-推流到各种流媒体服务程序

一、前言 最近将推流程序完善了很多功能,尤其是增加了对多种流媒体服务程序的支持,目前支持mediamtx、LiveQing、EasyDarwin、nginx-rtmp、ZLMediaKit、srs、ABLMediaServer等,其中经过大量的对比测试,个人比较建议使用mediamtx和ZLMediaKit,因为这两者支持的格式众多,不…

【Linux环境】编译器 gcc/g++的使用

​&#x1f47b;内容专栏&#xff1a; Linux操作系统基础 &#x1f428;本文概括&#xff1a; 预处理、编译、汇编、链接、动静态库、gcc选项等。 &#x1f43c;本文作者&#xff1a; 阿四啊 &#x1f438;发布时间&#xff1a;2023.9.13 背景知识 预处理&#xff08;进行宏替换…