0基础学习PyFlink——用户自定义函数之UDTAF

大纲

  • UDTAF
  • TableAggregateFunction的实现
    • 累加器
      • 定义
      • 创建
      • 累加
    • 返回
      • 类型
      • 计算
  • 完整代码

在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。
在这里插入图片描述

UDTAF

UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样可以返回任意数量的行作为输,又可以像《0基础学习PyFlink——用户自定义函数之UDAF》介绍的UDAF那样通过聚合的数据(多组)计算出一个值
举一个例子:我们拿到一个学生成绩表,每行包括:

  • 学生姓名
  • 英语成绩
  • 数学成绩
  • 年级

现在我们需要把这张表调整为:

  • 学生姓名
  • 成绩
  • 科目
  • 科目年级平均成绩
  • 年级
    在这里插入图片描述
    将一行中的“英语成绩”和“数学成绩”,拆成“成绩”和“科目”,相当于把一行数据拆解成多行,如上图左侧“张三”只有一行,而右侧有两行“张三”信息。这种拆解操作就需要T类型的用户自定义函数,比如UDTF和UDTAF。
    而我们需要计算一个年级一科的平均成绩,比如1年级英语的平均成绩,则需要按年级聚合之后再做计算。这个就需要A类型的用户自定义函数,比如UDAF和UDTAF。
    同时要满足上述两种技术方案的就是UDTAF。我们先看下主体代码,它和《0基础学习PyFlink——用户自定义函数之UDAF》中的很像。但是有两个重要区别:
  • 要设置成in_streaming_mode模式,否则会报错
  • udtaf要修饰一个对象,而非一个方法;
def calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])students_score = [("张三", 80.0, 60.0, "1"),("李四", 75.0, 95.0, "1"),("王五", 90.0, 90.0, "2"),("赵六", 85.0, 70.0, "2"),("孙七", 60.0, 0.0, "3"),]tab_source = t_env.from_elements(students_score, row_type_tab_source)split_class = udtaf(SplitClass())tab_source.group_by(col('grade')) \.flat_aggregate(split_class) \.select(col('*')) \.execute().print()

TableAggregateFunction的实现

用于计算的类要继承于TableAggregateFunction,即UDTAF中的TAF。

class SplitClass(TableAggregateFunction):_class_keys = ["english", "math"]

我们需要通过get_result_type告诉框架,UDTAF函数返回的是什么类型的数据。一般我们都是构造一个行类型——ROW,然后定义其每个字段的值和类型:

  • name:string类型,用户姓名;
  • score:float类型,考分;
  • avg score:float类型,科目年级平均分数;
  • class:sting类型,科目名称;

累加器

accumulator(累加器)是用于参与计算的中间数据。比如这个案例中,我们会向让accumulator保存拆解后的数据(即一行拆解成多行后的数据),然后再计算各年级每科的平均成绩。

定义

    def get_accumulator_type(self):return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])) 

因为只是为了保存展开的数据,于是我们只用定义均值计算之前的字段:

  • name:string类型,姓名;
  • score:float类型,分数;
  • class:string类型,科目名称;

创建

刚开始时,我们让其是一个空数组,对应上定义中的ARRAY类型。

    def create_accumulator(self):return []

累加

我们对科目进行遍历,进行行的拆分。即将(“张三”, 80.0, 60.0, “1”)拆解成(“张三”, 80.0, “english”)和(“张三”, 60.0, “math”)这样的两组数据。

    def accumulate(self, accumulator, row):for i in self._class_keys:accumulator.append(Row(row["name"], row[i], i))

返回

类型

    def get_result_type(self):return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])

可以看到result_type(返回类型)和accumulator_type(累加器类型)是不一样的(也可以一样,主要看怎么计算规则)。前者比后者多了“学科年级平均分”(avg score),这就更加接近我们希望获得的最终结果。
这些字段和我们目标字段只差一个grade(年级)。因为原始表中有grade,且我们会通过grade聚类,所以最终我们可以获得这个信息,而不用在这儿定义。
需要注意的是,虽然表值类型函数返回的是一组数据(若干Row),但是这儿只是返回Row的具体定义,而不是ARRAY[Row]。

计算

    def emit_value(self, accumulator):rows = []for i in self._class_keys: total = 0.0student_count = 0for y in accumulator:# y[2] y[]"class"]if i == y[2]:# y[1] y["score"]total = total + y[1]student_count = student_count + 1avg_score = total / student_countfor y in accumulator:if i == y[2]:rows.append(Row(y[0], y[1], avg_score, y[2]))for x in rows:   yield x

这个函数会在最后执行,它会通过累加器中的数据计算“学科年级平均分”,然后构造和“返回类型”一直的Row到rows数组中。最后通过yeild关键字返回一个生成器,我们可以将其看成还是一组Row,即拆解后的结果。

最后我们看下结果

+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                          grade |                           name |                          score |                      avg score |                          class |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                              1 |                           张三 |                           80.0 |                           77.5 |                        english |
| +I |                              1 |                           李四 |                           75.0 |                           77.5 |                        english |
| +I |                              1 |                           张三 |                           60.0 |                           77.5 |                           math |
| +I |                              1 |                           李四 |                           95.0 |                           77.5 |                           math |
| +I |                              2 |                           王五 |                           90.0 |                           87.5 |                        english |
| +I |                              2 |                           赵六 |                           85.0 |                           87.5 |                        english |
| +I |                              2 |                           王五 |                           90.0 |                           80.0 |                           math |
| +I |                              2 |                           赵六 |                           70.0 |                           80.0 |                           math |
| +I |                              3 |                           孙七 |                           60.0 |                           60.0 |                        english |
| +I |                              3 |                           孙七 |                            0.0 |                            0.0 |                           math |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
10 rows in set

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf,TableAggregateFunction
import pandas as pd
from pyflink.table.udf import UserDefinedFunction
from typing import Listclass SplitClass(TableAggregateFunction):_class_keys = ["english", "math"]def emit_value(self, accumulator):rows = []for i in self._class_keys: total = 0.0student_count = 0for y in accumulator:if i == y[2]:total = total + y[1]student_count = student_count + 1avg_score = total / student_countfor y in accumulator:if i == y[2]:rows.append(Row(y[0], y[1], avg_score, y[2]))return rowsdef create_accumulator(self):return []def accumulate(self, accumulator, row):for i in self._class_keys:accumulator.append(Row(row["name"], row[i], i))def get_accumulator_type(self):return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())]))  def get_result_type(self):return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])def calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])students_score = [("张三", 80.0, 60.0, "1"),("李四", 75.0, 95.0, "1"),("王五", 90.0, 90.0, "2"),("赵六", 85.0, 70.0, "2"),("孙七", 60.0, 0.0, "3"),]tab_source = t_env.from_elements(students_score, row_type_tab_source)split_class = udtaf(SplitClass())tab_source.group_by(col('grade')) \.flat_aggregate(split_class) \.select(col('*')) \.execute().print()if __name__ == '__main__':calc()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是腾讯云轻量应用服务器?轻量性能和价格表分享

腾讯云轻量应用服务器怎么样?什么是腾讯云轻量应用服务器?轻量应用服务器性能怎么样?腾讯云轻量应用服务器如何收费?轻量2核2G3M云服务器88元一年、3年轻量2核2G4M带宽优惠价366.6元、轻量2核4G5M服务器166.6元一年、3年轻量2核4G…

【Javascript】ajax(阿甲克斯)

目录 什么是ajax? 同步与异步 原理 注意 写一个ajax请求 创建ajax对象 设置请求方式和地址 发送请求 设置响应HTTP请求状态变化的函数 什么是ajax? 是基于javascript的一种用于创建快速动态网页的技术,是一种在无需重新加载整个网页的情况下&#xff0c…

计算机网络【CN】IPV4报文格式

版本(4bit):IPV4/IPV6首部长度(4bit):标识首部的长度 单位是4B最小为:20B最大为:60(15*4)B总长度(16bit):整个数据报&…

vue使用smooth-signature实现移动端电子签字,包括横竖屏

vue使用smooth-signature实现移动端电子签字&#xff0c;包括横竖屏 1.使用smooth-signature npm install --save smooth-signature二.页面引入插件 import SmoothSignature from "smooth-signature";三.实现效果 四.完整代码 <template><div class&quo…

LVS集群-DR模式

概念&#xff1a; LVS-DR模式&#xff0c;也是最常用的lVS负载方式&#xff0c;DR DIRECT ROUTING 直接路由模式 负载均衡器lVS调度器&#xff0c;只负责请求和转发到后端的真实服务器&#xff0c;但是影响结果&#xff0c;由后端服务器直接转发给客户端&#xff0c;不需要经…

2023年第四届MathorCup大数据竞赛(A题)|坑洼道路检测和识别|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2021年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 希望这些想法对大家的做题有一定的启发和借鉴意义。 让我们来…

LLM系列 | 22 : Code Llama实战(下篇):本地部署、量化及GPT-4对比

引言 模型简介 依赖安装 模型inference 代码补全 4-bit版模型 代码填充 指令编码 Code Llama vs ChatGPT vs GPT4 小结 引言 青山隐隐水迢迢&#xff0c;秋尽江南草未凋。 小伙伴们好&#xff0c;我是《小窗幽记机器学习》的小编&#xff1a;卖热干面的小女孩。紧接…

微信小程序设计之主体文件app-ts/js

一、新建一个项目 首先&#xff0c;下载微信小程序开发工具&#xff0c;具体下载方式可以参考文章《微信小程序开发者工具下载》。 然后&#xff0c;注册小程序账号&#xff0c;具体注册方法&#xff0c;可以参考文章《微信小程序个人账号申请和配置详细教程》。 在得到了测…

新的iLeakage攻击从Apple Safari窃取电子邮件和密码

图片 导语&#xff1a;学术研究人员开发出一种新的推测性侧信道攻击&#xff0c;名为iLeakage&#xff0c;可在所有最新的Apple设备上运行&#xff0c;并从Safari浏览器中提取敏感信息。 攻击概述 iLeakage是一种新型的推测性执行攻击&#xff0c;针对的是Apple Silicon CPU和…

简单了解一下:NodeJS的WebSocket网络编程

NodeJS的webSocket网络编程。 那什么是WebSocket呢&#xff1f;WebSocket是HTML5提供的一种浏览器和服务器进行通信的网络技术。两者之间&#xff0c;只需要做一个握手动作&#xff0c;就可以在浏览器和服务器之间开启一条通道&#xff0c;就可以进行数据相互传输。 实现WebS…

c++的4中类型转换操作符(static_cast,reinterpret_cast,dynamic_cast,const_cast),RTTI

目录 引入 介绍 static_cast 介绍 使用 reinterpret_cast 介绍 使用 const_cast 介绍 使用 dynamic_cast 介绍 使用 RTTI(运行时确定类型) 介绍 typeid运算符 dynamic_cast运算符 type_info类 引入 原本在c中,我们就已经接触到了很多类型转换 -- 隐式类型转…

线程安全问题

线程安全 简单来说&#xff0c;在多个线程访问某个方法或者对象的时候&#xff0c;不管通过任何的方式调用以及线程如何去交替执行。在程序中不做任何同步干预操作的情况下&#xff0c;这个方法或者对象的执行/修改都能按照预期的结果来反馈&#xff0c;那么这个类就是线程安全…

YOLOv7优化:感受野注意力卷积运算(RFAConv),效果秒杀CBAM和CA等 | 即插即用系列

💡💡💡本文改进:感受野注意力卷积运算(RFAConv),解决卷积块注意力模块(CBAM)和协调注意力模块(CA)只关注空间特征,不能完全解决卷积核参数共享的问题 提供多种卷积变体供使用:CBAMConv,CAMConv,CAConv,RFAConv,RFCAConv RFAConv | 亲测在多个数据集能够实现…

buuctf_练[CISCN2019 华东南赛区]Web4

[CISCN2019 华东南赛区]Web4 文章目录 [CISCN2019 华东南赛区]Web4掌握知识解题思路代码分析正式解题 关键paylaod 掌握知识 ​ 根据url地址传参结构来判断php后端还是python后端&#xff1b;uuid.getnode()函数的了解&#xff0c;可以返回主机MAC地址十六进制&#xff1b;pyt…

Golang 自定义函数库(个人笔记)

1.用字符串连接切片元素&#xff08;类似php implode&#xff09; package mainimport ("fmt""strconv""strings" )func main() {data : []int{104, 101, 108, 108, 111}fmt.Println(IntSliceToString(data, ",")) }func IntSliceToS…

汽车4S店如何在数字化管理下,提高市场竞争力

在所有人都认为疫情过后&#xff0c;经济形势会一路向阳&#xff0c;但是&#xff0c;实际情况出乎所有人的意料&#xff0c;各行各业举步维艰。 新闻爆出的各大房地产&#xff0c;恒大的2.4万亿让人瞠目结舌&#xff0c;还有碧桂园和融创&#xff0c;也是债台高筑了&#xff…

ToLua使用原生C#List和Dictionary

ToLua是使用原生C#List 介绍Lua中使用原生ListC#调用luaLua中操作打印测试如下 Lua中使用原生DictionaryC#调用luaLua中操作打印测试如下 介绍 当你用ToLua时C#和Lua之间肯定是会互相调用的&#xff0c;那么lua里面使用List和Dictionary肯定是必然的&#xff0c;在C#中可以调用…

shell的执行流控制

目录 1.for语句 2.条件语句 while...do语句 until...do 语句 if...then...elif...then...else...fi 语句 3.case语句 4.expect 5.break,continue,exit 1.for语句 作用&#xff1a;为循环执行动作 for语句结构 for //定义变量 do //使用变量&#xff0…

python+requests+unittest执行自动化接口测试!

1、安装requests、xlrd、json、unittest库 <1>pip 命令安装&#xff1a; pip install requests pip install xlrd pip install json pip install unittest <2> pycharm里安装 2、利用Page Object Model 设计理念创建六类Python Package(也可根据项目要求具体实施…

企业安全—DevSecOps概述详情

0x00 前言 SDL存在的问题在于体量过于庞大&#xff0c;不利于快速进行适配和进行&#xff0c;所以就有了DevSecOps&#xff0c;实际上是因为敏捷开发也就是DevOps的推进&#xff0c;并且坐上了云服务模式的火车&#xff0c;所以这一系列的东西都开始普及。DevSecOps作为DevOps…