Spark SQL 概述

Spark SQL 概述

Spark SQL 是 Apache Spark 的一个模块,专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能,使得处理大数据变得更加高效和简便。通过 Spark SQL,用户可以直接在 Spark 中使用 SQL 查询,或者使用 DataFrame 和 DataSet API 进行数据操作。

  • 一、Spark SQL 架构
  • 二、Spark SQL 特点
  • 三、Spark SQL 运行原理
  • 四、Spark SQL API 相关概述
  • 五、Spark SQL 依赖
  • 六、Spark SQL 数据集
    • 1、DataFrame
    • 2、Dataset
    • 3、DataFrame 和 Dataset 的关系
  • 七、Spark Sql 基本用法
    • 1、Scala 创建 SparkSession 对象
    • 2、DataFrame 和 Dataset 的创建方式
    • 3、DataFrame API

一、Spark SQL 架构

Spark SQL 的架构主要由以下几个组件组成:

  1. SparkSession:Spark 应用的统一入口点,用于创建 DataFrame、DataSet 和执行 SQL 查询。
  2. Catalyst 优化器:Spark SQL 的查询优化引擎,负责解析、分析、优化和生成物理执行计划。
  3. DataFrame 和 DataSet API:提供面向对象的编程接口,支持丰富的数据操作方法。
  4. 数据源接口:支持多种数据源,如 HDFS、S3、HBase、Cassandra、Hive 等。
  5. 执行引擎:将优化后的查询计划转换为执行任务,并在分布式集群上并行执行这些任务。

二、Spark SQL 特点

  • 统一数据访问接口:支持多种数据源(如 CSV、JSON、Parquet、Hive、JDBC、HBase 等)并提供一致的查询接口。
  • DataFrame 和 Dataset API:提供面向对象的编程接口,支持类型安全的操作,便于数据处理。
  • Catalyst 优化器:自动将用户的查询转换为高效的执行计划,提升查询性能。
  • 与 Hive 的集成:无缝集成 Hive,能够直接访问现存的 Hive 数据,并使用 Hive 的 UDF 和 UDAF。
  • 高性能:通过 Catalyst 优化器和 Tungsten 执行引擎,实现高效的查询性能和内存管理。
  • 多种操作方式:支持 SQL 和 API 编程两种操作方式,灵活性高。
  • 外部工具接口:提供 JDBC/ODBC 接口供第三方工具借助 Spark 进行数据处理。
  • 高级接口:提供了更高层级的接口,方便地处理数据。

三、Spark SQL 运行原理

在这里插入图片描述

查询解析(Query Parsing):将 SQL 查询解析成抽象语法树(AST)。

逻辑计划生成(Logical Plan Generation):将 AST 转换为未优化的逻辑计划。

逻辑计划优化(Logical Plan Optimization):使用 Catalyst 优化器对逻辑计划进行一系列规则优化。

物理计划生成(Physical Plan Generation):将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。

执行(Execution):将物理计划转换为 RDD,并在集群上并行执行。

四、Spark SQL API 相关概述

SparkContext:SparkContext 是 Spark 应用程序的主入口点,负责连接到 Spark 集群,管理资源和任务调度。在 Spark 2.0 之后,推荐使用 SparkSession 取代 SparkContext。

SQLContext:SQLContext 是 Spark SQL 的编程入口点,允许用户通过 SQL 查询或 DataFrame API 进行数据处理。它提供了基本的 Spark SQL 功能。

HiveContext:HiveContext 是 SQLContext 的子集,增加了对 Hive 的集成支持,可以直接访问 Hive 中的数据和元数据,使用 Hive 的 UDF 和 UDAF。

SparkSession:SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了统一的编程接口。SparkSession 是 Spark SQL 的建议入口点,支持使用 DataFrame 和 Dataset API 进行数据处理。

创建 SparkContext 和 SparkSession 的注意事项:如果同时需要创建 SparkContext 和 SparkSession,必须先创建 SparkContext,再创建 SparkSession。如果先创建 SparkSession,再创建 SparkContext,会导致异常,因为在同一个 JVM 中只能运行一个 SparkContext。

五、Spark SQL 依赖

<properties><spark.version>3.1.2</spark.version><spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${spark.scala.version}</artifactId><version>${spark.version}</version>
</dependency>

六、Spark SQL 数据集

在 Spark SQL 中,数据集主要分为以下几种类型:DataFrame 和 Dataset。它们是处理和操作结构化和半结构化数据的核心抽象。

1、DataFrame

Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

  • 类似于二维表格:DataFrame 类似于传统的关系数据库中的二维表格。
  • Schema(数据结构信息):在 RDD 的基础上加入了 Schema,描述数据结构的信息。
  • 支持嵌套数据类型:DataFrame 的 Schema 支持嵌套的数据类型,如 structmaparray
  • 丰富的 SQL 操作 API:提供更多类似 SQL 操作的 API,便于进行数据查询和操作。

2、Dataset

Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

  • 强类型:Spark 1.6中引入的一个更通用的数据集合,Dataset 是强类型的,提供类型安全的操作。
  • RDD + Schema:可以认为 Dataset 是 RDD 和 Schema 的结合,既有 RDD 的分布式计算能力,又有 Schema 描述数据结构的信息。
  • 适用于特定领域对象:可以存储和操作特定领域对象的强类型集合。
  • 并行操作:可以使用函数或者相关操作并行地进行转换和操作。

3、DataFrame 和 Dataset 的关系

  • DataFrame 是特殊的 Dataset:DataFrame 是 Dataset 的一个特例,即 DataFrame = Dataset[Row]
  • 数据抽象和操作方式的统一:DataFrame 和 Dataset 统一了 Spark SQL 的数据抽象和操作方式,提供了灵活且强大的数据处理能力。

七、Spark Sql 基本用法

1、Scala 创建 SparkSession 对象

import org.apache.spark.sql.SparkSession
object SparkSqlContext {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置应用程序的配置val conf: SparkConf = new SparkConf().setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程.setAppName("spark sql") // 设置应用程序名称为 "spark sql"// 创建 SparkSession 对象,用于 Spark SQL 的编程入口val spark: SparkSession = SparkSession.builder().config(conf) // 将 SparkConf 配置应用于 SparkSession.getOrCreate() // 获取现有的 SparkSession,或者新建一个// 获取 SparkContext 对象,可以直接从 SparkSession 中获取val sc: SparkContext = spark.sparkContext// 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法import spark.implicits._// 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...// 停止 SparkSession,释放资源spark.stop()}
}

2、DataFrame 和 Dataset 的创建方式

1、从集合创建

case class Person(name: String, age: Int)				// 下同val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")

1、从文件系统读取

val schema = StructType(Seq(StructField("name", StringType, nullable = false),StructField("age", IntegerType, nullable = false)
))val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]val dfCsv: DataFrame = spark.read// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  .schema(schema)		  // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 .option("header", "true").csv("/path/to/csv/file")

3、从关系型数据库读取

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)

4、从非结构化数据源读取

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")

5、手动创建 Dataset

import org.apache.spark.sql.types._val schema = StructType(Seq(StructField("name", StringType, nullable = false),StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]val dfManual: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema
)

3、DataFrame API

语法示例一

模拟数据(1000条):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...

需求:哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。

// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
import spark.implicits._// 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
val schema = StructType(Seq(StructField("id", LongType),StructField("name", StringType),StructField("gender", StringType),StructField("age", IntegerType),StructField("city", StringType),
))// 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
val WindowSpec: WindowSpec = Window.partitionBy($"gender").orderBy($"avg_age".desc)// 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
// 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
spark.read// .schema(schema)	// 应用我们定义的schema .option("header", "true") 							// 使用CSV的header作为列名.csv("D:\\projects\\sparkSql\\people.csv")			// DataFrame.select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选).groupBy($"city", $"gender") 							// 按城市和性别分组.agg(			// 多重聚合count($"id").as("count"),   		// 计算每个组的ID数量round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数).where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组.orderBy($"avg_age".desc)     // 按平均年龄降序排序.select($"city", $"gender", $"avg_age",dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank")).show() // 显示结果

结果:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  东莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|广州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+

语法示例二:视图,sql

// 读取CSV文件到DataFrame,使用header作为列名
val dfPeople: DataFrame = spark.read.option("header", "true") // 使用CSV的header作为列名.csv("D:\\projects\\sparkSql\\people.csv")// 将DataFrame注册为临时视图
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL来查询这个视图了
// 例如,查询所有人的姓名和年龄
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql("""|select * from people_view|where gender = '男'|""".stripMargin).show()

语法示例三:join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)val frmStu = spark.createDataFrame(Seq(Student("张三", 1),Student("李四", 1),Student("王五", 2),Student("赵六", 2),Student("李明", 2),Student("王刚", 4),Student("王朋", 5),)
)val frmClass = spark.createDataFrame(Seq(Class(1, "name1"),Class(2, "name2"),Class(3, "name3"),Class(4, "name4"))
)

left 左连接,rignt 右连接, full 全外连接,anti左差集,semi左交集

// 别名 + inner 内连接
frmStu.as("S").join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接.show()// 使用左外连接将df和frmClass根据classId合并
frmStu.join(frmClass, Seq("classId"), "left")	.show()// 左差集
frmStu.join(frmClass, Seq("classId"), "anti")	.show()// 左交集
frmStu.join(frmClass, Seq("classId"), "semi")	.show()

结果

别名 + inner 内连接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|张三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|赵六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王刚|      4|      4|    name4|
+----+-------+-------+---------+使用左外连接将df和frmClass根据classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|张三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|赵六|    name2|
|      2|李明|    name2|
|      4|王刚|    name4|
|      5|王朋|     null|
+-------+----+---------+左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+左交集
+-------+----+
|classId|name|
+-------+----+
|      1|张三|
|      1|李四|
|      2|王五|
|      2|赵六|
|      2|李明|
|      4|王刚|
+-------+----+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/44210.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu16.04安装低版本cmake(安装cmake安装)

文章目录 ubuntu16.04安装低版本cmake&#xff08;安装cmake安装&#xff09;1. **下载并解压CMake压缩文件**&#xff1a;- 首先&#xff0c;你需要从CMake的官方网站或其他可靠来源下载cmake-2.8.9-Linux-i386.tar.gz文件。- 然后在终端中使用以下命令解压文件&#xff1a; 2…

BFS:多源BFS问题

一、多源BFS简介 超级源点&#xff1a;其实就是把相应的原点一次性都丢到队列中 二、01矩阵 . - 力扣&#xff08;LeetCode&#xff09; class Solution { public:const int dx[4]{1,-1,0,0};const int dy[4]{0,0,1,-1};vector<vector<int>> updateMatrix(vector…

headerpwn:一款针对服务器响应与HTTP Header的模糊测试工具

关于headerpwn headerpwn是一款针对服务器响应与HTTP Header的模糊测试工具&#xff0c;广大研究人员可以利用该工具查找网络异常并分析服务器是如何响应不同HTTP Header的。 功能介绍 当前版本的headerpwn支持下列功能&#xff1a; 1、服务器安全与异常检测&#xff1b; 2、…

QFileDialog的简单了解

ps&#xff1a;写了点垃圾&#xff08;哈哈哈&#xff09; 它继承自QDialog 这是Windows自己的文件夹 这是两者的对比图&#xff1a; 通过看QFileDialog的源码&#xff0c;来分析它是怎么实现这样的效果的。 源码组成&#xff1a; qfiledialog.h qfiledialog_p.h&#xff…

Python面试宝典第11题:最长连续序列

题目 给定一个未排序的整数数组 nums &#xff0c;找出数字连续的最长序列&#xff08;不要求序列元素在原数组中连续&#xff09;的长度。请你设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 1&#xff1a; 输入&#xff1a;nums [100,4,200,1,3,2] 输出&#xff1a;…

打造热销爆款:LazadaShopee店铺测评与关键词策略

面对Lazada和Shopee平台上店铺销量难以突破的困境&#xff0c;卖家们往往寻求各种解决方案。其中&#xff0c;店铺测评作为提升店铺信誉、优化产品排名及增加曝光度的有效手段&#xff0c;正逐渐成为卖家关注的焦点。以下将深入探讨店铺测评的好处、实施技巧及自养号的关键要素…

提升校园效率:智慧校园后勤管理中的寻物管理功能

在智慧校园后勤管理体系中&#xff0c;寻物管理功能扮演着连接遗失与找回的桥梁角色&#xff0c;它充分利用现代信息技术&#xff0c;为校园内的师生提供了一套高效、便捷的失物招领解决方案。此功能围绕以下几个核心方面展开。 首先&#xff0c;它支持在线报失与信息登记。一旦…

如何连接到公司的服务器?

1.下载FileZilla FileZilla的下载与安装以及简单使用&#xff08;有图解超简单&#xff09;-CSDN博客 2.打开 3.输入主机 用户名 密码 端口 注&#xff1a;主机支持的协议类型&#xff1a; 4.连接成功 其他方式也有很多&#xff0c;比如通过cmd&#xff0c;html网页等等 3个…

昇思25天学习打卡营第19天|ShuffleNet图像分类

今天是参加昇思25天学习打卡营的第19天&#xff0c;今天打卡的课程是“ShuffleNet图像分类”&#xff0c;这里做一个简单的分享。 1.简介 在第15-18日的学习内容中&#xff0c;我们陆陆续续学习了计算机视觉相关的模型包括图像语义分割、图像分类、目标检测等内容&#xff0c…

中关村软件园发布“数据合规与出境评估服务平台”

在2024中关村论坛年会期间&#xff0c;中关村软件园发布“数据合规与出境评估服务平台”。该平台是中关村软件园结合北京市“两区”建设&#xff0c;立足软件园国家数字服务出口基地和数字贸易港建设&#xff0c;围绕园区内外部企业用户的业务合作、科研创新、跨国运营等场景需…

Python UDP编程之实时聊天与网络监控详解

概要 UDP(User Datagram Protocol,用户数据报协议)是网络协议中的一种,主要用于快速、简单的通信场景。与TCP相比,UDP没有连接、确认、重传等机制,因此传输效率高,但也不保证数据的可靠性和顺序。本文将详细介绍Python中如何使用UDP协议进行网络通信,并包含相应的示例…

如何理解跨界营销?详解跨界营销的主要类型和方法!

跨界营销是一种创新的营销策略&#xff0c;它巧妙地捕捉不同行业、产品和消费者偏好之间的共通点和潜在联系。这种策略将看似不相关的元素相互融合&#xff0c;相互影响&#xff0c;创造出一种全新的生活方式和审美观念&#xff0c;以此吸引目标消费者群体的注意和青睐。 通过…

LiveNVR监控流媒体Onvif/RTSP用户手册-用户管理:编辑、添加用户、关联通道、重置密码、删除、过滤搜索

LiveNVR监控流媒体Onvif/RTSP用户手册-用户管理:编辑、添加用户、关联通道、重置密码、删除、过滤搜索 1、用户管理1.1、添加用户1.2、关联通道1.3、重置密码1.4、编辑1.5、删除1.6、过滤搜索 2、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、用户管理 1.1、添加用户 点击用户管理…

学习网络的第一步:全面解析OSI与TCP/IP模型

我是小米,一个喜欢分享技术的29岁程序员。如果你喜欢我的文章,欢迎关注我的微信公众号“软件求生”,获取更多技术干货! Hello,大家好!我是你们的好朋友小米。今天我们来聊一聊网络基础知识中的重量级选手——OSI模型和TCP/IP模型!网络的世界就像一个巨大的迷宫,而这两个…

Docker 镜像构建报 exec xxx.sh: no such file or directory

问题记录 场景&#xff1a; 处于对nacos docker 部署最新版本的探究&#xff0c;但是nacos/nacos-server镜像拉取不到最新版本&#xff0c;官网也是给出自己构建镜像的方案。 具体步骤很简单&#xff0c;先clone项目&#xff0c;然后签出你要的nacos版本&#xff0c;通过docke…

算法力扣刷题记录 四十二【101. 对称二叉树、100.相同的树、572.另一个树的子树】

前言 二叉树篇&#xff0c;开始对二叉树操作练习。 记录 四十二【101. 对称二叉树】。 继续。 一、题目阅读 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 示例 1&#xff1a; 输入&#xff1a;root [1,2,2,3,4,4,3] 输出&#xff1a;true示例 2&#x…

Pytest单元测试系列[v1.0.0][Pytest基础]

Pytest安装与配置 和Unittest一样&#xff0c;Pytest是另一个Python语言的单元测试框架&#xff0c;与Unittest相比它的测试用例更加容易编写、运行方式更加灵活、报错信息更加清晰、断言写法更简洁并且它可以运行有unittest和nose编写的测试用例。 Pytest 安装 启动命令行&…

【Pytorch】Conda环境下载慢换源/删源/恢复默认源

文章目录 背景临时换源永久换源打开conda配置condarc换源执行配置 命令行修改源添加源查看源 删源恢复默认源使用示范 背景 随着实验增多&#xff0c;需要分割创建环境的情况时有出现&#xff0c;在此情况下使用conda create --name xx python3.10 pytorch torchvision pytorc…

uni-app三部曲之二: 封装http请求

1.引言 前面一篇文章写了使用Pinia进行全局状态管理。 这篇文章主要介绍一下封装http请求&#xff0c;发送数据请求到服务端进行数据的获取。 感谢&#xff1a; 1.yudao-mall-uniapp: 芋道商城&#xff0c;基于 Vue Uniapp 实现&#xff0c;支持分销、拼团、砍价、秒杀、优…

电脑自动重启是什么原因呢?99%人都不知道的解决办法,直接打破循环

当你的电脑突然毫无预警地自动重启&#xff0c;不仅打断了工作流程&#xff0c;还可能导致未保存的数据丢失&#xff0c;这无疑是一件令人沮丧的事情。那么&#xff0c;电脑自动重启是什么原因呢&#xff1f;有什么方法可以解决呢&#xff1f;别担心&#xff0c;在大多数情况下…