0基础学习PyFlink——用户自定义函数之UDTF

大纲

  • 表值函数
  • 完整代码

在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF
在这里插入图片描述

表值函数

我们对比下UDF和UDTF

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
def udtf(f: Union[Callable, TableFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_types: Union[List[DataType], DataType, str, List[str]] = None,deterministic: bool = None,name: str = None) -> Union[UserDefinedTableFunctionWrapper, Callable]:

可以发现:

  • UDF比UDTF多了func_type和udf_type参数;
  • UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str];

特别是最后一点,可以认为是UDF和UDTF在应用上的主要区别。
换种更容易理解的说法是:UDTF可以返回任意数量的行作为输出而不是像UDF那样返回单个值(行)。
举一个例子:

word_count_data = ["A", "B", "C", "a", "C"] 

我们希望统计上面这些字符的个数,以及小写后字符的个数。这样A的个数是1,a的个数是2(因为a算一个,A小写后又算一个)。C的个数是2,g的个数是2。
这就要求统计算法在遇到大写字母时,需要统计大小写两种字母;而遇到小写字母时,只需要统计小写字母。

    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]

yield关键字返回的是generator生成器。Table API对rowFunc的调用最终会生成[“A”,“a”,“B”,“b”,“C”,“c”,“a”,“C”,“c”]。
和调用UDF不同的是,需要使用flat_map来调用UDTF。flat即为“打平”,可以生动的理解为将多维降为一维。

    tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()
+--------------------------------+
|                             f0 |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

由于我们没有指定经过处理的值所属的字段名称,于是会使用默认的f0作为字段名。我们可以使用alias来给它别名下。

    tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()
+--------------------------------+
|                     trans_word |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

最后我们就可以用这个新的表做字数统计计算

    tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
+I[A, 1]
+I[a, 2]
+I[B, 1]
+I[b, 1]
+I[C, 2]
+I[c, 2]

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "a", "C"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119608.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网安大模型专题10.19】论文6:Java漏洞自动修复+数据集 VJBench+大语言模型、APR技术+代码转换方法+LLM和DL-APR模型的挑战与机会

How Effective Are Neural Networks for Fixing Security Vulnerabilities 写在最前面摘要贡献发现 介绍背景:漏洞修复需求和Java漏洞修复方向动机方法贡献 数据集先前的数据集和Java漏洞Benchmark数据集扩展要求数据处理工作最终数据集 VJBenchVJBench 与 Vul4J 的…

NO.2 | 977.有序数组的平方 ,209.长度最小的子数组 ,59.螺旋矩阵II

NO.2 | 977.有序数组的平方 ,209.长度最小的子数组 ,59.螺旋矩阵II 题目链接:977.有序数组的平方 前置条件:数组是非递减的,如果是乱序的数组应该就只能暴力解法暴力解法:直接遍历,对每个元素都…

与AI对话,如何写好prompt?

玩转AIGC,优质的Prompt提示词实在是太重要了!同样的问题,换一个问法,就会得到差别迥异的答案。你是怎样和AI进行对话交流的呢?我来分享几个: 请告诉我…我想知道…对于…你有什么看法?帮我解决…

腾讯云学生专享云服务器介绍及购买攻略

随着互联网技术的不断发展,越来越多的人开始关注云计算领域。作为国内领先的云计算服务商,腾讯云推出了“云校园”扶持计划,完成学生认证即可购买学生专享云服务器。 一、活动对象 仅限腾讯云官网通过个人认证的35岁以下学生用户参与&#x…

初识Node.js开发

一、Node.js是什么 1.node.js是什么 官方对Node.js的定义: Node.js是一个基于V8 JavaScript引擎的JavaScript运行时环境。 也就是说Node.js基于V8引擎来执行JavaScript的代码,但是不仅仅只有V8引擎: 前面我们知道V8可以嵌入到任何C 应用…

PULP Ubuntu18.04

1. 安装eda工具:questasim_10.7_linux64,网上有教程和方法,如有问题,可私信我 2. 代码下载: git clone https://github.com/pulp-platform/pulp 编译代码 cd pulp source setup/vsim.sh make checkout make scripts …

解决 Element-ui中 表格(Table)使用 v-if 条件控制列显隐时数据展示错乱的问题

本文 Element-ui 版本 2.x 问题 在 el-table-column 上需根据不同 v-if 条件来控制列显隐时&#xff0c;就会出现列数据展示错乱的情况&#xff08;要么 A 列的数据显示在 B 列上&#xff0c;或者后端返回有数据的但是显示的为空&#xff09;&#xff0c;如下所示。 <tem…

vue中使用xlsx插件导出多sheet excel实现方法

安装xlsx&#xff0c;一定要注意版本&#xff1a; npm i xlsx0.17.0 -S package.json&#xff1a; {"name": "hello-world","version": "0.1.0","private": true,"scripts": {"serve": "vue-c…

【Kotlin精简】第5章 简析DSL

1 DSL是什么&#xff1f; Kotlin 是一门对 DSL 友好的语言&#xff0c;它的许多语法特性有助于 DSL 的打造&#xff0c;提升特定场景下代码的可读性和安全性。本文将带你了解 Kotlin DSL 的一般实现步骤&#xff0c;以及如何通过 DslMarker &#xff0c; Context Receivers 等…

[UDS] --- DiagnosticSessionControl 0x10 service

1 会话 $10包含3个子功能&#xff0c;01 Default默认会话&#xff0c;02 Programming编程会话&#xff0c;03 Extended扩展会话&#xff0c;ECU上电时&#xff0c;进入的是默认会话&#xff08;Default&#xff09;。 为什么设计三个会话模式呢&#xff1f;因为权限问题。默认…

FreeRTOS学习2018.6.27

《FreeRTOS学习》 1.freeRTOS基本功能函数&#xff1a; 定义任务&#xff1a;ATaskFunction(); 创建任务&#xff1a;xTaskCreate(); 改优先级&#xff1a;vTaskPrioritySet(); 系统延时&#xff1a;vTaskDelay(); 精确延时&#xff1a;vTaskDelayUntil(); 空闲任务钩子函数&…

CSS高级的详细解析

CSS高级 目标&#xff1a;掌握定位的作用及特点&#xff1b;掌握 CSS 高级技巧 01-定位 作用&#xff1a;灵活的改变盒子在网页中的位置 实现&#xff1a; 1.定位模式&#xff1a;position 2.边偏移&#xff1a;设置盒子的位置 left right top bottom 相对定位 posit…

【计算机网络(1)】计算机网络体系结构1:计算机网络概述

文章目录 概念 & 功能 & 发展计算机网络的概念计算机网络的功能计算机网络的发展网络的本质 组成 & 分类计算机网络的组成计算机网络的分类 概念 & 功能 & 发展 计算机网络的概念 1. 网络 网一样的东西或网状系统。其中&#xff08;有线电视网络、电信网…

34基于MATLAB的杨氏双孔干涉条纹,可调节距离,孔的大小等参数,程序已调试通过,可直接运行。

基于MATLAB的杨氏双孔干涉条纹&#xff0c;可调节距离&#xff0c;孔的大小等参数&#xff0c;程序已调试通过&#xff0c;可直接运行。 34matlab杨氏双孔干涉条纹GUI (xiaohongshu.com)

(ubuntu)安装nginx

文章目录 前言回顾Linux命令在线安装&#xff1a;相关命令&#xff1a;相关路径常用配置&#xff1a; 卸载nginxbug相关: 前言 提示&#xff1a;别再问我的规划是什么了&#xff1a;呼吸&#xff0c;难道不算一个吗&#xff1f; --E.M齐奥朗 回顾Linux命令 # 查看当前进程的所…

hive sql,年月日 时分秒格式的数据,以15分钟为时间段,找出每一条数据所在时间段的上下界限时间值(15分钟分区)

获取当前的年月日 时分秒 select date_format(current_timestamp(), yyyy-MM-dd HH:mm:ss)date_format(时间字段, ‘yyyy-MM-dd HH:mm:ss’) 将时间字段转为 2023-10-18 18:14:16 这种格式 在指定时间上增加15分钟 select from_unixtime(unix_timestamp(current_timestamp(…

JavaScript设计模式快速参考指南

揭开 JavaScript 设计模式秘密的旅程&#xff0c;探索它们的意义、实现和实际应用。 创建模式是一种设计模式类别&#xff0c;用于解决与对象创建情况相关的常见问题。 1、单例模式 将特定对象的实例数量限制为一个。单例减少了对全局变量的需求&#xff0c;从而避免了名称冲…

Go语言入门心法(十三): 反射认知升维

Go语言入门心法(一): 基础语法 Go语言入门心法(二): 结构体 Go语言入门心法(三): 接口 Go语言入门心法(四): 异常体系 Go语言入门心法(五): 函数 Go语言入门心法(六): HTTP面向客户端|服务端编程 Go语言入门心法(七): 并发与通道 Go语言入门心法(八): mysql驱动安装报错o…

Win安装protobuf和IDEA使用protobuf插件

一、Win安装protobuf 1、下载编译器 protobuf下载地址&#xff1a;https://github.com/protocolbuffers/protobuf/releases 选择自己需要的版本下载&#xff0c;这里下载的是 protoc-3.19.1-win64.zip&#xff0c;下载之后进行解压即可。 2、配置环境变量 path 系统变量中添加…

分布式、集群、微服务

分布式是以缩短单个任务的执行时间来提升效率的&#xff1b;而集群则是通过提高单位时间内执行的任务数来提升效率。 分布式是指将不同的业务分布在不同的地方。 集群指的是将几台服务器集中在一起&#xff0c;实现同一业务。 分布式中的每一个节点&#xff0c;都可以做集群…