1. 相关概念
1.1 代理:Broker
使用Kafka前,我们都会启动Kafka服务进程,这里的Kafka服务进程我们一般会称之为Kafka Broker 或 Kafka Server。因为Kafka是分布式消息系统所以再实际的生产环境中,是需要多个服务进程形成集群提供消息服务的。所以每一个服务节点都是一个broker,而且在 Kafka 集群中,为了区分不同的服务节点,每一个 broker都应该有一个不重复的全局ID,称之为 broker.id,这个 ID 可以在 kafka 软件的配置文件 server.properties 中进行配置。
############################# Server Basics ######################
# The id of the broker. This must be set to a unique integer for each broker
# 集群ID
broker.id=0
咱们的 Kafka 集群中每一个节点都有自己的ID,整数且唯一。
主机 | kafka-broker1 | kafka-broker2 | kafka-broker3 |
---|---|---|---|
broker.id | 1 | 2 | 3 |
1.2 控制器:Controller
Kafka是分布式消息传输系统,所以存在多个 Broker 服务节点,但是它的软件架构采用的是分布式系统中比较常见的主从(Master - Slave) 架构,也就说需要从多个 Broker 中找到一个用于管理整个 Kafka 集群的 Master 节点,这个节点,我们就称之为 Controller。它是 Apache Kafka 的核心组件非常重要。它的主要作用在 Apache ZooKeeper 的帮助下管理和协调控制整个 Kafka 集群。
如果在运行过程中,Controller 节点出现了故障,那么Kafka 会依托于 ZooKeeper选举其他的节点作为新的 Controller,让Kafka 集群实现高可用。
Kafka 集群中 Controler 的基本功能:
Broker 管理
监听 /brokers/ids 节点相关的变化:
- Broker 输了增加或减少的变化
- Broker 对应的数据变化
Topic 管理
- 新增:监听 /brokers/ids 节点相关的变化
- 修改:监听 /brokers/ids 节点相关的变化
- 删除:监听 /admin/delete_topics 节点相关的变化
Partation 管理
- 监听 /admin/reassign_partitions节点相关的变化
- 监听 /isr_change_notification节点相关的变化
- 监听 /preferred_replica_election节点相关的变化
数据服务
启动分区状态机和副本状态机
2. 启动ZooKeeper
Kafka 集群中含有多个服务节点,而分布式系统中经典的主从(Master-Slave)架构就要求从多个服务节点中找一个节点作为集群管理Master,Kafka 集群中的这个Master,我们称之为集群控制器 Controller。
如果此时Controller节点出现故障,它就不能再管理集群功能,那么其他的Slave节点该如何是好呢?
如果从剩余的两个Slave节点中选一个节点出来作为新的集群控制器是不是一个不错的方案,我们将这个选择的过程称之为:选举(elect)。方案是不错,但是问题就在于选哪一个Slave节点呢?不同的软件实现类似的选举功能都会有一些选举算法,而Kafka是依赖于ZooKeeper软件实现Broker节点选举功能。
ZooKeeper 如何实现 Kafka 的节点选举呢?这就要说到我们用到 ZooKeeper 的3个功能:
- 一个是在 ZooKeeper软件中创建节点 Node,创建一个 Node时,我们会设定这个节点时持久化创建,还是临时创建,就是Node 一旦创建后会一直存在,而临时创建,是根据当前的客户端连接创建的临时节点 Node,一旦客户端连接断开,那么这个临时节点 Node 也会被自动删除,所以这样的节点称之为临时节点。
- ZooKeeper 节点是不允许有重复的,所以多个客户端创建同一个节点,只能有一个创建成功。
- 另外一个是客户端可以在 ZooKeeper 的节点上增加监听器,用于监听节点的状态变化,一旦监听的节点状态发生变化,那么监听器就会触发响应,实现待监听功能。
Kafka 是如何利用 ZooKeeper 实现 Controller 节点的选举的:
1)第一次启动Kafka 集群时,会同时启动多个 Broker 节点,每一个 Broker 节点就会连接 ZooKeeper,并尝试创建一个临时节点 /controller
2)因为 ZooKeeper 中一个系欸但不允许重复创建,所以多个 Broker 节点,最终只能有一个 Broker 节点可以创建成功,那么这个创建成功的 Broker 节点聚会自动作为 Kafka 集群控制节点,用于管理整个 Kafa 集群。
3)没有选举成功的其他 Slave 节点会创建 Node 监听器,用于监听 /controller 节点的状态变化。
4)一旦Controller 节点出现故障或挂掉了,那么对应的 ZooKeeper 客户端连接就会中断。ZooKeeper 中的 /controller 节点就会自动被删除,而其他那些 Slave 节点因为增加了监听器,所以当监听到 /controller 节点被删除后,就会马上向 ZooKeeper 发出创建 /controller 节点的请求,一旦创建成功,那么该Broker 就变成了新的 Controller 节点了。
现在我们能明白启动 Kafka 集群之前为什么要先启动 ZooKeeper 集群了吧。就说因为 ZooKeeper 可以协助 Kafka 进行集群管理。
3. 启动Kafka
ZooKeeper 已经启动好了,那我们现在可以启动多个 Kafka Broker节点构建 Kafka 集群了。构建的过程中,每一个 Broker 节点就是一个 Java 进程,而在这个进程中,有很多需要 提前准备好,并进行初始化的内部组件对象。
3.1 初始化 ZooKeeper
Kafka Broker 启动时,首先会创建 ZooKeeper 客户端(KafkaZkClinet),用于 ZooKeeper 进行交互。客户端对象创建完成后,会通过该客户端对象向 ZooKeeper 发送创建 Node 的请求,注意,这里创建的Node都是持久化Node。
节点 | 类型 | 说明 |
---|---|---|
/admin/delete_topics | 持久化节点 | 配置需要删除的topic,因为删除过程中,可能broker下线,或执行失败,那么就需要在broker重新上线后,根据当前节点继续删除操作,一旦topic所有的分区数据全部删除,那么当前节点的数据才会进行清理 |
/brokers/ids | 持久化节点 | 服务节点ID标识,只要broker启动,那么就会在当前节点中增加子节点,brokerID不能重复 |
/brokers/topics | 持久化节点 | 服务节点中的主题详细信息,包括分区,副本 |
/brokers/seqid | 持久化节点 | seqid主要用于自动生产brokerId |
/config/changes | 持久化节点 | kafka的元数据发生变化时,会向该节点下创建子节点。并写入对应信息 |
/config/clients | 持久化节点 | 客户端配置,默认为空 |
/config/brokers | 持久化节点 | 服务节点相关配置,默认为空 |
/config/ips | 持久化节点 | IP配置,默认为空 |
/config/topics | 持久化节点 | 主题配置,默认为空 |
/config/users | 持久化节点 | 用户配置,默认为空 |
/consumers | 持久化节点 | 消费者节点,用于记录消费者相关信息 |
/isr_change_notification | 持久化节点 | ISR列表发生变更时候的通知,在kafka当中由于存在ISR列表变更的情况发生,为了保证ISR列表更新的及时性,定义了isr_change_notification这个节点,主要用于通知Controller来及时将ISR列表进行变更。 |
/latest_producer_id_block | 持久化节点 | 保存PID块,主要用于能够保证生产者的任意写入请求都能够得到响应。 |
/log_dir_event_notification | 持久化节点 | 主要用于保存当broker当中某些数据路径出现异常时候,例如磁盘损坏,文件读写失败等异常时候,向ZooKeeper当中增加一个通知序号,Controller节点监听到这个节点的变化之后,就会做出对应的处理操作 |
/cluster/id | 持久化节点 | 主要用于保存kafka集群的唯一id信息,每个kafka集群都会给分配要给唯一id,以及对应的版本号 |
3.2 初始化服务
Kafka Broker 中有很多的服务对象,用于实现内部管理和外部通信操作。
3.2.1 启动任务调度器
每一个Broker 在启动时都会创建内部调度器(KafkaScheduler) 并启动,用于完成节点内部的工作任务。底层就是Java中的定时任务线程池。
3.2.2 创建数据管理器
每一个 Broker 在启动时都会创建数据管理器(LogManager),用于接收到消息后,完成后续的数据创建,查询,清理等处理。
3.2.3 创建远程数据管理器
每一个 Broker 在启动时都会创建远程数据管理器(RemoteLogManager),用于和其他 Broker 节点进行数据状态同步。
3.2.4 创建副本管理器
每一个 Broker 在启动时都会创建副本管理器(ReplicaManager),用于对主题的副本进行处理。
3.2.5 创建 ZK 元数据缓存
每一个 Broker 在启动时会将 ZK 的关于 Kafka 的元数据进行缓存,创建元数据对象(ZKMetadataCache)
3.2.6 创建 Broker 通信对象
每一个 Broker 在启动时会创建 Broker 之间的通道管理器对象(BrokerToControllerChannelManager),用于管理Broker 和 Controller 之间的通信。
3.2.7 创建网络通信对象
每一个 Broker 在启动时会创建自己的网络通信对象(SockerServer),用于和其他 Broker 之间的通信,其中包含了 Java 用于 NIO 通信的 Channel、Selector 对象。
3.2.8 注册 Broker 节点
Broker启动时,会通过 ZK 客户端对象向 ZK 注册当前的 Broker 节点ID,注册后,创建的 ZK节点为临时节点。如果当前 Broker 的 ZK 客户端断开和ZK的连接,注册的节点会被删除。
3.3 启动控制器
控制器(KafkaController)是每一个 Broker 启动时都会创建的核心对象,用于和ZK 之间建立连接并申请自己为整个 Kafka 集群的 Master 管理者。如果申请成功,那么会完成管理者的初始化操作,并建立和其他 Broker 之间的数据通道接收各种事件,进行封装后交给事件管理器,并定义了 process 方法,用于真正处理各类事件。
3.3.1 初始化通道管理器
创建通道管理器(ControllerChannelManager),该管理器维护了 Controlelr 和 集群所有Broker 节点之间的网络连接,并向 Broker 发送控制类请求及接受响应。
3.3.2 初始化事件管理器
创建事件管理器(ControllerEventManager)维护了 Controller 和集群所有Broker节点之间的网络连接,并向 Broker 发送控制类请求及接受响应。
3.3.3 初始化状态管理器
创建状态管理器(ControllerChangerHandler)可以监听 /controller 节点的操作,一旦节点创建(ControllerChange),删除(Reelect),数据发生变化(ControllerChange),那么监听后执行相应的处理。
3.3.4 启动控制器
控制器对象启动后,会向事件管理器发送 Startup 事件,事件处理现场接收到事件后会通过 ZK 客户端向 ZK 申请 /controller 节点,申请成功后,执行当前节点成为 Controller 的一系列操作。主要是注册各类 ZooKeeper 监听器、删除日志路径变更和 ISR 副本变更通知事件、启动 Controller 通道管理器,以及启动副本状态机和分区状态机。