如何查看Kafka的偏移量offset

本文介绍三种方法查看Kafka的偏移量offset。

1. API:ConsumerRecord的offset()方法查看offset。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

前提条件

Kafka安装及基本操作,可参考:Kafka安装及基本操作

Kafka API操作,可参考:Kafka API操作

三种方法查看Kafka的偏移量offset

1. API:ConsumerRecord的offset()方法查看offset。

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.Future;public class MyProducer {public static void main(String[] args) {// 1.创建kafka生产者对象Properties prop = new Properties();prop.put("bootstrap.servers","node1:9092");prop.put("acks","all");prop.put("retries","0");// 16k一个批量prop.put("batch.size", 16384);prop.put("linger.ms",5);prop.put("buffer.memory", 33554432);prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<Object, Object> producer = new KafkaProducer<>(prop);// 2.使用send方法生产数据for (int i = 0; i < 10; i++) {
//            producer.send(new ProducerRecord<>("Hello-Kafka", Integer.toString(i), Integer.toString(i)));producer.send(new ProducerRecord<>("bigdata12", Integer.toString(i), Integer.toString(i)));}// 3.关闭生产者producer.close();}
}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//1.创建消费者对象Properties prop = new Properties();prop.put("bootstrap.servers","node1:9092");prop.put("group.id","test");prop.put("enable.auto.commit","true");prop.put("auto.commit.interval.ms","1000");prop.put("session.timeout.ms","30000");prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//注意不是StringSerializerprop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(prop);//2.消费者订阅主题consumer.subscribe(Arrays.asList("bigdata12"));// 将数组转为List集合//3.使用poll方法消费数据while (true){
//            ConsumerRecords<Object,Object> records = consumer.poll(Duration.ofSeconds(5));ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<Object, Object> record : records) {System.out.printf("offset=%d, key=%s, value=%s\n",record.offset(),record.key(),record.value());}}}
}

测试:

IDEA中,运行消费者,再运行生产者。提示:没有topic,将自动创建。

返回IDEA的消费者控制台,输出类似如下数据

...

offset=30, key=8, value=8
offset=31, key=9, value=9

这里显示的是最后一条数据的offset=31。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset。

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Properties;public class KafkaOffsetViewer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "node1:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "bigdata12";TopicPartition partition = new TopicPartition(topic, 0);try {consumer.assign(Arrays.asList(partition));consumer.seekToEnd(Arrays.asList(partition));long offset = consumer.position(partition);System.out.println("Offset of partition 0 is: " + offset);} finally {consumer.close();}}
}

IDEA运行结果:

Offset of partition 0 is: 32

看到offset为32,是最新的offset值,也就是下一条数据从32开始。

3. 命令行:kafka-consumer-groups.sh命令查看offset。

在命令行中运行以下命令:

kafka-consumer-groups.sh --bootstrap-server <kafka-broker-list> --describe --group <consumer-group-id>

例如:

[hadoop@node1 ~]$ kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test
​
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
test            bigdata12       0          32              32              0               consumer-test-1-64d17e50-69e9-47e3-9380-f2441a09cae2 /117.189.125.24 consumer-test-1
​

看到offset为32,是最新的offset值。

感兴趣可以再使用生产者发送数据测试,看到三种查看offset方法,offset值的变化情况。

总结

1. API:ConsumerRecord的offset()方法查看offset,查看到最后一条数据的offset,最新offset=最后一条数据offset+1。

2. API:KafkaConsumer的position(TopicPartition partition)方法查看offset,查到最新offset。

3. 命令行:kafka-consumer-groups.sh命令查看offset,查到最新offset。

完成! enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/49488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前置-Linux相关知识速记

linux Linux命令大全 [!IMPORTANT] chown-chmod-ls-chgrp-cdpwd-mkdir-rmdir-cp-rm-mv-cat-tac-nl-more-less-head-tail 应用领域 通常服务器使用 LAMP&#xff08;Linux Apache MySQL PHP&#xff09;或 LNMP&#xff08;Linux Nginx MySQL PHP&#xff09;组合。 目前…

STM32 BootLoader 刷新项目 (五) 获取软件版本号-命令0x51

STM32 BootLoader 刷新项目 (五) 获取软件版本号-命令0x51 下面我们来讲解第一个指令&#xff0c;获取软件版本号命令-0x51. 在BootLoader中获取软件版本号的操作有多个重要的作用&#xff0c;具体如下&#xff1a; 版本管理&#xff1a; 识别当前版本&#xff1a;通过获取软…

无人机上磁航技术详解

磁航技术&#xff0c;也被称为地磁导航&#xff0c;是一种利用地球磁场信息来实现导航的技术。在无人机领域&#xff0c;磁航技术主要用于辅助惯性导航系统&#xff08;INS&#xff09;进行航向角的测量与校正&#xff0c;提高无人机的飞行稳定性和准确性。其技术原理是&#x…

vue3 + antd vue 纯前端 基于xlsx 实现导入excel 转 json,将json数据转换XLSX导出(模版下载)

一、导入 0、关键代码 // 安装插件 npm i xlsx/yarn add xlsx // 导入xlsx import * as XLSX from xlsx; 点击提交的时候才整理数据。上传的时候文件保存在 state.form.file[0] 中的 // 定义字段映射关系 const fieldMap {sheet2json: {技能名称: skill_name,技能等级: …

微服务实战系列之玩转Docker(六)

前言 刚进入大暑&#xff0c;“清凉不肯来&#xff0c;烈日不肯暮”&#xff0c;空调开到晚&#xff0c;还是满身汗。——碎碎念 我们知道&#xff0c;仓库可见于不同领域&#xff0c;比如粮食仓库、数据仓库。在容器领域&#xff0c;自然也有镜像仓库&#xff08;registry&…

代码随想录——零钱兑换Ⅱ(Leetcode518)

题目链接 完全背包 class Solution {public int change(int amount, int[] coins) {int[] dp new int[amount 1];dp[0] 1;for(int i 0; i < coins.length; i){for(int j coins[i]; j < amount; j){dp[j] dp[j - coins[i]];}}return dp[amount];} }本题为组合问题…

uni-app AppStore Connect上传拒绝汇总

1.Guideline 2.3.3 - Performance - Accurate Metadata 问题是图片不对&#xff0c;最好是自己截图&#xff0c;然后用香蕉云编 上传图片合成图片 2.Guideline 5.1.2 - Legal - Privacy - Data Use and Sharing 解决办法&#xff1a;在uniapp manifest.json找到 APP常用其他…

【软考】系统集成项目管理工程师【第二版】

&#x1f44a;重要通知&#x1f44a; &#x1f44a; 1. 2024年中考上半年取消&#xff0c;改下半年&#x1f44a; &#x1f44a; 2. 2024下半年 使用《系统集成项目管理工程师教程》第三版&#x1f44a; &#x1f44a; 3. 为了方便大家学习&#xff0c;博主正在整理第三版 &am…

使用uniapp开发小程序(基础篇)

本文章只介绍微信小程序的开发流程&#xff0c;如果需要了解其他平台的开发的流程的话&#xff0c;后续根据情况更新相应的文章,也可以根据uniapp官网的链接了解不同平台的开发流程 HBuilderX使用&#xff1a;https://uniapp.dcloud.net.cn/quickstart-hx.html 开发工具 开始…

# OpenCV 图像预处理—形态学:膨胀、腐蚀、开运算、闭运算 原理详解

文章目录 形态学概念膨胀使用膨胀操作来修复裂痕示例代码关键解析&#xff1a; 腐蚀使用腐蚀操作消除噪点示例代码&#xff1a; 开运算—先腐蚀后膨胀闭运算—先膨胀后腐蚀 形态学概念 首先看这两张图片 一张图周围有大大小小的噪音和彩点&#xff0c;另一张图片中字母有间隙&…

php连接sql server

php连接sqlserver有三种方式 一&#xff1a;odbc连接&#xff0c;废话不多说直接上代码,封装了一个单例 <?php /*** odbcServer.php* Author: Erekys*/namespace App\Model; class odbcServer{public static $server;public static $username;public static $password;pu…

基于jeecgboot-vue3的Flowable流程仿钉钉流程设计器-发送信息服务处理

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 1、因为仿钉钉设计器里发送消息处理是一个服务任务&#xff0c;所以要根据这个服务任务进行处理 2、这里目前只对消息进行处理&#xff0c;就是用websocket的发送方式 输入相应的内容&…

go语言Gin框架的学习路线(十)

目录 GORM的CRUD教程 查询 普通查询 定义 User 结构体 查询所有用户 查询第一个用户 总结 条件查询 内联条件 额外查询选项 高级查询 链式操作 Scopes 多个立即执行方法 GORM的CRUD教程 CRUD 是 "Create, Read, Update, Delete"&#xff08;创建、查询…

AIoTedge边缘物联网平台,开启智能物联新架构

边缘物联网平台是一种将计算能力、数据处理和应用服务部署在网络边缘的解决方案&#xff0c;旨在提高响应速度、降低带宽需求和增强数据安全。根据搜索结果&#xff0c;边缘物联网平台应具备以下功能&#xff1a; 云边协同&#xff1a; 云边一体架构&#xff0c;通过云端管理边…

【BUG】已解决:Downgrade the protobuf package to 3.20.x or lower.

Downgrade the protobuf package to 3.20.x or lower. 目录 Downgrade the protobuf package to 3.20.x or lower. 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身…

【昇腾AI创新大赛集训营南京站学习笔记】-Ascend算子开发课程

昇腾AI创新大赛训练营 14:00-14:30 基础知识-理论课 一、CANN 、达芬奇架构和算子 1.AI Core逻辑架构 达芬奇架构包含三部分&#xff1a; 1&#xff09;计算类&#xff1a;矩阵计算单元&#xff08;两个矩阵扔进去相乘&#xff09;、向量计算单元、标量计算单元 2&#xff09;控…

基于单片机控制的变压器油压油温故障检测

摘 要 在电力系统的运行中&#xff0c;通过对其核心设备变压器的故障进行检测&#xff0c;以此能够及时、准确的发现变压器的故障&#xff0c;基于单片机控制的变压器油压油温的故障检测的方法&#xff0c;利用压力传感器、温度传感器对变压器的油压、油温进行采集并送入单片机…

vCenter 错误提示 “目标主机上的vmotion接口未配置”

vCenter 错误提示 “目标主机上的vmotion接口未配置” VMware 使用 vCenter 迁移 虚拟机报错 “目标主机上的 vMotion 接口未配置”&#xff0c;配置启用 vMotion 的步骤如下&#xff1a; &#xff08;END&#xff09;

leetcode3098. 求出所有子序列的能量和

官解 class Solution(object):# 定义常量mod int(1e9 7) # 模数&#xff0c;用于防止结果溢出inf float(inf) # 无穷大&#xff0c;用于初始化时的特殊值def sumOfPowers(self, nums, k):n len(nums) # 数组长度res 0 # 用于存储最终结果# 三维动态规划表&#xff0c;…

Nacos-2.4.0最新版本docker镜像,本人亲自制作,部署十分方便,兼容postgresql最新版本17和16,奉献给大家了

基于Postgresql数据库存储的nacos最新版本2.4.0,采用docker镜像安装方式 因业务需要,为了让nacos支持postgresql,特意花了两天时间修改了源码,然后制作了docker镜像,如果你也在找支持postgresql的nacos最新版本,恭喜你,你来的正好~ nacos-2.4.0 postgresql的数据库脚本…