Flink SQL自定义表值函数(Table Function)

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** 输入数据:* nc -lk 8888* a,bb,cc* * 输出结果:* * res1=>:5> +I[a,bb,cc, a, 1]* res1=>:7> +I[a,bb,cc, cc, 2]* res1=>:6> +I[a,bb,cc, bb, 2]* res8=>:4> +I[a,bb,cc, a, 1]* res8=>:5> +I[a,bb,cc, bb, 2]* res8=>:6> +I[a,bb,cc, cc, 2]* res4=>:3> +I[a,bb,cc, cc, 2]* res4=>:1> +I[a,bb,cc, a, 1]* res4=>:2> +I[a,bb,cc, bb, 2]* res7=>:8> +I[a,bb,cc, bb, 2]* res7=>:1> +I[a,bb,cc, cc, 2]* res7=>:7> +I[a,bb,cc, a, 1]* res2=>:2> +I[a,bb,cc, cc, 2]* res2=>:8> +I[a,bb,cc, a, 1]* res2=>:1> +I[a,bb,cc, bb, 2]* res6=>:1> +I[a,bb,cc, cc, 2]* res6=>:7> +I[a,bb,cc, a, 1]* res6=>:8> +I[a,bb,cc, bb, 2]* res3=>:6> +I[a,bb,cc, bb, 2]* res3=>:7> +I[a,bb,cc, cc, 2]* res3=>:5> +I[a,bb,cc, a, 1]* res5=>:7> +I[a,bb,cc, bb, 2]* res5=>:8> +I[a,bb,cc, cc, 2]* res5=>:6> +I[a,bb,cc, a, 1]*/
public class TableFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);Table table = tEnv.fromDataStream(source, "field");tEnv.createTemporaryView("SourceTable", table);// 在 Table API ⾥可以直接调⽤ UDFTable res1 = tEnv.from("SourceTable").joinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));Table res2 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));// 在 Table API ⾥重命名 UDF 的结果字段Table res3 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).as("myField", "newWord", "newLength").select($("myField"), $("newWord"), $("newLength"));// 注册函数tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);// 在 Table API ⾥调⽤注册好的 UDFTable res4 = tEnv.from("SourceTable").joinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));Table res5 = tEnv.from("SourceTable").leftOuterJoinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));// 在 SQL ⾥调⽤注册好的 UDFTable res6 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable, LATERAL TABLE(SplitFunction(field))");Table res7 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");// 在 SQL ⾥重命名 UDF 字段Table res8 = tEnv.sqlQuery("SELECT field, newWord, newLength " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");tEnv.toDataStream(res4).print("res4=>");tEnv.toDataStream(res5).print("res5=>");tEnv.toDataStream(res6).print("res6=>");tEnv.toDataStream(res7).print("res7=>");tEnv.toDataStream(res8).print("res8=>");env.execute();}@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(",")) {// 输出结果collect(Row.of(s, s.length()));}}}
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/139753.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP发送窗口、接收窗口以及其工作原理

1*KvfIrP_Iwq40uVdRZYGnQg.png 上面的图表是从发送方的角度拍摄的快照。我们可以将数据分为4组&#xff1a; 1.已发送并已确认的字节&#xff08;蓝色&#xff09;2.已发送但尚未确认的字节&#xff08;黄色&#xff09;3.未发送但接收方准备好接收的字节&#xff08;绿色&…

2.如何实现API统一响应-web组件篇

文章目录 1. 统一响应1.1 CommonResult 1. 统一响应 前端调用api接口获得统一的响应&#xff1a; 成功&#xff0c;返回成功的状态码和数据&#xff1b;失败&#xff0c;返回失败的状态码和错误提示。 在标准的 RESTful API 的定义&#xff0c;是推荐使用 HTTP 响应状态码 (…

QT QSplashScreen

多数大型应用程序启动时都会在程序完全启动前显示一个启动画面&#xff0c;在程序完全启动后消失。程序启动画面可以显示相关产品的一些信息&#xff0c;使用户在等待程序启动的同时了解相关产品的功能&#xff0c;这也是一个宣传的方式。Qt中提供的QSplashScreen 类实现了在程…

KCC@广州与 TiDB 社区联手—广州开源盛宴

10月21日&#xff0c;KCC广州与 TiDB 社区联手&#xff0c;在海珠区保利中悦广场 29 楼召开了一次难忘的开源盛宴。这不仅仅是 KCC广州的又一次线下见面&#xff0c;更代表着与 TiDB 社区及广州技术社区的首次深度合作。 活动的策划与组织由 KCC广州负责人 - 惠世冀、PingCAP 的…

归并排序 图解 递归 + 非递归 + 笔记

前置知识&#xff1a;讲解019-算法笔试中处理输入和输出&#xff0c;讲解020-递归和master公式 (1)左部分排好序&#xff0c;右部分排好序&#xff0c;利用merge过程让左右整体有序(2)merge过程:谁小拷贝谁&#xff0c;直到左右两部分所有的数字耗尽(3)递归实现和非递归实现(4…

浙大恩特客户资源管理系统 fileupload.jsp 任意文件上传

一、漏洞描述 杭州恩软信息技术有限公司&#xff08;浙大恩特&#xff09;提供外贸管理软件、外贸客户管理软件等外贸软件&#xff0c;是一家专注于外贸客户资源管理及订单管理产品及服务的综合性公司。 浙大恩特客户资源管理系统中的fileupload.jsp接口存在安全漏洞&#xf…

DDD贫血模型、充血模型

贫血模型 贫血模型是一种软件开发中的设计模式&#xff0c;它指的是将数据和业务逻辑分离的一种设计模式&#xff0c;其中数据和领域模型是独立于业务逻辑的。在贫血模型中&#xff0c;数据由数据对象存储&#xff0c;而业务逻辑由服务对象处理。这种设计模式的优点是使代码更…

Android修行手册 - 万字梳理JNI开发正确技巧和错误缺陷

JNI 简介 JNI&#xff0c;Java Native Interface&#xff0c;是 native code 的编程接口。JNI 使 Java 代码程序可以与 native code 交互——在 Java 程序中调用 native code&#xff1b;在 native code 中嵌入 Java 虚拟机调用 Java 的代码。 它支持将 Java 代码与使用其他…

二十三种设计模式全面解析-解密迭代器模式:探索遍历之道

在软件开发中&#xff0c;遍历数据集合是一个非常常见的需求。但是&#xff0c;如何以一种优雅、灵活的方式遍历集合&#xff0c;并且能够适应各种不同的数据结构和迭代方式&#xff0c;一直是开发者们面临的挑战。今天&#xff0c;我将带你深入探索迭代器模式&#xff08;Iter…

vue3 开启 https

1、安装mkcert证书创建器 npm i mkcert -g 2、检验是否安装成功 mkcert --version 有版本好出现则成功 3、创建证书颁发机构 mkcert create-ca 会在当前目录生成&#xff0c;ca.crt 和 ca.key 两个文件 4、创建证书 mkcert create-cert 会在当前目录生成&#xff0c;…

如何使用 NFTScan NFT API 在 zkSync 网络上开发 Web3 应用

zkSync 是由 Matter Labs 创建的&#xff0c;是一个以用户为中心的 zk rollup 平台&#xff0c;它是以太坊的第 2 层扩展解决方案&#xff0c;使用 zk-rollups 作为扩展技术&#xff0c;与 optimistic rollups 一样&#xff0c;zk-rollups 将会汇总以太坊主网上的交易并将交易证…

力扣117双周赛

第 117 场双周赛 给小朋友们分糖果 I 同T2 给小朋友们分糖果 II 数学 class Solution { public:long long distributeCandies(int n, int limit) {long long ans 0;for (int i 0; i < min(n, limit); i) {if (n - i < limit) {ans n - i 1;} else if (n - i <…

刷题学习记录BUUCTF

[极客大挑战 2019]RCE ME1 进入环境直接就有代码 <?php error_reporting(0); if(isset($_GET[code])){$code$_GET[code];if(strlen($code)>40){die("This is too Long.");}if(preg_match("/[A-Za-z0-9]/",$code)){die("NO.");}eval($co…

JSP运行环境搭建

将安装JSP引擎的计算机称作一个支持JSP的Web服务器。这个服务器负责运行JSP&#xff0c;并将运行结果返回给用户。 JSP的核心内容之一就是编写JSP页面,JSP页面是Web应用程序的重要组成部分之一。一个简单Web应用程序可能只有一个JSP页面,而一个复杂的Web应用程序可能由许多JSP…

Ubuntu18.04.6安装qt5.7.1(超级详细教程)

目录 1、下载对应Linux版本的qt 2、安装完qt&#xff0c;可能也要安装下对应的编译工具 1、下载对应Linux版本的qt &#xff08;1&#xff09;准备安装的是qt5.7.1&#xff1a;qt-opensource-linux-x64-5.7.1.run &#xff08;2&#xff09;在虚拟机进入存放qt安装包的目录…

100天精通风控建模(原理+Python实现)——第5天:风控建模中数据标准化是什么?

风控模型已在各大银行和公司都实际运用于业务,用于营销和风险控制等。    之前已经阐述了100天精通风控建模(原理+Python实现)——第1天:什么是风控建模?    100天精通风控建模(原理+Python实现)——第2天:风控建模有什么目的?    100天精通风控建模(原理+Python实现…

C语言--求一个 3 X 3 的整型矩阵对角线元素之和

一.题目描述 求一个 3 X 3 的整型矩阵对角线元素之和 二.代码实现 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> int main() {int arr[3][3] { 0 };for (int i 0;i < 3;i){for (int j 0;j < 3;j){ printf("请输入数字&#xff1a;");scanf(&…

Redis之与SSM集成Spring注解式缓存

目录 一.整合 1.1.整合应用 1.1.1.pom配置 1.1.2.所需配置 二.注解式开发及应用场景 2.1. Cacheable 2.2. CachePut 2.3. CacheEvict 2.4.总结 三.redis的击穿穿透雪崩 好啦今天就到这里了哦&#xff01;&#xff01;希望能帮到你哦&#xff01;&#xff01; 一.整合…

科技云报道:数智化升级,如何跨越数字世界与实体产业的鸿沟?

科技云报道原创。 数智化是当下商业环境下最大的确定性。 2022年&#xff0c;中国数字经济规模达50.2万亿元&#xff0c;占国内生产总值比重提升至41.5%&#xff0c;数字经济成为推动经济发展的重要引擎。从小型创业公司到跨国巨头&#xff0c;数字化转型在企业发展历程中彰显…

分治构造:P9384

https://www.luogu.com.cn/problem/P9384 分治构造是很常见的一种构造 不能有三元环和五元环&#xff0c;考虑推广出去&#xff0c;也就是不能有奇环 那如果我们让每种颜色都为二分图&#xff0c;那么必然满足 考虑 0-9 总共10个数字&#xff0c;数据范围1000&#xff0c;考…