KafkaQ - 好用的 Kafka Linux 命令行可视化工具

鉴于并没有在网上找到比较好的linux平台的kafka可视化工具,今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ

虽然简陋,主要可以实现下面的这些功能:

1)查看当前topic的分片数量和副本数量

2)查看当前topic下面每个分片的最大offset

3)查看当前topic某个分片下面指定offset范围的数据

4)搜索当前topic指定关键词的message

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

KafkaQ分为普通版本和搜索版本:

* 普通版本支持上述3种查询

* 搜索版本支持上述3种查询之外,增加关键词搜索,即在分片中搜索指定关键词的message

一、普通版 KafkaQ.sh

使用方法:

Usage: KafkaQ.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)

显示的效果如下,十分简洁,分片数据里面左边一列是消息入库的时间,右边是message内容:

KafkaQ 源码如下:

#!/bin/bash# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}# 检查参数
if [ -z "$1" ]; thenecho "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]"exit 1
fiTOPIC="$1"# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; thenecho "Kafka not found at /usr/local/kafka/bin/"exit 1
fi# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"# 获取分片数据
if [ "$LIMIT" -gt 0 ]; thenecho -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)# 格式化输出消息echo "$MESSAGES" | awk -F'\t' 'BEGIN {print "* 分片数据:"}{if ($3 != "null") {timestamp = substr($1, 12) / 1000 # 从第10个字符开始提取时间戳,并除以1000以转换为秒级时间戳value = $3printf "%s %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value}}'
fi

二、搜索版 KafkaQ-Search.sh

使用方法:

Usage: KafkaQ-Search.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)
--search 搜索字符串

显示效果如下:

KafkaQ-Search.sh 源码如下:

#!/bin/bash# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}
SEARCH=${5:-""}# 检查参数
if [ -z "$1" ]; thenecho "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]"exit 1
fiwhile [[ $# -gt 0 ]]; docase "$1" in--topic)TOPIC="$2"shift 2;;--partition)PARTITION="$2"shift 2;;--offset)OFFSET="$2"shift 2;;--limit)LIMIT="$2"shift 2;;--search)SEARCH="$2"shift 2;;*)echo "Unknown parameter: $1"exit 1;;esac
done# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; thenecho "Kafka not found at /usr/local/kafka/bin/"exit 1
fi# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"# 获取分片数据
if [ "$LIMIT" -gt 0 ]; thenecho -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)# 搜索关键词并输出结果if [[ ! -z $SEARCH ]]; thenecho -e "\033[0;32m* 搜索条件:$SEARCH\033[0m"echo "  搜索结果:"echo "$MESSAGES" | grep --color=never "$SEARCH" | awk -F'\t' '{timestamp = substr($1, 12) / 1000 # 从第12个字符开始提取时间戳,并除以1000以转换为秒级时间戳value = $3printf "%s  %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value}'fi
fi

 * (附注)参考的shell如下

1、获取kafka的topic 分区数量

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic>

2、获取kafka每个分片最大的offset

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic>

3、获取kafka分片指定offset范围的具体信息

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic> --partition <partition> --offset <offset> --max-messages <max-message> --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/26454.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【GlobalMapper】去除数据黑边

tif数据加载出来有这种黑色的边框&#xff0c;使用 Global Mapper切片之后&#xff0c;会有那种黑色的色块加载出来 影像黑边出现的原因&#xff1a; 通常&#xff0c;由于影像格式、像素深度、无效值、背景值等原因&#xff0c;会产生黑边或者白边&#xff0c;给影像的拼接或…

【图书推荐】《Vue.js 3.x+Element Plus前端开发实战》

本书重点 Element Plus是一套采用Vue.js 3.x实现的UI组件库&#xff0c;它为开发者、设计师和产品经理提供了配套设计资源&#xff0c;可以帮助网站快速成型。 本书详解Vue.js 3.x和Element Plus开发方法&#xff0c;配套源码、PPT课件。 内容简介 本书分为两篇&#xff0c…

d3dcompiler_43.dll是什么文件?怎么高效率的解决d3dcompiler_43.dll丢失问题

d3dcompiler_43.dll是什么文件&#xff1f;当你知道d3dcompiler_43.dll这个文件名字的时候&#xff0c;相信你是遇到了d3dcompiler_43.dll丢失的问题了&#xff01;所以才会这样问&#xff0c;其实这就是一个普通的dll文件&#xff0c;对于电脑系统有着至关重要的作用&#xff…

干货分享:宏集物联网HMI通过S7 MPI协议采集西门子400PLC数据

前言 为了实现和西门子PLC的数据交互&#xff0c;宏集物联网HMI集成了S7 PPI、S7 MPI、S7 Optimized、S7 ETH等多个驱动来适配西门子200、300、400、1200、1500、LOGO等系列PLC。 本文主要介绍宏集HMI通过S7 MPI协议采集西门子400PLC数据的操作步骤&#xff0c;其他协议的操作…

【面经总结】Java基础 - IO

序列化 什么是序列化和反序列化&#xff1f; 序列化&#xff1a;将对象转换为二进制数据 反序列化&#xff1a;将二进制数据转换为对象 目的&#xff1a;方便网络传输、持久化保存 Java 是怎么实现序列化的&#xff1f; Java 通过对象输入输出流来实现序列化和反序列化&a…

tp6+swoole+mysql+nginx+redis高并发优化

1.服务器 IDC机房提供的物理机&#xff1a;单机 40核&#xff0c;64G内存&#xff0c;带宽100M&#xff0c; 2.redis 7.2配置 timeout600 #空闲连接超时时间,0表示不断开 maxclients100000 #最大连接数 3.Mysql 5.7配置&#xff0c;按宝塔16-32G优化方案并调整&#xff1a;…

通用大模型VS垂直大模型,相辅相成!

1.通用大模型&#xff1a; 如OpenAI的GPT系列、Google的PaLM等&#xff0c;因其广泛的训练数据来源和强大的泛化能力&#xff0c;展现出在多种任务和场景中的应用潜力。它们能够处理从文本生成、代码编写到语言翻译等多种复杂任务&#xff0c;适应性强&#xff0c;减少了针对单…

第二届京津冀现代商贸物流金融创新发展百人大会将于6月16日在廊坊举行

编辑&#xff5c;Ray 物流是实体经济的“筋络”&#xff0c;联接生产和消费、内贸和外贸&#xff0c;必须有效降低全社会物流成本&#xff0c;增强产业核心竞争力&#xff0c;提高经济运行效率。《京津冀协同发展规划纲要》赋予河北“三区一基地”的功能定位&#xff0c;建设全…

第十七章 策略模式

目录 1 策略模式概述 2 策略模式原理 3 策略模式实现 4 策略模式应用实例 5 策略模式总结 1 策略模式概述 策略模式(strategy pattern)的原始定义是&#xff1a;定义一系列算法&#xff0c;将每一个算法封装起来&#xff0c;并使它们可以相互替换。策略模式让算法可以独立…

java实现文件的压缩及解压

一、起因 开发中需要实现文件的压缩及解压功能&#xff0c;以满足某些特定场景的下的需要&#xff0c;在此说下具体实现。 二、实现 1.定义一个工具类ZipUtils,实现文件的压缩及解压&#xff0c;代码如下&#xff1a; import java.io.*; import java.nio.charset.Charset; impo…

TCPListen客户端和TCPListen服务器

创建项目 TCPListen服务器 public Form1() {InitializeComponent();//TcpListener 搭建tcp服务器的类&#xff0c;基于socket套接字通信的//1创建服务器对象TcpListener server new TcpListener(IPAddress.Parse("192.168.107.83"), 3000);//2 开启服务器 设置最大…

据阿谱尔统计显示,2023年全球凹版印刷机市场销售额约为9.1亿美元

根据阿谱尔 (APO Research&#xff09;的统计及预测&#xff0c;2023年全球凹版印刷机市场销售额约为9.1亿美元&#xff0c;预计在2024-2030年预测期内将以超过2.54%的CAGR&#xff08;年复合增长率&#xff09;增长。 由于对软包装和印刷包装的需求不断增长&#xff0c;全球凹…

数据分析-Excel基础函数的使用

Excel基础函数&#xff1a; sum:求和 sumif:单条件求和 sumifs:多条件求和 subtotal:根据筛选求和 if:逻辑判断 vlookup:连接匹配数据 match:查找数值在区域中的位置 index:根据区域的位置返回数值 match、index:一起使用&#xff1a;自动根据列名查找数据 sumifs、match、ind…

2.线性神经网络

目录 1.线性回归一个简化模型线性模型&#xff1a;可以看做是单层神经网络衡量预估质量训练数据参数学习显示解总结 2.基础优化方法小批量随机梯度下降总结 3.Softmax回归&#xff1a;其实是一个分类问题回归VS分类从回归到多类分类---均方损失Softmax和交叉熵损失 4.损失函数L…

web前端:作业三

1.回到顶部案例(固定定位) <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><style>#container{height: 5000px;border: 1px solid blue;}#back-button{width: 100px;height: 100px;border: 1px solid…

如何申请小程序SSL证书

在互联网时代&#xff0c;数据安全和用户隐私保护变得尤为重要。SSL证书作为网站、应用或小程序与用户之间建立安全连接的关键工具&#xff0c;其重要性不言而喻。SSL证书能够加密数据传输&#xff0c;防止信息被窃取&#xff0c;提升用户信任度&#xff0c;对于小程序开发者来…

Redux 与 MVI:Android 应用的对比

Redux 与 MVI&#xff1a;Android 应用的对比 在为 Android 应用选择合适的状态管理架构时可能会感到困惑。在这个领域中&#xff0c;有两种流行的选择是 Redux 和 MVI&#xff08;Model-View-Intent&#xff09;。两者都有各自的优缺点&#xff0c;因此在深入研究之前了解它们…

WebGIS开发:你还在纠结的10大问题合集!

问题1&#xff1a;GIS开发到底是学Java还是Python&#xff1f; Java是后端语言&#xff0c;Python更重数据分析和算法。 假设通常说的GIS开发是指Webgis&#xff0c;Web就是指网页端&#xff0c;所以我们说的GIS开发大部分情况下是指网页端的地图可视化开发。 GIS开发需要学…

工业烤箱设备厂家:专业制造,助力工业发展

随着现代工业的不断发展&#xff0c;工业烤箱设备在各个领域的应用越来越广泛。作为专业的工业烤箱设备厂家&#xff0c;我们致力于为客户提供高质量、高效率的烤箱设备&#xff0c;助力工业生产的顺利进行。 工业烤箱设备在工业生产中扮演着至关重要的角色。无论是电子、化工、…

Flask快速入门

Flask快速入门&#xff08;路由、CBV、请求和响应、session&#xff09; 目录 Flask快速入门&#xff08;路由、CBV、请求和响应、session&#xff09;安装创建页面Debug模式快速使用Werkzeug介绍watchdog介绍快速体验 路由系统源码分析手动配置路由动态路由-转换器 Flask的CBV…