flink学习(8)——窗口函数

增量聚合函数

——指窗口每进入一条数据就计算一次

例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27

 reduce

aggregate(aggregateFunction)

package com.bigdata.day04;public class _04_agg函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);// 此时我要获取每个班级的平均成绩// 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)// IN——Tuple3<String, String, Long>// ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩// OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {// 初始化一个 累加器@Overridepublic Tuple3<String, Integer, Long> createAccumulator() {return Tuple3.of(null,0,0L);}// 累加器和输入的值进行累加// Tuple3<String, String, Long> value 第一个是传入的值// Tuple3<String, Integer, Long> accumulator 第二个是累加器的值@Overridepublic Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);}// 获取结果——在不同节点的结果进行汇总后实现@Overridepublic Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);}// 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总// 即累加器之间的累加@Overridepublic Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

sum()

min()

max()

 全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)

全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间

apply 

package com.bigdata.day04;public class _05_app函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);//2. source-加载数据dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {@Overridepublic void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {Long sum = 0L;int length = 0;String key = null;for (Tuple3<String, String, Long> value : values) {sum += value.f2;length++;key = value.f0;}out.collect(Tuple2.of(key,(double) sum/length));}}).print();env.execute();}
}// 总结// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象  Tuple3<String,String,Long> 传入的值  Tuple2<String,Double> 结果// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象//使用窗口对象我们可以拿到窗口的起始时间long start = window.getStart();long end = window.getEnd();

process

使用方式一:在connect合流之后对两个类型不同的流进行处理

使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签

 使用方式一
new CoProcessFunction<Long, String, String>()  
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型  第三个泛型是合并后流的类型@Overridepublic void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {// Long 是数据类型 结果使用collector中的collect 收集collector.collect(String.valueOf(l));}@Overridepublic void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {//  String 是数据类型 结果使用collector中的collect 收集collector.collect(s);}
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/61964.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uname -m(machine) 命令用于显示当前系统的机器硬件架构(Unix Name)

文章目录 关于 arm64 架构检查是否安装了 Rosetta 2其他相关信息解释&#xff1a;命令功能&#xff1a;示例&#xff1a; dgqdgqdeMac-mini / % uname -m arm64您运行的 uname -m 命令显示您的系统架构是 arm64。这意味着您的 Mac Mini 使用的是 Apple 的 M1 或更新的芯片&…

QUICK调试camera-xml解析

本文主要介绍如何在QUICK QC6490使能相机模组。QC6490的相机基于CameraX的框架&#xff0c;只需通过配置XML文件&#xff0c;设置相机模组的相关参数&#xff0c;就可以点亮相机。本文主要介绍Camera Sensor Module XML和Camera Sensor XML配置的解析&#xff0c;这中间需要cam…

10、PyTorch autograd使用教程

文章目录 1. 相关思考2. 矩阵求导3. 两种方法求jacobian 1. 相关思考 2. 矩阵求导 假设我们有如下向量&#xff1a; y 1 3 x 1 5 [ w T ] 5 3 b 1 3 \begin{equation} y_{1\times3}x_{1\times5}[w^T]_{5\times3}b_{1\times3} \end{equation} y13​x15​[wT]53​b13​​…

AIGC-----AIGC在虚拟现实中的应用前景

AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容&#xff08;AIGC&#xff09;的快速发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性&#xff0c;这种组合不仅极大地降低了VR内容的…

Linux lsof

输出当前所有活跃进程打开的所有文件。文件、目录、管道、socket套接字、设备等等。 COMMAND PID TID TASKCMD USER FD TYPE DEVICE SIZE/OFF NODE NAME 其中 COMMAND: 显示命令的名称 PID: 进程ID USER: 用户名…

Java项目实战II基于微信小程序的校运会管理系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导 一、前言 在充满活力与激情的校园生活中&#xff0c;校运会不仅是…

计算机网络的类型

目录 按覆盖范围分类 个人区域网&#xff08;PAN&#xff09; 局域网&#xff08;LAN&#xff09; 城域网&#xff08;MAN&#xff09; 4. 广域网&#xff08;WAN&#xff09; 按使用场景和性质分类 公网&#xff08;全球网络&#xff09; 外网 内网&#xff08;私有网…

第R4周:LSTM-火灾温度预测(TensorFlow版)

>- **&#x1f368; 本文为[&#x1f517;365天深度学习训练营]中的学习记录博客** >- **&#x1f356; 原作者&#xff1a;[K同学啊]** 往期文章可查阅&#xff1a; 深度学习总结 任务说明&#xff1a;数据集中提供了火灾温度&#xff08;Tem1&#xff09;、一氧化碳浓度…

手搓人工智能—聚类分析(下)谱系聚类与K-mean聚类

“无论结果如何&#xff0c;至少我们存在过” ——《无人深空》 前言 除了上一篇手搓人工智能-聚类分析&#xff08;上&#xff09;中提到的两种简单聚类方式&#xff0c;还有一些更为常用、更复杂的聚类方式&#xff1a;谱系聚类&#xff0c;K-均值聚类。 谱系聚类 谱系聚类…

文件内容扫描工具

简介 文件扫描助手是一款基于Vite Vue 3 Electron技术栈开发的跨平台桌面应用程序。它提供了强大的文件内容搜索功能&#xff0c;支持Word、Excel、PDF、PPT等常见办公文档格式。用户可以通过关键词快速定位到包含特定内容的文件&#xff0c;极大地提高了文件管理和查找效率…

函数类型注释和Union联合类型注释

函数类型注释格式&#xff08;调用时提示输入参数的类型&#xff09;: )def 函数名(形参名:类型&#xff0c;形参名:类型&#xff09;->函数返回值类型: 函数体 Union联合类型注释&#xff08;可注释多种类型混合的变量&#xff09;格式: #先导入模块 from typing import…

AIGC--AIGC与人机协作:新的创作模式

AIGC与人机协作&#xff1a;新的创作模式 引言 人工智能生成内容&#xff08;AIGC&#xff09;正在以惊人的速度渗透到创作的各个领域。从生成文本、音乐、到图像和视频&#xff0c;AIGC使得创作过程变得更加快捷和高效。然而&#xff0c;AIGC并非完全取代了人类的创作角色&am…

【ue5】UE5运行时下载视频/UE5 runtime download video(MP4)

插件还是老朋友。 节点的content type要打对。 &#xff08;参照表&#xff1a;MIME 类型&#xff08;MIME Type&#xff09;完整对照表 - 免费在线工具&#xff09; 结果展示&#xff1a;

在Hadoop上实现分布式深度学习

在Hadoop上实现分布式深度学习 引言 随着大数据和深度学习的快速发展&#xff0c;分布式深度学习已成为当前研究和应用领域的热点。Hadoop作为一个广泛使用的分布式计算框架&#xff0c;在存储和处理大规模数据集方面表现出色&#xff0c;成为实现分布式深度学习的理想选择。…

protobuf 的windows 安装和执行

windows 下载&#xff1a; protobuf 路径 https://github.com/protocolbuffers/protobuf/releases/download/v3.7.1/protoc-3.7.1-win64.zip protobuf 执行命令&#xff1a;到exe文件下 F:\newIntelljCode\itstack-demo-netty\itstack-demo-netty-2-02\protoc-3.5.0-win32\bin…

C++ STL - stack, queue 讲解

stack stack 是 STL 提供的容器. 实现的数据结构是栈. 构造函数 栈主要是提供特殊的添加删除操作 (后进先出), 所以构造函数比较简单. std::stack<int> s; push / pop 栈最主要的两个函数: push: 向栈顶添加一个元素, 没有返回值 pop: 删除栈顶一个元素, 没有返回值…

STM32F103外部中断配置

一、外部中断 在上一节我们介绍了STM32f103的嵌套向量中断控制器&#xff0c;其中包括中断的使能、失能、中断优先级分组以及中断优先级配置等内容。 1.1 外部中断/事件控制器 在STM32f103支持的60个可屏蔽中断中&#xff0c;有一些比较特殊的中断&#xff1a; 中断编号13 EXTI…

C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术

C嘎嘎探索篇&#xff1a;栈与队列的交响&#xff1a;C中的结构艺术 前言&#xff1a; 小编在之前刚完成了C中栈和队列&#xff08;stack和queue&#xff09;的讲解&#xff0c;忘记的小伙伴可以去我上一篇文章看一眼的&#xff0c;今天小编将会带领大家吹奏栈和队列的交响&am…

Leetcode 290 word Pattern

题意&#xff1a;给定两个字符串p和s,其中一个字符串p表示另一个字符串的pattern。例如&#xff0c;“aaa”, 另一个字符串含有"good good good".求输入的两个字符串是否具有这样的匹配关系 题解&#xff1a;先把字符串s根据空格split存储在vector中方便遍历。我需要…

【c语言】文件操作详解 - 从打开到关闭

文章目录 1. 为什么使用文件&#xff1f;2. 什么是文件&#xff1f;3. 如何标识文件&#xff1f;4. 二进制文件和文本文件&#xff1f;5. 文件的打开和关闭5.1 流和标准流5.1.1 流5.1.2 标准流 5.2 文件指针5.3 文件的打开和关闭 6. 文件的读写顺序6.1 顺序读写函数6.2 对比一组…