【Spark精讲】一文讲透SparkSQL聚合过程以及UDAF开发

SparkSQL聚合过程

这里的 Partial 方式表示聚合函数的模式,能够支持预先局部聚合,这方面的内容会在下一节详细介绍。 对应实例中的聚合语句,因为 count 函数支持 Partial 方式,因此调用的是 planAggregateWithoutDistinct 方法,生成了图 7.4 中的两个 HashAggregate (聚合执行方式中的一种,后续详细介绍)物理算子树节点,分别进行局部聚合与最终的聚合。 最后,在生成的 SparkPlan 中添加 Exchange 节点,统一排序与分区信息,生成物理执行计划(ExecutedPlan)。

聚合查询在计算聚合值的过程中,通常都需要保存相关的中间计算结果,例如 max 函数需要保存当前最大值, count 函数需要保存当前的数据总数,求平均值的 avg 函数需要同时保存 count 和 sum 的值,更复杂的函数(如 pencent让等)甚至需要临时存储全部的数据 。 聚合查询 计算过程中产生的这些中间结果会临时保存在聚合函数缓冲区。

在 SparkSQL 中,聚合过程有 4种模式,分别是 Partial模式、 ParitialMerge模式、 Final模式 和 Complete模式。

Final模式一般和 Partial模式组合在一起使用。 Partial模式可以看作是局部数据的聚合,在 具体实现中, Partial 模式的聚合函数在执行时会根据读入的原始数据更新对应的聚合缓冲区, 当处理完所有的输入数据后,返回的是聚合缓冲区中的中间数据 。 而 Final模式所起到的作用 是将聚合缓冲区的数据进行合并,然后返回最终的结果。 如下图所示,在最终分组计算总和 之前,可以先进行局部聚合处理,这样能够避免数据传输并减少计算量 。 因此,上述聚合过程 中在 map 阶段的 sum 函数处于 Partial模式,在 reduce 阶段的 sum 函数处于 Final模式 。

Complete模式和上述的 Partial/Final组合方式不一样,不进行局部聚合计算。 下图展示了同样的聚合函数采用 Complete模式的情形。 可以看到,最终阶段直接针对原始输入,中间没有局部聚合过程。 一般来讲, Complete模式应用在不支持Partial模式的聚合函数中。

相比 Partial、 Final和 Complete模式, PartialMerge模式的聚合函数主要是对聚合缓冲区进行合并,但此时仍然不是最终的结果。 ParitialMerge主要应用在 distinct语句中,如下图所示。 聚合语句针对同一张表进行 sum 和 count (distinct)查询,最终的执行过程包含了 4 步聚合操作。 第1步按照(A,C)分组,对 sum函数进行 Partial模式聚合计算;第2步是 PartialMerge模式,对上一步计算之后的聚合缓冲区进行合井,但此时仍然不是最终的结果;第3步分组的列 发生变化,再一次进行 Partial模式的 count计算;第4步完成 Final模式的最终计算。

Hive on Spark与SparkSQL的区别

Hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

Hive on Spark是从Hive on MapReduce演进而来,Hive的整体解决方案很不错,但是从查询提交到结果返回需要相当长的时间,查询耗时太长,这个主要原因就是由于Hive原生是基于MapReduce的,那么如果我们不生成MapReduce Job,而是生成Spark Job,就可以充分利用Spark的快速执行能力来缩短HiveQL的响应时间。

Hive on Spark现在是Hive组件(从Hive1.1 release之后)的一部分。

SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。Hive原本是没有很好支持MapReduce之外的引擎的,而Hive On Tez项目让Hive得以支持和Spark近似的Planning结构(非MapReduce的DAG)。所以在此基础上,Cloudera主导启动了Hive On Spark。这个项目得到了IBM,Intel和MapR的支持(但是没有Databricks)。

结构上Hive On Spark和SparkSQL都是一个翻译层,把一个SQL翻译成分布式可执行的Spark程序。比如一个SQL:

SELECT item_type, sum(price)
FROM item
GROUP item_type;

上面这个SQL脚本交给Hive或者类似的SQL引擎,它会“告诉”计算引擎做如下两个步骤:读取item表,抽出item_type,price这两个字段;对price计算初始的SUM(其实就是每个单独的price作为自己的SUM)因为GROUP BY说需要根据item_type分组,所以设定shuffle的key为item_type从第一组节点分组后分发给聚合节点,让相同的item_type汇总到同一个聚合节点,然后这些节点把每个组的Partial Sum再加在一起,就得到了最后结果。不管是Hive还是SparkSQL大致上都是做了上面这样的工作。

需要理解的是,Hive和SparkSQL都不负责计算,它们只是告诉Spark,你需要这样算那样算,但是本身并不直接参与计算。

Spark UDAF开发

分两种

  1. 无泛型约束的UDAF  extends UserDefinedAggregateFunction  extends Aggregator  dataframe设计的
  2. 有泛型约束的UDAF  extends Aggregator 该UDAF时允许添加泛型,保障函数更加安全。但是这种UDAF不可直接在SQL中被调用运算适用于强类型Datasets。

在Spark中使用
    1.编写UDAF<两种类型的UDAF都可以>
    2. 在spark中注册UDAF,为其绑定一个名字,使用

在Spark SQL 中使用
   1.编写UDAF<使用继承 UserDefinedAggregateFunction 类型编写>
   2. 打Jar包,并上传
   3. 注册临时聚合函数,并使用

ADD  jar TestSpark.jar;
CREATE  TEMPORARY FUNCTION  mean_my AS  'com.test.structure.udaf.MeanMy';
select t1.data,mean_my(t1.age)
from (
select 33 as age,  '1' as data 
union all 
select 55  as age, '1' as data 
union all 
select 66 as age, '2' as data
)t1
group by   t1.data;

自定义UDAF类

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;public class MeanFloatUDAF extends UserDefinedAggregateFunction {/*** 聚合函数的输入数据结构*   函数的参数列表,不过需要写成StructType的格式*/@Overridepublic StructType inputSchema() {List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField( "field_nm", DataTypes.DoubleType, true ));return DataTypes.createStructType( structFields );
}/*** 聚缓存区数据结构 - 产生中间结果的数据类型* 如果是求平均数,存储总和以及计数,总和及计数就是中间结果* count    buffer.getInt(0)* sum_field   buffer.getDouble(1)*/
@Override
public StructType bufferSchema() {List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField( "count", DataTypes.IntegerType, true ));structFields.add(DataTypes.createStructField( "sum_field", DataTypes.DoubleType, true ));return DataTypes.createStructType( structFields );
}/*** 聚合函数返回值数据结构*/
@Override
public DataType dataType() {return DataTypes.DoubleType;
}/*** 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出*/
@Override
public boolean deterministic() {return true;
}/*** 初始化缓冲区* buffer是中间结果,是Row类的子类*/
@Override
public void initialize(MutableAggregationBuffer buffer) {//相加的初始值,这里的要和上边的中间结果的类型和位置相对应 - buffer.getInt(0)buffer.update(0,0);//参与运算数字个数的初始值buffer.update(1,Double.valueOf(0.0) );
}/***  给聚合函数传入一条新数据进行处理*  //每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的计算)*  buffer里面存放着累计的执行结果,input是当前的执行结果*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {//个数加1buffer.update(0,buffer.getInt(0)+1);//每有一个数字参与运算就进行相加(包含中间结果)buffer.update(1,buffer.getDouble(1)+Double.valueOf(input.getDouble(0)));
}/***  合并聚合函数缓冲区   //全局聚合*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0));buffer1.update(1,buffer1.getDouble(1)+buffer2.getDouble(1));
}/*** 计算最终结果*/
@Override
public Object evaluate(Row buffer) {return buffer.getDouble(1)/buffer.getInt(0);
}

使用自定义UDAF

//在Spark中使用  extends UserDefinedAggregateFunction类型的UDAF的使用import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;public class MeanUDAFMain {
public static void main(String[] args){try {SparkSession spark = SparkSession.builder().appName("Java Spark SQL data sources example").config("spark.some.config.option", "some-value").master("local[2]").getOrCreate();List<Row> dataExample = Arrays.asList(RowFactory.create( "2019-0801", 4,9.2),RowFactory.create( "2020-0802", 3,8.6),RowFactory.create( "2021-0803",2,5.5),RowFactory.create( "2021-0803",2,5.5),RowFactory.create( "2021-0803",7,4.5));StructType schema = new StructType(new StructField[]{new StructField("date", DataTypes.StringType, false, Metadata.empty()),new StructField("dist_mem", DataTypes.IntegerType, false, Metadata.empty()),new StructField("dm_mem", DataTypes.DoubleType, false, Metadata.empty())});Dataset<Row> itemsDF = spark.createDataFrame(dataExample, schema);itemsDF.printSchema();itemsDF.createOrReplaceTempView("test_mean_table");// 注册自定义聚合函数 -2. 在spark中注册UDAF,为其绑定一个名字spark.udf().register("mymean",new MeanFloatUDAF ());spark.sql("select dist_mem  from test_mean_table").show();spark.sql("select date,mymean(dm_mem) memdoubleMean from test_mean_table group by date").show();} catch (Exception e) {e.printStackTrace();}
}
}

Hive UDAF开发

UDF、UDAF、UDTF需要实现的方法

类型方法
UDF

类:

GenericUDF


包路径:
org.apache.hadoop.hive.ql.udf.generic

initialize:类型检查,返回结果类型
入参:ObjectInspector[]
出参:ObjectInspector
 

evaluate:功能逻辑实现

入参:DeferredObject[]

出参:Object

getDisplayString:函数名称
入参:String[]

出参:String
 

close:关闭函数,释放资源等
入参:无

出参:void

UDTF

类:
GenericUDTF

包路径:
org.apache.hadoop.hive.ql.udf.generic

initialize:类型检查,返回结果类型
入参:StructObjectInspector
出参:StructObjectInspector

process:功能逻辑实现
**调用forward输出一行数据,可多次调用

入参:Object[]

出参:void

close:关闭函数,释放资源等
入参:无

出参:void

UDAF

类:
AbstractGenericUDAFResolver

包路径:
org.apache.hadoop.hive.ql.udf.generic

类:
GenericUDAFEvaluator

包路径:
org.apache.hadoop.hive.ql.udf.generic
 


类:

AbstractAggregationBuffer

包路径:
org.apache.hadoop.hive.ql.udf.generic

-----AbstractGenericUDAFResolver-----

getEvaluator:获取计算器
入参:TypeInfo[]
出参:GenericUDAFEvaluator

---------GenericUDAFEvaluator----------

init:
入参:Mode,ObjectInspector[]
出参:ObjectInspector

getNewAggregationBuffer:

入参:无

出参:AggregationBuffer

reset:

入参:AggregationBuffer

出参:void

iterate:

入参:AggregationBuffer,Object[]

出参:void

merge:

入参:AggregationBuffer,Object

出参:void


terminate:

入参:AggregationBuffer

出参:Object

terminatePartial:

入参:AggregationBuffer

出参:Object

--------AbstractAggregationBuffer-------
estimate:评估内存占用大小

入参:无

出参:int

UDAF说明

一个Buffer作为中间处理数据的缓冲:获取getNewAggregationBuffer、重置reset
四个模式(Mode):

  1. PARTIAL1:
    from original data to partial aggregation data:
    iterate() and terminatePartial() will be called.
  2. PARTIAL2:
    from partial aggregation data to partial aggregation data:
    merge() and terminatePartial() will be called.
  3. FINAL:
    from partial aggregation to full aggregation:
    merge() and terminate() will be called.
  4. COMPLETE:
    from original data directly to full aggregation:
    iterate() and terminate() will be called.

五个方法

  1. 初始化init
  2. 遍历iterate:PARTIAL1和COMPLETE阶段
  3. 合并merge:PARTIAL2和FINAL阶段
  4. 终止terminatePartial:PARTIAL1和PARTIAL2阶段
  5. terminate:COMPLETE和FINAL阶段

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/586653.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

conda环境下nvrtc: error: invalid value for --gpu-architecture解决方法

1 问题描述 在运行视频处理的模型过程中&#xff0c;出现如下异常&#xff1a; nvrtc: error: invalid value for --gpu-architecture (-arch)nvrtc compilation failed: #define NAN __int_as_float(0x7fffffff) #define POS_INFINITY __int_as_float(0x7f800000) #define N…

用python画最简单的图案,用python画小猫简单代码

本篇文章给大家谈谈用python画小猫简单100行代码&#xff0c;以及用python画最简单的图案&#xff0c;希望对各位有所帮助&#xff0c;不要忘了收藏本站喔。 Source code download: 本文相关源码 from turtle import * #两个函数用于画心 defcurvemove():for i in range(200): …

AI绘画工具Midjourney绘画提示词Prompt分享

一、Midjourney绘画工具 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭…

Chapter 7 - 8. Congestion Management in Ethernet Storage Networks以太网存储网络的拥塞管理

Stomped CRC Counters Stomped CRC counters help in finding the location of bit errors in a network that uses cut-through switches. More precisely, these counters help in finding where bit errors do not exist. Stomped CRC 计数器有助于在使用直通式交换机的网络…

数据的复制

基本概念 数据的复制指的是通过网络链接的多台机器保留相同的副本 为什么要进行数据的复制 使得用户和数据在地理上比较接近&#xff0c;因为大数据要求我们将计算安排在数据存放的位置和我们基本的内存模型不是很一样 &#xff0c;比如磁盘调入内存之类的。即使系统的一部分…

将本地工作空间robot_ws上传到gitee仓库

git config --global user.name "geniusChinaHN" git config --global user.email "12705243geniuschinahnuser.noreply.gitee.com" cd ~/robot_ws #git init#创建原始仓库时候用 git add . git commit -m "上传文件内容描述" #git remote add r…

EOS链Ubuntu环境Install Prebuilt Binaries(安装预构建的二进制文件)的安装

[TOC](EOS链Ubuntu环境Install Prebuilt Binaries(安装预构建的二进制文件)的安装) EOS官网&#xff1a;https://eos.io/ 第一步 Ubuntu安装命令&#xff1a; 以下有两种安装方式&#xff0c;可以任选其一&#xff1a; 本文章已经上传绑定资源&#xff0c;也可以用命令安装。…

学生数据可视化与分析工具 vue3+flask实现

目录 一、技术栈亮点 二、功能特点 三、应用场景 四、结语 学生数据可视化与分析工具介绍 在当今的教育领域&#xff0c;数据驱动的决策正变得越来越重要。为了满足学校、教师和学生对于数据深度洞察的需求&#xff0c;我们推出了一款基于Vue3和Flask编写的学生数据可视化…

【PyQt】(自定义类)QIcon派生,更易用的纯色Icon

嫌Qt自带的icon太丑&#xff0c;自己写了一个&#xff0c;主要用于纯色图标的自由改色。 当然&#xff0c;图标素材得网上找。 Qt原生图标与现代图标对比&#xff1a; 没有对比就没有伤害 Qt图标 网络素材图标 自定义类XJQ_Icon&#xff1a; from PyQt5.QtGui import QIc…

LeetCode---377周赛---Floyd算法+字典树

题目列表 2974. 最小数字游戏 2975. 移除栅栏得到的正方形田地的最大面积 2976. 转换字符串的最小成本 I 2977. 转换字符串的最小成本 II 一、最小数字游戏 这题看懂题意就好&#xff0c;可以结合示例模拟一下&#xff0c;你就会发现规律&#xff0c;本质就是将数组排序&a…

后端程序员React初接触1

后端程序员React初接触 学习react基础与相关库的使用学习 包括react基础 路由 组件库等等 react是用于构建用户界面的JavaScript库 发送请求获取数据处理数据操作dom呈现页面&#xff08;react帮忙操作dom&#xff09; 数据渲染为视图 有facebook打造并开源 解决的问题 dom操…

Java 动态树的实现思路分析

Java 动态树的实现 目录概述需求&#xff1a; 设计思路实现思路分析1. 简单Java实现&#xff1a;2.建立父子表存储3.前端的对应的json 字符串方式 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0…

力扣:63. 不同路径 II(动态规划)

题目&#xff1a; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish”&#xff09;。 现在考虑网格中有障碍物。那…

SVN下载安装(服务器与客户端)

1.下载 服务器下载&#xff1a;Download | VisualSVN Server 客户端下载&#xff1a;自行查找 2. 服务器安装 双击执行 运行 下一步 同意下一步 下一步 选中安装目录 3. 客户端安装 双击执行 下一步 4. 服务器创建仓库 5. 服务器创建用户 6. 客户端获取资源 文件夹右键

用idea跑起十多年前的项目

一、eclipse的项目 先删掉一些eclipse的配置文件 二、在idea中导入项目 1、导入成功后&#xff0c;先【锤一下】 2、然后发现编译不通过&#xff0c;非常多的报错信息&#xff0c;逐一解决报错 &#xff08;1&#xff09;tomcat7配置报错 &#xff08;2&#xff09;先删除tom…

java springboot将接口查询数据放在系统中 一小时系统更新一次 避免用户访问接口查询数据库缓慢

真到了公司 很多数据库表 特别是常用的功能业务对应的 都是几百万条起步的数据 查询会比较缓慢 那么 我们就可以不用每次都真的查询数据库 例如 我这里有一个接口 通过 封装的 IBookService.list 函数去查询数据库 接口返回是这样的 我们先在启动类 条件装配上 这个接口所在的…

vivado CDC约束-“设置总线倾斜”对话框

“设置总线倾斜”对话框 在AMD Vivado™ IDE中&#xff0c;可以通过多种方式设置总线偏斜约束&#xff1a; •通过时间约束编辑器。选择窗口 → 时间限制 → 断言 → 设置总线倾斜。从“时序约束编辑器”中&#xff0c;可以添加、删除或修改总线扭曲约束。 注意&#…

day11--java高级编程:反射

4 Day18–反射 本章专题与脉络 1. 反射(Reflection)的概念 1.1 反射的出现背景 Java程序中&#xff0c;所有的对象都有两种类型&#xff1a;编译时类型和运行时类型&#xff0c;而很多时候对象的编译时类型和运行时类型不一致&#xff08;多态&#xff09;。 Object obj n…

【AIGC科技展望】预测AIGC2025年的机会与挑战

2025年&#xff0c;AIGC的机会与挑战 在未来的五年里&#xff0c;AIGC&#xff08;AI Generated Content&#xff09;将会成为一个越来越重要的领域。但是&#xff0c;伴随着机会而来的是挑战。在这篇文章中&#xff0c;我们将一起探讨AIGC的机会与挑战&#xff0c;并预测2025…

机器学习系列13:通过随机森林获取特征重要性

我们已经知道通过 L1 正则化和 SBS 算法可以用来做特征选择。 我们还可以通过随机森林从数据集中选择相关的特征。随机森林里面包含了多棵决策树&#xff0c;我们可以通过计算特征在每棵决策树决策过程中所产生的的信息增益平均值来衡量该特征的重要性。 你可能需要参考&…