Flink窗口分配器WindowAssigner

前言

Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。

WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。

当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。

WindowAssigner

先看一下 WindowAssigner 抽象类的定义:

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {private static final long serialVersionUID = 1L;public WindowAssigner() {}public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);public Trigger<T, W> getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** @deprecated */@Deprecatedpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();@PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();}
}

四个方法,作用如下:

  • assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
  • getDefaultTrigger 返回默认的窗口触发器 Trigger
  • getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
  • isEventTime 是否基于事件时间语义

Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:

  • 基于事件时间语义
  • 基于处理时间语义
  • 不基于时间语义 --> GlobalWindows

在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:

  • 滚动窗口分配算法 tumbling windows
  • 滑动窗口分配算法 sliding windows
  • 会话窗口分配算法 session windows

定义窗口Window

窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:

  • TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
  • GlobalWindow 全局窗口,与时间无关的窗口

如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。

如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。

public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min = min;this.max = max;}public int getMin() {return min;}public int getMax() {return max;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;NumberWindow that = (NumberWindow) o;return min == that.min && max == that.max;}@Overridepublic int hashCode() {return Objects.hash(min, max);}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}
}

Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。

public static class Serializer extends TypeSerializerSingleton<NumberWindow> {@Overridepublic boolean isImmutableType() {return true;}@Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}@Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}@Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}@Overridepublic int getLength() {return 8;}@Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}@Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}@Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}@Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}@Overridepublic TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {public TimeWindowSerializerSnapshot() {super(Serializer::new);}}
}

自定义WindowAssigner

窗口对象定义好了,接下来就是定义窗口分配对象。

简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。

public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian = startingMedian;this.startingLarge = startingLarge;}@Overridepublic Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element < startingMedian) {window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element < startingLarge) {window = new NumberWindow(startingMedian, startingLarge - 1);} else {window = new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}@Overridepublic Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}@Overridepublic TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}@Overridepublic boolean isEventTime() {return false;}
}

把流程串起来

窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。

如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all").window(new MyWindowAssigner(20, 80)).trigger(new Trigger<Integer, NumberWindow>() {@Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;if (count < 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");int sum = 0;for (Integer value : iterable) {builder.append(value + ",");sum += value;}builder.append("] sum=" + sum);System.err.println(builder.toString());}});environment.execute();
}

运行 Flink 作业,控制台输出:

[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344

尾巴

Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882666.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前缀和和差分算法

文章目录 一维前缀和一维前缀和概念一维前缀和数组的构建 二维前缀和二维前缀和概念二维前缀和数组的构建 一维差分一维差分概念一维差分数组的构建 二维差分二维差分概念二维差分数组的构建 一维前缀和 一维前缀和概念 一维前缀和是一种常用的数据预处理方法&#xff0c;它能…

JS事件和DOM

1. DOM 1.1 基本概念 DOM&#xff0c;全称 Document Object Model&#xff0c;即文档对象模型。它是 Web 上最常用的 API 之一&#xff0c;是加载在浏览器中的文档模型&#xff0c;可以将文档表示为节点树&#xff08;或称 DOM 树&#xff09;&#xff0c;其中每个节点代表文…

Docker system

docker system --help siqialiyun-sh-001:~/images$ sudo docker system --helpUsage: docker system COMMANDManage DockerCommands:df Show docker disk usage(显示docker磁盘使用情况)events Get real time events from the server(从服务器获取实时事件)in…

MySQL新手向:对比常用存储引擎

前言 为什么MySQL拥有的存储引擎有那么多&#xff0c;偏偏最常用的是InnoDB呢&#xff1f;带着这个问题&#xff0c;让我们对比几种常用的存储引擎&#xff0c;理解InnoDB的优势吧。 一、MyISAM存储引擎 1.1、MyISAM介绍 先说说MyISAM存储引擎的特点&#xff1a; 不支持事…

【协议】IIC总线协议学习

一、IIC基本介绍 设计I2C的初衷是减少电视机等复杂电子系统内部的布线数量&#xff0c;同时也降低制造成本。通过使用只有两根线的通信总线&#xff0c;它有效地减少了器件间连接的复杂性。 IIC总线是两线制总线&#xff0c;仅有串行数据线SDA和串行时钟线SCL进行通信。减少…

代码笔记:Linux系统上解压文件

zip unzip filename.zip -d /path/to/directorytar.gz tar -xzvf file.tar.gz -x: 表示提取&#xff08;extract&#xff09;文件&#xff0c;从压缩包中解压内容。-z: 表示使用 gzip 压缩&#xff0c;.tar.gz 文件是经过 gzip 压缩的 tar 包&#xff0c;因此需要这个选项来处…

存储设备专栏 2.5 -- linux 下块设备信息查看命令 lsblk 详细介绍】

> 请阅读【嵌入式及芯片开发学必备专栏】< 文章目录 lsblk 命令命令结构常用参数示例示例 1&#xff1a;基本用法示例 2&#xff1a;显示文件系统信息示例 3&#xff1a;仅列出磁盘示例 4&#xff1a;指定输出格式示例 5&#xff1a;以 JSON 格式输出 Summary lsblk 命令…

算法之二分查找

概述 二分查找算法的应用&#xff0c;包括有序和无序数据&#xff0c;有序数组默认按从小到大排序 在有序数组中找到num /*** 4 二分查找 在有序数组中找到num* 思路&#xff1a;找中值&#xff0c;然后中值元素和目标值比较。如果中值元素比目标值大&#xff0c;则继续在左…

React开发一个WebSocket

export default class SocketService {static instance null;static get Instance() {if (!this.instance) {this.instance new SocketService();}return this.instance;}// 和服务端连接的socket对象ws null;// 存储回调函数callBackMapping {};// 标识是否连接成功connec…

【Python实例】Python读取并绘制tif数据

【Python实例】Python读取并绘制tiff数据 Python实例-以全球不透水面积数据为例数据准备&#xff1a;全球不透水面积数据基于gdal库绘制tif图基于Rasterio库绘制tif图 参考 GeoTIff 是一个标准的.tif 文件或是一个图像文件格式&#xff0c;它包含了一些额外的空间信息&#xff…

Prometheus 抓取 nginx 访问日志的指标

要通过 Prometheus 的 NGINX Exporter 来抓取 NGINX 中的日志信息&#xff0c;例如状态码为 4xx 或 5xx 的日志&#xff0c;需要结合以下几种工具和方法&#xff1a; 1. NGINX Exporter 基础功能 NGINX Exporter 是一个 Prometheus Exporter&#xff0c;用于从 NGINX 的 /sta…

.mkp勒索病毒攻击全攻略:防护、数据恢复与安全建议

导言 随着互联网的发展&#xff0c;勒索病毒的威胁也在不断升级&#xff0c;给个人和企业的数据安全带来了巨大的挑战。其中&#xff0c;[datastorecyberfear.com].mkp、 [tsai.shenmailfence.com].mkp、 [sspdlk00036cock.li].mkp勒索病毒作为一种新型的加密型恶意病毒&#…

Chromium 中HTML5 WebSocket实现分析c++(一)

一、WebSocket前端接口定义&#xff1a; WebSocket 对象提供了用于创建和管理 WebSocket 连接&#xff0c;以及可以通过该连接发送和接收数据的 API。 使用 WebSocket() 构造函数来构造一个 WebSocket。 构造函数 WebSocket(url[, protocols]) 返回一个 WebSocket 对象。 …

如何读书?

如何读书&#xff1f; 方法论、读书、意义、思考背景 对于知识获取一直有一个疑问&#xff0c;那就是如何有效获取知识&#xff1f;这个答案我自己并没有总结出来&#xff0c;其一是本文要谈的读书&#xff0c;其他呢&#xff1f;多了暂未想到&#xff0c;先写读书吧。 恰巧…

System:oneshot类型的service

有的时候,某个进程只在特殊的情况下运行一下即可,不需要一直以服务的形式待命,这种情况下,可以设置service的type为oneshot,然后设置RemainAfterExit=yes。 这样设置的service,即使在其进程启动完成之后退出了,systemd 也仍然会认为这个服务还在运行中,将此服务视为活…

新时代下吉林省城乡流动人才就业问题及路径探析

摘要&#xff1a;新时代背景下&#xff0c;中国经济快速发展&#xff0c;城乡融合发展成为缩小城乡差距&#xff0c;推动共同富裕的重要方式。吉林省作为东北老工业基地&#xff0c;传统产业竞争优势减弱&#xff0c;城乡流动人才就业规模增加&#xff0c;并呈现“农村-城市”的…

prompt learning

prompt learning 对于CLIP&#xff08;如上图所示&#xff09;而言&#xff0c;对其prompt构造的更改就是在zero shot应用到下游任务的时候对其输入的label text进行一定的更改&#xff0c;比如将“A photo of a{obj}”改为“[V1][V2]…[Vn][Class]”这样可学习的V1-Vn的token…

业务开发常见问题-并发工具类

hello&#xff0c;大家好&#xff0c;本讲我们一起聊一下常见的几个并发工具类的使用和坑&#xff01; 在日常工作中&#xff0c;我们经常会遇到多线程并发问题&#xff0c;比如ThreadLocal、锁、ConcurrentHashMap、CopyOnWriteArrayList等。那么如何正常的使用呢&#xff1f;…

【最新通知】2024年Cisco思科认证CCNA详解

CCNA现在涵盖安全性、自动化和可编程性。该计划拥有一项涵盖IT职业基础知识的认证&#xff0c;包括一门考试和一门培训课程&#xff0c;助您做好准备。 CCNA培训课程和考试最近面向最新技术和工作岗位进行了重新调整&#xff0c;为您提供了向任何方向发展事业所需的基础。CCNA认…

blender分离含有多个动作的模型,并导出含有材质的fbx模型

问题背景 笔者是模型小白&#xff0c;需要将网络上下载的fbx模型中的动作&#xff0c;分离成单独的动作模型&#xff0c;经过3天摸爬滚打&#xff0c;先后使用了blender&#xff0c;3d max&#xff0c;unity&#xff0c;最终用blender完成&#xff0c;期间参考了众多网络上大佬…