该篇主要讲Flink中的时间语义、Flink 水印机制以及Flink对乱序数据的三重保障。
一、Flink的三种时间语义
1.1 Event Time
Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。
使用Event Time时,最理想的情况下,我们可以一直等待所有的事件到达后再进行时间窗口的处理。假设一个时间窗口内的所有数据都已经到达,基于Event Time的流处理会得到正确且一致的结果:无论我们是将同一个程序部署在不同的计算环境还是在相同的环境下多次计算同一份数据,都能够得到同样的计算结果。我们根本不同担心乱序到达的问题。但这只是理想情况,现实中无法实现,因为我们既不知道究竟要等多长时间才能确认所有事件都已经到达,更不可能无限地一直等待下去。在实际应用中,当涉及到对事件按照时间窗口进行统计时,Flink会将窗口内的事件缓存下来,直到接收到一个Watermark,以确认不会有更晚数据的到达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上降低了计算结果的绝对准确性,而且增加了系统的延迟。在流处理领域,比起其他几种时间语义,使用Event Time的好处是某个事件的时间是确定的,这样能够保证计算结果在一定程度上的可预测性。
一个基于Event Time的Flink程序中必须定义Event Time,以及如何生成Watermark。我们可以使用元素中自带的时间,也可以在元素到达Flink后人为给Event Time赋值。
使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。
1.2 Processing Time
对于某个算子来说,Processing Time指算子使用当前机器的系统时钟来定义时间。在Processing Time的时间窗口场景下,无论事件什么时候发生,只要该事件在某个时间段达到了某个算子,就会被归结到该窗口下,不需要Watermark机制。对于一个程序在同一个计算环境来说,每个算子都有一定的耗时,同一个事件的Processing Time,第n个算子和第n+1个算子不同。如果一个程序在不同的集群和环境下执行时,限于软硬件因素,不同环境下前序算子处理速度不同,对于下游算子来说,事件的Processing Time也会不同,不同环境下时间窗口的计算结果会发生变化。因此,Processing Time在时间窗口下的计算会有不确定性。
Processing Time只依赖当前执行机器的系统时钟,不需要依赖Watermark,无需缓存。Processing Time是实现起来非常简单也是延迟最小的一种时间语义。
1.3 Ingestion Time
Ingestion Time是事件到达Flink Souce的时间。从Source到下游各个算子中间可能有很多计算环节,任何一个算子的处理速度快慢可能影响到下游算子的Processing Time。而Ingestion Time定义的是数据流最早进入Flink的时间,因此不会被算子处理速度影响。
Ingestion Time通常是Event Time和Processing Time之间的一个折中方案。比起Event Time,Ingestion Time可以不需要设置复杂的Watermark,因此也不需要太多缓存,延迟较低。比起Processing Time,Ingestion Time的时间是Souce赋值的,一个事件在整个处理过程从头至尾都使用这个时间,而且后续算子不受前序算子处理速度的影响,计算结果相对准确一些,但计算成本稍高。
注:Ingestion Time1.13 版本已经不再提了,这也是为啥官网的图没看到Ingestion Time的原因。目前推荐Event Time的时间语义。
1.4 Flink如何设置时间域
调用 setStreamTimeCharacteristic 设置时间域,枚举类 TimeCharacteristic 预设了三种时间域,不显式设置的情况下,默认使用 TimeCharacteristic.EventTime(1.12 版本以前默认是 TimeCharacteristic.ProcessingTime)。
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //过期方法
在 1.12 以后版本默认是使用 EventTime,如果要显示使用 ProcessingTime,可以关闭 watermark(自动生成 watermark 的间隔设置为 0),设置
env.getConfig().setAutoWatermarkInterval(0);
二、Flink 水印机制
我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的,为了保证计算结果的正确性,需要让窗口等待延迟数据到达后再进行计算,但是不能无限期地等待下去,必须有一种机制来确定何时触发窗口计算,这种机制就是水印(Watermark)。
稍稍总结一下水位线的引入原因:
- 分布式系统的网络传输的不确定性;
- 数据是乱序的;
- 支持事件时间的流处理器需要一种测量事件时间进度的方法,用以正确的处理窗口等操作;
水位线的物理意义有两点:
- 水位线本质是一个基于数据生成的、单调递增的时间戳;
- 水位线 W(t)表示当前数据流中的所有 t 时刻前的数据都已经到了。
水印是一种用于衡量事件时间进度的机制,其表示某个时刻(事件时间)以前的数据将不再产生,因此水印指的是一个时间点。水印作为数据流的一部分流动,并带有时间戳t。t表示该流中不应再有时间戳小于等于t的元素(即时间戳早于或等于水印的事件)。
下图显示了带有时间戳和嵌入式水印的事件流,事件是按顺序排列的(相对于其时间戳),这意味着水印只是流中的周期性标记。
水印对于乱序流至关重要,如下图所示,其中事件不是按其时间戳排序的。通常,水印是数据流中一个点的声明,表示水印之前的所有事件都应该到达。一旦水印到达算子,算子则认为某个时间周期,所有事件已经被收到,不会再有更多符合条件的事件。
水印是直接通过Source Function生成的或在后续的DataStream API中生成的。在实际的流计算中,一个作业往往会同时处理多个源的数据,多个源的数据按照key分组后进行Shuffle处理,数据会汇聚到同一个处理节点。而每个并行子任务通常独立生成水印,这样就容易导致汇聚到一起的水印不是单调递增的。对于这种情况,Flink会选择所有流入的水印中事件时间最小的一个发往下游,如下图所示。
多个流的水印流入算子后,由于当前算子也有自己的水印,因此算子会综合计算得出最终水印,计算规则为:取多个流中事件时间最小的水印与当前算子的水印进行对比,如果大于当前算子水印,则更新当前算子水印,并发往下游。例如抽象类AbstractStreamOperator中的源码如下:
三、分布式环境下Watermark的传播
在实际计算过程中,Flink的算子一般分布在多个并行的分区(或者称为实例)上,Flink需要将Watermark在并行环境下向前传播。如下图所示,Flink的每个并行算子子任务会维护针对该子任务的Event Time时钟,这个时钟记录了这个算子子任务Watermark处理进度,随着上游Watermark数据不断向下发送,算子子任务的Event Time时钟也要不断向前更新。由于上游各分区的处理速度不同,到达当前算子的Watermark也会有先后快慢之分,每个算子子任务会维护来自上游不同分区的Watermark信息,这是一个列表,列表内对应上游算子各分区的Watermark时间戳等信息。
当上游某分区有Watermark进入该算子子任务后,Flink先判断新流入的Watermark时间戳是否大于Partition Watermark列表内记录的该分区的历史Watermark时间戳,如果新流入的更大,则更新该分区的Watermark。例如,某个分区新流入的Watermark时间戳为4,算子子任务维护的该分区Watermark为1,那么Flink会更新Partition Watermark列表为最新的时间戳4。接着,Flink会遍历Partition Watermark列表中的所有时间戳,选择最小的一个作为该算子子任务的Event Time。同时,Flink会将更新的Event Time作为Watermark发送给下游所有算子子任务。算子子任务Event Time的更新意味着该子任务将时间推进到了这个时间,该时间之前的事件已经被处理并发送到下游。例如,图中第二步和第三步,Partition Watermark列表更新后,导致列表中最小时间戳发生了变化,算子子任务的Event Time时钟也相应进行了更新。整个过程完成了数据流中的Watermark推动算子子任务Watermark的时钟更新过程。Watermark像一个幕后推动者,不断将流处理系统的Event Time向前推进。我们可以将这种机制总结为:
- Flink某算子子任务根据各上游流入的Watermark来更新Partition Watermark列表。
- 选取Partition Watermark列表中最小的时间作为该算子的Event Time,并将这个时间发送给下游算子。
这样的设计机制满足了并行环境下Watermark在各算子中的传播问题,但是假如某个上游分区的Watermark一直不更新,Partition Watermark列表其他地方都在正常更新,唯独个别分区的时间停留在很早的某个时间,这会导致算子的Event Time时钟不更新,相应的时间窗口计算也不会被触发,大量的数据积压在算子内部得不到处理,整个流处理处于空转状态。这种问题可能出现在使用数据流自带的Watermark,自带的Watermark在某些分区下没有及时更新。针对这种问题,一种解决办法是根据机器当前的时钟周期性地生成Watermark。
此外,在union等多数据流处理时,Flink也使用上述Watermark更新机制,那就意味着,多个数据流的时间必须对齐,如果一方的Watermark时间较老,那整个应用的Event Time时钟也会使用这个较老的时间,其他数据流的数据会被积压。一旦发现某个数据流不再生成新的Watermark,我们要在SourceFunction中的SourceContext里调用markAsTemporarilyIdle设置该数据流为空闲状态。
四、Flink对乱序数据的三重保障
我们思考一个问题:怎样避免乱序数据带来计算不正确性?
常用的解决办法是:当最大的事件时间maxEventTime达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。
但是,我们应该等待多久的时间呢?由于网络、分布式等原因造成的延时,一般大多数迟到的数据都会在最近一段时间到来,这个最近一段时间一般是毫秒级的,Watermark就是做到了这样的保障。还有很少的一部分数据会迟到很久,我们可以通过allowedLateness和sideOutputLateData来兜底。
处理乱序数据,三重保证机制:
3.1 Watermark
能够保证迟到很短的时间的数据到来后(一般是迟到毫秒级别内的数据,最大不超过1s),触发窗口关闭并输出。(即能够hold住短时间内迟到的数据)
3.2 allowedLateness
allowedLateness(lateness: Time):设置允许的延迟时间,默认为0,该方法仅对事件时间窗口有效。在水印通过窗口结尾后(即水印>=窗口结束时间),该方法指定的允许延迟时间才开始生效。该延迟时间与水印指定的允许延迟时间不冲突,相当于在水印延迟时间的基础上进行累加。落入该方法指定的允许延迟时间范围内的元素可能会导致窗口再次触发(例如EventTimeTrigger)。为了使这些元素正常被计算,Flink会保持窗口的状态,直到允许的延迟过期为止。一旦延迟过期,Flink将删除该窗口并删除其状态。
3.3 sideOutputLateData
sideOutputLateData(outputTag: OutputTag[T]):将延迟到达的数据保存到outputTag对象中,OutputTag是一种类型化的命名标签,用于标记算子的侧道输出,单独收集延迟数据。后面可通过DataStream的getSideOutput(outputTag)方法得到被丢弃数据组成的数据流。
当指定的允许延迟大于0时,在水印通过窗口结尾后,将保留窗口及其内容。在这种情况下,当一个迟到但未被丢弃的元素到达时,它可能会导致该窗口的另一次触发。这次触发称为延迟触发,因为是由延迟事件触发的,与主触发(即窗口的第一次触发)相反。对于会话窗口,后期触发会进一步导致窗口合并,因为可能缩小两个预先存在的未合并窗口之间的间隙。当使用全局窗口时,没有数据是延迟的,因为全局窗口的结束时间戳是Long.MAX_VALUE。
注意:
后期触发的元素应更新先前计算的结果,即数据流将包含同一计算的多个结果。根据你的应用程序,需要考虑这些重复的结果或对它们进行重复数据删除。
在水印的基础上设置允许延迟机制后,数据可以延迟的时间范围是多少?在只设置了水印的情况下,如果满足当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间,则触发窗口计算,发射计算结果并销毁窗口。在水印的基础上设置了允许延迟机制后,如果满足当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间(水印指定的),则触发窗口计算,发射计算结果,但不会销毁窗口,窗口会保留计算状态并继续等待延迟数据;每条延迟数据到达后,如果落入窗口内,都会再次触发窗口计算,更新计算状态,发射出最新计算结果,直到满足条件:当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间,则关闭并销毁窗口。此后到达的延迟数据,由于窗口已经关闭,数据将进入侧道输出流进行单独存放,后期根据业务单独处理即可。
指定允许延迟时间可以使用如下代码片段:
使用Flink的侧道输出机制可以获得一个后来被丢弃的数据组成的数据流。使用时首先需要使用sideOutputLateData(OutputTag)方法指定要在窗口化流上获取后期数据。然后可以使用getSideOutput(lateOutputTag)方法得到后期数据组成的数据流,代码如下:
为了更好地理解允许延迟和侧道输出机制,假设有乱序数据按照ABCDEFG的顺序依次到达Flink应用程序,并且设置了水印允许的最大延迟时间为3分钟,在水印的基础上又通过allowedLateness(Time.minutes(3))方法设置了允许的延迟时间为3分钟,使用sideOutputLateData(lateOutputTag)方法设置侧道输出,如下图所示。
当数据A到达时,由于窗口开始时间<=数据A的事件时间<窗口结束时间,因此数据A落入窗口内。
当数据B到达时,由于其事件时间>=窗口结束时间,因此数据B不属于该窗口。此时Watermark=进入Flink的当前最大事件时间‒允许的最大延迟时间=9:11‒3分钟=9:08。水印在窗口内,不会触发窗口计算。
当数据C到达时,由于窗口开始时间<=数据C的事件时间<窗口结束时间,因此数据C落入窗口内。
当数据D到达时,由于其事件时间>=窗口结束时间,因此数据D不属于该窗口。此时Watermark=进入Flink的当前最大事件时间‒允许的最大延迟时间=9:15‒3(分钟)=9:12>=窗口结束时间。水印在窗口外,触发窗口计算并发射计算结果。由于设置了允许延迟机制的延迟时间为3分钟,此时的窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间=9:10+3(分钟)+3(分钟)=9:16>9:15(进入Flink的当前最大事件时间),不满足窗口关闭的条件,因此窗口会继续等待延迟数据,并保留计算状态(此处的计算状态指的就是计算结果,例如窗口内数据的聚合结果)。
当数据E到达时,由于进入Flink的当前最大事件时间没有改变,窗口不会关闭,而是继续等待。窗口开始时间<=数据E的事件时间<窗口结束时间,因此数据E落入窗口内,并触发窗口计算,与上次计算的结果进行合并,发射出新的计算结果,如下图所示。
当数据F到达时,此时的窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间=9:10+3(分钟)+3(分钟)=9:16<=9:16(进入Flink的当前最大事件时间),满足窗口关闭的条件,因此窗口会关闭并销毁。
当数据G到达时,窗口开始时间<=数据G的事件时间<窗口结束时间,但是窗口已经关闭了,因此数据G将进入侧道输出流进行单独存放。通过侧道输出API可从侧道输出流中取出延迟严重的数据进行相应的业务处理。
这样分析下来应该豁然开朗了吧?如果还有啥疑问欢迎和我一起交流,我们下一篇再见。