Flink Window中典型的增量聚合(ReduceFunction / AggregateFunction)

一、什么是增量聚合函数

在Flink Window中定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪,这也就是窗口函数所需要做的事情。所以在窗口分配器之后,我们还要再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
窗口可以将数据收集起来,最基本的处理操作当然就是基于窗口内的数据进行聚合。
我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
在这里插入图片描述

二、ReduceFunction

源码解析

@FunctionalInterface
@Public
public interface ReduceFunction<T> extends Function, Serializable {T reduce(T var1, T var2) throws Exception;
}

实际案例
在Flink中,使用socket模拟实时的数据流DataStream,通过定义一个滚动窗口,窗口的大小为10s,按照id分区,使用reduce聚合函数实现value的累加统计

package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkWindowReduceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);// 注意这里为什么返回的是KeyedStream(建控流/分区流),而不是DataStreamKeyedStream<WaterSensor, String> keyedStream = streamSource// 使用map函数将输入的string转为一个WaterSensor类.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {// 这里写的很详细,如何把string转为的WaterSensor类String[] strings = s.split(",");String id = strings[0];Long ts = Long.valueOf(strings[1]);Integer vc = Integer.valueOf(strings[2]);WaterSensor waterSensor = new WaterSensor();waterSensor.setId(id);waterSensor.setTs(ts);waterSensor.setVc(vc);return waterSensor;//return new WaterSensor(strings[0],Long.valueOf(strings[1]),Integer.valueOf(strings[2])}})// 按照id做keyBy分区(提问:KeyBy是如何实现分区的?).keyBy(new KeySelector<WaterSensor, String>() {// 也可以直接使用lamda表达式更简单@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {// getId()方法就是return的waterSensor.idreturn waterSensor.getId();}});/*** 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(WindowFunctions)* .window()方法需要传入一个窗口分配器,它指明了窗口的类型* */SingleOutputStreamOperator<WaterSensor> outputStreamOperator = keyedStream// 设置滚动窗口的大小(10秒).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 使用匿名函数实现增量聚合函数ReduceFunction.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor waterSensor1, WaterSensor waterSensor2) throws Exception {System.out.println("调用reduce方法,之前的结果:" + waterSensor1 + ",现在来的数据:" + waterSensor2);return new WaterSensor(waterSensor1.getId(), System.currentTimeMillis(), waterSensor1.getVc() + waterSensor2.getVc());}});outputStreamOperator.print();streamExecutionEnvironment.execute();}
}

启动Flink程序,启动nc,模拟输入

nc -lk 8888
# 00-10秒输入
a,11111,1
# 11-20秒输入
a,11111,2
a,22222,3
# 21-30秒输入
a,11111,4

查看控制台打印结果

WaterSensor{id='a', ts=11111, vc=1}
调用reduce方法,之前的结果:WaterSensor{id='a', ts=11111, vc=2},现在来的数据:WaterSensor{id='a', ts=22222, vc=3}
WaterSensor{id='a', ts=1702022598011, vc=5}
WaterSensor{id='a', ts=11111, vc=4}

在这里插入图片描述

三、AggregateFunction

虽然ReduceFunction 可以解决大多数归约聚合的问题,但是我们通过上述案例可以发现:这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API 中的 aggregate 就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2);
}

接口中有四个方法:
1.createAccumulator()
创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
2.add()
将输入的元素添加到累加器中。
3.getResult()
从累加器中提取聚合的输出结果。
4.merge()
合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/207188.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

听GPT 讲Rust源代码--src/tools(9)

File: rust/src/tools/rust-analyzer/crates/ide-assists/src/handlers/apply_demorgan.rs 在Rust源代码中&#xff0c;apply_demorgan.rs文件位于rust-analyzer工具的ide-assists库中&#xff0c;其作用是实现一个辅助函数&#xff0c;用于在代码中应用De Morgan定律的变换。 …

Android : 篮球记分器app _简单应用

示例图&#xff1a; 1.导包 在build.gradle 中 加入 // 使用androidx版本库implementation androidx.lifecycle:lifecycle-extensions:2.1.0-alpha03 2. 开启dataBinding android{...// 步骤1.开启data bindingdataBinding {enabled true}...} 3.写个类继承 ViewModel pac…

自下而上-存储全栈(TiDB/RockDB/SPDK/fuse/ceph/NVMe/ext4)存储技术专家成长路线

数字化时代的到来带来了大规模数据的产生&#xff0c;各行各业都面临着数据爆炸的挑战。 随着云计算、物联网、人工智能等新兴技术的发展&#xff0c;对存储技术的需求也越来越多样化。不同应用场景对存储的容量、性能、可靠性和成本等方面都有不同的要求。具备存储技术知识和技…

机器学习-聚类问题

前言 聚类算法又叫做”无监督分类“&#xff0c;目标是通过对无标记训练样本来揭示数据的内在性质及 规律&#xff0c;为进一步的数据分析提供基础。 Kmeans 作为聚类算法的典型代表&#xff0c;Kmeans可以说是最简单的聚类算法&#xff0c;没有之一&#xff0c;那她是怎么完…

MySQL为何偏爱B+树索引

一、MySQL、B树概念 MySQL是一种关系型数据库&#xff0c;它使用SQL语言来操作数据。SQL语言可以实现对数据的增删改查等操作&#xff0c;但是如果数据量很大&#xff0c;那么这些操作的效率就会很低。为了提高效率&#xff0c;MySQL引入了索引的概念。 索引是一种数据结构&am…

人体关键点检测1:人体姿势估计数据集

人体关键点检测1&#xff1a;人体姿势估计数据集 目录 人体关键点检测1&#xff1a;人体姿势估计数据集 1.人体姿态估计 2.人体姿势估计数据集 &#xff08;1&#xff09;COCO数据集 &#xff08;2&#xff09;MPII数据集 &#xff08;3&#xff09;Human3.6M &#xf…

MS5228/5248/5268:2.7V 到 5.5V、 12/14/16Bit、内置基准、八通道数模转换器

MS5228/MS5248/MS5268 是一款 12/14/16bit 八通道输出的电压型 DAC &#xff0c;内部集成上电复位电路、可选内部基准、接口采用四线串口模式&#xff0c; 最高工作频率可以到 40MHz &#xff0c;可以兼容 SPI 、 QSPI 、 DSP 接口和 Microwire 串口。输出接到一个 …

<习题集><LeetCode><链表><2/19/21/23/24>

目录 2. 两数相加 19. 删除链表的倒数第 N 个结点 21. 合并两个有序链表 23. 合并 K 个升序链表 24. 两两交换链表中的节点 2. 两数相加 https://leetcode.cn/problems/add-two-numbers/ public ListNode addTwoNumbers(ListNode l1, ListNode l2) {//head是cur链表头节点…

Go语言实现深度学习的正向传播和反向传播

文章目录 开发前言开发理论图解理论数据类型数学函数数据节点统一抽象变量数据节点常量数据节点单目运算封装双目运算封装算子节点统一抽象基础算子加法算子减法算子乘法算子除法算子指数算子对数算子正切算子正弦算子余弦算子数据流图正向传播反向传播正向训练反向训练运行示例…

船舶机电设备振动数据采集监控系统解决方案

船舶运行中&#xff0c;通常需要通过振动数据采集系统对船舶的各个机电设备运行进行监控&#xff0c;有助于在设备故障时快速预警&#xff0c;进行诊断、分析和维护&#xff0c;保证船舶机电设备正常工作&#xff0c;从而确保工作人员及船舶的安全。 船舶各种机电设备会产生大…

【模型量化】神经网络量化基础及代码学习总结

1 量化的介绍 量化是减少神经网络计算时间和能耗的最有效的方法之一。在神经网络量化中&#xff0c;权重和激活张量存储在比训练时通常使用的16-bit或32-bit更低的比特精度。当从32-bit降低到8-bit&#xff0c;存储张量的内存开销减少了4倍&#xff0c;矩阵乘法的计算成本则二…

ALNS算法中随机化重要性的评价

文章概述 本研究分析了在海上提货和交付问题中使用的ALNS元启发式算法中的随机化成分。研究者提出了简单的确定性替代方案&#xff0c;并通过实验比较了随机化和确定性成分的性能。结果表明&#xff0c;初始实现的简单确定性替代方案能够与随机化成分的性能相匹配。这项研究为…

IDEA使用git从远程仓库获取项目

将地址填入url中 然后直接clone就行

《Easy3d+Qt+VTK》学习

《Easy3dQtVTK》学习-1、编译与配置 一、编译二、配置注 一、编译 1、 资源下载&#xff1a;easy3d giuhub 2、解压缩 3、用qt打开CMakeLists.txt即可 4、点击项目&#xff0c;选择debug或者release&#xff0c;图中3处可自行选择&#xff0c;因为我的qt版本是6&#xff0c…

在linux上如何运用虚拟数据优化器VDO

本章主要介绍虚拟化数据优化器。 什么是虚拟数据优化器VDO 创建VDO设备以节约硬盘空间 16.1 了解什么是VDO VDO全称是Virtual Data Optimize&#xff08;虚拟数据优化)&#xff0c;主要是为了节省硬盘空间。 现在假设有两个文件file1和 file2&#xff0c;大小都是10G。file…

cpu 300% 爆满 内存占用不高 排查

top查询 cpu最高的PID ps -ef | grep PID 查看具体哪一个jar服务 jstack -l PID > ./jstack.log 下载/打印进程的线程栈信息 可以加信息简单分析 或进一步 查看堆内存使用情况 jmap -heap Java进程id jstack.log 信息示例 Full thread dump Java HotSpot(TM) 64-Bit Se…

横向扩展统一存储与备份服务器功能

Infortrend 更新了GS&#xff0c;GSe&#xff0c;GSe Pro统一存储系列的备份服务器功能。该功能降低数据备份成本&#xff0c;并提供灵活的备份策略。通过备份服务器功能&#xff0c;用户可以通过多种途径实现数据备份&#xff0c;包括公有云&#xff08;兼容S3&#xff09;、文…

C/C++,树算法——二叉树的插入(Insert)算法之源程序

1 文本格式 #include<iostream> using namespace std; // A BTree node class BTreeNode { int* keys; // An array of keys int t; // Minimum degree (defines the range for number of keys) BTreeNode** C; // An array of child pointers int …

dell服务器重启后显示器黑屏

1.硬件层面&#xff1a;观察主机的指示灯 &#xff08;1&#xff09;指示灯偏黄&#xff0c;硬件存在问题&#xff08;内存条有静电&#xff0c;拔出后用橡皮擦擦拭&#xff1b;或GPU松动&#xff09; a.电源指示灯黄&#xff0c;闪烁三下再闪烁一下&#xff0c;扣下主板上的纽…

Python Appium Selenium 查杀进程的实用方法

一、前置说明 在自动化过程中&#xff0c;经常需要在命令行中执行一些操作&#xff0c;比如启动应用、查杀应用等&#xff0c;因此可以封装成一个CommandExecutor来专门处理这些事情。 二、操作步骤 # cmd_util.pyimport logging import os import platform import shutil i…