详解 Flink Table API 和 Flink SQL 之函数

一、系统内置函数

1. 比较函数

API函数表达式示例
Table API===,>,<,!=,>=,<=id===1001,age>18
SQL=,>,<,!=,>=,<=id=‘1001’,age>18

2. 逻辑函数

API函数表达式示例
Table API&&,||,!,.isFalse1>1 && 2>1,true.isFalse
SQLand,or,is false,not1>1 and 2>1,true is false

3. 算术函数

API函数表达式示例
Table API+,-,*,/,n1.power(n2)1 + 1,2.power(2)
SQL+,-,*,/,power(n1,n2)1 + 1,power(2,2)

4. 字符串函数

API函数表达式示例
Table APIstring1 + string2,.upperCase(),.lowerCase(),.charLength()‘a’ + ‘b’,
‘hello’.upperCase()
SQLstring1 || string2,upper(),lower(),char_length()‘a’ || ‘b’,upper(‘hello’)

5. 时间函数

API函数表达式示例
Table API.toDate,.toTimestamp,currentTime(),n.days,n.minutes‘20230107’.toDate,
2.days,10.minutes
SQLDate string,Timestamp string,current_time,interval string rangeDate ‘20230107’,interval ‘2’ hour/second/minute/day

6. 聚合函数

API函数表达式示例
Table API.count,.sum,.sum0id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0
SQLcount(),sum(),rank(),row_number()count(*),sum(age)

二、用户自定义函数(UDF)

UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用

1. 标量函数

Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出

/**用户自定义标量函数步骤:1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode = new HashCode(0.8);//3.在环境中注册函数tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法public static class HashCode extends ScalarFunction {private int factor;public HashCode(int factor) {this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义public int eval(String str) {return str.hashCode() * factor;}}
}

2. 表函数

Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出

/**用户自定义表函数步骤:1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split = new Split("_");//3.在环境中注册函数tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>public static class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = ",";public Split(String separator) {this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法public void eval(String str) {for(String s : str.split(separator)) {//使用 collect() 方法输出结果collect(new Tuple2<>(s, s.length()));}}}
}

3. 聚合函数

Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出

/**用户自定义聚合函数步骤:1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp = new AvgTemp();//3.在环境中注册函数tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {accumulator.f0 += temp;accumulator.f1 += 1;}}
}

4. 表聚合函数

Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出

/**用户自定义表聚合函数步骤:1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp = new Top2Temp();//3.在环境中注册函数tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用tableEnv.toRetractStream(resultTable, Row.class).print("result");env.execute();}//定义一个 Accumulatorpublic static class Top2TempAcc {private Double highestTemp = Double.MIN_VALUE;private Double secondHighestTemp = Double.MIN_VALUE;public Double getHighestTemp() {return highestTemp;}public void setHighestTemp(Double highestTemp) {this.highestTemp=highestTemp;}public Double getSecondHighestTemp() {return secondHighestTemp;}public void setSecondHighestTemp(Double secondHighestTemp) {this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {@Overridepublic void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.getHighestTemp(), 1));out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));}@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Top2TempAcc acc, Double temp) {if(acc.getHighestTemp() < temp) {acc.setSecondHighestTemp(acc.getHighestTemp());acc.setHighestTemp(temp);} else if(acc.getSecondHighestTemp() < temp) {acc.setSecondHighestTemp(temp);}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/26873.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用PlayCanvas打造一个3D模型

本文由ScriptEcho平台提供技术支持 项目地址&#xff1a;传送门 基于 PlayCanvas 的 3D 物理场景开发 应用场景介绍 PlayCanvas 是一款功能强大的 3D 引擎&#xff0c;可用于创建各种类型的 3D 体验&#xff0c;包括游戏、模拟和交互式可视化。本技术博客将介绍如何使用 Pl…

怎么把wma格式转化为mp3格式?四种wma格式转成MP3格式的方法

怎么把wma格式转化为mp3格式&#xff1f;转换WMA格式音频文件为MP3格式是一个常见的需求&#xff0c;尤其是在我们希望在不同设备或平台上播放音频时。WMA格式虽然在Windows系统中较为常见&#xff0c;但在其他平台上的兼容性可能不如MP3格式。因此&#xff0c;将WMA转换为MP3是…

基于Spring Boot的智能分析平台

项目介绍&#xff1a; 智能分析平台实现了用户导入需要分析的原始数据集后&#xff0c;利用AI自动生成可视化图表和分析结论&#xff0c;改善了传统BI系统需要用户具备相关数据分析技能的问题。该项目使用到的技术是SSMSpring Boot、redis、rabbitMq、mysql等。在项目中&#…

在 Wed 中应用 MyBatis(同时使用MVC架构模式,以及ThreadLocal 事务控制)

1. 在 Wed 中应用 MyBatis&#xff08;同时使用MVC架构模式&#xff0c;以及ThreadLocal 事务控制&#xff09; 文章目录 1. 在 Wed 中应用 MyBatis&#xff08;同时使用MVC架构模式&#xff0c;以及ThreadLocal 事务控制&#xff09;2. 实现步骤&#xff1a;1. 第一步&#xf…

Vulnhub-DC-1,7

靶机IP:192.168.20.141 kaliIP:192.168.20.128 网络有问题的可以看下搭建Vulnhub靶机网络问题(获取不到IP) 前言 1和7都是Drupal的网站&#xff0c;只写了7&#xff0c;包含1的知识点 信息收集 用nmap扫描端口及版本号 进入主页查看作者给的提示&#xff0c;不是暴力破解的…

nodejs湖北省智慧乡村旅游平台-计算机毕业设计源码00232

摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;旅游行业当然也不能排除在外。智慧乡村旅游平台是以实际运用为开发背景&#xff0c;运用软件工程开发方法&#xff0c;采…

Weighted A* 改进型(1):XDP

本文的主要内容来自于文献[1]&#xff0c;总的来说这篇文献给我的感觉就是理论证明非常精妙&#xff0c;最后的实际效果也是提升的非常明显。 在Introduction中作者给出了一般Best first search&#xff08;BFS&#xff0c;常用的包括A *&#xff0c;weighted A * &#xff0c…

TK防关联引流系统:全球多账号运营,一“键”掌控!

在TikTok的生态系统中&#xff0c;高效管理多个账号对于品牌推广的成功起着决定性的作用。TK防关联引流系统&#xff0c;作为一款专门为TikTok用户打造的强大工具&#xff0c;为全球范围内的多账号运营提供了坚实的支持。 TK防关联引流系统的核心优势体现在以下几个方面&#x…

2024.6.13

2024.6.13 【痛苦的&#xff0c;热烈的&#xff0c;误解的&#xff0c;无解的&#xff0c;快乐的&#xff0c;解脱的】 Thursday 五月初八 <theme oi-“game theory”> P4018 Roy&October之取石子 Roy&October之取石子 题目背景 Roy 和 October 两人在玩一个…

anaconda安装pytorch-快速上手99%可以(可以虚拟环境OR不进行虚拟环境)

一、预备工作 先检查自己是否有anaconda 在cmd里面输入conda --version查看 二、在anaconda中创建虚拟环境 1.1 打开Anaconda Prompt 1.2 进行自定义安装python 将其中的自定义地址和版本换成自己想安装的地址和版本 我这里安装的地址是E:\Anaconda\DL,python版本是3.8.3…

uniapp地图自定义文字和图标

这是我的结构&#xff1a; <map classmap id"map" :latitude"latitude" :longitude"longitude" markertap"handleMarkerClick" :show-location"true" :markers"covers" /> 记住别忘了在data中定义变量…

Vue页面生成PDF后调起浏览器打印

一、安装依赖 首先&#xff0c;需要安装 html2canvas 和 jsPDF 库。 npm install html2canvas jspdf二、创建公共方法引入 在utils文件夹下创建两个文件分别为pdfExport.js和printPDF.js&#xff0c;代码如下&#xff1a; pdfExport.js import html2canvas from html2canv…

Python PDF转化wolrd代码怎么写

将PDF文件转换为Word文档的过程通常需要使用一些外部库来实现&#xff0c;因为Python本身并不直接支持这种转换。一个常用的库是pdf2docx&#xff0c;它可以帮助我们将PDF文件转换为Word文档格式。以下是使用pdf2docx库将PDF转换为Word的基本步骤&#xff1a; 首先&#xff0c;…

云顶之弈-测试报告

一. 项目背景 个人博客系统采用前后端分离的方法来实现&#xff0c;同时使用了数据库来存储相关的数据&#xff0c;同时将其部署到云服务器上。前端主要有四个页面构成&#xff1a;登录页、列表页、详情页以及编辑页&#xff0c;以上模拟实现了最简单的个人博客系统。其结合后…

Sqoop学习详细介绍!!

一、Sqoop介绍 Sqoop是一款开源的工具&#xff0c;主要用于在Hadoop(HDFS/Hive/HBase)与传统的数据库(mysql、postgresql...)间进行数据的传递&#xff0c;可以将一个关系型数据库&#xff08;例如 &#xff1a; MySQL ,Oracle ,Postgres等&#xff09;中的数据导进到Hadoop的H…

虚拟声卡实现音频回环

虚拟声卡实现音频回环 一、电脑扬声器播放声音路由到麦克风1. Voicemeeters安装设置2. 音频设备选择 二、回声模拟 一、电脑扬声器播放声音路由到麦克风 1. Voicemeeters安装设置 2. 音频设备选择 以腾讯会议为例 二、回声模拟 选中物理输入设备“Stereo Input 1”和物理输出设…

GlusterFS企业分布式存储

GlusterFS 分布式文件系统代表-nfs常见分布式存储Gluster存储基础梳理GlusterFS 适合大文件还是小文件存储&#xff1f; 应用场景术语Trusted Storage PoolBrickVolumes Glusterfs整体工作流程-数据访问流程GlusterFS客户端访问流程 GlusterFS常用命令部署 GlusterFS 群集准备环…

修改版的VectorDBBench更好用

原版本VectorDBBench的几个问题 在这里就不介绍VectorDBBench是干什么的了&#xff0c;上官网即可。 1.并发数设置的太少 2.测试时长30秒太长 3.连接milvus无用户和密码框&#xff0c;这个是最大的问题 4.修改了一下其它参数 由于很多网友发私信问一些milvus的相关技术问…

【机器学习】支持向量机(个人笔记)

文章目录 SVM 分类器的误差函数分类误差函数距离误差函数C 参数 非线性边界的 SVM 分类器&#xff08;内核方法&#xff09;多项式内核径向基函数&#xff08;RBF&#xff09;内核 源代码文件请点击此处&#xff01; SVM 分类器的误差函数 SVM 使用两条平行线&#xff0c;使用…

基于flask的网站如何使用https加密通信

文章目录 内容简介网站目录示例生成SSL证书单独使用Flask使用WSGI服务器Nginx反向代理参考资料 内容简介 HTTPS 是一种至关重要的网络安全协议&#xff0c;它通过在 HTTP 协议之上添加 SSL/TLS 层来确保数据传输的安全性和完整性。这有助于防止数据在客户端和服务器之间传输时…