如何查看Kafka的偏移量offset

本文介绍三种方法查看Kafka的偏移量offset。

1. API:ConsumerRecord的offset()方法查看offset。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

前提条件

Kafka安装及基本操作,可参考:Kafka安装及基本操作

Kafka API操作,可参考:Kafka API操作

三种方法查看Kafka的偏移量offset

1. API:ConsumerRecord的offset()方法查看offset。

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.Future;public class MyProducer {public static void main(String[] args) {// 1.创建kafka生产者对象Properties prop = new Properties();prop.put("bootstrap.servers","node1:9092");prop.put("acks","all");prop.put("retries","0");// 16k一个批量prop.put("batch.size", 16384);prop.put("linger.ms",5);prop.put("buffer.memory", 33554432);prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<Object, Object> producer = new KafkaProducer<>(prop);// 2.使用send方法生产数据for (int i = 0; i < 10; i++) {
//            producer.send(new ProducerRecord<>("Hello-Kafka", Integer.toString(i), Integer.toString(i)));producer.send(new ProducerRecord<>("bigdata12", Integer.toString(i), Integer.toString(i)));}// 3.关闭生产者producer.close();}
}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//1.创建消费者对象Properties prop = new Properties();prop.put("bootstrap.servers","node1:9092");prop.put("group.id","test");prop.put("enable.auto.commit","true");prop.put("auto.commit.interval.ms","1000");prop.put("session.timeout.ms","30000");prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//注意不是StringSerializerprop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(prop);//2.消费者订阅主题consumer.subscribe(Arrays.asList("bigdata12"));// 将数组转为List集合//3.使用poll方法消费数据while (true){
//            ConsumerRecords<Object,Object> records = consumer.poll(Duration.ofSeconds(5));ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<Object, Object> record : records) {System.out.printf("offset=%d, key=%s, value=%s\n",record.offset(),record.key(),record.value());}}}
}

测试:

IDEA中,运行消费者,再运行生产者。提示:没有topic,将自动创建。

返回IDEA的消费者控制台,输出类似如下数据

...

offset=30, key=8, value=8
offset=31, key=9, value=9

这里显示的是最后一条数据的offset=31。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Properties;public class KafkaOffsetViewer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "node1:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "bigdata12";TopicPartition partition = new TopicPartition(topic, 0);try {consumer.assign(Arrays.asList(partition));consumer.seekToEnd(Arrays.asList(partition));long offset = consumer.position(partition);System.out.println("Offset of partition 0 is: " + offset);} finally {consumer.close();}}
}

IDEA运行结果:

Offset of partition 0 is: 32

看到offset为32,是最新的offset值,也就是下一条数据从32开始。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

在命令行中运行以下命令:

kafka-consumer-groups.sh --bootstrap-server <kafka-broker-list> --describe --group <consumer-group-id>

例如:

[hadoop@node1 ~]$ kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test
​
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
test            bigdata12       0          32              32              0               consumer-test-1-64d17e50-69e9-47e3-9380-f2441a09cae2 /117.189.125.24 consumer-test-1
​

看到offset为32,是最新的offset值。

感兴趣可以再使用生产者发送数据测试,看到三种查看offset方法,offset值的变化情况。

总结

1. API:ConsumerRecord的offset()方法查看offset,查看到最后一条数据的offset,最新offset=最后一条数据offset+1。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset,查到最新offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset,查到最新offset。

完成! enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/49488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pglogical扩展的基本用法介绍

瀚高数据库 目录 环境 文档用途 详细信息 环境 系统平台&#xff1a;Linux x86-64 Red Hat Enterprise Linux 7 版本&#xff1a;14 文档用途 本文翻译了pglogical扩展的官方文档,介绍了pglogical扩展的各类管理函数及使用限制,详情请看下文. 一、节点管理 节点可以使用以下…

前置-Linux相关知识速记

linux Linux命令大全 [!IMPORTANT] chown-chmod-ls-chgrp-cdpwd-mkdir-rmdir-cp-rm-mv-cat-tac-nl-more-less-head-tail 应用领域 通常服务器使用 LAMP&#xff08;Linux Apache MySQL PHP&#xff09;或 LNMP&#xff08;Linux Nginx MySQL PHP&#xff09;组合。 目前…

STM32 BootLoader 刷新项目 (五) 获取软件版本号-命令0x51

STM32 BootLoader 刷新项目 (五) 获取软件版本号-命令0x51 下面我们来讲解第一个指令&#xff0c;获取软件版本号命令-0x51. 在BootLoader中获取软件版本号的操作有多个重要的作用&#xff0c;具体如下&#xff1a; 版本管理&#xff1a; 识别当前版本&#xff1a;通过获取软…

无人机上磁航技术详解

磁航技术&#xff0c;也被称为地磁导航&#xff0c;是一种利用地球磁场信息来实现导航的技术。在无人机领域&#xff0c;磁航技术主要用于辅助惯性导航系统&#xff08;INS&#xff09;进行航向角的测量与校正&#xff0c;提高无人机的飞行稳定性和准确性。其技术原理是&#x…

vue3 + antd vue 纯前端 基于xlsx 实现导入excel 转 json,将json数据转换XLSX导出(模版下载)

一、导入 0、关键代码 // 安装插件 npm i xlsx/yarn add xlsx // 导入xlsx import * as XLSX from xlsx; 点击提交的时候才整理数据。上传的时候文件保存在 state.form.file[0] 中的 // 定义字段映射关系 const fieldMap {sheet2json: {技能名称: skill_name,技能等级: …

qt事件类型列表

t提供了一系列丰富的事件类型&#xff0c;这些事件允许应用程序响应各种用户输入、系统通知以及其他类型的交互。以下是一些常见的Qt事件类型及其用途概述&#xff1a; QEvent::None (0): 无事件&#xff0c;用于初始化或作为默认值。 QEvent::Timer (1): 定时器事件&#xff…

(三)C语言的变量与常量

一.C语言中标识符(变量)的命名规则 (1)可以由数字、字母、下划线_组成 (2)不能以数字开头 (3)不能是关键字 (4)区分大小写 二.在C语言中定义常量的方法及其区别 1.使用#define宏定义&#xff1a;#define LENGTH 10 2.使用const关键字&#xff1a;const int LENGTH 10 或 i…

微服务实战系列之玩转Docker(六)

前言 刚进入大暑&#xff0c;“清凉不肯来&#xff0c;烈日不肯暮”&#xff0c;空调开到晚&#xff0c;还是满身汗。——碎碎念 我们知道&#xff0c;仓库可见于不同领域&#xff0c;比如粮食仓库、数据仓库。在容器领域&#xff0c;自然也有镜像仓库&#xff08;registry&…

代码随想录——零钱兑换Ⅱ(Leetcode518)

题目链接 完全背包 class Solution {public int change(int amount, int[] coins) {int[] dp new int[amount 1];dp[0] 1;for(int i 0; i < coins.length; i){for(int j coins[i]; j < amount; j){dp[j] dp[j - coins[i]];}}return dp[amount];} }本题为组合问题…

实时系统中的目标检测:速度与精度的较量

实时系统中的目标检测&#xff1a;速度与精度的较量 目标检测算法是计算机视觉领域的基石之一&#xff0c;广泛应用于实时系统中&#xff0c;如视频监控、自动驾驶汽车、工业自动化等。然而&#xff0c;将目标检测算法应用于实时系统并非易事&#xff0c;它带来了一系列技术挑…

uni-app AppStore Connect上传拒绝汇总

1.Guideline 2.3.3 - Performance - Accurate Metadata 问题是图片不对&#xff0c;最好是自己截图&#xff0c;然后用香蕉云编 上传图片合成图片 2.Guideline 5.1.2 - Legal - Privacy - Data Use and Sharing 解决办法&#xff1a;在uniapp manifest.json找到 APP常用其他…

【软考】系统集成项目管理工程师【第二版】

&#x1f44a;重要通知&#x1f44a; &#x1f44a; 1. 2024年中考上半年取消&#xff0c;改下半年&#x1f44a; &#x1f44a; 2. 2024下半年 使用《系统集成项目管理工程师教程》第三版&#x1f44a; &#x1f44a; 3. 为了方便大家学习&#xff0c;博主正在整理第三版 &am…

使用uniapp开发小程序(基础篇)

本文章只介绍微信小程序的开发流程&#xff0c;如果需要了解其他平台的开发的流程的话&#xff0c;后续根据情况更新相应的文章,也可以根据uniapp官网的链接了解不同平台的开发流程 HBuilderX使用&#xff1a;https://uniapp.dcloud.net.cn/quickstart-hx.html 开发工具 开始…

# OpenCV 图像预处理—形态学:膨胀、腐蚀、开运算、闭运算 原理详解

文章目录 形态学概念膨胀使用膨胀操作来修复裂痕示例代码关键解析&#xff1a; 腐蚀使用腐蚀操作消除噪点示例代码&#xff1a; 开运算—先腐蚀后膨胀闭运算—先膨胀后腐蚀 形态学概念 首先看这两张图片 一张图周围有大大小小的噪音和彩点&#xff0c;另一张图片中字母有间隙&…

Android 适配:版本适配

文章目录 自定义ViewGroup添加View时对View对应的LayoutParams的处理操作 自定义ViewGroup添加View时对View对应的LayoutParams的处理操作 Android 7只需要重写ViewGroup的generateLayoutParams方法&#xff0c;创建需要的LayoutParmas Overridepublic LayoutParams generateL…

php连接sql server

php连接sqlserver有三种方式 一&#xff1a;odbc连接&#xff0c;废话不多说直接上代码,封装了一个单例 <?php /*** odbcServer.php* Author: Erekys*/namespace App\Model; class odbcServer{public static $server;public static $username;public static $password;pu…

【2024最新】汇总国内免费GPT接口(响应快)

一、什么是GPT接口&#xff1f; GPT接口通常指的是一种编程接口&#xff08;API&#xff09;&#xff0c;它允许开发者将生成预训练转换器&#xff08;Generative Pre-trained Transformer&#xff0c;简称GPT&#xff09;的能力集成到自己的应用程序中。GPT是一种人工智能模型…

基于jeecgboot-vue3的Flowable流程仿钉钉流程设计器-发送信息服务处理

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 1、因为仿钉钉设计器里发送消息处理是一个服务任务&#xff0c;所以要根据这个服务任务进行处理 2、这里目前只对消息进行处理&#xff0c;就是用websocket的发送方式 输入相应的内容&…

go语言Gin框架的学习路线(十)

目录 GORM的CRUD教程 查询 普通查询 定义 User 结构体 查询所有用户 查询第一个用户 总结 条件查询 内联条件 额外查询选项 高级查询 链式操作 Scopes 多个立即执行方法 GORM的CRUD教程 CRUD 是 "Create, Read, Update, Delete"&#xff08;创建、查询…

电影类平台如何选择服务器

电影类平台如何选择服务器 1、数据存储 电影网站对服务器的要求是比较高的&#xff0c;对存储空间的需求特别大&#xff0c;所以在服务器选择上首先要确保足够大的存储空间。另外&#xff0c;当你的网站内容特别多时&#xff0c;内存不够用&#xff0c;可以选择增加内存&#x…