Flink SQL自定义表值函数(Table Function)

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** 输入数据:* nc -lk 8888* a,bb,cc* * 输出结果:* * res1=>:5> +I[a,bb,cc, a, 1]* res1=>:7> +I[a,bb,cc, cc, 2]* res1=>:6> +I[a,bb,cc, bb, 2]* res8=>:4> +I[a,bb,cc, a, 1]* res8=>:5> +I[a,bb,cc, bb, 2]* res8=>:6> +I[a,bb,cc, cc, 2]* res4=>:3> +I[a,bb,cc, cc, 2]* res4=>:1> +I[a,bb,cc, a, 1]* res4=>:2> +I[a,bb,cc, bb, 2]* res7=>:8> +I[a,bb,cc, bb, 2]* res7=>:1> +I[a,bb,cc, cc, 2]* res7=>:7> +I[a,bb,cc, a, 1]* res2=>:2> +I[a,bb,cc, cc, 2]* res2=>:8> +I[a,bb,cc, a, 1]* res2=>:1> +I[a,bb,cc, bb, 2]* res6=>:1> +I[a,bb,cc, cc, 2]* res6=>:7> +I[a,bb,cc, a, 1]* res6=>:8> +I[a,bb,cc, bb, 2]* res3=>:6> +I[a,bb,cc, bb, 2]* res3=>:7> +I[a,bb,cc, cc, 2]* res3=>:5> +I[a,bb,cc, a, 1]* res5=>:7> +I[a,bb,cc, bb, 2]* res5=>:8> +I[a,bb,cc, cc, 2]* res5=>:6> +I[a,bb,cc, a, 1]*/
public class TableFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);Table table = tEnv.fromDataStream(source, "field");tEnv.createTemporaryView("SourceTable", table);// 在 Table API ⾥可以直接调⽤ UDFTable res1 = tEnv.from("SourceTable").joinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));Table res2 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));// 在 Table API ⾥重命名 UDF 的结果字段Table res3 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).as("myField", "newWord", "newLength").select($("myField"), $("newWord"), $("newLength"));// 注册函数tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);// 在 Table API ⾥调⽤注册好的 UDFTable res4 = tEnv.from("SourceTable").joinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));Table res5 = tEnv.from("SourceTable").leftOuterJoinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));// 在 SQL ⾥调⽤注册好的 UDFTable res6 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable, LATERAL TABLE(SplitFunction(field))");Table res7 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");// 在 SQL ⾥重命名 UDF 字段Table res8 = tEnv.sqlQuery("SELECT field, newWord, newLength " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");tEnv.toDataStream(res4).print("res4=>");tEnv.toDataStream(res5).print("res5=>");tEnv.toDataStream(res6).print("res6=>");tEnv.toDataStream(res7).print("res7=>");tEnv.toDataStream(res8).print("res8=>");env.execute();}@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(",")) {// 输出结果collect(Row.of(s, s.length()));}}}
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/139753.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP发送窗口、接收窗口以及其工作原理

1*KvfIrP_Iwq40uVdRZYGnQg.png 上面的图表是从发送方的角度拍摄的快照。我们可以将数据分为4组&#xff1a; 1.已发送并已确认的字节&#xff08;蓝色&#xff09;2.已发送但尚未确认的字节&#xff08;黄色&#xff09;3.未发送但接收方准备好接收的字节&#xff08;绿色&…

2.如何实现API统一响应-web组件篇

文章目录 1. 统一响应1.1 CommonResult 1. 统一响应 前端调用api接口获得统一的响应&#xff1a; 成功&#xff0c;返回成功的状态码和数据&#xff1b;失败&#xff0c;返回失败的状态码和错误提示。 在标准的 RESTful API 的定义&#xff0c;是推荐使用 HTTP 响应状态码 (…

KCC@广州与 TiDB 社区联手—广州开源盛宴

10月21日&#xff0c;KCC广州与 TiDB 社区联手&#xff0c;在海珠区保利中悦广场 29 楼召开了一次难忘的开源盛宴。这不仅仅是 KCC广州的又一次线下见面&#xff0c;更代表着与 TiDB 社区及广州技术社区的首次深度合作。 活动的策划与组织由 KCC广州负责人 - 惠世冀、PingCAP 的…

归并排序 图解 递归 + 非递归 + 笔记

前置知识&#xff1a;讲解019-算法笔试中处理输入和输出&#xff0c;讲解020-递归和master公式 (1)左部分排好序&#xff0c;右部分排好序&#xff0c;利用merge过程让左右整体有序(2)merge过程:谁小拷贝谁&#xff0c;直到左右两部分所有的数字耗尽(3)递归实现和非递归实现(4…

浙大恩特客户资源管理系统 fileupload.jsp 任意文件上传

一、漏洞描述 杭州恩软信息技术有限公司&#xff08;浙大恩特&#xff09;提供外贸管理软件、外贸客户管理软件等外贸软件&#xff0c;是一家专注于外贸客户资源管理及订单管理产品及服务的综合性公司。 浙大恩特客户资源管理系统中的fileupload.jsp接口存在安全漏洞&#xf…

vue3 开启 https

1、安装mkcert证书创建器 npm i mkcert -g 2、检验是否安装成功 mkcert --version 有版本好出现则成功 3、创建证书颁发机构 mkcert create-ca 会在当前目录生成&#xff0c;ca.crt 和 ca.key 两个文件 4、创建证书 mkcert create-cert 会在当前目录生成&#xff0c;…

如何使用 NFTScan NFT API 在 zkSync 网络上开发 Web3 应用

zkSync 是由 Matter Labs 创建的&#xff0c;是一个以用户为中心的 zk rollup 平台&#xff0c;它是以太坊的第 2 层扩展解决方案&#xff0c;使用 zk-rollups 作为扩展技术&#xff0c;与 optimistic rollups 一样&#xff0c;zk-rollups 将会汇总以太坊主网上的交易并将交易证…

刷题学习记录BUUCTF

[极客大挑战 2019]RCE ME1 进入环境直接就有代码 <?php error_reporting(0); if(isset($_GET[code])){$code$_GET[code];if(strlen($code)>40){die("This is too Long.");}if(preg_match("/[A-Za-z0-9]/",$code)){die("NO.");}eval($co…

JSP运行环境搭建

将安装JSP引擎的计算机称作一个支持JSP的Web服务器。这个服务器负责运行JSP&#xff0c;并将运行结果返回给用户。 JSP的核心内容之一就是编写JSP页面,JSP页面是Web应用程序的重要组成部分之一。一个简单Web应用程序可能只有一个JSP页面,而一个复杂的Web应用程序可能由许多JSP…

Ubuntu18.04.6安装qt5.7.1(超级详细教程)

目录 1、下载对应Linux版本的qt 2、安装完qt&#xff0c;可能也要安装下对应的编译工具 1、下载对应Linux版本的qt &#xff08;1&#xff09;准备安装的是qt5.7.1&#xff1a;qt-opensource-linux-x64-5.7.1.run &#xff08;2&#xff09;在虚拟机进入存放qt安装包的目录…

C语言--求一个 3 X 3 的整型矩阵对角线元素之和

一.题目描述 求一个 3 X 3 的整型矩阵对角线元素之和 二.代码实现 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> int main() {int arr[3][3] { 0 };for (int i 0;i < 3;i){for (int j 0;j < 3;j){ printf("请输入数字&#xff1a;");scanf(&…

科技云报道:数智化升级,如何跨越数字世界与实体产业的鸿沟?

科技云报道原创。 数智化是当下商业环境下最大的确定性。 2022年&#xff0c;中国数字经济规模达50.2万亿元&#xff0c;占国内生产总值比重提升至41.5%&#xff0c;数字经济成为推动经济发展的重要引擎。从小型创业公司到跨国巨头&#xff0c;数字化转型在企业发展历程中彰显…

网康NS-ASG安全网关任意文件读取

此文件没有对身份进行校验即可下载任意文件 构造payload访问漏洞url&#xff1a; ​​/admin/cert_download.php?filegjxbstxdt.txt&certfile../../../../../../../../etc/passwd漏洞证明&#xff1a; 文笔生疏&#xff0c;措辞浅薄&#xff0c;望各位大佬不吝赐教&…

HDMI之编码篇

概述 HDMI 2.0b(含)以下版本,采用3个Channel方式输出。传输又分为3三种周期,视频数据,数据岛以及控制周期。视频传输采用8/10编码。数据岛采用4/10编码(TERC4)。控制周期采用2/10。编码都拓展成了10bits。 上图中,Pixel component(e.g.B)->D[7:0]表示视频数据周期…

【KingbaseES】R6 Liunx下使用命令行部署数据库集群

【KingbaseES】R6命令行部署数据库集群 A.数据库安装包下载软件下载页面授权下载页面 B.数据库集群部署软件安装第一步&#xff1a;创建Kingbase用户第二步&#xff1a;上传安装包1.创建Kingbase用户和准备安装目录2.使用FTP工具上传安装包镜像和授权文件到install目录下并授权…

【VS2019 Qt5 VTK9.2】临时解决配置相关问题的简单方法

配置报错 编译报错提示&#xff08;LNK2019或LNK2001&#xff09; 严重性 代码 说明 项目 文件 行 禁止显示状态 错误 LNK2019 无法解析的外部符号 “__declspec(dllimport) public: __cdecl QVTKOpenGLNativeWidget::QVTKOpenGLNativeWidget(class QWidget *,class QFlags)(_i…

pointnetgpd复现

参考&#xff1a; Installation Instructions — Dex-Net 0.2.0 documentation Install git clone https://github.com/lianghongzhuo/PointNetGPD.git 添加环境变量 gedit ~/.bashrc #添加下面这一行 export PointNetGPD_FOLDER$HOME/code/PointNetGPD #然后source source…

transfomer模型——简介,代码实现,重要模块解读,源码,官方

一、什么是transfomer Transformer是一种基于注意力机制&#xff08;attention mechanism&#xff09;的神经网络架构&#xff0c;最初由Vaswani等人在论文《Attention Is All You Need》中提出。它在自然语言处理&#xff08;NLP&#xff09;领域取得了巨大成功&#xff0c;特…

SQL Server 2022 安装步骤——SQL Server设置身份验证教程

目录 前言: 安装详细步骤: 第一步: 第二步: 第三步: 第四步: SQL Server 连接的方式: Window验证: SQL Server验证: 两者之间区别: 总结: SQL Server身份验证登录配置教程:​ 第一步: 第二步: 第三步: 番外篇: 前言: 本文讲解&#xff0c;如何安装SQL Server安…

如何判断一个角是否大于180度(2)

理论计算见上一篇&#xff1a; 如何判断一个角是否大于180度&#xff1f;_kv1830的博客-CSDN博客 此篇为代码实现 一。直接上代码&#xff1a; import cv2 as cv import numpy as np import mathdef get_vector(p_from, p_to):return p_to[0] - p_from[0], p_to[1] - p_from…