PySpark(四)PySpark SQL、Catalyst优化器、Spark SQL的执行流程

目录

PySpark SQL

基础

SparkSession对象

DataFrame入门

 DataFrame构建

DataFrame代码风格

 DSL

SQL

SparkSQL Shuffle 分区数目

 DataFrame数据写出

Spark UDF

Catalyst优化器 

Spark SQL的执行流程


PySpark SQL

基础

PySpark SQL与Hive的异同

Hive和Spark 均是:“分布式SQL计算引擎”
均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。
目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级

这里的重点是:Spark SQL能支持SQL和其他代码混合执行,自由度更高,且其是内存计算,更快。但是其没有元数据管理,然而它最终还是会作用到Hive层面,可以调用Hive的Metasotre

SparkSQL的基本对象是DataFrame,其特点及与其他对象的区别为: 

 SparkSQL 其实有3类数据抽象对象

  • SchemaRDD对象 (已废弃)
  • DataSet对象: 可用于Java、Scala语言
  • DataFrame对象:可用于Java、Scala、Python、R

SparkSession对象

 在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象可以:
-用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

from pyspark.sql import SparkSession
if __name__ == '__main__':spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()sc = spark.sparkContext

DataFrame入门

DataFrame的组成如下
在结构层面
StructType对象描述整个DataFrame的表结构

StructField对象描述一个列的信息
在数据层面
Row对象记录一行数据
Column对象记录一列数据并包含列的信息

 DataFrame构建

1、用RDD进行构建

rdd的结构要求为:[[xx,xx],[xx,xx]]

spark.createDataFrame(rdd,schema=[])

    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()sc = spark.sparkContextrdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])print(rdd.collect())# [['Michael', 29], ['Andy', 30], ['Justin', 19]]df = spark.createDataFrame(rdd,schema=['name','age'])df.printSchema()#打印表结构df.show()#打印表
#     root
#     | -- name: string(nullable=true)
#     | -- age: long(nullable=true)
# 
# +-------+---+
# | name | age |
# +-------+---+
# | Michael | 29 |
# | Andy | 30 |
# | Justin | 19 |
# +-------+---+

2、利用StructType进行创建

需要先引入StructType,StringType,IntegerType等构建schema

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
if __name__ == '__main__':spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()sc = spark.sparkContextrdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
#构建schema    
schema =StructType().add("name",StringType(),nullable=False).\add('age',IntegerType(),nullable=True)df = spark.createDataFrame(rdd,schema=schema)df.printSchema()df.show()

3、toDF将rdd转换为df

下面展示了两种方式

    # 只设定列名,列的数据结构则是内部自己判断df = rdd.toDF(['name','age'])df.printSchema()# root# | -- name: string(nullable=true)# | -- age: long(nullable=true)# 设定列名和数据类型schema =StructType().add("name",StringType(),nullable=False).\add('age',IntegerType(),nullable=True)df = rdd.toDF(schema=schema)df.printSchema()# root# | -- name: string(nullable=false)# | -- age: integer(nullable=true)

4、基于pandas构建 

    dfp = pd.DataFrame({"id":[1,2,3],'score':[99,98,100]})df = spark.createDataFrame(dfp)df.printSchema()df.show()# root# | -- id: long(nullable=true)# | -- score: long(nullable=true)# # +---+-----+# | id | score |# +---+-----+# | 1 | 99 |# | 2 | 98 |# | 3 | 100 |# +---+-----+

5、通过文件读取创造

在读取json和parquet文件时不需要设定schema,因为文件已经自带

而读取csv时,还需要使用.option设定 header等参数 

这里说一下parquet文件

parquet:是Spark中常用的一种列式存储文件格式
和Hive中的ORC差不多,他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema(列名列类型 是否为空)
  • 存储是以列作为存储格式
  • 存储是序列化存储在文件中的(有压缩属性体积小)

DataFrame代码风格

 DataFrame支持两种风格进行编程,分别是DSL风格SQL风格
DSL语法风格
DSL称之为:领域特定语言
其实就是指DataFrame的特有API
DSL风格意思就是以调用API的方式来处理Data比如: df.where0.limit0
SQL语法风格
SQL风格就是使用SQL语句处理DataFrame的数据比如: spark.sql(“SELECT*FROM xxx)

 DSL

其实就是用其内置的API处理数据,举例:

    df.select('id','subject').show()df.where('subject="语文"').show()df.select('id','subject').where('subject="语文"').show()df.groupBy('subject').count().show()

API其实跟SQL类似,这里不详细说明了,个人感觉不如直接写SQL语句

SQL

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sgl0来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表采用如下的方式:

    df.createTempView('tmp') #创建临时视图df.createGlobalTempView('global_tmp')#创建全局试图# 全局表: 跨SparkSession对象使用在一个程序内的多个SparkSession中均可调用查询前带上前缀:global_tmpdf.createOrReplaceTempView('repalce_tmp')#创建临时表,如果存在则替换

然后使用spark.sql的形式书写sql代码

    spark.sql('select * from tmp where subject = "语文"').show()spark.sql('select id,score from repalce_tmp where score>90').show()spark.sql('select subject,max(score) from global_temp.global_tmp group by subject').show()

SparkSQL Shuffle 分区数目

 原因: 在SparkSQL中当Job中产生Shufle时,默认的分区数 spark.sql.shufle,partitions 为200,在实际项目中要合理的设置。
在代码中可以设置:

spark =  SparkSession.builder.appName('lmx').\
master('local[*]').config('spark.sql.shufle,partitions',2).\
getOrCreate()

spark.sqL.shuffle.partitions 参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200

对于集群模式来说,200个默认也算比较合适

如在Local下运行,200个很多,在调度上会带宋限外的损耗,所以在Local下建议修改比较低, 比如2\4\10均可,这个参数和Spark RDD中设置并行度的参数是相互独立的

 DataFrame数据写出

统一API:

下面提供两种方法,分别写出为json和csv

    spark.sql('select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc').write.mode('overwrite').format('json').save('data/output/1t')spark.sql('select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc').write.mode('overwrite').format('csv')\.option('header',True)\.option('sep',';')\.save('data/output/csv')

其他的一些方法: 

SparkSQL中读取数据和写出数据 - 知乎

不过这里似乎不能自己命名导出的数据文件

Spark UDF

无论Hive还是SparKSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。回顾Hive中自定义函数有三种类型:
第一种:UDF(User-Defined-Function)函数.
一对一的关系,输入一个值经过函数以后输出一个值;
在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

第二种:UDAF(User-Defined Aggregation Function)聚合函数

多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

第三种:UDTF(User-DefinedTable-Generating Functions)函数

一对多的关系,输入一个值输出多个值(一行变为多行),用户自定义生成函数,有点像flatMap;

在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF 

UDF有两种定义方式

方式1语法
udf对象=sparksession.udfregister(参数1,参数2,参数3)

参数1:UDF名称,可用于SQL风格

参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格
方式2语法

from pyspark.sql import functions as F

udf对象 = F.udf(参数1,参数2)

参数1:被注册成UDF的方法名

参数2:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格

举例:

    def double_score(num):return 2*numudf1 = spark.udf.register('udf_1',double_score,IntegerType())# dsl风格df.select(udf1(df['score'])).show()# sql风格df.selectExpr('udf_1(score)').show()# sql风格2df.createTempView('tmp')spark.sql("select udf_1(score) from tmp").show()udf2 = F.udf(double_score,IntegerType())df.select(udf2(df['score'])).show()

当返回值是数组时,需要定义数组内部数据的数据类型:ArrayType(StringType())

    spark =  SparkSession.builder.appName('lmx').master('local[*]').config('spark.sql.shufle,partitions',2).getOrCreate()sc = spark.sparkContextrdd=sc.parallelize([['i love you'],['i like you']])df = rdd.toDF(['ifo'])def func(num):return num.split(' ')udf = spark.udf.register('udf_sql',func,ArrayType(StringType()))# dsl风格df.select(udf(df['ifo'])).show()

当返回值是字典时,需要使用StructType(),且定义每个列的名字(需要跟函数返回值的列名一样)和数据类型

    rdd=sc.parallelize([[1],[2],[3],[4],[5]])df = rdd.toDF(['ifo'])df.show()def func(num):return {'num':num,'num1':num+10}udf = spark.udf.register('udf_sql',func,StructType().\add('num',IntegerType(),nullable=False).\add('num1',IntegerType(),nullable=False))df.select(udf(df['ifo'])).show()

Catalyst优化器 

RDD的执行流程为:

代码 ->DAG调度器逻辑任务 ->Task调度器任务分配和管理监控 ->Worker干活

SparkSQL会对写完的代码,执行“自动优化”,既Catalyst优化器,以提升代码运行效率,避免开发者水平影响到代码执行效率。 (RDD代码不会,是因为RDD的数据对象太过复杂,无法被针对性的优化)

加入优化的SparkSQL大致架构为:

1.API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句

2.收到 SQL 语句以后,将其交给 Catalyst,Catalyst 负责解析 SQL,生成执行计划等

3.Catalyst 的输出应该是 RDD 的执行计划

4.最终交由集群运行 

 Catalyst优化器主要分为四个步骤

1、解析sql,生成AST(抽象语法树)

2、在 AST 中加入元数据信息,做这一步主要是为了一些优化,例如 col=col 这样的条件

以上面的图为例:

  • score.id → id#1#L 为 score.id 生成 id 为1,类型是 Long
  • score.math_score→math_score#2#L为 score.math_score 生成 id 为 2,类型为 Long
  • people.id→id#3#L为 people.id 生成 id 为3,类型为 Long
  • people.age→age#4#L为 people.age 生成 id 为 4,类型为 Long 

3、对已经加入元数据的 AST,输入优化器,进行优化,主要包含两种常见的优化:

谓词下推(Predicate Pushdown)\ 断言下推:将逻辑判断 提前到前面,以减少shuffle阶段的数据量。

以上面的demo举例,可以先进行people.age>10的判断再进行Join等操作。

列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度

以上面的demo举例,由于只select了score和id,所以开始的时候,可以只保留这两个列,由于parquet是按列存储的,所以很适合这个操作

4、上面的过程生成的 AST 其实最终还没办法直接运行,这个 AST 叫做 逻辑计划,结束后,需要生成 物理计划,从而生成 RDD 来运行

Spark SQL的执行流程

如此,Spark SQL的执行流程为: 

1.提交SparkSQL代码
2.catalyst优化
        a.生成原始AST语法数
        b.标记AST元数据
        c.进行断言下推和列值裁剪 以及其它方面的优化作用在AST上
        d.将最终AST得到,生成执行计划
        e.将执行计划翻译为RDD代码
3.Driver执行环境入口构建(SparkSession)
4.DAG 调度器规划逻辑任务
5.TASK 调度区分配逻辑任务到具体Executor上工作并监控管理任务
6.Worker干活

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/668778.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据挖掘实战-基于决策树算法构建北京市空气质量预测模型

🤵‍♂️ 个人主页:艾派森的个人主页 ✍🏻作者简介:Python学习者 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞&#x1f4…

ChatGPT Plus如何升级?信用卡付款失败怎么办?如何使用信用卡升级 ChatGPT Plus?

ChatGPT Plus是OpenAI提供的一种高级服务,它相较于标准版本,提供了更快的响应速度、更强大的功能,并且用户可以优先体验到新推出的功能。 尽管许多用户愿意支付 20 美元的月费来订阅 GPT-4,但在实际支付过程中,特别是…

【面试深度解析】腾讯音乐校招 Java 后端一面:SpringBoot工作机制、缓存雪崩、数据一致性、MySQL索引失效(下)

欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术的推送! 在我后台回复 「资料」 可领取编程高频电子书! 在我后台回复「面试」可领取硬核面试笔记! 文章导读地址…

运维自动化bingo前端

项目目录结构介绍 项目创建完成之后,我们会看到bingo_web项目其实是一个文件夹,我们进入到文件夹内部就会发现一些目录和文件,我们简单回顾一下里面的部分核心目录与文件。 ├─node_modules/ # node的包目录,项目运行的依赖包…

【漏洞库】O2OA系统

O2OA invoke 后台远程命令执行漏洞 CNVD-2020-18740 漏洞描述 O2OA是一款开源免费的企业及团队办公平台,提供门户管理、流程管理、信息管理、数据管理四大平台,集工作汇报、项目协作、移动OA、文档分享、流程审批、数据协作等众多功能,满足企业各类管理和协作需求。 O2OA系…

LeetCode:2.两数相加

目录 题目:​编辑2. 两数相加 - 力扣(LeetCode) 分析问题: 官方的优秀代码博主的注释: 博主的辣眼代码,无注释,拉出来拷打自己: 每日表情包: 2. 两数相加 - 力扣&am…

面试经典150题——文本左右对齐(困难)

​"It always seems impossible until it’s done." - Nelson Mandela 1. 题目描述: 这个题目标为困难题目,但是如果我们静下心来把题目读懂了,其实无非就是不同情况下不同考虑而已,也没什么思维上的复杂,还…

Linux openKylin(开放麒麟)系统SSH服务安装配置与公网远程连接

文章目录 前言1. 安装SSH服务2. 本地SSH连接测试3. openKylin安装Cpolar4. 配置 SSH公网地址5. 公网远程SSH连接6. 固定SSH公网地址7. SSH固定地址连接8. 结语 前言 openKylin是中国首个基于Linux 的桌面操作系统开发者平台,通过开放操作系统源代码的方式&#xff…

C++:第十五讲高精度算法

每日C知识 system("color xx);是改变字体及背景颜色,前一个x代表一个数字,可以改变背景颜色,后一个x代表一个数字,可以改变字体颜色 ,但都是根据颜色表来的。 记住:要加头文件:#include&l…

手写分布式存储系统v0.3版本

引言 承接 手写分布式存储系统v0.2版本 ,今天开始新的迭代开发。主要实现 服务发现功能 一、什么是服务发现 由于咱们的服务是分布式的,那从服务管理的角度来看肯定是要有一个机制来知道具体都有哪些实例可以提供服务。举个例子就是,张三家…

DevOps落地笔记-07|案例分析:如何有效管理第三方组件

上一讲主要介绍了如何通过代码预检查的方式提高入库代码的质量,将代码检查尽可能前置,降低修复问题的成本,从而提高交付软件的质量。除了代码本身的问题,依赖组件也是经常困扰开发者的一个问题。比如,依赖组件的某个版…

认识Tomcat (一)

认识Tomcat (一) 一、服务器 1.1 服务器简介 ​ 硬件服务器的构成与一般的PC比较相似,但是服务器在稳定性、安全性、性能等方面都要求更高,因为CPU、芯片组、内存、磁盘系统、网络等硬件和普通PC有所不同。 ​ 软件服务器&…

深度学习(生成式模型)—— Consistency Models

文章目录 前言预备知识:SDE与ODEMethod实验结果 前言 Diffusion model需要多次推断才能生成最终的图像,这将耗费大量的计算资源。前几篇博客我们已经介绍了加速Diffusion model生成图像速率的DDIM和Stable Diffusion,本节将介绍最近大火的Co…

【Matplotlib】figure方法 你真的会了吗!?

🎈个人主页:甜美的江 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:matplotlib 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进…

解放双手!ChatGPT助力编写JAVA框架!

摘要 本文介绍了使用 ChatGPT逐步创建 一个简单的Java框架,包括构思、交流、深入优化、逐步完善和性能测试等步骤。 亲爱的Javaer们,在平时编码的过程中,你是否曾想过编写一个Java框架去为开发提效?但是要么编写框架时感觉无从下…

Tauri:相比Electron,还有很长路要走的。

一、Tauri是什么 Tauri是一个开源的框架,用于构建跨平台的桌面应用程序。它允许开发者使用Web技术(如HTML、CSS和JavaScript)来构建高性能的本地应用程序,同时提供了访问底层操作系统功能的能力。 Tauri的设计目标是提供一种简单…

第97讲:MHA高可用集群模拟主库故障以及修复过程

文章目录 1.分析主库故障后哪一个从库会切换为主库2.模拟主库故障观察剩余从库的状态2.1.模拟主库故障2.3.当前主从架构 3.修复故障的主库3.1.修复主库3.2.当前主从架构3.3.恢复MHA 1.分析主库故障后哪一个从库会切换为主库 在模拟MHA高可用集群主库故障之前,我们先…

【JavaSE篇】——抽象类和接口

目录 🎓抽象类 🎈抽象类语法 🎈抽象类特性 🎈抽象类的作用 🎓接口 🎈语法规则 🎈接口特性 🎈接口使用(实现USB接口) 🎈实现多个接口 🎈…

力扣刷题之旅:进阶篇(一)

力扣(LeetCode)是一个在线编程平台,主要用于帮助程序员提升算法和数据结构方面的能力。以下是一些力扣上的入门题目,以及它们的解题代码。 --点击进入刷题地址 题目1:三数之和 题目描述: 给定一个包含n个…

代码随想录算法训练营第41天 | 343.整数拆分 + 96.不同的二叉搜索树

今日任务 343. 整数拆分 96.不同的二叉搜索树 343.整数拆分 - Medium 题目链接:力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 给定一个正整数 n ,将其拆分为 k 个 正整数 的和( k > 2 )&#xff0…