1 时间语义
数据迟到的概念是:数据先产生,但是处理的时候滞后了
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
Ingestion Time:是数据进入Flink的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。引入EventTime的时间属性如下:
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](topic, schema, props))stream.keyBy( _.getUser ).timeWindow(Time.hours(1)).reduce( (a, b) => a.add(b) ).addSink(...)
设置了EventTime后后面处理底层会判断
注意:设置了事件时间,但是并不知道事件时间,Event Time 的使用一定要指定数据源中的时间戳,通过assignTimestampsAndWatermarks指定,时间戳要是ms单位。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter())val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.filter( _.severity == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>)withTimestampsAndWatermarks.keyBy( _.getGroup ).timeWindow(Time.seconds(10)).reduce( (a, b) => a.add(b) ).addSink(...)
对于排序好的数据,不需要延迟触发,可以只指定时间戳就行了
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(_.timestamp)
对于乱序数据调用 assignTimestampAndWatermarks 方法,传入一个 BoundedOutOfOrdernessTimestampExtractor,就可以指定 watermark
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[WC](Time.milliseconds(1000)){override def extractTimestamp(element: WC): Long = {element.timestamp * 1000}}
2 WaterMark
2.1 什么是WaterMark
我们的数据从采集经过kafka,etl等操作要耗时的,再到流经source,到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生。
迟到数据是因为有延迟,简单的想法就多等一下。不要5秒的事件到了就关闭窗口,多等一会。我们要考虑的是当前事件的时间进展到底要按照什么时间算,也就是说假设现在5秒的窗口要关闭,设置延迟为2秒,那么5秒的数据来了就多等2秒,5秒的事件来了就相当于还没有进展到5秒,是进展到了5-2=3秒,也就是时间才进展到3秒。按照这种多等2秒的方式的话要等到时间戳是7的数据来了之后7-2=5才关闭5秒的窗口。这就提出了Watermark
乱序,其实就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
Watermark可以从以下几个方面理解:①Watermark是一种衡量Event Time进展的机制。②Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。③数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。④Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行
Watermark延迟时间的设置一般根据数据的乱序情况定义,通常设置成最大乱序程度
2.2 Watermark传递
真正的Watermark其实就是一条特殊的记录,可以认为是插入数据流里面的一个特殊数据,Watermark可以理解为是一个有时间戳的特殊数据结构,就和数据一样一条一条来,后面处理数据如果是正常数据就正常处理,如果是Watermark就按照对于时间的操作该关闭窗口就关闭窗口。
Watermark必须单调递增,既然表示当前事件时间的进展,时间只能朝前不停的推进,另外总和当前数据的时间戳相关,数据的时间戳就应该是当前的事件时间。
当Flink接收到数据时,会按照一定的规则去生成Watermark。Watermark要求单调递增的话就选取所有当前已经来的数据里面最大的时间戳作为当前的事件时间,要多等一会的话在当前最大的时间戳基础上再减去一个延迟时间就可以了,即maxEventTime - 延迟时长。所以Watermark是基于数据携带的时间戳生成的,如果Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
有序流的Watermarker(最大延迟时间为0)如下图所示:
乱序流的Watermarker(最大延迟时间为4)如下图所示:
上图中,采用周期性插入Watermark的生成策略,默认每200ms系统插入Watermark。我们设置的允许最大延迟到达时间为4s,当系统要插入第一个Watermark时查看此时数据中的最大事件时间为15,所以插入的Watermark是11s。过了200ms后到了第二次插入watermark的时候,此时数据中的最大事件时间为22,所以插入Watermark是18s。果我们的窗口1是1s-10s,窗口2是10s-20s,那么Watermarker为11到达之后需要触发窗口1。一旦触发以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。
2.3 Watermark的传递
Watermark的传递如上图所示。
Flink 的传递策略基本上遵循三点:①watermark 会以广播的形式在算子之间进行传播。并行任务没有数据交互不考虑,只要考虑上游有多少个任务给他发数据,下游要发送多少个数据到别的任务。②如果在程序里面收到了一个 Long.MAX_VALUE这个数值的 watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就是一个终止的标志。③单流输入取其大,多流输入取小。不同的上游任务发来的Watermark不一样,不能按照上游所有的Watermark中最大的Watermark来判定当前的事件时间,而是应该按照最小的那个来判定,因为Watermark代表的数据是他之前的数据都到期了,如果只接收到一个分区的Watermark是29表示这个分区29之前数据已经到齐了,但是不能保证当前任务不在接收29之前的数据,因为之前别的Watermark可能还没进展到29,所以应该按照最小的。
底层实现:上游有2个分区就会对每一个分区都去创建一个分区的Watermark(PARTITION Watermark),分别是29,14所以当前任务的事件时间是14,那么下游的子任务广播出去也是14,14之前的数据都到齐了。接下来一个分区来了一个新的Watermark是17,相当于这个分区的时间进展为17之前的都到齐,那么首先更新当前的Watermark,然后观察现在所有分区的Watermark最小值是否改变,如果改变那么事件时间就朝前进展,事件时间更新就往下游广播。
2.4 WaterMark使用
watermark对于有序数据,最常见的引用方式如下:
dataStream.assignTimestampsAndWatermarks(_.timestamp)
升序数据不用管Watermark,本身数据来就带有时间戳
watermark对于乱序数据,最常见的引用方式如下:
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {override def extractTimestamp(element: Element): Long = element.ds})
Watermark就是在assignTimestampsAndWatermarks里面定义出来的,BoundedOutOfOrdernessTimestampExtractor 是Flink内置提供的允许乱序最大延时的watermark生成方式,只需要重写其extractTimestamp方法。现在kafka源也支持直接生成Watermark,所以etl的时候可以把Watermark也产生。不过我们一般是在Flink把数据读进来做了转换之后马上分配一个Watermark。Watermark要保证正确性,延迟时间一般定义成最大的乱序程度(从数据里面提炼出来的参数)。同个分区数据可能会乱序,Watermark不会乱序(单调递增,取最大的时间戳减去延迟时间)
2.5 自定义WaterMark
watermark的生成策略有两种:一种是AssignerWithPeriodicWatermarks周期性生成(隔一段时间系统自动插入),另外一种是AssignerWithPunctuatedWatermarks根据特定标记生成。这两个接口都是Flink暴露了TimestampAssigner接口的子类型。实际生成中大量密集数据比较多,稀疏较少,所以一般使用周期性AssignerWithPeriodicWatermarks方式。
周期性的生成watermark系统会周期性的将watermark插入到流中。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval(watermarkInterval)方法进行设置。每隔watermarkInterval,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark(watermarkInterval)方法。如果方法返回的watermark大于之前的watermark,新的watermark会被插入到流中。这个检查保证了watermark是单调递增的。如果方法返回的时间戳小于等于之前watermark,则不会产生新的watermark。
自定义一个周期性的时间戳抽取:
class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[Element] {val bound: Long = 60 * 1000 // 延时为1分钟var maxTs: Long = Long.MinValue // 观察到的最大时间戳override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}override def extractTimestamp(r: Element, previousTS: Long) = {maxTs = maxTs.max(r.timestamp)r.timestamp}
}
间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理,自定义一个间断式地生成watermar:
class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Element] {val bound: Long = 60 * 1000override def checkAndGetNextWatermark(r: Element, extractedTS: Long): Watermark = {if (r.status == "sucess") {new Watermark(extractedTS - bound)} else {null}}override def extractTimestamp(r: Element, previousTS: Long): Long = {r.timestamp}
}