自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、脚本功能概述

二、生产者性能测试

三、消费者性能测试

四、查看可用主题列表

五、脚本使用示例

六、脚本源码

七、总结


        在大数据处理的领域中,Kafka 扮演着极为重要的角色,它作为一个分布式流处理平台,能够高效地处理大规模的实时数据。而 Kafka 提供了一系列的脚本工具,帮助我们更好地管理和测试 Kafka 集群。今天,我们就来深入探讨一个自定义的 Kafka 脚本 kf-use.sh,了解其功能与使用场景。

一、脚本功能概述

        kf-use.sh 脚本主要提供了两个核心功能:生产者性能测试和消费者性能测试,同时还具备查看可用主题列表以及退出脚本等功能。通过这些功能,我们可以对 Kafka 集群的生产和消费能力进行评估,以便优化集群配置和数据处理流程。

二、生产者性能测试

  1. 参数输入
    • 当选择生产者性能测试功能时,脚本首先会提示用户输入主题名称。如果输入的主题不存在,脚本会要求用户重新输入,确保测试的主题是有效的。
    • 接着,用户需要输入要生产的记录数量,并且脚本会验证输入是否为整数,如果不是则提示重新输入。
    • 最后,用户要输入每条记录的大小(单位为字节),同样会进行整数验证。
  2. 性能测试执行
    • 一旦用户输入了正确的参数,脚本会调用 kafka-producer-perf-test.sh 工具,并传入相应的参数,如主题名称、记录数量、记录大小等,同时设置吞吐量为 -1(表示不限制吞吐量),并指定 Kafka 集群的地址 bigdata01:9092。这样就可以开始对生产者的性能进行测试,测试结果将反映出在给定条件下生产者向指定主题发送数据的效率。

三、消费者性能测试

  1. 主题选择
    • 在消费者性能测试功能中,首先会调用 kafka-topics.sh 工具列出当前可用的主题列表,然后提示用户输入要测试的主题名称。如果输入的主题不存在,脚本会要求重新输入。
  2. 性能测试执行
    • 当用户选择了存在的主题后,脚本会调用 kafka-consumer-perf-test.sh 工具,传入 Kafka 集群地址 bigdata01:9092、主题名称以及要消费的消息数量(这里固定为 100000 条),从而对消费者从指定主题消费数据的性能进行测试,测试结果可以帮助我们了解消费者处理数据的速度和效率。

四、查看可用主题列表

        无论是在生产者还是消费者性能测试功能中,都提供了查看可用主题列表的选项。通过调用 kafka-topics.sh 工具并传入集群地址 bigdata01:9092,可以获取当前 Kafka 集群中所有的主题名称,方便用户了解集群中的数据主题情况,以便做出正确的测试选择。

五、脚本使用示例

假设我们要对一个名为 test_topic 的主题进行生产者性能测试,我们可以按照以下步骤操作:

  1. 运行 kf-use.sh 脚本。
  2. 选择生产者性能测试功能(可能是对应的数字选项,如 1)。
  3. 输入主题名称 test_topic
  4. 输入要生产的记录数量,例如 10000。
  5. 输入每条记录的大小,比如 100 字节。
  6. 脚本会自动执行生产者性能测试,并输出测试结果,包括发送的总字节数、每秒发送的记录数、每秒发送的字节数等信息。

同样,如果要进行消费者性能测试,例如对 test_topic 主题:

  1. 运行 kf-use.sh 脚本并选择消费者性能测试功能(可能是数字 7 后再选择 1)。
  2. 查看可用主题列表,确认 test_topic 存在后输入该主题名称。
  3. 脚本会执行消费者性能测试,并给出消费 100000 条消息的相关性能数据,如每秒消费的消息数等。

六、脚本源码

#!/bin/bashwhile true; do# 命令大全系统界面echo "Kafka 命令大全系统:"echo "1. 主题操作(topics)"echo "2. 生产者操作(producer)"echo "3. 消费者操作(consumer)"echo "4. 配置操作(configs)"echo "5. 消费者组操作(consumer groups)"echo "6. 生产者性能测试(producer perf test)"echo "7. 消费者性能测试(consumer perf test)"echo "0. 退出"read -p "请输入功能选项数字:" choicecase $choice in1)# 主题操作菜单while true; doecho "主题操作功能:"echo "1. 查看所有主题"echo "2. 创建主题"echo "3. 查看某主题详细信息"echo "4. 修改某主题分区数"echo "5. 删除主题"echo "0.返回命令大全系统界面"read -p "请输入主题操作选项数字:" topic_choicecase $topic_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;2)# 创建主题read -p "请输入要创建的主题名称:" topic_namewhile true; doread -p "请输入分区数(整数):" partitionsif [[ $partitions =~ ^[0-9]+$ ]]; thenbreakelseecho "分区数必须是整数,请重新输入。"fidonewhile true; doread -p "请输入副本数(整数,且不超过可用 broker 数量):" replication_factorif [[ $replication_factor =~ ^[0-9]+$ ]]; then# 这里可添加检查副本数不超过可用 broker 数量的逻辑,暂时省略breakelseecho "副本数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions $partitions --replication-factor $replication_factor --topic $topic_name;;3)# 查看某主题详细信息kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看详细信息的主题名称:" topic_to_describekafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic $topic_to_describe;;4)# 修改某主题分区数kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改分区数的主题名称:" topic_to_alterwhile true; doread -p "请输入新的分区数(整数且大于当前分区数):" new_partitionsif [[ $new_partitions =~ ^[0-9]+$ ]]; then# 这里可添加检查新分区数是否大于当前分区数的逻辑,暂时省略breakelseecho "新分区数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic $topic_to_alter --partitions $new_partitions;;5)# 删除主题kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要删除的主题名称:" topic_to_deletekafka-topics.sh --bootstrap-server bigdata01:9092 --delete --topic $topic_to_delete;;0)break;;*)echo "无效的主题操作选择。";;esacdone;;2)# 生产者操作菜单while true; doecho "生产者操作功能:"echo "1. 发送消息到指定主题"echo "0.返回命令大全系统界面"read -p "请输入生产者操作选项数字:" producer_choicecase $producer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要发送消息的主题名称:" topic_nameecho "开始发送消息到主题 $topic_name。输入'EXIT'退出发送。"while true; doread -p "请输入消息内容:" messageif [ "$message" = "EXIT" ]; thenbreakfikafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic $topic_name <<< "$message"done;;0)break;;*)echo "无效的生产者操作选择。";;esacdone;;3)# 消费者操作菜单while true; doecho "消费者操作功能:"echo "1. 消费指定主题的消息"echo "2. 从主题开头消费所有消息"echo "0.返回命令大全系统界面"read -p "请输入消费者操作选项数字:" consumer_choicecase $consumer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要从开头消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic $topic_name;;0)break;;*)echo "无效的消费者操作选择。";;esacdone;;4)# 配置操作菜单while true; doecho "配置操作功能:"echo "1. 查看主题配置"echo "2. 修改主题配置"echo "0.返回命令大全系统界面"read -p "请输入配置操作选项数字:" config_choicecase $config_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看配置的主题名称:" topic_namekafka-configs.sh --bootstrap-server bigdata01:9092 --describe --entity-type topics --entity-name $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改配置的主题名称:" topic_nameread -p "请输入配置项名称:" config_nameread -p "请输入配置项值:" config_valuekafka-configs.sh --bootstrap-server bigdata01:9092 --alter --entity-type topics --entity-name $topic_name --add-config $config_name=$config_value;;0)break;;*)echo "无效的配置操作选择。";;esacdone;;5)# 消费者组操作菜单while true; doecho "消费者组操作功能:"echo "1. 查看消费者组列表"echo "2. 查看消费者组详情"echo "3. 重置消费者组偏移量"echo "0.返回命令大全系统界面"read -p "请输入消费者组操作选项数字:" consumer_group_choicecase $consumer_group_choice in1)kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --list;;2)read -p "请输入要查看详情的消费者组名称:" group_namekafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --describe --group $group_name;;3)read -p "请输入要重置偏移量的消费者组名称:" group_nameread -p "请输入要重置偏移量的主题名称:" topic_namekafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --reset-offsets --group $group_name --topic $topic_name --to-earliest;;0)break;;*)echo "无效的消费者组操作选择。";;esacdone;;6)# 生产者性能测试菜单while true; doecho "生产者性能测试功能:"echo "1. 执行生产者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入生产者性能测试选项数字:" producer_perf_choicecase $producer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenwhile true; doread -p "请输入要发送的记录数量(整数):" num_recordsif [[ $num_records =~ ^[0-9]+$ ]]; thenbreakelseecho "记录数量必须是整数,请重新输入。"fidonewhile true; doread -p "请输入每条记录的大小(整数,单位字节):" record_sizeif [[ $record_size =~ ^[0-9]+$ ]]; thenbreakelseecho "记录大小必须是整数,请重新输入。"fidonekafka-producer-perf-test.sh --topic $topic_name --num-records $num_records --record-size $record_size --throughput -1 --producer-props bootstrap.servers=bigdata01:9092elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的生产者性能测试选择。";;esacdone;;7)# 消费者性能测试菜单while true; doecho "消费者性能测试功能:"echo "1. 执行消费者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入消费者性能测试选项数字:" consumer_perf_choicecase $consumer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenkafka-consumer-perf-test.sh --broker-list bigdata01:9092 --topic $topic_name --messages 100000elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的消费者性能测试选择。";;esacdone;;0)echo "退出脚本。"break;;*)echo "无效的选择。";;esac
done

七、总结

        kf-use.sh 脚本为我们提供了一个便捷的方式来测试 Kafka 集群的生产者和消费者性能,同时方便地查看可用主题列表。通过合理使用这个脚本,我们可以更好地了解 Kafka 集群在数据生产和消费方面的能力,及时发现潜在的性能瓶颈并进行优化,从而提高整个大数据处理流程的效率。无论是对于 Kafka 初学者还是有一定经验的开发者,这个脚本都是一个非常实用的工具,可以帮助我们更好地管理和优化 Kafka 集群的运行。

        在实际应用中,我们可以根据不同的业务需求和数据处理场景,灵活调整测试参数,深入分析测试结果,以确保 Kafka 集群能够稳定、高效地运行,满足日益增长的大数据处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言菜鸟入门·关键字·int的用法

目录 1. int关键字 1.1 取值范围 1.2 符号类型 1.3 运算 1.3.1 加法运算() 1.3.2 减法运算(-) 1.3.3 乘法运算(*) 1.3.4 除法运算(/) 1.3.5 取余运算(%) 1.3.6 自增()与自减(--) 1.3.7 位运算 2. 更多关键字 1. int关键字 int 是一个关键字&#xff0…

神经网络(系统性学习三):多层感知机(MLP)

相关文章&#xff1a; 神经网络中常用的激活函数 神经网络&#xff08;系统性学习一&#xff09;&#xff1a;入门篇 神经网络&#xff08;系统性学习二&#xff09;&#xff1a;单层神经网络&#xff08;感知机&#xff09; 多层感知机&#xff08;MLP&#xff09; 多层感…

Vue——响应式数据,v-on,v-bind,v-if,v-for(内含项目实战)

目录 响应式数据 ref reactive 事件绑定指令 v-on v-on 鼠标监听事件 v-on 键盘监听事件 v-on 简写形式 属性动态化指令 v-bind iuput标签动态属性绑定 img标签动态属性绑定 b标签动态属性绑定 v-bind 简写形式 条件渲染指令 v-if 遍历指令 v-for 遍历对象的值 遍历…

蓝桥杯c++算法秒杀【6】之动态规划【上】(数字三角形、砝码称重(背包问题)、括号序列、组合数问题:::非常典型的必刷例题!!!)

下将以括号序列、组合数问题超级吧难的题为例子讲解动态规划 别忘了请点个赞收藏关注支持一下博主喵&#xff01;&#xff01;&#xff01;! ! ! ! &#xff01; 关注博主&#xff0c;更多蓝桥杯nice题目静待更新:) 动态规划 一、数字三角形 【问题描述】 上图给出了一…

YOLO-FaceV2: A Scale and Occlusion Aware Face Detector

《YOLO-FaceV2:一种尺度与遮挡感知的人脸检测器》 1.引言2.相关工作3.YOLO-FaceV23.1网络结构3.2尺度感知RFE模型3.3遮挡感知排斥损失3.4遮挡感知注意力网络3.5样本加权函数3.6Anchor设计策略3.7 归一化高斯Wasserstein距离 4.实验4.1 数据集4.2 训练4.3 消融实验4.3.1 SEAM块4…

【SQL Server】华中农业大学空间数据库实验报告 实验三 数据操作

1.实验目的 熟悉了解掌握SQL Server软件的基本操作与使用方法&#xff0c;以及通过理论课学习与实验参考书的帮助&#xff0c;熟练掌握使用T-SQL语句和交互式方法对数据表进行插入数据、修改数据、删除数据等等的操作&#xff1b;作为后续实验的基础&#xff0c;根据实验要求重…

【Elasticsearch入门到落地】2、正向索引和倒排索引

接上篇《1、初识Elasticsearch》 上一篇我们学习了什么是Elasticsearch&#xff0c;以及Elastic stack(ELK)技术栈介绍。本篇我们来什么是正向索引和倒排索引&#xff0c;这是了解Elasticsearch底层架构的核心。 上一篇我们学习到&#xff0c;Elasticsearch的底层是由Lucene实…

【Spring Boot】# 使用@Scheduled注解无法执行定时任务

1. 前言 在 Spring Boot中&#xff0c;使用Scheduled注解来定义定时任务时&#xff0c;定时任务不执行&#xff1b;或未在规定时间执行。 import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;Component public c…

STM32总体架构简单介绍

目录 一、引言 二、STM32的总体架构 1、三个被动单元 &#xff08;1&#xff09;内部SRAM &#xff08;2&#xff09;内部闪存存储器 &#xff08;3&#xff09;AHB到APB的桥&#xff08;AHB to APBx&#xff09; 2、四个主动&#xff08;驱动&#xff09;单元 &#x…

C# Postman或者PostApi调试前端webapi接口发送带有request/body/head信息

知识&#xff1a; 前端接口&#xff0c;表单形式提交。 req.ContentType "application/x-www-form-urlencoded"; x-www-form-urlencoded 是一种常见的 MIME 类型&#xff0c;用于将键值对编码为 HTTP 请求体中的 URL 编码格式。在 Web API 中&#xff0c;x-www-for…

李宏毅机器学习课程知识点摘要(1-5集)

前5集 过拟合&#xff1a; 参数太多&#xff0c;导致把数据集刻画的太完整。而一旦测试集和数据集的关联不大&#xff0c;那么预测效果还不如模糊一点的模型 所以找的数据集的量以及准确性也会影响 由于线性函数的拟合一般般&#xff0c;所以用一组函数去分段来拟合 sigmoi…

七、SElinux

一、SElinux简介 SELinux是Security-Enhanced Linux的缩写&#xff0c;意思是安全强化的linuxSELinux 主要由美国国家安全局(NSA)开发&#xff0c;当初开发的目的是为了避免资源的误用传统的访问控制在我们开启权限后&#xff0c;系统进程可以直接访问当我们对权限设置不严谨时…

小程序25- iconfont 字体图标的使用

项目中使用到图标&#xff0c;一般由公司设计进行设计&#xff0c;设计好后上传到阿里巴巴矢量图标库 日常开发过程中&#xff0c;也可以通过 iconfont 图标库下载使用自带的图标 补充&#xff1a;使用 iconfont 图标库报错&#xff1a;Failed to load font 操作步骤&#xff…

鸢尾花植物的结构认识和Python中scikit-learn工具包的安装

鸢尾花植物的结构认识和Python中scikit-learn工具包的安装 鸢尾花植物的结构认识和Python中scikit-learn工具包的安装 鸢尾花植物的结构认识和Python中scikit-learn工具包的安装一、鸢尾花的认识1.1 对花结构和功能认识1.2、鸢尾花认识1.2.1 鸢尾花种类1.2.2 鸢尾花结构 二. Py…

Unity3D 截图

使用 Unity3D 自带的截图接口&#xff0c;制作截图工具。 截图 有时候我们想对 Unity 的窗口进行截图&#xff0c;如果直接使用一些截图工具&#xff0c;很难截取到一张完整分辨率的图片&#xff08;例如&#xff0c;我们想要截取一张 1920 * 1080 的图片&#xff09;。 其实…

Mysql的加锁情况详解

最近在复习mysql的知识点&#xff0c;像索引、优化、主从复制这些很容易就激活了脑海里尘封的知识&#xff0c;但是在mysql锁的这一块真的是忘的一干二净&#xff0c;一点映像都没有&#xff0c;感觉也有点太难理解了&#xff0c;但是还是想把这块给啃下来&#xff0c;于是想通…

丹摩征文活动 | AI创新之路,DAMODEL助你一臂之力GPU

目录 前言—— DAMODEL&#xff08;丹摩智算&#xff09; 算力服务 直观的感受算力提供商的强大​ 平台功能介绍​ 镜像选择 云磁盘创建 总结 前言—— 只需轻点鼠标,开发者便可拥有属于自己的AI计算王国 - 从丰富的GPU实例选择,到高性能的云磁盘,再到预配置的深度学习…

Linux之日志

日志 在编写网络服务器, 各种软件时, 程序一定要打印一些日志信息. 1. 可以向显示器打印, 也可以向文件中写入. 2. 日志是软件在运行时记录的流水账, 用于排查服务进程挂掉的信息. 其中必须要有的是: 日志等级, 时间, 日志内容.可选的是文件名, 代码行数, 进程pid 等 日志…

IDEA指定Maven的settings不生效问题处理

文章目录 一、问题描述二、问题分析三、问题解决 一、问题描述 在Idea中手动指定了maven的settings配置文件&#xff0c;但是一直没生效。 如下图&#xff1a;设置加载settings-aliyun.xml文件&#xff0c;但是最后发现还是在加载settings.xml文件 二、问题分析 ‌在Intel…

【软考】数据库

1. 数据模型 1.1 概念数据模型 概念数据模型一般用 E-R 图表示&#xff0c;常用术语如下&#xff1a; 实体&#xff1a;客观存在的事物&#xff0c;如&#xff1a;一个单位、一个职工、一个部门、一个项目。属性&#xff1a;学生实体有学号、姓名、出生日期等属性。码&#…