什么是水位线
在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,
用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数
据的时间戳来驱动的。
我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟
的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标
记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以
更新自己的时钟了。在 Flink 中,数据流中用来做时间标记的记号就叫做水位线。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,
主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个
数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
水位线的分类
有序的水位线
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;而在实际应用中,
如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间
戳、插入水位线就做了大量的无用功。所以为了提高效率,一般会每隔一段时间生成一个水位
线,这个水位线的时间戳,就是当前最新数据的时间戳,所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
无序的水位线
在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改
变,这就是所谓的“乱序数据”。
对于连续数据流,我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就
不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需
要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新
的水位线,。所以我们可以试着多等几秒,也就是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确的计算结果。对应到水位线上,其实就是要保证,当前时间已经进展到了这个时间戳,在这之后不可能再有迟到数据来了(延迟设的足够长)。
如何生成水位线
1.水位线的生成时机
水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。
2.水位线生成策略
在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指
示事件时间。
val stream: DataStream[ClickEvent] = env.addSource(new ClickSource())
val withTimestampsAndWatermarks: DataStream[ClickEvent] = stream.assignTimestampsAndWatermarks(watermarkStrategy)
assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是
所谓的“水位线生成策略”。WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner
和一个“水位线生成器”WatermarkGenerator。
trait WatermarkStrategy[T] extends TimestampAssignerSupplier[T] with WatermarkGeneratorSupplier[T] { def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[T] def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T]
}
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给
元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在
WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,
以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间
为处理时间,可以调用环境配置的 setAutoWatermarkInterval()方法来设置,默认为
200ms。
env.getConfig.setAutoWatermarkInterval(60 * 1000L)
3. flink内置水位线生成器
- 有序流
val stream: DataStream[Event] = env.addSource(new ClickSource())
val withTimestampsAndWatermarks: DataStream[Event] = stream.assignTimestampsAndWatermarks( WatermarkStrategy .forMonotonousTimestamps[Event]() .withTimestampAssigner { (event, timestamp) => event.timestamp }
)
- 无序流
import java.time.Duration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector object OutOfOrdernessTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val clickSource = new ClickSource() val stream = env.addSource(clickSource) // 插入水位线的逻辑 val watermarkedStream = stream .assignTimestampsAndWatermarks( WatermarkStrategy .forBoundedOutOfOrderness(Time.seconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner[Event] { override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp }) ) watermarkedStream.print() env.execute("OutOfOrdernessTest") }
}