37、Flink 的窗口函数(Window Functions)详解

窗口函数(Window Functions)
a)概述

定义了 window assigner 之后,需要指定当窗口触发之后,如何计算每个窗口中的数据, 即 window function。

窗口函数有三种:ReduceFunctionAggregateFunctionProcessWindowFunction

  • 前两者执行更高效,因为 Flink 可以在每条数据到达窗口后进行增量聚合(incrementally aggregate);
  • ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information。

使用 ProcessWindowFunction 的窗口转换操作没有其它两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据; ProcessWindowFunction 可以与 ReduceFunctionAggregateFunction 合并来提高效率,既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction 接收窗口的 metadata。

b)ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同

Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

示例:对窗口内元组的第二个属性求和。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});
c)AggregateFunction

ReduceFunctionAggregateFunction 的特殊情况; AggregateFunction 接收三个参数:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。

输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT 类型)。

ReduceFunction 相同,Flink 会在输入数据到达窗口时直接进行增量聚合。

示例:计算窗口内所有元素第二个属性的平均值。

private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());
d)ProcessWindowFunction

ProcessWindowFunction 具备 Iterable 能获取窗口内所有的元素 ,以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活;ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。

ProcessWindowFunction 的函数签名如下:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.** @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key,Context context,Iterable<IN> elements,Collector<OUT> out) throws Exception;/*** Deletes any state in the {@code Context} when the Window expires (the watermark passes its* {@code maxTimestamp} + {@code allowedLateness}).** @param context The context to which the window is being evaluated* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/*** The context holding window metadata.*/public abstract class Context implements java.io.Serializable {/*** Returns the window that is being evaluated.*/public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up* by implementing {@link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/*** State accessor for per-key global state.*/public abstract KeyedStateStore globalState();}}

key 参数由 keyBy() 中指定的 KeySelector 选出;如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且需要手动将它转换为正确大小的 tuple 才能提取 key。

示例:使用 ProcessWindowFunction 对窗口中的元素计数,并且将窗口本身的信息一同输出。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}
e)增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunctionAggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合,当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果;即实现了增量聚合窗口的元素并且从 ProcessWindowFunction 中获得窗口的元数据。

使用 ReduceFunction 增量聚合

示例:将 ReduceFunctionProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}

使用 AggregateFunction 增量聚合

示例:将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate(), new MyProcessWindowFunction());// Function definitionsprivate static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {public void process(String key,Context context,Iterable<Double> averages,Collector<Tuple2<String, Double>> out) {Double average = averages.iterator().next();out.collect(new Tuple2<>(key, average));}
}
f)在 ProcessWindowFunction 中使用 per-window state

除了访问 keyed state,ProcessWindowFunction 还可以使用作用域仅为“当前正在处理的窗口”的 keyed state

per-window 中的 window 对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口,具体情况取决于窗口的定义,根据具体的 key 和时间段会产生诸多不同的窗口实例。

Per-window state 如果处理有 1000 种不同 key 的事件,并且目前所有事件都处于 [12:00, 13:00) 时间窗口内,那么将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。

process() 接收到的 Context 对象中有两个方法允许访问以下两种 state:

  • globalState(),访问全局的 keyed state
  • windowState(), 访问作用域仅限于当前窗口的 keyed state

如果可能将一个 window 触发多次(比如当迟到数据会再次触发窗口计算, 或自定义了根据推测提前触发窗口的 trigger),那么这个功能将非常有用,这时可能需要在 per-window state 中储存关于之前触发的信息或触发的总次数。

当使用窗口状态时,一定记得在删除窗口时清除这些状态,应该定义在 clear() 方法中

WindowFunction(已过时)

在某些可以使用 ProcessWindowFunction 的地方,也可以使用 WindowFunction;它是旧版的 ProcessWindowFunction,只能提供更少的环境信息且缺少一些高级的功能,比如 per-window state。

WindowFunction 的函数签名如下:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param window The window that is being evaluated.* @param input The elements in the window being evaluated.* @param out A collector for emitting elements.** @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

可以像下例这样使用:

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/17075.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式学习记录5.27(c++基础1)

目录 一.C和C的区别 二.输入输出流类 2.1输出cout 2.2输入cin 三.命名空间 2.1使用命名空间中的标识符 2.2命名空间中声明函数 2.3命名冲突问题 2.4匿名空间 2.5命名空间添加&#xff0c;嵌套&#xff0c;重命名 四.字符串的使用 4.1string类 4.2C风格和C风格字符串…

LeetCode27.移除元素

题目链接&#xff1a; 27. 移除元素 - 力扣&#xff08;LeetCode&#xff09; 思路分析&#xff1a;同样属于经典的双指针移动问题&#xff0c;要掌握固定的思路即可。 算法分析&#xff1a;这个题目可以这样处理&#xff0c;我们把所有非val 的元素都向前移动&#xff0c;把…

Java面试八股之线程池是怎么实现的

线程池是怎么实现的 线程池是一种基于池化技术的线程管理方式&#xff0c;通过预先创建一定数量的线程并保持在池中待命&#xff0c;从而在有任务来临时能够快速分配线程处理任务&#xff0c;而无需频繁创建和销毁线程&#xff0c;以此达到提升系统性能、减少资源消耗的目的。…

推荐《从零开始大模型开发与微调》

大模型是深度学习是当前AI和NLP研究与产业中最重要的方向之一。 本书用PyTorch 2.0作为学习大模型的基本框架&#xff0c;以ChatGLM为例详细讲解大模型的基本理论、算法、程序实现、应用实战以及微调技术&#xff0c;为读者揭示大模型开发技术。 《从零开始大模型开发与微调&…

两个数组的交集-力扣

想到的解法是使用两个哈希表&#xff0c;s1用来统计nums1中出现过的数字&#xff0c;然后遍历nums2数组&#xff0c;当能够在s1中查找到nums2的元素时&#xff0c;将这个元素添加到s2中&#xff0c;最后遍历s2&#xff0c;将其中的元素添加到返回数组中。 但最开始写时&#xf…

外星人存在与否......----小话外星人(1)

前一段时间&#xff0c;看了好多关于UFO、外星人、宇宙、远古外星人的视频和电子书&#xff0c;最后发现&#xff0c;这样的东西还是不要看多为好&#xff0c;搞得好像这些是真的似的&#xff0c;有时睡觉会被意外惊醒&#xff0c;想多了...... 1、外星人存在吗 不管有多少UFO的…

Windows10映射网络驱动器之后不显示映射盘

目录 背景解决步骤1、按 Windows R 打开运行2、打开注册表编辑器3、 System上新建-- DWORD(32bit)4、对新建的文件重命名5、将EnableLinkedConnections的数值改为16、退出注册表编辑器&#xff0c;重启系统。 知识扩展断开连接备份注册表 背景 目前有一台NAS服务器,和一台lin…

Vuex 页面刷新数据丢失怎么解决

当Vuex中的数据在页面刷新后丢失时&#xff0c;这通常是因为Vuex的状态数据是保存在运行内存中的&#xff0c;页面刷新会导致Vue实例重新加载&#xff0c;进而Vuex中的数据被重置为初始状态。为了解决这个问题&#xff0c;可以采取以下几种方法&#xff1a; 1. 使用浏览器的本…

工厂模式的三种实现方式

文章目录 1.引出工厂模式具体需求 2.传统模式1.类图2.目录结构3.pizzastore 用于设计pizza1.Pizza.java 抽象的Pizza类型2.CheesePizaa.java CheesePizaa3.GreekPizza.java GreekPizza 4.order 用于订购和制作pizza1.OrderPizza.java 制作pizza2.PizzaStore.java 订购pizza 5.优…

【Redis】 关于列表类型

文章目录 &#x1f343;前言&#x1f340;常见操作命令介绍&#x1f6a9;lpush&#x1f6a9;lpushx&#x1f6a9;rpush&#x1f6a9;rpushx&#x1f6a9;lrange&#x1f6a9;lpop&#x1f6a9;rpop&#x1f6a9;lindex&#x1f6a9;linsert&#x1f6a9;llen&#x1f6a9;lrem&…

“按摩”科技?

都说A股股民是特别善于学习的&#xff0c;这不市场又现新概念——“按摩科技”&#xff0c;成立仅6年&#xff0c;把上门按摩干到35亿营收也是没谁了&#xff0c;现在号称有1000万用户&#xff0c;3万家入驻商户数的按摩平台&#xff0c;难道就凭借2.5万名女技师&#xff0c;活…

【Django】中间件实现钩子函数预处理和后处理,局部装饰视图函数

在app文件夹里新建middleware.py继承MiddlewareMixin&#xff0c; 编写中间件类&#xff0c;重写process_request、process_response钩子函数 from django.http import HttpRequest, HttpResponse from django.utils.decorators import decorator_from_middleware from django…

关于pytest中用例名称使用中文乱码的解决

场景&#xff1a;使用pytest.mark.parametrize装饰器为用例自定义名称时&#xff0c;运行显示乱码。如下图所示&#xff1a; 解决方案&#xff1a; 1.在根目录 pytest.ini中增加一行代码 [pytest] disable_test_id_escaping_and_forfeit_all_rights_to_community_supportTrue…

NAT 网络转换

NAT(Network Address Translation) 网络地址转换 0x01 NAT 简介 为什么要使用 NAT IPv4 网络地址紧缺&#xff0c;从而出现了私有网段&#xff0c;来补充地址&#xff0c;但私有网段不课访问 internet 所以出现了 NAT 地址转换&#xff0c;将私有地址&#xff0c;转换为公网 I…

一口气看完es(上)

此系列博客分为上中下3篇&#xff1a;上篇是关于es的概念和对数据的增删改操作&#xff0c;中篇是对数据的查询、对搜索结果进行处理操作&#xff0c;下篇是介绍怎么在Java代码中调用和操作es。 基本概念 1、es是什么&#xff1f;有什么作用&#xff1f; es全名是elasticsea…

关于0成本部署个人博客

分享一个文章关于零成本搭建个人博客 参考&#xff1a;‘关于部署博客hexoshokagithub的流程以及问题’ - 关于博客部署 | XiaoYang Guo Welcome to Guo Xiaoyangs personal blog 欢迎来到郭晓阳的个人博客 (1330303.github.io) 这个博主讲的流程很全&#xff0c;而且回答也…

智慧管廊巡检运维解决方案

一、智慧管廊巡检行业目前存在的挑战和难题 智慧管廊巡检行业面临着运行环境的客观影响&#xff0c;如地面施工、液体渗漏、通风不佳、内部空间受限等问题。而管廊巡检机器人系统的出现却具有重大意义。它能够有力地保障管廊安全且可靠地运行&#xff0c;在面对火情、灾情等紧…

springboot基础篇(快速入门+要点总结)

目录 一、SpringBoot简介 二、创建SpringBoot&#xff08;通过Idea脚手架搭建项目&#xff09; 三、properties配置文件 properties 配置文件说明 ①. properties 基本语法 ②. 读取配置⽂件 ③. properties 缺点 2. yml 配置⽂件说明 ①. yml 基本语法 ②. yml 使用进…

上海AI lab发布MathBench,GPT-4o的数学能力有多强?

大模型数学能力哪家强&#xff1f; 最近&#xff0c;上海AI lab构建了一个全面的多语言数学基准——MathBench。与现有的基准不同的是&#xff0c;MathBench涵盖从小学、初中、高中、大学不同难度&#xff0c;从基础算术题到高阶微积分、统计学、概率论等丰富类别的数学题目&a…

React项目知识积累(五)

1.dispatch、dev派发 src/models/formStatus.js: const FromStatusModel {namespace: "fromStatus",state: {isDisable: false,},reducers: {saveIsDisable(state, { payload }) {return {...state,...payload,};},}, };export default FromStatusModel; 改变和提…