Saprk SQL基础知识

一.Spark SQL基本介绍

1.什么是Spark SQL

Spark SQL是Spark多种组件中其中一个,主要是用于处理大规模的[结构化数据]

Spark SQL的特点:

1).融合性:既可以使用SQL语句,也可以编写代码,同时支持两者混合使用.

2).统一的数据访问:Spark SQL用统一的API对接不同的数据源

3).Hive的兼容性:Spark SQL可以和Hive进行整合,合并后将执行引擎换成Spark,核心是基于hive的metastore来处理.

4).标准化连接:Spark SQL支持JDBC/ODBC连接

2.Spark SQL和Hive的异同点

相同点:

①都是分布式SQL计算引擎

②都可以处理大规模结构化数据

③都可以建立在Yarn集群上运行

不同点:

①Spark SQL的底层是RDD,Hive SQL的底层是MapReduce

②Spark SQL既可以编写SQL语句,又可以编写代码,而Hive SQL只可以编写SQL语句

③Spark SQL没有元数据管理服务,而Hive SQL有metastore管理元数据服务

④Spark SQL是基于内存运行的,Hive SQL是基于磁盘运行的

3.Spark SQL的数据结构对比

说明:

pandas的DataFrame:二维表 处理单机结构数据

Spark Core:处理任何的数据结构,处理大规模的分布式数据

Spark SQL:二维表,处理大规模的分布式结构数据 

 

RDD:存储直接就是对象,比如在图中,存储就是一个Person的对象,但是里面是什么数据内容,不太清楚.

DataFrame:将Person中各个字段数据,进行结构化存储,形成一个DataFrame,可以直接看到数据

Dataset:将Person对象中数据都按照结构化的方式存储好,同时保留对象的类型,从而知道来源于一个Person对象

由于Python不支持泛型,所以无法使用Dataset类型,客户端仅支持DataFrame类型 

二.DataFrame详解

1.DataFrame基本介绍

 

DataFrame表示的是一个二维的表,二维表,必然存在行,列等表结构描述信息.

表结构描述信息(元数据Schema) :StructType对象

字段:StructField对象,可以描述字段名称,字段数据类型,是否可以为空

行:Row对象

列:Column对象,包含字段名称和字段值

在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息

2.DataFrame的构建方式

2.1 通过RDD得到一个DataFrame

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('rdd_2_dataframe')\.master('local[*]')\.getOrCreate()# 通过SparkSession得到SparkContextsc = spark.sparkContext# 2- 数据输入# 2.1- 创建一个RDDinit_rdd = sc.parallelize(["1,李白,20","2,安其拉,18"])# 2.2- 将RDD的数据结构转换成二维结构new_rdd = init_rdd.map(lambda line: (int(line.split(",")[0]),line.split(",")[1],int(line.split(",")[2])))# 将RDD转成DataFrame:方式一# schema方式一schema = StructType()\.add('id',IntegerType(),False)\.add('name',StringType(),False)\.add('age',IntegerType(),False)# schema方式二schema = StructType([StructField('id',IntegerType(),False),StructField('name',StringType(),False),StructField('age',IntegerType(),False)])# schema方式三schema = "id:int,name:string,age:int"# schema方式四schema = ["id","name","age"]init_df = spark.createDataFrame(data=new_rdd,schema=schema)# 将RDD转成DataFrame:方式二"""toDF:中的schema既可以传List,也可以传字符串形式的schema信息"""# init_df = new_rdd.toDF(schema=["id","name","age"])init_df = new_rdd.toDF(schema="id:int,name:string,age:int")# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源sc.stop()spark.stop()

场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么我可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。

2.2 内部初始化数据得到DataFrame

from pyspark import SparkConf, SparkContext
import os# 绑定指定的Python解释器
from pyspark.sql import SparkSessionos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("内部初始化数据得到DataFrame。类似SparkCore中的parallelize")# 1- 创建SparkSession顶级对象spark = SparkSession.builder\.appName('inner_create_dataframe')\.master('local[*]')\.getOrCreate()# 2- 数据输入"""通过createDataFrame创建DataFrame,schema数据类型可以是:DataType、字符串、List字符串:格式要求格式一 字段1 字段类型,字段2 字段类型格式二(推荐) 字段1:字段类型,字段2:字段类型List:格式要求["字段1","字段2"]"""# 内部初始化数据得到DataFrameinit_df = spark.createDataFrame(data=[(1,'张三',18),(2,'李四',30)],schema="id:int,name:string,age:int")# init_df = spark.createDataFrame(#     data=[(1, '张三', 18), (2, '李四', 30)],#     schema="id int,name string,age int"# )# init_df = spark.createDataFrame(#     data=[(1, '张三', 18), (2, '李四', 30)],#     schema=["id","name","age"]# )# init_df = spark.createDataFrame(#     data=[(1, '张三', 18), (2, '李四', 30)],#     schema=["id:int", "name:string", "age:int"]# )# 3- 数据处理# 4- 数据输出# 输出dataframe的数据内容init_df.show()# 输出dataframe的schema信息init_df.printSchema()# 5- 释放资源spark.stop()

场景:一般用在开发和测试中,因为只能处理少量的数据

Schema总结

通过createDataFrame创建DataFrame,schema数据类型可以是:DataType,字符串,List

1:字符串

格式一 字段1 字段类型,字段2 字段类型

格式二 字段1:字段类型,字段2:字段类型

2:List

["字段1","字段2"]

3:DataType

格式一 schema = StructType().add('id',IntegerType(),False)

.add('id',IntegerType(),False).add('id',IntegerType(),False)

格式二 schema = StructType([StructField('id',IntegerType,False),

StructField('id',IntegerType,False),

StructField('id',IntegerType,False)])

 2.3 读取外部文件

复杂API

统一API格式:

sparksession.read

.format('text|csv|json|parquet|orc|avro|jdbc|...')

.option('k','v')

.schema(StructType | String)

.load('加载数据路径') #读取外部文件的路径,支持HDFS也支持本地

简写API

 请注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写

格式:spark.read.读取方式()

例如:

df = spark.read.csv(

path ='file:///export/data/_03_spark_sql/data/stu.txt',header=True,sep=' ',inferSchema=True,encoding='utf-8')

2.3.1 Text方式读取

 

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("text方式读取文件")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('text_demo')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    """
        load:支持读取HDFS文件系统和本地文件系统
            HDFS文件系统:hdfs://node1:8020/文件路径
            本地文件系统:file:///文件路径
            
        text方式读取文件总结:
            1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理
            2- 默认生成的列名叫value,数据类型string
            3- 我们只能够在schema中修改字段value的名称,其他任何内容不能修改
    """
    init_df = spark.read\
        .format('text')\
        .schema("my_field string")\
        .load('file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt')

    # 3- 数据处理
    # 4- 数据输出
    init_df.show()
    init_df.printSchema()

    # 5- 释放资源
    spark.stop()
 

text方式读取文件总结:

1-不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理

2-默认生成的列名叫value,数据类型string

3-我们只能够在schema中修改字段value的名称,其他任何内容不能修改 

2.3.2 CSV方式读取

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("csv方式读取文件")# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('csv_demo')\.master('local[*]')\.getOrCreate()# 2- 数据输入"""csv格式读取外部文件总结:1- 复杂API和简写API都必须掌握2- 相关参数作用说明:2.1- path:指定读取的文件路径。支持HDFS和本地文件路径2.2- schema:手动指定元数据信息2.3- sep:指定字段间的分隔符2.4- encoding:指定文件的编码方式2.5- header:指定文件中的第一行是否是字段名称2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确"""# 复杂API写法init_df = spark.read\.format('csv')\.schema("id int,name string,address string,sex string,age int")\.option("sep"," ")\.option("encoding","UTF-8")\.option("header","True")\.load('file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt')# 简写API写法# init_df = spark.read.csv(#     path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',#     schema="id int,name string,address string,sex string,age int",#     sep=' ',#     encoding='UTF-8',#     header="True"# )# init_df = spark.read.csv(#     path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',#     sep=' ',#     encoding='UTF-8',#     header="True",#     inferSchema=True# )# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源spark.stop()

csv格式读取外部文件总结:

1-相关参数说明:

1.1 path:文件路径,HDFS和本地

1.2 schema:手动指定元数据信息

1.3 sep:指定字段间的分隔符

1.4 encoding:指定文件的编码方式

1.5 header:指定文件中的第一行是否是字段名称

1.6 inferSchema:根据数据内容自动推断数据类型,但是推断结果可能不精确 

 2.3.3 JSON方式读取

json的数据内容

{'id': 1,'name': '张三','age': 20}
{'id': 2,'name': '李四','age': 23,'address': '北京'}
{'id': 3,'name': '王五','age': 25}
{'id': 4,'name': '赵六','age': 29}

代码实现:

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('json_demo')\.master('local[*]')\.getOrCreate()# 2- 数据输入"""json读取数据总结:1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔"""# init_df = spark.read.json(#     path='file:///export/data/gz16_pyspark/02_spark_sql/data/data.txt',#     schema="id2 int,name string,age int,address string",#     encoding='UTF-8'# )# init_df = spark.read.json(#     path='file:///export/data/gz16_pyspark/02_spark_sql/data/data.txt',#     schema="id:int,name:string,age:int,address:string",#     encoding='UTF-8'# )init_df = spark.read.json(path='file:///export/data/gz16_pyspark/02_spark_sql/data/data.txt',schema="id int,name string,age int,address string",encoding='UTF-8')# 3- 数据输出init_df.show()init_df.printSchema()# 4- 释放资源spark.stop()

 json读取数据总结:

1-需要手动指定schema信息,如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充

2-csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔

3.DataFrame的相关API

操作DataFrame一般有两种操作方案:一种为DSL方式,另一种为SQL方式

 

SQL方式: 通过编写SQL语句完成统计分析操作
DSL方式: 特定领域语言,使用DataFrame特有的API完成计算操作,也就是代码形式

从使用角度来说: SQL可能更加的方便一些,当适应了DSL写法后,你会发现DSL要比SQL更好用
从Spark角度来说: 更推荐使用DSL方案,此种方案更加利于Spark底层的优化处理

3.1 SQL相关的API

创建一个视图/表

 

df.createTempView('视图名称'): 创建一个临时的视图(表名)
df.createOrReplaceTempView('视图名称'): 创建一个临时的视图(表名),如果视图存在,直接替换
临时视图,仅能在当前这个Spark Session的会话中使用


df.createGlobalTempView('视图名称'): 创建一个全局视图,运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用

执行SQL语句

spark.sql('书写SQL') 

3.2 DSL相关的API

show():用于展示DF中数据,默认仅展示前20行

参数1:设置默认展示多少行,默认为20

参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示

printSchema():用于打印当前这个DF的表结构信息

select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样

  • filter()和 where():用于对数据进行过滤操作, 一般在spark SQL中主要使用where

  • groupBy():用于执行分组操作

  • orderBy():用于执行排序操作

DSL主要支持以下几种传递的方式:  str | Column对象 | 列表
    str格式:  '字段'
    Column对象:  
        DataFrame含有的字段  df['字段']
        执行过程新产生:  F.col('字段')
    列表: 
        ['字段1','字段2'...]
        [df['字段1'],df['字段2']]

 为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可

导入这个函数库: import pyspark.sql.functions as F
通过F调用对应的函数即可。SparkSQL中所支持的函数,都可以通过以下地址查询到: 
https://spark.apache.org/docs/3.1.2/api/sql/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/606015.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

超好用的阅读器更新摸鱼模式啦

潮汐阅读器重磅更新啦!这次更新的是隐蔽模式(摸鱼模式)。 何为隐蔽模式?就是将阅读窗口的标题栏隐藏掉,从而可以使阅读窗口可以隐藏在任意其他窗口里面不被发现,从而可以快乐的看小说摸鱼啦! …

18.将文件上传至云服务器 + 优化网站的性能

目录 1.将文件上传至云服务器 1.1 处理上传头像逻辑 1.1.1 客户端上传 1.1.2 服务器直传 2.优化网站的性能 2.1 本地缓存优化查询方法 2.2 压力测试 1.将文件上传至云服务器 客户端上传:客户端将数据提交给云服务器,并等待其响应;用户…

软件工程造价师证书有用吗?难不难考?

🎯软件工程造价师证书是有用的,它证明了持有人具备评估和估算软件开发cheng本、进度和资源规划的能力。✔️在IT行业中,受高度重视,特别是在软件开发和项目管理领域。 👩软件工程造价师考试难易程度因人而异。该证书需…

影响代理IP稳定性的因素有哪些?

代理IP作为一种网络服务,在生活中扮演着各种各样的角色。它们可以用于保护隐私、突破访问限制、提高网络安全性等。代理IP的稳定性受到多种因素的影响,下面和大家探讨一下影响代理IP稳定性的因素。 1、网络环境:代理IP所处的网络环境对它的稳…

腾讯云新用户的定义与权益

腾讯云作为国内领先的云计算服务提供商,吸引了越来越多的用户。对于新用户来说,了解腾讯云的新用户定义和相关权益非常重要,因为它关系到用户能否享受到更多的优惠和服务。 一、腾讯云新用户的定义 腾讯云新用户是指首次注册腾讯云账号并且没…

【数据库】mysql事务

一、事务的基本概念 1、事务的定义 事务可由一条非常简单的SQL语句组成,也可以由一组复杂的SQL语句组成。。 在 MySQL 中只有使用了 Innodb 数据库引擎的数据库或表才支持事务。事务处理可以用来维护数据库的完整性,保证成批的 SQL 语句要么全部执行&…

Java技术专题:「入门到精通系列」深入探索常用的六种加密技术和实现

文章目录 1. 引言2. 对称加密3. 非对称加密4. 哈希算法5. 消息摘要6. 数字签名7. 数字证书8. 拓展功能与未来展望 🎉欢迎来到Java学习路线专栏~探索Java中的静态变量与实例变量 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒🍹✨博客主页:IT陈寒的博客&am…

产品经理如何选择城市?

年底,全国性的人口大迁徙即将开始。选择城市,堪称年轻人的“二次投胎”,族望留原籍,家贫走他乡。 古人在选择城市时,主要的考量因素是家族势力,这一点放在当代,大致也成立,如果在老…

如何在CentOS安装SQL Server数据库并通过内网穿透工具实现公网访问

文章目录 前言1. 安装sql server2. 局域网测试连接3. 安装cpolar内网穿透4. 将sqlserver映射到公网5. 公网远程连接6.固定连接公网地址7.使用固定公网地址连接 前言 简单几步实现在Linux centos环境下安装部署sql server数据库,并结合cpolar内网穿透工具&#xff0…

C# 反射的终点:Type,MethodInfo,PropertyInfo,ParameterInfo,Summry

文章目录 前言反射是什么?常用类型操作SummryPropertyInfoMethodInfo无参函数运行 有参函数运行,获取paramterInfo 总结 前言 我之前写了一篇Attribute特性的介绍,成功拿到了Attribute的属性,但是如果把Attribute玩的溜,那就要彻…

什么是企业数字化转型?数字化的价值体现在哪里?

从2015年接触平安的数字化转型,到2021年承接阿里云的服务数字化项目,再到2023年主导大大小小10来个数字化项目,8年的时间,数字化对我而言已经从一个“新词”变成了一个“旧词”。 8年过去,数字化也从一道企业的“选做…

迎接人工智能的下一个时代:ChatGPT的技术实现原理、行业实践以及商业变现途径

课程背景 2023年,以ChatGPT为代表的接近人类水平的对话机器人,AIGC不断刷爆网络,其强大的内容生成能力给人们带来了巨大的震撼。学术界和产业界也都形成共识:AIGC绝非昙花一现,其底层技术和产业生态已经形成了新的格局…

[Vulnhub靶机] DriftingBlues: 2

[Vulnhub靶机] DriftingBlues: 2靶机渗透思路及方法(个人分享) 靶机下载地址: https://download.vulnhub.com/driftingblues/driftingblues2.ova 靶机地址:192.168.67.21 攻击机地址:192.168.67.3 一、信息收集 1.…

led手电筒照明线性恒流驱动芯片推荐:SM2123EGL双通道可调光

LED手电筒照明线性恒流驱动芯片是一种专门用于LED手电筒的照明系统的关键组件。它采用了线性恒流驱动技术,可以确保LED手电筒在不同电池电压和温度变化下,保持恒定的亮度输出,提高了LED手电筒的稳定性和可靠性。 LED手电筒照明线性恒流驱动芯…

VScode右键没有go to definition选项

1. 背景 1.1. 项目代码在远程服务器上; 1.2. win重装系统,重新安装vscode出现问题,没重装系统之前是没问题的; 2. 问题 打开vscode,通过ssh链接远程服务器中的项目代码后,选中函数右键没有go to defini…

大连理工大学软件学院2022年秋季学期《矩阵与数值分析》上机作业

文章目录 《计算机科学计算》第二版162页第12题(1)162页第16题216页第12题 《数值分析方法与应用》一、基础知识部分1、5、 二、线性方程组求解2、6、 三、非线性方程组求解1、4、 四、插值与逼近1、5、7、 五、数值积分2、 六、微分方程数值解法1、 《计…

机房自动化监控手把手分享给你 - 番外1:声光报警实现

本文章是一个机房自动化监控实际项目系列文章的番外篇,有个朋友问能否补充一个声光报警的实现,我仔细一想:虽然我不在这个项目中实现声光报警,但我在其他项目用过,使用的设备器件成本很低。那就以这个项目为背景&#…

视频转为序列图的软件,让视频批量转为序列图

你是否曾经遇到过这样的困境:需要将一段视频转为一系列的图片,但却没有合适的工具来完成?或许你曾经手动截图,或者用其他方式,但结果往往不尽如人意,图片质量差、色彩失真、画面不清晰。现在,让…

C语言动态内存管理

我们目前知道的开辟内存空间的方法有: 1.创建变量 2.创建数组; 但是这2种方法开辟的空间大小都是固定的,如果是数组的话确认了大小之后是无法改变的; int a10;//在栈区空间上开辟4个字节的空间;int arr[10];//在栈…

C++ 模板进阶

目录 一、非类型模板参数 二、模板的特化 1、函数模板特化 2、类模板特化 全特化 偏特化 3、例题 三、模板分离编译 1、定义 2、解决方法 3、模板总结 一、非类型模板参数 模板参数分类类型形参与非类型形参。 类型形参即:出现在模板参数列表中&#xf…