本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)
文章目录
- 1 副本基本信息
- 2 Leader 选举流程
- 3Leader 和 Follower 故障处理细节
- 3.1 Follower故障处理细节
- 3.2 Leader故障处理细节
- 4 分区副本分配
- 5 手动调整分区副本存储
- 6 Leader Partition 负载平衡
- 7 增加副本因子
1 副本基本信息
(1)Kafka 副本作用:提高数据可靠性。
(2)Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
(3)Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
(4)Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
AR = ISR + OSR
ISR: 表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR: 表示 Follower 与 Leader 副本同步时,延迟过多的副本。
2 Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。
模拟一下一个集群从启动,到leader挂掉,是怎么重新选leader的
- 首先我们启动集群,所有的节点启动,并在zookeeper中注册自己的信息
- 此时,谁先注册,谁就工作积极,那这个节点就负责选举,这个节点被称为controller,剩下的按积极性依次排列
- controller负责监听各个节点的信息,并维护进zookeeper中
- 现在,确定出controller节点
- 这个controller节点将数据上传到zk,并开始维护
- 别的节点,读取zk的数据
- 这时,假设有个节点挂了
- controller发现有个节点没了,那这个节点上的所有主题的leader等于没了,要重新选举
- 首先获取了ISR,看看要重新选举leader的主题,它还有哪些节点活着
- 从活着的节点中,选一个在AR队列里排名最靠前的节点,它就是新的leader
- 将新的节点信息上传到zk,其他节点同步数据
3Leader 和 Follower 故障处理细节
我们上面知道了,如何重新选leader的,但选举时的数据如何处理呢,往下接着分析,我们将挂掉的情况分为leader挂掉和follower挂掉两种情况
3.1 Follower故障处理细节
首先引出两个概念,LEO和WH
LEO: 每个副本的最后一个offset,也就是最后一条消息的ID,或者说同步到哪条消息了
HW: 所有副本中最小的LEO,也就是所有副本中,同步最慢的那个,同步到哪条消息了
现在,如下图,节点2挂掉了,上面的副本离线,不影响其余副本,当它重新上线时,将挂掉时的HW和它自己的LEO之间的数据清理掉,重新从掉时的HW处开始同步,直到追上现在的HW时,重新加入ISR
为啥要清理掉这批数据?
数据都是一批一批的同步过来的,挂掉时并不知道你这一批数据同步到哪一个了,也就是HW到你的LEO这中间的数据都可能不完整,所以干脆全部舍弃,重新同步
3.2 Leader故障处理细节
和follower差不多,区别是,新的leader并不知道故障时别的节点同步的具体,认为故障时高于HW的数据都不可靠,所以全部清掉重新同步
这就导致了一个问题,leader挂掉时,通过清掉HW后面的数据,让新的leader和follower之间的数据,保持了一致性,但是挂掉的leader可能已经处理了一部分数据了,那这一部分数据如果没同步到别的节点上,等于这部分数据就丢失了
4 分区副本分配
会依次往每个节点上放
如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka底层如何分配存储副本呢?
- 创建 16 分区,3 个副本
①创建一个新的 topic,名称为 second。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second
②查看分区和副本情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic second
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
5 手动调整分区副本存储
在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求:创建一个新的topic,4个分区,两个副本,名称为three。将 该topic的所有副本都存储到broker0和broker1两台服务器上。
手动调整分区副本存储的步骤如下:
(1)创建一个新的 topic,名称为 three。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three
(2)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
(3)创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
vim increase-replication-factor.json
输入如下内容:
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
(4)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
(6)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
6 Leader Partition 负载平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
auto.leader.rebalance.enable,默认是true。自动Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage,默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
leader.imbalance.check.interval.seconds,默认值300秒。检查leader负载是否平衡的间隔时间。
下面拿一个主题举例说明,假设集群只有一个主题如下图所示:
针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是Leader节点,
所以不平衡数加1,AR副本总数是4
所以broker0节点不平衡率为1/4>10%,需要再平衡。
broker2和broker3节点和broker0不平衡率一样,需要再平衡。
Broker1的不平衡数为0,不需要再平衡
7 增加副本因子
在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
1)创建 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
2)手动增加副本存储
创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)
vim increase-replication-factor.json
输入如下内容:
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute