文章目录
- 1、介绍
- 1_Kafka&MQ场景
- 2_Kafka 架构剖析
- 3_分区&日志
- 4_生产者&消费者组
- 5_核心概念总结
- 6_顺写&mmap
- 7_Kafka的数据存储形式
- 2、Kafka的数据同步机制
- 1_高水位(High Watermark)
- 2_LEO
- 3_高水位更新机制
- 4_副本同步机制解析
- 5_消息丢失问题
- 6_数据不一致问题
- 6_Leader Epoch 机制
- 3、总结
1、介绍
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka是一种高吞吐量、低延迟和高可扩展的分布式发布订阅消息系统,它可以收集并处理用户在网站中的所有动作流数据以及物联网设备的采样信息。
1_Kafka&MQ场景
Apache Kafka提供了消息的订阅与发布的消息队列,一般用作系统间解耦、异步通信、削峰填谷等作用。同时Kafka又提供了Kafka streaming插件包实现了实时在线流处理。相比较一些专业的流处理框架不同,Kafka Streaming计算是运行在应用端,具有简单、入门要求低、部署方便等优点。
-
消息队列 Message Queue:
-
Kafka Streaming 流处理(插件,其它运行在服务端的流计算框架——storm、flink、spark stream):
2_Kafka 架构剖析
消息队列作为一种在分布式和大数据开发中不可或缺的中间件。在分布式开发或者大数据开发中通常使用消息队列进行缓冲、系统间解耦和削峰填谷等业务场景,常见的消息队列工作模式大致会分为两大类:
-
至多一次:消息生产者将数据写入消息系统,然后由消费者负责去拉去消息服务器中的消息,一旦消息被确认消费之后 ,由消息服务器主动删除队列中的数据,这种消费方式一般只允许被一个消费者消费,并且消息队列中的数据不允许被重复消费,传统MQ。
-
没有限制:同上诉消费形式不同,生产者发不完数据以后,该消息可以被多个消费者同时消费,并且同一个消费者可以多次消费消息服务器中的同一个记录。主要是因为消息服务器一般可以长时间存储海量消息,常见大数据领域——Kafka是这种。
Kafka集群以Topic形式负责分类集群中的Record
,每一个Record属于一个Topic
。
每个Topic底层都会对应一组分区的日志用于持久化Topic中的Record。同时在Kafka集群中,Topic的每一个日志的分区都一定会有1个Borker担当该分区的Leader
,其他的Broker担当该分区的follower
(如下图 —— 一个Broker既是当前分区的 Leader,也是其他分区的 follower )。Leader
负责分区数据的读写操作,follower
负责同步改分区的数据(备份)。
这样如果分区的Leader宕机,改分区的其他follower会选取出新的leader继续负责该分区数据的读写。如下图—— 当broker-0
下线后,broker-2
身兼 partition0
和partition1
的 Leader。
其中集群的Leader的监控和Topic的部分元数据是存储在Zookeeper中。
3_分区&日志
Kafka中所有消息通过Topic为单位进行管理,每个Kafka中的Topic通常会有多个订阅者,负责订阅发送到改Topic中的数据。Kafka负责管理集群中每个Topic的一组日志分区数据。
生产者将数据发布到相应的Topic。负责选择将哪个记录分发送到Topic
中的哪个Partition
。例如可以round-robin
方式完成此操作,然而这种仅是为了平衡负载。也可以根据某些语义分区功能(例如基于记录中的Key)进行此操作——比如,使用订单号作为 key,这个 key 对应的消息都会发送到同一个 partition 中。
每组日志分区是一个有序的不可变的日志序列(只能保证分区内部先进先出——局部FIFO,不同分区之间无法保证————无法保证全局的有序,因此Kafka不是严格意义上的有序),分区中的每一个Record都被分配了唯一的序列编号称为是 offset,Kafka 集群会持久化所有发布到Topic中的Record信息,改Record的持久化时间是通过配置文件指定,默认是168小时(7天)——还可以设置一些其它的日志清除策略(时间、日志大小、segment大小)。
############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
Log Deletion(日志删除),Kafka底层会定期的检查日志文件,然后将过期的数据从log中移除(.delete),由于Kafka使用硬盘存储日志文件,因此使用Kafka长时间缓存一些日志文件是不存在问题的。
Log Compaction(日志合并),如果在一些key-value数据中,一个key可以对应多个不同版本的value,经过日志合并,就会只保留最新的一个版本。
Kafka中对Topic实现日志分区的有以下目的:
-
首先,它们允许日志扩展到超出单个服务器所能容纳的大小。每个单独的分区都必须适合托管它的服务器,但是一个Topic可能有很多分区,因此它可以处理任意数量的数据。
-
其次每个服务器充当其某些分区的Leader,也可能充当其他分区的Follwer,因此群集中的负载得到了很好的平衡。
在消费者消费Topic中数据的时候,每个消费者会维护本次消费对应分区的偏移量——offset,消费者会在消费完一个批次的数据之后,会将本次消费的偏移量提交给Kafka集群(其实Kafka消费者真正提交的是下一次读取的偏移量位置),因此对于每个消费者而言可以随意的控制改消费者的偏移量。
因此在Kafka中,消费者可以从一个topic分区中的任意位置读取队列数据,由于每个消费者控制了自己的消费的偏移量,因此多个消费者之间彼此相互独立。
4_生产者&消费者组
消费者使用Consumer Group名称标记自己,并且发布到Topic的每条记录都会传递到每个订阅Consumer Group中的一个消费者实例。
如果所有Consumer实例都具有相同的Consumer Group,那么Topic中的记录会在改ConsumerGroup中的Consumer实例进行均分消费;如果所有Consumer实例具有不同的ConsumerGroup,则每条记录将广播到所有Consumer Group进程。
更常见的是,我们发现Topic具有少量的 Consumer Group,每个Consumer Group可以理解为一个 “逻辑的订阅者”。
每个Consumer Group均由许多Consumer实例组成,以实现可伸缩性和容错能力。这无非就是发布—订阅模型,其中订阅者是消费者的集群而不是单个进程。
这种消费方式Kafka会将Topic 按照分区的方式均分 给一个Consumer Group下的实例,如果ConsumerGroup下有新的成员介入,则新介入的Consumer实例会去接管ConsumerGroup内其他消费者负责的某些分区,同样如果一下ConsumerGroup下的有其他Consumer实例宕机,则改由 ConsumerGroup 其他实例接管。如果消费组中的消费者数量大于分区数量,多余的消费者将处于空闲状态——如果其他实例宕机,将会接管工作。
由于Kafka的Topic的分区策略,因此Kafka仅提供分区中记录的有序性,也就意味着相同Topic的不同分区记录之间无顺序。
因为针对于绝大多数的大数据应用和使用场景, 使用分区内部有序或者使用key进行分区策略已经足够满足绝大多数应用场景。
但是,如果您需要记录全局有序,则可以通过只有一个分区Topic来实现,尽管这将意味着每个ConsumerGroup只有一个Consumer进程。
5_核心概念总结
Broker(代理):
- Kafka 集群由多个 Broker 组成。每个 Broker 是 Kafka 的一个实例,负责存储数据和处理读写请求。Kafka 的高可用性和可扩展性来自于多个 Broker 的协同工作。
Topic(主题):
- Topic 是 Kafka 中的数据分类方式。数据在 Kafka 中是以 Topic 为单位进行组织的。生产者将数据发送到 Topic,消费者则从 Topic 中读取数据。Topic 可以被划分为多个分区(Partition),每个分区是一个有序的、不可变的消息日志。
Partition(分区):
- Topic 可以分为多个分区,每个分区都是一个独立的日志文件。分区使得 Kafka 可以在多个 Broker 之间分散数据,提高吞吐量和并发处理能力。每个分区的消息是有序的,但不同分区之间没有全局顺序 ——可以只使用一个分区(不分区)来保证全局 FIFO。
Producer(生产者):
- 生产者是向 Kafka 发送数据的客户端。生产者将消息写入到特定的 Topic,Kafka 会根据 Topic 的分区策略将消息分发到对应的分区中。
Consumer(消费者):
- 消费者从 Kafka 中读取消息。消费者可以是单独的客户端,也可以是多个消费者组成的消费者组。Kafka 保证消费者组内的每个消费者从 Topic 的不同分区读取数据,从而实现负载均衡。
Zookeeper(扎克斯):
- Zookeeper 是 Kafka 用于集群管理和协调的工具。它负责 Broker 的元数据管理、选举和状态监控等任务。Kafka 集群的元数据和配置信息都保存在 Zookeeper 中。
Offset(偏移量):
- 偏移量是 Kafka 中消息的唯一标识,用于追踪消费者的读取进度。每个分区中的消息都有一个唯一的偏移量,消费者可以根据偏移量来获取消息
6_顺写&mmap
Kafka的特性之一就是高吞吐率,但是Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,但是Kafka即使是普通的服务器,Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。Kafka会把收到的消息都写入到硬盘中,防止丢失数据。为了优化写入速度Kafka采用了两个技术——顺序写入和MMFile 。
因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。这样省去了大量的内存开销以及节省了IO寻址的时间。但是单纯的使用顺序写入,Kafka的写入性能也不可能和内存进行对比,因此Kafka的数据并不是实时的写入硬盘中 。
Kafka充分利用了现代操作系统分页存储来利用内存提高I/O效率。Memory Mapped Files(后面简称mmap
)也称为内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page实现文件到物理内存的直接映射。完成MMP映射后,用户对内存的所有操作会被操作系统自动的刷新到磁盘上,极大地降低了IO使用率。
既然生产者写入数据得到了优化,那么数据的读取呢?
Kafka服务器在响应客户端读取的时候,底层使用ZeroCopy技术,直接将磁盘无需拷贝到用户空间,而是直接将数据通过内核空间传递输出,数据并没有抵达用户空间 —— 以操作系统为界限。
传统IO操作:
- 用户进程调用read等系统调用向操作系统发出IO请求,请求读取数据到自己的内存缓冲区中。自己进入阻塞状态。
- 操作系统收到请求后,进一步将IO请求发送磁盘。
- 磁盘驱动器收到内核的IO请求,把数据从磁盘读取到驱动器的缓冲中。此时不占用CPU。当驱动器的缓冲区被读满后,向内核发起中断信号告知自己缓冲区已满。
- 内核收到中断,使用CPU时间将磁盘驱动器的缓存中的数据拷贝到内核缓冲区中。
- 如果内核缓冲区的数据少于用户申请的读的数据,重复步骤3、步骤4,直到内核缓冲区的数据足够多为止。
- 将数据从内核缓冲区拷贝到用户缓冲区,同时从系统调用中返回。完成任务
DMA读取(直接存储器访问——内存读取的协处理器):
- 用户进程调用read等系统调用向操作系统发出IO请求,请求读取数据到自己的内存缓冲区中。自己进入阻塞状态。
- 操作系统收到请求后,进一步将IO请求发送DMA。然后让CPU干别的活去。
- DMA进一步将IO请求发送给磁盘。
- 磁盘驱动器收到DMA的IO请求,把数据从磁盘读取到驱动器的缓冲中。当驱动器的缓冲区被读满后,向DMA发起中断信号告知自己缓冲区已满。
- DMA收到磁盘驱动器的信号,将磁盘驱动器的缓存中的数据拷贝到内核缓冲区中。此时不占用CPU。这个时候只要内核缓冲区的数据少于用户申请的读的数据,内核就会一直重复步骤3跟步骤4,直到内核缓冲区的数据足够多为止。
- 当DMA读取了足够多的数据,就会发送中断信号给CPU。
- CPU收到DMA的信号,知道数据已经准备好,于是将数据从内核拷贝到用户空间,系统调用返回。
跟IO中断模式相比,DMA模式下,DMA就是CPU的一个代理,它负责了一部分的拷贝工作,从而减轻了CPU的负担。DMA的优点就是:中断少,CPU负担低。
常规IO与 ZeroCopy 在网络环境下的工作:
常规IO:
ZeroCopy:
总结:
一般方案:
- 文件在磁盘中数据被copy到内核缓冲区
- 从内核缓冲区copy到用户缓冲区
- 用户缓冲区copy到内核与socket相关的缓冲区。
- 数据从socket缓冲区copy到相关协议引擎发送出去
Zero拷贝:
- 文件在磁盘中数据被copy到内核缓冲区
- 从内核缓冲区copy到内核与socket相关的缓冲区。
- 数据从socket缓冲区copy到相关协议引擎发送出去
7_Kafka的数据存储形式
我们上面说过 Kafka 分区中的数据内部有序,且是以日志的形式存储的,但是具体是怎么样的呢?
- 一个topic由多个分区组成
- 一个分区(partition)由多个segment(段)组成
- 一个segment(段)由多个文件组成(log、index、timeindex)
综上,其实一个分区对应多个.log
日志文件,而且是按照一个个文件块进行存储的——每个块、块和块之间都是有序的。
接下来,我们来看一下Kafka中的数据到底是如何在磁盘中存储的。
- Kafka中的数据是保存在
log.dirs
配置参数指定的数据目录中 - 消息是保存在以:「主题名-分区ID」的文件夹中的
- 数据文件夹中包含以下内容:
这些分别对应:
文件名 | 说明 |
---|---|
00000000000000000000.index | 索引文件,根据offset查找数据就是通过该索引文件来操作的 |
00000000000000000000.log | 日志数据文件 |
00000000000000000000.timeindex | 时间索引 |
leader-epoch-checkpoint | 持久化每个partition leader对应的LEO (log end offset、日志文件中下一条待写入消息的offset) |
- 每个日志文件的文件名为起始偏移量,因为每个分区的起始偏移量是0,所以,分区的日志文件都以0000000000000000000.log开始
- 默认的每个日志文件最大为「log.segment.bytes =102410241024」1G
- 为了简化根据offset查找消息,Kafka日志文件名设计为开始的偏移量
写入消息:
- 新的消息总是写入到最后的一个日志文件中
- 该文件如果到达指定的大小(默认为:1GB)时,将滚动到一个新的文件中
读取消息:
- 根据「offset」首先需要找到存储数据的 segment 段(注意:offset指定分区的全局偏移量)
- 然后根据这个「全局分区offset」找到相对于文件的「segment段offset」
- 最后再根据 「segment段offset」读取消息
- 为了提高查询效率,每个文件都会维护对应的范围内存,查找的时候就是使用简单的二分查找
删除消息:
- 在Kafka中,消息是会被定期清理的。一次删除一个segment段的日志文件
- Kafka的日志管理器,会根据Kafka的配置,来决定哪些文件可以被删除
2、Kafka的数据同步机制
Kafka的Topic被分为多个分区,分区是是按照Segments存储文件块。分区日志是存储在磁盘上的日志序列,Kafka可以保证分区里的事件是有序的。
其中Leader负责对应分区的读写、Follower负责同步分区的数据,0.11 版本之前Kafka使用highwatermarker
机制保证数据的同步,但是基于highwatermarker的同步数据可能会导致数据的不一致或者是乱序。在Kafka数据同步有以下概念。
-
LEO:log end offset 标识的是每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的 LEO.
-
HW: high watermarker称为高水位线,所有HW之前的的数据都理解是已经备份的,当所有节点都备份成功,Leader会更新水位线。
-
ISR:In Sync Replicas,正在同步的副本(可以理解为当前有几个follower是存活的)。kafka的leader会维护一份处于同步的副本集和,如果在
replica.lag.time.max.ms
时间内系统没有发送fetch请求,或者已然在发送请求,但是在该限定时间内没有赶上Leader的数据就被剔除ISR列表。在Kafka-0.9.0版本剔除replica.lag.max.messages
消息个数限定,因为这个会导致其他的Broker节点频繁的加入和退出ISR。
AR\ISR\OSR
-
AR:表示一个topic下的所有副本
-
OSR:Out of Sync Replicas,不再同步的副本
-
AR = ISR + OSR
1_高水位(High Watermark)
水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。教科书中关于水位的经典定义通常是这样的:
在时刻 T,任意创建时间(Event Time)为 T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
具体如下图所示:
图中标注“Completed”的蓝色部分代表已完成的工作,标注“In-Flight”的红色部分代表正在进行中的工作,两者的边界就是水位线。
在 Kafka 的世界中,水位的概念有一点不同,它是用消息位移来表征的。
作用:
在 Kafka 中,高水位的作用主要有 2 个。
- 1)定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
- 2)帮助 Kafka 完成副本同步。
我们假设这是某个分区 Leader 副本的高水位图。首先,请你注意图中的“已提交消息”和“未提交消息”。
在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 8 的所有消息。
注意,这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。
2_LEO
图中还有一个日志末端位移的概念,即 Log End Offset,简写是 LEO。它表示副本写入下一条消息的位移值。
注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15
高水位和 LEO 是副本对象的两个重要属性。 Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位
3_高水位更新机制
实际上,除了保存一组高水位值和 LEO 值=之外,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本(也称为远程副本)的 LEO 值。
在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。
为什么要在 Broker 0 上保存这些远程副本呢?
其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
更新机制如下表:
更新对象 | 更新时机 |
---|---|
Broker 1 上的 Follow 副本 LEO | Follower 副本从 Leader 副本拉取消息,写入到本地磁盘后,会更新其 LEO 值。 |
Broker 0 上Leader 副本 LEO | Leader副本接收到生产者发送的消息,写入到本地磁盘后,会更新其LEO值。 |
Broker 0 上远程副本 LEO | Follower副本从eader副本拉取消息时,会告诉L eader副本从哪个位移处开始拉取。L eader副本会使用这个位移值来更新远程副本的L EO。 |
Broker 1 上Follower副本高水位 | Follower副本成功更新完LEO之后,会比较其LEO值与Leader副本发来的高水位值,并用两者的较小值去更新它自己的高水位。 |
Broker 0上Leader副本高水位 | 主要有两个更新时机: 一个是Leader副本更新其LEO之后;另一个是更新完远程副本LEO之后。具体的算法是:取 Leader副本和所有与Leader同步的远程副本LEO中的最小值 |
Leader 副本
处理生产者请求的逻辑如下:
- 1)写入消息到本地磁盘。
- 2)更新分区高水位值。
- 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值(LEO-1,LEO-2,……,LEO-n)。
- 获取 Leader 副本高水位值:currentHW。
- 更新 currentHW = max{currentHW, min(LEO-1, LEO-2, ……,LEO-n)}。
处理 Follower 副本拉取消息的逻辑如下:
- 1)读取磁盘(或页缓存)中的消息数据。
- 2)使用 Follower 副本发送请求中的位移值更新远程副本 LEO 值。
- 3)更新分区高水位值(具体步骤与处理生产者请求的步骤相同)。
Follower 副本
从 Leader 拉取消息的处理逻辑如下:
- 1)写入消息到本地磁盘。
- 2)更新 LEO 值。
- 3)更新高水位值。
- 获取 Leader 发送的高水位值:currentHW。
- 获取步骤 2 中更新过的 LEO 值:currentLEO。
- 更新高水位为 min(currentHW, currentLEO)。
4_副本同步机制解析
首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。
当生产者给主题分区发送一条消息后,状态变更为:
此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。
Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:
这时,Follower 副本也成功地更新 LEO 为 1。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:
在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值 =1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步
5_消息丢失问题
从刚才的分析中,我们知道,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。
开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。
现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新——这是可能出现的。还记得吧,Follower 端高水位的更新与 Leader 端有时间错配。倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。这就是说,位移值为 1 的那条消息被副本 B 从磁盘中删除,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。
当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。
6_数据不一致问题
依赖HW的概念实现数据同步,还存在数据不一致的问题。如下图:
我们还是使用与上面相似的开始情况进行举例。开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。
我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位(HW)还未更新。副本A的HW在 1 的位置,B 在 0 的位置。
就在此时,A、B同时宕机,并且副本 B先启动成功,执行日志截断操作并成为新的 Leader——副本B的HW为0。
副本B成为新的 Leader 后接收到了一条新的请求写入 3,并更新水位线完成,此时 B的HW变为1,并且副本A还在故障中。
副本A故障恢复后发现自己与 Leader (副本额B)高水位线一致,数据同步完成。最终,数据不一致问题产生。
6_Leader Epoch 机制
社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。
- 1)Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
- 2)起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
任意一个Leader持有一个LeaderEpoch。该LeaderEpoch这是一个由Controller管理的32位数字,存储在Zookeeper的分区状态信息中,并作为LeaderAndIsrRequest的一部分传递给每个新的Leader。Leader接受Producer请求数据上使用LeaderEpoch标记每个Message。然后,该LeaderEpoch编号将通过复制协议传播,并用于替换HW标记,作为消息截断的参考点。
场景和之前大致是类似的,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。
现在,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。
这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。
数据不一致的问题也类似,当副本B成功启动时,经过 Leader Epoch 地判断不再进行数据的截断。
3、总结
Apache Kafka 是一个强大的分布式流处理平台,凭借其高吞吐量、低延迟和高可扩展性,在大数据处理和实时数据流领域中发挥了重要作用。无论是日志收集、数据集成还是实时流处理,Kafka 都提供了可靠、高效的解决方案。了解 Kafka 的核心概念和工作原理,对于构建现代化的数据处理架构和实现高效的实时数据流管理至关重要。
参考:https://www.lixueduan.com/posts/kafka/12-hw-leader-epoch/。
多种消息队列对比——截至当前2024:
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所属社区/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
协议支持 | OpenWire,STOMP,REST,XMPP,AMQP | AMQP,XMPP,SMTP,STOMP | 自定义协议 | 自定义协议 |
开发语言 | java | Erlang | Scala+Java | java |
成熟度 | 成熟 | 成熟 | 成熟 | 成熟 |
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级(最差) | 万级 | 十万级(非常高) | 十万级(高) |
消息延迟 | 毫秒级 | 微秒级 | 毫秒以内 | 毫秒级 |
可用性 | 一般(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
事务 | 支持 | AMQP 支持 | 支持 | 支持 |
文档的完备性 | 高 | 高 | 高 | 一般 |
提供快速入门 | 有 | 有 | 有 | 有 |
首次部署难度 | - | 低 | 中 | 高 |
系统场景 | 传统企业应用(如 ERP、CRM) JMS 兼容系统 | 网站通知系统 任务队列系统 微服务通信系统 | 大数据处理平台(如 Hadoop、Spark) 流处理平台(如 Flink、Storm) 日志收集系统(如 ELK) | 电商系统,金融系统,物流系统 |