详解 Flink 的 window API

一、window 概述

​ Streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 Flink window 是一种将无限数据切割为有限块进行处理的手段。window 是无限数据流处理的核心, window 将一个无限的 stream 拆分成有限大小的 ”buckets” 桶,然后可以在这些桶上做计算操作

二、window 类型

1. Time Window

时间窗口,按照时间生成 Window

1.1 Tumbling Time Window

滚动时间窗口

在这里插入图片描述

  • 将数据依据固定的窗口长度(时间)对数据进行切片
  • 特点:时间对齐,窗口长度固定,没有重叠
  • 重要参数:窗口长度(时间值)
  • 适用场景:适合做 BI 统计等(做每个时间段的聚合计算)
1.2 Sliding Time Window

滑动时间窗口

在这里插入图片描述

  • 滑动时间窗口由固定的窗口长度和滑动间隔组成
  • 特点:时间对齐,窗口长度固定,可以有重叠,数据最大的重叠数 = 窗口长度/滑动间隔
  • 重要参数:窗口长度和滑动间隔(时间值)
  • 适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)
1.3 Session Window

会话时间窗口

在这里插入图片描述

  • 由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口
  • 特点:时间无对齐
  • 重要参数:会话最小时间间隔

2. Count Window

计数窗口,按照指定的数据条数生成一个 Window,与时间无关

2.1 Tumbling Count Window

滚动计数窗口

  • 将数据依据固定的窗口长度(计数)对数据进行切片
  • 特点:计数对齐,窗口长度固定,没有重叠
  • 重要参数:窗口长度(计数值)
2.2 Sliding Count Window

滑动计数窗口

  • 滑动计数窗口由固定的窗口长度和滑动间隔组成
  • 特点:计数对齐,窗口长度固定,可以有重叠,数据最大的重叠数 = 窗口长度/滑动间隔
  • 重要参数:窗口长度和滑动间隔(计数值)

三、window API 操作

在这里插入图片描述

1. Window 创建

1.1 非按键分区流

原始的 DataStream 调用 windowAll() 方法创建的窗口只能在一个任务(task)上执行,相当于并行度变成了 1,生产上不建议使用

AllWindowedStream stream = dataStream.windowAll()
1.2 按键分区流

Window 的创建推荐是 DataStream 经过 KeyBy 之后调用 window() 方法

在这里插入图片描述

/**通用开窗方法:WindowedStream<T> window()参数:WindowAssignerFlink 提供的通用 WindowAssigner:1.滚动窗口(tumbling window)2.滑动窗口(sliding window)3.会话窗口(session window)4.全局窗口(global window)
*/
public class TestWindowCreate {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口//1.滚动时间窗口//1.1 使用通用 window() 方法dataStream.keyBy("id").window(TumblingProcessTimeWindows.of(Time.seconds(5)));//1.2 使用 timeWindow() 方法dataStream.keyBy("id").timeWindow(Time.seconds(5));//2.滑动时间窗口//2.1 使用通用 window() 方法dataStream.keyBy("id").window(SlidingProcessTimeWindows.of(Time.seconds(6), Time.seconds(2)));//2.2 使用 timeWindow() 方法dataStream.keyBy("id").timeWindow(Time.seconds(6), Time.seconds(2));//3.会话窗口dataStream.keyBy("id").window(EventTimeSessionWindows.withGap(Time.minutes(1)));//4.计数窗口//4.1 滚动计数窗口dataStream.keyBy("id").countWindow(10L);//4.2 滑动计数窗口dataStream.keyBy("id").countWindow(10L, 2L);env.execute();}
}

2. Window 函数

window function 定义了要对窗口中收集的数据做的计算操作

2.1 增量聚合函数

incremental aggregation functions,每条数据到来就进行计算,保持一个简单的状态,窗口结束时输出最终的状态。简单的 sum/max/maxBy/min/minBy 聚合函数都是增量聚合

2.1.1 ReduceFunction
/**方法签名:reduce(ReduceFunction<T> reduce)注意:ReduceFunction 的类型 T 不能改变
*/
public class TestWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(5)).reduce(new ReduceFunction<SenesorReading>() {@Overridepublic SenesorReading reduce(SenesorReading value1, SenesorReading value2) throws Exception {return value2;}}).print();env.execute();}
}
2.1.2 AggregateFunction
/**方法签名:aggregate(AggregateFunction<IN, ACC, OUT> aggregate)AggregateFunction 的 3 个泛型:1.IN:输入数据类型2.ACC:中间累加器的数据类型3.OUT:输出数据类型AggregateFunction 接口中需要实现的 4 个方法:1.createAccumulator():创建一个累加器,即为聚合创建了一个初始状态,每个聚合任务只会调用一次2.add():将输入的元素添加到累加器中。基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value 和当前的累加器accumulator;返回一个新的累加器值,是对聚合状态进行更新。每条数据到来之后都会调用这个方法3.getResult():从累加器中提取聚合的输出结果。可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如计算平均值,可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用4.merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging  Window)的场景就是会话窗口(Session Windows)
*/
public class TestWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(15)).aggregate(new AggregateFunction<SenesorReading, Integer, Integer>() {@Overridepublic Integer createAccumulator() { return 0;}@Overridepublic Integer add(SenesorReading value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}}).print();env.execute();}
}
2.2 全窗口函数

full window functions,先收集窗口中的每一条数据,并在内部缓存起来,等到窗口要输出结果的时候再将所有数据进行计算并输出

2.2.1 WindowFunction
/**方法签名:apply(WindowFunction<IN, OUT, KEY, W extends Window> window)泛型:1.IN:输入数据类型2.OUT:输出数据类型3.KEY:分组 key 的类型4.W:窗口的类型需要实现的方法:void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out)1.key:分区的 key2.window:当前窗口信息3.input:窗口所有数据的可迭代集合4.out:数据收集器
*/
public class TestFullWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(15)).apply(new WindowFunction<SenesorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {@Overridepublic void apply(Tuple key, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception { String id = key.getField(0);Long windowEnd = window.getEnd();Integer count = IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3<>(id, windowEnd, count));}}).print();env.execute();}
}
2.2.2 ProcessWindowFunction
/**方法签名:process(ProcessWindowFunction<IN, OUT, KEY, W extends Window> window)泛型:1.IN:输入数据类型2.OUT:输出数据类型3.KEY:分组 key 的类型4.W:窗口的类型需要实现的方法:void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out)1.key:分区的 key2.context:上下文环境对象3.input:窗口所有数据的可迭代集合4.out:数据收集器
*/
public class TestFullWindowFunction {public static void main(String[] args) throw Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//读取文本数据/*sensorReading.txtsensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1 */DataStream<String> inputStream = env.readTextFile("sensorReading.txt");DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>(){@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//创建窗口并使用窗口函数dataStream.keyBy("id").timeWindow(Time.seconds(15)).process(new ProcessWindowFunction<SenesorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {@Overridepublic void process(Tuple key, Context context, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception { String id = key.getField(0);Long windowEnd = context.window().getEnd();Integer count = IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3<>(id, windowEnd, count));}}).print();env.execute();}
}

3. 其他可选 API

3.1 trigger

触发器主要是用来控制窗口什么时候触发计算,即执行窗口函数

/**参数:Trigger 抽象类内置实现类:EventTimeTrigger、ProcessingTimeTrigger 和 CountTrigger 等自定义实现类:继承 Trigger 抽象类并重写方法1.onElement():窗口中每到来一个元素,都会调用这个方法2.onEventTime():当注册的事件时间定时器触发时,将调用这个方法3.onProcessingTime():当注册的处理时间定时器触发时,将调用这个方法4.clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态
*/
trigger(Trigger<> trigger)
3.2 evictor

移除器主要用来定义移除某些数据的逻辑

/**参数:Evictor 接口实现方法:1.evictBefore():定义执行窗口函数之前的移除数据操作2.evictAfter():定义执行窗口函数之后的以处数据操作注意:默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的
*/
evictor(Evictor evictor)
3.3 allowedLateness

允许延迟的数据,设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算更新结果。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口

/**方法签名
*/
allowedLateness(Time time)
3.4 sideOutputLateData

将迟到的数据放入侧输出流,可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据

/**参数:OutputTag 输出标签,用来标记分支的迟到数据流
*/
sideOutputLateData(OutputTag<T> outputTag)//实例化方式:
OutputTag<String> outputTag = new OutputTag<String>("late") {};//提取侧输出流方法:由执行完所有窗口函数后得到的 DataStream 调用
getSideOutput(OutputTag<T> outputTag)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/849907.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单片机原理及技术(三)—— AT89S51单片机(二)(C51编程)

一、AT89S51单片机的并行I/O端口 1.1 P0口 AT89S51的P0口是一个通用的I/O口&#xff0c;可以用于输入和输出。每个引脚都可以通过软件控制为输入或输出模式。 1.1.1 P0口的工作原理 P0口的工作原理是通过对P0寄存器的读写操作来控制P0口的引脚。 输出模式&#xff1a;当P0口…

UI学习笔记(一)

UI学习 一&#xff1a;UIView基础frame属性隐藏视图对象&#xff1a;UIView的层级关系 二&#xff1a;UIWindow对象三&#xff1a;UIViewController基础UIViewController使用 四&#xff1a;定时器与视图移动五&#xff1a;UISwitch控件六&#xff1a;滑动条和进度条七&#xf…

2021年vue面试题整理(万字解析)

一、对MVVM的理解 MVVM分为Model、View、ViewModel。 Model 代表数据模型&#xff0c;数据和业务逻辑都在Model层中定义&#xff1b;泛指后端进行的各种业务逻辑处理和数据操控&#xff0c;对于前端来说就是后端提供的 api 接口。 View 代表UI视图&#xff0c;负责数据的展示…

【Python数据分析--Numpy库】Python数据分析Numpy库学习笔记,Python数据分析教程,Python数据分析学习笔记(小白入门)

一&#xff0c;Numpy教程 给大家推荐一个很不错的笔记&#xff0c;个人长期学习过程中整理的 Python超详细的学习笔记共21W字点我获取 1-1 安装 1-1-1 使用已有的发行版本 对于许多用户&#xff0c;尤其是在 Windows 上&#xff0c;最简单的方法是下载以下的 Python 发行版…

Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)

概览 在任何语言中对序列(或集合)元素的排序无疑是一种司空见惯的常规操作,在 Swift 语言里自然也不例外。序列排序看似简单,实则“暗藏玄机”。 要想真正掌握 Swift 语言中对排序的“各种姿势”,我们还得从长计议。不如就先从最简单的排序基本功开始聊起吧。 在本篇博…

【十大排序算法】插入排序

插入排序&#xff0c;如一位细心的整理者&#xff0c; 她从序列的左端开始&#xff0c; 挨个将元素归位。 每当她遇到一个无序的元素&#xff0c; 便将它插入已经有序的部分&#xff0c; 直至所有元素有序排列。 她不张扬&#xff0c;却有效率&#xff0c; 用自己的方式&…

pdf文件在线压缩网站,pdf文件在线压缩工具软件

在数字化时代的今天&#xff0c;PDF文件已经成为我们日常生活和工作中不可或缺的一部分。然而&#xff0c;随着PDF文件的广泛使用&#xff0c;其文件大小问题也日益凸显。过大的PDF文件不仅占用了大量的存储空间&#xff0c;而且在传输和共享过程中也往往面临诸多不便。因此&am…

SylixOS网卡多 IP 配置

概述 网卡多 IP 是指在同一个网络接口上配置和绑定多个 IP 地址。 引进网卡多 IP 的目的主要有以下几个&#xff1a; 提供服务高可用性。通过在同一接口绑定多个 IP 地址&#xff0c;然后在服务端使用这些 IP 地址启动多个服务实例。这样在任意一 IP 出现问题时&#xff0c;可…

Redis学习(十二)Redis的三种删除策略

目录 一、背景二、Redis 的三种删除策略2.1 定时删除&#xff08;用CPU换内存空间&#xff09;2.2 定期删除2.3 惰性删除&#xff08;用内存换CPU性能&#xff09; 三、总结 一、背景 我们都知道 Redis 是一种内存数据&#xff0c;所有的数据均存储在内存中&#xff0c;可以通…

Android 代码打印meminfo

旨在替代adb shell dumpsys meminfo packageName&#xff0c;在log打印meminfo&#xff0c;以便分析内存情况 ActivityManager.MemoryInfo memoryInfo new ActivityManager.MemoryInfo(); activityManager.getMemoryInfo(memoryInfo); long totalMemory Runtime.getRuntime(…

大数据环境搭建@Hive编译

Hive3.1.3编译 1.编译原因1.1Guava依赖冲突1.2开启MetaStore后运行有StatsTask报错1.3Spark版本过低 2.环境部署2.1jdk安装2.2maven部署2.3安装图形化桌面2.4安装Git2.5安装IDEA 3.拉取Hive源码4.Hive源码编译4.1环境测试1.测试方法——编译2.问题及解决方案&#x1f4a5;问题1…

【设计模式】结构型-装饰器模式

在代码的海洋深处迷离&#xff0c;藏匿着一片神奇之地。那里有细腻的线条交错&#xff0c;是装饰器的奇妙艺术。 文章目录 一、登录的困境二、装饰器模式三、装饰器模式的核心组成部分四、运用装饰器模式五、装饰器模式的应用场景六、小结推荐阅读 一、登录的困境 假设我们有…

YOLOv5改进总目录 | backbone、Neck、head、损失函数,注意力机制上百种改进技巧

&#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 专栏地址&#xff1a; YOLOv5改进入门——持续更新各种有效涨点方法 点击即可跳转 报错 解决Yolov5的RuntimeError: result type Float can…

qq号码采集软件

寅甲QQ号码采集软件, 一款采集QQ号、QQ邮件地址&#xff0c;采集QQ群成员、QQ好友的软件。可以按关键词采集&#xff0c;如可以按地区、年龄、血型、生日、职业等采集。采集速度非常快且操作很简单。

C# WPF入门学习主线篇(九)—— ComboBox常见属性和事件

欢迎来到C# WPF入门学习系列的第九篇。在前面的文章中&#xff0c;我们已经学习了 Button、TextBox、Label 和 ListBox 控件。今天&#xff0c;我们将探讨 WPF 中的另一个重要控件——ComboBox。本文将详细介绍 ComboBox 的常见属性和事件&#xff0c;并通过示例代码展示其在实…

逻辑这回事(三)----时序分析与时序优化

基本时序参数 图1.1 D触发器结构 图1.2 D触发器时序 时钟clk采样数据D时&#xff0c;Tsu表示数据前边沿距离时钟上升沿的时间&#xff0c;MicTsu表示时钟clk能够稳定采样数据D的所要求时间&#xff0c;Th表示数据后边沿距离时钟上升沿的时间&#xff0c;MicTh表示时钟clk采样…

Spring Boot集成pmd插件快速入门Demo

1.什么是pmd插件&#xff1f; PMD 插件允许您在项目的源代码上自动运行PMD代码分析工具&#xff0c;并生成带有其结果的站点报告。它还支持与 PMD 一起分发的单独的复制/粘贴检测器工具&#xff08;或 CPD&#xff09;。 此版本的 Maven PMD 插件使用 PMD 6.42.0 并且需要 Jav…

从大到小吗?-分支c++

题目描述 给出 4 个整数&#xff0c;a , b , c , d 。 判断这四个数字是否满足从大到小。 输入 输入 4 个整数&#xff0c;a , b , c , d 。 输出 输出 Yes 或者 No 。 样例输入 4 3 2 1 样例输出 Yes 提示 分析&#xff1a; 这道题十分的简单&#xff0c;只需判断…

23 二叉搜索树

本节目标 1.内容安排说明 2.二叉搜索树实现 3.应用分析 4.进阶题 1. 内容安排说明 二叉树在c数据结构已经说过了&#xff0c;本节内容是因为&#xff1a; map和set特性需要先铺垫二叉搜索树&#xff0c;而二叉搜索树也是一种树形结构二叉搜索树的特性了解&#xff0c;有助于…

Linux:动态库和静态库的编译与使用

目录 1.前言 2.静态链接库 3.静态链接库生成步骤 4.静态链接库的使用 5.动态链接库 6.动态链接库生成步骤 7.动态链接库的使用 8.动态链接库无法加载 9.解决动态链接库无法加载问题 前言 在《MinGW&#xff1a;从入门到链接库》博客中简单介绍了如何编译动态链接库和静态链接库…