flink的窗口

目录

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

2.计数窗口(Count window)

2.按照窗口分配数据的规则分类

窗口API分类

API调用

窗口分配器器:

窗口函数

增量聚合函数:

全窗口函数

flink sql 窗口函数

窗口 | Apache Flink

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

    时间窗口以时间点定义窗口的开始和结束,因此截取出就是某一段时间的数据。当到达结束时间时窗口不在接受数据,触发计算输出结果,并关闭销毁窗口。

flink有一个专门的类用来表示时间窗口TimeWindow,这个类只有两个私有属性;窗口的方法获取最大时间戳为end-1,因此窗口[start,end)  左开右闭;

@PublicEvolving
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}@Overridepublic long maxTimestamp() {return end - 1;}
2.计数窗口(Count window)

计数窗口是基于元素个数截取,在到达固定个数是就触发计算并关闭窗口。

3.全局窗口(Global Windows)

是计数窗口的底层实现,窗口分配器由GlobalWindows类提供,需要自定义触发器实现窗口的计算;

 stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
//                .max().aggregate(new AvgPv()).print();查看源代码,windou函数后见windowStrream时获取默认的触发器
@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder =new WindowOperatorBuilder<>(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),  //湖区触发器input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}// 计数窗口底层采用全局窗口加计数器来实现public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}

2.按照窗口分配数据的规则分类

滚动窗口(Tumbling Window):窗口大小固定,窗口没有重叠;

滑动窗口 (Sliding Window):滑动窗口有重叠,也可以没有重叠,如果窗口size和滑动size相等,等于滚动窗口;

会话窗口 (Session Window):基于会话对窗口进行分组,与其他两个不同的是,会话窗口是借用会话窗口的超时失效机触发窗口计算,当数据到来后会开启一个窗口,如果在超时时间内有数据陆续到来,窗口不会关闭,反之会关闭;极端情况,如果数据总能在窗口超时时间到达前远远不断的到来,该窗口会一直开启不会关闭;

全局窗口 (Global Window):比较通用的窗口,该窗口会把数据分配到一个窗口中,窗口为全局有效,会把相同key的数据分配到同一个窗口中,默认不会触发计算,跟没有窗口一样,需要自定义触发器才能使用;

窗口API分类

窗口大的分类可以分为按键分区和非按键分区两种:按键分需要经过keyby操作,会把数据进行分发,实现负载均分,可以并行处理更大的数据量。而非按键分区窗口,相当于并行度为1,使用上直接调用windowall(),因此一般并不推荐使用;

stream
.keyby(...)  //流按键分区
.window(...)  //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()]   //设置移除器
[.allowedLateness()]  // 设置延迟时间
[.sideOutputLateData()]  //设置侧输出流
.reduce/aggregate/fold/apply()  //处理函数
[.getSideOutput()] //获取侧输出流stream
.windowAll(...)  //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()]   //设置移除器
[.allowedLateness()]  // 设置延迟时间
[.sideOutputLateData()]  //设置侧输出流
.reduce/aggregate/fold/apply()  //处理函数
[.getSideOutput()] //获取侧输出流

API调用

窗口操作包含两个重要的概念:窗口分配器(window Assigners)和窗口函数(window function)两部分;

窗口分配器用于构建窗口,确定窗口类型,确定数据划分哪一个窗口,窗口函数制定数据的计算规则;

窗口分配器器:

作用:窗口分配器用来划分窗口属于哪一个窗口;

窗口按照时间可以划分为:滚动、滑动和session,三种类型窗口;

窗口计数划分:滚动和滑动两种类型;

  eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate();
窗口函数

窗口函数按照计算特点可以分为增量计算和全量计算;

增量聚合函数:数据到达后立即计算,窗口只保存中间结果。效率高,性能好,但不够灵活。

全量聚合函数:缓存窗口的所有元素,触发后统一计算,效率低,但计算灵活。

增量聚合函数:

数据进入窗口会参与计算,窗口结束前只需要保留一个聚合后的状态值,内存压力小。

1.规约函数(ReduceFunction):数据保存留一个状态,输入类型和输出类型必须一致,来一条数据会处理将数据合并到状态中;

 stream.keyBy(r -> r.f0)// 设置滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {// 定义累加规则,窗口闭合时,向下游发送累加结果return Tuple2.of(value1.f0, value1.f1 + value2.f1);}}).print();

sum、max、min等底层都是通过同名AggregateFunction实现(非下面的聚合函数),本质还是实现ReduceFunction结构重写了reduce方法;

2.聚合函数(AggrateFunction):在规约函数基础上进行完善。解决输出和输入类型必须一致的限制问题。实现应用更灵活;

  // 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).aggregate(new AvgPv()).print();public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {@Overridepublic Tuple2<HashSet<String>, Long> createAccumulator() {// 创建累加器return Tuple2.of(new HashSet<String>(), 0L);}@Overridepublic Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {// 属于本窗口的数据来一条累加一次,并返回累加器accumulator.f0.add(value.user);return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<HashSet<String>, Long> accumulator) {// 窗口闭合时,增量聚合结束,将计算结果发送到下游return (double) accumulator.f1 / accumulator.f0.size();}@Overridepublic Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {return null;}}
全窗口函数

全窗口函数会将进入窗口的数据先进行缓存,然后在窗口关闭时一起计算,缓存数据会占用内存资源,如果一个窗口数据量太大时,可能出现内存溢出的问题;

全窗口函数可以划分窗口函数(windowFunction)和处理窗口函数(processWindowFunction)两种;

窗口函数(windowFunction):老版本通用窗口接口,window()后调用apply(),传入实现windowFunction接口; 缺点是不能获取上下文信息,也没有更高级的功能。因为在功能上可以被processWindowFunction全覆盖,因此主键被弃用

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param window The window that is being evaluated.* @param input The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

处理窗口函数(processWindowFunction):是窗口API中最底层通用的窗口函数接口,可以获取到上问对象(context),实现为调用process方法传入自定义继承ProcessWindowFunction类;

input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}

注意:一般增量窗口函数和全量窗口函数可以一起使用,window().aggregate()方法可以传入两个函数,第一个采用增量聚合函数,第二个传入全量函数,这样数据在进入窗口会触发增量计算,窗口不会缓存数据。当窗口关闭触发计算时,结果数据穿度到全量计算,参数Iterable中一般只有一个数据;

aggregate(acct1,acct2)

flink sql 窗口函数

flink sql 窗口也包含常见的滚动窗口、滑动窗口、session窗口,但还有一种累计窗口。

在flink1.13版本后flinksql支持累计窗口CUMULATE,可以实现没5分钟触发一次计算,输出当天的累计数据,使用样例

SELECT cast(PROCTIME() as timestamp_ltz) as window_end_time,manufacturer_name,event_id,case when state is null then -1 else state end ,cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(TABLE dm_cumulate, DESCRIPTOR(ts1), INTERVAL '5' MINUTES, INTERVAL '1' DAY(9)))
GROUP BYwindow_end,window_start,manufacturer_name,event_id,case when state is null then -1 else state end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/38091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL高级-MVCC-原理分析(RC级别)

文章目录 1、RC隔离级别下&#xff0c;在事务中每一次执行快照读时生成ReadView2、先来看第一次快照读具体的读取过程&#xff1a;3、再来看第二次快照读具体的读取过程: 1、RC隔离级别下&#xff0c;在事务中每一次执行快照读时生成ReadView 我们就来分析事务5中&#xff0c;两…

VBA代码解决方案第十五讲:如何对单元格区域进行高亮显示

《VBA代码解决方案》(版权10028096)这套教程是我最早推出的教程&#xff0c;目前已经是第三版修订了。这套教程定位于入门后的提高&#xff0c;在学习这套教程过程中&#xff0c;侧重点是要理解及掌握我的“积木编程”思想。要灵活运用教程中的实例像搭积木一样把自己喜欢的代码…

【操作系统期末速成】EP06 | 学习笔记(基于五道口一只鸭)

文章目录 一、前言&#x1f680;&#x1f680;&#x1f680;二、正文&#xff1a;☀️☀️☀️2.1 考点十四&#xff1a;同步互斥的基本概念2.2 考点十五&#xff1a;实现临界区互斥的基本方法2.3 考点十六&#xff1a;信号量的含义及常用信号量 一、前言&#x1f680;&#x1…

操作系统概论(二)

一、单项选择题(本大题共20小题&#xff0c;每小题1分&#xff0c;共20分) 在每小题列出的四个备选项中只有一个选项是符合题目要求的&#xff0c;请将其代码填写在题后的括号内。错选、多选或未选均无分。 1&#xff0e;操作员接口是操作系统为用户提供的使用计算机系统的手…

Vitis IDE 艰难切换--从传统 Vitis GUI 到 2024.1 统一软件界面

目录 1. 简介 2. 界面展示 2.1 启动 2.2 Flow Navigator 2.1.1 C Simulation Dialog 2.1.2 C Synthesis 2.1.3 C/RTL Co-simulation 2.1.4 Implementation 2.1.5 Package 3. C Synthesis 详解 3.1 Classic Configuration Settings 3.1.1 config_array_partition 3…

MySQL进阶:存储过程和函数

存储过程和函数 1. 简介2. 创建存储过程使用MySQL工作台创建存储过程 3. 删除存储过程4. 参数带默认值的参数参数验证输出参数 5. 变量6. 函数7. 其他约定 1. 简介 存储过程三大作用&#xff1a; 储存和管理SQL代码&#xff08;置于数据库中&#xff0c;与应用层分离&#xf…

快递物流仓库管理系统java项目springboot和vue的前后端分离系统java课程设计java毕业设计

文章目录 快递物流仓库管理系统一、项目演示二、项目介绍三、部分功能截图四、部分代码展示五、底部获取项目源码&#xff08;9.9&#xffe5;带走&#xff09; 快递物流仓库管理系统 一、项目演示 快递物流仓库管理系统 二、项目介绍 语言: Java 数据库&#xff1a;MySQL 前…

宝塔安装rabbitMQ实战

服务器环境说明 阿里云服务器、宝塔、centos7 一、下载erlang 原因&#xff1a;RabbitMQ服务端代码是使用并发式语言Erlang编写的&#xff0c;安装Rabbit MQ的前提是安装Erlang。 下载地址&#xff1a;http://www.erlang.org/downloads 下载对应的版本&…

山东省著名烈士孙善师孙善帅故居布展喜添新篇

人海信息网山东讯&#xff08;张春兄、冯爱云&#xff09; “……他们以钢铁般的意志&#xff0c;坚守共产党员的使命&#xff0c;他们就是泺口九烈士的孙善师孙善帅兄弟&#xff01;”6月28日&#xff0c;对于山东省著名烈士孙善师孙善帅故居来说&#xff0c;又是一个不平凡的…

LabVIEW电压电流实时监测系统

开发了一种基于LabVIEW和研华&#xff08;Advantech&#xff09;数据采集卡的电压电流实时监测系统&#xff0c;通过高效的数据采集和处理&#xff0c;为工业和科研用户提供高精度、实时的电压电流监测解决方案。系统采用研华USB-4711A数据采集卡&#xff0c;结合LabVIEW编程环…

AI论文速读 | 2024[KDD]自适应时空图神经网络中图中奖彩票的预训练识别

题目&#xff1a;Pre-Training Identification of Graph Winning Tickets in Adaptive Spatial-Temporal Graph Neural Networks 作者&#xff1a;Wenying Duan, Tianxiang Fang, Hong Rao, Xiaoxi He 机构&#xff1a;南昌大学&#xff0c;澳门大学 arXiv网址&#xff1a;h…

Python数据分析-股票分析和可视化(深证指数)

一、内容简介 股市指数作为衡量股市整体表现的重要工具&#xff0c;不仅反映了市场的即时状态&#xff0c;也提供了经济健康状况的关键信号。在全球经济体系中&#xff0c;股市指数被广泛用于预测经济活动&#xff0c;评估投资环境&#xff0c;以及制定财政和货币政策。在中国…

IEEE JSTSP综述:从信号处理领域分析视触觉传感器的研究

触觉传感器是机器人系统的重要组成部分&#xff0c;虽然与视觉相比触觉具有较小的感知面积&#xff0c;但却可以提供机器人与物体交互过程中更加真实的物理信息。 视觉触觉传感是一种分辨率高、成本低的触觉感知技术&#xff0c;被广泛应用于分类、抓取、操作等领域中。近期&a…

如何跑起来一个前后端项目

后端部署 第一步配置自己的maven 第二步优先导入自己本地jar包当本地没有在从远程下载 第三步找到配置文件 第四步成功运行后端部署完毕 前端部署 第一步看看项目node_modules有没有文件如果有就是已经安装好了对应的依赖&#xff0c;没有执行npm install 第二步运行即可

决策树划分属性依据

划分依据 基尼系数基尼系数的应用信息熵信息增益信息增益的使用信息增益准则的局限性 最近在学习项目的时候经常用到随机森林&#xff0c;所以对决策树进行探索学习。 基尼系数 基尼系数用来判断不确定性或不纯度&#xff0c;数值范围在0~0.5之间&#xff0c;数值越低&#x…

【知识学习】Unity3D中Scriptable Render Pipeline的概念及使用方法示例

Unity3D中的Scriptable Render Pipeline&#xff08;SRP&#xff09;是一种高度可定制的渲染管线框架&#xff0c;允许开发者完全控制渲染流程&#xff0c;以适应不同的渲染需求和硬件平台。SRP使得开发者可以编写自己的渲染逻辑&#xff0c;包括摄像机管理、渲染设置、光照处理…

【机器学习】K-means++: 一种改进的聚类算法详解

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 K-means: 一种改进的聚类算法详解引言1. K-means算法回顾1.1 基本概念1.2 局限性…

RDMA建链的3次握手和断链的4次挥手流程?

文章目录 基础信息建链 3次握手断链4次挥手建联状态active端passive端 报文结构函数关系其他后记 基础信息 CM: Communication Management 通信管理 连接管理SIDR: Service ID Resolution Protocol. 作用&#xff1a; enables users of Unreliable Datagram service to locate …

实验4 图像空间滤波

1. 实验目的 ①掌握图像空间滤波的主要原理与方法&#xff1b; ②掌握图像边缘提取的主要原理和方法&#xff1b; ③了解空间滤波在图像处理和机器学习中的应用。 2. 实验内容 ①调用 Matlab / Python OpenCV中的函数&#xff0c;实现均值滤波、高斯滤波、中值滤波等。 ②调…

【操作系统期末速成】 EP02 | 学习笔记(基于五道口一只鸭)

文章目录 一、前言&#x1f680;&#x1f680;&#x1f680;二、正文&#xff1a;☀️☀️☀️2.1 考点二&#xff1a;操作系统的功能及接口2.2 考点三&#xff1a;操作系统的发展及分类2.3 考点四&#xff1a;操作系统的运行环境&#xff08;重要&#xff09; 一、前言&#x…