19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4

26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
    • 7、sql clinet中应用自定义函数
      • 1)、实现自定义函数
      • 2)、打包并上传jar至flink的lib目录下
      • 3)、验证
        • 1、创建表
        • 2、初始化表数据
        • 3、注册函数
        • 4、验证自定义函数
    • 8、pojo 数据类型应用示例-表值函数


本文展示了自定义函数在Flink sql client的应用以及自定义函数中使用pojo的示例。
本文依赖flink、kafka集群能正常使用。
本文分为2个部分,即自定义函数在Flink sql client中的应用以及自定义函数中使用pojo数据类型。
本文的示例如无特殊说明则是在Flink 1.17版本中运行。

7、sql clinet中应用自定义函数

本示例将上文中自定义的函数打包后在flink sql client中进行应用。

1)、实现自定义函数

本文的所有示例需要依赖的maven见本篇的上一篇:17、Flink 之Table API: Table API 支持的操作(1)
或者引入

    <!-- flink依赖引入--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
  • 示例代码
package org.table_sql;import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;/*** @author alanchan**/@FunctionHint(output = @DataTypeHint("ROW<id int, name String, age int, balance int, rowtime string>"))
public class Alan_SplitFunction extends TableFunction<Row> {public void eval(String str) {String[] row = str.split(",");collect(Row.of(Integer.valueOf(row[0]), row[1], Integer.valueOf(row[2]), Integer.valueOf(row[3]), row[4]));}}

2)、打包并上传jar至flink的lib目录下

将该文件打包成jar文件,特别说明的是,注意flink运行环境与打包引入的jar文件是否冲突,推荐做法是只打包创建自定义函数所依赖的jar文件,其他jar使用flink部署环境的jar。
本示例打包后的文件名:Alan_SplitFunction.jar
上传jar文件后,并重启flink集群。

3)、验证

1、创建表
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.Flink SQL> CREATE TABLE alan_split_table (
>   userString STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_split',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> select * from alan_split_table;
[INFO] Result retrieval cancelled.
2、初始化表数据

本示例是通过kafka队列插入的数据,前提是kafka环境好用。

[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_split
>"11,alan,18,20,1699341167461"
>"12,alan,19,25,1699341168464"
>"13,alan,20,30,1699341169472"
>"14,alanchan,18,22,1699341170479"
>"15,alanchan,19,25,1699341171482"Flink SQL> select * from alan_split_table;
+----+--------------------------------+
| op |                     userString |
+----+--------------------------------+
| +I |    11,alan,18,20,1699341167461 |
| +I |    12,alan,19,25,1699341168464 |
| +I |    13,alan,20,30,1699341169472 |
| +I | 14,alanchan,18,22,169934117... |
| +I | 15,alanchan,19,25,169934117... |
3、注册函数

将自定义的函数注册为flink的临时函数,临时函数只在当前的会话中起作用,如果注册成其他函数,参考如下语法

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION[IF NOT EXISTS] [[catalog_name.]db_name.]function_nameAS identifier [LANGUAGE JAVA|SCALA|PYTHON][USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]# TEMPORARY
# 创建一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。# TEMPORARY SYSTEM
# 创建一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。

本示例注册为临时函数,如下

Flink SQL> CREATE TEMPORARY FUNCTION alan_split AS 'org.table_sql.Alan_SplitFunction';
[INFO] Execute statement succeed.Flink SQL> show functions;
+-----------------------+
|         function name |
+-----------------------+
|                IFNULL |
|      SOURCE_WATERMARK |
|                TYPEOF |
|                   abs |
|                  acos |
|            alan_split |
|                   and |
|                 array |
。。。。。。
4、验证自定义函数
Flink SQL> SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime 
> FROM alan_split_table 
> LEFT JOIN LATERAL TABLE(alan_split(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE;
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| op |                     userString |        t_id |                         t_name |       t_age |   t_balance |                      t_rowtime |
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| +I |    11,alan,18,20,1699341167461 |          11 |                           alan |          18 |          20 |                  1699341167461 |
| +I |    12,alan,19,25,1699341168464 |          12 |                           alan |          19 |          25 |                  1699341168464 |
| +I |    13,alan,20,30,1699341169472 |          13 |                           alan |          20 |          30 |                  1699341169472 |
| +I | 14,alanchan,18,22,169934117... |          14 |                       alanchan |          18 |          22 |                  1699341170479 |
| +I | 15,alanchan,19,25,169934117... |          15 |                       alanchan |          19 |          25 |                  1699341171482 |

至此,完成了自定义函数注册至flink sql client的验证。

8、pojo 数据类型应用示例-表值函数

功能参考 19、Flink 的Table API 和 SQL 中的自定义函数及示例(2) 中的【4、表值函数-自定义函数说明及示例】
本示例仅仅是展示在自定义函数中使用pojo 对象。

本示例仅仅是一种实现方式,也可以覆盖getTypeInference并以编程方式提供所有组件,不再赘述。

本示例仅仅是以表值函数作为示例,其他的自定义函数类似。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTableFunctionDemo2 {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private int id;private String name;private int age;private int balance;private String rowtime;}//	@FunctionHint(output = @DataTypeHint("User<id int, name String, age int, balance int, rowtime string>"))
//	public static class OverloadedFunction extends TableFunction<Row> {@FunctionHint(output =@DataTypeHint(bridgedTo = User.class))public static class OverloadedFunction extends TableFunction<User> {public void eval(String str) {String[] user = str.split(",");// 使用 Row数据类型
//			collect(Row.of(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));// 使用User pojo数据类型collect(new User(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));}}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<String> row = env.fromCollection(//id name age balance rowtimeArrays.asList("11,alan,18,20,1699341167461","12,alan,19,25,1699341168464","13,alan,20,30,1699341169472","14,alanchan,18,22,1699341170479","15,alanchan,19,25,1699341171482"));Table usersTable2 = tenv.fromDataStream(row, $("userString"));tenv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);Table result5 = usersTable2.leftOuterJoinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")
//				.select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime"))	;	DataStream<Tuple2<Boolean, Row>> result5DS = tenv.toRetractStream(result5, Row.class);result5DS.print();
//		15> (true,+I[15, alanchan, 19, 25, 1699341171482])
//		12> (true,+I[12, alan, 19, 25, 1699341168464])
//		13> (true,+I[13, alan, 20, 30, 1699341169472])
//		11> (true,+I[11, alan, 18, 20, 1699341167461])
//		14> (true,+I[14, alanchan, 18, 22, 1699341170479])env.execute();}}

以上,展示了自定义函数在Flink sql client的应用以及自定义函数中使用pojo的示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/138210.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习 python opencv 火焰检测识别 计算机竞赛

文章目录 0 前言1 基于YOLO的火焰检测与识别2 课题背景3 卷积神经网络3.1 卷积层3.2 池化层3.3 激活函数&#xff1a;3.4 全连接层3.5 使用tensorflow中keras模块实现卷积神经网络 4 YOLOV54.1 网络架构图4.2 输入端4.3 基准网络4.4 Neck网络4.5 Head输出层 5 数据集准备5.1 数…

Python调用pymysql组件MySQL存储过程无法得到返回参数

Python调用MySQL存储过程的返回参数 1. 问题描述 MySQL的存储过程如下&#xff0c;入口参数两个&#xff0c;出口参数1个&#xff1a; DELIMITER $$ create procedure pro_test_args(in a int ,in b int ,out t_num int) beginset t_num a * b;END $$通过Python调用mysql的…

python OrderedDict类(有序字典)

嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 创建有序字典 import collectionsdic collections.OrderedDict() dic[k1] v1 dic[k2] v2 dic[k3] v3 print(dic)#输出&#xff1a;OrderedDict([(k1, v1), (…

C#中.NET 7.0不再支持ADO.NET,.NET Framwork依旧支持

目录 一、.NET 7.0框架下任何应用不再支持ADO.NET 二、.NET Framwork框架下Windows窗体应用支持ADO.NET 三、.NET 7.0不支持ADO.NET的真正原因 经过一阵折腾&#xff0c;终于可以确证C#中.NET框架不再支持用户通过ADO.NET的实体框架模型访问数据库&#xff0c;无论是.NET 7…

Rc与Arc实现1vN所有权机制

Rc与Arc实现1vN所有权机制 观察引用计数的变化一个例子多线程无力的Rc< T >Arc Rust所有权机制要求一个值只能有一个所有者&#xff0c;在大多数情况下&#xff0c;都没有问题&#xff0c;但是考虑以下情况&#xff1a; 在图数据结构中&#xff0c;多个边可能会拥有同一个…

C# .NET Core API Controller以及辅助专案

准备工作 Windows 10Visual Studio 2019(2017就有可以集中发布到publish目录的功能了吧)C#将方法封装(据说可以提高效率,就像是我们用的dll那种感觉新增专案作为我们API的辅助专案(作用类似dll&#xff0c;此处&#xff0c;你也可以在你自己的API专案里建文件夹&#xff0c;但…

MySQL---存储过程

存储过程的相关概念 是一组为了完成特定功能的sql语句的集合&#xff0c;类似于函数 写好了一个存储过程之后&#xff0c;我们可以像函数一样随时调用sql的集合。 复杂的&#xff0c;需要很多sql语句联合执行完成的任务 存储过程再执行上比sql语句的执行速度更快&#xff0c…

js 求数组中的对象某个属性和

可以直接看下效果 代码&#xff1a; <script>let list [{num: 1,price: 10,},{num: 2,price: 10,},{num: 3,price: 10,},{num: 4,price: 10,},]// for循环 求总数和 num的和let num 0for (let i 0; i < list.length; i) {num list[i].num}console.log(第一种&am…

学习c#的第三天

目录 C# 数据类型 值类型&#xff08;Value types&#xff09; 引用类型&#xff08;Reference types&#xff09; 指针类型&#xff08;Pointer types&#xff09; C# 类型转换 隐式类型转换 显式类型转换 类型之间的转换 - Convert 和 Parse Convert.ToInt32() 与 i…

AirTag追踪汽车

美国华盛顿特区&#xff0c;11月4日&#xff0c;在一项全新的抗击车辆盗窃的措施中&#xff0c;市长穆里尔•鲍泽签署了一项新计划&#xff0c;将向该市车辆盗窃频率较高的社区居民免费提供苹果AirTag追踪器。 AirTag是苹果公司推出的一款蓝牙跟踪设备&#xff0c;它依靠Findm…

从单体到微服务:使用Spring Boot构建事件驱动的Java应用程序

Spring Boot是Pivotal团队设计的一种微服务框架&#xff0c; 基于Spring开发&#xff0c;用于简化新Spring应用的初始搭建及开发过程&#xff0c;提升Spring 开发者的体验。它秉持“约定大于配置”的思想&#xff0c;集成了大量开箱即用的第三方库&#xff0c;支持绝大多数开源…

asp.net学院网上报销系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net学院网上报销系统是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语言 开发 asp.net学院网上报销系统 应用技术…

基于飞浆resnet50的102分类

目录 1.数据预处理 2.数据导入 3.模型导入 4.批训练 5. 输出结果 6.结果参考 1.数据预处理 Ttransforms.Compose([transforms.Resize((250,250)),transforms.RandomCrop(size224),transforms.RandomHorizontalFlip(0.5),transforms.RandomRotation(degrees15),transforms.Color…

navicat导入.sql文件出现:[ERR] 1067 - Invalid default value for ‘create_date‘

比较老的系统生成的数据库导入5.7时报错[Err] 1067 - Invalid default value for create_time 错误分析 表中的第一个TIMESTAMP列&#xff08;如果未声明为NULL或显示DEFAULT或ON UPDATE子句&#xff09;将自动分配DEFAULT CURRENT_TIMESTAMP和ON UPDATE CURRENT_TIMESTAMP属…

ElasticSearch文档分析

ElasticSearch文档分析 包含下面的过程&#xff1a; 将一块文本分成适合于倒排索引的独立的 词条将这些词条统一化为标准格式以提高它们的“可搜索性”&#xff0c;或者 recall 分析器执行上面的工作。分析器实际上是将三个功能封装到了一个包里&#xff1a; 字符过滤器 首先&a…

【性能测试】非GUI模式Jemter压测+TPS性能拐点详细,一篇带你打通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 非GUI模式执行Jem…

RK3568驱动指南|第七篇-设备树-第64章 device_node转换成platform_device实验

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

SOLIDWORKS软件提供了哪些特征造型方法?硕迪科技

SOLIDWORKS作为一款三维设计软件&#xff0c;为用户提供了多种特征造型方法&#xff0c;以下是其中几种常用的&#xff1a; 实体建模特征&#xff1a;SOLIDWORKS使用实体建模技术来创建和编辑三维几何体。通过使用基本几何体&#xff08;如立方体、圆柱体、圆锥体等&#xff09…

Spring-Security前后端分离权限认证

前后端分离 一般来说&#xff0c;我们用SpringSecurity默认的话是前后端整在一起的&#xff0c;比如thymeleaf或者Freemarker&#xff0c;SpringSecurity还自带login登录页,还让你配置登出页,错误页。 但是现在前后端分离才是正道&#xff0c;前后端分离的话&#xff0c;那就…

@ConfigurationProperties使用

一直有个疑问,在使用ConfigurationProperties注解作用一个配置类时,如果该配置类继承了一个父类,那么父类的那些配置字段是否可以读取配置信息。 答案是可以的&#xff0c;前提是父类对应字段的set方法是public。 BaseProperties.java Getter Setter public class BasePropert…