戳蓝字“CSDN云计算”关注我们哦!
Kafka 对外使用 Topic 的概念,生产者往 Topic 里写消息,消费者从中读消息。为了做到水平扩展,一个 Topic 实际是由多个 Partition 组成的,遇到瓶颈时,可以通过增加 Partition 的数量来进行横向扩容。单个 Parition 内是保证消息有序。
每新写一条消息,Kafka 就是在对应的文件 append 写,所以性能非常高。
Kafka 的总体数据流是这样的:
基本流程是这样的:
- Key 有填。按照 Key 进行哈希,相同 Key 去一个 Partition。(如果扩展了 Partition 的数量那么就不能保证了)
- Key 没填。Round-Robin 来选 Partition。
API
Partition
- 怎么分配 Partition
- 怎么选 Leader
Partition 的分配:
将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序。
将第 i 个 Partition 分配到第(i mod n)个 Broker 上 (这个就是 Leader)。
将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mode n)个 Broker 上。
Leader容灾
Controller 会在 ZK 的 /brokers/ids 节点上注册 Watch,一旦有 Broker 宕机,它就能知道。当 Broker 宕机后,Controller 就会给受到影响的 Partition 选出新 Leader。
Controller 从 ZK 的 /brokers/topics/[topic]/partitions/[partition]/state 中,读取对应 Partition 的 ISR(in-sync replica 已同步的副本)列表,选一个出来做 Leader。选出 Leader后,更新ZK,然后发送 LeaderAndISRRequest 给受影响的 Broker,让它们知道改变这事。
为什么这里不是使用 ZK 通知,而是直接给 Broker 发送 RPC 请求,我的理解可能是这样做 ZK 有性能问题吧。如果 ISR 列表是空,那么会根据配置,随便选一个 Replica 做 Leader,或者干脆这个 Partition 就是歇菜;如果 ISR 列表的有机器,但是也歇菜了,那么还可以等 ISR 的机器活过来。
多副本同步
在 Acks=-1 的时候,如果 ISR 少于 min.insync.replicas 指定的数目,那么就会返回不可用。
这里 ISR 列表中的机器是会变化的,根据配置 replica.lag.time.max.ms,多久没同步,就会从 ISR 列表中剔除。以前还有根据落后多少条消息就踢出 ISR,在 1.0 版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出 ISR 列表。从 ISA 中选出 Leader 后,Follower 会把自己日志中上一个高水位后面的记录去掉,然后去和 Leader 拿新的数据。
因为新的 Leader 选出来后,Follower 上面的数据,可能比新 Leader 多,所以要截取。这里高水位的意思,对于 Partition 和 Leader,就是所有 ISR 中都有的最新一条记录。消费者最多只能读到高水位。
从 Leader 的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR 中的 Broker 都 Fetch 到了,但是 ISR 中的 Broker 只有在下一轮的 Fetch 中才能告诉 Leader。也正是由于这个高水位延迟一轮,在一些情况下,Kafka 会出现丢数据和主备数据不一致的情况,0.11 开始,使用 Leader Epoch 来代替高水位。
思考:当 Acks=-1 时
是 Follwers 都来 Fetch 就返回成功,还是等 Follwers 第二轮 Fetch?
Leader 已经写入本地,但是 ISR 中有些机器失败,那么怎么处理呢?
订阅 Topic 是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个 Partition。换句话来说,就是一个 Partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。
因此,如果消费组内的消费者如果比 Partition 多的话,那么就会有个别消费者一直空闲。
API
Offset的保存
一个消费组消费 Partition,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次。这里 ZK 的性能严重影响了消费的速度,而且很容易出现重复消费。在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 总剥离,保存在一个名叫 consumeroffsets topic 的 Topic 中。
写进消息的 Key 由 Groupid、Topic、Partition 组成,Value 是偏移量 Offset。Topic 配置的清理策略是 Compact。总是保留最新的 Key,其余删掉。一般情况下,每个 Key 的 Offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存,第一次就会遍历 Partition 建立缓存,然后查询返回。
确定 Consumer Group 位移信息写入 consumers_offsets 的哪个 Partition,具体计算公式:
__consumers_offsets partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
分配 Partition—Reblance
生产过程中 Broker 要分配 Partition,消费过程这里,也要分配 Partition 给消费者。类似 Broker 中选了一个 Controller 出来,消费也要从 Broker 中选一个 Coordinator,用于分配 Partition。
下面从顶向下,分别阐述一下:
怎么选 Coordinator
交互流程
Reblance 的流程
①选Coordinator:看Offset 保存在那个 Partition;该 Partition Leader 所在的 Broker 就是被选定的 Coordinator。
这里我们可以看到,Consumer Group的Coordinator,和保存Consumer Group Offset 的 Partition Leader是同一台机器。
②交互流程:把Coordinator选出来之后,就是要分配了。整个流程是这样的:
Consumer 启动、或者 Coordinator 宕机了,Consumer 会任意请求一个 Broker,发送 ConsumerMetadataRequest 请求。
Broker 会按照上面说的方法,选出这个 Consumer 对应 Coordinator 的地址。
Consumer 发送 Heartbeat 请求给 Coordinator,返回 IllegalGeneration 的话,就说明 Consumer 的信息是旧的了,需要重新加入进来,进行 Reblance。
返回成功,那么 Consumer 就从上次分配的 Partition 中继续执行。
③Reblance 流程:
Consumer 给 Coordinator 发送 JoinGroupRequest 请求。
这时其他 Consumer 发 Heartbeat 请求过来时,Coordinator 会告诉他们,要 Reblance 了。
其他 Consumer 发送 JoinGroupRequest 请求。
所有记录在册的 Consumer 都发了 JoinGroupRequest 请求之后,Coordinator 就会在这里 Consumer 中随便选一个 Leader。
然后回 JoinGroupRespone,这会告诉 Consumer 你是 Follower 还是 Leader,对于 Leader,还会把 Follower 的信息带给它,让它根据这些信息去分配 Partition。
Consumer向Coordinator 发送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 会包含分配的情况。
Coordinator 回包,把分配的情况告诉 Consumer,包括 Leader。
当 Partition 或者消费者的数量发生变化时,都得进行 Reblance。
列举一下会 Reblance 的情况:
增加 Partition
增加消费者
消费者主动关闭
消费者宕机了
Coordinator 自己也宕机了
———————————— 消息投递语义————————————
- At most once:最多一次,消息可能会丢失,但不会重复。
- At least once:最少一次,消息不会丢失,可能会重复。
- Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11 中实现,仅限于下游也是 Kafka)
先获取数据,再进行业务处理,业务处理成功后 Commit Offset:
生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息。
消费者处理消息,业务处理成功后,更新 Offset 失败,消费者重启的话,会重复消费。
先获取数据,再 Commit Offset,最后进行业务处理:
生产者生产消息异常,不管,生产下一个消息,消息就丢了。
消费者处理消息,先更新 Offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了。
思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着 At least once 的原因来搞。
首先想出来的:
生产者重做导致重复写入消息:生产保证幂等性。
消费者重复消费:消灭重复消费,或者业务接口保证幂等性重复消费也没问题。
由于业务接口是否幂等,不是 Kafka 能保证的,所以 Kafka 这里提供的 Exactly once 是有限制的,消费者的下游也必须是 Kafka。所以以下讨论的,没特殊说明,消费者的下游系统都是 Kafka(注:使用 Kafka Conector,它对部分系统做了适配,实现了 Exactly once)。生产者幂等性好做,没啥问题。
解决重复消费有两个方法:
下游系统保证幂等性,重复消费也不会导致多条记录。
把 Commit Offset 和业务处理绑定成一个事务。
本来 Exactly once 实现第 1 点就 OK 了。但是在一些使用场景下,我们的数据源可能是多个 Topic,处理后输出到多个 Topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。既然要做事务,那么干脆把重复消费的问题从根源上解决,把 Commit Offset 和输出到其他 Topic 绑定成一个事务。
- 消息的 Seq 比 Broker 的 Seq 大超过时,说明中间有数据还没写入,即乱序了。
- 消息的 Seq 不比 Broker 的 Seq 小,那么说明该消息已被保存。
先从多个源 Topic 中获取数据。
做业务处理,写到下游的多个目的 Topic。
更新多个源 Topic 的 Offset。
引入 Tid(transaction id),和 Pid 不同,这个 ID 是应用程序提供的,用于标识事务,和 Producer 是谁并没关系。
就是任何 Producer 都可以使用这个 Tid 去做事务,这样进行到一半就死掉的事务,可以由另一个 Producer 去恢复。
同时为了记录事务的状态,类似对 Offset 的处理,引入 Transaction Coordinator 用于记录 Transaction Log。
在集群中会有多个 Transaction Coordinator,每个 Tid 对应唯一一个 Transaction Coordinator。
注:Transaction Log 删除策略是 Compact,已完成的事务会标记成 Null,Compact 后不保留。
数据流:
首先使用 Tid 请求任意一个 Broker(代码中写的是负载最小的 Broker),找到对应的 Transaction Coordinator。
请求 Transaction Coordinator 获取到对应的 Pid,和 Pid 对应的 Epoch,这个 Epoch 用于防止僵死进程复活导致消息错乱。
当消息的 Epoch 比当前维护的 Epoch 小时,拒绝掉。Tid 和 Pid 有一一对应的关系,这样对于同一个 Tid 会返回相同的 Pid。
Client 先请求 Transaction Coordinator 记录的事务状态,初始状态是 Begin,如果是该事务中第一个到达的,同时会对事务进行计时。
Client 输出数据到相关的 Partition 中;Client 再请求 Transaction Coordinator 记录 Offset 的事务状态;Client 发送 Offset Commit 到对应 Offset Partition。
Client 发送 Commit 请求,Transaction Coordinator 记录 Prepare Commit/Abort,然后发送 Marker 给相关的 Partition。
全部成功后,记录 Commit/Abort 的状态,最后这个记录不需要等待其他 Replica 的 ACK,因为 Prepare 不丢就能保证最终的正确性了。
前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。消费时,Partition 中会存在一些消息处于未 Commit 状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,Kafka 选择在消费者进程中进行过来,而不是在 Broker 中过滤,主要考虑的还是性能。
Kafka 高性能的一个关键点是 Zero Copy,如果需要在 Broker 中过滤,那么势必需要读取消息内容到内存,就会失去 Zero Copy 的特性。
Kafka 的数据,实际上是以文件的形式存储在文件系统的。Topic 下有 Partition,Partition 下有 Segment,Segment 是实际的一个个文件,Topic 和 Partition 都是抽象概念。
在目录 /partitionid}/ 下,存储着实际的 Log 文件(即 Segment),还有对应的索引文件。每个 Segment 文件大小相等,文件名以这个 Segment 中最小的 Offset 命名,文件扩展名是 .log。Segment 对应的索引的文件名字一样,扩展名是 .index。
有两个 Index 文件:
- 一个是 Offset Index 用于按 Offset 去查 Message。
- 一个是 Time Index 用于按照时间去查,其实这里可以优化合到一起,下面只说 Offset Index。
为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个 Message 都记录下具体位置,而是每隔一定的字节数,再建立一条索引。
索引包含两部分:
BaseOffset:意思是这条索引对应 Segment 文件中的第几条 Message。这样做方便使用数值压缩算法来节省空间。例如 Kafka 使用的是 Varint。
Position:在 Segment 中的绝对位置。
查找 Offset 对应的记录时,会先用二分法,找出对应的 Offset 在哪个 Segment 中,然后使用索引,在定位出 Offset 在 Segment 中的大概位置,再遍历查找 Message。
———————————— 常用配置项 ————————————
1. Broker 配置
架构师必备技能:教你画出一张合格的技术架构图
30 岁程序员生活图鉴,怎样算是活成了理想的模样?
千万不要和女程序员做同事!
阿里云智能 AIoT 首席科学家丁险峰:阿里全面进军 IoT 这一年 | 问底中国 IT 技术演进
只有程序员才能读懂的西游记
通信工程到底要不要转专业?
阿里云智能运维的自动化三剑客