Spark SQL编程

1. Spark SQL概述

1.1 什么是Spark SQL

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种,包括SQL和Dataset API。计算结果时,使用相同的执行引擎,与您用于表达计算的API/语言无关。

1.2 为什么要有Spark SQL

1.3 SparkSQL的发展

1)发展历史

RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的他们的执行效率和执行方式在现在的版本中,dataSet性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSet<Row>。

2)三者的共性

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算

(3)三者有许多共同的函数,如filter,排序等

(4)三者都会根据Spark的内存情况自动缓存运算

(5)三者都有分区的概念

1.4 Spark SQL的特点

1)易整合

无缝的整合了SQL查询和Spark编程。

2)统一的数据访问方式

使用相同的方式连接不同的数据源。

3)兼容Hive

在已有的仓库上直接运行SQL或者HQL。

4)标准的数据连接

通过JDBC或者ODBC来连接

1.5 SparkSession新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

  • 一个叫SQLContext,用于Spark自己提供的SQL查询;
  • 一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

[atguigu@hadoop102 spark-local]$ bin/spark-shell

20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Spark context Web UI available at http://hadoop102:4040

Spark context available as 'sc' (master = local[*], app id = local-1599880621394).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1

      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)

Type in expressions to have them evaluated.

Type :help for more information.

2 常用方式

2.1 方法调用

1)创建一个maven工程SparkSQL

2)创建包名为com.atguigu.sparksql

3)输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}{"age":19,"name":"xuzhu"}{"age":18,"name":"duanyu"}

{"age":22,"name":"qiaofeng"}

{"age":11,"name":"xuzhu"}

{"age":12,"name":"duanyu"}

5)在pom.xml文件中添加spark-sql的依赖

<dependencies>

    <dependency>

       <groupId>org.apache.spark</groupId>

       <artifactId>spark-sql_2.12</artifactId>

       <version>3.3.1</version>

    </dependency>

    <dependency>

       <groupId>org.projectlombok</groupId>

       <artifactId>lombok</artifactId>

       <version>1.18.22</version>

    </dependency>

</dependencies>

6)代码实现

添加javaBean的User

package com.atguigu.sparksql.Bean;

import lombok.Data;

import java.io.Serializable;

@Data

public class User implements Serializable {

    public Long age;

    public String name;

    public User() {

    }

    public User(Long age, String name) {

        this.age = age;

        this.name = name;

    }

}

代码编写

package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.api.java.function.ReduceFunction;

import org.apache.spark.sql.*;

import scala.Tuple2;

public class Test01_Method {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        // 按照行读取

        Dataset<Row> lineDS = spark.read().json("input/user.json");

        // 转换为类和对象

        Dataset<User> userDS = lineDS.as(Encoders.bean(User.class));

//        userDS.show();

        // 使用方法操作

        // 函数式的方法

        Dataset<User> userDataset = lineDS.map(new MapFunction<Row, User>() {

            @Override

            public User call(Row value) throws Exception {

                return new User(value.getLong(0), value.getString(1));

            }

        },

                // 使用kryo在底层会有部分算子无法使用

                Encoders.bean(User.class));

        // 常规方法

        Dataset<User> sortDS = userDataset.sort(new Column("age"));

        sortDS.show();

        // 区分

        RelationalGroupedDataset groupByDS = userDataset.groupBy("name");

        // 后续方法不同

        Dataset<Row> count = groupByDS.count();

        // 推荐使用函数式的方法  使用更灵活

        KeyValueGroupedDataset<String, User> groupedDataset = userDataset.groupByKey(new MapFunction<User, String>() {

            @Override

            public String call(User value) throws Exception {

                return value.name;

            }

        }, Encoders.STRING());

        // 聚合算子都是从groupByKey开始

        // 推荐使用reduceGroup

        Dataset<Tuple2<String, User>> result = groupedDataset.reduceGroups(new ReduceFunction<User>() {

            @Override

            public User call(User v1, User v2) throws Exception {

                // 取用户的大年龄

                return new User(Math.max(v1.age, v2.age), v1.name);

            }

        });

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}

在sparkSql中DS直接支持的转换算子有:map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。

2.2 SQL使用方式

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class Test02_SQL {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> lineDS = spark.read().json("input/user.json");

        // 创建视图 => 转换为表格 填写表名

        // 临时视图的生命周期和当前的sparkSession绑定

        // orReplace表示覆盖之前相同名称的视图

        lineDS.createOrReplaceTempView("t1");

        // 支持所有的hive sql语法,并且会使用spark的又花钱

        Dataset<Row> result = spark.sql("select * from t1 where age > 18");

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}}

2.3 DSL特殊语法(扩展)

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class Test03_DSL {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        // 导入特殊的依赖 import static org.apache.spark.sql.functions.col;

        Dataset<Row> lineRDD = spark.read().json("input/user.json");

        Dataset<Row> result = lineRDD.select(col("name").as("newName"),col("age").plus(1).as("newAge"))

                .filter(col("age").gt(18));

        result.show();

        //4. 关闭sparkSession

        spark.close();

    }

}

3 SQL语法的用户自定义函数

3.1 UDF 用户自定义函数

1)UDF:一行进入,一行出

2)代码实现

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.api.java.UDF1;

import org.apache.spark.sql.expressions.UserDefinedFunction;

import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;

public class Test04_UDF {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> lineRDD = spark.read().json("input/user.json");

        lineRDD.createOrReplaceTempView("user");

        // 定义一个函数

        // 需要首先导入依赖import static org.apache.spark.sql.functions.udf;

        UserDefinedFunction addName = udf(new UDF1<String, String>() {

            @Override

            public String call(String s) throws Exception {

                return s + " 大侠";

            }

        }, DataTypes.StringType);

        spark.udf().register("addName",addName);

        spark.sql("select addName(name) newName from user")

                .show();

        // lambda表达式写法

        spark.udf().register("addName1",(UDF1<String,String>) name -> name + " 大侠",DataTypes.StringType);

        //4. 关闭sparkSession

        spark.close();

    }

}

3.2 UDAF 用户自定义聚合函数

1)UDAF:输入多行,返回一行。通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有的数据合并在一起。

2)Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

3)Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame

4)案例实操

需求:实现求平均年龄,自定义UDAFMyAvg(age)

(1)自定义聚合函数实现-强类型

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Encoder;

import org.apache.spark.sql.Encoders;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.expressions.Aggregator;

import java.io.Serializable;

import static org.apache.spark.sql.functions.udaf;

public class Test05_UDAF {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        spark.read().json("input/user.json").createOrReplaceTempView("user");

        // 注册需要导入依赖 import static org.apache.spark.sql.functions.udaf;

        spark.udf().register("avgAge",udaf(new MyAvg(),Encoders.LONG()));

        spark.sql("select avgAge(age) newAge from user").show();

        //4. 关闭sparkSession

        spark.close();

    }

    public static class Buffer implements Serializable {

        private Long sum;

        private Long count;

        public Buffer() {

        }

        public Buffer(Long sum, Long count) {

            this.sum = sum;

            this.count = count;

        }

        public Long getSum() {

            return sum;

        }

        public void setSum(Long sum) {

            this.sum = sum;

        }

        public Long getCount() {

            return count;

        }

        public void setCount(Long count) {

            this.count = count;

        }

    }

    public static class MyAvg extends Aggregator<Long,Buffer,Double>{

        @Override

        public Buffer zero() {

            return new Buffer(0L,0L);

        }

        @Override

        public Buffer reduce(Buffer b, Long a) {

            b.setSum(b.getSum() + a);

            b.setCount(b.getCount() + 1);

            return b;

        }

        @Override

        public Buffer merge(Buffer b1, Buffer b2) {

            b1.setSum(b1.getSum() + b2.getSum());

            b1.setCount(b1.getCount() + b2.getCount());

            return b1;

        }

        @Override

        public Double finish(Buffer reduction) {

            return reduction.getSum().doubleValue() / reduction.getCount();

        }

        @Override

        public Encoder<Buffer> bufferEncoder() {

            // 可以用kryo进行优化

            return Encoders.kryo(Buffer.class);

        }

        @Override

        public Encoder<Double> outputEncoder() {

            return Encoders.DOUBLE();

        }

    }

}

3.3 UDTF(没有)

输入一行,返回多行(Hive)

SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/142562.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解析SQL 获取表、字段及SQL查询参数

解析SQL 获取表、字段及SQL查询参数 1. 执行效果2. 使用2.1 引入依赖2.2 相关实体2.3 工具类 1. 执行效果 2. 使用 2.1 引入依赖 <!-- sql 解析处理--><dependency><groupId>com.github.jsqlparser</groupId><artifactId>jsqlparser</artifa…

「实用场景教程」如何用日程控件DHTMLX Scheduler制作酒店预订日历?(一)

dhtmlxScheduler是一个类似于Google日历的JavaScript日程安排控件&#xff0c;日历事件通过Ajax动态加载&#xff0c;支持通过拖放功能调整事件日期和时间&#xff0c;事件可以按天&#xff0c;周&#xff0c;月三个种视图显示。 DHTMLX Scheduler正式版下载 在本教程中&…

K8S的基础知识

K8S的意义与入门 专有名词 容器:包含了运行一个应用程序所需要的所有东西,包括:代码、运行时、各种依赖和配置。pod:K8s调度的最小单元,包含一个或多个容器。一个容器组中的容器具有紧密耦合性,共享资源,存储空间和IP。即同一个容器组中的容器可以通过localhost:xxx访问…

windows系统pycharm程序通过urllib下载权重https报错解决

报错内容&#xff1a; raise URLError(unknown url type: %s % type) urllib.error.URLError: <urlopen error unknown url type: https> 解决办法记录&#xff1a; 1. 下载 pyopenssl : pip install pyopenssl 此时&#xff0c; import ssl 可以通过提示指导你安…

简单地聊一聊Spring Boot的构架

前言 本文小编将详细解析Spring Boot框架&#xff0c;并通过代码举例说明每个层的作用。我们将深入探讨Spring Boot的整体架构&#xff0c;包括展示层、业务逻辑层和数据访问层。通过这些例子&#xff0c;读者将更加清晰地了解每个层在应用程序中的具体作用。通过代码实例&…

Uniapp开发 购物商城源码 在线电商商城源码 适配移动终端项目及各小程序

lilishop电商商城系统 商城移动端&#xff0c;使用Uniapp开发&#xff0c;可编译为所有移动终端项目及各小程序 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/88487579 源码下载2&#xff1a;关注我留言

0基础学习VR全景平台篇第121篇:认识视频剪辑软件Premiere

上课&#xff01;全体起立~ 大家好&#xff0c;欢迎观看蛙色官方系列全景摄影课程&#xff01; 大家好&#xff0c;这节课是带领大家认识认识我们的剪辑软件Premiere&#xff0c;一般简称是PR。 &#xff08;PR界面&#xff09; 我们首先打开PR&#xff0c;第一步就是要创建…

深入理解JVM虚拟机第二十五篇:详解JVM方法的绑定机制静态绑定和动态绑定,早期绑定晚期绑定,并编写代码从字节码角度证明这件事情

大神链接&#xff1a;作者有幸结识技术大神孙哥为好友&#xff0c;获益匪浅。现在把孙哥视频分享给大家。 孙哥链接&#xff1a;孙哥个人主页 作者简介&#xff1a;一个颜值99分&#xff0c;只比孙哥差一点的程序员 本专栏简介&#xff1a;话不多说&#xff0c;让我们一起干翻J…

Android WebView专题

WebView 专题 第一个WebView程序&#xff1a;加载远程网址 Layout添加WebView组件&#xff1b; <WebViewandroid:id"id/webView_first"android:layout_width"match_parent"android:layout_height"match_parent"/>初始化组件&#xff0c;加…

arm2 day6

串口实现单个字符的收发 main.c uart4.c uart4.h

java轮播图接口实现

一. 内容简介 实现java后端用户管理接口&#xff0c;数据库使用msyql。 二. 软件环境 2.1 java 1.8 2.2 mysql Ver 8.0.13 for Win64 on x86_64 (MySQL Community Server - GPL) 2.3 IDEA ULTIMATE 2019.3 2.4d代码地址 https://gitee.com/JJW_1601897441/competitionAs…

【左程云算法全讲10】打表技巧和矩阵处理技巧

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于左程云算法课程进行的&#xff0c;每个知识点的修正和深入主要参考…

冲击900亿美元估值!邀约路演、秘密交表的Shein上市有望

双十一的狂欢刚刚结束&#xff0c;Shein即将赴美上市的消息又在电商圈里投下一枚重磅炸弹。 继被媒体曝光其寻求900亿美金估值后&#xff0c;最新的消息称其已邀请投资人参与路演&#xff0c;且已秘密完成交表。这个神秘的中国独角兽&#xff0c;离敲钟登陆美股的日子越来越近…

SoftwareTest6 - 用 Selenium 怎么点点点

用 Selenium 来点点点 一 . 什么是自动化 ?1.1 自动化测试的分类接口自动化测试UI 自动化测试 (界面测试) 1.2 实现自动化测试的工具 : selenium环境部署驱动 二 . selenium 的使用2.1 一个简单的示例 : 让谷歌浏览器在百度首页搜索蔡徐坤准备工作编写代码 2.2 打开谷歌浏览器…

【vue】AntDV组件库中a-upload实现文件上传:

文章目录 一、文档&#xff1a;二、使用(以Jeecg为例)&#xff1a;【1】template&#xff1a;【2】script&#xff1a; 三、效果图&#xff1a; 一、文档&#xff1a; Upload 上传–Ant Design Vue 二、使用(以Jeecg为例)&#xff1a; 【1】template&#xff1a; <a-uploa…

day08_子网划分与子网掩码

什么是子网划分? 1、概念&#xff1a;借主机位给网络位使用,以此来达到把一个大网段划分为n个儿子网段的目的&#xff0c;2. 为何要进行子网划分&#xff1f;3、子网掩码&#xff1a;就是对ip地址打记号4、 网络地址的计算机方式&#xff1a;ip地址与子网掩码都转换成二进制&a…

基于51单片机PCF8591数字电压表数码管显示设计( proteus仿真+程序+设计报告+讲解视频)

PCF8591数字电压表数码管显示 1.主要功能&#xff1a;讲解视频&#xff1a;2.仿真3. 程序代码4. 设计报告5. 设计资料内容清单&&下载链接资料下载链接&#xff08;可点击&#xff09;&#xff1a; 基于51单片机PCF8591数字电压表数码管设计( proteus仿真程序设计报告讲…

Vue3-TypeScript-Threejs:导入外部的glb格式3D模型

一、直接上代码&#xff0c;在vue3-typescript-threejs 项目 导入外部的glb格式3D模型 极简代码&#xff0c;快速理解 <template><div ref"container"></div></template><script lang"ts" setup>import { onMounted, ref …

IEEE--DSConv: Efficient Convolution Operator 论文翻译

论文地址:https://arxiv.org/pdf/1901.01928v1.pdf 目录 摘要 1 介绍 2 相关工作 3 DSConv层 4 量化过程 5 分布偏移 6 优化推断 7 训练 8 结果 8.1 ImageNet 8.2 内存和计算负载 8.3 转移性 9 结论 摘要 我们引入了一种卷积层的变体&#xff0c;称为DSConv&…

cmd打开idea

当我们用idea打开一个项目的时候&#xff0c;有时候这个项目目录是有的&#xff0c;但是用idea的open却找不到&#xff0c;有时候我要重新关闭窗口&#xff0c;再open好多次才有 于是我现在使用命令打开&#xff0c;先把idea安装路径的bin目录放在path里面 然后cd到项目路径&…