【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

1、一个问题:

 2、查看消费者消费主题__consumer_offsets

3、一个重要前提:消费时要提交offset

二、指定 Offset 消费


假如遇到kafka崩了,你重启kafka之后,想要继续消费,应该怎么办?

  1. 首先确定要消费的主题是哪几个
  2. 其次使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题在崩溃之前消费到了哪里。
  3. 最后使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

下面是解决这个问题的具体步骤!!! 

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

        因为__consumer_offset 主题下记录了主题的偏移量信息,所以提交offset之后,消费__consumer_offset 主题便可查看所有主题的偏移量信息

1、一个问题:

__consumer_offset 主题下的数据是不能查看的,怎么解决?

解决方案:

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false。然后分发一下

分发:(这里我使用了脚本)

xsync.sh /opt/installs/kafka3/config/consumer.properties

注意:修改之前要先关闭kafka和zookeeper,修改完毕后再开启!

说明:默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。如果不修改是无法查看offset的值的,因为这些都是加密数据。

 2、查看消费者消费主题__consumer_offsets

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config /opt/installs/kafka3/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

查询结果如下:

图中:1是消费者组名;2是Topic主题名;3是分区;4是偏移量,说明该主题崩溃前消费到了这里

此时便查询到了偏移量信息!!!

3、一个重要前提:消费时要提交offset

能查询到偏移量的前提是消费时要自动提交 offset(默认开启)或者手动提交 offset

一般不用管,因为自动提交会默认开启!!!

二、指定 Offset 消费

 kafka提供了seek方法,可以让我们从分区的固定位置开始消费。

seek (TopicPartition Partition,offset offset):指定分区和偏移量

package com.bigdata._03offsetTest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;public class _03CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();//连接kafka setProperty和put都行properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");// 是否自动提交 offset  通过这个字段设置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("bigdata");// list可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}// 获取分区0的offset =100 以后的数据kafkaConsumer.seek(new TopicPartition("bigdata",0),100);// 因为消费者是不停的消费,所以是while truewhile(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}
}

执行这个java代码就可以从精确的指定位置继续消费了!!!

结果如下:

从上图可以看出,确实是从指定的主题、分区、偏移量开始消费的! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886443.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

查询DBA_FREE_SPACE缓慢问题

这个是一个常见的问题&#xff0c;理论上应该也算是一个bug&#xff0c;在oracle10g&#xff0c;到19c&#xff0c;我都曾经遇到过&#xff1b;今天在给两套新建的19C RAC添加监控脚本时&#xff0c;又发现了这个问题&#xff0c;在这里记录一下。 Symptoms 环境&#xff1a;…

用OMS进行 OceanBase 租户间数据迁移的测评

基本概念 OceanBase迁移服务&#xff08;&#xff0c;简称OMS&#xff09;&#xff0c;可以让用户在同构或异构 RDBMS 与OceanBase 数据库之间进行数据交互&#xff0c;支持数据的在线迁移&#xff0c;以及实时增量同步的复制功能。 OMS 提供了可视化的集中管控平台&#xff…

IDEA一键部署SpringBoot项目到服务器

安装Alibaba Cloud Toolkit插件 配置部署环境 1&#xff1a;设置服务名称 2&#xff1a;选择文件上传的类型 3:选择打包之后的jar文件 4: 添加需要上传的服务器信息 5:需要上传到服务器的地址 输入绝对路径 6: 选择上传文件后执行的脚本 可以参考另一篇文章 Linux启…

渗透测试之信息收集 DNS主机发现探测方式NetBIOS 协议发现主机 以及相关PorCheck scanline工具的使用哟

目录 主机发现 利用NetBIOS 协议发现主机 利用TCP/UDP发现主机 PorCheck scanline 利用DNS协议发现主机 主机发现 信息收集中的一项重要工作是发现内网中的主机、数据库、IP段网络设备、安全设备等资产&#xff0c;以便于更快地获取更多权限和密码&#xff0c;更加接近红…

打造专业问答社区:Windows部署Apache Answer结合cpolar实现公网访问

文章目录 前言1. 本地安装Docker2. 本地部署Apache Answer2.1 设置语言选择简体中文2.2 配置数据库2.3 创建配置文件2.4 填写基本信息 3. 如何使用Apache Answer3.1 后台管理3.2 提问与回答3.3 查看主页回答情况 4. 公网远程访问本地 Apache Answer4.1 内网穿透工具安装4.2 创建…

基于java的医院门诊信息管理系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…

67页PDF |埃森哲_XX集团信息发展规划IT治理优化方案(限免下载)

一、前言 这份报告是埃森哲_XX集团信息发展规划IT治理优化方案&#xff0c;报告中详细阐述了XX集团如何优化IT治理结构以适应新的要求。报告还分析了集团管控模式的变化&#xff0c;提出了六大业务中心的差异化管控策略&#xff0c;并探讨了这些变化对IT治理模式的影响。报告进…

【AI大模型】ELMo模型介绍:深度理解语言模型的嵌入艺术

学习目标 了解什么是ELMo.掌握ELMo的架构.掌握ELMo的预训练任务.了解ELMo的效果和成绩.了解ELMo的优缺点. 目录 &#x1f354; ELMo简介 &#x1f354; ELMo的架构 2.1 总体架构 2.2 Embedding模块 2.3 两部分的双层LSTM模块 2.4 词向量表征模块 &#x1f354; ELMo的预…

【srm,招标询价】采购电子化全流程,供应商准入审核,在线询价流程管理(JAVA+Vue+mysql)

前言&#xff1a; 随着互联网和数字技术的不断发展&#xff0c;企业采购管理逐渐走向数字化和智能化。数字化采购平台作为企业采购管理的新模式&#xff0c;能够提高采购效率、降低采购成本、优化供应商合作效率&#xff0c;已成为企业实现效益提升的关键手段。系统获取在文末…

优选算法 - 4 ( 链表 哈希表 字符串 9000 字详解 )

一&#xff1a;链表 1.1 链表常用技巧和操作总结 1.2 两数相加 题目链接&#xff1a;两数相加 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* …

小鸡模拟器 1.8.11 | 街机怀旧重温经典游戏,支持手柄

小鸡模拟器是一款支持多种经典游戏机模拟的游戏应用&#xff0c;包括街机、索尼(SONY)、世嘉、任天堂等主流掌机游戏以及PSP、GBA、NDS、SFC(超级任天堂SNES)、FC(红白机NES)、MD(世嘉MEGA DRIVE)、PS1、PS2等。应用支持手柄完美操作&#xff0c;兼容安卓手柄&#xff0c;让玩家…

用go语言后端开发速查

文章目录 一、发送请求和接收请求示例1.1 发送请求1.2 接收请求 二、发送form-data格式的数据示例 用go语言发送请求和接收请求的快速参考 一、发送请求和接收请求示例 1.1 发送请求 package mainimport ("bytes""encoding/json""fmt""ne…

针对股票评论的情感分类器

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;编程探索专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年11月16日13点39分 神秘男子影, 秘而不宣藏。 泣意深不见, 男子自持重, 子夜独自沉。 论文链接 点击开启你的论文编程之旅…

Transformer学习笔记(一)

Transformer学习笔记 基于 3B1B 可视化视频 自注意力机制 1.每个词的初始嵌入是一个高维向量&#xff0c;只编码该单词含义&#xff0c;与上下文没有关联 2.对初始向量进行位置编码&#xff0c;在高维向量中编码进位置信息&#xff08;单词在语言序列中的位置信息&#xff…

antd table表格设置最小宽度,列宽等比例显示

最近ui有个设计稿&#xff0c;表格要求如图&#xff1a; 由于本地antd table列宽column没有设置最小宽度minWidth属性&#xff0c;只有width属性&#xff0c;所以开发时我考虑按照列宽等比例计算后去设置width属性&#xff1b; 一、实现&#xff1a; 1.表头数组中设置minWidth…

算法沉淀一:双指针

目录 前言&#xff1a; 双指针介绍 对撞指针 快慢指针 题目练习 1.移动零 2.复写零 3.快乐数 4.盛水最多的容器 5.有效三角形的个数 6.和为s的两个数 7.三数之和 8.四数之和 前言&#xff1a; 此章节介绍一些算法&#xff0c;主要从leetcode上的题来讲解&#xff…

若点集A=B则A必能恒等变换地变为B=A这一几何常识推翻直线(平面)公理

黄小宁 关键词&#xff1a;“更无理”复数 复平面z各点z的对应点z1的全体是z1面。z面平移变为z1面就使x轴⊂z面沿本身平移变为ux1轴。R可几何化为R轴&#xff0c;R轴可沿本身平移变为R′轴&#xff0c;R′轴可沿本身平移变为R″轴&#xff0c;...。直线公理和平面公理使几百年…

HelloMeme 上手即用教程

HelloMeme是一个集成空间编织注意力的扩散模型&#xff0c;用于生成高保真图像和视频。它提供了一个代码库&#xff0c;包含实验代码和预训练模型&#xff0c;支持PyTorch和FFmpeg。用户可以通过简单的命令行操作来生成图像和视频。 本文将详细介绍&#xff0c;如何在GPU算力租…

Vue2+ElementUI:用计算属性实现搜索框功能

前言&#xff1a; 本文代码使用vue2element UI。 输入框搜索的功能&#xff0c;可以在前端通过计算属性过滤实现&#xff0c;也可以调用后端写好的接口。本文介绍的是通过计算属性对表格数据实时过滤&#xff0c;后附完整代码&#xff0c;代码中提供的是死数据&#xff0c;可…

blind-watermark - 水印绑定

文章目录 一、关于 blind-watermark安装 二、bash 中使用三、Python 调用1、基本使用2、attacks on Watermarked Image3、embed images4、embed array of bits 四、并发五、相关 Project 一、关于 blind-watermark Blind watermark 基于 DWT-DCT-SVD. github : https://githu…