java kafka 拉取_java获取kafka consumer lag

maven依赖

org.apache.kafka

kafka-clients

0.10.1.0

注意:kafka-clients版本需要0.10.1.0以上,因为调用了新增接口endOffsets;

lag=logsize-offset

logsize通过consumer的endOffsets接口获得;offset通过consumer的committed接口获得;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.TopicPartition;

public class KafkaConsumeLagMonitor {

public static Properties getConsumeProperties(String groupID, String bootstrap_server) {

Properties props = new Properties();

props.put("group.id", groupID);

props.put("bootstrap.servers", bootstrap_server);

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

return props;

}

public static void main(String[] args) {

String bootstrap_server = args[0];

String groupID = args[1];

String topic = args[2];

Map endOffsetMap = new HashMap();

Map commitOffsetMap = new HashMap();

Properties consumeProps = getConsumeProperties(groupID, bootstrap_server);

System.out.println("consumer properties:" + consumeProps);

//查询topic partitions

KafkaConsumer consumer = new KafkaConsumer(consumeProps);

List topicPartitions = new ArrayList();

List partitionsFor = consumer.partitionsFor(topic);

for (PartitionInfo partitionInfo : partitionsFor) {

TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());

topicPartitions.add(topicPartition);

}

//查询log size

Map endOffsets = consumer.endOffsets(topicPartitions);

for (TopicPartition partitionInfo : endOffsets.keySet()) {

endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));

}

for (Integer partitionId : endOffsetMap.keySet()) {

System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s", System.currentTimeMillis(), topic, partitionId, endOffsetMap.get(partitionId)));

}

//查询消费offset

for (TopicPartition topicAndPartition : topicPartitions) {

OffsetAndMetadata committed = consumer.committed(topicAndPartition);

commitOffsetMap.put(topicAndPartition.partition(), committed.offset());

}

//累加lag

long lagSum = 0l;

if (endOffsetMap.size() == commitOffsetMap.size()) {

for (Integer partition : endOffsetMap.keySet()) {

long endOffSet = endOffsetMap.get(partition);

long commitOffSet = commitOffsetMap.get(partition);

long diffOffset = endOffSet - commitOffSet;

lagSum += diffOffset;

System.out.println("Topic:" + topic + ", groupID:" + groupID + ", partition:" + partition + ", endOffset:" + endOffSet + ", commitOffset:" + commitOffSet + ", diffOffset:" + diffOffset);

}

System.out.println("Topic:" + topic + ", groupID:" + groupID + ", LAG:" + lagSum);

} else {

System.out.println("this topic partitions lost");

}

consumer.close();

}

}

另外一个思路可参考kafka源码kafka.tools.ConsumerOffsetChecker实现,offset直接读取 zk节点内容,logsize通过consumer的getOffsetsBefore方法获取,整体来说,较麻烦;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/529682.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java开源图像处理ku_83 项开源视觉 SLAM 方案够你用了吗?

原标题:83 项开源视觉 SLAM 方案够你用了吗?公众号:3D视觉工坊主要关注:3D视觉算法、SLAM、vSLAM、计算机视觉、深度学习、自动驾驶、图像处理以及技术干货分享运营者和嘉宾介绍:运营者来自国内一线大厂的算法工程师&a…

java 方法的拆分_java – 字符串拆分和比较 – 最快的方法

>将输入读入byte []数组以将指针保持在代码的一侧.>逐字节读取,计算整数元素&#xff1a;int b inputBytes[p];int d b - 0;if (0 < d) {if (d < 9) {element element * 10 d;} else {// b :}} else {// b ,// add element to the hash; element 0;...}if (…

java sql异常_java.sql.SQLException: Io 异常: Got minus one from a

java.sql.SQLException: Io 异常: Got minus one from a read callat oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:111)at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:145)at oracle.jdbc.driver.DatabaseError.thro…

java 拦截器ajax_(转)拦截器深入实践 - JAVA XML JAVASCRIPT AJAX CSS - BlogJava

Interceptor的定义我们来看一下Interceptor的接口的定义&#xff1a;Java代码 publicinterfaceInterceptorextendsSerializable {/*** Called to let an interceptor clean up any resources it has allocated.*/voiddestroy();/*** Called after an interceptor is created, b…

php学的是什么意思_为什么要学习PHP?到底什么是PHP?

为什么要学习PHP?到底什么是PHP?PHP可以做什么?相信这样的问题困扰着很多的人&#xff0c;在我没工作之前&#xff0c;都没有听说过PHP&#xff0c;自从工作后&#xff0c;慢慢接触到代码&#xff0c;慢慢知道什么是PHP。PHP是做网站一种语言&#xff0c;很多工程师都使用PH…

php 多数据库联合查询,php如何同时连接多个数据库_PHP教程

下面是一个函数能够保证连接多个数据库的下不同的表的函数&#xff0c;可以收藏一下&#xff0c;比较实用&#xff0c;测试过是有用的。function mysql_oper($oper,$db,$table,$where1,$limit10){$connmysql_connect(localhost,like,admin,true) or mysql_error();mysql_select…

java判断有没有修改,java字节码判断对象应用是否被修改

原创1 背景在学习并发的时候看到了ConcurrentLinkedQueue队列的源码&#xff0c;刚开始的时候是看网上的帖子&#xff0c;然后就到IDE里边看源码&#xff0c;发现offer()方法在1.7版的时候有过修改。楼主的问题不是整个方法&#xff0c;而是其中的一截代码“(t ! (t tail))”&…

php接口 含义,php晋级必备:一文读懂php接口特点和使用!

PHP接口与类是什么关系&#xff1f;前面提到了php中抽象类和抽象方法&#xff0c;今天给大家谈谈php中接口技术。在PHP中每个类只能继承一个父类&#xff0c;如果声明的新类继承了抽象类实现了以后&#xff0c;这个新类就不能有其它的父类了。但是在实际中需要继承多个类实现功…

php获取不重复的随机数字,php如何生成不重复的随机数字

【摘要】PHP即“超文本预处理器”&#xff0c;是一种通用开源脚本语言。PHP是在服务器端执行的脚本语言&#xff0c;与C语言类似&#xff0c;是常用的网站编程语言。PHP独特的语法混合了C、Java、Perl以及 PHP 自创的语法。下面是php如何生成不重复的随机数字&#xff0c;让我们…

java 素数乘积,求助2424379123 = 两个素数的乘积,求这两个素数?

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼import java.util.ArrayList;import java.util.Date;public class Test {static ArrayList list new ArrayList();/*** 初始化素数表* return*/public static ArrayList initArrayList() {list.add(2);list.add(3);list.add(5);li…

php header什么意思,php header是什么意思

header函数在PHP中是发送一些头部信息的, 我们可以直接使用它来做301跳转等&#xff0c;下面我来总结关于header函数用法与一些常用见问题解决方法。发送一个原始 HTTP 标头[Http Header]到客户端。标头 (header) 是服务器以 HTTP 协义传 HTML 资料到浏览器前所送出的字串&…

matlab dct稀疏系数,Matlab DCT详解

转自&#xff1a;http://blog.csdn.net/ahafg/article/details/48808443DCT变换DCT又称离散余弦变换&#xff0c;是一种块变换方式&#xff0c;只使用余弦函数来表达信号&#xff0c;与傅里叶变换紧密相关。常用于图像数据的压缩&#xff0c;通过将图像分成大小相等(一般为8*8)…

matlab验潮站,验潮站的作用是什么

验潮站的作用是什么?验潮站的建成投入使用&#xff0c;可实时观测沿海潮汐等观测要素&#xff0c;为潮汐预报、赤潮的发生、风暴潮预警报、海啸预警及海平面变化提供基础数据保障以及预测&#xff0c;同时为科学开发海洋提供有力的支持&#xff0c;为海洋经济健康发展保驾护航…

答题闯关php,成语答题闯关红包流量主小程序源码

修复红包页面提现提示文字得叠的问题限制过关红包每天领取个数左侧影响美观的小程序链接的文字去掉了增加版本号没有问题的可以暂不更新此版本修复前一版本客服提现没有授权的问题管理后台增加主动推送客服消息(红包)给用户的功能&#xff0c;唤醒用户使用自定义分享的配置增加…

php是音频吗,只要是用PHP和JS发布的HTML5是否可以播放音频?

我正在尝试创建一个可以上传播客的页面。我想拥有“发布”或“取消发布”的能力。我让每个播客添加到一个数据库中,包含它的信息和发布列,可以是真是假。目前我使用以下代码PHP:if(isPublished()){header(Cache-Control: max-age100000);header(Content-Transfer-Encoding: bin…

php收购,php中文网收购全国用户量最大的phpstudy集成开发环境揭秘

phpstudy介绍2008年第一个版本诞生&#xff0c;至今已有&#xff19;年,该程序包集成最新的ApachePHPMySQLphpMyAdminZendOptimizer,一次性安装,无须配置即可使用,是非常方便、好用的PHP调试环境.该程序不仅包括PHP调试环境,还包括了开发工具、开发手册等.总之学习PHP只需一个包…

复杂电网三相短路计算的matlab仿真,复杂电网三相短路计算的MATLAB仿真电力系统分析课设报告 - 图文...

XG?XT**35.3100??0.11003000.856100???0.05100120发电厂B&#xff1a;XG?XT**17.65100 ??0.051003000.853100???0.025100120发电厂H&#xff1a;XG?XT**17.65100??0.051003000.8512100???0.1100120变电站C&#xff1a;3.6100*XT???0.03100120 线路&#x…

php 将多个数组 相同的键重组,PHP – 合并两个类似于array_combine但具有重复键的数组...

你可以使用array_map&#xff1a;$arrKeys array(str, str, otherStr);$arrVals array(1.22, 1.99, 5.17);function foo($key, $val) {return array($key>$val);}$arrResult array_map(foo, $arrKeys, $arrVals);print_r($arrResult);Array([0] > Array([str] > 1.…

C php反序列化,php反序列化漏洞 - anansec的个人空间 - OSCHINA - 中文开源技术交流社区...

反序列化本身是没有漏洞的&#xff0c;但是当反序列化和一些魔术方法结合使用时就可能会产生安全风险。常用的魔术方法__wakeup反序列化漏洞示例(__wekeup)class A{var $test "demo";function __wakeup(){eval($this->test);}}$b new A();$c serialize($b);$a …

oracle lob值是什么,关于Oracle数据库LOB大字段总结

概述在ORACLE数据库中&#xff0c;DBA_OBJECTS视图中OBJECT_TYPE为LOB的对象是什么东西呢&#xff1f;其实OBJECT_TYPE为LOB就是大对象(LOB)&#xff0c;它指那些用来存储大量数据的数据库字段。Oracle 11gR2 文档&#xff1a;http://download.oracle.com/docs/cd/E11882_01/Ap…