SparkSql---用户自定义函数UDFUDAF

文章目录

  • 1.UDF
  • 2.UDAF
    • 2.1 UDF函数实现原理
    • 2.2需求:计算用户平均年龄
      • 2.2.1 使用RDD实现
      • 2.2.2 使用UDAF弱类型实现
      • 2.2.3 使用UDAF强类型实现

1.UDF

用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。

如:实现需求在用户name前加上"Name:"字符串,并打印在控制台

  def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._//创建DataFrameval dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24)))val dataframe = dataRDD.toDF("name","age")//注册udf函数sc.udf.register("addName",(x:String)=>"Name:"+x)//创建临时视图dataframe.createOrReplaceTempView("people")//对临时视图使用udf函数sc.sql("select addName(name) from people").show()sc.stop()}

在这里插入图片描述

2.UDAF

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。

2.1 UDF函数实现原理

在这里插入图片描述
在Spark中,UDF(用户自定义函数)在对表中的数据进行处理时,通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率,特别是对于大数据集。

2.2需求:计算用户平均年龄

2.2.1 使用RDD实现

    val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24),("wangwu",26)))val reduceResult: (Int, Int) = dataRDD.map({case (name, age) => {(age, 1)}}).reduce((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)})println(reduceResult._1/reduceResult._2)

在这里插入图片描述

2.2.2 使用UDAF弱类型实现

需要用户自定义类实现UserDefinedAggregateFunction,并重写其中的方法,当前已不推荐使用。

package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo02 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")dataFrame.createOrReplaceTempView("user")//创建聚合函数var myAvg=new MyAverageUDAF()//在Spark中注册自定义的聚合函数sc.udf.register("avgMy",myAvg)sc.sql("select avgMy(age) from user").show()sc.stop()}case class User(var name:String,var age:Int)}class MyAverageUDAF extends UserDefinedAggregateFunction{//输入的要进行聚合的参数的类型override def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))//聚合函数缓冲区中的值的数据类型override def bufferSchema: StructType = StructType(Array(StructField("sum",LongType),StructField("count",LongType)))//函数返回的值的数据类型override def dataType: DataType = DoubleType//判断函数的稳定性//对于相同类型的输入是否有相同类型的输出override def deterministic: Boolean = true//聚合函数缓冲区中值的初始化//因为数据是弱类型的,函数缓冲区中是根据索引来找到对应的变量override def initialize(buffer: MutableAggregationBuffer): Unit = {//年龄的总和buffer(0)=0L//年龄的个数buffer(1)=0L}//更新缓冲区中的数据(执行操作步骤)override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={//第0个索引值是否为空if(!input.isNullAt(0)) {//更新年龄sum的值buffer(0)=buffer.getLong(0)+input.getInt(0)//更新年龄个数buffer(1)=buffer.getLong(1)+1;}}//合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Double = {buffer.getLong(0).toDouble / buffer.getLong(1)}
}

在这里插入图片描述

2.2.3 使用UDAF强类型实现

Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction

package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo03 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")val dataset: Dataset[User01] = dataFrame.as[User01]//创建聚合函数var myAvg=new MyAverageUDAF01()//将聚合函数转换为查询的列val col: TypedColumn[User01, Double] = myAvg.toColumn//执行查询操作dataset.select(col).show()sc.stop()}case class User(var name:String,var age:Int)}//输入数据类型
case class User01(var name:String,var age:Int)
//缓存中的数据类型
case class AgeBuffer(var sum:Long,var count:Long)class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{//设置初始值override def zero: AgeBuffer = {AgeBuffer(0L,0L)}//缓冲区实现聚合override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {b.sum = b.sum + a.ageb.count = b.count + 1b}//合并缓冲区override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {b1.sum+=b2.sumb1.count+=b2.countb1}//计算最终结果override def finish(buff: AgeBuffer): Double = {buff.sum.toDouble/buff.count}//设置编码器和解码器//自定义类型就是 product 自带类型根据类型选择override def bufferEncoder: Encoder[AgeBuffer] = {Encoders.product}override def outputEncoder: Encoder[Double] = {Encoders.scalaDouble}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/655079.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

83、评估权值预加载带来的性能提升

上两节介绍的权值预加载技术,在很多业务中是真实存在的。 只不过限于本小册内容以及大部分同学硬件设备的局限,这里无法通过真正的推理框架(比如 tvm, pytorch) GPU 来实现底层细节,这些推理框架中一般会将类似的优化技术封装起来。 作为 …

APPium简介及安装

1 APPium简介 1. 什么是APPium? APPium是一个开源测试自动化框架,适用于原生、混合或移动Web应用程序的自动化测试工具。 APPium使用WebDriver协议驱动iOS、Android等应用程序。 2. APPium的特点 支持多平台(Android、iOS等) …

uniapp组件库Card 卡片 的使用方法

目录 #平台差异说明 #基本使用 #配置卡片间距 #配置卡片左上角的缩略图 #配置卡片边框 #设置内边距 #API #Props #Slot #Event 卡片组件一般用于多个列表条目,且风格统一的场景。 #平台差异说明 AppH5微信小程序支付宝小程序百度小程序头条小程序QQ小程…

基于STM32的以太网通信协议选择与实现

在基于STM32的以太网通信中,主要涉及到选择合适的通信协议和实现对应的功能代码。常见的通信协议包括TCP/IP、UDP、HTTP等,选择合适的协议取决于具体应用需求。以下将介绍在STM32上进行以太网通信时,常用的通信协议选择以及对应功能代码的实现…

CSDN·COC城市开发者组织2023年度聚会上海站纪实

目录 前言活动宣传活动议程活动总结结束语 前言 2023年是CSDNCOC成立之年,经过一年的不断发展和壮大,COC上海城市开发者社区的队伍不断壮大,本着每个月都有线下活动的原则,先后举办大大小小的线下沙龙活动20余场。与此同时&…

trucksim三车队列仿真 matlab一直闪退问题

Trucksim2019 matlab版本2023a 基本框架应该都清楚 下图是主界面 overlay videos and polots with other runs 模块是 关联另一个车辆模型 核心是下图标红是核心 Number of vehicle codes 选择ALL 不要选

世微AP5151芯片低压差 线性降压 恒流驱动IC 台灯 指示灯 矿灯

概述 AP5151 是一种低压差、线性降压、 固定输出电流的 LED 恒流驱动器。 除 LED 外,AP5151 无需外接其它元 器件即可构成一个恒流输出的 LED 驱动 电路。 AP5151 内置过热保护功能,可有效 保护芯片,避免结温超过 120oC 时因过热 而造成损坏…

【图形学】双三次贝塞尔曲线绘制方法

双三次贝塞尔曲线的定义 双三次贝塞尔曲面是由16个控制点定义的曲面,通常表示为4x4矩阵。 曲面的公式如下: p ( u , v ) ∑ i 0 3 ∑ j 0 3 P i , j B i , 3 ( u ) B j , 3 ( v ) , ( u , v ) ∈ [ 0 , 1 ] [ 0 , 1 ] p(u,v)\sum_{i0}^3\sum_{j0}…

【程序员英语】【美语从头学】初级篇(入门)(笔记)Lesson10(电话会话Ⅱ)

《美语从头学初级入门篇》 注意:被 删除线 划掉的不一定不正确,只是不是标准答案。 文章目录 Lesson 10 Telephone Conversation Ⅱ 电话会话(二)会话A会话B笔记I would like to do(Id like to to do)我想…

如何从视频号中提取视频的文案?抖音文案也可提取

最近不少小伙伴刷抖音看看视频一个一个的进行抄,太慢了。​今天分享一个工具搞定怎么提取视频文案的方法! 提取文案小助手 提取文案小助手是专注视频创作者的利器,帮助用户一键提取视频号中的文案,获取的方式比较简单&#xff0c…

Sui TVL跻身全链前十名|全面详解Sui原生功能如何促使DeFi蓬勃发展

根据DefiLlama数据显示,1月28日Sui TVL超过4.02亿美元,跻身所有区块链TVL前十名! Sui的创新应用原生功能为DeFi应用铺平了道路,为用户提供了良好的体验。每个原生功能,从促进高效的共享流动性到简化用户互动&#xff0…

js中contextmenu、click和mousedown的区别在哪?

contextmenu、click和mousedown的定义 contextmenu:在鼠标右键点击时触发。可以在此事件中执行特定的操作,比如显示自定义的右键菜单或者阻止浏览器默认的上下文菜单弹窗。它是为了满足特定场景下对右键点击的定制化处理需求。 click事件是通用的鼠标点…

印章制作办法

印章制作办法: 最近在做单位技术支持的培训,需要增加个培训章,自己内容完成即可。 一、excel表格中 ,插入-形状-圆(按住shift按键) 操作三步: 1.填充 无 : 2.形状轮廓 ---“红色…

代码随想录算法训练营day15|104.二叉树的最大深度、111.二叉树的最小深度、222.完全二叉树的节点个数

104.二叉树的最大深度 559.n叉树的最大深度 111.二叉树的最小深度 222.完全二叉树的节点个数 104.二叉树的最大深度 (优先掌握递归) 什么是深度,什么是高度,如何求深度,如何求高度,这里有关系到二叉树的遍…

数据结构—栈实现后缀表达式的计算

后缀表达式计算 过程分析 中缀表达式 (15)*3 > 后缀表达式 153* (可参考这篇文章:中缀转后缀) 第一步:我们从左至右扫描 后缀表达式(已经存放在一个字符数组中),遇到第一个数字字符 ‘1’ 放入栈中第二步&#xf…

幻兽帕鲁服务器出租,腾讯云PK阿里云怎么收费?

幻兽帕鲁服务器价格多少钱?4核16G服务器Palworld官方推荐配置,阿里云4核16G服务器32元1个月、96元3个月,腾讯云换手帕服务器服务器4核16G14M带宽66元一个月、277元3个月,8核32G22M配置115元1个月、345元3个月,16核64G3…

存储技术架构演进

一. 演进过程 存储技术架构的演进主要是从集中式到分布式的一种呈现,集中式存储模式凭借其在稳定性和可靠性方面的优势成为许多业务数据库的数据存储首选,顾名思义,集中式存储主要体现在集中性,一套集中式管理的存储系统&#xff…

python:socket基础操作(4)-《tcp客户端基础》

tcp就和udp不一样了,tcp是客户端和服务器端,如果想通过tcp发送数据,要先让tcp进行连接服务器端 tcp客户端 先让服务器端进行启动 import socketdef main():# 创建套接字tcp_client_socket socket.socket(socket.AF_INET,socket.SOCK_STREAM…

RSTP的P/A机制

如图所示根桥S1和S2之间新添加了一条链路,在当前状态下S2的另外几个端口p2是Alternate端口,p3是指定端口且处于Forwarding状态,p4是边缘端口。新链路连接成功后,P/A机制协商过程如下。 1.P0和P1两个端口马上都先成为指定端口发送RS TBPDU。 2.S2的P1口收到更优的RST BPD…

Google Chrome 中出现 ERR_SSL_KEY_USAGE_INCOMPATIBLE 错误

证书的方式发生了变化,出现了这个新错误,导致我无法浏览该网站。 可以右键属性获取位置 关闭导航器chrome并转到文件夹,找到Local State文件并删除 执行指令结束进程,重新打开浏览器即可 taskkill /im "chrome.exe"…