文章目录
- Zk中存储的kafka的信息
- Kafka Broker总体工作流程
- 1. broker启动后向zk中注册
- 2. Controller谁先启动注册,谁说了算
- 3. 由选举出来的Controller监听brokers节点的变化
- 4. Controller决定leader选举
- 5. Controller将节点信息上传到Zk中
- 6. 其他Controller从zk中同步相关信息
- 消息的发送和存储
- 7. 假设Broker1中的Leader挂了
- 8 Controller监听到节点变化
- 9 获取 ISR
- 10 选举新的Leader
- 11 . 更新Leader 和 ISR
- 实例模拟
- Broker重要参数
Zk中存储的kafka的信息
当前直接存储在根目录下
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
Kafka Broker总体工作流程
1. broker启动后向zk中注册
2. Controller谁先启动注册,谁说了算
3. 由选举出来的Controller监听brokers节点的变化
4. Controller决定leader选举
5. Controller将节点信息上传到Zk中
6. 其他Controller从zk中同步相关信息
消息的发送和存储
7. 假设Broker1中的Leader挂了
8 Controller监听到节点变化
9 获取 ISR
10 选举新的Leader
11 . 更新Leader 和 ISR
实例模拟
1)案例内容:模拟kafka上下线,查看zookeeper中数据变化
2)查看kafka节点相关信息:① 查看zookeeper上的kafka集群节点信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 103, 104]
② 查看当前kafka集群节点中的controller信息
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller
{"version":1,"brokerid":103,"timestamp":"1637292471777"}
③ 查看kafka中的first主题的0号分区的状态
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,103,104]}
3)模拟kafka下线:停止hadoop103上的kafka
[xxx@hadoop103 kafka]$ bin/kafka-server-stop.sh
4)查看kafka相关节点信息
① 查看zookeeper上的kafka集群节点信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 104]
② 查看当前kafka集群节点中的controller信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/controller
{"version":1,"brokerid":102,"timestamp":"1637292471777"}
③ 查看kafka中的first主题的0号分区的状态
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,104]}
5)重新启动hadoop103上的kafka服务
[xxx@hadoop103 kafka]$ bin/kafka-server-stop.sh
6)再次查看上述节点,观察区别变化
Broker重要参数
参数名称 | 描述 |
---|---|
replica.lag.time.max.ms | ISR中的Follower超过该事件阈值(默认30s)未向Leader发送同步数据,则该Follower将被踢出ISR。 |
auto.leader.rebalance.enable | 默认是true。自动Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 |
leader.imbalance.check.interval.seconds | 默认值300秒。检查leader负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小,默认值1G。 |
log.index.interval.bytes | 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。 |
log.retention.hours | Kafka中数据保存的时间,默认7天。 |
log.retention.minutes | Kafka中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka中数据保存的时间,毫秒级别,默认关闭。(优先级最高) |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是5分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。 |
log.cleanup.policy | 默认是delete,表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的50%的1/3。 |
num.network.threads | 默认是3。数据传输线程数,这个参数占总核数的50%的2/3。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是Max(long) (9223372036854775807)。一般交给系统管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。 |