【Kafka-3.x-教程】-【七】Kafka 生产调优、Kafka 压力测试

【Kafka-3.x-教程】专栏:

【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门
【Kafka-3.x-教程】-【二】Kafka-生产者-Producer
【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft
【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer
【Kafka-3.x-教程】-【五】Kafka-监控-Eagle
【Kafka-3.x-教程】-【六】Kafka 外部系统集成 【Flume、Flink、SpringBoot、Spark】
【Kafka-3.x-教程】-【七】Kafka 生产调优、Kafka 压力测试

【Kafka-3.x-教程】-【七】Kafka 生产调优、Kafka 压力测试

  • 1)Kafka 硬件配置选择
    • 1.1.场景说明
    • 1.2.服务器台数选择
    • 1.3.磁盘选择
    • 1.4.内存选择
    • 1.5.CPU 选择
    • 1.6.网络选择
  • 2)Kafka 生产者
    • 2.1.Kafka 生产者核心参数配置
    • 2.2.生产者如何提高吞吐量
    • 2.3.数据可靠性
    • 2.4.数据去重
    • 2.5.数据有序
    • 2.6.数据乱序
  • 3)Kafka Broker
    • 3.1.Broker 核心参数配置
    • 3.2.服役新节点/退役旧节点
    • 3.3.增加分区
    • 3.4.增加副本因子
    • 3.5.手动调整分区副本存储
    • 3.6.Leader Partition 负载平衡
    • 3.7.自动创建主题
  • 4)Kafka 消费者
    • 4.1.Kafka 消费者核心参数配置
    • 4.2.消费者再平衡
    • 4.3.指定 Offset 消费
    • 4.4.指定时间消费
    • 4.5.消费者事务
    • 4.6.消费者如何提高吞吐量
  • 5)Kafka 总体
    • 5.1.如何提升吞吐量
    • 5.2.数据精准一次
    • 5.3.合理设置分区数
    • 5.4.单条日志大于1m
    • 5.5.服务器挂了
  • 6)Kafka 压测
    • 6.1.Kafka Producer 压力测试
    • 6.2.Kafka Consumer 压力测试

1)Kafka 硬件配置选择

1.1.场景说明

100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条。

1 亿/24 小时/60 分/60 秒 = 1150 条/每秒钟。

每条日志大小:0.5k - 2k(取 1k)。

1150 条/每秒钟 * 1k ≈ 1m/s 。

高峰期每秒钟:1150 条 * 20 倍 = 23000 条。

每秒多少数据量:20MB/s。

1.2.服务器台数选择

服务器台数 = 2 * (生产者峰值生产速率 * 副本 / 100) + 1 = 2 * (20m/s * 2 / 100) + 1 = 3 台

建议 3 台服务器。

1.3.磁盘选择

kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。

建议选择普通的机械硬盘。

每天总数据量:1 亿条 * 1k ≈ 100g

100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T。

建议三台服务器硬盘总大小,大于等于 1T。

1.4.内存选择

Kafka 内存组成:堆内存 + 页缓存

1、Kafka 堆内存建议每个节点:10g ~ 15g

在 kafka-server-start.sh 中修改

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi

(1)查看 Kafka 进程号

jps2321 Kafka
5255 Jps
1931 QuorumPeerMain

(2)根据 Kafka 进程号,查看 Kafka 的 GC 情况

jstat -gc 2321 1s 10S0C  S1C  S0U   S1U     EC       EU       OC       OU       MC     MU      CCSC  CCSU  YGC YGCT FGC FGCT  GCT 
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531

参数说明:

  • S0C:第一个幸存区的大小;
  • S1C:第二个幸存区的大小
  • S0U:第一个幸存区的使用大小;
  • S1U:第二个幸存区的使用大小
  • EC:伊甸园区的大小;
  • EU:伊甸园区的使用大小
  • OC:老年代大小;
  • OU:老年代使用大小
  • MC:方法区大小;
  • MU:方法区使用大小
  • CCSC:压缩类空间大小;
  • CCSU:压缩类空间使用大小
  • YGC:年轻代垃圾回收次数;
  • YGCT:年轻代垃圾回收消耗时间
  • FGC:老年代垃圾回收次数;
  • FGCT:老年代垃圾回收消耗时间
  • GCT:垃圾回收消耗总时间;

(3)根据 Kafka 进程号,查看 Kafka 的堆内存

jmap -heap 2321Attaching to process ID 2321, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.212-b10
using thread-local object allocation.
Garbage-First (G1) GC with 8 thread(s)
Heap Configuration:MinHeapFreeRatio = 40MaxHeapFreeRatio = 70MaxHeapSize = 2147483648 (2048.0MB)NewSize = 1363144 (1.2999954223632812MB)MaxNewSize = 1287651328 (1228.0MB)OldSize = 5452592 (5.1999969482421875MB)NewRatio = 2SurvivorRatio = 8MetaspaceSize = 21807104 (20.796875MB)CompressedClassSpaceSize = 1073741824 (1024.0MB)MaxMetaspaceSize = 17592186044415 MBG1HeapRegionSize = 1048576 (1.0MB)
Heap Usage:
G1 Heap:regions = 2048capacity = 2147483648 (2048.0MB)used = 246367744 (234.95458984375MB)free = 1901115904 (1813.04541015625MB)11.472392082214355% used
G1 Young Generation:
Eden Space:regions = 83capacity = 105906176 (101.0MB)used = 87031808 (83.0MB)free = 18874368 (18.0MB)82.17821782178218% used
Survivor Space:regions = 7capacity = 7340032 (7.0MB)used = 7340032 (7.0MB)free = 0 (0.0MB)100.0% used
G1 Old Generation:regions = 147capacity = 2034237440 (1940.0MB)used = 151995904 (144.95458984375MB)free = 1882241536 (1795.04541015625MB)7.471886074420103% used
13364 interned Strings occupying 1449608 bytes.

2、页缓存:页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(1g)中 25%的数据在内存中就好。

每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如 10 个分区,页缓存大小 =(10 * 1g * 25%)/ 3 ≈ 1g

建议服务器内存大于等于 11G。

1.5.CPU 选择

num.io.threads = 8 负责写磁盘的线程数,整个参数值要占总核数的 50%。

num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的 50%的 1/3。

num.network.threads = 3 数据传输线程数,这个参数占总核数的 50%的 2/3。

建议 32 个 cpu core。

1.6.网络选择

网络带宽 = 峰值吞吐量 ≈ 20MB/s 选择千兆网卡即可。

100Mbps 单位是 bit;10M/s 单位是 byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。

一般百兆的网卡(100Mbps )、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。

2)Kafka 生产者

详见:【Kafka-3.x-教程】-【二】Kafka-生产者-Producer

3.1.1 Updating Broker Configs
From Kafka version 1.1 onwards, some of the broker configs can be 
updated without restarting the broker. See the Dynamic Update Mode 
column in Broker Configs for the update mode of each broker config.
read-only: Requires a broker restart for update
per-broker: May be updated dynamically for each broker
cluster-wide: May be updated dynamically as a cluster-wide default.
May also be updated as a per-broker value for testing.

2.1.Kafka 生产者核心参数配置

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.2.生产者如何提高吞吐量

在这里插入图片描述

2.3.数据可靠性

在这里插入图片描述

2.4.数据去重

1、参数配置
在这里插入图片描述

2、Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

2.5.数据有序

单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序;

2.6.数据乱序

在这里插入图片描述

3)Kafka Broker

详见:【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft

3.1.Broker 核心参数配置

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

3.2.服役新节点/退役旧节点

1、创建一个要均衡的主题。

vim topics-to-move.json {"topics": [{"topic": "first"}],"version": 1
}

2、生成一个负载均衡的计划。

bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --topics-to-move-json-file 
topics-to-move.json --broker-list "0,1,2,3" --generate

3、创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。

vim increase-replication-factor.json

4、执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --execute

5、验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --verify

3.3.增加分区

修改分区数(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

3.4.增加副本因子

1、创建 topic

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four

2、手动增加副本存储

(1)创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)。

vim increase-replication-factor.json#输入如下内容:
{"version":1,"partitions":[{"topic":"four","partition":0,"replica
s":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"t
opic":"four","partition":2,"replicas":[0,1,2]}]}

(2)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --execute

3.5.手动调整分区副本存储

1、创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。

vim increase-replication-factor.json#输入如下内容:
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --execute

3、验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increasereplication-factor.json --verify

3.6.Leader Partition 负载平衡

在这里插入图片描述

3.7.自动创建主题

如果 broker 端配置参数 auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为1)、副本因子为 default.replication.factor(默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。

生产环境建议将该参数设置为 false

1、向一个没有提前创建 five 主题发送数据

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic five
>hello world

2、查看 five 主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic five

4)Kafka 消费者

详见:【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer

4.1.Kafka 消费者核心参数配置

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4.2.消费者再平衡

在这里插入图片描述

4.3.指定 Offset 消费

kafkaConsumer.seek(topic, 1000);

4.4.指定时间消费

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, System.currentTimeMillis() -1 * 24 * 3600 * 1000);
kafkaConsumer.offsetsForTimes(timestampToSearch);

4.5.消费者事务

4.6.消费者如何提高吞吐量

增加分区数;

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

在这里插入图片描述

5)Kafka 总体

5.1.如何提升吞吐量

1、提升生产吞吐量

(1)buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。

(2)batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

(3)linger.ms:这个值默认是 0,意思就是消息必须立即被发送。一般设置一个 5-100 毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

(4)compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的 CPU 开销。

2、增加分区

3、消费者提高吞吐量

(1)调整 fetch.max.bytes 大小,默认是 50m。

(2)调整 max.poll.records 大小,默认是 500 条。

4、增加下游消费者处理能力

5.2.数据精准一次

1、生产者角度

  • acks 设置为 -1 (acks=-1)。
  • 幂等性(enable.idempotence = true) + 事务 。

2、broker 服务端角度

  • 分区副本大于等于 2 (–replication-factor 2)。
  • ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)。

3、消费者

  • 事务 + 手动提交 offset (enable.auto.commit = false)。
  • 消费者输出的目的地必须支持事务(MySQL、Kafka)。

5.3.合理设置分区数

1、创建一个只有 1 个分区的 topic。

2、测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。

3、假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。

4、然后假设总的目标吞吐量是 Tt,那么分区数 = Tt / min(Tp,Tc)。

例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;

分区数 = 100 / 20 = 5 分区

分区数一般设置为:3-10 个

分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区
个数。

5.4.单条日志大于1m

在这里插入图片描述

5.5.服务器挂了

在生产环境中,如果某个 Kafka 节点挂掉。正常处理办法:

1、先尝试重新启动一下,如果能启动正常,那直接解决。

2、如果重启不行,考虑增加内存、增加 CPU、网络带宽。

3、如果将 kafka 整个节点误删除,如果副本数大于等于 2,可以按照服役新节点的方式重新服役一个新节点,并执行负载均衡。

6)Kafka 压测

用 Kafka 官方自带的脚本,对 Kafka 进行压测。

  • 生产者压测:kafka-producer-perf-test.sh

  • 消费者压测:kafka-consumer-perf-test.sh

在这里插入图片描述

6.1.Kafka Producer 压力测试

1、创建一个 test topic,设置为 3 个分区 3 个副本

bin/kafka-topics.sh --bootstrapserver hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test

2、在 /opt/module/kafka/bin 目录下面有这两个文件。我们来测试一下

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092batch.size=16384 linger.ms=0

参数说明:

  • record-size 是一条信息有多大,单位是字节,本次测试设置为 1k。
  • num-records 是总共发送多少条信息,本次测试设置为 100 万条。
  • throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测
    出生产者最大吞吐量。本次实验设置为每秒钟 1 万条。
  • producer-props 后面可以配置生产者相关参数,batch.size 配置为 16k

输出结果:

ap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384 
linger.ms=0
37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 
1453.0 ms max latency.
50535 records sent, 10107.0 records/sec (9.87 MB/sec), 1199.5 ms avg 
latency, 1404.0 ms max latency.
47835 records sent, 9567.0 records/sec (9.34 MB/sec), 1350.8 ms avg latency, 
1570.0 ms max latency.
。。。 。。。
42390 records sent, 8444.2 records/sec (8.25 MB/sec), 3372.6 ms avg latency, 
4008.0 ms max latency.
37800 records sent, 7558.5 records/sec (7.38 MB/sec), 4079.7 ms avg latency, 
4758.0 ms max latency.
33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency, 
5049.0 ms max latency.
1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms 
avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms 
99th, 5030 ms 99.9th.

3、调整 batch.size 大小

(1)batch.size 默认值是 16k。本次实验 batch.size 设置为 32k。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=32768 linger.ms=0

输出结果:

49922 records sent, 9978.4 records/sec (9.74 MB/sec), 64.2 ms avg latency, 
340.0 ms max latency.
49940 records sent, 9988.0 records/sec (9.75 MB/sec), 15.3 ms avg latency, 
31.0 ms max latency.
50018 records sent, 10003.6 records/sec (9.77 MB/sec), 16.4 ms avg latency, 
52.0 ms max latency.
。。。 。。。
49960 records sent, 9992.0 records/sec (9.76 MB/sec), 17.2 ms avg latency, 
40.0 ms max latency.
50090 records sent, 10016.0 records/sec (9.78 MB/sec), 16.9 ms avg latency, 
47.0 ms max latency.
1000000 records sent, 9997.600576 records/sec (9.76 MB/sec), 20.20 ms avg 
latency, 340.00 ms max latency, 16 ms 50th, 30 ms 95th, 168 ms 99th, 249 
ms 99.9th.

(2)batch.size 默认值是 16k。本次实验 batch.size 设置为 4k。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=0

输出结果:

15598 records sent, 3117.1 records/sec (3.04 MB/sec), 1878.3 ms avg latency, 
3458.0 ms max latency.
17748 records sent, 3549.6 records/sec (3.47 MB/sec), 5072.5 ms avg latency, 
6705.0 ms max latency.
18675 records sent, 3733.5 records/sec (3.65 MB/sec), 6800.9 ms avg latency, 
7052.0 ms max latency.
。。。 。。。
19125 records sent, 3825.0 records/sec (3.74 MB/sec), 6416.5 ms avg latency, 
7023.0 ms max latency.
1000000 records sent, 3660.201531 records/sec (3.57 MB/sec), 6576.68 ms 
avg latency, 7677.00 ms max latency, 6745 ms 50th, 7298 ms 95th, 7507 ms 
99th, 7633 ms 99.9th.

4、调整 linger.ms 时间:linger.ms 默认是 0ms。本次实验 linger.ms 设置为 50ms。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50

输出结果:

16804 records sent, 3360.1 records/sec (3.28 MB/sec), 1841.6 ms avg latency, 
3338.0 ms max latency.
18972 records sent, 3793.6 records/sec (3.70 MB/sec), 4877.7 ms avg latency, 
6453.0 ms max latency.
19269 records sent, 3852.3 records/sec (3.76 MB/sec), 6477.9 ms avg latency, 
6686.0 ms max latency.
。。。 。。。
17073 records sent, 3414.6 records/sec (3.33 MB/sec), 6987.7 ms avg latency, 
7353.0 ms max latency.
19326 records sent, 3865.2 records/sec (3.77 MB/sec), 6756.5 ms avg latency, 
7357.0 ms max latency.
1000000 records sent, 3842.754486 records/sec (3.75 MB/sec), 6272.49 ms 
avg latency, 7437.00 ms max latency, 6308 ms 50th, 6880 ms 95th, 7289 ms 
99th, 7387 ms 99.9th.

5、调整压缩方式

(1)默认的压缩方式是 none。本次实验 compression.type 设置为 snappy。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=snappy

输出结果:

17244 records sent, 3446.0 records/sec (3.37 MB/sec), 5207.0 ms avg latency, 
6861.0 ms max latency.
18873 records sent, 3774.6 records/sec (3.69 MB/sec), 6865.0 ms avg latency, 
7094.0 ms max latency.
18378 records sent, 3674.1 records/sec (3.59 MB/sec), 6579.2 ms avg latency, 
6738.0 ms max latency.
。。。 。。。
17631 records sent, 3526.2 records/sec (3.44 MB/sec), 6671.3 ms avg latency, 
7566.0 ms max latency.
19116 records sent, 3823.2 records/sec (3.73 MB/sec), 6739.4 ms avg latency, 
7630.0 ms max latency.
1000000 records sent, 3722.925028 records/sec (3.64 MB/sec), 6467.75 ms 
avg latency, 7727.00 ms max latency, 6440 ms 50th, 7308 ms 95th, 7553 ms 
99th, 7665 ms 99.9th.

(2)默认的压缩方式是 none。本次实验 compression.type 设置为 zstd。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=zstd

输出结果:

23820 records sent, 4763.0 records/sec (4.65 MB/sec), 1580.2 ms avg latency, 
2651.0 ms max latency.
29340 records sent, 5868.0 records/sec (5.73 MB/sec), 3666.0 ms avg latency, 
4752.0 ms max latency.
28950 records sent, 5788.8 records/sec (5.65 MB/sec), 5785.2 ms avg latency, 
6865.0 ms max latency.
。。。 。。。
29580 records sent, 5916.0 records/sec (5.78 MB/sec), 6907.6 ms avg latency, 
7432.0 ms max latency.
29925 records sent, 5981.4 records/sec (5.84 MB/sec), 6948.9 ms avg latency, 
7541.0 ms max latency.
1000000 records sent, 5733.583318 records/sec (5.60 MB/sec), 6824.75 ms 
avg latency, 7595.00 ms max latency, 7067 ms 50th, 7400 ms 95th, 7500 ms 
99th, 7552 ms 99.9th.

(3)默认的压缩方式是 none。本次实验 compression.type 设置为 gzip。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=gzip

输出结果:

27170 records sent, 5428.6 records/sec (5.30 MB/sec), 1374.0 ms avg latency, 
2311.0 ms max latency.
31050 records sent, 6210.0 records/sec (6.06 MB/sec), 3183.8 ms avg latency, 
4228.0 ms max latency.
32145 records sent, 6427.7 records/sec (6.28 MB/sec), 5028.1 ms avg latency, 
6042.0 ms max latency.
。。。 。。。
31710 records sent, 6342.0 records/sec (6.19 MB/sec), 6457.1 ms avg latency, 
6777.0 ms max latency.
31755 records sent, 6348.5 records/sec (6.20 MB/sec), 6498.7 ms avg latency, 
6780.0 ms max latency.
32760 records sent, 6548.1 records/sec (6.39 MB/sec), 6375.7 ms avg latency, 
6822.0 ms max latency.
1000000 records sent, 6320.153706 records/sec (6.17 MB/sec), 6155.42 ms 
avg latency, 6943.00 ms max latency, 6437 ms 50th, 6774 ms 95th, 6863 ms 
99th, 6912 ms 99.9th.

(4)默认的压缩方式是 none。本次实验 compression.type 设置为 lz4。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 compression.type=lz4

输出结果:

16696 records sent, 3339.2 records/sec (3.26 MB/sec), 1924.5 ms avg latency, 
3355.0 ms max latency.
19647 records sent, 3928.6 records/sec (3.84 MB/sec), 4841.5 ms avg latency, 
6320.0 ms max latency.
20142 records sent, 4028.4 records/sec (3.93 MB/sec), 6203.2 ms avg latency, 
6378.0 ms max latency.
。。。 。。。
20130 records sent, 4024.4 records/sec (3.93 MB/sec), 6073.6 ms avg latency, 
6396.0 ms max latency.
19449 records sent, 3889.8 records/sec (3.80 MB/sec), 6195.6 ms avg latency, 
6500.0 ms max latency.
19872 records sent, 3972.8 records/sec (3.88 MB/sec), 6274.5 ms avg latency, 
6565.0 ms max latency.
1000000 records sent, 3956.087430 records/sec (3.86 MB/sec), 6085.62 ms 
avg latency, 6745.00 ms max latency, 6212 ms 50th, 6524 ms 95th, 6610 ms 
99th, 6695 ms 99.9th.

6、调整缓存大小:默认生产者端缓存大小 32m。本次实验 buffer.memory 设置为 64m。

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=4096 linger.ms=50 buffer.memory=67108864

输出结果:

20170 records sent, 4034.0 records/sec (3.94 MB/sec), 1669.5 ms avg latency, 
3040.0 ms max latency.
21996 records sent, 4399.2 records/sec (4.30 MB/sec), 4407.9 ms avg latency, 
5806.0 ms max latency.
22113 records sent, 4422.6 records/sec (4.32 MB/sec), 7189.0 ms avg latency, 
8623.0 ms max latency.
。。。 。。。
19818 records sent, 3963.6 records/sec (3.87 MB/sec), 12416.0 ms avg 
latency, 12847.0 ms max latency.
20331 records sent, 4062.9 records/sec (3.97 MB/sec), 12400.4 ms avg 
latency, 12874.0 ms max latency.
19665 records sent, 3933.0 records/sec (3.84 MB/sec), 12303.9 ms avg 
latency, 12838.0 ms max latency.
1000000 records sent, 4020.100503 records/sec (3.93 MB/sec), 11692.17 ms 
avg latency, 13796.00 ms max latency, 12238 ms 50th, 12949 ms 95th, 13691 
ms 99th, 13766 ms 99.9th.

6.2.Kafka Consumer 压力测试

1、修改 /opt/module/kafka/config/consumer.properties 文件中的一次拉取条数为 500:

max.poll.records=500

2、消费 100 万条日志进行压测

bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties

参数说明:

  • –bootstrap-server 指定 Kafka 集群地址
  • –topic 指定 topic 的名称
  • –messages 总共要消费的消息个数。本次实验 100 万条。

输出结果:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, 
nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 
1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418

3、一次拉取条数为 2000

(1)修改/opt/module/kafka/config/consumer.properties 文件中的一次拉取条数为 2000:

max.poll.records=2000

(2)再次执行

bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties

输出结果:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, 
nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 10:18:06:268, 2022-01-20 10:18:12:863, 977.5146, 148.2206, 
1000975, 151777.8620, 358, 6237, 156.7283, 160489.8188

4、调整 fetch.max.bytes 大小为 100m

(1)修改/opt/module/kafka/config/consumer.properties 文件中的拉取一批数据大小 100m:

fetch.max.bytes=104857600

(2)再次执行

bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties

输出结果:

start.time, end.time, data.consumed.in.MB, MB.sec, 
data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, 
fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 10:26:13:203, 2022-01-20 10:26:19:662, 977.5146, 
151.3415, 1000975, 154973.6801, 362, 6097, 160.3272, 164175.0041

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610896.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

观成科技-加密C2框架EvilOSX流量分析

工具简介 EvilOSX是一款开源的&#xff0c;由python编写专门为macOS系统设计的C2工具&#xff0c;该工具可以利用自身释放的木马来实现一系列集成功能&#xff0c;如键盘记录、文件捕获、浏览器历史记录爬取、截屏等。EvilOSX主要使用HTTP协议进行通信&#xff0c;通信内容为特…

蓝凌EIS pdf.aspx 任意文件读取漏洞

漏洞描述&#xff1a; 蓝凌EIS智慧协同平台是一个简单、高效的工作方式专为成长型企业打造的沟通、协同、社交的移动办公平台&#xff0c;覆盖OA、沟通、客户、人事、知识等管理需求&#xff0c;集合了非常丰富的模块&#xff0c;满足组织企业在知识、项目管理系统建设等需求的…

jmeter循环控制器

1.循环控制器 简单粗暴 写几次 循环几次 经常结合自定义变量使用 2.foreach控制器 搭配 变量一起使用的循环 一般变量的值是一个集合或者 是2个及2个以上的内容

[中阶]1月29-2月2晚8点-软件需求设计方法学全程实例剖析

建模方法学包含以下技能&#xff1a; A-业务建模——定位需要改进的目标组织&#xff08;人群或机构&#xff09;以及该组织接下来最需要改进的问题。 B-需求——描述为了改进组织的问题&#xff0c;所引入的信息系统必须具有的表现。 C-分析——提炼为了满足功能需求&#…

《ORANGE’S:一个操作系统的实现》读书笔记(二十七)文件系统(二)

上一篇文章我们记录了如何操作硬盘&#xff0c;并且编写了简单的硬盘驱动程序用于获取一些硬盘的参数。这篇文章就在上一篇文章的基础上记录文件系统&#xff0c;完善硬盘驱动程序。 文件系统 现在我们该仔细考虑如何构建一个文件系统了。这并不是我们第一次接触文件系统&…

python 工作目录 与 脚本所在目录不一致

工作目录&#xff1a;执行脚本的地方 我以为工作目录会是当前执行脚本的目录位置&#xff0c;但其实不是&#xff0c;例如&#xff1a; 图中红色文件为我执行的脚本文件&#xff0c;但是实际的工作目录是PYTHON LEARNING 可以用如下代码查询当前工作目录&#xff1a; import os…

dubbo的springboot集成

1.什么是dubbo&#xff1f; Apache Dubbo 是一款 RPC 服务开发框架&#xff0c;用于解决微服务架构下的服务治理与通信问题&#xff0c;官方提供了 Java、Golang 等多语言 SDK 实现。使用 Dubbo 开发的微服务原生具备相互之间的远程地址发现与通信能力&#xff0c; 利用 Dubbo …

【三】把Python Tk GUI打包exe可执行程序,移植到其他机器可用

背景 这是一个系列文章。上一篇【【二】为Python Tk GUI窗口添加一些组件和绑定一些组件事件-CSDN博客】 使用python脚本写一个小工具。因为命令行运行的使用会有dos窗口&#xff0c;交互也不是很方便&#xff0c;开发环境运行也不方便分享给别人用&#xff0c;所以想到…

ubantu中的docker安装

1.Ubuntu Docker 安装 | 菜鸟教程 (runoob.com) 我就是看这个教程进行操作的 2.执行下面两步&#xff0c;就算是安装完成了 3.启动&#xff0c;并检查是否安装成功&#xff1a; 4.安装之后&#xff0c;怎么用&#xff0c;那就是自己随便探索咯&#xff0c;可以看博客&#xf…

3D Web可视化开发工具包HOOPS Communicator:提供Web端浏览大型模型新方案!

前言&#xff1a;HOOPS Communicator是Tech Soft 3D旗下的主流产品之一&#xff0c;具有强大的、专用的高性能图形内核&#xff0c;专注于基于Web的高级3D工程应用程序。其由HOOPS Server和HOOPS Web Viewer两大部分组成&#xff0c;提供了HOOPS Convertrer、Data Authoring的模…

【Spring Cloud】Gateway组件的三种使用方式

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《Spring Cloud》。&#x1f3af;&#x1f3af; &am…

游戏版 ChatGPT,要用 AI 角色完善生成工具实现 NPC 自由

微软与 AI 初创公司 Inworld 合作&#xff0c;推出基于 AI 的角色引擎和 Copilot 助理&#xff0c;旨在提升游戏中 NPC 的交互力和生命力&#xff0c;提升游戏体验。Inworld 致力于打造拥有灵魂的 NPC&#xff0c;通过生成式 AI 驱动 NPC 行为&#xff0c;使其动态响应玩家操作…

蜗牛目标检测数据集VOC格式480张

蜗牛&#xff0c;一种缓慢而坚韧的软体动物&#xff0c;以其螺旋形的外壳和黏附力极强的黏液而为人所熟知。 蜗牛体型呈螺旋形&#xff0c;有一个硬壳保护其柔软的身体。壳的形状和纹理因种类而异&#xff0c;有的光滑如玻璃&#xff0c;有的则布满细纹。蜗牛的头部有两对触角…

现代 C++ 及 C++ 的演变

C 活跃在程序设计领域。该语言写入了许多新项目&#xff0c;而且据 TIOBE 排行榜数据显示&#xff0c;C 的受欢迎度和使用率位居第 4&#xff0c;仅次于 Python、Java 和 C。 尽管 C 在过去二十年里的 TIOBE 排名都位居前列&#xff08;2008 年 2 月排在第 5 名&#xff0c;到…

el-table实现多行合并的效果,并可编辑单元格

背景 数据为数组包对象&#xff0c;对象里面有属性值是数组&#xff1b;无需处理数据&#xff0c;直接使用el-table包el-table的方法&#xff0c;通过修改el-table的样式直接实现多行合并的效果 html代码 <template><div><el-table size"mini" :d…

【Cadence】sprobe的使用

实验目的&#xff1a;通过sprobe测试电路中某个节点的阻抗 这里通过sprobe测试输入阻抗&#xff0c;可以通过port来验证 设置如下&#xff1a; 说明&#xff1a;Z1代表sprobe往left看&#xff0c;Z2代表sprobe往right看 结果如下&#xff1a; 可以看到ZM1I0.Z2 顺便给出了I…

基于GPT4+Python近红外光谱数据分析及机器学习与深度学习建模

详情点击链接&#xff1a;基于ChatGPT4Python近红外光谱数据分析及机器学习与深度学习建模教程 第一&#xff1a;GPT4基础 1、ChatGPT概述&#xff08;GPT-1、GPT-2、GPT-3、GPT-3.5、GPT-4模型的演变&#xff09; 2、ChatGPT对话初体验&#xff08;注册与充值、购买方法&am…

酒店客房管理系统设计与实现(代码+数据库+文档)

&#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目 希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;一、研究背景 1.1 研究背景 当…

N9914A FieldFox 手持式射频分析仪,6.5 GHz

N9914A FieldFox 手持式射频分析仪 简述&#xff1a; Keysight FieldFox 便携式分析仪可以在非常恶劣的工作环境中&#xff0c;轻松完成从日常维护到深入故障诊断的各项工作。 选择最适合您需求且有强大软件支持的 Keysight FieldFox 配置。 FieldFox 分析仪可配置为电缆与天线…

C语言中的指针变量p,特殊表达式p[0] ,(*p)[0],(px+3)[2] ,(*px)[3]化简方法

一.已知以下代码&#xff0c;请问以下 式子p[0] &#xff0c;p[1] &#xff0c;(*p)[0] &#xff0c;(*p)[1] 是什么意思&#xff1f; int A[3] {1,2,3}; int (*p)[3] &A; 因为前面的嵌入式C语言基础的章节中说过&#xff0c;数组下标其实就是数组首元素的地址往上偏…