Day10—Spark SQL基础

在这里插入图片描述

Spark SQL介绍

​ Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如JSON、Parquet、Avro、CSV格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

Spark SQL的主要特点:

  • 将SQL查询与Spark应用程序无缝组合

​ Spark SQL允许使用SQL或熟悉的API在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce的;而Spark SQL底层使用的是Spark RDD。

  • 可以连接到多种数据源

​ Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。

  • 在现有的数据仓库上运行SQL或HiveQL查询

​ Spark SQL支持HiveQL语法以及Hive SerDes和UDF (用户自定义函数) ,允许访问现有的Hive仓库。

DataFrame和DataSet

  • DataFrame的结构

​ DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。

​ DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息) ,因此看起来更像是一张数据库表。例如,在一个RDD中有3行数据,将该RDD转成DataFrame后,其中的数据可能如图所示:
在这里插入图片描述

  • DataSet的结构
    Dataset是一个分布式数据集,是Spark 1.6中添加的一个新的API。相比于RDD, Dataset提供了强类型支持,在RDD的每行数据加了类型约束。
    在这里插入图片描述
    在Spark中,一个DataFrame代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。

Spark SQL的基本使用

​ Spark Shell启动时除了默认创建一个名为sc的SparkContext的实例外,还创建了一个名为spark的SparkSession实例,该spark变量可以在Spark Shell中直接使用。

​ SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。

Spark SQL函数

内置函数

​ Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions

中。其中大部分函数与Hive中的相同。

​ 使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL

语句中使用。

  • 以编程的方式使用lower()函数将用户姓名转为小写/大写,代码如下:
df.select(lower(col("name")).as("greet")).show()
df.select(upper(col("name")).as("greet")).show()

​ 上述代码中,df指的是DataFrame对象,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col(“name”)指定要查询的列,也可以使用$"name"代替,代码如下:

df.select(lower($"name").as("greet")).show()
  • 以SQL语句的方式使用lower()函数,代码如下:
df.createTempView("temp")
spark.sql("select upper(name) as greet from temp").show()

​ 除了可以使用select()方法查询指定的列外,还可以直接使用filter()、groupBy()等方法对DataFrame数据进行过滤和分组,例如以下代码:

df.printSchema()  # 打印Schema信息
df.select("name").show()  # 查询name列
# 查询name列和age列,其中将age列的值增加1
df.select($"name",$"age"+1).show()
df.filter($"age">25).show() # 查询age>25的所有数据
# 根据age进行分组,并求每一组的数量
df.groupBy("age").count().show() 
自定义函数

​ 当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据需求编写自定义函数(User Defined Functions, UDF),然后在Spark SQL中调用。

​ 例如有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位数字用星号()代替,比如手机号180***2688。这时就可以编写一个自定义函数来实现这个需求,实现代码如下:

package spark.demo.sqlimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}/*** 用户自定义函数,隐藏手机号中间4位*/
object SparkSQLUDF {def main(args: Array[String]): Unit = {//创建或得到SparkSessionval spark = SparkSession.builder().appName("SparkSQLUDF").master("local[*]").getOrCreate()//第一步:创建测试数据(或直接从文件中读取)//模拟数据val arr=Array("18001292080","13578698076","13890890876")//将数组数据转为RDDval rdd: RDD[String] = spark.sparkContext.parallelize(arr)//将RDD[String]转为RDD[Row]val rowRDD: RDD[Row] = rdd.map(line=>Row(line))//定义数据的schemaval schema=StructType(List{StructField("phone",StringType,true)})//将RDD[Row]转为DataFrameval df = spark.createDataFrame(rowRDD, schema)//第二步:创建自定义函数(phoneHide)val phoneUDF=(phone:String)=>{var result = "手机号码错误!"if (phone != null && (phone.length==11)) {val sb = new StringBuffersb.append(phone.substring(0, 3))sb.append("****")sb.append(phone.substring(7))result = sb.toString}result}//注册函数(第一个参数为函数名称,第二个参数为自定义的函数)spark.udf.register("phoneHide",phoneUDF)//第三步:调用自定义函数df.createTempView("t_phone")		//创建临时视图spark.sql("select phoneHide(phone) as phone from t_phone").show()// +-----------+// |      phone|// +-----------+// |180****2080|// |135****8076|// |138****0876|// +-----------+}
}
窗口(开窗)函数

​ 开窗函数是为了既显示聚合前的数据,又显示聚合后的数据,即在每一行的最后一列添加聚合函数的结果。开窗口函数有以下功能:

  • 同时具有分组和排序的功能
  • 不减少原表的行数
  • 开窗函数语法:

聚合类型开窗函数

sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]]) 

排序类型开窗函数

ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])
  • 以row_number()开窗函数为例:

​ 开窗函数row_number()是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排列的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TopN)。row_number()函数的使用格式如下:

row_number() over (partition by 列名 order by 列名 desc) 行号列别名

上述格式说明如下:

partition by:按照某一列进行分组;

order by:分组后按照某一列进行组内排序;

desc:降序,默认升序。

例如,统计每一个产品类别的销售额前3名,代码如下:

package spark.demo.sqlimport org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}/*** 统计每一个产品类别的销售额前3名(相当于分组求TOPN)*/
object SparkSQLWindowFunctionDemo {def main(args: Array[String]): Unit = {//创建或得到SparkSessionval spark = SparkSession.builder().appName("SparkSQLWindowFunctionDemo").master("local[*]").getOrCreate()//第一步:创建测试数据(字段:日期、产品类别、销售额)val arr=Array("2019-06-01,A,500","2019-06-01,B,600","2019-06-01,C,550","2019-06-02,A,700","2019-06-02,B,800","2019-06-02,C,880","2019-06-03,A,790","2019-06-03,B,700","2019-06-03,C,980","2019-06-04,A,920","2019-06-04,B,990","2019-06-04,C,680")//转为RDD[Row]val rowRDD=spark.sparkContext.makeRDD(arr).map(line=>Row(line.split(",")(0),line.split(",")(1),line.split(",")(2).toInt))//构建DataFrame元数据val structType=StructType(Array(StructField("date",StringType,true),StructField("type",StringType,true),StructField("money",IntegerType,true)))//将RDD[Row]转为DataFrameval df=spark.createDataFrame(rowRDD,structType)//第二步:使用开窗函数取每一个类别的金额前3名df.createTempView("t_sales")		//创建临时视图//执行SQL查询spark.sql("select date,type,money,rank from " +"(select date,type,money," +"row_number() over (partition by type order by money desc) rank "+"from t_sales) t " +"where t.rank<=3").show()}
}

在这里插入图片描述

结果展示

在这里插入图片描述

小结

本次学习了Spark SQL基础,学习Spark SQL基础是掌握大数据处理的关键一步。Spark SQL是Apache Spark的一个模块,它提供了对结构化和半结构化数据的高效处理能力。通过学习Spark SQL,你将能够使用SQL查询和DataFrame API来分析数据集。Spark SQL的核心优势在于其能够处理大规模数据集,同时保持高性能。它支持多种数据源,包括HDFS、S3、Parquet等,使得数据的读写变得简单。此外,Spark SQL还提供了丰富的数据类型和复杂的数据操作功能,如过滤、分组、排序和聚合。学习过程中,你将了解如何创建DataFrame,执行转换和操作,以及如何使用SQL语句进行查询。你还将学习到如何优化Spark SQL查询,包括使用分区、索引和缓存技术来提高性能。

掌握Spark SQL基础对于数据工程师和分析师来说非常重要,因为它不仅可以提高数据处理的效率,还可以帮助你更好地理解和分析大规模数据集。随着你的学习深入,你将能够更有效地利用Spark的强大功能来解决实际问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/30594.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA上MySQL的jar包导入教程

jar包下载网址——》https://mvnrepository.com/ 1.进入界面&#xff0c;点击搜索框&#xff0c;搜索mysql&#xff1a; 外国网站&#xff0c;可能有点慢,耐心等待即可。 2.点击查询结果&#xff1a; 进入界面&#xff0c;点击前两个结果的其中一个&#xff0c;两个都可以 …

计算机毕业设计Python+Vue.js+Flask+Scrapy电影大数据分析 电影推荐系统 电影爬虫可视化 电影数据分析 大数据毕业设计 协同过滤算法

开发技术 协同过滤算法、机器学习、vue.js、echarts、Flask、Python、MySQL 创新点 协同过滤推荐算法、爬虫、数据可视化 补充说明 两种Python协同过滤推荐算法集成 (ItemCF推荐算法 和 UserCF 推荐算法) 2.专业美工整体设计的细腻的酷黑主题&#xff0c;前后端分离一体化系统&…

leetCode-hot100-链表专题

leetCode-hot100-链表专题 链表简介单链表单链表的使用例题206.反转链表19.删除链表的倒数第N个结点24.两两交换链表中的节点25.K个一组翻转链表 双向链表双向链表的使用 循环链表61.旋转链表141.环形链表142.环形链表Ⅱ LinkedListLinkedList的使用 链表简介 参考博客&#x…

掉电安全文件系统分析

掉电安全FS 掉电安全的文件系统&#xff08;Power-Fail Safe File Systems&#xff09;被设计为在电源故障或系统崩溃的情况下仍能保持数据一致性的文件系统。这样的文件系统通常通过使用日志&#xff08;journaling&#xff09;或写时复制&#xff08;copy-on-write&#xff…

文章解读与仿真程序复现思路——电力系统自动化EI\CSCD\北大核心《协同考虑空气质量与热舒适度的空调系统双层优化控制策略》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

升级 Kubernetes Ingress NGINX Controller: 自助编译 NGINX 的实践

保持软件的更新对于系统的安全性和稳定性至关重要。本文将介绍如何从源代码自助编译 NGINX&#xff0c;并将其集成到 Kubernetes Ingress NGINX Controller 镜像中&#xff0c;以实现 NGINX 的升级。此方法不仅可以提高系统的安全性&#xff0c;还可以定制 NGINX 的功能&#x…

ipvsadm命令总结

ipvsadm命令总结 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 什么是ipvsadm命令&#xff1f; ipvsadm是Linux系统下的一个命令行工具&#xff0c;用于配置…

【Oracle】调用HTTP接口

Oracle调用http接口 前情提要1.创建HTTP请求函数2.创建ACL并授予权限3.测试HTTP请求函数一点建议 前情提要 公司唯有oracle被允许访问内外网&#xff0c;因此在oracle中发起HTTP请求。 1.创建HTTP请求函数 CREATE OR REPLACE FUNCTION HTTP_REQUEST(v_url VARCHAR2,--請求地…

数据资产安全保卫战:构建多层次、全方位的数据安全防护体系,守护企业核心数据资产安全

一、引言 在信息化时代&#xff0c;数据资产已成为企业运营的核心&#xff0c;其安全性直接关系到企业的生存与发展。然而&#xff0c;随着网络技术的飞速发展&#xff0c;数据泄露、黑客攻击等安全威胁日益增多&#xff0c;给企业的数据资产安全带来了严峻挑战。因此&#xf…

CSS动画实现让元素一直旋转

在CSS中&#xff0c;使用keyframes动画和transform: rotate()属性来创建一个无限循环的旋转动画。以下是一个简单的示例&#xff0c;展示了如何使一个元素&#xff08;比如一个div&#xff09;无限地旋转&#xff1a; keyframes spin { from { transform: rotate(0deg); } …

202483读书笔记|《把你写进诗歌里》——人生是一场不知何时散场的约会,爱慕向来短暂,失去才是唯一出路

202483读书笔记|《把你写进诗歌里》——人生是一场不知何时散场的约会&#xff0c;爱慕向来短暂&#xff0c;失去才是唯一出路 摘录 《把你写进诗歌里&#xff08;2020年度中国优秀诗歌&#xff09;》&#xff0c;作者上官文露。并不惊艳&#xff0c;中英文双语对照的一本诗集&…

深入理解和应用Eureka:服务注册与发现的利器

目录 一 Eureka简介 什么是Eureka? 为什么选择Eureka? 二 Eureka的基本概念 Eureka Server Eureka Client 服务注册与发现 三 Eureka的工作原理 注册服务 心跳机制 服务发现 缓存机制 四 Eureka的配置与部署 配置Eureka Server 配置Eureka Client 五 Eureka的…

Python | Leetcode Python题解之第168题Excel表列名称

题目&#xff1a; 题解&#xff1a; class Solution:def convertToTitle(self, columnNumber: int) -> str:ans list()while columnNumber > 0:columnNumber - 1ans.append(chr(columnNumber % 26 ord("A")))columnNumber // 26return "".join(an…

RC4混淆变形

RC4混淆变形 一、RC4混淆变形1. 修改初始密钥调度算法 (KSA)2. 修改伪随机生成算法 (PRGA)3. 引入非线性变换4. 使用多个状态数组5. 增加密钥混淆步骤6.示例代码 二、RC4算法详解2.1. 密钥调度算法 (KSA)2.2. 伪随机生成算法 (PRGA)2.3. 加密/解密2.4 RC4 C实现2.5 代码解析 RC…

新手装修 避坑课2.0:装修之前一定要做好功课(55节课)

课程下载&#xff1a;https://download.csdn.net/download/m0_66047725/89388333 更多资源下载&#xff1a;关注我。 课程目录 第01节1.装修前准备工作.mp4 第02节开篇.mp4 第03节2.装修需要提前定好的设备和材料.mp4 第04节3.自装还是找装修公司.mp4 第05节4.自装怎么找…

Elementui的el-dropdown组件使用与案例

一、使用 trigger 属性用于设置下拉菜单的触发方式&#xff0c;可以是 click&#xff08;点击触发&#xff09;或 hover&#xff08;悬停触发&#xff09;。command 属性用于在 el-dropdown-item 上设置命令值。当用户点击某个菜单项时&#xff0c;会触发 command 事件&#x…

HTTP的持久连接(Persistent Connection)

目录 HTTP的持久连接&#xff08;Persistent Connection&#xff09;持久连接的工作原理HTTP/1.0 与 HTTP/1.1 的区别示例持久连接的优点持久连接的缺点超时机制 HTTP的持久连接&#xff08;Persistent Connection&#xff09; HTTP 的持久连接&#xff08;Persistent Connect…

win11照片裁剪视频无法保存问题解决

win11照片默认走核显&#xff0c;intel的显卡可能无法解码&#xff0c;在设置里把照片的显示卡默认换成显卡就好了

红队内网攻防渗透:内网渗透之Linux内网权限提升技术:LXDDockerRbash限制型bash

红队内网攻防渗透 1. 内网权限提升技术1.1 Linux系统提权-普通用户-LXD容器1.2 Linux系统提权-普通用户-Docker容器1.3 权限在docker里面1.4 Linux系统提权-普通用户-Rbash限制型bash1. 内网权限提升技术 利用参考 https://gtfobins.github.io/LXD、LXC 和 Docker 是三种不同…

企业UDP文件传输工具测速的方式(下)

在前一篇文章中&#xff0c;我们深入讨论了UDP传输的基本概念和镭速UDP文件传输工具如何使用命令行快速进行速度测试。现在&#xff0c;让我们进一步探索更为高级和灵活的方法&#xff0c;即通过整合镭速UDP的动态或静态库来实现网络速度的测量&#xff0c;以及如何利用这一过程…