kafka单机安装及性能测试

kafka单机安装及性能测试

Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,并于2011年开源,随后成为Apache项目。Kafka的核心概念包括发布-订阅消息系统、持久化日志和流处理平台。它主要用于构建实时数据管道和流处理应用,广泛应用于日志聚合、数据传输、实时监控和分析等场景。Kafka具有高吞吐量、低延迟、扩展性强和容错性高等特点。

1. Kafka安装

安装kafka2.7.0: 下载地址:https://kafka.apache.org/downloads

# 下载
$ wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
$ tar xf kafka_2.13-2.7.0.tgz
$ sudo mv kafka_2.13-2.7.0/ /usr/local/kafka2.7.0/
# 修改zookeeper.properties的配置文件。修改dataDir的参数配置,其他的配置默认不变。dataDir=/usr/local/kafka2.7.0/zookeeper
$ sudo vi /usr/local/kafka2.7.0/config/zookeeper.properties

$
 sudo mkdir -p /usr/local/kafka2.7.0/zookeeper/
$ sudo mkdir -p /usr/local/kafka2.7.0/logs/
# 修改server.properties的配置文件。修改listeners、host.name、log.dirs、zookeeper.connect、create.topics.enable和delete.topic.enble的参数配置,没有的配置添加,其他的配置默认不变。
$ sudo vi /usr/local/kafka2.7.0/config/server.properties
######## Socket Server Settings ########
listeners=PLAINTEXT://172.16.0.9:9092
host.name=172.16.0.9
########### Log Basics ###########
log.dirs=/usr/local/kafka2.7.0/logs
########## Zookeeper ###########
zookeeper.connect=172.16.0.9:2181
########## Group Coordinator Settings #########
auto.create.topics.enable=false
delete.topic.enable=true

#
 启动Kafka,使用root用户操作。分为两步,先启动zookeeper,再启动Kafka。
[root@xx]# nohup /usr/local/kafka2.7.0/bin/zookeeper-server-start.sh /usr/local/kafka2.7.0/config/zookeeper.properties > /usr/local/kafka2.7.0/zookeeper-run.log  2>&1 &
[root@xx]# sleep 10
[root@xx]# nohup /usr/local/kafka2.7.0/bin/kafka-server-start.sh /usr/local/kafka2.7.0/config/server.properties > /usr/local/kafka2.7.0/kafka-run.log 2>&1 &

#
 验证。jps查询输出如下择表示启动成功
# jps
101981 Kafka
101420 QuorumPeerMain   #zookeeper
102575 Jps

2. Kafka性能测试

使用kafka自带的性能测试脚本,发起写入MQ消息和消费MQ消息的请求。根据不同数量级的消息写入和消息消费测试结果,评估kafka处理消息的能力。

2.1 Kafka写入消息压力测试

对kafka节点进行MQ消息服务的压力测试,关注Kafka消息写入的延迟时间是否满足需求。

# 脚本命令位于/usr/local/kafka2.7.0/bin
# 创建topic,单机环境replication-factor设置为1。上述server.properties中的auto.create.topics.enable设置为true可以自动创建主题。
$ sudo ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic test_perf
# 删除topic:sudo ./kafka-topics.sh --delete --topic test_perf --zookeeper localhost:2181
# 查询topic:sudo ./kafka-topics.sh --list --zookeeper localhost:2181

#
 指定吞吐量测试时延
$ sudo ./kafka-producer-perf-test.sh --topic test_perf --num-records 100000 --record-size 1000  --throughput 2000 --producer-props bootstrap.servers=172.16.0.9:9092
100000 records sent, 1999.760029 records/sec (1.91 MB/sec), 1.13 ms avg latency, 448.00 ms max latency, 0 ms 50th, 1 ms 95th, 17 ms 99th, 83 ms 99.9th.

$
 sudo ./kafka-producer-perf-test.sh --topic test_perf --num-records 1000000 --record-size 1000  --throughput 5000 --producer-props bootstrap.servers=172.16.0.9:9092
1000000 records sent, 4999.725015 records/sec (4.77 MB/sec), 0.51 ms avg latency, 481.00 ms max latency, 0 ms 50th, 1 ms 95th, 2 ms 99th, 53 ms 99.9th

$
 sudo ./kafka-producer-perf-test.sh --topic test_perf --num-records 10000000 --record-size 1000  --throughput 5000 --producer-props bootstrap.servers=172.16.0.9:9092
10000000 records sent, 4999.985000 records/sec (4.77 MB/sec), 0.35 ms avg latency, 424.00 ms max latency, 0 ms 50th, 1 ms 95th, 1 ms 99th, 5 ms 99.9th.

#
 throughput设置0-1,测试producer的最大吞吐量。
# 优化参数:compression.type=snappy,使用snappy算法压缩消息。
$ sudo ./kafka-producer-perf-test.sh --topic test_perf --num-records 10000000 --record-size 1000  --throughput -1 --producer-props bootstrap.servers=172.16.0.9:9092 batch_size=563840 linger_ms=30000 acks=0 compression_type=snappy
[2024-03-28 16:57:00,757] WARN The configuration 'batch_size' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2024-03-28 16:57:00,757] WARN The configuration 'compression_type' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2024-03-28 16:57:00,757] WARN The configuration 'linger_ms' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
577921 records sent, 115584.2 records/sec (110.23 MB/sec), 239.9 ms avg latency, 491.0 ms max latency.
646464 records sent, 128854.7 records/sec (122.89 MB/sec), 247.7 ms avg latency, 604.0 ms max latency.
313216 records sent, 62418.5 records/sec (59.53 MB/sec), 514.5 ms avg latency, 854.0 ms max latency.
206016 records sent, 41137.4 records/sec (39.23 MB/sec), 724.7 ms avg latency, 1781.0 ms max latency.
...
301184 records sent, 59949.0 records/sec (57.17 MB/sec), 545.7 ms avg latency, 725.0 ms max latency.
10000000 records sent, 62655.463870 records/sec (59.75 MB/sec), 494.30 ms avg latency, 5370.00 ms max latency, 506 ms 50th, 775 ms 95th, 1149 ms 99th, 5221 ms 99.9th.

结果解析:

以写入100w条MQ消息为例,每秒平均向kafka写入了4.77MB的数据,平均4999.725条消息/秒,每次写入的平均延迟为0.51毫秒,最大的延迟为481毫秒。

producer优化思路与优化参数

  1. 优化思路
  • 适当调大 batch.size和 linger.ms:这两个参数是配合起来使用的,目的就是缓存更多的数据,减少客户端发起请求的次数。这两个参数根据实际情况调整,注意要适量。
  • 关闭数据发送确认机制:适用于对数据完整性要求不高的场景,比如日志,丢几条无所谓那种
  • 指定数据发送时的压缩算法:默认不压缩,可选压缩算法gzip,snappy,lz4,zstd等
  1. 推荐一组优化参数
  • batch_size=563840: 默认值是 16384
  • linger_ms=30000: 默认值是 0
  • acks=0: 默认值是 1
  • compression_type="gzip": 默认值是 None

结果汇总:

设置消息总数(单位:w)设置单个消息大小(单位:字节)设置每秒发送消息数实际写入消息数/秒95%的消息延迟(单位:ms)
10100020001999.761ms
100100050004999.721ms
1000100050004999.961ms

2.2 Kafka消费消息压力测试

对Kafka节点进行MQ消息处理的压力测试,验证Kafka的消息处理能力。

# 消费10w消息压测结果。先写入10w消息,然后消费
$ sudo ./kafka-producer-perf-test.sh --topic test_perf --num-records 100000 --record-size 1000  --throughput 2000 --producer-props bootstrap.servers=172.16.0.9:9092
$ sudo ./kafka-consumer-perf-test.sh --broker-list 172.16.0.9:9092 --topic test_perf --fetch-size 1048576 --messages 100000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-03-27 14:14:36:989, 2024-03-27 14:14:38:053, 95.3674, 89.6310, 100000, 93984.9624, 1711520077451, -1711520076387, -0.0000, -0.0001

#
 消费100w消息压测结果。先写入100w消息,然后消费
$ sudo ./kafka-producer-perf-test.sh --topic test_perf --num-records 1000000 --record-size 1000  --throughput 5000 --producer-props bootstrap.servers=172.16.0.9:9092
$ sudo ./kafka-consumer-perf-test.sh --broker-list 172.16.0.9:9092 --topic test_perf --fetch-size 1048576 --messages 1000000 --threads 1
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-03-27 14:20:11:235, 2024-03-27 14:20:14:554, 953.8040, 287.3769, 1000136, 301336.5472, 1711520411703, -1711520408384, -0.0000, -0.0006

#
 消费1000w消息压测结果。先写入1000w消息,然后消费
$ sudo ./kafka-producer-perf-test.sh --topic test_perf --num-records 10000000 --record-size 1000  --throughput 5000 --producer-props bootstrap.servers=172.16.0.9:9092
$ sudo ./kafka-consumer-perf-test.sh --broker-list 172.16.0.9:9092 --topic test_perf --fetch-size 1048576 --messages 10000000 --threads 1
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-03-27 14:56:24:937, 2024-03-27 14:59:01:601, 9536.7823, 60.8741, 10000041, 63831.1354, 1716562585422, -1716562428758, -0.0000, -0.0058

结果解析:

以本例中消费100w条MQ消息为例总共消费了953.8M的数据,每秒消费数据大小为287.377M,总共消费了1000136条消息,每秒消费301336.547条消息。

参数解释:

  1. start.time:测试开始的时间,通常以时间戳形式表示,标志着性能测试或监控的开始时刻。

  2. end.time:测试结束的时间,通常以时间戳形式表示,标志着性能测试或监控的结束时刻。

  3. data.consumed.in.MB:在测试期间消费者从Kafka主题中消费的数据总量,以MB(兆字节)为单位。这个参数表示消费者在指定时间段内消费了多少数据。

  4. MB.sec:每秒消费的数据量,以MB(兆字节)为单位。它表示消费者的吞吐量,即每秒能够消费的数据量。

  5. data.consumed.in.nMsg:在测试期间消费者从Kafka主题中消费的消息总数。这个参数表示消费者在指定时间段内消费了多少条消息。

  6. nMsg.sec:每秒消费的消息数。它表示消费者的吞吐量,即每秒能够消费的消息数量。

  7. rebalance.time.ms:在测试期间由于消费者组重新平衡所花费的总时间,以毫秒为单位。消费者组重新平衡是指消费者组内的消费者发生变动(如新增或移除消费者)时,Kafka需要重新分配分区给各个消费者的过程。

  8. fetch.time.ms:在测试期间用于从Kafka获取消息的总时间,以毫秒为单位。这个参数表示消费者花在从Kafka拉取消息上的总时间。

  9. fetch.MB.sec:每秒从Kafka获取的数据量,以MB(兆字节)为单位。这个参数表示消费者在拉取消息时的吞吐量。

  10. fetch.nMsg.sec:每秒从Kafka获取的消息数。这个参数表示消费者在拉取消息时的吞吐量。

这些参数可以帮助评估Kafka消费者在不同负载下的性能,找出可能的瓶颈,并进行相应的优化。

结果汇总:

消费消息总数(单位:w)共消费数据(单位:M)每秒消费数据(单位:M)每秒消费消息数消费耗时(单位:s)
1095.3671089.63193984.96241.064
100953.8287.3769301336.54723.319
10009536.782360.874163831.1354156.664

参考:

  • Kafka压力测试(自带测试脚本)(单机版)
  • 如何做 Kafka 的性能测试

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/15436.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电商项目之有趣的支付签名算法

文章目录 1 问题背景2 思路3 代码实现 1 问题背景 在发起支付的时候,一般都需要对发送的请求参数进行加密或者签名,下文简称这个过程为“签名”。行业内比较普遍的签发算法有: (1)按支付渠道给定的字段排序进行拼接&am…

C++|设计模式(〇)|设计模式的六大原则

这里文章只做简要描述,作为扫盲 在软件开发过程中,遵循一定的设计原则可以帮助开发者创建更加灵活、可维护和可扩展的系统。设计模式的六大原则是面向对象设计的核心理念,本文将详细介绍这些原则,并结合实例说明它们的重要性和应用…

Android Studio添加依赖 新版 和 旧版 的添加方式(Gradle添加依赖)(Java)

旧版的(在线添加) 1找 文件 在项目的build.gradle文件中添加依赖(在下面的节点中添加库 格式 ’ 组 :名字 : 版本号 ‘ ) dependencies {implementation com.example:library:1.0.0 }implementation 组:名字:版本…

【lambdastreammaven】

lambda 匿名函数 为了简化java中的匿名内部类 事件监听 写一个类 实现 ActionListener 接口 (外部类) | | 内部类 类在其他地方用不到, 索性就把这个类定义在类的内部使用 好处: 1.内部可以使用外部类的成员 …

互联网十万个为什么之什么是分布式计算?

分布式计算是一种计算方法,它将计算任务分散到多个物理或逻辑上分开的计算机(称为节点)上执行,这些节点通过网络互连并协作完成共同的目标。每个节点具备独立的处理能力和存储资源,在分布式系统中,它们共享…

论文阅读--CLIPasso

让计算机把真实图片抽象成简笔画,这个任务很有挑战性,需要模型捕获最本质的特征 以往的工作是找了素描的数据集,而且抽象程度不够高,笔画是固定好的,素描对象的种类不多,使得最后模型的效果十分受限 之所以…

小米财报:业绩远超预期,汽车推着手机跑!

随着一季度财报陆续出炉,企业间的分化越来越明显。 新环境下,很多公司都陷入停滞时,去讨论“掉队”已经没有多少意义,现在真正值得我们关注的,是那些在逆风情况下,还能“领先”的企业。毫无疑问&#xff0…

ES集群性能优化参考建议

Elasticsearch(ES)集群性能优化是一个多方面的任务,涉及硬件、配置、查询优化等多个方面。以下是一些建议,帮助你优化Elasticsearch集群的性能: 1. 硬件优化 内存:确保分配给Elasticsearch的内存足够大&a…

C++|设计模式(三)|抽象工厂模式

抽象工厂模式仍然属于创建型模式,我们在【简单工厂和工厂方法模式】这篇文章中,描述了简单工厂和工厂方法模式,并在文末,简单介绍了工厂方法模式的局限性。 本文将通过汽车工厂的例子继续来阐述使用抽象工厂模式相比较于工厂方法…

Linux修炼之路之冯系结构,操作系统

目录 一:冯诺依曼体系结构 1.五大组件 2.存储器存在的意义 3.几个问题 二:操作系统 接下来的日子会顺顺利利,万事胜意,生活明朗-----------林辞忧 一:冯诺依曼体系结构 我们当代的计算机的基本构成都是由冯诺依曼…

Kubernetes 容器编排

应用程序部署演变 主要有三个演变: 传统部署:互联网早期,会直接将应用程序部署在物理机上 优点:简单,不需要其它技术的参与 缺点:不能为应用程序定义资源使用边界,很难合理地分配计算资源&…

【开源】多语言大型语言模型的革新:百亿参数模型超越千亿参数性能

大型人工智能模型,尤其是那些拥有千亿参数的模型,因其出色的商业应用表现而受到市场的青睐。但是,直接通过API使用这些模型可能会带来数据泄露的风险,尤其是当模型提供商如OpenAI等可能涉及数据隐私问题时。私有部署虽然是一个解决…

PY32F003+RTL8710(AT) 实现获取天气情况

一、RTL8710主要AT指令 1、ATSR:模块重启 2、ATSE1:开启回显 3、ATPW1:station模式 4、ATPNssid,password,,:连接到AP 5、ATPK1:设置自动接收 6、ATPC0,v1.yiketianqi.com,80:与网站建立TCP连接 7、ATPT125…

关于pytorch加载模型报错问题

load_net[“params”] 报keyerror 加载模型后查看对应参数是什么 model2 torch.load(m1_path "xxx.pth") print(model1.keys())若输出如下: 已经有相应参数不需要执行 load_net[“params”]若输出如下 则需要load_net[“params”]

Linux-命令上

at是一次性的任务,crond是循环的定时任务 如果 cron.allow 文件存在,只有在文件中出现其登录名称的用户可以使用 crontab 命令。root 用户的登录名必须出现在 cron.allow 文件中,如果这个文件存在的话。系统管理员可以明确的停止一个用户&am…

3D 生成重建014-Bidiff使用二维和三维先验的双向扩散

3D 生成重建014-Bidiff使用二维和三维先验的双向扩散 文章目录 0 论文工作1 论文方法2 效果 0 论文工作 大多数三维生成研究集中在将二维基础模型向上投影到三维空间中,要么通过最小化二维评分蒸馏采样(SDS)损失,要么通过对多视图…

判断变量是否为数组的几种方法

1、isArray 方法 isArray() 方法用于判断一个对象是否为数组。如果对象是数组返回 true,否则返回 false。 Array.isArray(arr); // true 1 2、对象原型 通过原型链判断是否具有和数组同一原型链的顶端。 arr.__proto__ Array.prototype; // true 1 3、instanceof…

[数据结构] -- 双向循环链表

🌈 个人主页:白子寰 🔥 分类专栏:C打怪之路,python从入门到精通,数据结构,C语言,C语言题集👈 希望得到您的订阅和支持~ 💡 坚持创作博文(平均质量分82)&#…

一文理清database/sql包的使用场景和宕机查询流程

一文理清database/sql包你可能遇到的问题 那么database/sql包实现了什么功能呢?建立数据库连接检测连接是否能ping通通过连接进行具体的sql查询查询完将连接进行关闭当数据库宕掉重启后再次查询 database/sql包创建的db连接 对于数据库宕掉后重启是否仍然有效&#…

AI绘画工具:创意与技术的完美融合

随着人工智能技术的飞速发展,我们见证了无数领域的革新与变革。其中,AI绘画工具的出现,无疑为艺术界带来了一股清新的风潮。这些工具以其独特的魅力,吸引了无数艺术家和创意人士的目光,成为他们表达自我、探索未知的重…