FlinkSQL聚合函数(Aggregate Function)详解

使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。

在这里插入图片描述

上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法

案例场景:

关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。

开发流程:

实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;

必须实现以下⽅法:

  • Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
  • accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
  • Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型信息的⽅法:

默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。

对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。

同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。

  • getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。

案例: 加权平均值

  • 定义⼀个聚合函数来计算某⼀列的加权平均
  • 在 TableEnvironment 中注册函数
  • 在查询中使⽤函数

实现思路:

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。

WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;import java.io.Serializable;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;/*** 输入数据:* a,1,1* a,10,2** 输出结果:* res1=>:1> +I[a, 1.0]* res2=>:1> +I[a, 1.0]* res3=>:1> +I[a, 1.0]** res1=>:1> -U[a, 1.0]* res1=>:1> +U[a, 7.0]* res3=>:1> -U[a, 1.0]* res3=>:1> +U[a, 7.0]* res2=>:1> -U[a, 1.0]* res2=>:1> +U[a, 7.0]*/
public class AggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {@Overridepublic Tuple3<String, Double, Double> map(String input) throws Exception {return new Tuple3<>(input.split(",")[0],Double.parseDouble(input.split(",")[1]),Double.parseDouble(input.split(",")[2]));}});Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");tEnv.createTemporaryView("SourceTable", table);Table res1 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));// 注册函数tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);// Table API 调⽤函数Table res2 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));// SQL API 调⽤函数Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");tEnv.toChangelogStream(res1).print("res1=>");tEnv.toChangelogStream(res2).print("res2=>");tEnv.toChangelogStream(res3).print("res3=>");env.execute();}// ⾃定义⼀个计算权重 avg 的 accmulatorpublic static class WeightedAvgAccumulator implements Serializable {public Double sum = 0.0;public Double count = 0.0;}// 输⼊:Long iValue, Integer iWeightpublic static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {// 创建⼀个 accumulator@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator();}public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}// 获取返回结果@Overridepublic Double getValue(WeightedAvgAccumulator acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}// Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {for (WeightedAvgAccumulator a : it) {acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccumulator acc) {acc.count = 0.0;acc.sum = 0.0;}}
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/140228.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BI 数据可视化平台建设(1)—交叉表组件演变实战

作者&#xff1a;vivo 互联网大数据团队 - Zhu Jianchen 本文是vivo互联网大数据团队《BI数据可视化平台建设》系列文章第1篇 - 交叉表组件。 交叉表在数据分析里应用广泛&#xff0c;通过本文&#xff0c;你将了解到&#xff1a; 交叉表的基本概念&#xff0c;以及BI可视化平…

【狂神说Java】Nginx详解

✅作者简介&#xff1a;CSDN内容合伙人、信息安全专业在校大学生&#x1f3c6; &#x1f525;系列专栏 &#xff1a;狂神说Java &#x1f4c3;新人博主 &#xff1a;欢迎点赞收藏关注&#xff0c;会回访&#xff01; &#x1f4ac;舞台再大&#xff0c;你不上台&#xff0c;永远…

数据代理机制

目录 前言 Object.defineProperty() 语法 第三个参数配置项 数据代理机制的实现 MVVM分层思想 前言 本文介绍Vue的数据代理机制&#xff0c;也就是通过vue实例对象来代理data对象中的属性的操作 Object.defineProperty() 在介绍vue的数据代理机制前&#xff0c;我们需要…

JVM字符串常量池StringTable

目录 一、StringTable为什么要调整 二、String的基本特性 三、String的内存分配 四、字符串拼接操作 五、intern()方法 六、Stringtable的垃圾回收 七、G1中String去重操作 一、StringTable为什么要调整 jdk7之前&#xff0c;hotspot对于方法区的实现是永久代&#xff…

【图像处理:OpenCV-Python基础操作】

【图像处理&#xff1a;OpenCV-Python基础操作】 1 读取图像2 显示图像3 保存图像4 图像二值化、灰度图、彩色图&#xff0c;像素替换5 通道处理&#xff08;通道拆分、合并&#xff09;6 调整尺寸大小7 提取感兴趣区域、掩膜8 乘法、逻辑运算9 HSV色彩空间&#xff0c;获取特定…

ENVI IDL:如何基于气象站点数据进行反距离权重插值?

01 前言 仅仅练习&#xff0c;大可使用ArcGIS或者已经封装好的python模块进行插值&#xff0c;此处仅仅从底层理解如何从公式和代码理解反距离权重插值的过程&#xff0c;从而更深刻的理解IDL的使用和插值的理解。 02 函数说明 2.1 Read_CSV()函数 官方语法如下&#xff1a…

Python---字典---dict

1、为什么需要字典 如果想要存储一个人的信息&#xff0c;姓名&#xff1a;Tom&#xff0c;年龄&#xff1a;20周岁&#xff0c;性别&#xff1a;男&#xff0c;如何快速存储。 person [Tom, 20, 男] 在日常生活中&#xff0c;姓名、年龄以及性别同属于一个人的基本特征。 但…

Please No More Sigma(构造矩阵)

Please No More Sigma 给f(n)定义如下&#xff1a; f(n)1 n1,2; f(n)f(n-1)f(n-2) n>2; 给定n&#xff0c;求下式模1e97后的值 Input 第一行一个数字T&#xff0c;表示样例数 以下有T行&#xff0c;每行一个数&#xff0c;表示n。 保证T<100&#xff0c;n<100000…

51单片机入门

一、单片机以及开发板介绍 写在前面&#xff1a;本文为作者自学笔记&#xff0c;课程为哔哩哔哩江协科技51单片机入门教程&#xff0c;感兴趣可以看看&#xff0c;适合普中A2开发板或者HC6800-ESV2.0江协科技课程所用开发板。 工具安装请另行搜索&#xff0c;这里不做介绍&…

CompareM-平均氨基酸一致性(AAI)计算

文章目录 Comparem简介比较基因组统计基因组使用模式其他 安装使用基于基因组计算氨基酸一致性基于基因组蛋白计算氨基酸一致性 结果转变成矩阵参考 Comparem简介 CompareM 是一个支持进行大规模基因组比较分析的软件工具包。它提供跨基因组&#xff08;如氨基酸一致性&#x…

《未来之路:技术探索与梦想的追逐》

创作纪念日 日期&#xff1a;2023年07月05日文章标题&#xff1a;《从零开始-与大语言模型对话学技术-gradio篇&#xff08;1&#xff09;》成为创作者第128天 在这个平凡的一天&#xff0c;我撰写了自己的第一篇技术博客&#xff0c;题为《从零开始-与大语言模型对话学技术-…

深度学习的集体智慧:最新发展综述

一、说明 我们调查了来自复杂系统的想法&#xff0c;如群体智能、自组织和紧急行为&#xff0c;这些想法在机器学习中越来越受欢迎。人工神经网络正在影响我们的日常生活&#xff0c;从执行预测性任务&#xff08;如推荐、面部识别和对象分类&#xff09;到生成任务&#xff08…

RT-DETR算法改进:更换损失函数DIoU损失函数,提升RT-DETR检测精度

💡本篇内容:RT-DETR算法改进:更换损失函数DIoU损失函数 💡本博客 改进源代码改进 适用于 RT-DETR目标检测算法(ultralytics项目版本) 按步骤操作运行改进后的代码即可🚀🚀🚀 💡改进 RT-DETR 目标检测算法专属 文章目录 一、DIoU理论部分 + 最新 RT-DETR算法…

从0到0.01入门React | 008.精选 React 面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

自动计算零售数据分析指标?BI软件表示可行

随着BI技术的飞速发展&#xff0c;借助系统来计算分析指标也不是什么难事&#xff0c;即便是面对组合多变的零售数据分析指标&#xff0c;奥威BI软件也依旧可以又快又精准地完成指标计算。 BI软件可以自动计算零售数据分析指标&#xff0c;如销售额、库存量、订单量等。在计算…

[工业自动化-11]:西门子S7-15xxx编程 - PLC从站 - 分布式IO从站/从机

目录 一、什么是以分布式IO从站/从机 二、分布式IO从站的意义 三、ET200分布式从站系列 一、什么是以分布式IO从站/从机 在工业自动化领域中&#xff0c;分布式 IO 系统是目前应用最为广泛的一种 I/O 系统&#xff0c;其中分布式 IO 从站是一个重要的组成部分。 分布式 IO …

Redis系列-四种部署方式-单机部署+主从模式+哨兵模式【7】

目录 Redis系列-四种部署方式-单机部署主从模式【7】redis-四种部署模式单机模式主从模式数据同步的方式全量数据同步增量数据同步 Redis哨兵模式总结缺点&#xff1a;哨兵模式应用sentinel.conf配置项 REF 个人主页: 【⭐️个人主页】 需要您的【&#x1f496; 点赞关注】支持…

移动医疗科技:开发互联网医院系统源码

在这个数字化时代&#xff0c;互联网医院系统成为了提供便捷、高效医疗服务的重要手段。本文将介绍利用移动医疗科技开发互联网医院系统的源码&#xff0c;为医疗行业的数字化转型提供有力支持。 智慧医疗、互联网医院这一类平台可以通过线上的形式进行部分医疗服务&#xff…

59基于matlab的爬行动物搜索算法(Reptile search algorithm, RSA)

基于matlab的爬行动物搜索算法&#xff08;Reptile search algorithm, RSA&#xff09;一种新型智能优化算法。该算法主要模拟鳄鱼的捕食行为&#xff0c;来实现寻优求解&#xff0c;具有收敛速度快&#xff0c;寻优能力强的特点。程序已调通&#xff0c;可直接运行。 59matlab…

云原生微服务架构及实现技术

云原生是一种技术理念和架构方法&#xff0c;它充分利用云计算的优势&#xff0c;将应用程序和基础设施进行优化&#xff0c;以适应云环境的特性。云原生的设计原则主要包括弹性、韧性、安全性、可观测性、灰度等&#xff0c;旨在让企业在云环境中实现轻量、敏捷、高度自动化的…