0基础学习PyFlink——用户自定义函数之UDF

大纲

  • 标量函数
    • 入参并非表中一行(Row)
    • 入参是表中一行(Row)
    • alias

PyFlink中关于用户定义方法有:

  • UDF:用户自定义函数。
  • UDTF:用户自定义表值函数。
  • UDAF:用户自定义聚合函数。
  • UDTAF:用户自定义表值聚合函数。

这些字母可以拆解如下:

  • UD表示User Defined(用户自定义);
  • F表示Function(方法);
  • T表示Table(表);
  • A表示Aggregate(聚合);
    在这里插入图片描述
    Aggregate(聚合)函数是指:以多行数据为输入,计算出一个新的值的函数。这块我们会在后续的章节介绍,本文我们主要介绍非聚合类型的用户自定义方法的简单使用。

标量函数

即我们常见的UDF。

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

我们主要关注result_type和input_types,它们分别用于确定函数的输入和输出。
input_types可以是List[DataType], DataType, str, List[str]之一任何一种,这个要视使用者决定。UDTF也是这种类型,它们没啥区别。
result_type只能是DataType或str;而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。
我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符,然后统计字符出现的次数。
在介绍例子之前,我们先构造Execute之前的准备环境

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)

这段代码从读取数据word_count_data,并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时,UDF怎么写

入参并非表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())

input_types我们设置成[DataTypes.STRING()],即该数组中只有一个参数,也表示修饰的方法只有一个参数,类型是String。如果觉得input_types写起来麻烦,这个参数可以不设置。
result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述,即一行只有一个字段——lower_word,它的类型也是String。

    tab_lower=tab_source.map(colFunc(col('word')))

map方法中,我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段,只有UDF中result_type定义的lower_word。

def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

后续只要使用这个新表,新字段即可。

    tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())tab_lower=tab_source.map(colFunc(col('word')))   tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

入参是表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=row_type_tab_source)def rowFunc(row):return Row(row[0].lower())tab_lower=tab_source.map(rowFunc) tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

主要的区别是map方法直接传递udf修饰的方法,而不是直接其调用返回值。input_types是原始表的行结构——RowType,而不是一个参数数组。
map方法给rowFunc传递原始表tab_source的每行数据,然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了,它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段,而不是老表中的字段。

alias

前面两个案例,在定义UDF时,我们严格设置了result_type和input_types。实际input_types可以不用设置,但是result_type必须设置。上面例子中,result_type我们都设置为RowType,即表行的结构。如果觉得这样写很麻烦,可以考虑使用alias来实现。

    @udf(result_type=DataTypes.STRING())def colFunc(oneCol):return oneCol.lower()tab_lower=tab_source.map(colFunc(col('word'))).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
    @udf(result_type=DataTypes.STRING())def rowFunc(row):return row[0].lower()tab_lower=tab_source.map(rowFunc).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这样我们在定义udf时,只是指定了返回类型是个字符串,也不知道它在新表中叫啥名字(实际叫f0)。但是为了便于后续使用,我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/123952.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue2+ant-design-vue a-select组件二次封装(支持单选/多选添加全选/分页(多选跨页选中)/自定义label)

一、效果图 二、参数配置 1、代码示例 <t-antd-selectv-model"selectVlaue":optionSource"stepList"change"selectChange" />2、配置参数&#xff08;Attributes&#xff09;继承 a-select Attributes 参数说明类型默认值v-model绑定值…

安装PS及AI遇到的问题

Mac安装PS/AI/PR/AE提示错误代码146 Failed with error code 146解决办法: 访达, -> 前往 ->前往文件夹 -> /Applications/Utilities/Adobe Sync 将Adobe Sync 文件夹里的CoreSync文件夹直接删掉

C++ 标准库随机数:std::default_random_engine

库头文件 #include <random> // 通过种子值设置随机数生成器 std::default_random_engine rng(seed);// 不设置种子值&#xff0c;使用默认值 std::default_random_engine rng; // 生成一个0到9之间的随机整数 int random_int rng() % 10;// 生成一个0到1之间的随机浮…

vivado crash

将增量编译去了

[Shell] ${} 的多种用法

文章目录 解释代码 解释 在Shell脚本中&#xff0c;${} 是一种变量替换语法。它用于获取和操作变量的值。 具体来说&#xff0c;${} 可以用来执行以下操作&#xff1a; 变量引用&#xff1a;${variable} 表示引用变量 variable 的值。 变量默认值&#xff1a;${variable:-de…

FPGA时序分析与约束(9)——主时钟约束

一、时序约束 时序引擎能够正确分析4种时序路径的前提是&#xff0c;用户已经进行了正确的时序约束。时序约束本质上就是告知时序引擎一些进行时序分析所必要的信息&#xff0c;这些信息只能由用户主动告知&#xff0c;时序引擎对有些信息可以自动推断&#xff0c;但是推断得到…

Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

1.引入RocketMQ依赖&#xff1a;首先&#xff0c;在pom.xml文件中添加RocketMQ的依赖&#xff1a; <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</versi…

文件改名,轻松添加前缀顺序编号,文件改名更高效!

您是否曾经需要批量修改文件名&#xff0c;并希望在文件名中添加特定的前缀或顺序编号&#xff1f;现在&#xff0c;我们为您带来了一款全新的文件改名工具&#xff0c;帮助您轻松解决这个问题&#xff01; 第一步&#xff0c;进入文件批量改名高手主页面&#xff0c;在板块栏…

C++学习笔记之四(标准库、标准模板库、vector类)

C 1、C标准库2、C标准模板库2.1、vector2.1.1、vector与array2.1.2、vector与函数对象2.1.3、vector与迭代器2.1.4、vector与算法 1、C标准库 C C C标准库指的是标准程序库( S t a n d a r d Standard Standard L i b a r a y Libaray Libaray)&#xff0c;它定义了十个大类…

js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件

js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件 js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件使用 FileReader js读取文件 vue读取文件 JavaScript 读取文件解析为字符串 js读取文件 Vue读取文件 使用 F…

【python爬虫】设计自己的爬虫 1. request封装

通过requests.session().request 封装request方法 考虑到请求HTTP/2.0 同时封装httpx 来处理HTTP/2.0的请求 封装requests # 遇到请求失败的情况时 重新请求&#xff0c;请求5次等待2s retry(stop_max_attempt_number5, retry_on_resultlambda re_data: re_data is None, wai…

亚马逊,速卖通,美客多如何打造爆款商品,排名提升榜首

1、产品Listing的完整性 Listing是亚马逊A9算法认识你产品的基础&#xff0c;在发布一条listing的时候&#xff0c;尽可能地做到最好!在准备一条listing之前&#xff0c;一定事先要收集、整理足够多的产品关键词&#xff0c;在优化listing内容的时候填充进去。仔细观察优秀竞品…

Realrek 2.5G交换机 8+1万兆光RTL8373-VB-CG方案简介

新一代2.5G交换机方案RTL8373-VB-CG可以提供4中不同形态 a. 52.5G 电口110G光》RTL8373 b. 52.5G 电口110G电》RTL83738261 c. 82.5G 电口110G光》RTL83738224 d.82.5G 电口110G电口》RTL837382248261 1.概述 Realtek RTL8373-CG是一款低功耗、高性能、高度集成的八端口2.5G和一…

C++设计模式_19_Memento 备忘录(理解,目前多使用序列化方案来实现)

Memento 备忘录模式也属于“状态变化”模式&#xff0c;它是一个小模式&#xff0c;在今天来看有些过时&#xff0c;当今已经很少使用当前模式实现需求&#xff0c;思想却不变&#xff08;信息隐藏&#xff09;&#xff0c;目前多使用序列化方案来实现。本系列所介绍的模式&…

小程序开发——小程序项目的配置与生命周期

1.app.json配置属性 app.json配置属性 2.页面配置 app的页面配置指的是pages属性&#xff0c; pages数组的第一个页面将默认作为小程序的启动页。利用开发工具新建页面时&#xff0c;则pages属性对应的数组将自动添加该页面的路径&#xff0c;若是在硬盘中添加文件的形式则不…

C++数据结构X篇_23_快速排序(最快、不稳定的排序)

文章参考十大经典排序算法-快速排序算法详解进行整理补充。快速排序是最快的排序方法。 排序思路&#xff1a;分治法-挖坑填数&#xff1a;大问题分解为各个小问题&#xff0c;对小问题求解&#xff0c;使得大问题得以解决 文章目录 1. 什么是快速排序1.1 概念1.2 算法原理1.3 …

手写常用的javascript函数

// 获取url参数 function getQueryString(name) {var reg new RegExp((^|&) name ([^&]*)(&|$), i);var r window.location.search.substr(1).match(reg);if (r ! null) {return unescape(r[2]);}return null; }// 验证手机号码(strict) function validPhoneNu…

Linux rm命令:删除文件或目录

当 Linux 系统使用很长时间之后&#xff0c;可能会有一些已经没用的文件&#xff08;即垃圾&#xff09;&#xff0c;这些文件不但会消耗宝贵的硬盘资源&#xff0c;还是降低系统的运行效率&#xff0c;因此需要及时地清理。 rm 是强大的删除命令&#xff0c;它可以永久性地删除…

一百九十七、Java——IDEA项目中把多层文件夹拆开显示

一、目的 由于IDEA项目中&#xff0c;默认的是把文件夹连在一起显示&#xff0c;于是为了方便需要把这些连在一起的文件夹拆开&#xff0c;分层显示 如文件夹cn.kgc 二、解决措施 解决方法很简单 &#xff08;一&#xff09;找到IDEA项目上的小齿轮 &#xff08;二&#xf…

基于深度学习的单图像人群计数研究:网络设计、损失函数和监控信号

摘要 https://arxiv.org/pdf/2012.15685v2.pdf 单图像人群计数是一个具有挑战性的计算机视觉问题,在公共安全、城市规划、交通管理等领域有着广泛的应用。近年来,随着深度学习技术的发展,人群计数引起了广泛的关注并取得了巨大的成功。通过系统地回顾和总结2015年以来基于深…