Flink Window中典型的增量聚合函数(ReduceFunction / AggregateFunction)

一、什么是增量聚合函数

在Flink Window中定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪,这也就是窗口函数所需要做的事情。所以在窗口分配器之后,我们还要再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
窗口可以将数据收集起来,最基本的处理操作当然就是基于窗口内的数据进行聚合。
我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
在这里插入图片描述

二、归约函数ReduceFunction

源码解析

@FunctionalInterface
@Public
public interface ReduceFunction<T> extends Function, Serializable {T reduce(T var1, T var2) throws Exception;
}

实际案例
在Flink中,使用socket模拟实时的数据流DataStream,通过定义一个滚动窗口,窗口的大小为10s,按照id分区,使用reduce聚合函数实现value的累加统计

package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkWindowReduceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);// 注意这里为什么返回的是KeyedStream(建控流/分区流),而不是DataStreamKeyedStream<WaterSensor, String> keyedStream = streamSource// 使用map函数将输入的string转为一个WaterSensor类.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {// 这里写的很详细,如何把string转为的WaterSensor类String[] strings = s.split(",");String id = strings[0];Long ts = Long.valueOf(strings[1]);Integer vc = Integer.valueOf(strings[2]);WaterSensor waterSensor = new WaterSensor();waterSensor.setId(id);waterSensor.setTs(ts);waterSensor.setVc(vc);return waterSensor;//return new WaterSensor(strings[0],Long.valueOf(strings[1]),Integer.valueOf(strings[2])}})// 按照id做keyBy分区(提问:KeyBy是如何实现分区的?).keyBy(new KeySelector<WaterSensor, String>() {// 也可以直接使用lamda表达式更简单@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {// getId()方法就是return的waterSensor.idreturn waterSensor.getId();}});/*** 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(WindowFunctions)* .window()方法需要传入一个窗口分配器,它指明了窗口的类型* */SingleOutputStreamOperator<WaterSensor> outputStreamOperator = keyedStream// 设置滚动窗口的大小(10秒).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 使用匿名函数实现增量聚合函数ReduceFunction.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor waterSensor1, WaterSensor waterSensor2) throws Exception {System.out.println("调用reduce方法,之前的结果:" + waterSensor1 + ",现在来的数据:" + waterSensor2);return new WaterSensor(waterSensor1.getId(), System.currentTimeMillis(), waterSensor1.getVc() + waterSensor2.getVc());}});outputStreamOperator.print();streamExecutionEnvironment.execute();}
}

启动Flink程序,启动nc,模拟输入

nc -lk 8888
# 00-10秒输入
a,11111,1
# 11-20秒输入
a,11111,2
a,22222,3
# 21-30秒输入
a,11111,4

查看控制台打印结果

WaterSensor{id='a', ts=11111, vc=1}
调用reduce方法,之前的结果:WaterSensor{id='a', ts=11111, vc=2},现在来的数据:WaterSensor{id='a', ts=22222, vc=3}
WaterSensor{id='a', ts=1702022598011, vc=5}
WaterSensor{id='a', ts=11111, vc=4}

在这里插入图片描述

三、聚合函数AggregateFunction

虽然ReduceFunction 可以解决大多数归约聚合的问题,但是我们通过上述案例可以发现:这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API 中的 aggregate 就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

源码解析

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2);
}

接口中有四个方法:
1.createAccumulator()
创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
2.add()
将输入的元素添加到累加器中。
3.getResult()
从累加器中提取聚合的输出结果。
4.merge()
合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

案例解析

package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class FlinkWindowAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);KeyedStream<WaterSensor, String> keyedStream = streamSource.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] split = s.split(",");return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));}}).keyBy((KeySelector<WaterSensor, String>) waterSensor -> waterSensor.getId());// 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> windowAssigner = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(20)));SingleOutputStreamOperator<String> aggregate = windowAssigner.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value=" + value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer integer, Integer acc1) {System.out.println("调用merge方法");return null;}});aggregate.print();streamExecutionEnvironment.execute();}}

启动Flink程序,启动nc,模拟数据流输入
在这里插入图片描述在这里插入图片描述

创建累加器
调用add方法,value=WaterSensor{id='a', ts=1111, vc=1}
调用getResult方法
1
创建累加器
调用add方法,value=WaterSensor{id='a', ts=1111, vc=2}
调用add方法,value=WaterSensor{id='a', ts=1111, vc=3}
调用add方法,value=WaterSensor{id='a', ts=1111, vc=4}
调用getResult方法
9
创建累加器
调用add方法,value=WaterSensor{id='a', ts=1111, vc=5}
调用getResult方法
5

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/221658.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机组成原理-ATT格式vsIntel格式

文章目录 AT&T格式 vs lntel格式 x86汇编语言是lntel格式&#xff0c;还有一种汇编语言格式是AT&T AT&T格式 vs lntel格式 lntel格式中取主存地址内容未指明长度默认为32位&#xff0c;对应下图中第四行右边的指令 百分号 美元符号 小括号 可用于计算机结构体数组…

竞赛保研 python+opencv+机器学习车牌识别

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于机器学习的车牌识别系统 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;4分工作量&#xff1a;4分创新点&#xff1a;3分 该项目较为新颖&#xff0c;适…

Amazon Q:对话智能赋能企业发展

授权说明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 。 在最近举办的亚马逊云科技大会上&#xff0c;引人瞩目的消息是A…

斑马zebra目标检测数据集VOC+YOLO格式2300张

斑马是由四百万年前的原马进化出来的&#xff0c;最早出现的斑马可能是细纹斑马。有关史前马科动物的化石现存于美国爱达荷州克文的克文化石床国家博物馆。斑马的史前马为“克文马”&#xff08;美洲斑马或者克文斑马&#xff09;&#xff0c;学名为“Equussimplicidens”&…

​ 轻量应用服务器:亚马逊云科技打造全球领先的云计算解决方案

随着“第四次工业革命”的爆炸式发展&#xff0c;众多企业都将自己的业务与迅速发展的应用开发和网站建设领域高度绑定。而对于众多有上云需求的企业和个人用户来说&#xff0c;选择一款自己的服务器配置就成为了一项至关重要的任务。而随着需求端的不断扩大&#xff0c;云服务…

Nacos-NacosRule 负载均衡—设置集群使本地服务优先访问

userservice: ribbon: NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则 NacosRule 权重计算方法 目录 一、介绍 二、示例&#xff08;案例截图&#xff09; 三、总结 一、介绍 NacosRule是AlibabaNacos自己实现的一个负载均衡策略&…

《教育信息化论坛》期刊杂志论文发表投稿

《教育信息化论坛》由中原大地传媒股份有限公司主管&#xff0c;河南电子音像出版社、文心出版社主办&#xff0c;我刊立足于教育信息化、教育现代化科研&#xff0c;重点介绍国内外信息化、现代化教学手段、教学方式、教学传播研究的新成果和新观点&#xff0c;推广成功的国内…

【Spring教程28】Spring框架实战:从零开始学习SpringMVC 之 请求与请求参数详解

目录 1 设置请求映射路径1.1 环境准备 1.2 问题分析1.3 设置映射路径 2 请求参数2.1 环境准备2.2 参数传递2.2.1 GET发送单个参数2.2.2 GET发送多个参数2.2.3 GET请求中文乱码2.2.4 POST发送参数2.2.5 POST请求中文乱码 欢迎大家回到《Java教程之Spring30天快速入门》&#xff…

认识缓存,一文读懂Cookie,Session缓存机制。

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

一、CM4树莓派系统烧录

操作系统&#xff08;Raspberry Pi OS&#xff09;应用程序 Raspberry Pi OS系统&#xff08;树莓派推荐系统&#xff09;&#xff1a;较小的内存占用、较高的易用性以及对浮点单元的支持 浮点单元&#xff1a;浮点运算单元&#xff08;FPU&#xff09;是处理器中专门进行浮点…

windows redis 允许远程访问配置

安装好windows版本的redis&#xff0c;会以服务方式启动&#xff0c;但是不能远程访问&#xff0c;这个时候需要修改配置。redis安装路径下会有2个配置文件&#xff0c;究竟需要怎么修改才能生效呢&#xff1f;看下图 这里的redis服务指定了是redis.windows-service.conf文件&…

Docker | 发布镜像到镜像仓库

✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏:Docker系列 ✨特色专栏: MySQL学习 🥭本文内容:Docker | 发布镜像到镜像仓库 📚个人知识库: [Leo知识库]https://gaoziman.gitee.io/bl…

骨灰级程序员那些年曾经告诉我们的高效学习的态度

一、背景 以前阅读陈皓老师的左耳听风专栏中关于如何高效学习的总结让我收货颇丰&#xff0c;今天总结了一下&#xff0c;分享给大家 老师说&#xff1a; 学习是一件“逆人性”的事&#xff0c;就像锻炼身体一样&#xff0c;需要人持续付出&#xff0c;会让人感到痛苦&#…

c语言堆排序(详解)

堆排序 堆排序是一种基于二叉堆数据结构的排序算法&#xff0c;它的基本概念包括&#xff1a; 建立堆&#xff1a;将待排序的列表构建成一个二叉堆&#xff0c;即满足堆的性质的完全二叉树&#xff0c;可以是最大堆或最小堆。最大堆要求父节点的值大于等于其子节点的值&#x…

基于node 安装express后端脚手架

1.首先创建文件件 2.在文件夹内打开终端 npm init 3.安装express: npm install -g express-generator注意的地方&#xff1a;这个时候安装特别慢,最后导致不成功 解决方法&#xff1a;npm config set registry http://registry.npm.taobao.org/ 4.依次执行 npm install -g ex…

Qt之Ui样式表不影响子类的配置

Qt之Ui样式表不影响子类的配置 问题 在ui界面上布局时&#xff0c;当对容器进行样试设计时&#xff0c;会对容器内其它成员对象也进行了修改 分析 对应*.ui文件内容 从这个写法来看&#xff0c;它的样式属性会影响其成员对象样式属性。 解决方法 在容器的样式表中写时适…

如何将Galaxybase图数据库应用于电力设备管理

导读 近日&#xff0c;受强冷空气影响&#xff0c;部分北方地区出现不同程度的降雪&#xff0c;并持续降温。据国家电网发布的预警通知&#xff0c;要求启动预警响应和应急机制&#xff0c;密切跟踪灾害预警信息和应急响应情况&#xff0c;滚动研判分析覆冰、积雪、低温等对电…

beyond compare文件夹比较时候文本乱码问题解决

“格式”&#xff0c;在下面的左侧编码重写和右侧编码覆盖选择 GB2312/UTF-8/GBK&#xff0c;这个可以根据自己喜好和文本自身的encode选择。

跟着我学Python基础篇:08.集合和字典

往期文章 跟着我学Python基础篇&#xff1a;01.初露端倪 跟着我学Python基础篇&#xff1a;02.数字与字符串编程 跟着我学Python基础篇&#xff1a;03.选择结构 跟着我学Python基础篇&#xff1a;04.循环 跟着我学Python基础篇&#xff1a;05.函数 跟着我学Python基础篇&#…

从零构建属于自己的GPT系列6:模型部署2(文本生成函数解读、模型本地化部署、文本生成文本网页展示、代码逐行解读)

&#x1f6a9;&#x1f6a9;&#x1f6a9;Hugging Face 实战系列 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在PyCharm中进行 本篇文章配套的代码资源已经上传 从零构建属于自己的GPT系列1&#xff1a;数据预处理 从零构建属于自己的GPT系列2&#xff1a;模型训…