0基础学习PyFlink——用户自定义函数之UDTF

大纲

  • 表值函数
  • 完整代码

在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF
在这里插入图片描述

表值函数

我们对比下UDF和UDTF

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
def udtf(f: Union[Callable, TableFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_types: Union[List[DataType], DataType, str, List[str]] = None,deterministic: bool = None,name: str = None) -> Union[UserDefinedTableFunctionWrapper, Callable]:

可以发现:

  • UDF比UDTF多了func_type和udf_type参数;
  • UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str];

特别是最后一点,可以认为是UDF和UDTF在应用上的主要区别。
换种更容易理解的说法是:UDTF可以返回任意数量的行作为输出而不是像UDF那样返回单个值(行)。
举一个例子:

word_count_data = ["A", "B", "C", "a", "C"] 

我们希望统计上面这些字符的个数,以及小写后字符的个数。这样A的个数是1,a的个数是2(因为a算一个,A小写后又算一个)。C的个数是2,g的个数是2。
这就要求统计算法在遇到大写字母时,需要统计大小写两种字母;而遇到小写字母时,只需要统计小写字母。

    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]

yield关键字返回的是generator生成器。Table API对rowFunc的调用最终会生成[“A”,“a”,“B”,“b”,“C”,“c”,“a”,“C”,“c”]。
和调用UDF不同的是,需要使用flat_map来调用UDTF。flat即为“打平”,可以生动的理解为将多维降为一维。

    tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()
+--------------------------------+
|                             f0 |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

由于我们没有指定经过处理的值所属的字段名称,于是会使用默认的f0作为字段名。我们可以使用alias来给它别名下。

    tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()
+--------------------------------+
|                     trans_word |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

最后我们就可以用这个新的表做字数统计计算

    tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
+I[A, 1]
+I[a, 2]
+I[B, 1]
+I[b, 1]
+I[C, 2]
+I[c, 2]

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "a", "C"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119608.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网安大模型专题10.19】论文6:Java漏洞自动修复+数据集 VJBench+大语言模型、APR技术+代码转换方法+LLM和DL-APR模型的挑战与机会

How Effective Are Neural Networks for Fixing Security Vulnerabilities 写在最前面摘要贡献发现 介绍背景:漏洞修复需求和Java漏洞修复方向动机方法贡献 数据集先前的数据集和Java漏洞Benchmark数据集扩展要求数据处理工作最终数据集 VJBenchVJBench 与 Vul4J 的…

腾讯云学生专享云服务器介绍及购买攻略

随着互联网技术的不断发展,越来越多的人开始关注云计算领域。作为国内领先的云计算服务商,腾讯云推出了“云校园”扶持计划,完成学生认证即可购买学生专享云服务器。 一、活动对象 仅限腾讯云官网通过个人认证的35岁以下学生用户参与&#x…

初识Node.js开发

一、Node.js是什么 1.node.js是什么 官方对Node.js的定义: Node.js是一个基于V8 JavaScript引擎的JavaScript运行时环境。 也就是说Node.js基于V8引擎来执行JavaScript的代码,但是不仅仅只有V8引擎: 前面我们知道V8可以嵌入到任何C 应用…

解决 Element-ui中 表格(Table)使用 v-if 条件控制列显隐时数据展示错乱的问题

本文 Element-ui 版本 2.x 问题 在 el-table-column 上需根据不同 v-if 条件来控制列显隐时&#xff0c;就会出现列数据展示错乱的情况&#xff08;要么 A 列的数据显示在 B 列上&#xff0c;或者后端返回有数据的但是显示的为空&#xff09;&#xff0c;如下所示。 <tem…

vue中使用xlsx插件导出多sheet excel实现方法

安装xlsx&#xff0c;一定要注意版本&#xff1a; npm i xlsx0.17.0 -S package.json&#xff1a; {"name": "hello-world","version": "0.1.0","private": true,"scripts": {"serve": "vue-c…

【Kotlin精简】第5章 简析DSL

1 DSL是什么&#xff1f; Kotlin 是一门对 DSL 友好的语言&#xff0c;它的许多语法特性有助于 DSL 的打造&#xff0c;提升特定场景下代码的可读性和安全性。本文将带你了解 Kotlin DSL 的一般实现步骤&#xff0c;以及如何通过 DslMarker &#xff0c; Context Receivers 等…

[UDS] --- DiagnosticSessionControl 0x10 service

1 会话 $10包含3个子功能&#xff0c;01 Default默认会话&#xff0c;02 Programming编程会话&#xff0c;03 Extended扩展会话&#xff0c;ECU上电时&#xff0c;进入的是默认会话&#xff08;Default&#xff09;。 为什么设计三个会话模式呢&#xff1f;因为权限问题。默认…

CSS高级的详细解析

CSS高级 目标&#xff1a;掌握定位的作用及特点&#xff1b;掌握 CSS 高级技巧 01-定位 作用&#xff1a;灵活的改变盒子在网页中的位置 实现&#xff1a; 1.定位模式&#xff1a;position 2.边偏移&#xff1a;设置盒子的位置 left right top bottom 相对定位 posit…

【计算机网络(1)】计算机网络体系结构1:计算机网络概述

文章目录 概念 & 功能 & 发展计算机网络的概念计算机网络的功能计算机网络的发展网络的本质 组成 & 分类计算机网络的组成计算机网络的分类 概念 & 功能 & 发展 计算机网络的概念 1. 网络 网一样的东西或网状系统。其中&#xff08;有线电视网络、电信网…

34基于MATLAB的杨氏双孔干涉条纹,可调节距离,孔的大小等参数,程序已调试通过,可直接运行。

基于MATLAB的杨氏双孔干涉条纹&#xff0c;可调节距离&#xff0c;孔的大小等参数&#xff0c;程序已调试通过&#xff0c;可直接运行。 34matlab杨氏双孔干涉条纹GUI (xiaohongshu.com)

(ubuntu)安装nginx

文章目录 前言回顾Linux命令在线安装&#xff1a;相关命令&#xff1a;相关路径常用配置&#xff1a; 卸载nginxbug相关: 前言 提示&#xff1a;别再问我的规划是什么了&#xff1a;呼吸&#xff0c;难道不算一个吗&#xff1f; --E.M齐奥朗 回顾Linux命令 # 查看当前进程的所…

Go语言入门心法(十三): 反射认知升维

Go语言入门心法(一): 基础语法 Go语言入门心法(二): 结构体 Go语言入门心法(三): 接口 Go语言入门心法(四): 异常体系 Go语言入门心法(五): 函数 Go语言入门心法(六): HTTP面向客户端|服务端编程 Go语言入门心法(七): 并发与通道 Go语言入门心法(八): mysql驱动安装报错o…

Win安装protobuf和IDEA使用protobuf插件

一、Win安装protobuf 1、下载编译器 protobuf下载地址&#xff1a;https://github.com/protocolbuffers/protobuf/releases 选择自己需要的版本下载&#xff0c;这里下载的是 protoc-3.19.1-win64.zip&#xff0c;下载之后进行解压即可。 2、配置环境变量 path 系统变量中添加…

RT-Thread 7. RT-Thread Studio ENV修改MCU型号

1. 修改MCU型号 2.在ENV界面输入 scons -c scons --dist3. dist下为更新后完整源代码 4.导入RT-Thread Studio 发现GD32F330已经生效了。 5. 自己编写startup_gd32f3x0.S&#xff0c;准确性待验证 ;/* ; * Copyright (c) 2006-2021, RT-Thread Development Team ; * ; * SPD…

如何将Mysql数据库的表导出并导入到另外的架构

如何将Mysql数据库的表导出并导入到另外的架构 准备一、解决方法1.右键->导出->用mysqldump导出2.注意路径一般为&#xff1a;C:/Program Files/MySQL/MySQL Server 8.0/bin/mysqldump.exe和导出的sql文件位置3.右键->SQL脚本->运行SQL脚本4.找到SQL脚本并点击确定…

通过实例理解Go Web身份认证的几种方式

在2023年Q1 Go官方用户调查报告[1]中&#xff0c;API/RPC services、Websites/web services都位于使用Go开发的应用类别的头部(如下图)&#xff1a; 我个人使用Go开发已很多年&#xff0c;但一直从事底层基础设施、分布式中间件等方向&#xff0c;Web应用开发领域涉及较少&…

SD-WAN跨境网络专线|跨境访问无忧!让海外SaaS平台与视频会议更稳定轻松的解决方案

在现如今全球化的时代&#xff0c;企业都有布局全球或是有潜力的国家&#xff0c;在海外开分公司必不可少&#xff0c;那与海外合作伙伴进行沟通与合作已经成为企业的常态。但是&#xff0c;访问海外的SaaS平台和进行视频会议时&#xff0c;我们经常会遇到网络不稳定、速度慢的…

idea 中配置 maven

前文叙述&#xff1a; 配置 maven 一共要设置两个地方&#xff1a;1、为当前项目设置2、为新项目设置maven 的下载和安装可参考我之前写过的文章&#xff0c;具体的配置文章中也都有讲解。1、为当前项目进行 maven 配置 配置 VM Options: -DarchetypeCataloginternal2、为新项…

CleanMyMac X免费macOS清理系统管家

近些年伴随着苹果生态的蓬勃发展&#xff0c;越来越多的用户开始尝试接触Mac电脑。然而很多人上手Mac后会发现&#xff0c;它的使用逻辑与Windows存在很多不同&#xff0c;而且随着使用时间的增加&#xff0c;一些奇奇怪怪的文件也会占据有限的磁盘空间&#xff0c;进而影响使用…

心理咨询医院预约和挂号系统

源码下载地址 支持&#xff1a;远程部署/安装/调试、讲解、二次开发/修改/定制 系统分为&#xff1a;患者端、医生端、管理员端。 患者端 医生端 管理员端