【spark】SparkSQL

目录

  • SparkSQL
    • 01.快速入门
      • 什么是SparkSQL
      • 为什么学习SparkSQL
      • SparkSQL的特点
      • SparkSQL发展历史-前身Shark框架
      • SparkSQL发展历史
    • 02.SparkSQL概述
      • SparkSQL和Hive的异同
      • SparkSQL的数据抽象
      • DataFrame概述
      • SparkSession对象
    • 03.DataFrame入门和操作
      • DataFrame的组成
        • DataFrame的代码构建-基于RDD-1
        • DataFrame的代码构建-基于RDD-2
        • DataFrame的代码构建-基于RDD-3
        • DataFrame的代码构建-基于Pandas的DataFrame
        • DataFrame的代码构建-读取外部数据-text
        • DataFrame的代码构建-读取外部数据-json
        • DataFrame的代码构建-读取外部数据-csv
        • DataFrame的代码构建-读取外部数据-parquet
      • DataFrame的入门操作
      • SparkSQL数据清洗API
      • DataFrame数据写出
    • 04.SparkSQL函数定义
      • SparkSQL定义UDF
      • SparkSQL使用窗口函数
    • 05.SparkSQL的运行流程
      • SparkSQL的自动优化
      • Catalyst优化器
      • SparkSQL的执行流程
    • 06.SparkSQL整合Hive
      • Hive执行流程
      • SparkOn Hive
    • 07.分布式SQL引擎配置

SparkSQL

01.快速入门

什么是SparkSQL

SparkSQL是Spark的一个模块,用于处理海量结构化数据

为什么学习SparkSQL

SparkSQL是非常成熟的海量结构化数据处理框架:
学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀,支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等
  • 企业大面积在使用SparkSQL处理业务数据
    1、离线开发
    2、数仓搭建
    3、科学计算
    4、数据分析

SparkSQL的特点

  • 融合性:SQL可以无缝集成在代码中,随时用SQL处理数据
  • 统一数据访问:一套标准API可读写不同数据源
  • Hive兼容:可以使用SparkSQL直接计算并生成Hive数据表
  • 标准化连接:支持标准化JDBC/ODBC连接,方便和各种数据源进行数据交互

SparkSQL发展历史-前身Shark框架

在这里插入图片描述

SparkSQL发展历史

在这里插入图片描述

02.SparkSQL概述

SparkSQL和Hive的异同

相同点:
1、分布式SQL计算引擎
2、构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能

不同点:
在这里插入图片描述

SparkSQL的数据抽象

1、SparkSQL-DataFrame

- 二维表数据结构
- 分布式结构集合(分区)

2、SparkSQL FOor JVM-DataSet【可用于Java\Scala\语言】
3、SparkSQL For Python\R-DataFrame【可用于Java\Scale\Python\R】

DataFrame概述

在这里插入图片描述

在这里插入图片描述
DataFrame是按照二维表格的形式存储数据
RDD则是存储对象本身

SparkSession对象

在RDD阶段,程序的执行入口对象是SparkContext
在Sparke2.0后,推出SparkSeaaion对象,作为Spark编码的统一入口对象

SparkSession对象可以:
1、用于SparkSQL编程作为入口对象
2、用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
在这里插入图片描述

03.DataFrame入门和操作

DataFrame的组成

DataFrame是一个二维表结构,那么表格结构就有无法绕开的三个点:

  • 表结构表述
    比如MySQL中的一张表:

  • 由许多行组成

  • 数据也可以被分成多个列

  • 表也有表结构信息(列、列名、列类型、列约束等)
    在结构层面上:

  • StructType对象描述整个DataFrame的表结构

  • StructField对象描述一个列的信息
    在数据层面上:

  • Row对象记录一行数据

  • Column对象记录一列数据并包含列的信息

在这里插入图片描述
在表结构层面,DataFrame的表结构由:
StructType描述:

struct_type = StructType().\add("id",IntegerType(),False).\add("name",StringType(),True).\add("age",IntegerType(),False)

一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructedType对象
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空

一行数据描述为Row对象,如Row(1,张三,11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息

DataFrame的代码构建-基于RDD-1
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContextrdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))#2.构建DataFrame对象## 参数一:被转换的rdd## 参数二:指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可df = spark.createDataFrame(rdd,schema=['name','age'])df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-基于RDD-2
#coding:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerTypeif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContextrdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))schema=StructType().add('name',StringType(),True).add('age'IntegerType(),False)df = spark.createDataFrame(rdd,schema)df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-基于RDD-3

该方法用于对数据类型不敏感

#coding:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerTypeif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContextrdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))# toDF的方式构建DataFramedf1 = rdd.toDF(['name','age'])# 方法二schema=StructType().add('name',StringType(),True).add('age'IntegerType(),False)rdd.toDF(schema)df1.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf1.show(20,False)
DataFrame的代码构建-基于Pandas的DataFrame
#coding:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerTypeif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# 基于Pandas的DataFrame构建SparkSQL的DataFrame对象pdf = pd.DataFrame({'id':[1,2,3],'name':['张大仙','王小小','王大锤'],'age':[11,11,11]})# 将Pandas的DF对象转换成SparkDFdf1 = spark.createDataFrame(pdf)df1.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf1.show(20,False)
DataFrame的代码构建-读取外部数据-text

构建StructType,text数据源,读取数据的特点是,是将一整行只作为一个列读取,默认列名是value 类型是String

spark session.read.format(“text|csv|json|parquet|orc|avro|jdbc…”)
.option(“k”,“v”)#option可选
.schema(StructType|String)#STRING的语法如。Schema(“name STRING”,“age INT” )
.load(“被读取文件的路径,支持本地文件系统和HDFS”)

#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# 构建StructType,text数据源,读取数据的特点是,是将一整行只作为一个列读取,默认列名是value 类型是Stringschema = StructType().add('data',StirngType(),True)df = spark.read.format('text').schema(schema=schema).load('../data/input/sql/people.txt')df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-读取外部数据-json

json类型自带有Schema信息

#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# json类型自带有Schema信息schema = StructType().add('data',StirngType(),True)df = spark.read.format('json').load('../data/input/sql/people.txt')df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-读取外部数据-csv
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# json类型自带有Schema信息schema = StructType().add('data',StirngType(),True)df = spark.read.format('csv').\option('sep',';').\option('header',True).\option('encoding','utf-8').\schema('name STRING age INT,job STRING').\load('../data/input/sql/people.txt')df.printSchema()df.show(20,False)
DataFrame的代码构建-读取外部数据-parquet

parquet:是spark中常用的一种列式存储文件格式,和Hive中ORC差不多,他俩都是列存储格式

parquet对比普通文本文件的区别

  • parquet内置schema(列名、列类型、是否为空)
  • 存储是以列作为存储格式
  • 存储时序列化存储在文件中的,有压缩属性体积小
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# parquet类型自带有Schema信息schema = StructType().add('data',StirngType(),True)df = spark.read.format('parquet').load('../data/input/sql/people.txt')df.printSchema()df.show(20,False)

DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:

  • DSL风格:DataFrame的特有API,调用API的方式来处理Data
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# parquet类型自带有Schema信息df = spark.read.format('csv').load('../data/input/sql/people.txt')# column对象的获取id_column = df['id']subject_column = df['subject']# DSL风格演示df.select(["id","subject"]).show()df.select ("id","subject").show()df.select(id_column,subject_column) # filter APIdf.filter("score < 99").show()df.filter(df['score'] < 99).show()# where APIdf.where("score < 99").show()df.where(df['score'] < 99).show()# group by APIdf.groupBy("subject").count().show()df.groupBy(df['subject']).count().show()
  • SQL风格:spark.sql(“select * from XXX”)
    使用sparj.sql()来执行SQL语句查询,结果返回一个DataFrame
df.createTempView("score") #注册一个临时视图
df.createOrReplaceTempView("socre") #注册一个临时表,如果存在,进行替换
df.createGlobalTempView("score") # 注册一个全局表

全局表:跨sparksession对象使用,在一个程序内的多个sparkSession中均可调用,查询前带上前缀
global_temp.

SparkSQL数据清洗API

  • 去重方法:dropDuplicates
  • 缺失值处理:
    • dropna 是可以对缺失值进行删除;只要列中有null 就删除这一行数据
      参数:thread=3表示,至少满足3个有效列,不满足就删除当前数据
    • fillna(“loss”) 对缺失值的列进行填充
    • fillna(“N/A”,subset=[‘job’])指定列进行填充
    • fillna({‘name’:‘未知姓名’,‘age’:1,‘job’:‘worker’})设定一个字典,对所有的列提供填充规则

DataFrame数据写出

SparkSQL 统一API写出DataFrame数据
df.write.mode().format().option(K,V).save(PATH)

  • mode,传入模式字符串可选:append追加,overwrite覆盖,ignore忽略,error重复就报异常(默认的)
  • format,传入格式字符串,可选:text,csv,json,parquet,orc,avro,jdbc
  • save 写出的路径,支持本地文件和HDFS

04.SparkSQL函数定义

SparkSQL定义UDF

pyspark UDF

SparkSQL使用窗口函数

  • 聚合开窗函数

  • 排序开窗函数
    – ROW_NUMBER() OVER()
    –DENSE_RANK() OVER()
    –RANK() OVER()

  • NTILE分组窗口

05.SparkSQL的运行流程

SparkSQL的自动优化

RDD的运行完全是按照开发者的代码执行,如果开发者水平有限,RDD的执行效率也会收到影响
而SparkSQL会对写完的代码,执行“自动优化”,以提升代码运行效率,避免开发者水平影响到代码执行效率;依赖于:Catalyst优化器

Catalyst优化器

在这里插入图片描述
STEP1:解析SQL,并生成AST(抽象语法树)
在这里插入图片描述在这里插入图片描述
在这里插入图片描述

大方面的优化点有2个:

  • 谓词下推、断言下推:将逻辑判断提前到前面,以减少shuffle阶段的数据量
  • 列值剪裁:将加载的列进行剪裁,尽量减少被处理数据的宽度

SparkSQL的执行流程

06.SparkSQL整合Hive

Hive执行流程

在这里插入图片描述

SparkOn Hive

在这里插入图片描述

Spark On Hive就是因为Spark自身没有元数据管理功能,所以使用Hive的Metastore服务做为元数据管理服务。计算有Spark执行

07.分布式SQL引擎配置

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/634077.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NativePHP:开发跨平台原生应用的强大工具

NativePHP 是一种创新性的技术&#xff0c;可以帮助开发者使用 PHP 语言构建原生应用程序。本文将介绍 NativePHP 的概念和优势&#xff0c;探讨其在跨平台应用开发中的应用&#xff0c;并提供一些使用 NativePHP 开发原生应用的最佳实践。 什么是 NativePHP&#xff1f; Nati…

js实现iframe内容加载失败自动重新加载功能

最近一个项目上的程序经常出现掉线的情况&#xff0c;经排查是该单位的网络不稳定&#xff0c;存在网络丢包现象。导致有时候程序运行加载页面失败&#xff0c;开机自启动应用时出现请求失败的概率非常大&#xff0c;为了解决这个问题我在网上东找西找也没有找到有效的解决办法…

丰果管道——2024中国家装管道十大品牌

丰果管道——2024中国家装管道十大品牌 丰果&#xff08;中国&#xff09;有限公司 丰果管道品牌创立于1999年&#xff0c;是国内最早从事PPR家装管道生产的品牌之一&#xff0c;在业内有着良好的口碑和市场美誉度&#xff0c;在全国的头部装企更是有相当高的市场占有率。2023年…

猫咪发腮长肉吃什么?5款适合猫咪发腮长肉的猫罐头推荐

随着冬季的来临&#xff0c;北方的小猫咪们因为有暖气&#xff0c;日子还算好过。然而南方的猫咪们只能依靠自己的抵抗力来度过这个寒冷的季节。为了确保这些怕冷的小家伙能温暖地度过冬天&#xff0c;铲屎官们是不是该考虑为它们囤积一些肉肉呢&#xff1f; 有些猫咪&#xf…

无货源跨境电商到底应该怎么做,新手必看

如今&#xff0c;跨境电商无疑已经成为了一个热门的创业领域&#xff0c;但对于一些新手来说&#xff0c;面临的一个主要挑战是如何处理产品的货源问题。下面我就和大家分享一下无货源跨境电商的基本概念以及一些新手可以采取的策略和步骤&#xff0c;帮助大家在这个领域取得成…

C#MQTT编程08--MQTT服务器和客户端(cmd版)

1、前言 前面完成了winform版&#xff0c;wpf版&#xff0c;为什么要搞个cmd版&#xff0c;因为前面介绍了mqtt的报文结构&#xff0c;重点分析了【连接报文】&#xff0c;【订阅报文】&#xff0c;【发布报文】&#xff0c;这节就要就看看实际报文是怎么组装的&#xff0c;这…

Flink编程——风险欺诈检测

Flink 风险欺诈检测 文章目录 Flink 风险欺诈检测背景准备条件FraudDetectionJob.javaFraudDetector.java 代码分析执行环境创建数据源对事件分区 & 欺诈检测输出结果运行作业欺诈检测器 欺诈检测器 v1&#xff1a;状态欺诈检测器 v2&#xff1a;状态 时间完整的程序期望的…

3C市场发展态势疲软?直播电商带来新机遇

“ 能否迎来新生 ” 文&#xff5c;王娴 编辑 | 靳淇 出品&#xff5c;极新 目前直播电商成为家电3C行业不可替代的经营阵地,电商生态的重构为关注沟通渠道与销售通路的家电3C行业带来更多选择。内容场与货架场贯通的独特优势&#xff0c;正吸引商家加速入场全域兴趣电商。…

Failed at the node sass@4.14.1 postinstall script.

首先&#xff0c;查看node和 npm版本 #用于列出已安装的 Node.js 版本。 nvm ls #切换node版本 nvm use 12.17.0 #换国内镜像源&#xff1a;&#xff08;单独设置sass的安装源。&#xff09; npm config set sass_binary_sitehttps://npm.taobao.org/mirrors/node-sass …

外卖系统创新:智能推荐与用户个性化体验

外卖系统的日益普及使得用户对于更智能、个性化的体验有着不断增长的期望。在这篇文章中&#xff0c;我们将探讨如何通过智能推荐技术&#xff0c;为用户提供更贴心、更符合口味的外卖选择。我们将使用 Python 和基于协同过滤的推荐算法作为示例&#xff0c;让您更深入地了解智…

VBA_MF系列技术资料1-315

MF系列VBA技术资料 为了让广大学员在VBA编程中有切实可行的思路及有效的提高自己的编程技巧&#xff0c;我参考大量的资料&#xff0c;并结合自己的经验总结了这份MF系列VBA技术综合资料&#xff0c;而且开放源码&#xff08;MF04除外&#xff09;&#xff0c;其中MF01-04属于…

CSS常见元素类型 盒子模型

文章目录 常见元素类型块元素内联元素空元素修改元素类型测试元素类型 盒子模型标准文本流:外边距和内边距测试盒子模型 常见元素类型 块元素 常见块元素: div p h1~h6 ul li img 这些元素结束之后自带换行&#xff0c;一行只能存在一个元素&#xff0c;无法横向排列&#xf…

selenium代理ip可用性测试

测试代理ip是否工作正常&#xff0c;将正常的代理ip提取出来 from selenium import webdriver from fake_useragent import UserAgent def check_proxy(proxy):print("开始测试&#xff1a;"proxy)chrome_options webdriver.ChromeOptions()chrome_options.add_arg…

html + css + js简单的项目

以下内容直接复制粘贴就能运行 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title&…

uniapp打包配置 (安卓+ios)

TOC 基础配置 HBuilderX中打开项目的manifest.json文件&#xff0c;在“基础配置”中可以设置App的应用名称、版本号等信息&#xff1a; 应用标识 DCloud应用appid&#xff08;简称appid&#xff09;是由DCloud创建App项目时生成的唯一标识&#xff0c;关联DCloud云端服务&…

CentOS 8.5 安装图解

特特特别的说明 CentOS发行版已经不再适合应用于生产环境&#xff0c;客观条件不得不用的话&#xff0c;优选7.9版本&#xff0c;8.5版本次之&#xff0c;最次6.10版本&#xff08;比如说Oracle 11GR2就建议在6版本上部署&#xff09;&#xff01; 引导和开始安装 选择倒计时结…

原生微信小程AR序实现模型动画播放只播放一次,且停留在最后一秒

1.效果展示 0868d9b9f56517a9a07dfc180cddecb2 2.微信小程序AR是2023年初发布&#xff0c;还有很多问提&#xff08;比如glb模型不能直接播放最后一帧&#xff1b;AR识别不了金属、玻璃材质的模型等…有问题解决了的小伙伴记得告诉我一声&#xff09; 微信官方文档地址 3.代码…

软件测试阶段简介_单元测试、集成测试、配置项测试、系统测试

文章目录 前言一、软件测试“V”模型二、单元测试三、集成测试四、配置项测试五、系统测试总结 前言 一般来说&#xff0c;按照软件的研制阶段划分&#xff0c;软件测试可分为单元测试、集成测试、配置项测试、系统测试等。本文将对上述各测试阶段进行逐一介绍。 一、软件测试…

Redis--Zset使用场景举例(滑动窗口实现限流)

文章目录 前言什么是滑动窗口zset实现滑动窗口小结附录 前言 在Redis–Zset的语法和使用场景举例&#xff08;朋友圈点赞&#xff0c;排行榜&#xff09;一文中&#xff0c;提及了redis数据结构zset的指令语法和一些使用场景&#xff0c;今天我们使用zset来实现滑动窗口限流&a…

Python高级编程之IO模型与协程

更多Python学习内容&#xff1a;ipengtao.com 在Python高级编程中&#xff0c;IO模型和协程是两个重要的概念&#xff0c;它们在处理输入输出以及异步编程方面发挥着关键作用。本文将介绍Python中的不同IO模型以及协程的概念、原理和用法&#xff0c;并提供丰富的示例代码来帮助…