Broker
log.dirs
这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。比如/home/kafka1,/home/kafka2,/home/kafka3这样
log.dirs = /home/kafka1,/home/kafka2,/home/kafka3
如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。
1、可以提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
2、能够实现故障转移:即 Failover。只要 Kafka Broker 使用的任何一块磁盘挂掉了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。
log.dir :注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。
zookeeper.connect
单个Kafka 集群使用,直接配置为
zookeeper.connect = zk1:2181,zk2:2181,zk3:2181
多个 Kafka 集群使用同一套 ZooKeeper 集群
比如 两套 Kafka 集群分别为kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:
kafka1 中配置:
zookeeper.connect = zk1:2181,zk2:2181,zk3:2181/kafka1
kafka2 中配置:
zookeeper.connect = zk1:2181,zk2:2181,zk3:2181/kafka2
监听器
listeners : 监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。
注意 :主机名这个设置中到底使用 IP 地址还是主机名。最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。不然可能会发生无法连接的问题
advertised.listeners :和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
Topic相关
auto.create.topics.enable: 是否允许自动创建 Topic。
最好设置成 false,即不允许自动创建 Topic。在线上环境里面有很多名字稀奇古怪的 Topic,大概都是因为该参数被设置成了 true 的缘故。
unclean.leader.election.enable: 是否允许 Unclean Leader 选举。
关闭 Unclean Leader 选举的。何谓 Unclean? Kafka 有多个副本,每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。
那么问题来了,这些副本都有资格竞争 Leader 吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。即防止非lsr副本当选为Leader副本。
**auto.leader.rebalance.enable:**是否允许定期进行 Leader 选举。
置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。
要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此建议你在生产环境中把这个参数设置成 false。
数据留存
log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。
log.retention.hours=168表示默认保存 7 天的数据,自动删除 7 天前的数据
log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以。
这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。
**message.max.bytes:**控制 Broker 能够接收的最大消息大小。
message.max.bytes 默认的 1000012 太少了,还不到 1MB。
实际场景中突破 1MB 的消息都是很常见的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它仅仅是衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。
TOPIC
topic配置优先级
如果同时设置了 Topic 级别参数和全局 Broker 参数,哪个参数生效?
Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值
Topic 的数据保存时间
retention.ms:Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
retention.bytes:要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
max.message.bytes: 决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。很多公司都把 Kafka 作为一个基础架构组件来运行,上面跑了很多的业务数据。如果在全局层面上,不好给出一个合适的最大消息值,那么不同业务能够自行设定自己业务 Topic 级别参数就显得非常必要了。
创建 Topic 时进行设置
限制5MB,一般消息不会超过 5MB
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bigData --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
修改 Topic 时设置
使用kafka-configs来修改 Topic 级别参数,修改成限制10MB
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name bigData --alter --add-config max.message.bytes=10485760
JVM 参数
- 如果是Jdk8 且有资源配置较高 JVM 堆大小设置成 6GB ,垃圾回收选择G1;
- 如果 Broker 所在机器的内存不是很充足,CPU 资源非常空闲,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
- 上面两个都不满足,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
KAFKA_HEAP_OPTS:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。
操作系统参数
文件描述符限制
经常看到“Too many open files”的错误。直接给大点ulimit -n 1000000
- 临时设置 ulimit -n ,仅在当前终端会话中生效,重启后失效。
# 设置
ulimit -n 1000000
# 验证设置是否成功
ulimit -n
- 永久设置 ulimit -n
修改 /etc/sysctl.conf
sudo nano /etc/sysctl.conf
#添加内容
fs.file-max = 1000000
# 配置生效
sudo sysctl -p
# 查看
cat /proc/sys/fs/file-max
文件系统类型
指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。XFS 的性能要强于 ext4,所以生产环境性能要求很高,最好还是使用 XFS。
# 查看可用磁盘
1. lsblk
# 创建分区并格式化为 XFS
2. sudo mkfs.xfs /dev/sdb
# 挂载到 Kafka 数据目录 log.dirs=/data/kafka-logs
3. sudo mkdir -p /data/kafka-logs
sudo mount /dev/sdb /data/kafka-logs
# 验证挂载和文件系统类型
4. df -hT /data/kafka-data
# 配置开机自动挂载并优化: 编辑 /etc/fstab,添加:
#noatime: 禁止访问时间更新,提高性能。
#nodiratime: 禁止目录访问时间更新。
#allocsize=512m: 增大文件的预分配大小,减少碎片化。
5. /dev/sdb /data/kafka-logs xfs
defaults,noatime,nodiratime,allocsize=512m 0 0
Swappiness
将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。如果设置为0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。
- 临时修改(无需重启)
sudo sysctl vm.swappiness=1
cat /proc/sys/vm/swappiness
- 永久修改(重启后生效)
编辑系统配置文件 /etc/sysctl.conf,添加或修改以下内容:
vm.swappiness=1
让配置生效命令
sudo sysctl -p
cat /proc/sys/vm/swappiness
提交时间(Flush 落盘时间)
向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。
这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。
疑问点: 如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,多写几次内存总比写一次硬盘块而且保险,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。
配置在Kafka Broker 的配置文件 server.properties中:
log.flush.interval.messages=10000 # 设置消息数量触发落盘
log.flush.interval.ms=10000 # 设置时间间隔触发落盘(10秒)
log.flush.scheduler.interval.ms=1000 # 后台检查间隔
如果两个条件都设置(log.flush.interval.messages 和 log.flush.interval.ms),只要满足任意一个条件就会触发落盘。
对性能要求较高的场景,可以适当增加 log.flush.interval.ms 和 log.flush.interval.messages 的值,以减少磁盘 I/O,但可能增加数据丢失的风险。