1 Window概述
streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。
Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
注意:window一般在keyBy(KeyedStram)后。如果实在DataStream后的话是windowAll(不建议使用,会将所有数据汇总到一个分区计算)
window assigner确定了数据属于哪个窗口丢到正确的桶里面,还没有做计算。真正做计算是在window assigner后面的window function。下面两步和起来才是一个完整的窗口操作
.window(<window assigner>).aggregate(new AverageAggregate)
2 Window的类型
Window可以分成两类:TimeWindow(按照时间生成Window)和CountWindow(按照指定的数据条数生成一个Window,与时间无关)。
2.1 TimeWindow
对于TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
2.1.1 Tumbling Window
将数据依据固定的窗口长度对数据进行切片,滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。特点:时间对齐,窗口长度固定,没有重叠。适合做BI统计等(做每个时间段的聚合计算)
2.1.2 Sliding Window
滑动窗口由固定的窗口长度和滑动间隔组成。滑动窗口分配器将元素分配到固定长度的窗口中,如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。特点:时间对齐,窗口长度固定,可以有重叠。
2.1.3 Session Window
由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。特点:时间无对齐。
3 WindowAPI
3.0 窗口分配器
窗口分配器 即 window() 方法,我们可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口。
window() 方法接收的输入参数是一个 WindowAssigner(窗口分配器),WindowAssigner 负责将每条输入的数据分发到正确的 window 中。Flink 提供了通用的 WindowAssigner:①滚动时间窗口( .timeWindow(Time.seconds(5)))②滑动时间窗口(.timeWindow(Time.seconds(15), Time.seconds(5)))③会话窗口( .window(EventTimeSessionWindows.withGap(Time.minutes(1)))④滚动计数窗口(.countWindow(5))⑤滑动计数窗口(.countWindow(20,10))
3.1 TimeWindow
TimeWindow将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算。默认的时间窗口根据Processing Time 进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。
(1)滚动窗口
时间间隔参数可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等指定。
val timeWindowStream = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
(2)滑动窗口
在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。每5s就计算输出结果一次,每一次计算的window范围是1分钟内的所有元素如下:
val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15), Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
或者
val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).window(SlidingEventTimeWindows.of(Time.minutes(1),Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
3.2 CountWindow
CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。
注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。
(1)滚动窗口
指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
val countWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).countWindow(5).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
(2)滑动窗口
在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。每当某一个key的个数达到10的时候,触发计算,计算最近该key最近20个元素的内容如下
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0).countWindow(20,10).sum(1)
3.3 window function
window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:①增量聚合函数:每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。②全窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction就是一个全窗口函数
(1)ReduceFunction
下面的示例的展示了如何将递增的ReduceFunction与ProcessWindowFunction结合使用,以返回窗口中的最小事件以及窗口的开始时间。
val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).reduce((r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },( key: String,context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,minReadings: Iterable[SensorReading],out: Collector[(Long, SensorReading)] ) =>{val min = minReadings.iterator.next()out.collect((context.window.getStart, min))})
(2)AggregateFunction
下面的示例展示了如何将递增的AggregateFunction与ProcessWindowFunction结合起来计算平均值,并同时发出键和窗口以及平均值。
val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).timeWindow(<duration>).aggregate(new AverageAggregate(), new MyProcessWindowFunction())// Function definitions/*** The accumulator is used to keep a running sum and a count. The [getResult] method* computes the average.*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {override def createAccumulator() = (0L, 0L)override def add(value: (String, Long), accumulator: (Long, Long)) =(accumulator._1 + value._2, accumulator._2 + 1L)override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2override def merge(a: (Long, Long), b: (Long, Long)) =(a._1 + b._1, a._2 + b._2)
}class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {val average = averages.iterator.next()out.collect((key, average))}
}
(3)FoldFunction
下面的示例显示如何将递增的FoldFunction与ProcessWindowFunction结合使用,以提取窗口中的事件数量,并返回窗口的键和结束时间。
val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).fold (("", 0L, 0),(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },( key: String,window: TimeWindow,counts: Iterable[(String, Long, Int)],out: Collector[(String, Long, Int)] ) =>{val count = counts.iterator.next()out.collect((key, window.getEnd, count._3))})
(4)ProcessWindowFunction
除了访问键控状态(任何富函数都可以),ProcessWindowFunction还可以使用范围限定在函数当前处理的窗口的键控状态。
val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction())
3.4 trigger
触发器trigger定义 window 什么时候关闭,触发计算并输出结果。触发器确定窗口(由窗口分配者形成)何时准备好由窗口函数处理。每个WindowAssigner都有一个默认的触发器。如果默认触发器不符合需要,可以自定义触发器。
触发器接口有五个方法,允许触发器对不同的事件作出反应:
①onElement()方法对添加到窗口的每个元素调用。
②当一个已注册的事件时间计时器触发时,将调用onEventTime()方法。
③当触发注册的处理时间计时器时,将调用onProcessingTime()方法。
④onMerge()方法与有状态触发器相关,当两个触发器对应的窗口合并时,将它们的状态合并起来,例如在使用会话窗口时。
⑤clear()方法在删除相应窗口时执行所需的任何操作。
关于上述方法,有两点需要注意:
①前三个函数通过返回一个TriggerResult来决定如何处理它们的调用事件。动作可以是下列动作之一:CONTINUE:什么也不做;FIRE:触发计算;PURGE:清除窗口中的元素;FIRE_AND_PURGE:触发计算并随后清除窗口中的元素。
②这些方法中的任何一个都可以用于为将来的操作注册处理或事件时间计时器。
3.5 evitor
移除器evitor可以窗口触发前或触发后,定义移除某些数据的逻辑。一般和global window一起用,要自定义trigger和evitor,因为把所有的数据都存下来了,不用的数据丢弃。evitor接口有2个方法如下
/*** Optionally evicts elements. Called before windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** Optionally evicts elements. Called after windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evictBefore()包含要在窗口函数之前应用的驱逐逻辑,而evictAfter()包含要在窗口函数之后应用的驱逐逻辑。
Flink提供了三个预先实现的驱逐器:①CountEvictor:保持窗口中元素的用户指定数量,并丢弃从窗口缓冲区开始的剩余元素。②DeltaEvictor:获取delta函数和阈值,计算窗口缓冲区中最后一个元素与每个剩余元素之间的增量,并删除增量大于或等于阈值的元素。③TimeEvictor:以毫秒为单位的时间间隔作为参数,对于给定的窗口,它在元素中查找最大时间戳max_ts,并删除所有时间戳小于max_ts - interval的元素。
3.6 allowedLateness
允许处理迟到的数据,分布式计算数据可能是乱序的,开了时间窗口之后,可能属于他的数据姗姗来迟。假设正在是10点关闭窗口,允许1分钟的迟到数据,到10点不关但是要触发一次计算输出一个计算结果,后面一分钟再来的数据可以在这个基础上在做叠加触发一次计算再输出一个结果。也就是先输出结果后面更新。
注意:这些处理迟到数据的必须在数据自带的时间处理才有意义
val input: DataStream[T] = ...input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).<windowed transformation>(<window function>)
3.7 sideOutputLateData和getSideOutput
sideOutputLateData将迟到的数据放入侧输出流,getSideOutput获取侧输出流
val lateOutputTag = OutputTag[T]("late-data")val input: DataStream[T] = ...val result = input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).sideOutputLateData(lateOutputTag).<windowed transformation>(<window function>)val lateStream = result.getSideOutput(lateOutputTag)
3.8 window API 总览
Keyed Windowsstream.keyBy(...) <- keyed versus non-keyed windows.window(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windowsstream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"