Flink窗口分配器WindowAssigner

前言

Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。

WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。

当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。

WindowAssigner

先看一下 WindowAssigner 抽象类的定义:

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {private static final long serialVersionUID = 1L;public WindowAssigner() {}public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);public Trigger<T, W> getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** @deprecated */@Deprecatedpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();@PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();}
}

四个方法,作用如下:

  • assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
  • getDefaultTrigger 返回默认的窗口触发器 Trigger
  • getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
  • isEventTime 是否基于事件时间语义

Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:

  • 基于事件时间语义
  • 基于处理时间语义
  • 不基于时间语义 --> GlobalWindows

在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:

  • 滚动窗口分配算法 tumbling windows
  • 滑动窗口分配算法 sliding windows
  • 会话窗口分配算法 session windows

定义窗口Window

窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:

  • TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
  • GlobalWindow 全局窗口,与时间无关的窗口

如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。

如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。

public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min = min;this.max = max;}public int getMin() {return min;}public int getMax() {return max;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;NumberWindow that = (NumberWindow) o;return min == that.min && max == that.max;}@Overridepublic int hashCode() {return Objects.hash(min, max);}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}
}

Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。

public static class Serializer extends TypeSerializerSingleton<NumberWindow> {@Overridepublic boolean isImmutableType() {return true;}@Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}@Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}@Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}@Overridepublic int getLength() {return 8;}@Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}@Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}@Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}@Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}@Overridepublic TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {public TimeWindowSerializerSnapshot() {super(Serializer::new);}}
}

自定义WindowAssigner

窗口对象定义好了,接下来就是定义窗口分配对象。

简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。

public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian = startingMedian;this.startingLarge = startingLarge;}@Overridepublic Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element < startingMedian) {window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element < startingLarge) {window = new NumberWindow(startingMedian, startingLarge - 1);} else {window = new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}@Overridepublic Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}@Overridepublic TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}@Overridepublic boolean isEventTime() {return false;}
}

把流程串起来

窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。

如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all").window(new MyWindowAssigner(20, 80)).trigger(new Trigger<Integer, NumberWindow>() {@Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;if (count < 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");int sum = 0;for (Integer value : iterable) {builder.append(value + ",");sum += value;}builder.append("] sum=" + sum);System.err.println(builder.toString());}});environment.execute();
}

运行 Flink 作业,控制台输出:

[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344

尾巴

Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882666.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前缀和和差分算法

文章目录 一维前缀和一维前缀和概念一维前缀和数组的构建 二维前缀和二维前缀和概念二维前缀和数组的构建 一维差分一维差分概念一维差分数组的构建 二维差分二维差分概念二维差分数组的构建 一维前缀和 一维前缀和概念 一维前缀和是一种常用的数据预处理方法&#xff0c;它能…

JS事件和DOM

1. DOM 1.1 基本概念 DOM&#xff0c;全称 Document Object Model&#xff0c;即文档对象模型。它是 Web 上最常用的 API 之一&#xff0c;是加载在浏览器中的文档模型&#xff0c;可以将文档表示为节点树&#xff08;或称 DOM 树&#xff09;&#xff0c;其中每个节点代表文…

【协议】IIC总线协议学习

一、IIC基本介绍 设计I2C的初衷是减少电视机等复杂电子系统内部的布线数量&#xff0c;同时也降低制造成本。通过使用只有两根线的通信总线&#xff0c;它有效地减少了器件间连接的复杂性。 IIC总线是两线制总线&#xff0c;仅有串行数据线SDA和串行时钟线SCL进行通信。减少…

【Python实例】Python读取并绘制tif数据

【Python实例】Python读取并绘制tiff数据 Python实例-以全球不透水面积数据为例数据准备&#xff1a;全球不透水面积数据基于gdal库绘制tif图基于Rasterio库绘制tif图 参考 GeoTIff 是一个标准的.tif 文件或是一个图像文件格式&#xff0c;它包含了一些额外的空间信息&#xff…

prompt learning

prompt learning 对于CLIP&#xff08;如上图所示&#xff09;而言&#xff0c;对其prompt构造的更改就是在zero shot应用到下游任务的时候对其输入的label text进行一定的更改&#xff0c;比如将“A photo of a{obj}”改为“[V1][V2]…[Vn][Class]”这样可学习的V1-Vn的token…

业务开发常见问题-并发工具类

hello&#xff0c;大家好&#xff0c;本讲我们一起聊一下常见的几个并发工具类的使用和坑&#xff01; 在日常工作中&#xff0c;我们经常会遇到多线程并发问题&#xff0c;比如ThreadLocal、锁、ConcurrentHashMap、CopyOnWriteArrayList等。那么如何正常的使用呢&#xff1f;…

【最新通知】2024年Cisco思科认证CCNA详解

CCNA现在涵盖安全性、自动化和可编程性。该计划拥有一项涵盖IT职业基础知识的认证&#xff0c;包括一门考试和一门培训课程&#xff0c;助您做好准备。 CCNA培训课程和考试最近面向最新技术和工作岗位进行了重新调整&#xff0c;为您提供了向任何方向发展事业所需的基础。CCNA认…

blender分离含有多个动作的模型,并导出含有材质的fbx模型

问题背景 笔者是模型小白&#xff0c;需要将网络上下载的fbx模型中的动作&#xff0c;分离成单独的动作模型&#xff0c;经过3天摸爬滚打&#xff0c;先后使用了blender&#xff0c;3d max&#xff0c;unity&#xff0c;最终用blender完成&#xff0c;期间参考了众多网络上大佬…

【Ansiable】ansible的模块和主机清单

目录 一、介绍一些运维自动化工具 二、Ansible 概述/简介 三、Ansible 工作机制 3.1 内部工作机制 3.2 外部工作机制 四、Ansible 执行流程 五、Ansblie 安装以及日常操作模块***** 5.1 ansible 环境安装部署 5.2 ansible 命令行模块 5.2.1 command 模块 5.2.2 shel…

明源云ERP报表服务GetErpConfig.aspx接口存在敏感信息泄露

一、漏洞简介 在访问 /service/Mysoft.Report.Web.Service.Base/GetErpConfig.aspx?erpKeyerp60 路径时&#xff0c;返回了包含敏感信息的响应。这些信息包括但不限于数据库连接字符串、用户名、密码、加密密钥等。这些敏感信息的暴露可能导致以下风险&#xff1a;数据库访问…

【IPv6】IPv6 NAT66介绍

参考链接 IPv6-to-IPv6 Network Address Translation (NAT66) (ietf.org)https://datatracker.ietf.org/doc/id/draft-mrw-nat66-00.html IPv6 NAT66 NAT66&#xff0c;全称为Network Address Translation for IPv6 to IPv6&#xff0c;是一种用于IPv6网络的地址转换技术。在…

Tkinter -- python GUI学习与使用

前言 python GUI 目前pythonGUI有很多&#xff0c;哪一个最好&#xff1f; 先说说我选择的思路&#xff0c;我的目的是开发一个易用的软件&#xff0c;最重要的是稳定&#xff0c;并且碰到问题能够解决&#xff0c;因此&#xff0c;我的目标很明确&#xff0c;有比较大的用户群…

基于Python的自然语言处理系列(39):Huggingface中的解码策略

在自然语言生成任务中&#xff0c;如何选择下一步的单词或者词语对生成的文本质量影响巨大。Huggingface 提供了多种解码策略&#xff0c;可以在不同的场景下平衡流畅度、创造力以及生成效率。在这篇文章中&#xff0c;我们将逐步介绍 Huggingface 中的几种常见解码策略&#x…

web API基础

作用和分类 作用: 就是使用 JS 去操作 html 和浏览器 分类&#xff1a; DOM (文档对象模型)、 BOM &#xff08;浏览器对象模型&#xff09; 什么是DOM DOM (Document Object Model) 译为文档对象模型&#xff0c;是 HTML 和 XML 文档的编程接口。 HTML DOM 定义了访问和操作 …

mingw64的Windows安装及配置教程gcc、g++等

mingw64.rar 链接&#xff1a;https://pan.baidu.com/s/18YrDRyi5NHtqnTwhJG6PuA 提取码&#xff1a;pbli &#xff08;免费永久有效&#xff0c;免安装&#xff0c;解压后配置环境变量即可使用&#xff09; 1 下载 解压后随便放到一个地方&#xff1a; 复制“bin”路径&am…

重磅:中国首个SVG技术标准发布,计育韬老师主笔起草

编辑搜图 中华人民共和国《融媒体 SVG 交互设计技术规范》是由复旦大学奇点新媒体研究中心等单位牵头&#xff0c;学科带头人计育韬等人主要起草&#xff0c;并于 2024 年 8 月起面向全社会行业从业者发布的最高技术标准。该标准前身为 2016 年计育韬与微信团队合作拟定的《SV…

置分辨率设置多显示器的时候提示, 某些设置由系统管理员进行管理

遇到的问题 设置分辨率设置多显示器的时候提示&#xff08;如下图所示&#xff09;&#xff1a; 某些设置由系统管理员进行管理 解决方法 先试试这个方法&#xff1a; https://answers.microsoft.com/zh-hans/windows/forum/all/%E6%9B%B4%E6%94%B9%E5%88%86%E8%BE%A8%E7%8…

强大的Python必备库,你知道几个?建议收藏!

在Python的世界里&#xff0c;库的丰富性让开发者的工作变得轻松而高效。那么&#xff0c;你知道哪些强大的Python必备库吗&#xff1f; 面对众多的Python库&#xff0c;如何选择适合自己的工具来提升开发效率和代码质量&#xff1f;&#xff0c;丰富多样的库如同一个个强大的…

AnaTraf | 提升网络性能:深入解析网络关键指标监控、TCP重传与TCP握手时间

AnaTraf 网络性能监控系统NPM | 全流量回溯分析 | 网络故障排除工具 在当今的数字化时代&#xff0c;网络的稳定性和性能对企业的运营效率至关重要。无论是内部通信、应用程序的运行&#xff0c;还是对外提供服务&#xff0c;网络都发挥着关键作用。对于网络工程师或IT运维人员…

EasyX图形库的安装

前言 EasyX是一个图形库&#xff0c;可以用来做一些c/c小游戏&#xff0c;帮助学习。 一、进入EasyX官网 https://easyx.cn/ 二、点击下载EasyX 三、下载好后以管理员身份运行它 四、点击下一步 五、然后它会自动检测你的编辑器&#xff0c;用哪个就在哪个点安装 六、安装成功…