0基础学习PyFlink——用户自定义函数之UDF

大纲

  • 标量函数
    • 入参并非表中一行(Row)
    • 入参是表中一行(Row)
    • alias

PyFlink中关于用户定义方法有:

  • UDF:用户自定义函数。
  • UDTF:用户自定义表值函数。
  • UDAF:用户自定义聚合函数。
  • UDTAF:用户自定义表值聚合函数。

这些字母可以拆解如下:

  • UD表示User Defined(用户自定义);
  • F表示Function(方法);
  • T表示Table(表);
  • A表示Aggregate(聚合);
    在这里插入图片描述
    Aggregate(聚合)函数是指:以多行数据为输入,计算出一个新的值的函数。这块我们会在后续的章节介绍,本文我们主要介绍非聚合类型的用户自定义方法的简单使用。

标量函数

即我们常见的UDF。

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

我们主要关注result_type和input_types,它们分别用于确定函数的输入和输出。
input_types可以是List[DataType], DataType, str, List[str]之一任何一种,这个要视使用者决定。UDTF也是这种类型,它们没啥区别。
result_type只能是DataType或str;而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。
我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符,然后统计字符出现的次数。
在介绍例子之前,我们先构造Execute之前的准备环境

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)

这段代码从读取数据word_count_data,并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时,UDF怎么写

入参并非表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())

input_types我们设置成[DataTypes.STRING()],即该数组中只有一个参数,也表示修饰的方法只有一个参数,类型是String。如果觉得input_types写起来麻烦,这个参数可以不设置。
result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述,即一行只有一个字段——lower_word,它的类型也是String。

    tab_lower=tab_source.map(colFunc(col('word')))

map方法中,我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段,只有UDF中result_type定义的lower_word。

def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

后续只要使用这个新表,新字段即可。

    tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())tab_lower=tab_source.map(colFunc(col('word')))   tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

入参是表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=row_type_tab_source)def rowFunc(row):return Row(row[0].lower())tab_lower=tab_source.map(rowFunc) tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

主要的区别是map方法直接传递udf修饰的方法,而不是直接其调用返回值。input_types是原始表的行结构——RowType,而不是一个参数数组。
map方法给rowFunc传递原始表tab_source的每行数据,然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了,它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段,而不是老表中的字段。

alias

前面两个案例,在定义UDF时,我们严格设置了result_type和input_types。实际input_types可以不用设置,但是result_type必须设置。上面例子中,result_type我们都设置为RowType,即表行的结构。如果觉得这样写很麻烦,可以考虑使用alias来实现。

    @udf(result_type=DataTypes.STRING())def colFunc(oneCol):return oneCol.lower()tab_lower=tab_source.map(colFunc(col('word'))).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
    @udf(result_type=DataTypes.STRING())def rowFunc(row):return row[0].lower()tab_lower=tab_source.map(rowFunc).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这样我们在定义udf时,只是指定了返回类型是个字符串,也不知道它在新表中叫啥名字(实际叫f0)。但是为了便于后续使用,我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/123952.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue2+ant-design-vue a-select组件二次封装(支持单选/多选添加全选/分页(多选跨页选中)/自定义label)

一、效果图 二、参数配置 1、代码示例 <t-antd-selectv-model"selectVlaue":optionSource"stepList"change"selectChange" />2、配置参数&#xff08;Attributes&#xff09;继承 a-select Attributes 参数说明类型默认值v-model绑定值…

vivado crash

将增量编译去了

FPGA时序分析与约束(9)——主时钟约束

一、时序约束 时序引擎能够正确分析4种时序路径的前提是&#xff0c;用户已经进行了正确的时序约束。时序约束本质上就是告知时序引擎一些进行时序分析所必要的信息&#xff0c;这些信息只能由用户主动告知&#xff0c;时序引擎对有些信息可以自动推断&#xff0c;但是推断得到…

Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

1.引入RocketMQ依赖&#xff1a;首先&#xff0c;在pom.xml文件中添加RocketMQ的依赖&#xff1a; <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</versi…

文件改名,轻松添加前缀顺序编号,文件改名更高效!

您是否曾经需要批量修改文件名&#xff0c;并希望在文件名中添加特定的前缀或顺序编号&#xff1f;现在&#xff0c;我们为您带来了一款全新的文件改名工具&#xff0c;帮助您轻松解决这个问题&#xff01; 第一步&#xff0c;进入文件批量改名高手主页面&#xff0c;在板块栏…

C++学习笔记之四(标准库、标准模板库、vector类)

C 1、C标准库2、C标准模板库2.1、vector2.1.1、vector与array2.1.2、vector与函数对象2.1.3、vector与迭代器2.1.4、vector与算法 1、C标准库 C C C标准库指的是标准程序库( S t a n d a r d Standard Standard L i b a r a y Libaray Libaray)&#xff0c;它定义了十个大类…

亚马逊,速卖通,美客多如何打造爆款商品,排名提升榜首

1、产品Listing的完整性 Listing是亚马逊A9算法认识你产品的基础&#xff0c;在发布一条listing的时候&#xff0c;尽可能地做到最好!在准备一条listing之前&#xff0c;一定事先要收集、整理足够多的产品关键词&#xff0c;在优化listing内容的时候填充进去。仔细观察优秀竞品…

Realrek 2.5G交换机 8+1万兆光RTL8373-VB-CG方案简介

新一代2.5G交换机方案RTL8373-VB-CG可以提供4中不同形态 a. 52.5G 电口110G光》RTL8373 b. 52.5G 电口110G电》RTL83738261 c. 82.5G 电口110G光》RTL83738224 d.82.5G 电口110G电口》RTL837382248261 1.概述 Realtek RTL8373-CG是一款低功耗、高性能、高度集成的八端口2.5G和一…

C++设计模式_19_Memento 备忘录(理解,目前多使用序列化方案来实现)

Memento 备忘录模式也属于“状态变化”模式&#xff0c;它是一个小模式&#xff0c;在今天来看有些过时&#xff0c;当今已经很少使用当前模式实现需求&#xff0c;思想却不变&#xff08;信息隐藏&#xff09;&#xff0c;目前多使用序列化方案来实现。本系列所介绍的模式&…

小程序开发——小程序项目的配置与生命周期

1.app.json配置属性 app.json配置属性 2.页面配置 app的页面配置指的是pages属性&#xff0c; pages数组的第一个页面将默认作为小程序的启动页。利用开发工具新建页面时&#xff0c;则pages属性对应的数组将自动添加该页面的路径&#xff0c;若是在硬盘中添加文件的形式则不…

C++数据结构X篇_23_快速排序(最快、不稳定的排序)

文章参考十大经典排序算法-快速排序算法详解进行整理补充。快速排序是最快的排序方法。 排序思路&#xff1a;分治法-挖坑填数&#xff1a;大问题分解为各个小问题&#xff0c;对小问题求解&#xff0c;使得大问题得以解决 文章目录 1. 什么是快速排序1.1 概念1.2 算法原理1.3 …

Linux rm命令:删除文件或目录

当 Linux 系统使用很长时间之后&#xff0c;可能会有一些已经没用的文件&#xff08;即垃圾&#xff09;&#xff0c;这些文件不但会消耗宝贵的硬盘资源&#xff0c;还是降低系统的运行效率&#xff0c;因此需要及时地清理。 rm 是强大的删除命令&#xff0c;它可以永久性地删除…

一百九十七、Java——IDEA项目中把多层文件夹拆开显示

一、目的 由于IDEA项目中&#xff0c;默认的是把文件夹连在一起显示&#xff0c;于是为了方便需要把这些连在一起的文件夹拆开&#xff0c;分层显示 如文件夹cn.kgc 二、解决措施 解决方法很简单 &#xff08;一&#xff09;找到IDEA项目上的小齿轮 &#xff08;二&#xf…

基于深度学习的单图像人群计数研究:网络设计、损失函数和监控信号

摘要 https://arxiv.org/pdf/2012.15685v2.pdf 单图像人群计数是一个具有挑战性的计算机视觉问题,在公共安全、城市规划、交通管理等领域有着广泛的应用。近年来,随着深度学习技术的发展,人群计数引起了广泛的关注并取得了巨大的成功。通过系统地回顾和总结2015年以来基于深…

基于STM32的多功能智能密码锁控制设计

**单片机设计介绍&#xff0c;1653基于STM32的多功能智能密码锁控制设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序文档 六、 文章目录 一 概要 基于STM32的多功能智能密码锁控制设计是一种用STM32微控制器开发的系统&#xff0c;用于控制和管理密码…

手机桌面待办事项APP推荐,手机上可使用哪些待办事项APP

生活中&#xff0c;无论你是一名专业人士&#xff0c;学生&#xff0c;还是家庭主妇&#xff0c;总有各种各样的任务等待着你&#xff0c;有时候需要额外的工具来提醒和管理这些待办事项。手机上的待办事项APP软件成为了这个任务的好帮手&#xff0c;为我们提供了快速、方便的方…

spring boot配置ssl(多cer格式)保姆级教程

1. 准备cer格式的证书&#xff1b; 2. 合并cer证书并转化成jks格式的证书 为啥有这一步&#xff0c;因为cer证书配置在spring boot项目中&#xff0c;项目启动不起来。如果有大佬想指导一下可以给我留言&#xff0c;在此先谢过大佬。 1&#xff09;先创建一个jks格式的证…

Kotlin(八) 数据类、单例

目录 一&#xff1a;创建数据类 二&#xff1a;单例类 一&#xff1a;创建数据类 和Java的不同&#xff0c;kotlin的数据类比较简单&#xff0c;New→Kotlin File/Class&#xff0c;在弹出的对话框中输入“Book”&#xff0c;创建类型选择“Data”。如图&#xff1a; 然后编…

Keil Map信息解析

基本功能&#xff1a; 1.在Keil里面&#xff0c;通过App.Map复制所有信息。然后解析剪辑版内容。 2.随意输入一个函数内存地址&#xff0c;即可遍历出该内存地址属于哪个.c或者函数名。或者能遍历出变量。 强化功能&#xff1a; 1.通过Keil5 命令 Save xxxxxxx\1.Hex 0x200173…

为什么我觉得Rust比C++复杂得多?

为什么我觉得Rust比C复杂得多&#xff1f; Rust自学确实有一定门槛&#xff0c;很多具体问题解决起来搜索引擎也不太帮的上忙&#xff0c;会出现卡住的情况&#xff0c;卡的时间长了就放弃了。最近很多小伙伴找我&#xff0c;说想要一些c语言资料&#xff0c;然后我根据自己从…