大数据学习18之Spark-SQL

1.概述

1.1.简介

        Spark SQL 是 Apache Spark 用于处理结构化数据的模块。

1.2.历史

1.2.1.Shark

        Hadoop诞生初期,Hive是唯一在Hadoop上运行的SQL-on-Hadoop工具,MR的中间计算过程产生了大量的磁盘落地操作,消耗了大量的I/O,降低了程序的运行效率。

        为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具被开发,Spark-SQL便是其中的一种,Spark-SQL的前身就是Shark。

1.2.2.Hive on Spark*

        Spark的开发较晚,那时候的主流数据仓库便是Hive,为了通用性,Spark就与Hive结合起来了,对于SQL语句的解析都交给Hive来进行处理,并且Spark程序一定程度上替代了Hive底层的MapReduce程序,提高了作业的计算效率。

1.2.3.Spark on Hive*

        随着Spark发展,Shark 对于 Hive 的太多依赖(如采用 Hive 的语法解析器、查询优化器等等)制约了 Spark的 One stack to rule them all 的既定方针,制约了 Spark 各个组件的相互集成,所以就提出了 SparkSQL 项目。

        并且Hive本身的迭代更新速度较慢,就算是现在的最新版本的Hive支持的Spark也才2.x.x,

同时Spark在3.0.0版本时做出了一系列的优化,如果还是依赖于与Hive的化Spark3.0以上的版本的是用不了的,那么Spark的优化就没有意义了。

        为了让Spark的优化变得可用,Spark就自己开发了一套用于SQL操作的模块,由之前的Shark来到了现在的Spark-SQL。

        经过这次的转变,Spark由原来的依赖Hive解析SQL变成了由自己的Spark-SQL模块解析的方式,但是保留了对Hive的元数据访问。

        也就是说,现在的Spark除了元数据外,几乎可以说是一个一栈式大数据框架了。

1.2.4.Hive on Spark vs. Spark on Hive

        Hive on Spark:Hive为主体,在Hive中继承Spark,Hive即存储元数据,也解析SQL语句,只是Hive将引擎从MR更换为Spark由 ,Spark 负责运算工作,但部署较为复杂。

        Spark on Hive:Spark为主体,Hive只负责元数据的存储,由Spark来解析和执行SQL语句,其中SQL语法为Spark-SQL语法,且部署简单。Spark on Hive 的优点在于它提供了更灵活的编程接口,适用于各种数据处理需求。

2.数据模型

2.1. RDD 和 DataFrame

2.1.1.RDD转DataFrame

//创建样例类
scala> case class User(id: Int, name: String, age: Int, gender: Int)
defined class User//创建 RDD
scala> val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",20, 1)))//RDD 转 DataFrame
scala> val df = rdd.toDF

2.1.2.DataFrame 转 RDD

//创建 DataFrame
scala> val df = spark.read.json("file:///opt/spark-local/data/user/user.json")//DataFrame 转 RDD
scala> val rdd = df.rdd

 2.2.RDD 和 Dataset

2.2.1. RDD 转 Dataset

        RDD 和 Dataset 两个都是强类型模型,所以可以相互直接转换。

//创建样例类
scala> case class User(id: Int, name: String, age: Int, gender: Int)//创建 RDD
scala> val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",20, 1)))//RDD 转 Dataset。
scala> val ds = rdd.toDS

2.2.2.Dataset 转 RDD

scala> val rdd = ds.rdd

2.3. DataFrame 和 Dataset

2.3.1.DataFrame 转 Dataset

        配合样例类使用 as[类型] 转换为 DataSet。

scala> val df = spark.read.json("file:///opt/yjx/spark-scalocal/data/user/user.json")scala> val ds = df.as[User]

2.3.2.Dataset 转 DataFrame

//创建 Dataset
scala> case class User(id: Int, name: String, age: Int, gender: Int)defined class Userscala> val list = List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu", 20, 1))
list: List[User] = List(User(1,zhangsan,18,1), User(2,lisi,19,0), User(3,wangwu,20,1))scala> val ds = list.toDS//Dataset 转 DataFrame
scala> val df = ds.toDF

3. IDEA 开发 SparkSQL

        创建普通 Maven 项目,添加以下依赖。

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.2</version>
</dependency>

3.1.DataFrame

object DataFrameDemo {
case class User(id: Int, name: String, age: Int, gender: Int)
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrameDemo")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// RDD 转 DataFrame
val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
20, 1)))
val df1: DataFrame = rdd.toDF()
df1.show()
// 直接创建 DataFrame
val df2 = spark.read.json("data/user/user.json")
df2.show()
// 创建临时表
df2.createOrReplaceTempView("t_user")
// 编写 SQL
lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
// 执行 SQL
spark.sql(sql).show()
// ==================== 关闭连接 ====================
spark.stop
}

3.2.Dataset

def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("DatasetDemo")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// RDD 转 Dataset
val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
20, 1)))
val ds1: Dataset[User] = rdd.toDS()
ds1.show()
// 创建 DataFrame
val df: DataFrame = spark.read.json("data/user/user.json")
// 通过 DataFrame 使用 as[类型] 转换为 DataSet
val ds2: Dataset[User] = df.as[User]
ds2.show()
// 创建临时表
ds2.createOrReplaceTempView("t_user")
// 编写 SQL
lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
// 执行 SQL
spark.sql(sql).show
// ==================== 关闭连接 ====================
spark.stop
}

4.DSL 领域特定语言

        DSL 为 Domain Specific Language 的缩写,翻译过来为领域特定语言。简单理解就是 Spark 独有的结构化数据操作语法。

        此处不做赘述。

5.自定义函数

5.1.UDF用户定义普通函数

案例: 

object UDFDemo {
case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
comm: Double, deptno: Int)
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("UDFDemo")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// 数据准备
val df: DataFrame = spark.read
.option("header", "true")
.option("sep", ",")
.option("inferSchema", "true")
.csv("data/scott/emp.csv")
val emp: Dataset[Emp] = df.as[Emp]
emp.createOrReplaceTempView("emp")
// 注册 UDF 函数
val prefix_name = spark.udf.register("prefix_name", (name: String) => {
"Hello: " + name
})
// 在 SQL 中使用
val sql =
"""
|SELECT ename, prefix_name(ename) AS new_name FROM emp
|""".stripMargin
spark.sql(sql).show(5)
// 在 DSL 中使用
emp.select('job, prefix_name('job).as("new_job")).show(5)
// ==================== 关闭连接 ====================
spark.stop
}
}

5.2.UDAF用户定义聚合函数

案例:

object UDAFDemo03_Spark3 {
case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
comm: Double, deptno: Int)
def main(args: Array[String]): Unit = {
// ==================== 建立连接 ====================
// 初始化配置对象并设置运行模式与 AppName
val conf = new SparkConf().setMaster("local[*]").setAppName("UDAFDemo02")
// 根据配置对象初始化 SparkSession 对象
val spark = SparkSession.builder().config(conf).getOrCreate()
// 日志级别
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
// 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
import spark.implicits._
// ==================== 业务处理 ====================
// 数据准备
val df: DataFrame = spark.read
.option("header", "true")
.option("sep", ",")
.option("inferSchema", "true")
.csv("data/scott/emp.csv")
val emp: Dataset[Emp] = df.as[Emp]
emp.createOrReplaceTempView("emp")
// 注册 UDAF 函数(强类型自定义 UDAF 在 Spark 3.0.0 中的使用方式)
val my_avg = spark.udf.register("my_avg", functions.udaf(MyAvg))
// 在 SQL 中使用
val sql =
"""
|SELECT my_avg(sal) AS avg_sal FROM emp
|""".stripMargin
spark.sql(sql).show()
// 在 DSL 中使用
emp.select(my_avg('sal).as("avg_sal")).show()
// ==================== 关闭连接 ====================
spark.stop
}
// 缓存区数据的结构 Buff(求和, 计数)
case class Buff(var sum: Double, var count: Long)
/**
* 自定义 UDAF 聚合函数:计算薪资的平均值
* IN:输入数据的类型
* BUFF:缓存区数据的类型
* OUT:返回值数据的类型
*/
object MyAvg extends Aggregator[Double, Buff, Double] {
// 初始化缓冲区 Buff(求和, 计数)
override def zero: Buff = Buff(0D, 0L)
// 根据输入的数据更新缓冲区的数据
override def reduce(b: Buff, in: Double): Buff = {
// 累加每次输入的数据
b.sum += in
// 计数器每次 +1
b.count += 1
// 返回缓冲区对象
b
}
// 合并缓冲区
override def merge(b1: Buff, b2: Buff): Buff = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 计算最终结果
override def finish(b: Buff): Double = b.sum / b.count
// 缓冲区数据的编码处理
// Encoders.product 是进行 Scala 元组和 case 类转换的编码器
//override def bufferEncoder: Encoder[Buff] = Encoders.product
// 或者
override def bufferEncoder: Encoder[Buff] = Encoders.kryo(classOf[Buff])
// 输出数据的编码处理
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

5.3.UDTF用户定义表创建函数

        先添加依赖:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20220924</version>
</dependency>

     案例:

/**
* 数据:{"movie": [{"movie_name": "肖申克的救赎", "movie_type": "犯罪" }, {"movie_name": "肖申克的救赎",
"movie_type": "剧情" }]}
* 需求:从一行 JSON 格式数据中取出 movie_name 和 movie_type 两个 Key 及其对应的 Value。K-V 输出的格式为:
* movie_name movie_type
* 肖申克的救赎 犯罪
* 肖申克的救赎 剧情
*/
class MyUDTF extends GenericUDTF {
// 实例化 UDTF 对象,判断传入参数的长度以及数据类型
// 和 Hive 的自定义 UDTF 不一样的是,Spark 使用的是已经过时的 initialize(ObjectInspector[] argOIs)
override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
// 获取入参
// 参数校验,判断传入参数的长度以及数据类型
if (argOIs.length != 1) throw new UDFArgumentLengthException("参数个数必须为 1")
if (ObjectInspector.Category.PRIMITIVE != argOIs(0).getCategory) {
/*
UDFArgumentTypeException(int argumentId, String message)
异常对象需要传入两个参数:
int argumentId:参数的位置,ObjectInspector 中的下标
String message:异常提示信息
*/
throw new UDFArgumentTypeException(0, "参数类型必须为 String")
}
// 自定义函数输出的字段和类型
// 创建输出字段名称的集合
val columNames = new util.ArrayList[String]
// 创建字段数据类型的集合
val columType = new util.ArrayList[ObjectInspector]
columNames.add("movie_name")
columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
columNames.add("movie_type")
columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
ObjectInspectorFactory.getStandardStructObjectInspector(columNames, columType)
}
// 处理数据
override def process(objects: Array[AnyRef]): Unit = {
val outline = new Array[String](2)
if (objects(0) != null) {
val jsonObject = new JSONObject(objects(0).toString)
val jsonArray: JSONArray = jsonObject.getJSONArray("movie")
var i = 0
while ( {
i < jsonArray.length
}) {
outline(0) = jsonArray.getJSONObject(i).getString("movie_name")
outline(1) = jsonArray.getJSONObject(i).getString("movie_type")
// 将处理好的数据通过 forward 方法将数据按行写出
forward(outline)
i += 1
}
}
}
override def close(): Unit = {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Android】Service使用方法:本地服务 / 可通信服务 / 前台服务 / 远程服务(AIDL)

1 本地Service 这是最普通、最常用的后台服务Service。 1.1 使用步骤 步骤1&#xff1a;新建子类继承Service类&#xff1a;需重写父类的onCreate()、onStartCommand()、onDestroy()和onBind()方法步骤2&#xff1a;构建用于启动Service的Intent对象步骤3&#xff1a;调用st…

QML学习 —— 34、视频媒体播放器(附源码)

效果 说明 您可以单独使用MediaPlayer播放音频内容(如音频),也可以将其与VideoOutput结合使用以渲染视频。VideoOutput项支持未转换、拉伸和均匀缩放的视频演示。有关拉伸均匀缩放演示文稿的描述,请参见fillMode属性描述。 播放可能出错问题 出现的问题:      DirectS…

Spring MVC练习(前后端分离开发实例)

White graces&#xff1a;个人主页 &#x1f649;专栏推荐:Java入门知识&#x1f649; &#x1f439;今日诗词:二十五弦弹夜月&#xff0c;不胜清怨却飞来&#x1f439; ⛳️点赞 ☀️收藏⭐️关注&#x1f4ac;卑微小博主&#x1f64f; ⛳️点赞 ☀️收藏⭐️关注&#x1f4…

如何把大模型调教成派大星?

目录 主要内容模型图实验结果如何把大模型变成派大星&#xff1f;chatglm3-6B 数据集准备代码运行微调结果 文章声明&#xff1a;非广告&#xff0c;仅个人体验&#xff1a;参考文献&#xff1a;https://www.aspiringcode.com/content?id17197387451937&uid291a2ae1546b48…

国土安全部发布关键基础设施安全人工智能框架

美国国土安全部 (DHS) 发布建议&#xff0c;概述如何在关键基础设施中安全开发和部署人工智能 (AI)。 https://www.dhs.gov/news/2024/11/14/groundbreaking-framework-safe-and-secure-deployment-ai-critical-infrastructure 关键基础设施中人工智能的角色和职责框架 https:/…

QML TableView 实例演示 + 可能遇到的一些问题(Qt_6_5_3)

一、可能遇到的一些问题 Q1&#xff1a;如何禁用拖动&#xff1f; 在TableView下加一句代码即可&#xff1a; interactive: false 补充&#xff1a;这个属性并不专属于TableView&#xff0c;而是一个通用属性。很多Controls下的控件都可以使用&#xff0c;其主要作用就是控…

C基础上机题目51_55

51.字符数组x中存有任意一串字符&#xff1b;请编制函数&#xff0c;按给定的替代关系对数组x中的所有字符进行替代&#xff0c;仍存入数组x的对应的位置上&#xff0c;最后调用函数把结果x输出。 替代关系&#xff1a;f(p)p*11%256 (p是数组中某一个字符的ASCII值&#xff0c…

03-微服务搭建

1、搭建分布式基本环境 分布式组件 功能 SpringCloud Alibaba - Nacos 注册中心&#xff08;服务发现/注册&#xff09;、配置中心&#xff08;动态配置管理&#xff09; SpringCloud Alibaba - Sentinel 服务容错&#xff08;限流、降级、熔断&#xff09; SpringCloud …

Java八股(一)

目录 1.JVM、JRE、JDK之间的关系 2.static关键字作用&#xff08;通俗版&#xff09; 3.面向对象、面向过程 4.私有方法 5.Java代码执行与编译 6.IOC 1.JVM、JRE、JDK之间的关系 Java一次编写到处运行&#xff0c;可移植性好&#xff0c;保证这一点的就是iava虚拟机JVM …

Cannot find a valid baseurl for repo: centos-sclo-rh/x86_64

yum install 报错: Cannot find a valid baseurl for repo: centos-sclo-rh/x86_64 CentOS7的SCL源在2024年6月30日停止维护了。 当scl源里面默认使用了centos官方的地址&#xff0c;无法连接&#xff0c;需要替换为阿里云。 cd /etc/yum.repos.d/ 找到 CentOS-SCLo-scl.repo 和…

Vue前端开发-slot传参

slot 又称插槽&#xff0c;它是在子组件中为父组件提供的一个占位符&#xff0c;使用来表示&#xff0c;通过这个占位符&#xff0c;父组件可以向中填充任意的内容代码&#xff0c;这些代码将自动替换占位符的位置&#xff0c;从而轻松实现在父组件中控制子组件内容的需求。 作…

如何在 Ubuntu 22.04 上安装带有 Nginx 的 ELK Stack

今天我们来聊聊如何在 Ubuntu 22.04 服务器上安装 ELK Stack&#xff0c;并集成 Nginx 作为 Web 服务器&#xff0c;同时使用 Let’s Encrypt Certbot 进行 SSL 认证。ELK Stack&#xff0c;包括 Elasticsearch、Logstash 和 Kibana&#xff0c;是一套强大的工具&#xff0c;用…

快速理解微服务中Sentinel怎么实现限流

Sentinel是通过动态管理限流规则&#xff0c;根据定义的规则对请求进行限流控制。 一.实现步骤 1.定义资源&#xff1a;在Sentinel中&#xff0c;资源可以是URL、方法等&#xff0c;用于标识需要进行限流的请求&#xff1b;(在Sentinel中&#xff0c;需要我们去告诉Sentinel哪些…

基于单片机的智慧小区人脸识别门禁系统

本设计基于单片机的智慧小区人脸识别门禁系统。由STM32F103C8T6单片机核心板、显示模块、摄像头模块、舵机模块、按键模块和电源模块组成。可以通过摄像头模块对进入人员人脸数据进行采集&#xff0c;识别成功后&#xff0c;舵机模块动作&#xff0c;模拟门禁打开&#xff0c;门…

llama-factory 系列教程 (七),Qwen2.5-7B-Instruct 模型微调与vllm部署详细流程实战

文章目录 介绍llama-factory 安装装包下载模型 微调模型数据集训练模型 微调后的模型推理 介绍 时隔已久的 llama-factory 系列教程更新了。本篇文章是第七篇&#xff0c;之前的六篇&#xff0c;大家酌情选看即可。 因为llama-factory进行了更新&#xff0c;我前面几篇文章的实…

利用Docker容器技术部署发布web应用程序

Docker是什么&#xff1f; docker 是一个开源的应用容器引擎&#xff0c;可以帮助开发者打包应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到任何流行的Linux机器上&#xff0c;也可以实现虚拟化&#xff0c;容器是完全使用沙箱机制&#xff0c;相互之间不会有任何…

SpringCloud框架学习(第五部分:SpringCloud Alibaba入门和 nacos)

目录 十二、SpringCloud Alibaba入门简介 1. 基本介绍 2.作用 3.版本选型 十三、 SpringCloud Alibaba Nacos服务注册和配置中心 1.简介 2.各种注册中心比较 3.下载安装 4.Nacos Discovery服务注册中心 &#xff08;1&#xff09; 基于 Nacos 的服务提供者 &#xf…

Linux—进程概念学习-03

目录 Linux—进程学习—31.进程优先级1.1Linux中的进程优先级1.2修改进程优先级—top 2.进程的其他概念3.进程切换4.环境变量4.0环境变量的理解4.1环境变量的基本概念4.2添加环境变量—export4.3Linux中环境变量的由来4.4常见环境变量4.5和环境变量相关的命令4.6通过系统调用获…

信创改造 - TongRDS 替换 Redis

记得开放 6379 端口哦 1&#xff09;首先在服务器上安装好 TongRDS 2&#xff09;替换 redis 的 host&#xff0c;post&#xff0c;passwd 3&#xff09;TongRDS 兼容 jedis # 例如&#xff1a;更改原先 redis 中对应的 host&#xff0c;post&#xff0c;passwd 改成 TongRDS…

Python 爬虫入门教程:从零构建你的第一个网络爬虫

网络爬虫是一种自动化程序&#xff0c;用于从网站抓取数据。Python 凭借其丰富的库和简单的语法&#xff0c;是构建网络爬虫的理想语言。本文将带你从零开始学习 Python 爬虫的基本知识&#xff0c;并实现一个简单的爬虫项目。 1. 什么是网络爬虫&#xff1f; 网络爬虫&#x…