Flink窗口(2)—— Window API

目录

窗口分配器

时间窗口

计数窗口

全局窗口

窗口函数

增量聚合函数

全窗口函数(full window functions)

增量聚合和全窗口函数的结合使用

Window API 主要由两部分构成:窗口分配器(Window Assigners)和窗口函数(Window Functions)

stream.keyBy(<key selector>).window(<window assigner>) //指明窗口的类型.aggregate(<window function>) //定义窗口具体的处理逻辑

在window()方法中传入一个窗口分配器;

在aggregate()方法中传入一个窗口函数;

窗口分配器

指定窗口的类型,定义数据应该被“分配”到哪个窗口

方法:.window()

参数:WindowAssigner

返回值:WindowedStream

如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是 AllWindowedStream

时间窗口

滚动处理时间窗口

stream.keyBy(...)  
//1..of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //窗口大小
//2.通过设置偏移量offset 来调整起始点的时间戳
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) //窗口大小,偏移量
.aggregate(...)

默认的窗口起始点时间戳是窗口大小的整倍数

如果我们定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时的窗口,默认就从整点开始

如果不想用默认值,就需要设置好偏移量

偏移量的作用:标准时间戳其实就是1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数,而这个时间是以 UTC 时间,也就是 0 时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了

滑动处理时间窗口

stream.keyBy(...)
//窗口大小,滑动步长(同样也可以设置偏移量)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

处理时间会话窗口

stream.keyBy(...)
//超时时间
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

以上是静态设置了超时时间,也可以动态设置:

.window(ProcessingTimeSessionWindows.withDynamicGap(newSessionWindowTimeGapExtractor<Tuple2<String, Long>>() {@Overridepublic long extract(Tuple2<String, Long> element) { 
// 提取 session gap 值返回, 单位毫秒
//提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔return element.f0.length() * 1000;}
}

滚动事件时间窗口

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) 
.aggregate(...)

滑动事件时间窗口

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

处理时间和事件时间的逻辑完全相同

计数窗口

滚动计数窗口:.countWindow(10) //窗口大小

滑动计数窗口:.countWindow(10,3) //窗口大小,滑动步长

每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果

全局窗口

.window(GlobalWindows.create());

需要自定义触发器

 

窗口函数

WindowedStream——>DataStream

增量聚合函数

像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以

区别在于不立即输出结果,而是要等到窗口结束时间

归约函数(ReduceFunction):和简单聚合时使用的ReduceFunction完全一样


聚合函数(AggregateFunction):取消类型一致的限制,直接基于 WindowedStream 调 用.aggregate()方法,不需要经过map处理;这个方法需要传入一个AggregateFunction 的实现类作为参数,源码如下:

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {/*** Creates a new accumulator, starting a new aggregate.** <p>The new accumulator is typically meaningless unless a value is added via {@link* #add(Object, Object)}.** <p>The accumulator is the state of a running aggregation. When a program has multiple* aggregates in progress (such as per key and window), the state (per key and window) is the* size of the accumulator.** @return A new accumulator, corresponding to an empty aggregate.*/ACC createAccumulator();/*** Adds the given input value to the given accumulator, returning the new accumulator value.** <p>For efficiency, the input accumulator may be modified and returned.** @param value The value to add* @param accumulator The accumulator to add the value to* @return The accumulator with the updated state*/ACC add(IN value, ACC accumulator);/*** Gets the result of the aggregation from the accumulator.** @param accumulator The accumulator of the aggregation* @return The final aggregation result.*/OUT getResult(ACC accumulator);/*** Merges two accumulators, returning an accumulator with the merged state.** <p>This function may reuse any of the given accumulators as the target for the merge and* return that. The assumption is that the given accumulators will not be used any more after* having been passed to this function.** @param a An accumulator to merge* @param b Another accumulator to merge* @return The accumulator with the merged state*/ACC merge(ACC a, ACC b);
}

IN:输入数据类型

ACC:累加器类型

OUT:输出数据类型

AggregateFunction 接口中有四个方法:

除了继承AggregateFunction,自定义聚合函数之外,Flink为我们提供了一系列预定义的简单聚合方法,如sum()/max()/maxBy()/min()/minBy(),可以直接基于WindowedStream调用

全窗口函数(full window functions)

全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算

典型的批处理方式,适用于一些基于全部数据才能进行的运算等等

窗口函数(WindowFunction)

stream.keyBy(<key selector>).window(<window assigner>)//基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类.apply(new MyWindowFunction());

WindowFunction的实现类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口本身的信息

功能可以被 ProcessWindowFunction(处理窗口函数,见下) 全覆盖


处理窗口函数(ProcessWindowFunction)

增强版的 WindowFunction

基于 WindowedStream 调用.process()方法,传入一个 ProcessWindowFunction 的实现类

ProcessWindowFunction的泛型:ProcessWindowFunction<IN,OUT,KEY,W>

分别是输入数据类型,输出数据类型,分区键的类型,Window类型(比如,是时间窗口,就是TimeWindow)

process()方法的定义:

示例代码如下,自定义窗口处理函数来处理数据:

public class UvCountByWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 将数据全部发往同一分区,按窗口统计UVstream.keyBy(data -> true).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new UvCountByWindow()).print();env.execute();}//自定义窗口处理函数public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{@Overridepublic void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {HashSet<String> userSet = new HashSet<>();// 遍历所有数据,放到Set里去重for (Event event: elements){userSet.add(event.user);}// 结合窗口信息,包装输出内容Long start = context.window().getStart();Long end = context.window().getEnd();out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end)+ " 的独立访客数量是:" + userSet.size());}}}

这里的Event是一个POJO类,ClickSource是自定义的数据源,其代码如下:
Event.java:

public class Event {public String user;public String url;public Long timestamp;public Event() {}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}
}

ClickSource.java: 

public class ClickSource implements SourceFunction<Event> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<Event> ctx) throws Exception {Random random = new Random();    // 在指定的数据集中随机选取数据String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};while (running) {ctx.collect(new Event(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar.getInstance().getTimeInMillis()));// 隔1秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}}

增量聚合和全窗口函数的结合使用

增量聚合函数处理计算会更高效;而全窗口函数的优势在于提供了更多的信息

我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction

处理机制:

基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/619341.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wpf使用Popup封装数据筛选框--粉丝专栏

类似于DevExpress控件的功能 这是DevExpress的winform筛选样式&#xff0c;如下&#xff1a; 这是DevExpress的wpf筛选样式&#xff0c;如下&#xff1a; 这是Excel的筛选样式&#xff0c;如下&#xff1a; 先看效果 本案例使用wpf原生控件封装&#xff0c;功能基本上都满足…

04.neuvector进程策略生成与管控实现

原文链接&#xff0c;欢迎大家关注我的github 一、进程学习管控的实现方式 策略学习实现&#xff1a; 进程的学习与告警主要依据通过netlink socket实时获取进程启动和退出的事件: 1.创建netLink socket&#xff1b; 2.通过创建netlink的fd对进程的事件进行捕获与更新&#x…

复试 || 就业day14(2024.01.10)算法篇

文章目录 前言字符串中第二大的数字字符串中不同整数的数目判断句子是否为全字母句长度为三且各字符不同的子字符串检查是否区域内所有整数都被覆盖*重新分配字符使所有字符串都相等可以输入的最大单词数检查是否所有字符出现次数相同差的绝对值为 K 的数对数目至少在两个数组中…

2024.1.12每日一题

LeetCode 2085.统计出现过一次的公共字符串 2085. 统计出现过一次的公共字符串 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给你两个字符串数组 words1 和 words2 &#xff0c;请你返回在两个字符串数组中 都恰好出现一次 的字符串的数目。 示例 1&#xff1a; 输…

什么是数通技术?以太网交换机在数通技术中的精要

什么是数通技术&#xff1f; 数通技术是指数字通信技术&#xff0c;它涵盖了数字信号处理、数据传输、网络通信等领域。通信工程师在数通技术中负责设计、建设和维护数字通信系统&#xff0c;以实现可靠、高效的信息传输。这涉及到数字信号的编解码、调制解调、数据压缩、网络…

PTA 1117 数字之王 C++实现 简易代码

给定两个正整数 N1​<N2​。把从 N1​ 到 N2​ 的每个数的各位数的立方相乘&#xff0c;再将结果的各位数求和&#xff0c;得到一批新的数字&#xff0c;再对这批新的数字重复上述操作&#xff0c;直到所有数字都是 1 位数为止。这时哪个数字最多&#xff0c;哪个就是“数字…

控制台项目和ASP.Net Core 1.项目创建 2.一键启动多个服务 3.引入别的库

感悟&#xff1a; 1.注意选择&#xff1a;.NET/.Net Core下面的控制台或者ASP.NET Core web应用&#xff0c;而且只有.net core的项目是跨平台的&#xff0c;选错的话&#xff0c;是无法发布到linux上的。 2.c#的命名空间和java包的区别&#xff1a; c#中是按照包来的&#x…

python MySQL学习

免费 MySQL Community Server 社区版本 免费 但是MySQL 不提供官方技术支持 MySQL Cluster 集群版 开源免费 可将几个 MySQL Server 封装乘一个Server 收费 MySQL Enterprise Edition 商业版 该版本是收费的 可以试用30天 官方提供技术支持 MySQL Cluster CGE 高级集群版…

软件测试|如何在Windows电脑中安装多个版本的Python?

简介 安装多个版本的Python在Windows电脑上是一项非常常见的任务&#xff0c;特别是当你需要在不同的Python项目中使用不同版本的Python时。下面是一个详细的步骤指南&#xff0c;帮助你在Windows上安装多个Python版本。 步骤1&#xff1a;下载Python安装程序 访问Python官方…

【Verilog】期末复习——分别画出下面两个程序综合后的电路图/reg型数据和wire型数据的区别

系列文章 数值&#xff08;整数&#xff0c;实数&#xff0c;字符串&#xff09;与数据类型&#xff08;wire、reg、mem、parameter&#xff09; 运算符 数据流建模 行为级建模 结构化建模 组合电路的设计和时序电路的设计 有限状态机的定义和分类 期末复习——数字逻辑电路分…

平衡小车——编码器

学习目标 了解编码器的构成理解编码器采样原理掌握编码器获取转速信息学习内容 编码器组成 左侧的减速齿轮中间的电机部分右侧的电路板减速齿轮 将电机转速通过齿轮按照一定比例进行降速。 电路板 电路板中,包含了一个圆形磁体,还有两个霍尔传感器。 电机转动时,圆形的磁…

iOS Universal Links(通用链接)详细教程

一&#xff1a;Universal Links是用来做什么的&#xff1f; iOS9.0推出的用于应用之间跳转的一种机&#xff0c; 通过一个https的链接启动app。如果手机有安装需要启动的app&#xff0c;可实现无缝跳转。如果没有安装&#xff0c;会打开网页。 实现场景&#xff1a;微信链接无…

如何开发测试框架?

基本概念 库 英文单词叫Library&#xff0c;库是由代码集合成的一个产品&#xff0c;供程序员调用。面向对象的代码组织形成的库叫类库&#xff0c;面向过程的代码组织形成的库叫函数库。 框架 英文单词叫Framework&#xff0c;框架是为解决一个或一类问题而开发的产品&#x…

视频剪辑达人分享:批量减片时时长并调整播放倍速的技巧

在视频剪辑中&#xff0c;经常要对多个视频片段进行时长调整和播放倍速的修改。如果一个个手动操作&#xff0c;不仅效率低下&#xff0c;还容易出错。如何快速批量处理这些片段呢&#xff1f;现在一起来看看云炫AI智剪批量减片时长并调整播放的具体步骤。 原视频和剪辑后的视…

Linux系统SSH远程管理服务概述

目录 一.SSH协议 1.定义 2.优点 &#xff08;1&#xff09;加密 &#xff08;2&#xff09;压缩 3.SSH的客户端与服务端 &#xff08;1&#xff09;客户端 &#xff08;2&#xff09;服务端 4.原理 5.实验&#xff1a;使用ssh远程登录 二.OpenSSH服务器 1.概念 2.…

CSS3动画效果详解

CSS3动画 在CSS3中&#xff0c;animation属性用于实现元素的动画。 animation属性跟transition属性在功能实现上是非常相似的&#xff0c;都是通过改变元素的属性值来实现动画效果。但是&#xff0c;这两者实际上有着本质的区别 对于transition属性来说&#xff0c;它只能将…

PADS 改变图纸和图页边界大小

PADS 改变图纸和图页边界大小 有时候画一画原理图发现画布不够用了&#xff0c;可改变图纸大小&#xff0c;对应的改变图页边界 若图页边界怎么选择都改变不了&#xff0c;可将途中图页边界删除&#xff0c;重新加载 选择对应的图页边距就好啦 分类: PADS

重学Java 4 进制转换和位运算

天赋不好好使用的话&#xff0c;可是会被收回的哦 ——24.1.13 一、进制转换 1.常用的进制 2.十进制和二进制之间的转换 1.十进制转二进制 辗转相除法——循环除以2&#xff0c;取余数&#xff0c;除到商为0为止&#xff0c;除完后&#xff0c;由下往上&#xff0c;得出换算后…

虚拟主机 如何上传大于100M的文件 php网站程序

问题 虚拟主机上传文件大小限制100m&#xff0c; 有时会遇到非常大的文件上传&#xff0c;上传过程中耗时非常久&#xff0c; 可能服务器的限制设置了上传文件尺寸&#xff0c;返回“413 request entity too large” 整体逻辑 前端&#xff1a;上传文件时&#xff0c;进行文…

C语言——内存函数【memcpy,memmove,memset,memcmp】

&#x1f4dd;前言&#xff1a; 在之前的文章C语言——字符函数和字符串函数&#xff08;一&#xff09;中我们学习过strcpy和strcat等用来实现字符串赋值和追加的函数&#xff0c;那么除了字符内容&#xff0c;其他的数据&#xff08;例如整型&#xff09;能否被复制或者移动呢…