【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

1、一个问题:

 2、查看消费者消费主题__consumer_offsets

3、一个重要前提:消费时要提交offset

二、指定 Offset 消费


假如遇到kafka崩了,你重启kafka之后,想要继续消费,应该怎么办?

  1. 首先确定要消费的主题是哪几个
  2. 其次使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题在崩溃之前消费到了哪里。
  3. 最后使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

下面是解决这个问题的具体步骤!!! 

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

        因为__consumer_offset 主题下记录了主题的偏移量信息,所以提交offset之后,消费__consumer_offset 主题便可查看所有主题的偏移量信息

1、一个问题:

__consumer_offset 主题下的数据是不能查看的,怎么解决?

解决方案:

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false。然后分发一下

分发:(这里我使用了脚本)

xsync.sh /opt/installs/kafka3/config/consumer.properties

注意:修改之前要先关闭kafka和zookeeper,修改完毕后再开启!

说明:默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。如果不修改是无法查看offset的值的,因为这些都是加密数据。

 2、查看消费者消费主题__consumer_offsets

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config /opt/installs/kafka3/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

查询结果如下:

图中:1是消费者组名;2是Topic主题名;3是分区;4是偏移量,说明该主题崩溃前消费到了这里

此时便查询到了偏移量信息!!!

3、一个重要前提:消费时要提交offset

能查询到偏移量的前提是消费时要自动提交 offset(默认开启)或者手动提交 offset

一般不用管,因为自动提交会默认开启!!!

二、指定 Offset 消费

 kafka提供了seek方法,可以让我们从分区的固定位置开始消费。

seek (TopicPartition Partition,offset offset):指定分区和偏移量

package com.bigdata._03offsetTest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;public class _03CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();//连接kafka setProperty和put都行properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");// 是否自动提交 offset  通过这个字段设置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("bigdata");// list可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}// 获取分区0的offset =100 以后的数据kafkaConsumer.seek(new TopicPartition("bigdata",0),100);// 因为消费者是不停的消费,所以是while truewhile(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}
}

执行这个java代码就可以从精确的指定位置继续消费了!!!

结果如下:

从上图可以看出,确实是从指定的主题、分区、偏移量开始消费的! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886443.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kotlin深度面试题:协程、密封类和高阶函数

文章目录 知识回顾前言源码分析1.面试题目1&#xff1a;Kotlin中的协程与线程的区别是什么&#xff1f;如何在Android中使用协程进行异步编程&#xff1f;2.面试题目2&#xff1a;Kotlin中的扩展函数和扩展属性是什么&#xff1f;如何在Android开发中使用它们&#xff1f;3. 面…

查询DBA_FREE_SPACE缓慢问题

这个是一个常见的问题&#xff0c;理论上应该也算是一个bug&#xff0c;在oracle10g&#xff0c;到19c&#xff0c;我都曾经遇到过&#xff1b;今天在给两套新建的19C RAC添加监控脚本时&#xff0c;又发现了这个问题&#xff0c;在这里记录一下。 Symptoms 环境&#xff1a;…

【网络安全】网络安全防护体系

1.网络安全防护体系概述 1.1 网络安全的重要性 网络安全是保护网络空间不受恶意攻击、数据泄露和其他安全威胁的关键。随着数字化转型的加速&#xff0c;网络安全的重要性日益凸显&#xff0c;它不仅关系到个人隐私和企业机密的保护&#xff0c;还涉及到国家安全和社会稳定。…

从零开始学习 sg200x 多核开发之 TF 存储卡根文件系统扩容

入式 Linux 镜像制作时&#xff0c;考虑体积等因素&#xff0c;会把根文件系统做的比较小&#xff0c;镜像包较小&#xff0c;方便量产&#xff1b;有时&#xff0c;我们的 tf 或 emmc 的容量较大&#xff0c;烧写镜像后&#xff0c;有较大的空余空间未使用&#xff0c;现尝试把…

【Apache Paimon】-- 1 -- Apache Paimon 是什么?

目录 1、简介 2、概览 3、哪些场景可以使用 Paimon 4、周边生态 5、小结 6、参考 1、简介 我们听说过数据仓库、数据湖、数据湖仓,那你听说过流式数据仓库(Stream warehouse,简称:Streamhouse)吗?那我们今天就来解锁看看他们之中的新秀: Apache paimon 到底是什么…

[实战]SpringBoot使用MongoTemplate存储Float精度丢失问题

问题&#xff1a;使用SpringBoot2.x版本进行MongoDB的存储操作&#xff0c;Float类型数据出现精度丢失问题 解决方案如下&#xff1a; 情况一、字段类型为JSONObject进行存储时 设置值时采用Decimal128类型 Decimal128 value new Decimal128(new BigDecimal(declaredField.g…

Oracle 单机及 RAC 环境 归档模式及路径修改

Oracle 数据库的使用过程中经常会根据需求的不同而调整归档模式&#xff0c;也经常会修改归档文件存放路径。 下面分别演示单机及 RAC 环境下修改归档模式及路径的操作步骤。 一、单机环境 1.查询当前归档模式及路径 SQL> archive log list Database log mode …

用OMS进行 OceanBase 租户间数据迁移的测评

基本概念 OceanBase迁移服务&#xff08;&#xff0c;简称OMS&#xff09;&#xff0c;可以让用户在同构或异构 RDBMS 与OceanBase 数据库之间进行数据交互&#xff0c;支持数据的在线迁移&#xff0c;以及实时增量同步的复制功能。 OMS 提供了可视化的集中管控平台&#xff…

IDEA一键部署SpringBoot项目到服务器

安装Alibaba Cloud Toolkit插件 配置部署环境 1&#xff1a;设置服务名称 2&#xff1a;选择文件上传的类型 3:选择打包之后的jar文件 4: 添加需要上传的服务器信息 5:需要上传到服务器的地址 输入绝对路径 6: 选择上传文件后执行的脚本 可以参考另一篇文章 Linux启…

渗透测试之信息收集 DNS主机发现探测方式NetBIOS 协议发现主机 以及相关PorCheck scanline工具的使用哟

目录 主机发现 利用NetBIOS 协议发现主机 利用TCP/UDP发现主机 PorCheck scanline 利用DNS协议发现主机 主机发现 信息收集中的一项重要工作是发现内网中的主机、数据库、IP段网络设备、安全设备等资产&#xff0c;以便于更快地获取更多权限和密码&#xff0c;更加接近红…

打造专业问答社区:Windows部署Apache Answer结合cpolar实现公网访问

文章目录 前言1. 本地安装Docker2. 本地部署Apache Answer2.1 设置语言选择简体中文2.2 配置数据库2.3 创建配置文件2.4 填写基本信息 3. 如何使用Apache Answer3.1 后台管理3.2 提问与回答3.3 查看主页回答情况 4. 公网远程访问本地 Apache Answer4.1 内网穿透工具安装4.2 创建…

基于java的医院门诊信息管理系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…

67页PDF |埃森哲_XX集团信息发展规划IT治理优化方案(限免下载)

一、前言 这份报告是埃森哲_XX集团信息发展规划IT治理优化方案&#xff0c;报告中详细阐述了XX集团如何优化IT治理结构以适应新的要求。报告还分析了集团管控模式的变化&#xff0c;提出了六大业务中心的差异化管控策略&#xff0c;并探讨了这些变化对IT治理模式的影响。报告进…

Python基础学习-08字符串

目录 1、常见的字符串表达式 2、字符串函数 3、本节总结 1、常见的字符串表达式 1&#xff09; s “hello” 2&#xff09; s “张三说&#xff1a; \” 你好\”” 3&#xff09; ””” 多行字符串””” 4&#xff09; s1 s2 s1 * 3 5&#xff09; s[i…

ACIS的interop主要有哪些功能

‌ACIS的Interop组件‌主要用于实现不同3D应用程序之间的数据转换和集成。其主要功能包括&#xff1a; ‌数据转换‌&#xff1a;Interop组件能够对市场上流行的3D格式&#xff08;如CATIA V5、CATIA V4、IGES、STEP、VDA-FS、Pro/E、Parasolid、UG、SolidWorks、Inventor和AC…

【AI大模型】ELMo模型介绍:深度理解语言模型的嵌入艺术

学习目标 了解什么是ELMo.掌握ELMo的架构.掌握ELMo的预训练任务.了解ELMo的效果和成绩.了解ELMo的优缺点. 目录 &#x1f354; ELMo简介 &#x1f354; ELMo的架构 2.1 总体架构 2.2 Embedding模块 2.3 两部分的双层LSTM模块 2.4 词向量表征模块 &#x1f354; ELMo的预…

Java EE 技术基础知识体系梳理

1. Java EE 平台概述 1.1 发展历程 Java EE 从 J2EE 发展而来&#xff0c;经历了多个版本的演进&#xff0c;从早期的 J2EE 1.2 到最新的 Jakarta EE。 1.2 架构特点 多层架构&#xff1a; 客户端层&#xff1a;用户界面&#xff0c;如 Web 浏览器、移动应用等。Web 层&…

【srm,招标询价】采购电子化全流程,供应商准入审核,在线询价流程管理(JAVA+Vue+mysql)

前言&#xff1a; 随着互联网和数字技术的不断发展&#xff0c;企业采购管理逐渐走向数字化和智能化。数字化采购平台作为企业采购管理的新模式&#xff0c;能够提高采购效率、降低采购成本、优化供应商合作效率&#xff0c;已成为企业实现效益提升的关键手段。系统获取在文末…

优选算法 - 4 ( 链表 哈希表 字符串 9000 字详解 )

一&#xff1a;链表 1.1 链表常用技巧和操作总结 1.2 两数相加 题目链接&#xff1a;两数相加 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* …

小鸡模拟器 1.8.11 | 街机怀旧重温经典游戏,支持手柄

小鸡模拟器是一款支持多种经典游戏机模拟的游戏应用&#xff0c;包括街机、索尼(SONY)、世嘉、任天堂等主流掌机游戏以及PSP、GBA、NDS、SFC(超级任天堂SNES)、FC(红白机NES)、MD(世嘉MEGA DRIVE)、PS1、PS2等。应用支持手柄完美操作&#xff0c;兼容安卓手柄&#xff0c;让玩家…