前言
Apache Kafka 和 Apache Flink 的结合,为构建实时流处理应用提供了一套强大的解决方案[1]。Kafka 作为高吞吐量、低延迟的分布式消息队列,负责数据的采集、缓冲和分发;而 Flink 则是功能强大的流处理引擎,负责对数据进行实时计算和分析。两者相辅相成,优势互补,共同构成了实时流处理应用的坚实基础。
其中 Flink Kafka Source 成为了连接 Kafka 与 Flink 的桥梁, 为 Apache Flink 提供了从 Apache Kafka 读取数据流的功能。它作为 Flink 数据输入的起点,负责高效、可靠地将 Kafka Topic 中的消息数据接入 Flink 流处理程序,为后续的实时计算、分析和处理提供数据基础。
值得一提的是,AutoMQ 作为 Apache Kafka 的社区分叉项目,对其存储层进行了重新设计与实现,但是完整保留了 Apache Kafka 计算层的代码。对于 Apache Kafka 具有 100% 的兼容性。这意味着在 Flink 生态系统中,专为 Kafka 开发的 Flink Kafka Source/Sink 可以与 AutoMQ 完全兼容。
Flink Source 接口重构动机
从 Flink 1.12 开始,基于 new source API(FLIP-27)[2]和 new sink API (FLIP-143)[3]开发的 KafkaSource
和 KafkaSink
是推荐的 Kafka 连接器。 FlinkKafkaConsumer
和 FlinkKafkaProducer
则已被弃用。
在 FLIP-27: Refactor Source Interface 中旨在解决当前 streaming source 接口(SourceFunction)中的诸多问题与缺点,并同时统一批处理和 streaming APIs 之间的 source 接口。
在 FLIP-27 中,具体阐述 SourceFunction 中存在的问题,总结下来,可以分为如下:
-
批处理和流处理的 Source 实现不一致: Flink 为批处理和流处理提供了不同的 Source 接口,导致代码重复,维护困难。
-
逻辑耦合: “work discovery”(例如,发现 Kafka 的分区或文件系统的 Split )和实际读取数据的逻辑在
SourceFunction
接口和DataStream API
中混合在一起,导致实现复杂,例如 Kafka 和 Kinesis 的 Source 实现。 -
缺乏对分区/ Split 的显式支持: 当前接口没有明确表示分区或 Split 的概念。这使得难以以独立于 Source 的方式实现某些功能,例如事件时间对齐、每个分区的 watermark、动态 Split 分配和工作窃取。例如,Kafka 和 Kinesis 消费者都支持每个分区的 watermark,但截至 Flink 1.8.1,只有 Kinesis 消费者支持事件时间对齐(选择性地从 Split 读取数据,以确保事件时间均匀地推进)。
-
Checkpoint 锁的问题:
-
SourceFunction 持有 checkpoint 锁,导致实现必须确保在锁下进行元素发送和状态更新,限制了 Flink 对锁的优化空间。
-
锁不是公平锁,在锁竞争激烈的情况下,某些线程(例如 checkpoint 线程)可能无法及时获取锁。
-
当前的锁机制也阻碍了基于无锁 Actor/Mailbox 模型的 operator 实现。
-
-
缺乏统一线程模型: 每个 Source 都需要自己实现复杂的线程模型,导致开发和测试新 Source 变得困难。
重构后的 KafkaSource
核心抽象
Split:Flink 中的可追踪数据单元
在 Flink 中,记录分片 (Record Split) 是指一个具有唯一标识符的有序记录集合,它代表了数据源中的一段连续数据。记录分片是 Flink 进行并行处理、容错恢复和状态管理的基本单元。
分片的定义灵活可变,以 Kafka 为例:
-
分片可以是一个完整的分区。
-
分片也可以是分区内的一部分,例如 offset 100 到 200 的记录。
同时以 Kafka 为例,来解释 Split 的特征:
-
有序的记录集合: 分片中的记录是有序的,例如按照 Kafka 中的 offset 排序。
-
唯一标识符: 每个分片都有一个唯一的 ID,用于区分不同的分片,例如 Topic-PartitionId。
-
进度可追踪: Flink 会记录每个分片的处理进度,以便在发生故障时进行恢复,例如某个分区的消费位点。
Split Enumerator:Flink 数据读取的指挥官
Flink 中的记录分片枚举器 (Split Enumerator) 负责管理和分配数据源中的记录分片给 Source Reader 读取数据,它在 Flink 数据读取过程中扮演着“指挥官”的角色。
主要职责:
-
发现记录分片 (Split Discovery):
-
定期扫描外部数据源,例如 Kafka、文件系统等,检测新增的记录分片。
-
例如,Kafka 的 Split Enumerator 会监听 topic 的分区变化,并在新增分区时创建新的分片。
-
-
分配记录分片 (Split Assignment):
-
将发现的记录分片分配给 Source Reader 进行读取。
-
协调多个 Source Reader 之间的分片分配,尽量保证负载均衡。
-
监控 Source Reader 的处理进度,动态调整分片分配,例如将部分分片从过载的 Reader 转移到空闲的 Reader。
-
-
协调 Source Reader:
-
控制 Source Reader 的读取速度,避免个别 Reader 读取过快或过慢,影响整体的 watermark 推进和数据处理进度。
-
处理 Source Reader 的故障,例如将故障 Reader 负责的分片重新分配给其他 Reader。
-
Source Reader:Flink 数据读取的执行者
Source Reader 是 Flink 中真正执行数据读取操作的组件,它负责从 Split Enumerator 分配的记录分片中读取数据,并将数据传递给下游算子进行处理。
主要职责:
-
从记录分片读取数据:
-
根据 Split Enumerator 分配的记录分片信息,连接到外部数据源。
-
从指定位置开始读取数据,例如从 Kafka 的指定 offset 开始消费数据。
-
持续读取数据,直到分片结束或者收到停止信号。
-
-
事件时间水印处理:
-
从读取的记录中提取事件时间信息。
-
根据事件时间生成水印 (Watermark),并将其发送到下游算子,用于处理乱序数据和事件时间窗口。
-
-
数据反序列化:
- 将从外部数据源读取的原始数据(例如字节流)反序列化成 Flink 内部可以处理的数据结构(例如 DataStream 中的元素)。
-
数据发送:
- 将反序列化后的数据发送给下游算子进行处理。
将 Work Discovery 与 Reading 分离
将 Source 的功能拆分为两个主要组件:
-
SplitEnumerator( Split 枚举器):
-
负责发现和分配 Split (splits),例如文件、Kafka 分区等。
-
可以在 JobManager 或 TaskManager 上运行。
-
-
Reader(读取器):
-
负责从分配的 Split 中读取实际数据。
-
包含了当前 Source 接口的大部分功能。
-
可以按顺序读取一系列有界 Split ,也可以并行读取多个(无界) Split 。
-
之前 FlinkKafkaConsumerBase [4]的设计中,集中了 kafka partition 发现逻辑(KafkaPartitionDiscoverer)、数据读取逻辑(KafkaFetcher)、基于阻塞队列实现的生产者消费者模型等等。整体设计相对来说代码复杂,难以维护和扩展。
@Override
public void run(SourceContext<T> sourceContext) throws Exception {// ... (省略部分初始化代码)// ... (省略部分逻辑)this.kafkaFetcher =createFetcher(// ... (省略部分参数));// ... (省略部分逻辑)// 根据是否开启分区发现机制,选择不同的执行路径if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {// 直接运行数据读取循环kafkaFetcher.runFetchLoop(); } else {// 运行包含分区发现逻辑的代码runWithPartitionDiscovery(); }
}
在该思路下就可以分离并设计为:
KafkaSourceEnumerator:
-
发现分区: 定期或一次性地发现 Kafka 主题中的所有分区。
-
初始化分区: 获取每个分区的起始偏移量和结束偏移量。
-
分配分区: 将分区分配给不同的 Source Reader,并管理分区的分配状态
KafkaSourceReader 负责从分配的 Kafka 分区中读取数据,并处理 checkpoint 相关的逻辑。
-
接收并处理 SplitEnumerator 分配的分区
-
处理读取到的数据
-
处理 checkpoint
将 “Work Discovery” 和数据读取逻辑分离,提高了代码的模块化和可重用性。例如,可以为不同的分区发现策略实现不同的 SplitEnumerator,而无需修改 Reader 的代码
KafkaSourceEnumerator
SourceCoordinator 启动
-
当 Flink 作业启动时,会为每个 Kafka Source 任务创建一个
SourceCoordinator
实例。 -
SourceCoordinator
的start()
方法会被调用,开始执行以下操作:-
如果是第一次启动(非从 Checkpoint 恢复),则调用
source.createEnumerator()
创建一个KafkaSourceEnumerator
实例。 -
调用
enumerator.start()
启动KafkaSourceEnumerator
。
-
KafkaSourceEnumerator 启动
-
KafkaSourceEnumerator
的start()
方法会被调用:-
初始化 Kafka 消费者和 Kafka 管理客户端。
-
根据配置决定分区发现模式(周期性或单次)。
-
异步调用
discoverAndInitializePartitionSplit()
方法进行初始分区发现。
-
分区发现与初始化
-
discoverAndInitializePartitionSplit()
方法执行以下操作:-
获取 Kafka 分区变化信息。
-
获取新增分区的起始和终止偏移量(针对有限制的流)。
-
为每个新增分区创建
KafkaPartitionSplit
对象。 -
将新增分片添加到待分配列表 (
pendingPartitionSplitAssignment
) 中。 -
调用
assignPendingPartitionSplits()
方法分配分片。
-
分片分配
-
assignPendingPartitionSplits()
方法执行以下操作:-
将待分配分片分配给可用的 Source Reader。
-
如果禁用了周期性分区发现,则在初始分片分配完成后,向 Source Reader 发送
NoMoreSplitsEvent
事件。
-
Enumerator-Reader 通信机制
在 Flink 新的 Source 设计中,SplitEnumerator 和 SourceReader 是两个独立的组件,分别负责 Split 管理和数据读取。然而,在实际应用中,这两个组件之间 often 需要进行通信,例如在 Kafka Source 场景下:
-
KafkaSourceReader 需要请求 KafkaSplitEnumerator 进行 KafkaSourceReader 注册
-
KafkaSplitEnumerator 需要通知 KafkaSourceReader 有新的 KafkaPartitionSplit 需要读取。
通用通信机制:
为了满足 SplitEnumerator 和 SourceReader 之间的通信需求,Flink 引入了一种通用的消息传递机制,其核心是 SourceEvent
接口。
-
SourceEvent
: 定义了 SplitEnumerator 和 SourceReader 之间传递的消息类型。 -
OperatorEvent
:是在 OperatorCoordinator 和 Operator 之间传递消息的接口。
消息传递链条:
-
OperatorEventGateway: 接收
OperatorEvent
,并添加OperatorID
信息。 -
TaskOperatorEventGateway: 接收来自
OperatorEventGateway
的事件,添加ExecutionAttemptID
信息,并将其转发给JobMasterOperatorEventGateway
。 -
JobMasterOperatorEventGateway: Task Manager 与 JobManager 之间的 RPC 接口,负责将事件最终发送到 JobManager 上的 OperatorCoordinator。
public interface JobMasterOperatorEventGateway {CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID task,OperatorID operatorID,SerializedValue<OperatorEvent> event);}
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
...void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception;
...
}
对于 SourceCoordinator 来说,handleOperatorEvent 内到处理逻辑如下:
-
RequestSplitEvent
: 请求分配新的 Split ,调用enumerator.handleSplitRequest()
处理。 -
SourceEventWrapper
: 来自 SourceReader 的事件,调用enumerator.handleSourceEvent()
处理。 -
ReaderRegistrationEvent
: Reader 注册事件,调用handleReaderRegistrationEvent()
处理。 -
其他事件类型: 抛出异常,表示无法识别该事件类型。
(在实际实现当中,OperatorEvent
有时也可以直接传递到 SourceReader/SplitEnumerator,而不需要在转换为SourceEvent
)
对于 SourceOperator 来说,handleOperatorEvent 内到处理逻辑如下:
-
AddSplitEvent
: 新增 Split 事件,表示SplitEnumerator
分配了新的 Split 给该SourceReader
。 -
SourceEventWrapper
: 调用sourceReader.handleSourceEvents()
将事件传递给SourceReader
处理。 -
NoMoreSplitsEvent
: 没有更多 Split 事件,表示SplitEnumerator
已经分配完所有 Split 。
KafkaSourceReader
Reader 接口与线程模型
Flink 新 Source API 中的 SourceReader
接口,它负责从 Source Split 中读取数据,并与 SplitEnumerator
进行交互。SourceReader
接口代码如下:
public interface SourceReader<T, SplitT extends SourceSplit>extends AutoCloseable, CheckpointListener {void start();InputStatus pollNext(ReaderOutput<T> output) throws Exception;CompletableFuture<Void> isAvailable();void addSplits(List<SplitT> splits);void notifyNoMoreSplits();default void handleSourceEvents(SourceEvent sourceEvent) {}List<SplitT> snapshotState(long checkpointId);@Overridedefault void notifyCheckpointComplete(long checkpointId) throws Exception {}}
SourceReader 被设计为无锁的、非阻塞的接口,以支持 Actor/Mailbox/Dispatcher 风格的 operator 实现。所有方法都在同一个线程中调用,因此实现者无需处理并发问题。
-
SourceReader
使用异步的方式读取数据,并通过isAvailable()
方法通知运行时数据是否可读。 -
pollNext
可以非阻塞地读取下一条记录,并将记录发送到ReaderOutput
。 返回一个InputStatus
枚举值,表示读取状态,例如MORE_AVAILABLE
(还有更多数据)、END_OF_INPUT
(数据读取完毕) 等。
高层抽象简化 Source Reader 实现
-
底层的
SourceReader
接口非常通用,但实现起来比较复杂,尤其是对于像 Kafka 或 Kinesis 这样需要处理多路复用和并发读取的 Source 来说。 -
大多数连接器使用的 I/O 库都是阻塞式的,需要创建额外的 I/O 线程才能实现非阻塞读取。
因此在此 FP 中提出了一个解决方案:
- 高层抽象: 提供更简单的接口,允许使用阻塞式调用,并封装了多路复用和事件时间处理等复杂逻辑。
大多数 Reader 属于以下类别之一:
-
单 Reader 单 splits: 最简单的类型,例如读取单个文件。
-
单 Reader 多 splits: 一个 Reader 可以读取多个 Split ,例如:
- Sequential Single Split 读取: 单个 IO 线程依次顺序读取各个 Split,例如文件或数据库查询结果。
Sequential Single Split
- 多路复用多 splits 读取: 单个 IO 线程使用多路复用技术读取多个 Split ,例如 Kafka、Pulsar、Pravega 等。
Multi-split Multiplexed
- 多线程多 splits 读取: 使用多个线程并发读取多个 Split ,例如 Kinesis。
Multi-split Multi-threaded
以上分析,抽象如下接口,开发者可根据实际需求选择不同的高层 Reader 类型,并通过实现简单的接口来创建自定义的 Source Reader。
public interface SplitReader<E, SplitT extends SourceSplit> {RecordsWithSplitIds<E> fetch() throws InterruptedException;void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);void wakeUp();
}
-
fetch()
: 从 Split 中读取数据,返回一个RecordsWithSplitIds
对象,包含读取到的记录和对应的 Split ID。 -
handleSplitsChanges()
: 处理 Split 的变化,例如新增 Split 或移除 Split。 -
wakeUp()
: 唤醒阻塞的fetch()
操作,例如在有新的 Split 可用时。
public interface RecordEmitter<E, T, SplitStateT> {void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception;
}
emitRecord
: 负责将SplitReader
读取的原始记录 (E
) 转换为最终的记录类型 (T
)
SourceReaderBase:提供了 SourceReader 的基础实现,封装了事件队列、 Split 状态管理、SplitFetcher 管理等通用逻辑
Split 分配流程:
-
SplitEnumerator 分配 Split :
SplitEnumerator
发现新的 Split ,并将它们分配给对应的SourceReader
。 -
SourceReader 接收 Split :
SourceReader
收到新的 Split 后,会进行初始化 state,随后调用SplitFetcherManager
的addSplits()
方法。 -
SplitFetcherManager 获取或创建 SplitFetcher,将 Splits 添加到 SplitFetcher
-
SplitFetcher 将
AddSplitsTask
添加到任务队列,唤醒 SplitFetcher 的工作线程 -
AddSplitsTask 通知 SplitReader 处理 SplitsChanges
-
SplitReader 更新被分配的 Split
Source 数据获取流程:
-
SplitReader 读取数据:
SplitReader
从 Split 中读取数据,并将数据封装成RecordsWithSplitIds
对象返回给SourceReader
。 -
SourceReader 处理数据:
SourceReader
遍历RecordsWithSplitIds
中的每条记录,并根据记录所属的 Split ID 获取对应的SplitState
。 -
调用 RecordEmitter 处理记录:
SourceReader
将记录和SplitState
传递给RecordEmitter
进行处理。 -
RecordEmitter 处理记录:
-
将原始记录类型 (
E
) 转换为最终的记录类型 (T
)。 -
更新
SplitState
,例如记录读取进度等信息。 -
将处理后的记录加入到
SourceOutput
。
-
Checkpoint 和 Failover 流程
Flink 的容错机制依赖于 检查点 (Checkpoint),它会定期生成数据流的快照,包括数据源的读取位置和算子的状态信息。当发生故障时,Flink 可以从最近的 Checkpoint 恢复,保证 Exactly-Once 语义。
在 Flink Kafka Source 中,KafkaSourceEnumerator
和 KafkaSourceReader
两个关键组件分别就有自己的 Checkpoint 和 Failover 的流程。如图所示,Flink Kafka Source 通过 Checkpoint 机制记录数据源的读取位置和 Source Reader 的状态信息,并在 Failover 时利用这些信息进行恢复,保证数据不会丢失或重复处理。
总结
Apache Flink 与消息队列的结合是构建实时流处理应用的强大方案。本文首先介绍了 Flink 与 Kafka 的集成,并深入探讨了 Flink Kafka Source 的重构,以解决原有设计上的不足。
Flink Kafka Source 的重构主要包括:
-
引入 Split Enumerator 和 Source Reader,实现 “Work Discovery” 与 Reading 的分离,提高代码模块化和可重用性。
-
通过 Source Event 机制实现 Enumerator 和 Reader 之间的异步通信,提高代码可维护性。
-
提供 SplitReader 和 RecordEmitter 等高层抽象,提供 SourceReaderBase 的实现,使得 Kafka Source 可以只需专注于 SplitReader 和 RecordEmitter 的实现。
重构后的 Flink Kafka Source 通过 Checkpoint 机制记录数据源读取位置和 Source Reader 状态信息,保证 Exactly-Once 语义。
然而,传统的 Shared Nothing 架构消息队列(如 Kafka)在面对海量数据和高并发场景时,存在存储成本高、运维复杂、扩缩容困难等挑战。
AutoMQ 作为新一代云原生消息队列,采用 Shared Storage 架构和基于对象存储的低成本存储,并与 Kafka 100% 兼容。未来,AutoMQ 与 Flink 的结合将为云原生实时流处理应用带来以下优势:
-
更低的成本: 尤其在处理冷数据时,成本优势更加明显。
-
更高的弹性: 支持集群自动扩缩容和流量自平衡,灵活应对业务变化,保证系统稳定运行。
-
更简化的运维: Shared Storage 架构简化了集群部署和运维。
-
与 Kafka 生态的无缝衔接: 方便企业平滑迁移。
AutoMQ 与 Flink 的结合将成为未来云原生实时流处理应用的重要发展方向,为企业提供更低成本、更高效、更便捷的流处理解决方案。
[1]: Apache Kafka (including Kafka Streams) + Apache Flink = Match Made in Heaven
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[3]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[4]: https://github.com/apache/flink/blob/b1e7b892cc9241f568150135b8bcf7bcd9f0c125/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L757-L830