SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总

AVG是求平均值,所以输出类型是Double类型

1)创建弱类型聚合函数类extends UserDefinedAggregateFunction

class MyAgeFunction extends UserDefinedAggregateFunction {//函数输入的数据结构,需要new一个具体的结构对象,然后添加结构override def inputSchema: StructType = {new StructType().add("age",LongType)}//计算时的数据结构override def bufferSchema: StructType = {new StructType().add("sum",LongType).add("conut",LongType)}//函数返回的数据类型override def dataType: DataType = DoubleType//表述函数是否稳定override def deterministic: Boolean = true//表述的是函数计算之前的缓冲区的初始化 buffer(0)表示第一个结构:sum, buffer(1)示第二个结构:countoverride def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}//根据查询结构来更新缓冲区数据sum + = input.getLong  count+=1override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}//将多个节点的缓冲区合并override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}//计算override def evaluate(buffer: Row): Any = {buffer.getLong(0).toDouble / buffer.getLong(1)}
}

聚合函数使用

def main(args: Array[String]): Unit = {//创建配置对象val conf = new SparkConf().setAppName("Spark01_Custom").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val rdd1 = spark.sparkContext.makeRDD(List(("chun",21),("chun1",23),("chun3",22)))//隐士转换(RDD转换DF/DS需要引入隐式转换)import spark.implicits._//  rdd转DFval frame = rdd1.toDF("name","age")//创建全局视图frame.createGlobalTempView("people")//创建聚合函数对象val udaf = new MyAgeFunction//注册聚合函数spark.udf.register("avgAge",udaf)//frame.select("age").show()//sql  这里表名要把全局名也写上spark.sql("select avgAge(age) from global_temp.people").show}

2)创建强类型聚合函数AVG(extends Aggregator[输入类型,缓冲区类型,输出类型])


//声明自定义聚合函数(强类型)
//case class Aggregator[K, V, C] (这里由三个泛型)
class MyAgeClassFuction extends Aggregator[UserBean,AvgBuffer,Double]{//初始化缓冲区override def zero: AvgBuffer = AvgBuffer(0,0)//AvgBuffer =  把输入的数据更新进缓冲区override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {//sum和count要设置为var的b.sum += a.ageb.count += 1b}//合并缓冲区override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {b1.sum = b1.sum + b2.sumb1.count = b1.count + b2.countb1}//计算结果override def finish(reduction: AvgBuffer): Double = {reduction.sum / reduction.count}//后俩都是数据变成类型之后的转码操作//第一个是自定义的类型,就用Encoders.productoverride def bufferEncoder: Encoder[AvgBuffer] = Encoders.product//如果不是自定义类型就用Encoders.scalaBooleanoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}//样例类
case class UserBean(name : String, age : Int)
case class AvgBuffer(var sum : Int, var count : Int)

使用

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Spark02_Custom2").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val rdd = spark.sparkContext.makeRDD(List(("chun1",23),("chun2",24),("chun3",25)))import spark.implicits._rdd.toDF("name","age")//自定义强类型聚合函数val udaf = new MyAgeClassFuction//这里不能注册,加入注册了名为avgAge,使用的时候是avgAge(字段),但是传入的应该是Bean对象,所以不可以这样写//需要将聚合函数转换为查询列val avgColumn = udaf.toColumn.name("avgAge")val userRDD = rdd.map {case (name, age) => {UserBean(name, age)}}//在sql里肯定没办法用,需要使用DSL风格select函数val ds = userRDD.toDSval rdd1 = ds.rddds.show()/****结果:+-----+---+| name|age|+-----+---+|chun1| 23||chun2| 24||chun3| 25|+-----+---+**/rdd1.foreach(println)//结果://UserBean(chun1,23)//UserBean(chun3,25)//UserBean(chun2,24)spark.stop()}

可以看到强类型聚合函数输出的结果每一行都是UserBean类型的,是样例类类型,并不像弱类型一样是row

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437628.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP连接sql seaver数据库

我的PHP版本7.0 通过sqlsrv系列函数,需要下载安装Microsoft Drivers for PHP for SQL Server驱动: 地址:https://msdn.microsoft.com/library/dn865013.aspx。 根据自己需求下载安装,安装地址php下ext目录下,我的是4.0…

【转】D365 FO第三方集成(四)---客户端调用

客户端调用json-based服务非常简单,就是标准的http调用。 http调用首先要解决URL的组成,D365 FO json-based调用的url组成如下: https://usnconeboxax1aos.cloud.onebox.dynamics.cn/api/services/{服务组名}/{服务名}/{方法名} 调用的代码很…

NoSql理解+传统关系型数据库ACID+Nosql的CAP+BASE的理解

1)什么是Nosql NoSQL(NoSQL Not Only SQL ),意即“不仅仅是SQL”, 泛指非关系型的数据库。随着互联网web2.0网站的兴起,传统的关系数据库在应付web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显…

ztree 点击重载 layui table

ztree 点击重载 layui table <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <HTML> <HEAD><TITLE> ztree_demo </TITLE><meta http…

【转】D365 FO第三方访问https证书问题

D365FO采用https&#xff0c;第三方通过API调用的时候&#xff0c;客户端不见得信任D365FO的证书&#xff0c;调用时候会报 基础连接已关闭&#xff0c;发送时发生错误&#xff0c;调用堆栈如下&#xff1a; 1 at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int…

数据结构与算法 - 稀疏数组(理解+代码实现案例)

举例 稀疏数组第一行是原数据几行几列和几个有效数据的个数 下面的行是各个有效数组的行列与具体值 稀疏数组实现 代码实现 package DataStructures.sparsearray;/*** 二维数组转稀疏数组 与 稀疏数组转成二维数组*/ public class SparseArray {public static void main(Stri…

php检测字符长度(中文)

strlen() mb_strlen() mb_strlen并不是PHP核心函数&#xff0c;使用前需要确保在php.ini加载了php_mbstring.dll&#xff0c;并开启extensionphp_mbstring.dll $str中文asd12;echo strlen($str).<br>;//11echo mb_strlen($str,utf8).<br>;//7

【转】Magento2 数据库操作

直接操作数据库 $objectManager \Magento\Framework\App\ObjectManager::getInstance(); $resource $objectManager->get(Magento\Framework\App\ResourceConnection); $connection $resource->getConnection(); $tableName $resource->getTableName(employee); …

数据结构 - 队列(非环形队列,以及优化成环形队列)

1&#xff09;队列的定义与实现形式-方式 2&#xff09;队列实现思路&#xff08;非环形&#xff0c;下面进行优化&#xff09; 3&#xff09;代码实现&#xff08;注意并不是环形&#xff09; package DataStructures.queue;import java.util.Scanner;/*** 使用数组模拟队列*…

爬取网易云音乐歌曲特色榜单信息

网易云音乐(iframe内的歌单) 刚开始学习做下记录 需要先下载好所需浏览器内核 我时谷歌&#xff0c;下载地址 http://chromedriver.storage.googleapis.com/index.html 然后没了&#xff0c;自己F12扒拉下就行了 运行&#xff1a; 左侧随便点击一个榜单后&#xff0c;复制ur…

【转】.htaccess 详解

.htaccess是什么 .htaccess文件(或者"分布式配置文件"&#xff09;提供了针对目录改变配置的方法&#xff0c; 即&#xff0c;在一个特定的文档目录中放置一个包含一个或多个指令的文件&#xff0c; 以作用于此目录及其所有子目录。作为用户&#xff0c;所能使用的命…

数据结构 - 单链表(Linked List)实现在内存中实现数据以链表形式生成并根据序号排序

下面实现一个例子来进行学习 1&#xff09;介绍 单链表的逻辑结构 在内存中的实际结构 具体创建示意图&#xff1a; 2&#xff09;代码实现 例子 1。第一个程序在添加的时候并没有按照序号排序&#xff0c;如果在添加的时候把位置改变输出的时候序号会改变 package DataStr…

Mysql count() 语句

百万数据测试 select count(主键) from table 执行效率&#xff1a; select count(*) AS AGGREGATE from table 以上测试均再 navicat 工具进行 由于各种原因&#xff0c;sql执行时间可定存在一定误差&#xff0c;但最终结果不变。

【转】magento性能优化的教程(非常详细)

Magento是一套专业开源的电子商务系统,Magento设计得非常灵活&#xff0c;具有模块化架构体系和丰富的功能但有朋友会发现此模块用到了会发现非常的缓慢了&#xff0c;那么下面我们来看关于magento性能优化的例子。 前面优化 mod_deflate模块&#xff0c;将text、 css 和 jav…

java数据结构 -链表 -获取有效节点个数,单链表中倒数k个节点

// 1.获取到单链表的节点的个数&#xff08;如果有头结点&#xff0c;不统计头结点&#xff09;public static int getLength(HeroNode head){if (head.next null){return 0;}int length 0;//定义一个辅助变量&#xff0c;HeroNode cur head.next;while(cur !null){length;c…

php国密sm4

加解密代码摘自网络&#xff0c;出处忘记了&#xff0c;这里就不附链接&#xff0c;对原创说声抱歉&#xff01; 先附上代码&#xff1a; <?phpClass SM4Util {public $SM4_CK [0x00070e15, 0x1c232a31, 0x383f464d, 0x545b6269,0x70777e85, 0x8c939aa1, 0xa8afb6bd, 0xc…

【转】可道云kodexplorer搭建私有云后的配置优化

一、上传下载速度优化 首先明确可道云没有对上传下载做任何限制&#xff0c;速度快慢和网络环境有关。可道云是基于http上传&#xff0c;所以和其他http上传速度基本一致&#xff1b;可以对比其他web系统或网站说附件上传速度。同其他例如webdav、FTP、QQ传输等软件底层协议不一…

phpStydy+wordpress 安装部署

1、先准备工具包 下载phpstudy&#xff0c;下载地址&#xff1a;https://www.xp.cn/ 下载wordpress &#xff0c;下载地址&#xff1a;https://cn.wordpress.org/download/ 2、安装phpStudy&#xff0c;下一步操作即可 3、安装完成后&#xff0c;检测环境配置&#xff0c;php、…

java数据结构 - 单链表(腾讯面试题实现单链表反转)

直接上实现代码 //单链表的反转public static void reverseList(HeroNode head){//如果当前链表为空&#xff0c;或只有一个节点&#xff0c;无需反转if (head.next null || head.next.next null){return ;}//定义一个辅助变量&#xff0c;帮助我们遍历HeroNode cur head.n…

数据结构 - 单链表(百度面试题单链表的倒序打印)

方法1&#xff1a;反转打印&#xff08;但是会改变链表结构&#xff0c;不建议&#xff09; https://blog.csdn.net/weixin_43736084/article/details/101939789 方法2&#xff1a;存入栈中&#xff0c;在出栈 public static void reversePrint(HeroNode head){if (head.next…