文章目录
- 1、窗口
- 2、分类
- 3、窗口API概览
- 4、窗口分配器
在批处理统计中,可以等待一批数据都到齐后,统一处理。但是在无界流的实时处理统计中,是来一条就得处理一条,那么如何统计最近一段时间内的数据呢? ⇒ 窗口的概念:将无限数据切割成一个个的"数据块"
1、窗口
Flink的窗口,可以理解为一个桶,水龙头下面的水桶的桶,窗口把无限流切割成一个个存储桶,流中数据被分发到对应的桶,再按需对每个桶中收集的数据做计算。
2、分类
按照驱动类型分
即按照窗口怎么去截取数据来分:
- 时间窗口:以时间点来定义窗口的开始和结束,
定点发车
,到点窗口就不再收集数据,且触发计算和窗口的销毁关闭 - 计数窗口:基于当前窗口里的元素个数,
人齐发车
按窗口数据分配规则:
滚动窗口:
- 固定大小,均匀切片
- 窗口之间首位相接,没有重叠,也没有间隔
- 每个数据都会被分配到一个窗口,且只会属于一个窗口
- 关键参数:窗口大小Windows size,可以时时长,也可以时元素数
滑动窗口:
- 窗口大小固定,但不是首尾相接
- 关键参数:窗口大小windows size、滑动步长windows slide,滑动步长即代表计算频率
- 滑动步长小于窗口大小时,出现重叠,此时的数据可能会被同时分配到多个窗口(size除以slide)
- 滚动窗口即size=slide的滑动窗口
- 举例:计算最近一个小时的订单数,每10分钟输出一次,即窗口长度一小时,滑动步长10分钟
会话窗口:
- 基于会话来分割数据
- 参数:会话超时时间size
- 相邻两条数据的间隔大于会话超时时间时,切割一次,开新的窗口
- 会话窗口长度不是固定的
- 会话窗口不会重叠,且留至少为size的间隔(session gap)
- 举个例子:上一条数据走了十分钟后,下一条数据还没有到来,即默认会话中止,下次数据来时开启新的会话窗口。关键词:间隔多久没有数据进来
全局窗口:
- 把相同key的所有数据都分配到同一个窗口
- 全局窗口没有结束时间点
- 窗口没结束时,默认不触发计算,因此需要自定义触发器才能做计算。比如Flink的计数窗口底层就是全局窗口
3、窗口API概览
调用窗口API前,要确定是否是基于按键分区(Keyed)的数据流KeyedStream来开窗,即调用窗口算子之前,是否有keyBy操作,按键分区的窗口
:
- 对DataStream先keyby,此时数据流被分成多条逻辑流,即一个个KeyedStream
- 基于KeyedStream做窗口操作,窗口计算会在多个并行子任务上同时执行
- 相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理
stream.keyBy(...).window(...)
对于非按键分区的窗口操作
,原始的DataStream就不会分成多条逻辑流,这时窗口逻辑只能在一个任务(task)上执行,就相当于强行把并行度变成了1
stream.windowAll(...)
对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
对于窗口的操作,分为窗口分配器和窗口函数,前者指明了窗口的类型,是时间窗口、计数窗口、滑动、滚动还是会话窗口。后者定义窗口数据的计算和处理逻辑
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)
4、窗口分配器
时间窗口
时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种:
- 滚动
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //长度为5秒的滚动窗口.aggregate(...)
- 滑动
stream.keyBy(...)//长度为10秒、滑动步长为5秒的滑动窗口.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(...)
- 会话
stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) //会话的超时时间.aggregate(...)
看完window方法的传参,窗口分配器由类xxxEventTimeWindows提供:
//滚动
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)
//滑动
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(...)
//会话
stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)
计数窗口
- 滚动计数
stream.keyBy(...).countWindow(10)
- 滑动计数
stream.keyBy(...).countWindow(10,3) //传入两个参数:size和slide
全局窗口
全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由GlobalWindows类提供。
stream.keyBy(...).window(GlobalWindows.create());
使用全局窗口,必须自行定义触发器才能实现窗口计算,否则不起作用。