聊聊Flink:这次把Flink的window分类(滚动、滑动、会话、全局)、窗口函数讲透

一、窗口

窗口(Window)是处理无界流的关键所在。窗口将流分成有限大小的“桶”,我们可以在其上应用算子计算。Flink可以使用window()和windowAll()定义一个窗口,二者都需要传入一个窗口分配器WindowAssigner,WindowAssigner负责分配事件到相应的窗口。

window()作用于KeyedStream上,即keyBy()之后,这样可以多任务并行计算,对窗口内的多组数据分别进行聚合;windowAll()作用于非KeyedStream上(通常指DataStream),由于所有元素都必须通过相同的算子实例,因此该操作本质上是非并行的,仅在特殊情况下(例如对齐的时间窗口)才可以并行执行。假设要计算24小时内每个用户的订单平均消费额,就需要使用window()定义窗口;如果要计算24小时内的所有订单平均消费额,则需要使用windowAll()定义窗口。

一个Flink窗口程序的大致骨架结构如下:

对KeyedStream应用window()函数进行窗口计算:
在这里插入图片描述

对非KeyedStream应用windowAll()函数进行窗口计算:
在这里插入图片描述
上面方括号([…])中的命令是可选的。也就是说,Flink 允许你自定义多样化的窗口操作来满足你的需求。

首先必须要在定义窗口前确定的是你的 stream 是 keyed 还是 non-keyed。 keyBy(…) 会将你的无界 stream 分割为逻辑上的 keyed stream。 如果 keyBy(…) 没有被调用,你的 stream 就不是 keyed。

对于 keyed stream,其中数据的任何属性都可以作为 key。 使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。

对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream, 所以所有的窗口计算会被同一个 task 完成,也就是 parallelism 为 1。

二、窗口的分类

Flink的窗口可以分为滚动窗口、滑动窗口、会话窗口、全局窗口,且每种窗口又可分别根据事件时间和处理时间进行创建。

2.1 滚动窗口(Tumbling Windows)
滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
在这里插入图片描述
滚动窗口使用时需要指定窗口大小参数,下面的代码片段展示了如何使用滚动窗口:
在这里插入图片描述
滚动窗口分配器还可以使用可选的偏移(Offset)参数,该参数可用于更改窗口的对齐方式。例如,在没有偏移的情况下,时间窗口会做一个对齐,那么1小时窗口的起止时间可以是[0:00:00.000~0:59:59.999)。如果你想要一个以小时为单位的窗口流,但是窗口需要从每个小时的第15分钟开始,则可以使用偏移量,代码如下:

TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15))

那么窗口的起止时间将变为[0:15:00.000~1:14:59.999),这样你将得到起始时间在0:15:00,1:15:00,2:15:00的窗口。与此相反,如果你生活在不使用UTC±00:00时间(世界标准时间)的地方,例如中国使用UTC+08:00,中国的当地时间要设置偏移量为Time.hours(-8)。你想要一个一天大小的时间窗口,并且窗口从当地时间的每一个00:00:00开始,可以使用

TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8))

因为UTC+08:00比UTC时间早8小时。

2.2 滑动窗口(Sliding Windows)
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述
使用滑动窗口时,需要设置窗口大小和滑动步长两个参数。滑动步长决定了Flink以多大的频率来创建新的窗口,步长较小,窗口的个数会很多。步长小于窗口的大小时,相邻窗口会重叠,一个事件会被分配到多个窗口;步长大于窗口大小时,有些事件可能会丢失。

下面的代码片段展示了如何使用滑动窗口:
在这里插入图片描述
如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时、滑动距离为 30 分钟的滑动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 等。 如果你想改变对齐方式,你可以设置一个 offset。 如果设置了 15 分钟的 offset,你会得到 1:15:00.000 - 2:14:59.999、1:45:00.000 - 2:44:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)。

2.3 会话窗口(Session Windows)
会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述
下面的代码展示了如何使用会话窗口。
在这里插入图片描述
创建动态间隔会话窗口,需要实现SessionWindowTimeGapExtractor接口,并实现其中的extract()方法,可以在extract()方法中加入相应的业务逻辑来动态控制会话间隔。

在这里插入图片描述
2.4 全局窗口(Global Windows)
全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

在这里插入图片描述
下面的代码片段展示了如何使用全局窗口:
在这里插入图片描述

三、窗口函数(Window Functions)

事件被窗口分配器分配到窗口后,接下来需要指定想要在每个窗口上执行的计算函数(即窗口函数),以便对窗口内的数据进行处理。Flink提供的窗口函数有ReduceFunction、AggregateFunction、ProcessWindowFunction。

ReduceFunction和AggregateFunction是增量计算函数,都可以基于中间状态对窗口中的元素进行递增聚合。例如,窗口每流入一个新元素,新元素就会与中间数据进行合并,生成新的中间数据,再保存到窗口中。

ProcessWindowFunction是全量计算函数,如果需要依赖窗口中的所有数据或需要获取窗口中的状态数据和窗口元数据(窗口开始时间、窗口结束时间等),就需要使用ProcessWindowFunction。例如对整个窗口数据排序取TopN,使用ProcessWindowFunction就非常灵活。

3.1 ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

ReduceFunction 可以像下面这样定义:
在这里插入图片描述
上面的例子是对窗口内元组的第二个属性求和。

3.2 AggregateFunction
AggregateFunction是聚合函数的基本接口,也是ReduceFunction的通用版本。与ReduceFunction相同,Flink将在窗口输入元素到达时对其进行增量聚合。

AggregateFunction的泛型具有3种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型指的是输入流中元素的类型。AggregateFunction具有一种将一个输入元素添加到累加器的方法,还具有创建初始累加器,将两个累加器合并为一个累加器以及从累加器提取输出(OUT类型)的方法。

AggregateFunction是一种灵活的聚合函数,具有以下特点:

  • 可以对输入值、中间聚合和结果使用不同的类型,以支持广泛的聚合类型。
  • 支持分布式聚合,不同的中间聚合可以合并在一起,以允许预聚合/最终聚合优化。

AggregateFunction的中间聚合(正在进行的聚合状态)称为累加器。将值添加到累加器,并通过结束累加器状态获得最终的聚合。中间聚合的数据类型可能与最终的聚合结果类型不同,例如求平均值时,需要保存计数和总和作为中间聚合。合并中间聚合(部分聚合)意味着合并累加器。

AggregationFunction本身是无状态的。为了允许单个AggregationFunction实例维护多个聚合(例如每个Key一个聚合),AggregationFunction在新聚合启动时会创建一个新的累加器。此外,聚合函数必须是可序列化的,因为它们在分布式执行期间会在分布式进程之间发送。

AggregationFunction接口的Java定义源码如下:

在这里插入图片描述
例如,计算窗口中每组元素的第二个字段的平均值,定义和使用AggregateFunction的代码片段如下:
在这里插入图片描述

注:
AggregateFunction的merge()方法用于会话窗口。当会话窗口彼此之间的实际间隔比已定义的间隔小时,它们将合并在一起。为了可合并,对会话窗口计算时也需要相应的触发器和窗口函数,例如ReduceFunction,AggregateFunction或ProcessWindowFunction。当需要合并两个会话窗口时,merge()方法会被调用,通过该方法合并两个窗口的结果。对于滚动窗口和滑动窗口不会调用merge()方法。

3.3 .ProcessWindowFunction

ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。因此,使用ProcessWindowFunction需要注意数据量不应太大,否则会造成内存溢出。

抽象类ProcessWindowFunction的源码如下:

@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;/*** Deletes any state in the {@code Context} when the Window expires (the watermark passes its* {@code maxTimestamp} + {@code allowedLateness}).** @param context The context to which the window is being evaluated* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/** The context holding window metadata. */public abstract class Context implements java.io.Serializable {/** Returns the window that is being evaluated. */public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up by* implementing {@link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/** State accessor for per-key global state. */public abstract KeyedStateStore globalState();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}
}

key 参数由 keyBy() 中指定的 KeySelector 选出。 如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且你需要手动将它转换为正确大小的 tuple 才能提取 key。

ProcessWindowFunction 可以像下面这样定义:

在这里插入图片描述
使用ProcessWindowFunction来处理简单的聚合(例如计算元素数量)是非常低效的。接下来讲解如何将ReduceFunction或AggregateFunction与ProcessWindowFunction结合起来,以便实现增量聚合并通过ProcessWindowFunction获得额外的窗口信息等。

3.4 带增量聚合的ProcessWindowFunction

由于ProcessWindowFunction是全量计算函数,如果既要获得窗口信息又要进行增量聚合,则可以将ProcessWindowFunction与ReduceFunction或AggregateFunction结合使用。

ProcessWindowFunction可以与ReduceFunction或AggregateFunction组合在一起,以便在元素到达窗口时增量地聚合。当窗口关闭时,ProcessWindowFunction将提供聚合的结果。

3.4.1 结合ReduceFunction实现增量聚合
下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。
在这里插入图片描述
3.4.2 结合AggregateFunction实现增量聚合
下例展示了如何将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61939.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

友思特新闻 | 友思特荣获广州科技创新创业大赛智能装备行业赛初创组优胜企业!

2024年11月19日&#xff0c;第十三届中国创新创业大赛&#xff08;广东广州赛区&#xff09;暨2024年广州科技创新创业大赛智能装备行业赛颁奖典礼隆重举行。 赛事奖项介绍&#xff1a;广州科技创新创业大赛智能装备行业赛 第十三届“中国创新创业大赛&#xff08;广东广州赛区…

2024强网拟态决赛-eBeepf

漏洞分析与利用 分析后面看情况吧&#xff0c;有时间再写吧&#xff0c;先贴个利用脚本&#xff1a; #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <fcntl.h> #include <…

Kotlin Multiplatform 未来将采用基于 JetBrains Fleet 定制的独立 IDE

近期 Jetbrains 可以说是动作不断&#xff0c;我们刚介绍了 IntelliJ IDEA 2024.3 K2 模式发布了稳定版支持 &#xff0c;而在官方最近刚调整过的 Kotlin Multiplatform Roadmap 优先关键事项里&#xff0c;可以看到其中就包含了「独立的 Kotlin Multiplatform IDE&#xff0c;…

38_转置卷积

转置卷积也被称为&#xff08;Transposed Convolution&#xff09;&#xff0c;也被称为fractionally_strided convolution、deconvolution。 转置卷积不是卷积的逆运算。 转置卷积也是卷积 转置卷积的作用是上采样。 1. 基础概念 转置卷积&#xff08;Transposed Convolution…

【Linux】内核的编译和加载

Linux内核是操作系统的核心&#xff0c;负责管理系统的硬件资源&#xff0c;并为用户空间的应用程序提供必要的服务。内核的编译和加载是操作系统开发和维护的重要环节。本文将详细介绍Linux内核的编译过程以及如何加载内核到系统中。 1. 引言 Linux内核的编译是一个复杂的过…

Qt桌面应用开发 第七天(绘图事件 绘图设备)

目录 1.绘图事件paintEvent 2.高级绘图 3.图片绘制 4.绘图设备 4.1QPixmap 4.2QBitmap 4.3QImage 4.4QPicture 1.绘图事件paintEvent paintEvent——绘图事件 需求&#xff1a;利用QPainter绘制点、线、圆、矩形、文字&#xff1b;设置画笔改为红色&#xff0c;宽度为…

使用IDEA构建springboot项目+整合Mybatis

目录 目录 1.Springboot简介 2.SpringBoot的工作流程 3.SpringBoot框架的搭建和配置 4.用Springboot实现一个基本的select操作 5.SpringBoot项目部署非常简单&#xff0c;springBoot内嵌了 Tomcat、Jetty、Undertow 三种容器&#xff0c;其默认嵌入的容器是 Tomcat&#xff0c;…

【含开题报告+文档+PPT+源码】基于SSM的电影数据挖掘与分析可视化系统设计与实现

开题报告 随着互联网的普及和数字娱乐产业的蓬勃发展&#xff0c;电影作为一种重要的娱乐方式&#xff0c;已经深入人们的日常生活。然而&#xff0c;面对海量的电影资源&#xff0c;用户在选择观影内容时常常感到困惑和无所适从。传统的电影推荐方式&#xff0c;如人工筛选、…

C++使用minio-cpp(minio官方C++ SDK)与minio服务器交互简介

目录 minio简介minio-cpp简介minio-cpp使用 minio简介 minio是一个开源的高性能对象存储解决方案&#xff0c;完全兼容Amazon S3 API&#xff0c;支持分布式存储&#xff0c;适用于大规模数据架构&#xff0c;容易集成&#xff0c;而且可以方便的部署在集群中。 如果你已经部…

【君正T31开发记录】8.了解rtsp协议及设计模式

前边搞定了驱动&#xff0c;先不着急直接上手撸应用层的代码&#xff0c;先了解一下大致要用到的东西。 设计PC端先用vlc rtsp暂时H264编码&#xff08;vlc好像不支持h265,这个后边我试试&#xff09;的视频流&#xff0c;先需要支持上rtsp server&#xff0c;了解rtsp协议是必…

JavaScript中的this指向绑定规则(超全)

JavaScript中的this指向绑定规则&#xff08;超全&#xff09; 1.1 为什么需要this? 为什么需要this? 在常见的编程语言中&#xff0c;几乎都有this这个关键字&#xff08;Objective-C中使用的是self),但是在JavaScript中的this和常见的面向对象语言中的this不太一样 常见面…

Spring注入Map学习

Spring注入Map学习 在Spring中 在策略模式中, 会经常用到 根据Bean名称获取Bean的实例 有2个方法很好用 1. 使用Autowired注入 2. 使用构造方法注入 但是奇怪的一点是: 日志打印并没有看到结果, 第一行的 Autowired的结果 是个null 那是因为 注入时机 的问题 注入时机&…

Redis五大基本类型——Set集合命令详解(命令用法详解+思维导图详解)

目录 一、Set集合类型介绍 二、常见命令 1、SADD 2、SMEMBERS 3、SISMEMBER 4、SCARD 5、SRANDMEMBER 6、SPOP 7、SMOVE 8、SREM ​编辑 9、集合间操作 &#xff08;1&#xff09;SINTER &#xff08;2&#xff09;SINTERSTORE &#xff08;3&#xff09;SUNION…

sql 查询语句:将终端数据形式转换成insert语句

文本转换&#xff1a;sql 查询语句&#xff1a;将终端数据形式转换成insert语句 如上&#xff0c;写过后端的都知道&#xff0c;从生产或其他地方拿到的数据&#xff0c;有可能会是图一&#xff1b;但实际上&#xff0c;我们需要图二的数据&#xff1b; 不废话&#xff0c;直接…

C++数据结构与算法

C数据结构与算法 1.顺序表代码模版 C顺序表模版 #include <iostream> using namespace std; // 可以根据需要灵活变更类型 #define EleType intstruct SeqList {EleType* elements;int size;int capacity; };// Init a SeqList void InitList(SeqList* list, int capa…

【AIGC】大模型面试高频考点-RAG篇

【AIGC】大模型面试高频考点-RAG篇 &#xff08;1&#xff09;RAG的基本原理&#xff08;2&#xff09;RAG有哪些评估方法&#xff1f;&#xff08;3&#xff09;RAG有哪些评估框架&#xff1f;&#xff08;4&#xff09;RAG各模块有哪些优化策略&#xff1f; &#xff08;1&am…

标准操作规程(SOP)制定方法+模板指南

在企业的成功之路上&#xff0c;拥有制定、传播以及管理流程文档与详细步骤指南的能力至关重要。众多组织都将标准操作规程&#xff08;SOP&#xff09;作为指导其工作流程操作的核心文档形式。 但SOP的作用远不止于操作指南&#xff1b;它们更像是高性能车辆中的精密GPS系统。…

硬件工程师零基础入门:一.电子设计安全要点与欧姆定律

硬件工程师零基础入门:一.电子设计安全要点与欧姆定律 第一节 电子设计安全要点第二节 欧姆定律 第一节 电子设计安全要点 电路小白最好先买直流稳压电源&#xff08;将高压转成低压直流电&#xff09;使用&#xff0c;尽量不要使用市电。 1.尽量不要捏住电源两端。 正确做法&a…

ShuffleNet:一种为移动设备设计的极致高效的卷积神经网络

摘要 https://arxiv.org/pdf/1707.01083 我们介绍了一种名为ShuffleNet的计算效率极高的卷积神经网络&#xff08;CNN&#xff09;架构&#xff0c;该架构专为计算能力非常有限的移动设备&#xff08;例如10-150 MFLOPs&#xff09;而设计。新架构利用两种新操作&#xff1a;逐…

学习Zookeeper

Zookeeper有手就行 1. 初识ZooKeeper1.1 安装ZooKeeper1.2 ZooKeeper命令操作1.2.1 Zookeeper数据模型1.2.2 Zookeeper 服务端常用命令1.2.3 Zookeeper客户端常用命令 2. ZooKeeperJavaAPl操作2.1 Curator介绍2.2 CuratorAPI常用操作2.2.0 引入Curator支持2.2.1 建立连接2.2.2 …