Kafka 3.x.x 入门到精通(04)——对标尚硅谷Kafka教程
- 2. Kafka基础
- 2.1 集群部署
- 2.2 集群启动
- 2.3 创建主题
- 2.4 生产消息
- 2.5 存储消息
- 2.5.1 存储组件
- 2.5.2 数据存储
- 2.5.2.1 ACKS校验
- 2.5.2.2 内部主题校验
- 2.5.2.3 ACKS应答及副本数量关系校验
- 2.5.2.4 日志文件滚动判断
- 2.5.2.5 请求数据重复性校验
- 2.5.2.6 请求数据序列号校验
- 2.5.2.7 数据存储
- 2.5.3 存储文件格式
- 2.5.3.1 数据日志文件
- 2.5.3.1.1 批次头
- 2.5.3.1.2 数据体
- 2.5.3.1.3 数据含义
- 2.5.3.2 数据索引文件
- 2.5.3.3 数据时间索引文件
- 2.5.3.4 查看文件内容
- 2.5.4 数据刷写
- 2.5.5 副本同步
- 2.5.6.1 启动数据同步线程
- 2.5.6.2 生成数据同步请求
- 2.5.6.3 处理数据响应
- 2.5.6.4 更新数据偏移量⚠️
- 2.5.6.4.1 Offset
- 2.5.6.4.2 LSO
- 2.5.6.4.3 LEO
- 2.5.6.4.1 HW
- 2.5.6 数据一致性
- 2.5.6.1 数据一致性
- 2.5.6.2 HW在副本之间的传递
- 2.5.6.3 ISR(In-Sync Replicas)伸缩
本文档参看的视频是:
- 尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
- 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
- 小朋友也可以懂的Kafka入门教程,还不快来学
本文档参看的文档是:
- 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!
在这之前大家可以看我以下几篇文章,循序渐进:
❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
❤️Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程
2. Kafka基础
2.1 集群部署
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
2.2 集群启动
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
2.3 创建主题
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
2.4 生产消息
❤️Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程
2.5 存储消息
数据已经由生产者Producer发送给Kafka集群,当Kafka接收到数据后,会将数据写入本地文件中。
2.5.1 存储组件
-
KafkaApis : Kafka应用接口组件,当Kafka Producer向Kafka Broker发送数据请求后,Kafka Broker接收请求,会使用Apis组件进行请求类型的判断,然后选择相应的方法进行处理。
-
ReplicaManager : 副本管理器组件,用于提供主题副本的相关功能,在数据的存储前进行ACK校验和事务检查,并提供数据请求的响应处理
-
Partition : 分区对象,主要包含分区状态变换的监控,分区上下线的处理等功能,在数据存储是主要用于对分区副本数量的相关校验,并提供追加数据的功能
-
UnifiedLog : 同一日志管理组件,用于管理数据日志文件的新增,删除等功能,并提供数据日志文件偏移量的相关处理。
-
LocalLog : 本地日志组件,管理整个分区副本的数据日志文件。假设当前主题分区中有3个日志文件,那么3个文件都会在组件中进行管理和操作。
-
LogSegment : 文件段组件,对应具体的某一个数据日志文件,假设当前主题分区中有3个日志文件,那么3个文件每一个都会对应一个LogSegment组件,并打开文件的数据管道FileChannel。数据存储时,就是采用组件中的FileChannel实现日志数据的追加
-
LogConfig: 日志配置对象,常用的数据存储配置
参数名 | 参数作用 | 类型 | 默认值 | 推荐值 |
---|---|---|---|---|
min.insync.replicas | 最小同步副本数量 | 推荐 | 1 | 2 |
log.segment.bytes | 文件段字节数据大小限制 | 可选 | 1 G = 1024 ∗ 1024 ∗ 1024 b y t e 1G = 1024*1024*1024 byte 1G=1024∗1024∗1024byte | |
log.roll.hours | 文件段强制滚动时间阈值 | 可选 | 7天 =24 * 7 * 60 * 60 * 1000L ms | |
log.flush.interval.messages | 满足刷写日志文件的数据条数 | 可选 | Long.MaxValue | 不推荐 |
log.flush.interval.ms | 满足刷写日志文件的时间周期 | 可选 | Long.MaxValue | 不推荐 |
log.index.interval.bytes | 刷写索引文件的字节数 | 可选 | 4 * 1024 | |
replica.lag.time.max.ms | 副本延迟同步时间 | 可选 | 30s |
2.5.2 数据存储
Kafka Broker节点从获取到生产者的数据请求到数据存储到文件的过程相对比较简单,只是中间会进行一些基本的数据检查和校验。所以接下来我们就将数据存储的基本流程介绍一下:
2.5.2.1 ACKS校验
Producer将数据发送给Kafka Broker时,会告知Broker当前生产者的数据生产场景,从而要求Kafka对数据请求进行应答响应确认数据的接收情况,Producer获取应答后可以进行后续的处理。这个数据生产场景主要考虑的就是数据的可靠性和数据发送的吞吐量。由此,Kafka将生产场景划分为3种不同的场景:
- ACKS = 0: Producer端将数据发送到网络输出流中,此时Kafka就会进行响应。在这个场景中,数据的应答是非常快的,但是因为仅仅将数据发送到网络输出流中,所以是无法保证kafka broker节点能够接收到消息,假设此时网络出现抖动不稳定导致数据丢失,而由于Kafka已经做出了确认收到的应答,所以此时Producer端就不会再次发送数据,而导致数据真正地丢失了。所以此种场景,数据的发送是不可靠的。
- ACKS = 1: Producer端将数据发送到Broker中,并保存到当前节点的数据日志文件中,Kafka就会进行确认收到数据的响应。因为数据已经保存到了文件中,也就是进行了持久化,那么相对于ACKS=0,数据就更加可靠。但是也要注意,因为Kafka是分布式的,所以集群的运行和管理是非常复杂的,难免当前Broker节点出现问题而宕掉,那么此时,消费者就消费不到我们存储的数据了,此时,数据我们还是会认为丢失了。
- ACKS = -1(all): Kafka在管理分区时,会了数据的可靠性和更高的吞吐量,提供了多个副本,而多个副本之间,会选出一个副本作为数据的读写副本,称之为Leader领导副本,而其他副本称之Follower追随副本。普通场景中,所有的这些节点都是需要保存数据的。而Kafka会优先将Leader副本的数据进行保存,保存成功后,再由Follower副本向Leader副本拉取数据,进行数据同步。一旦所有的这些副本数据同步完毕后,Kafka再对Producer进行收到数据的确认。此时ACKS应答就是-1(all)。明显此种场景,多个副本本地文件都保存了数据,那么数据就更加可靠,但是相对,应答时间更长,导致Kafka吞吐量降低。
基于上面的三种生产数据的场景,在存储数据前,需要校验生产者需要的应答场景是否合法有效。
2.5.2.2 内部主题校验
Producer向Kafka Broker发送数据时,是必须指定主题Topic的,但是这个主题的名称不能是kafka的内部主题名称。Kafka为了管理的需要,创建了2个内部主题,一个是用于事务处理的__transaction_state
内部主题,还有一个是用于处理消费者偏移量的__consumer_offsets
内部主题。生产者是无法对这两个主题生产数据的,所以在存储数据之前,需要对主题名称进行校验有效性校验。
2.5.2.3 ACKS应答及副本数量关系校验
Kafka为了数据可靠性更高一些,需要分区的所有副本都能够存储数据,但是分布式环境中难免会出现某个副本节点出现故障,暂时不能同步数据。在Kafka中,能够进行数据同步的所有副本,我们称之为In Sync Replicas
,简称ISR列表。
当生产者Producer要求的数据ACKS应答为 -1 的时候,那么就必须保证能够同步数据的所有副本能够将数据保存成功后,再进行数据的确认应答。但是一种特殊情况就是,如果当前ISR列表中只有一个Broker存在,那么此时只要这一个Broker数据保存成功了,那么就产生确认应答了,数据依然是不可靠的,那么就失去了设置ACK=all的意义了,所以此时还需要对ISR列表中的副本数量进行约束,至少不能少于2个。这个数量是可以通过配置文件配置的。参数名为:min.insync.replicas
。默认值为1(不推荐)
所以存储数据前,也需要对ACK应答和最小分区副本数量的关系进行校验。
2.5.2.4 日志文件滚动判断
数据存储到文件中,如果数据文件太大,对于查询性能是会有很大影响的,所以副本数据文件并不是一个完整的大的数据文件,而是根据某些条件分成很多的小文件,每个小文件我们称之为文件段。其中的一个条件就是文件大小,参数名为:log.segment.bytes
。默认值为1G。如果当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息。此时日志文件就需要滚动生产新的。
除了文件大小外,还有时间间隔,如果文件段第一批数据有时间戳,那么当前批次数据的时间戳和第一批数据的时间戳间隔大于滚动阈值,那么日志文件也会滚动生产新的。如果文件段第一批数据没有时间戳,那么就用当前时间戳和文件创建时间戳进行比对,如果大于滚动阈值,那么日志文件也会滚动生产新的。这个阈值参数名为:log.roll.hours
,默认为7天。如果时间到达,但是文件不满1G,依然会滚动生产新的数据文件。
如果索引文件或时间索引文件满了,或者索引文件无法存放当前索引数据了,那么日志文件也会滚动生产新的。
基于以上的原则,需要在保存数据前进行判断。
2.5.2.5 请求数据重复性校验
因为Kafka允许生产者进行数据重试操作,所以因为一些特殊的情况,就会导致数据请求被Kafka重复获取导致数据重复,所以为了数据的幂等性操作,需要在Broker端对数据进行重复性校验。这里的重复性校验只能对同一个主题分区的5个在途请求中数据进行校验,所以需要在生产者端进行相关配置。
2.5.2.6 请求数据序列号校验
因为Kafka允许生产者进行数据重试操作,所以因为一些特殊的情况,就会导致数据请求被Kafka重复获取导致数据顺序发生改变从而引起数据乱序。为了防止数据乱序,需要在Broker端对数据的序列号进行连续性(插入数据序列号和Broker缓冲的最后一个数据的序列号差值为1)校验。
2.5.2.7 数据存储
将数据通过LogSegment中FileChannel对象。将数据写入日志文件,写入完成后,更新当前日志文件的数据偏移量。
2.5.3 存储文件格式
我们已经将数据存储到了日志文件中,当然除了日志文件还有其他的一些文件,所以接下来我们就了解一下这些文件:
2.5.3.1 数据日志文件
Kafka系统早期设计的目的就是日志数据的采集和传输,所以数据是使用log文件进行保存的。我们所说的数据文件就是以.log
作为扩展名的日志文件。文件名长度为20位长度
的数字字符串,数字含义为当前日志文件的第一批数据的基础偏移量,也就是文件中保存的第一条数据偏移量。字符串数字位数不够的,前面补0。
我们的常规数据主要分为两部分:批次头 + 数据体
2.5.3.1.1 批次头
数据项 | 含义 | 长度 |
---|---|---|
BASE_OFFSET_OFFSET | 基础偏移量偏移量 | 8 |
LENGTH_OFFSET | 长度偏移量 | 4 |
PARTITION_LEADER_EPOCH_OFFSET | Leaader分区纪元偏移量 | 4 |
MAGIC_OFFSET | 魔数偏移量 | 1 |
ATTRIBUTES_OFFSET | 属性偏移量 | 2 |
BASE_TIMESTAMP_OFFSET | 基础时间戳偏移量 | 8 |
MAX_TIMESTAMP_OFFSET | 最大时间戳偏移量 | 8 |
LAST_OFFSET_DELTA_OFFSET | 最后偏移量偏移量 | 4 |
PRODUCER_ID_OFFSET | 生产者ID偏移量 | 8 |
PRODUCER_EPOCH_OFFSET | 生产者纪元偏移量 | 2 |
BASE_SEQUENCE_OFFSET | 基础序列号偏移量 | 4 |
RECORDS_COUNT_OFFSET | 记录数量偏移量 | 4 |
CRC_OFFSET | CRC校验偏移量 | 4 |
批次头总的字节数为:61 byte
2.5.3.1.2 数据体
数据项 | 含义 | 长度 |
---|---|---|
size | 固定值 | 1 |
offsetDelta | 固定值 | 1 |
timestampDelta | 时间戳 | 1 |
keySize | Key字节长度 | 1(动态) |
keySize(Varint) | Key变量压缩长度算法需要大小 | 1(动态) |
valueSize | value字节长度 | 1(动态) |
valueSize(Varint) | Value变量压缩长度算法需要大小 | 1(动态) |
Headers | 数组固定长度 | 1(动态) |
sizeInBytes | 上面长度之和的压缩长度算法需要大小 | 1 |
表中的后5个值为动态值,需要根据数据的中key,value变化计算得到。此处以数据key=key1,value=value1为例。
# 压缩长度算法:
中间值1 = (算法参数 << 1) ^ (算法参数 >> 31));
中间值2 = Integer.numberOfLeadingZeros(中间值1);
结果 = (38 - 中间值2) / 7 + 中间值2 / 32;假设当前key为:key1,调用算法时,参数为key.length = 4
中间值1 = (4<<1) ^ (4>>31) = 8
中间值2 = Integer.numberOfLeadingZeros(8) = 28
结果 = (38-28)/7 + 28/32 = 1 + 0 = 1
所以如果key取值为key1,那么key的变长长度就是1
按照上面的计算公式可以计算出,如果我们发送的数据是一条为(key1,value1)的数据, 那么Kafka当前会向日志文件增加的数据大小为:
# 追加数据字节计算
批次头 = 61
数据体 = 1 + 1 + 1 + 4 + 1 + 6 + 1 + 1 + 1 = 17
总的字节大小为61 + 17 = 78
如果我们发送的数据是两条为(key1,value1),(key2,value2)的数据, 那么Kafka当前会向日志文件增加的数据大小为:
# 追加数据字节计算
第一条数据:
批次头 = 61
数据体 = 1 + 1 + 1 + 4 + 1 + 6 + 1 + 1 + 1 = 17
第二条数据:
# 因为字节少,没有满足批次要求,所以两条数据是在一批中的,那么批次头不会重新计算,直接增加数据体即可
数据体 = 1 + 1 + 1 + 4 + 1 + 6 + 1 + 1 + 1 = 17
总的字节大小为61 + 17 + 17 = 95
2.5.3.1.3 数据含义
数据项 | 含义 |
---|---|
baseOffset | 当前batch中第一条消息的位移 |
lastOffset | 最新消息的位移相对于第一条消息的唯一增量 |
count | 当前batch有的数据数量,kafka在进行消息遍历的时候,可以通过该字段快速的跳跃到下一个batch进行数据读取 |
partitionLeaderEpoch | 记录了当前消息所在分区的 leader 的服务器版本(纪元),主要用于进行一些数据版本的校验和转换工作 |
crc | 当前整个batch的数据crc校验码,主要用于对数据进行差错校验的 |
compresscode | 数据压缩格式,主要有GZIP 、LZ4 、Snappy 、zstd 四种 |
baseSequence | 当前批次中的基础序列号 |
lastSequence | 当前批次中的最后一个序列号 |
producerId | 生产者ID |
producerEpoch | 记录了当前消息所在分区的Producer的服务器版本(纪元) |
isTransactional | 是否开启事务 |
magic | 魔数(Kafka服务程序协议版本号) |
CreateTime(data) | 数据创建的时间戳 |
isControl | 控制类数据(produce的数据为false,事务Marker为true) |
compresscodec | 压缩格式,默认无 |
isvalid | 数据是否有效 |
offset | 数据偏移量,从0开始 |
key | 数据key |
payload | 数据value |
sequence | 当前批次中数据的序列号 |
CreateTime(header) | 当前批次中最后一条数据的创建时间戳 |
2.5.3.2 数据索引文件
Kafka的基础设置中,数据日志文件到达1G才会滚动生产新的文件。那么从1G文件中想要快速获取我们想要的数据,效率还是比较低的。通过前面的介绍,如果我们能知道数据在文件中的位置(position),那么定位数据就会快很多,问题在于我们如何才能在知道这个位置呢。
Kafka在存储数据时,都会保存数据的偏移量信息,而偏移量是从0开始计算的。简单理解就是数据的保存顺序。比如第一条保存的数据,那么偏移量就是0,第二条保存的数据偏移量就是1,但是这个偏移量只是告诉我们数据的保存顺序,却无法定位数据,不过需要注意的是,每条数据的大小是可以确定的(参考上一个小节的内容)。既然可以确定,那么数据存放在文件的位置起始也就是确定了,所以Kafka在保存数据时,其实是可以同时保存位置的,那么我们在访问数据时,只要通过偏移量其实就可以快速定位日志文件的数据了。
不过这依然有问题,就是数据量太多了,对应的偏移量也太多了,并且主题分区的数据文件会有很多,那我们是如何知道数据在哪一个文件中呢?
为了定位方便Kafka在提供日志文件保存数据的同时,还提供了用于数据定位的索引文件,索引文件中保存的就是逻辑偏移量和数据物理存储位置(偏移量)的对应关系。并且还记得吗?每个数据日志文件的名称就是当前文件中数据䣌起始偏移量,所以通过偏移量就可以快速选取文件以及定位数据的位置从而快速找到数据。这种感觉就有点像Java的HashMap通过Key可以快速找到Value的感觉一样,如果不知道Key,那么从HashMap中获取Value是不是就特别慢。道理是一样的。
Kafka的数据索引文件都保存了什么呢?咱们来看一下:
通过图片可以看到,索引文件中保存的就是逻辑偏移量和物理偏移量位置的关系。
有了这个索引文件,那么我们根据数据的顺序获取数据就非常的方便和高效了。不过,相信大家也注意到了,那就是索引文件中的offset并不连续。那如果我想获取offset等于3的数据怎么办?
其实也不难,因为offset等于3不就是offset等于2的一下条吗?那我使用offset等于2的数据的position + size不就定位了offset等于3的位置了吗,当然了我举得例子有点过于简单了,不过本质确实差的不多,kafka在查询定位时其实采用的就是二分查找法。
不过,为什么Kafka的索引文件是不连续的呢?
那是因为如果每条数据如果都把偏移量的定位保存下来,数据量也不小,还有就是,如果索引数据丢了几条,其实并不会太影响查询效率,比如咱们之前举得offset等于3的定位过程。因为Kafka底层实现时,采用的是虚拟内存映射技术mmap,将内存和文件进行双向映射,操作内存数据就等同于操作文件,所以效率是非常高的,但是因为是基于内存的操作,所以并不稳定,容易丢数据,因此Kafka的索引文件中的索引信息是不连续的,而且为了效率,kafka默认情况下,4kb的日志数据才会记录一次索引,但是这个是可以进行配置修改的,参数为
log.index.interval.bytes
,默认值为4096。所以我们有的时候会将kafka的不连续索引数据称之为稀疏索引。
2.5.3.3 数据时间索引文件
某些场景中,我们不想根据顺序(偏移量)获取Kafka的数据,而是想根据时间来获取的数据。这个时候,可没有对应的偏移量来定位数据,那么查找的效率就非常低了,因为kafka还提供了时间索引文件,咱们来看看它的内容是什么
通过图片,大家可以看到,这个时间索引文件起始就是将时间戳和偏移量对应起来了,那么此时通过时间戳就可以找到偏移量,再通过偏移量找到定位信息,再通过定位信息找到数据不就非常方便了吗。
2.5.3.4 查看文件内容
如果我们想要查看文件的内容,直接看是看不了,需要采用特殊的之类
# 进入bin/windows目录
cd bin/windows
# 执行查看文件的指令
kafka-run-class.bat kafka.tools.DumpLogSegments --files ../../data/kafka/test-0/00000000000000000000.log --print-data-log
2.5.4 数据刷写
在Linux系统中,当我们把数据写入文件系统之后,其实数据在操作系统的PageCache(页缓冲)里面,并没有刷到磁盘上。如果操作系统挂了,数据就丢失了。
一方面,应用程序可以调用fsync
这个系统调用来强制刷盘
另一方面,操作系统有后台线程,定时刷盘。频繁调用fsync会影响性能,需要在性能和可靠性之间进行权衡。实际上,Kafka提供了参数进行数据的刷写
log.flush.interval.messages
:达到消息数量时,会将数据flush到日志文件中。log.flush.interval.ms
:间隔多少时间(ms),执行一次强制的flush操作。flush.scheduler.interval.ms
:所有日志刷新到磁盘的频率
log.flush.interval.messages和log.flush.interval.ms无论哪个达到,都会flush。官方不建议通过上述的三个参数来强制写盘,数据的可靠性应该通过replica来保证,而强制flush数据到磁盘会对整体性能产生影响。
2.5.5 副本同步
Kafka中,分区的某个副本会被指定为 Leader,负责响应客户端的读写请求。分区中的其他副本自动成为 Follower,主动拉取(同步)Leader 副本中的数据,写入自己本地日志,确保所有副本上的数据是一致的。
2.5.6.1 启动数据同步线程
Kafka创建主题时,会根据副本分配策略向指定的Broker节点发出请求,将不同的副本节点设定为Leader或Follower。一旦某一个Broker节点设定为Follower节点,那么Follower节点会启动数据同步线程ReplicaFetcherThread
,从Leader副本节点同步数据。
线程运行后,会不断重复两个操作:截断(truncate)和抓取(fetch)。
- 截断:为了保证分区副本的数据一致性,当分区存在
Leader Epoch
值时,会将副本的本地日志截断到Leader Epoch
对应的最新位移处,如果分区不存在对应的 Leader Epoch 记录,那么依然使用原来的高水位机制,将日志调整到高水位值处。 - 抓取:向Leader同步最新的数据。
2.5.6.2 生成数据同步请求
启动线程后,需要周期地向Leader节点发送FETCH请求,用于从Leader获取数据。
等待Leader节点的响应的过程中,会阻塞当前同步数据线程。
2.5.6.3 处理数据响应
当Leader副本返回响应数据时,其中会包含多个分区数据,当前副本会遍历每一个分区,将分区数据写入数据文件中。
2.5.6.4 更新数据偏移量⚠️
当Leader副本返回响应数据时,除了包含多个分区数据外,还包含了和偏移量相关的数据HW和LSO,副本需要根据场景对Leader返回的不同偏移量进行更新。
2.5.6.4.1 Offset
Kafka的每个分区的数据都是有序的,所谓的数据偏移量,指的就是Kafka在保存数据时,用于快速定位数据的标识,类似于Java中数组的索引,从0开始。
Kafka的数据文件以及数据访问中包含了大量和偏移量的相关的操作。
2.5.6.4.2 LSO
起始偏移量(Log Start Offset),每个分区副本都有起始偏移量,用于表示副本数据的起始偏移位置,初始值为0。
LSO一般情况下是无需更新的,但是如果数据过期,或用户手动删除数据时,Leader的Log Start Offset可能发生变化,Follower副本的日志需要和Leader保持严格的一致,因此,如果Leader的该值发生变化,Follower自然也要发生变化保持一致。
2.5.6.4.3 LEO
日志末端位移(Log End Offset),表示下一条待写入消息的offset,每个分区副本都会记录自己的LEO。对于Follower副本而言,它能读取到Leader副本 LEO 值以下的所有消息。
2.5.6.4.1 HW
高水位值(High Watermark),定义了消息可见性,标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位offset之前的消息,同时这个偏移量还可以帮助Kafka完成副本数据同步操作。
2.5.6 数据一致性
2.5.6.1 数据一致性
Kafka的设计目标是:高吞吐、高并发、高性能。为了做到以上三点,它必须设计成分布式的,多台机器可以同时提供读写,并且需要为数据的存储做冗余备份。
图中的主题有3个分区,每个分区有3个副本,这样数据可以冗余存储,提高了数据的可用性。并且3个副本有两种角色,Leader和Follower,Follower副本会同步Leader副本的数据。
一旦Leader副本挂了,Follower副本可以选举成为新的Leader副本, 这样就提升了分区可用性,但是相对的,在提升了分区可用性的同时,也就牺牲了数据的一致性。
我们来看这样的一个场景
一个分区有3个副本,一个Leader和两个Follower。Leader副本作为数据的读写副本,所以生产者的数据都会发送给leader副本,而两个follower副本会周期性地同步leader副本的数据,但是因为网络,资源等因素的制约,同步数据的过程是有一定延迟的,所以3个副本之间的数据可能是不同的。
具体如下图所示:
此时,假设leader副本因为意外原因宕掉了,那么Kafka为了提高分区可用性,此时会选择2个follower副本中的一个作为Leader对外提供数据服务。此时我们就会发现,对于消费者而言,之前leader副本能访问的数据是D,但是重新选择leader副本后,能访问的数据就变成了C,这样消费者就会认为数据丢失了,也就是所谓的数据不一致了。
有点类似于数据库中的 脏读!
为了提升数据的一致性,Kafka引入了高水位(HW :High Watermark)机制,Kafka在不同的副本之间维护了一个水位线的机制(其实也是一个偏移量的概念),消费者只能读取到水位线以下的的数据。这就是所谓的木桶理论:木桶中容纳水的高度,只能是水桶中最短的那块木板的高度。这里将整个分区看成一个木桶,其中的数据看成水,而每一个副本就是木桶上的一块木板,那么这个分区(木桶)可以被消费者消费的数据(容纳的水)其实就是数据最少的那个副本的最后数据位置(木板高度)。
也就是说,消费者一开始在消费Leader的时候,虽然Leader副本中已经有a、b、c、d 4条数据,但是由于高水位线的限制,所以也只能消费到a、b这两条数据。
这样即使leader挂掉了,但是对于消费者来讲,消费到的数据其实还是一样的,因为它能看到的数据是一样的,也就是说,消费者不会认为数据不一致。
不过也要注意,因为follower要求和leader的日志数据严格保持一致,所以就需要根据现在Leader的数据偏移量值对其他的副本进行数据截断(truncate)操作。
2.5.6.2 HW在副本之间的传递
HW高水位线会随着follower的数据同步操作,而不断上涨,也就是说,follower同步的数据越多,那么水位线也就越高,那么消费者能访问的数据也就越多。接下来,我们就看一看,follower在同步数据时HW的变化。
首先,初始状态下,Leader和Follower都没有数据,所以和偏移量相关的值都是初始值0,而由于Leader需要管理follower,所以也包含着follower的相关偏移量(LEO——Log End Offeset)数据。
生产者向Leader发送两条数据,Leader收到数据后,会更新自身的偏移量信息。
Leader副本偏移量更新:
LEO=LEO+2=2
接下来,Follower开始同步Leader的数据,同步数据时,会将自身的LEO值作为参数传递给Leader。此时,Leader会将数据传递给Follower,且同时Leader会根据所有副本的LEO值更新HW。
Leader副本偏移量更新:
HW = Math.max[HW, min(LeaderLEO,F1-LEO,F2-LEO)]=0
由于两个Follower的数据拉取速率不一致,所以Follower-1抓取了2条数据,而Follower-2抓取了1条数据。Follower再收到数据后,会将数据写入文件,并更新自身的偏移量信息。
Follower-1副本偏移量更新:
LEO=LEO+2=2
HW = Math.min[LeaderHW, LEO]=0
Follower-2副本偏移量更新:
LEO=LEO+1=1
HW = Math.min[LeaderHW, LEO]=0
接下来Leader收到了生产者的数据C,那么此时会根据相同的方式更新自身的偏移量信息
Leader副本偏移量更新:
LEO=LEO+1=3
follower接着向Leader发送Fetch请求,同样会将最新的LEO作为参数传递给Leader。Leader收到请求后,会更新自身的偏移量信息。
Leader副本偏移量更新:
HW = Math.max[HW, min(LeaderLEO,F1-LEO,F2-LEO)]=0
此时,Leader会将数据发送给Follower,同时也会将HW一起发送。
Follower收到数据后,会将数据写入文件,并更新自身偏移量信息
Follower-1副本偏移量更新:
LEO=LEO+1=3
HW = Math.min[LeaderHW, LEO]=1
Follower-2副本偏移量更新:
LEO=LEO+1=2
HW = Math.min[LeaderHW, LEO]=1
因为Follower会不断重复Fetch数据的过程,所以前面的操作会不断地重复。最终,follower副本和Leader副本的数据和偏移量是保持一致的。
上面演示了副本列表ISR中Follower副本和Leader副本之间HW偏移量的变化过程,但特殊情况是例外的。比如当前副本列表ISR中,只剩下了Leader一个副本的场合下,是不需要等待其他副本的,直接推高HW即可。
2.5.6.3 ISR(In-Sync Replicas)伸缩
在Kafka中,一个Topic(主题)包含多个Partition(分区),Topic是逻辑概念,而Partition是物理分组。一个Partition包含多个Replica(副本),副本有两种类型Leader Replica/Follower Replica,Replica之间是一个Leader副本对应多个Follower副本。注意:分区数可以大于节点数,但副本数不能大于节点数。因为副本需要分布在不同的节点上,才能达到备份的目的。
Kafka的分区副本中只有Leader副本具有数据写入的功能,而Follower副本需要不断向Leader发出申请,进行数据的同步。这里所有同步的副本会形成一个列表,我们称之为同步副本列表(In-Sync Replicas),也可以简称ISR,除了ISR以外,还有已分配的副本列表(Assigned Replicas),简称AR。这里的AR其实不仅仅包含ISR,还包含了没有同步的副本列表(Out-of-Sync Replicas),简称OSR
生产者Producer生产数据时,ACKS应答机制如果设置为all(-1),那此时就需要保证同步副本列表ISR中的所有副本全部接收完毕后,Kafka才会进行确认应答。
数据存储时,只有ISR中的所有副本LEO数据都更新了,才有可能推高HW偏移量的值。这就可以看出,ISR在Kafka集群的管理中是非常重要的。
在Broker节点中,有一个副本管理器组件(ReplicaManager),除了读写副本、管理分区和副本的功能之外,还有一个重要的功能,那就是管理ISR。这里的管理主要体现在两个方面:
- 周期性地查看 ISR 中的副本集合是否需要收缩。这里的收缩是指,把ISR副本集合中那些与Leader差距过大的副本移除的过程。
相对的,有收缩,就会有扩大,在Follower抓取数据时,判断副本状态,满足扩大ISR条件后,就可以提交分区变更请求。完成ISR列表的变更。
- 向集群Broker传播ISR的变更。ISR发生变化(包含Shrink和Expand)都会执行传播逻辑。ReplicaManager每间隔2500毫秒就会根据条件,将ISR变化的结果传递给集群的其他Broker。
多少个分区多少个副本 都是在 zookeeper的