Spark【Spark SQL(四)UDF函数和UDAF函数】

UDF 函数

        UDF 是我们用户可以自定义的函数,我们通过SparkSession对象来调用 udf 的 register(name:String,func(A1,A2,A3...)) 方法来注册一个我们自定义的函数。其中,name 是我们自定义的函数名称,func 是我们自定义的函数,它可以有很多个参数。

        通过 UDF 函数,我们可以针对某一列数据或者某单元格数据进行针对的处理。

案例 1

定义一个函数,给 Andy 的 name 字段的值前 + "Name: "。

def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("spark sql udf").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val df = spark.read.json("data/sql/people.json")df.createOrReplaceTempView("people")spark.udf.register("prefixName",(name:String)=>{if (name.equals("Andy"))"Name: " + nameelsename})spark.sql("select prefixName(name) as name,age,sex from people").show()spark.stop()}

        这里我们定义了一个自定义的 UDF 函数:prefixName,它会判断name字段的值是否为 "Andy",如果是,就会在她的值前+"Name: "。

运行结果:

+----------+---+---+
|      name|age|sex|
+----------+---+---+
|   Michael| 30| 男|
|Name: Andy| 19| 女|
|    Justin| 19| 男|
|Bernadette| 20| 女|
|  Gretchen| 23| 女|
|     David| 27| 男|
|    Joseph| 33| 女|
|     Trish| 27| 女|
|      Alex| 33| 女|
|       Ben| 25| 男|
+----------+---+---+

UDAF 函数

        强类型的DataSet和弱类型的DataFrame都提供了相关聚合函数,如count、countDistinct、avg、max、min。

        UDAF 也就是我们用户的自定义聚合函数。聚合函数就比如 avg、sum这种函数,需要先把所有数据放到一起(缓冲区),再进行统一处理的一个函数。

        实现 UDAF 函数需要有我们自定义的聚合函数的类(主要任务就是计算),我们可以继承 UserDefinedAggregateFunction,并实现里面的八种方法,来实现弱类型的聚合函数。(Spark3.0之后就不推荐使用了,更加推荐强类型的聚合函数)

        我们可以继承Aggregator来实现强类型的聚合函数。

案例1 - 平均年龄

case 类可以直接构建对象,不需要new,因为样例类可以自动生成它的伴生对象和apply方法。

弱类型实现

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructField, StructType}/*** 弱类型*/
object UDAFTest01 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("spark sql udaf").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val df = spark.read.json("data/sql/people.json")df.createOrReplaceTempView("people")spark.udf.register("avgAge",new MyAvgUDAF())spark.sql("select avgAge(age) from people").show()spark.stop()}
}
class MyAvgUDAF extends UserDefinedAggregateFunction{// 输入数据的结构 INoverride def inputSchema: StructType = {StructType(Array(StructField("age",LongType)))}// 缓冲区数据的结构 BUFFERoverride def bufferSchema: StructType = {StructType(Array(StructField("total",LongType),StructField("count",LongType)))}// 函数计算结果的数据类型 OUToverride def dataType: DataType = LongType// 函数的稳定性 (传入相同的参数结果是否相同)override def deterministic: Boolean = true// 缓冲区初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {//这两种写法都一样
//    buffer(0) = 0L
//    buffer(1) = 0L//第二种方法buffer.update(0,0L) //total 给缓冲区的第0个数据结构-total-初始化赋值0Lbuffer.update(1,0L) //count 给缓冲区的第1个数据结构-count-初始化赋值0L}// 数据过来之后 如何更新缓冲区override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {// 第一个参数代表缓冲区的第i个数据结构 0代表total 1代表count// 第二个参数是对第一个参数的数据结构进行重新赋值// buffer.getLong(0)是取出缓冲区第0个值-也就是total的值,给它+上输入的值中的第0个值(因为我们输入结构只有一个就是age:Long)buffer.update(0,buffer.getLong(0)+input.getLong(0))buffer.update(1,buffer.getLong(1)+1)  //count 每次数据过来+1}// 多个缓冲区数据合并override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1.update(0,buffer1.getLong(0)+buffer2.getLong(0))buffer1.update(1,buffer1.getLong(1)+buffer2.getLong(1))}// 计算结果操作override def evaluate(buffer: Row): Any = {buffer.getLong(0)/buffer.getLong(1)}
}

运行结果:

+-----------+
|avgage(age)|
+-----------+
|         25|
+-----------+

 

强类型实现

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator/*** 强类型*/
object UDAFTest02 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("spark sql udaf").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val df = spark.read.json("data/sql/people.json")df.createOrReplaceTempView("people")spark.udf.register("avgAge",functions.udaf(new MyAvg_UDAF()))spark.sql("select avgAge(age) from people").show()spark.stop()}
}/*** 自定义聚合函数类:*  1.继承org.apache.spark.sql.expressions.Aggregator,定义泛型:*    IN  : 输入数据类型 Long*    BUF : 缓冲区数据类型*    OUT : 输出数据类型 Long*  2.重写方法*/
//样例类中的参数默认是 val 所以这里必须指定为var
case class Buff(var total: Long,var count: Long)
class MyAvg_UDAF extends Aggregator[Long,Buff,Long]{// zero: Buff zero代表这个方法是用来初始值(0值)// Buff是我们的case类 也就是说明这里是用来给 缓冲区进行初始化override def zero: Buff = {Buff(0L,0L)}// 根据输入数据更新缓冲区 要求返回-Buffoverride def reduce(buff: Buff, in: Long): Buff = {buff.total += inbuff.count += 1buff}// 合并缓冲区 同样返回buff1override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.total += buff2.totalbuff1.count += buff2.countbuff1}// 计算结果override def finish(buff: Buff): Long = {buff.total/buff.count}// 网络传输需要序列化 缓冲区的编码操作 -编码override def bufferEncoder: Encoder[Buff] = Encoders.product// 输出的编码操作 -解码override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

运行结果:

+-----------+
|avgage(age)|
+-----------+
|         25|
+-----------+

 

早期UDAF强类型聚合函数

SQL:结构化数据查询 & DSL:面向对象查询(有对象有方法,与类型相关,所以通过DSL语句结合起来使用)

早期的UDAF强类型聚合函数使用DSL操作。

定义一个case类对应数据类型,然后通过as[对象]方法将DataFrame转为DataSet类型,然后将我们的UDAF聚合类转为列对象。

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn, functions}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructField, StructType}/*** 早期的UDAF强类型聚合函数使用DSL操作*/
object UDAFTest03 {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("spark sql udaf").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val df = spark.read.json("data/sql/people.json")val ds: Dataset[User] = df.as[User]// 将UDAF强类型聚合函数转为查询的类对象val udafCol: TypedColumn[User, Long] = new OldAvg_UDAF().toColumnds.select(udafCol).show()spark.stop()}
}/*** 自定义聚合函数类:*  1.继承org.apache.spark.sql.expressions.Aggregator,定义泛型:*    IN  : 输入数据类型 User*    BUF : 缓冲区数据类型*    OUT : 输出数据类型 Long*  2.重写方法*/
//样例类中的参数默认是 val 所以这里必须指定为var
case class User(name: String,age: Long,sex: String)
case class Buff(var total: Long,var count: Long)
class OldAvg_UDAF extends Aggregator[User,Buff,Long]{// zero: Buff zero代表这个方法是用来初始值(0值)// Buff是我们的case类 也就是说明这里是用来给 缓冲区进行初始化override def zero: Buff = {Buff(0L,0L)}// 根据输入数据更新缓冲区 要求返回-Buffoverride def reduce(buff: Buff, in: User): Buff = {buff.total += in.agebuff.count += 1buff}// 合并缓冲区 同样返回buff1override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.total += buff2.totalbuff1.count += buff2.countbuff1}// 计算结果override def finish(buff: Buff): Long = {buff.total/buff.count}// 网络传输需要序列化 缓冲区的编码操作 -编码override def bufferEncoder: Encoder[Buff] = Encoders.product// 输出的编码操作 -解码override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

运行结果:

+------------------------------------------+
|OldAvg_UDAF(com.study.spark.core.sql.User)|
+------------------------------------------+
|                                        25|
+------------------------------------------+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/78855.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

腾讯mini项目-【指标监控服务重构】2023-07-19

今日已办 OpenTelemetry Logs 通过日志记录 API 支持日志收集 集成现有的日志记录库和日志收集工具 Overview 日志记录 API - Logging API,允许您检测应用程序并生成结构化日志旨在与其他 telemerty data(例如metric和trace)配合使用&am…

Java代码审计16之fastjson反序列化漏洞(1)

文章目录 1、简介fastjson2、fastjson的使用2.1、将类序列化为字符串2.2、将字符串还原为对象2.3、小结以上2.4、稍微扩展思路 3、fastjson漏洞利⽤原理与dnslog4、JdbcRowSetImpl利用链4.1、JdbcRowSetImpl的基本知识4.2、利用代码复现4.3、生成poc4.4、模拟真实场景4.5、利用…

MongoDB差异数据对比的快速指南

MongoDB是一种非关系型数据库,它以灵活的 JSON-like 文档的形式存储数据,这种特性使其在处理大量数据和实现快速开发时更具有优势。而由于其灵活的数据模型和强大的性能,MongoDB 被广泛应用在各种业务场景中。随着业务的发展和数据的增长&…

kafka 3.5 生产者在把数据推送到服务端,再到落盘的过程中,怎么保证不丢失数据源码

一、生产者客户端配置参数acks说明1、acks12、acks03、acks-1 二、请求在写入Leader的数据管道之前,则会验证Leader的ISR副本数量和配置中的最小ISR数量1、Leader的ISR小于配置文件中minInSyncReplicas,并且acks-1,则抛异常2、如果acks不等于…

Excel VLOOKUP 初学者教程:通过示例学习

目录 前言 一、VLOOKUP的用法 二、应用VLOOKUP的步骤 三、VLOOKUP用于近似匹配 四、在同一个表里放置不同的VLOOKUP函数 结论 前言 Vlookup(V 代表“垂直”)是 excel 中的内置函数,允许在 excel 的不同列之间建立关系。 换句话说&#x…

iPhone苹果15手机怎么看是国行还是美版或港版的苹果iPhone15手机?

iPhone苹果手机15机型区域版本识别代码 CH代码为国行 LL代码为美版 ZP代码为港版 iPhone苹果15手机怎么看是国行还是美版或港版的苹果iPhone15手机? 1、打开苹果iPhone15手机桌面上的「设置」; 2、在iPhone苹果15手机设置内找到「通用」并点击打开&…

大型游戏动作竞技游戏开发和体感VR/AR游戏开发:创造引人入胜的虚拟世界

大型游戏动作竞技游戏和体感VR/AR游戏都代表了游戏开发领域的最新趋势。它们提供了高度沉浸式的娱乐体验,结合了视觉、听觉和体感互动。在本文中,我们将探讨如何开发这两种类型的游戏,并介绍其关键特点和开发流程。 大型游戏动作竞技游戏的特…

Spring学习 (一): IoC容器

前言 参考 廖雪峰Spring教程 一、什么是IoC容器 容器的意思可以理解为一个提供供程序正常运行,提供各种依赖的组件的包的环境。 IoC,控制反转,实际上就是将原本由代码编写者控制的各个对象(组件)的生命周期托管给底…

Java手写HashMap及拓展实践

Java手写HashMap 思维导图 #mermaid-svg-liNfjvnThNZyNIWd {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-liNfjvnThNZyNIWd .error-icon{fill:#552222;}#mermaid-svg-liNfjvnThNZyNIWd .error-text{fill:#552222;…

【OJ比赛日历】快周末了,不来一场比赛吗? #09.16-09.22 #12场

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…)比赛。本账号会推送最新的比赛消息,欢迎关注! 以下信息仅供参考,以比赛官网为准 目录 2023-09-16(周六) #3场比赛2023-09-17…

【FPGA项目】进阶版沙盘演练——报文收发(报文处理、CDC、CRC)

前言 书接上文【FPGA项目】沙盘演练——基础版报文收发_子墨祭的博客-CSDN博客,前面我们做了基础版的报文收发,相信对逻辑设计有了一定的认知,在此基础上,继续完善一个实际报文收发可能会遇到的一些处理: 报文处理握手…

公交查询系统

目录 需求分析 1 概述 2 课题分析 3 实现功能步骤 4 项目背景 概要设计 1 系统流程图. 2 功能模块. 3 各功能模块 4 数据存储 5 类设计 三、详细设计 1公交线路查询系统用户界面 2公交信息存储模快 3公交信息查询模块 4用户信息输入和输出模块 四、调试分析 五、使用说明 六、…

STM32外部复位IC与看门狗冲突,无法复位问题解决方案

使用STM32H743制作了一款飞控,外部复位IC采用MAX809STR,打板完后,烧录飞控固件后大量板子无法正常启动,怀疑是晶振没有起振或MCU未焊接好,检查后均焊接正常,编写裸机LED定时闪烁验证程序可正常运行。经网上查询资料锁定…

Python 环境搭建,集成开发环境IDE: PyCharm

Python 环境搭建,集成开发环境IDE: PyCharm 一、Python 环境搭建二、Python下载三、Python安装四、环境变量配置五、Python 环境变量六、运行Python1、交互式解释器:2、命令行脚本3、集成开发环境(IDE:Integrated Development Environment&am…

Hadoop-Hive

1. hive安装部署 2. hive基础 3. hive高级查询 4. Hive函数及性能优化 1.hive安装部署 解压tar -xvf ./apache-hive-3.1.2-bin.tar.gz -C /opt/soft/ 改名mv apache-hive-3.1.2-bin/ hive312 配置环境变量:vim /etc/profile #hive export HIVE_HOME/opt/soft/hive…

软件测试的基本流程是什么?软件测试流程详细介绍

软件测试和软件开发一样,是一个比较复杂的工作过程,如果无章法可循,随意进行测试势必会造成测试工作的混乱。为了使测试工作标准化、规范化,并且快速、高效、高质量地完成测试工作,需要制订完整且具体的测试流程。 01…

JavaScript的DOM操作(二)

一、元素的特性attribute 1.元素的属性和特性 前面我们已经学习了如何获取节点,以及节点通常所包含的属性,接下来我们来仔细研究元素Element。 我们知道,一个元素除了有开始标签、结束标签、内容之外,还有很多的属性&#xff0…

Flutter 使用pageview无缝隙自动轮播教程

导入要使用的轮播图片 List<String> imagesa ["assets/images/car_qidian.jpg","assets/images/car_bg.jpg","assets/images/car_bg.jpg","assets/images/car_bg.jpg","assets/images/car_bg.jpg","assets/imag…

【算法与数据结构】450、LeetCode删除二叉搜索树中的节点

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;本题首先要分析删除节点的五种情况&#xff1a; 1、没有找到节点2、找到节点 左右子树为空左子树为空…

docker容器管理-实操命令

本单元主要是在docker镜像管理下进一步的培训学习文档。 docker镜像管理-实操_忍冬行者的博客-CSDN博客 四.容器管理 1.运行一个容器 docker container run --name c1 -it nginx:latest /bin/sh 2.后台运行一个容器 docker container run --name c1 -it -d nginx:latest 3.查…