【shell-10】shell实现的各种kafka脚本

kafka-shell工具

    • 背景
    • 日志 log
    • 一.启动kafka->(start-kafka)
    • 二.停止kafka->(stop-kafka)
    • 三.创建topic->(create-topic)
    • 四.删除topic->(delete-topic)
    • 五.获取topic列表->(list-topic)
    • 六. 将文件数据 录入到kafka->(file-to-kafka)
    • 七.将kafka数据 下载到文件->(kafka-to-file)
    • 八. 查看topic的groupID消费情况->(list-group)

背景

注意:我用的kafka版本是 3.2.1 其他版本kafka提供的 命令行可能有细微区别。
因为经常要用kafka环境参与测试,所以写了不少脚本。在很多时候可以大大提高测试的效率。
主要包含如下功能:
topic的管理【创建,删除】
topic信息查看【topic列表,topic groupid 消费情况】
topic数据传输【file数据录入到topic,topic数据下载到本地文件】
脚本中做了各种检查,日志的输出做了颜色区分,用起来没啥问题。

日志 log

此文件是个额外的日志文件主要用于打印日志,该文件会被下面的shell文件引用

#!/bin/bash
#日志级别 debug-1, info-2, warn-3, error-4, always-5
LOG_LEVEL=2#调试日志
function log_debug(){content="$(date '+%Y-%m-%d %H:%M:%S') [DEBUG]: $@"[ $LOG_LEVEL -le 1  ] && echo -e "\033[32m"  ${content}  "\033[0m"
}
#信息日志
function log_info(){content="$(date '+%Y-%m-%d %H:%M:%S') [INGO]: $@"[ $LOG_LEVEL -le 2  ] && echo -e "\033[32m"  ${content} "\033[0m"
}
#警告日志
function log_warn(){content="$(date '+%Y-%m-%d %H:%M:%S') [WARN] $@"[ $LOG_LEVEL -le 3  ] && echo -e "\033[33m" ${content} "\033[0m"
}
#错误日志
function log_err(){content="$(date '+%Y-%m-%d %H:%M:%S') [ERROR]: $@"[ $LOG_LEVEL -le 4  ] && echo -e "\033[31m" ${content} "\033[0m"
}
~  

一.启动kafka->(start-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/logpid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
log_info "Start checking kafka process"
if [ -z $pid ]; thenlog_info "The kafka process does not exist, startting.........................................................................................."
elselog_warn "The kafka process exists and does not need to be started"exit 1
fi
nohup kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/server.properties >>/home/kafka/kafka.log 2>&1 &
# 日志的路径是安装kafka的时候指定的,也要替换成自己的路径
tail -f 20 /home/kafka/kafka.log

二.停止kafka->(stop-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "Start checking kafka process"
pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
if [ -z $pid ]; thenlog_warn "The kafka process does not exist and does not need to be stopped"exit 1
elselog_info "The kafka process alive, stopping.............................................................................................................."
fi
kafka-server-stop.sh
log_info "Stop kafka success"

三.创建topic->(create-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "脚本功能: 创建topic"
log_info "脚本参数: topic名称(必选)"
if [ $# -ne 1 ]; thenlog_err "错误:请传入topic名称"exit 1
fi
#TOPIC名称
TOPIC_NAME=$1
#KAFKA地址
KAFKA_BROKER=ip:9092
# 检查Kafka主题是否存在, 若已存在则放弃创建
if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$TOPIC_NAME$";thenlog_warn "$TOPIC_NAME 已经存在,放弃创建"
else# 默认1副本,3分区kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor 1 --partitions 3 --topic $TOPIC_NAMElog_info "请执行topic-list检查创建是否成功"
fi
~     

在这里插入图片描述

四.删除topic->(delete-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bashsource /home/shell/log
log_info "脚本作用:删除topic"
log_info "脚本参数: 支持多个topic,用空格分开,可以批量删除"
KAFKA_BROKER=ip:9092
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0  # 返回true  elselog_warn "$local_topic_name 不存在->false"return 1  # return falsefi
}# 逐个删除topic
for topic in "$@"
doif ! check_kafka_topic $topic; thenlog_info "tpoic->$topic 不存在,跳过删除行为"continueelselog_info "topic->$topic 执行删除"kafka-topics.sh --delete --bootstrap-server $KAFKA_BROKER --topic $topiclog_info "topic->$topic 删除成功"fi
done

在这里插入图片描述

五.获取topic列表->(list-topic)

#!/bin/bash
source /home/shell/log
KAFKA_BROKER=ip:9092  
log_info "脚本作用: 列出topic信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic信息)"
if [ $# -eq 1 ]; thenlog_info "目标$1 详情如下"kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets" | grep $1
elselog_info "所有topic 列表如下:"kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets"
fi

在这里插入图片描述

六. 将文件数据 录入到kafka->(file-to-kafka)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将文件中的数据录入指定topic"
log_info "脚本参数: 1.文件路劲(必选) 2.topic(必选)"
log_info "参数校验"
log_info "执行条件检查.........................................................................................................."
if [ $# -ne 2 ]; thenlog_err "必须传入两个参数: 1.文件路劲(必选) 2.topic(必选)"exit 1
fiif ! [ -f $1 ]; thenlog_err "$1不是一个有效的数据文件"exit 1
fiFILE_PATH=$1
TOPIC_NAME=$2
KAFKA_BROKER=ip:9092  #检查topic是否存在
function check_kafka_topic() {local local_KAFKA_BROKER=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_KAFKA_BROKER$";thenreturn 0  # 返回true  elsereturn 1  # return falsefi
}#将文件数据推送到kafka
function send_to_kafka(){local local_path=$1local count=0while IFS= read -r line; do  kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC_NAME <<< "$line"  count=$((count+1))done < "$local_path"echo $count
}        if ! check_kafka_topic $TOPIC_NAME;thenlog_err "条件检查不通过, 原因: topic->$TOPIC_NAME不存在, 请先创建topic"exit 1
filog_info "参数检查通过.........................................................................................................."
start_time=`date "+%Y-%m-%d %H:%M:%S"`
start_seconds=$(date -d "$start_time" +%s)log_info "开始录入数............................................................................................................"
count=$(send_to_kafka $FILE_PATH)end_time=`date "+%Y-%m-%d %H:%M:%S"`
end_seconds=$(date -d "$end_time" +%s)
time_diff=$((end_seconds - start_seconds))  log_info "录入条数: $count"
log_info "花费时间:$time_diff 秒"
log_info "录入完成.............................................................................................................."

在这里插入图片描述

七.将kafka数据 下载到文件->(kafka-to-file)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将kafka指定topic的数据消费到指定文件中"
log_info "脚本参数:1.数据文件路径(必选) 2.topic名称(必选) 3.groupID(可选->不存在则从头消费,存在则从grooupID offset 开始消费)"
log_info "group-list 脚本可以查看当前的"
# Kafka的bin目录  
KAFKA_BIN_DIR=/path/to/kafka/bin#kafka 地址  
KAFKA_SERVER=ip:9092 # Kafka的配置文件目录  
KAFKA_CONFIG_DIR=/home/kafka/kafka_2.12-3.2.1/config# Kafka消费者配置文件  
CONSUMER_CONFIG=$KAFKA_CONFIG_DIR/consumer.properties# 指定要消费的主题  
TOPIC_NAME=your_topic_name# 指定要写入的文件 
FILE_PATH=$1
TOPIC_NAME=$2
GROUP_ID=$3log_info "执行检察............................................................................................................................"function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_SERVER  --list | grep -q "^$local_topic_name$";thenreturn 0  # 返回true  elsereturn 1  # return falsefi
}if ! check_kafka_topic $TOPIC_NAME;thenlog_err "topic->$TOPIC_NAME 未找到"exit 1
fi
log_info "检查通过............................................................................................................................"log_info "当前topic,所有groupID的消费情况如下>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
while IFS= read -r line; doif [[ $line == *"PARTITION"* ]]; thencontent="$(date '+%Y-%m-%d %H:%M:%S') [INFO] $line"echo -e "\033[45m" ${content} "\033[0m"else  log_info "$line"fi
done< <(kafka-consumer-groups.sh --bootstrap-server $KAFKA_SERVER --describe  --all-groups | grep -v '__consumer_offsets' | grep "$TOPIC_NAME\|PARTITION")log_info "当前topic,所有groupID的消费情况输出完成>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"log_info "消费进程运行中( CTRL+C 可退出消费 )................................................................................................."
# 运行消费者脚本并将输出重定向到文件  
if [ $# -eq 2 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning > $FILE_PATH
fi
if [ $# -eq 3 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning --group $GROUP_ID > $FILE_PATH
fi

在这里插入图片描述

八. 查看topic的groupID消费情况->(list-group)

#!/bin/bash
kafka_broker=ip:9092
source /home/shell/log
log_info "脚本功能: 查看topic的groupID信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic的groupID信息)"
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $kafka_broker  --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0  # 返回true  elselog_warn "$local_topic_name 不存在->false"return 1  # return falsefi
}if [ $# -eq 1 ]; thenif ! check_kafka_topic $1; then#topic 不存在则直接退出程序log_warn "topic=$1, 不存在"exit 1filog_info "topic_name=$1 的gruoupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep $1 | grep -v __consumer_offsets
elselog_info "所有groupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep -v __consumer_offsets
fi

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/649491.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux内核中USB设备驱动实现

USB 设备驱动&#xff1a; 一、USB 描述符&#xff1a;&#xff08;存在于USB 的E2PROM里面&#xff09; 1、 设备描述符&#xff1a;struct usb_device_descriptor 2、 配置描述符&#xff1a;struct usb_config_descriptor 3、 接口描述符&#xff1a;struct usb_interfa…

【Deeplabv3+】Ubutu18.04中使用pytorch复现Deeplabv3+第三步)-----CityscapesScripts生成自己的标签

本文是在前面两篇文章的基础上&#xff0c;讲解如何更改训练数据集颜色&#xff0c;需要与前面两篇文章连起来看。 本文用于修改cityscapes数据集的标签颜色与Semankitti数据集的标签一致&#xff0c;对修改后的数据集进行训练。需要下载两个开发工具包和一个数据集&#xff0…

Git标签推送

标签默认属于本地分支&#xff0c;推送分支的时候并不会上传。需要自己手动推送 通过命令 git push origin <tagname>推送指定的标签 通过命令git push origin --tags批量推送所有的标签 在VS里打开git命令行窗口的方法&#xff1a;Git更改-操作-打开命令行提示符 对于…

1.19信息学,信息熵(wordle)

所谓均方误差实际上就是方差 分析&#xff1a;对单词进行编码后&#xff0c;采用聚类方法&#xff0c;可以将单词难度分为三类或者更多&#xff0c;如困难、一般、简单。然后对每一类的单词可视化分析&#xff0c;并描述数据得出结论。 聚类算法较多&#xff0c;在论文中可以…

Docker镜像

创建镜像有三种方法&#xff0c;分别为基于已有镜像创建、基于本地模板创建以及基于Dockerfile创建。 基于现有镜像创建 首先启动一个镜像&#xff0c;在容器里做修改 然后将修改后的容器提交为新的镜像&#xff0c;需要使用该容器的 ID 号创建新镜像 常用选项&#xff1a; -…

【Unity】【游戏开发】Pico打包后项目出现运行时错误如何Debug

【背景】 开发过程中的报错可以通过控制台查看&#xff0c;但是PICO项目这类依赖特定设备环境的应用往往存在打包后在设备端发生运行时错误。这时如何能查看到Debug信息呢&#xff1f; 【分析】 Pico也是安卓系统&#xff0c;所以这个问题就可以泛化为Unity有哪些在安卓端运…

C++实现推箱子游戏

推箱子游戏 运行之后的效果如视频所示&#xff0c;在完成游戏后播放音乐 准备工作&#xff1a;建立一个新的文件夹&#xff0c;并在文件夹中任意增加一张背景图片&#xff0c;以及各个部件的照片文件 因为这里用到了贴图技术&#xff0c;要使用graphic.h这个函数&#xff0c…

海外云手机三大优势

在全球化潮流下&#xff0c;企业因业务需求对海外手机卡等设备的需求不断攀升&#xff0c;推动了海外云手机业务的蓬勃发展。相较于自行置备手机设备&#xff0c;海外云手机不仅能够降低成本&#xff0c;还具备诸多优势&#xff0c;让我们深入探讨其中的三大黄金优势。 经济实惠…

【Linux】进程概述

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

统计学-R语言-8.1

文章目录 前言方差分析方差分析的原理什么是方差分析误差分解 单因子方差分析数学模型效应检验 练习 前言 本片开始介绍有关方差分析的知识。 方差分析 方差分析的基本原理是在20世纪20年代由英国统计学家Ronald A.Fisher在进行实验设计时为解释实验数据而首先引入的。方差分…

最新多功能PHP图床源码 /兰空图床Lsky Pro开源版v2.1/ 单纯的图床程序源码

源码介绍&#xff1a; Lsky Pro 是一个用于在线上传、管理图片的图床程序&#xff0c;中文名&#xff1a;兰空图床&#xff0c;你可以将它作为自己的云上相册&#xff0c;亦可以当作你的写作贴图库。 该程序的最初版本诞生于2017年10月&#xff0c;由ThinkPHP 5框架精心打造而…

【Linux 基础】常用基础指令(上)

文章目录 一、 创建新用户并设置密码二、ls指令ls指令基本概念ls指令的简写操作 三、pwd指令四、cd指令五、touch指令六、rm指令七、mkdir指令八、rmdir 指令 一、 创建新用户并设置密码 ls /home —— 查看存在多少用户 whoami —— 查看当前用户名 adduser 用户名 —— 创建新…

08 BGP 华为官方文档 十一条选路原则

BGP 华为官方文档 十一条选路原则 丢弃下一跳不可达的路由 1&#xff09;比较“协议首选值-pref-val”属性&#xff0c;数值越大越好&#xff0c;默认值是0&#xff0c;只在本设备生效&#xff0c;不在网络中传递 2&#xff09;比较“本地优先级-local_pref”属性&#xff0c;…

Ceph分布式存储自动化运维平台开发实践

文章目录 1. 背景介绍1.1 什么是Ceph&#xff1f;1.1.1 Ceph的核心组件1.1.2 Ceph的优势 1.2 自动化运维的需求目标 2. 平台架构设计和组件版本2.1 平台架构设计2.2 组件版本2.3 模块划分&#xff08;已经脱敏处理&#xff09;2.3.1 当前版本V1.0支持功能2.3.2 前后端代码结构t…

利用 “diart“ 和 OpenAI 的 Whisper 简化实时转录

利用 "diart" 和 OpenAI 的 Whisper 简化实时转录 工作原理 Diart 是一个基于人工智能的 Python 库&#xff0c;用于实时记录说话者语言&#xff08;即 "谁在什么时候说话"&#xff09;&#xff0c;它建立在 pyannote.audio 模型之上&#xff0c;专为实时…

微信小程序 仿微信聊天界面

1. 需求效果图 2. 方案 为实现这样的效果&#xff0c;首先要解决两个问题&#xff1a; 2.1.点击输入框弹出软键盘后&#xff0c;将已有的少许聊天内容弹出&#xff0c;导致看不到的问题 点击输入框弹出软键盘后&#xff0c;将已有的少许聊天内容弹出&#xff0c;导致看不到的问…

银行数据仓库体系实践(8)--主数据模型设计

主数据区域中保留了数据仓库的所有基础数据及历史数据&#xff0c;是数据仓库中最重要的数据区域之一&#xff0c;那主数据区域中主要分为近源模型区和整合&#xff08;主题&#xff09;模型区。上一节讲到了模型的设计流程如下图所示。那近源模型层的设计在第2.3和3这两个步骤…

85 总结一下最近遇到的一些 jar发布 相关的知识

前言 呵呵 最近有一些构建服务, 发布服务的一些需求 我们这里的服务 一般来说是 java application, spring boot application 针对发布, 当然最好是 增量发布, 尽量的减少需要传递给 发布服务器 的资源的大小 比如 我的这个 java application, 可能会存在很多依赖, 常规…

探讨Go语言在构建HTTP代理时的优势和挑战

亲爱的读者&#xff0c;让我们一起来探讨一下Go语言在构建HTTP代理时的优势和挑战。 首先&#xff0c;让我们来谈谈Go语言在构建HTTP代理时的优势。Go语言是一种高性能的编程语言&#xff0c;它具有简洁、高效的特点&#xff0c;非常适合构建高效的代理服务器。使用Go语言&…

springboot第52集:微服务分布式架构,统一验证,oauth,订单,地区管理周刊

在计算机领域中&#xff0c;FGC 通常代表 Full Garbage Collection&#xff0c;即全垃圾收集。垃圾收集是一种自动管理内存的机制&#xff0c;它负责回收不再被程序使用的内存&#xff0c;以便释放资源和提高程序性能。 当系统执行 Full Garbage Collection 时&#xff0c;它会…