【Flink】窗口(Window)

窗口理解

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。

对窗口的正确理解
我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理
在这里插入图片描述

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口

窗口的分类

按照驱动类型分

时间窗口(Time Window)

以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车

计数窗口(Count Window)

计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车

按照窗口分配数据的规则分类

滚动窗口(Tumbling Window)

窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
在这里插入图片描述

DataStream<T> input = ...;// 滚动 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

滑动窗口(Sliding Windows)

滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。
在这里插入图片描述

DataStream<T> input = ...;// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。
在这里插入图片描述

DataStream<T> input = ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);

全局窗口

这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。
在这里插入图片描述

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);

计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法

滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。

stream.keyBy(...).countWindow(10)
滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...).countWindow(103)

窗口函数(Window Functions)

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {//v1 和v2是 2个相同类型的输入参数public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});

AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。

/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

ProcessWindowFunction

ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据

public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {// 上下文可以拿到window对象,还有其他东西:侧输出流 等等long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);}}).print();env.execute();}
}

增量聚合和全窗口函数的结合使用

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,ProcessWindowFunction<VRKW> windowFunction)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/155080.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

虾皮泰国选品-如何使用知虾进行市场分析和选品

在电商平台上&#xff0c;选品是一项非常重要的任务。虾皮作为泰国地区最大的电商平台之一&#xff0c;提供了一款名为“知虾”的选品工具&#xff0c;帮助卖家进行市场分析和选品决策。本文将介绍如何使用知虾进行虾皮泰国选品市场分析和选品&#xff0c;以及其中的具体步骤和…

使用jmeter对接口进行简单测试

JMeter是一个开源的性能测试工具&#xff0c;它可以对于Web应用程序、FTP、数据库服务器等各种服务器进行性能测试和负载测试&#xff0c;以确定它们是否能够承受预期的负载。JMeter支持多种协议和技术&#xff0c;如HTTP、HTTPS、FTP、JDBC、LDAP、SOAP、JMS等。它使用Java编写…

一文详解!SRM(供应商管理)助力实现采购端实现降本增效

供应商管理关系到企业各部门的正常运转&#xff0c;一个好的SRM供应商管理系统对于公司来说无疑是锦上添花&#xff0c;改善企业与供应商的关系&#xff0c;可以帮助企业实现采购端的降本增效。但在信息化转型的浪潮下&#xff0c;很多企业SRM信息化却遇到不少问题。 那么请花…

父子进程exec,fork等

linux系统编程之进程&#xff08;五&#xff09;&#xff1a;exec系列函数&#xff08;execl,execlp,execle,execv,execvp)使用-CSDN博客 C中的exec()函数_c exec函数_向阳逐梦的博客-CSDN博客 前言 fork 函数之后&#xff0c;如果想要把子进程换成一个我想要执行的进程&#…

MAX/MSP SDK学习01:Object的基本构成、创建销毁行为函数的定义、属性的赋值、以及相关注意事项

Object的基本构成、创建&销毁&行为函数的定义、属性的赋值、以及相关注意事项。 #include "ext.h" // standard Max include, always required #include "ext_obex.h" // required for new style Max object// object struct&#xff0c;定义属…

G管螺纹尺寸对照表

G管螺纹尺寸对照表 NPT 是 National (American) Pipe Thread 的缩写&#xff0c;属于美国标准的 60 度锥管螺纹&#xff0c;用于北美地区&#xff0e;国家标准可查阅 GB/T12716-1991 PT 是 Pipe Thread 的缩写&#xff0c;是 55 度密封圆锥管螺纹&#xff0c;属惠氏螺纹家族&a…

微服务学习|Gateway网关:网关作用、快速入门、路由断言工厂、路由过滤器配置、全局过滤器、过滤器执行顺序、跨域问题处理

为什么需要网关 网关功能: 1.身份认证和权限校验 2.服务路由、负载均衡 3.请求限流 网关的技术实现 在SpringCloud中网关的实现包括两种:gateway、zuul Zuul是基于Servlet的实现&#xff0c;属于阻塞式编程。而SprinaCloudGateway则是基于Spring5中提供的WebFlux&#xf…

最新PS 2024 虎标正式版来啦,附带AI神经滤镜(支持win/mac)

软件简介 文件名称 PS2024 虎标正式版 支持系统 windows、Mac 获取方式 文章底部 分享形式 百度网盘 小伙伴们&#xff0c;下午好&#xff01;今天给大家的是PS 2024 25.0虎标正式版。 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 添加图片注释…

机器学习介绍与分类

随着科学技术的不断发展&#xff0c;机器学习作为人工智能领域的重要分支&#xff0c;正逐渐引起广泛的关注和应用。本文将介绍机器学习的基本概念、原理和分类方法&#xff0c;帮助读者更好地理解和应用机器学习技术。 一、机器学习的基本概念 机器学习是一种通过从数据中学…

了解一下公网IP和域名的区别与联系

​  公网IP和域名是互联网中两个重要的概念&#xff0c;它们在网络通信和网站访问中起着不同的作用。 我们来了解一下公网IP。公网IP是指在全球范围内唯一的IP地址&#xff0c;用于标识互联网上的设备。每个设备连接到互联网时都会被分配一个公网IP地址&#xff0c;这个地址可…

人机交互——自然语言生成

自然语言生成是让计算机自动或半自动地生成自然语言的文本。这个领域涉及到自然语言处理、语言学、计算机科学等多个领域的知识。 1.简介 自然语言生成系统可以分为基于规则的方法和基于统计的方法两大类。基于规则的方法主要依靠专家知识库和语言学规则来生成文本&#xff0…

MySQL之JDBC编程

目录 1. 数据库编程的必备条件 2. Java的数据库编程&#xff1a;JDBC 3. JDBC工作原理 4. JDBC使用 4.1 IDEA配置JDBC 4.2 JDBC开发案例 4.3 JDBC使用步骤总结 5. JDBC常用接口和类 5.1 JDBC API 5.2 数据库连接Connection 5.3 Statement对象 5.4 ResultS…

电脑便签功能在哪里找?电脑桌面便签怎么添加?

很多上班族在使用电脑办公的时候&#xff0c;都需要随手记录工作事项&#xff0c;例如记录共同工作时的想法、会议笔记、常用工作资料、每天待办的工作任务等事项&#xff0c;这时候使用纸质的笔记本来记录工作&#xff0c;不仅不方便随时查看和使用&#xff0c;而且在修改、删…

HarmonyOS ArkTSTabs组件的使用(六)

Tabs组件的使用 ArkUI开发框架提供了一种页签容器组件Tabs&#xff0c;开发者通过Tabs组件可以很容易的实现内容视图的切换。页签容器Tabs的形式多种多样&#xff0c;不同的页面设计页签不一样&#xff0c;可以把页签设置在底部、顶部或者侧边。 Tabs组件的简单使用 Tabs组件…

报错注入 [极客大挑战 2019]HardSQL1

打开题目 输入1或者1"&#xff0c;页面均回显NO,Wrong username password&#xff01;&#xff01;&#xff01; 那我们输入1 试试万能密码 1 or 11 # 输入1 and 12 # 输入1 union select 1,2,3 # 输入1 ununionion seselectlect 1,2,3 # 输入1 # 输入1# 页面依旧回…

mac 和 windows 相互传输文件【共享文件夹】

文章目录 前言创建共享文件夹mac 连接共享文件夹 前言 温馨提示&#xff1a;mac 电脑和 windows 电脑必须处于同一局域网下 本文根据创建共享文件夹的方式实现文件互相传输&#xff0c;所以两台电脑必须处于同一网络 windows 创建共享文件夹&#xff0c;mac 电脑通过 windows…

C++11新特性 变参模板、完美转发和emplace

#include <iostream> #include <vector> #include <deque> #include <list> #include <algorithm> using namespace std;class student { public:student() {cout << "无参构造函数被调用!" << endl;}student(int age, st…

PyQt(学习笔记)

学习资料来源&#xff1a; PyQt快速入门——b站王铭东老师 PyQt官网的所有模块 C具体实现的官方文档 PyQt&#xff08;学习笔记&#xff09; PyCharm环境准备运行第一个程序QPushButtonQLabelQLineEdit调整窗口大小、位置、图标布局信号与槽PyQt引入多线程 PyCharm环境准备 新…

yum 搭建仓库 http/ftp

目录 http ftp http 服务端 1. 下载 httpd 服务&#xff0c;记得将防火墙和安全终端全部关掉 2. 开启 httpd 服务 3. 临时挂载 客户端 1. 下载 httpd 服务&#xff0c;记得将防火墙和安全终端全部关掉 2. 开启 httpd 服务 3. 进入 /etc/yum.repos.d 4. 新建一个目录 mhy&…

新零售数字化系统提供商怎么选择?2023十大收银系统排行榜-亿发

随着零售业务的日益繁荣和电子商务的迅猛发展&#xff0c;零售收银系统已成为各类商家提高效率、管理库存、提供更好服务的不可或缺的工具。然而&#xff0c;在众多的收银系统中&#xff0c;如何选择一款适合自己的&#xff0c;一直是许多商家头疼的问题。今天我们就来盘点一下…