0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析

在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器:

  1. UDF:User Defined Scalar Function
  2. UDTF:User Defined Table Function
  3. UDAF:User Defined Aggregate Function
  4. UDTAF:User Defined Table Aggregate Function

在很多案例中,我们看到udf、udtf和udaf几个装饰器修饰function

@udf(result_type=DataTypes.BIGINT())
def add(i, j):return i + j@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
def range_emit(s, e):for i in range(e):yield s, i@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):return v.mean()

但是没有见到udtaf修饰function的案例,比如

# 错误的
@udtaf(result_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING()) , DataTypes.FIELD("count", DataTypes.BIGINT())]), accumulator_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING())]), func_type="general")
def lower(line):yield Row('a', 1)

这是因为这儿存在一个悖论

udtaf要求func_type必须是general

def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]:"""Helper method for creating a user-defined table aggregate function.:param f: user-defined table aggregate function.:param input_types: optional, the input data types.:param result_type: the result data type.:param accumulator_type: optional, the accumulator data type.:param deterministic: the determinism of the function's results. True if and only if a call tothis function is guaranteed to always return the same result given thesame parameters. (default True):param name: the function name.:param func_type: the type of the python function, available value: general(default: general):return: UserDefinedAggregateFunctionWrapper or function... versionadded:: 1.13.0"""if func_type != 'general':raise ValueError("The func_type must be 'general', got %s."% func_type)if f is None:return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type,accumulator_type=accumulator_type, func_type=func_type,deterministic=deterministic, name=name)else:return _create_udtaf(f, input_types, result_type, accumulator_type, func_type,deterministic, name)

如果func_type不是’general’,则会抛出错误,所以func_type="pandas"是不可以的。
udtaf修饰方法后的返回类型是UserDefinedAggregateFunctionWrapper。

def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name):return UserDefinedAggregateFunctionWrapper(f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)

delegate function要求非func_type必须是pandas

Table API下只有这些方法接受udtaf修饰function返回的UserDefinedAggregateFunctionWrapper。

  • def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘AggregatedTable’
  • def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘FlatAggregateTable’

这些方法的在底层会调用被修饰的UserDefinedFunctionWrapper。

    def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) \-> 'AggregatedTable':"""Performs a global aggregate operation with an aggregate function. You have to close theaggregate with a select statement... versionadded:: 1.13.0"""if isinstance(func, Expression):return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)else:func._set_takes_row_as_input()if hasattr(func, "_alias_names"):alias_names = getattr(func, "_alias_names")func = func(with_columns(col("*"))).alias(*alias_names)else:func = func(with_columns(col("*")))return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)

进而会调用到_java_user_defined_function。由于udtaf修饰的方法不是UserDefinedFunction对象,而是一个function,所以它会通过_create_delegate_function创建新的func 。

class UserDefinedFunctionWrapper(object):
……def _java_user_defined_function(self):……if not isinstance(self._func, UserDefinedFunction):func = self._create_delegate_function()……

而_create_delegate_function则要求udtaf中的function的func_type必须是pandas

    def _create_delegate_function(self) -> UserDefinedFunction:assert self._func_type == 'pandas'return DelegatingPandasAggregateFunction(self._func)

这就和之前udtaf中要求func_type必须是general相背。
所以我们没看到udtaf修饰function的案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/118313.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

doris 问题 列表

1. flink 导入数据后 出现的错误 2.错误描述: 导入任务过多,新导入任务提交报错 “current running txns on db xxx is xx, larger than limit xx ”? 调整 fe 参数 : max_running_txn_num_per_db,默认100,可适当调…

vue3 elementPlus 表格实现行列拖拽及列检索功能

1、安装vuedraggable npm i -S vuedraggablenext 2、完整代码 <template> <div classcontainer><div class"dragbox"><el-table row-key"id" :data"tableData" :border"true"><el-table-columnv-for"…

ChatGPT和Copilot协助Vue火速搭建博客网站

AI 对于开发人员的核心价值 网上会看到很多 AI 的应用介绍或者教程 使用 AI 聊天&#xff0c;咨询问题 —— 代替搜索引擎使用 AI 写各种的电商文案&#xff08;淘宝、小红书&#xff09;使用 AI 做一个聊天机器人 —— 这最多算猎奇、业余爱好、或者搞个套壳产品来收费 以上…

java8 Lambda表达式以及Stream 流

Lambda表达式 Lambda表达式规则 Lambda表达式可以看作是一段可以传递的代码&#xff0c; Lambda表达式只能用于函数式接口&#xff0c;而函数式接口只有一个抽象方法&#xff0c;所以可以省略方法名&#xff0c;参数类型等 Lambda格式&#xff1a;&#xff08;形参列表&…

nginx加权轮询,upstream,Keepalive,负载均衡实现案例

1. nginx 加权轮询, weight是权重配置。 #配置上游服务器 upstream tomcats {server 192.168.1.173:8080 weight=1;server 192.168.1.174:8080 weight=2;server 192.168.1.175:8080 weight=5; } server{liste

AWS Lambda 操作 RDS 示例

实现目标 创建一个 Lambda 接收调用时传入的数据, 写入 RDS 数据库 Post 表存储文章信息. 表结构如下: idtitlecontentcreate_date1我是标题我是正文内容2023-10-21 15:20:00 AWS 资源准备 RDS 控制台创建 MySQL 实例, 不允许 Public access (后面 Lambda 需要通过 VPC 访问…

AI的Prompt是什么

一.AI的Prompt的作用 在人工智能&#xff08;AI&#xff09;中&#xff0c;"Prompt"通常指的是向AI系统提供的输入或指令&#xff0c;用于引导AI进行特定的操作或生成特定的输出。例如&#xff0c;在一个对话型AI系统中&#xff0c;用户输入的问题就是一个prompt&…

【vue】使用less报错:显示this.getOptions is not a function

在vue-cli中使用 lang“less” 时报错&#xff1a; Module build failed: TypeError: this.getOptions is not a function at Object.lessLoader 原因&#xff1a;版本过高所致&#xff0c;所用版本为 解决&#xff1a;降低版本&#xff1a;npm install less-loader4.1.0 --s…

分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测(SE注意力机制)

分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测&#xff08;SE注意力机制&#xff09; 目录 分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测&#xff08;SE注意力机制&#xff09;分类效果基本描述模型描述程序设计参考资料 分类效果 基本描述 1.MATLA…

kubeadm初始化搭建cri-dockerd记录 containerd.io

07.尚硅谷_搭建K8s集群&#xff08;kubeadm方式&#xff09;-部署master节点_哔哩哔哩_bilibili 视频里的版本只有1.17而现在&#xff08;2023.10.20&#xff09;kubernetes最新版本是1.28&#xff0c;需要搭载cri-dockerd&#xff0c; 先去网站下载了对应的rpm包cri-dockerd…

力扣每日一题69:x的平方根

题目描述&#xff1a; 给你一个非负整数 x &#xff0c;计算并返回 x 的 算术平方根 。 由于返回类型是整数&#xff0c;结果只保留 整数部分 &#xff0c;小数部分将被 舍去 。 注意&#xff1a;不允许使用任何内置指数函数和算符&#xff0c;例如 pow(x, 0.5) 或者 x ** 0…

Linux---(四)权限

文章目录 一、shell命令及运行原理1.什么是操作系统&#xff1f;2.外壳程序3.用户为什么不直接访问操作系统内核?4.操作系统内核为什么不直接把结果显示出来&#xff1f;非要加外壳程序&#xff1f;5.shell理解重点总结&#xff08;1&#xff09;shell是什么&#xff1f;&…

JDK8新特性:Stream流

目录 1.获取Stream流 2.Stream流常见的中间方法 3.Stream流常见的终结方法 1、 Stream 是什么&#xff1f;有什么作用&#xff1f;结合了什么技术&#xff1f; ●也叫 Stream 流&#xff0c;是Jdk8开始新增的一套 API ( java . util . stream .*)&#xff0c;可以用于操作集…

前端精度问题 (id 返回的和传给后端的不一致问题)

eg: 后端返回 id 10976458979374929 前端获取到的: 10976458979374928 原因: js 中 Number类型范围-2^53 1 到 2^53 - 1 Number.isSafeInteger()用来判断一个整数是否落在这个范围之内。 java中 Long 类型的取值范围是-2^63 1 到 2^63 - 1, 比JavaScript中大很多&#xff0…

Java开发-WebSocket

WebSocket是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工通信-浏览器和服务器只需要完成一次握手&#xff0c;两者之间就可以创建持久性的连接&#xff0c;并实现 双向数据传输。 使用 导入maven坐标 <dependency><groupId>org.springframework.bo…

基于V/F控制的三相逆变器MATLAB仿真模型

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 参考文献&#xff1a;张飞,刘亚,张玉杰.基于V/F控制的三相逆变器仿真模型的研究[J].自动化与仪器仪表,2015 关于V/F控制的论文非常多&#xff0c;随意下载&#xff01; 当分布式电源经过逆变器运行于孤岛模…

mysql聚合查询

在MySQL中&#xff0c;聚合查询是指通过一些特定的函数&#xff08;如SUM、AVG、MAX、MIN等&#xff09;根据需求来查询相关的信息。以下是一些MySQL聚合查询的例子&#xff1a; 使用SUM函数查询某列的总和&#xff1a; SELECT SUM(salary) FROM employees; 使用AVG函数查询…

【计算机毕设案例推荐】高校学术研讨信息管理系统小程序SpringBoot+Vue+小程序

前言&#xff1a;我是IT源码社&#xff0c;从事计算机开发行业数年&#xff0c;专注Java领域&#xff0c;专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 项目名 基于SpringBoot的高校学术研讨信息管理系统小程序 技术栈 SpringBoot小程序VueMySQLMaven 文…

UE4 材质实操记录

TexCoord的R通道是从左到右的递增量&#xff0c;G通道是从上到下的递增量&#xff0c;R通道减去0.5&#xff0c;那么左边就是【-0.5~0】区间&#xff0c;所以左边为全黑&#xff0c;Abs取绝对值&#xff0c;就达到一个两边向中间的一个递减的效果&#xff0c;G通道同理&#xf…

stm32外部时钟为12MHZ,修改代码适配

代码默认是8MHZ的&#xff0c;修改2个地方&#xff1a; 第一个地方是这个文件的这里&#xff1a; 第二个地方是找到这个函数&#xff1a; 修改第二个地方的这里&#xff1a;