KafkaQ - 好用的 Kafka Linux 命令行可视化工具

鉴于并没有在网上找到比较好的linux平台的kafka可视化工具,今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ

虽然简陋,主要可以实现下面的这些功能:

1)查看当前topic的分片数量和副本数量

2)查看当前topic下面每个分片的最大offset

3)查看当前topic某个分片下面指定offset范围的数据

4)搜索当前topic指定关键词的message

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

KafkaQ分为普通版本和搜索版本:

* 普通版本支持上述3种查询

* 搜索版本支持上述3种查询之外,增加关键词搜索,即在分片中搜索指定关键词的message

一、普通版 KafkaQ.sh

使用方法:

Usage: KafkaQ.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)

显示的效果如下,十分简洁,分片数据里面左边一列是消息入库的时间,右边是message内容:

KafkaQ 源码如下:

#!/bin/bash# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}# 检查参数
if [ -z "$1" ]; thenecho "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>]"exit 1
fiTOPIC="$1"# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; thenecho "Kafka not found at /usr/local/kafka/bin/"exit 1
fi# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"# 获取分片数据
if [ "$LIMIT" -gt 0 ]; thenecho -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)# 格式化输出消息echo "$MESSAGES" | awk -F'\t' 'BEGIN {print "* 分片数据:"}{if ($3 != "null") {timestamp = substr($1, 12) / 1000 # 从第10个字符开始提取时间戳,并除以1000以转换为秒级时间戳value = $3printf "%s %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value}}'
fi

二、搜索版 KafkaQ-Search.sh

使用方法:

Usage: KafkaQ-Search.sh --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]--topic 话题名称
--partition 分片索引(可选)
--offset 从第k个offset开始检索(可选)
--limit 从第k个offset开始检索X条结果(可选)
--search 搜索字符串

显示效果如下:

KafkaQ-Search.sh 源码如下:

#!/bin/bash# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}
SEARCH=${5:-""}# 检查参数
if [ -z "$1" ]; thenecho "Usage: $0 --topic<topic> [--partition<partition>] [--offset<offset>] [--limit<limit>] [--search<keyword>]"exit 1
fiwhile [[ $# -gt 0 ]]; docase "$1" in--topic)TOPIC="$2"shift 2;;--partition)PARTITION="$2"shift 2;;--offset)OFFSET="$2"shift 2;;--limit)LIMIT="$2"shift 2;;--search)SEARCH="$2"shift 2;;*)echo "Unknown parameter: $1"exit 1;;esac
done# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; thenecho "Kafka not found at /usr/local/kafka/bin/"exit 1
fi# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC"  |  awk -F: '{ printf "  分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"# 获取分片数据
if [ "$LIMIT" -gt 0 ]; thenecho -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)# 搜索关键词并输出结果if [[ ! -z $SEARCH ]]; thenecho -e "\033[0;32m* 搜索条件:$SEARCH\033[0m"echo "  搜索结果:"echo "$MESSAGES" | grep --color=never "$SEARCH" | awk -F'\t' '{timestamp = substr($1, 12) / 1000 # 从第12个字符开始提取时间戳,并除以1000以转换为秒级时间戳value = $3printf "%s  %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value}'fi
fi

 * (附注)参考的shell如下

1、获取kafka的topic 分区数量

/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic>

2、获取kafka每个分片最大的offset

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic>

3、获取kafka分片指定offset范围的具体信息

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic> --partition <partition> --offset <offset> --max-messages <max-message> --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/26454.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣2187.完成旅途的最少时间

力扣2187.完成旅途的最少时间 朴素做法 二分答案 class Solution {long long check(vector<int> time,long long k){long long res0;for(auto t:time)res (long long)k/t;return res;}public:long long minimumTime(vector<int>& time, int totalTrips) {…

【GlobalMapper】去除数据黑边

tif数据加载出来有这种黑色的边框&#xff0c;使用 Global Mapper切片之后&#xff0c;会有那种黑色的色块加载出来 影像黑边出现的原因&#xff1a; 通常&#xff0c;由于影像格式、像素深度、无效值、背景值等原因&#xff0c;会产生黑边或者白边&#xff0c;给影像的拼接或…

【图书推荐】《Vue.js 3.x+Element Plus前端开发实战》

本书重点 Element Plus是一套采用Vue.js 3.x实现的UI组件库&#xff0c;它为开发者、设计师和产品经理提供了配套设计资源&#xff0c;可以帮助网站快速成型。 本书详解Vue.js 3.x和Element Plus开发方法&#xff0c;配套源码、PPT课件。 内容简介 本书分为两篇&#xff0c…

d3dcompiler_43.dll是什么文件?怎么高效率的解决d3dcompiler_43.dll丢失问题

d3dcompiler_43.dll是什么文件&#xff1f;当你知道d3dcompiler_43.dll这个文件名字的时候&#xff0c;相信你是遇到了d3dcompiler_43.dll丢失的问题了&#xff01;所以才会这样问&#xff0c;其实这就是一个普通的dll文件&#xff0c;对于电脑系统有着至关重要的作用&#xff…

干货分享:宏集物联网HMI通过S7 MPI协议采集西门子400PLC数据

前言 为了实现和西门子PLC的数据交互&#xff0c;宏集物联网HMI集成了S7 PPI、S7 MPI、S7 Optimized、S7 ETH等多个驱动来适配西门子200、300、400、1200、1500、LOGO等系列PLC。 本文主要介绍宏集HMI通过S7 MPI协议采集西门子400PLC数据的操作步骤&#xff0c;其他协议的操作…

【面经总结】Java基础 - IO

序列化 什么是序列化和反序列化&#xff1f; 序列化&#xff1a;将对象转换为二进制数据 反序列化&#xff1a;将二进制数据转换为对象 目的&#xff1a;方便网络传输、持久化保存 Java 是怎么实现序列化的&#xff1f; Java 通过对象输入输出流来实现序列化和反序列化&a…

Vue全局组件

全局组件 首先说明一下&#xff0c;本人是前端小学生级别的菜鸡&#xff0c;吐槽的话请口下留情&#xff0c;在评论区指出错误或者补充不足&#xff0c;我会很喜欢&#xff0c;互喷不会进步&#xff0c;相互指点才会。。。。谢谢大家啦 目录 全局组件 目录内置模板引入模板优…

tp6+swoole+mysql+nginx+redis高并发优化

1.服务器 IDC机房提供的物理机&#xff1a;单机 40核&#xff0c;64G内存&#xff0c;带宽100M&#xff0c; 2.redis 7.2配置 timeout600 #空闲连接超时时间,0表示不断开 maxclients100000 #最大连接数 3.Mysql 5.7配置&#xff0c;按宝塔16-32G优化方案并调整&#xff1a;…

通用大模型VS垂直大模型,相辅相成!

1.通用大模型&#xff1a; 如OpenAI的GPT系列、Google的PaLM等&#xff0c;因其广泛的训练数据来源和强大的泛化能力&#xff0c;展现出在多种任务和场景中的应用潜力。它们能够处理从文本生成、代码编写到语言翻译等多种复杂任务&#xff0c;适应性强&#xff0c;减少了针对单…

第二届京津冀现代商贸物流金融创新发展百人大会将于6月16日在廊坊举行

编辑&#xff5c;Ray 物流是实体经济的“筋络”&#xff0c;联接生产和消费、内贸和外贸&#xff0c;必须有效降低全社会物流成本&#xff0c;增强产业核心竞争力&#xff0c;提高经济运行效率。《京津冀协同发展规划纲要》赋予河北“三区一基地”的功能定位&#xff0c;建设全…

第十七章 策略模式

目录 1 策略模式概述 2 策略模式原理 3 策略模式实现 4 策略模式应用实例 5 策略模式总结 1 策略模式概述 策略模式(strategy pattern)的原始定义是&#xff1a;定义一系列算法&#xff0c;将每一个算法封装起来&#xff0c;并使它们可以相互替换。策略模式让算法可以独立…

如何通过Python爬虫提升电商数据分析效率

如何通过Python爬虫提升电商数据分析效率 电商环境中&#xff0c;数据是决策的核心。无论是市场分析、竞争对手研究&#xff0c;还是商品定价和库存管理&#xff0c;获取并分析大量的实时数据都是至关重要的任务。Python 作为一种强大的编程语言&#xff0c;其简单易用的特性使…

java实现文件的压缩及解压

一、起因 开发中需要实现文件的压缩及解压功能&#xff0c;以满足某些特定场景的下的需要&#xff0c;在此说下具体实现。 二、实现 1.定义一个工具类ZipUtils,实现文件的压缩及解压&#xff0c;代码如下&#xff1a; import java.io.*; import java.nio.charset.Charset; impo…

######## redis各章节终篇索引(更新中) ############

其他 父子关系&#xff08;ctx、协程&#xff09;#### golang存在的父子关系 ####_子goroutine panic会导致父goroutine挂掉吗-CSDN博客 参数传递&#xff08;slice、map&#xff09;#### go中参数传递&#xff08;涉及&#xff1a;切片slice、map、channel等&#xff09; ###…

TCPListen客户端和TCPListen服务器

创建项目 TCPListen服务器 public Form1() {InitializeComponent();//TcpListener 搭建tcp服务器的类&#xff0c;基于socket套接字通信的//1创建服务器对象TcpListener server new TcpListener(IPAddress.Parse("192.168.107.83"), 3000);//2 开启服务器 设置最大…

ARM-V9 RME(Realm Management Extension)系统架构之系统安全能力的侧信道抵御

安全之安全(security)博客目录导读 目录 一、系统PMU计数器 二、使用信号和功耗操作进行的故障攻击 一、系统PMU计数器 性能监测单元 (PMU) 计数器可能成为泄露机密信息的侧信道,如访问模式或受RME安全保障保护的安全状态下的执行控制流。以下规则补充了《Arm CoreSight™…

(message): No CUDA toolset found.

解决方法&#xff1a; C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v10.2\extras\visual_studio_integration\MSBuildExtensions\ 下的4个文件 复制到 D:\Program Files\Microsoft Visual Studio\2022\Enterprise\MSBuild\Microsoft\VC\v170\BuildCustomizations\下。…

据阿谱尔统计显示,2023年全球凹版印刷机市场销售额约为9.1亿美元

根据阿谱尔 (APO Research&#xff09;的统计及预测&#xff0c;2023年全球凹版印刷机市场销售额约为9.1亿美元&#xff0c;预计在2024-2030年预测期内将以超过2.54%的CAGR&#xff08;年复合增长率&#xff09;增长。 由于对软包装和印刷包装的需求不断增长&#xff0c;全球凹…

数据分析-Excel基础函数的使用

Excel基础函数&#xff1a; sum:求和 sumif:单条件求和 sumifs:多条件求和 subtotal:根据筛选求和 if:逻辑判断 vlookup:连接匹配数据 match:查找数值在区域中的位置 index:根据区域的位置返回数值 match、index:一起使用&#xff1a;自动根据列名查找数据 sumifs、match、ind…

FreeSWITCH入门到精通系列(三):FreeSWITCH基础概念与架构

FreeSWITCH入门到精通系列&#xff08;三&#xff09;&#xff1a;FreeSWITCH基础概念与架构 前言 在前两篇博客中&#xff0c;我们介绍了FreeSWITCH的基本概念和安装与配置。本篇文章将深入探讨FreeSWITCH的基础概念和架构&#xff0c;帮助您更好地理解这个强大的通信平台的…