目录
一 . SparkSQL简介
二 . Spark SQL与HIVE的异同
三 . DataFrame
1. 创建 DataFrame
2. RDD转换DataFrame
四 . 操作DataFrame
SQL方式:
DSL方式:
一 . SparkSQL简介
Spark SQL只能处理结构化数据 ,属于Spark框架一个部分 Schema:元数据信息
特点: 融合性 ,统一数据访问,hive兼容 , 标准化连接
将hive sql翻译成Spark上对应的RDD操作 ,底层运行SparkRDD
DataFrames是在RDD上面增加与省略了一些东西
DataFrame = RDD -泛型 +Schema +方便到的SQL操作 + 优化 ,是个特殊的RDD
RDD存储任意结构数据 ; DataFrame存储二维表结构数据
二 . Spark SQL与HIVE的异同
1- Spark SQL是基于内存计算, 而HIVE SQL是基于磁盘进行计算的
2- Spark SQL没有元数据管理服务(自己维护), 而HIVE SQL是有metastore的元数据管理服务的
3- Spark SQL底层执行Spark RDD程序, 而HIVE SQL底层执行是MapReduce
4- Spark SQL可以编写SQL也可以编写代码,但是HIVE SQL仅能编写SQL语句
三 . DataFrame
DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息
表结构描述信息(元数据Schema): StructType对象
字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空
行: Row对象
列: Column对象,包含字段名称和字段值
在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息
1. 创建 DataFrame
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':spark = SparkSession.builder.appName('创建DataFrame')\.master('local[*]').getOrCreate()init_df = spark.createDataFrame(data=[(1,'张三',18),(2,'李四',40),(3,'王五',60)],schema='id:int,name:string,age:int')init_df2 = spark.createDataFrame(data=[(1, '张三', 18), (2, '李四', 30),(3,'王五',60)],schema=["id","name","age"])init_df.show()'''+---+----+---+| id|name|age|+---+----+---+| 1|张三| 18|| 2|李四| 30|+---+----+---+'''init_df2.show()init_df.printSchema()'''root|-- id: integer (nullable = true)|-- name: string (nullable = true)|-- age: integer (nullable = true)'''init_df2.printSchema()'''root|-- id: long (nullable = true)|-- name: string (nullable = true)|-- age: long (nullable = true)'''spark.stop()
2. RDD转换DataFrame
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('rdd_2_dataframe')\.master('local[*]')\.getOrCreate()# 通过SparkSession得到SparkContextsc = spark.sparkContext# 2- 数据输入# 2.1- 创建一个RDDinit_rdd = sc.parallelize(["1,李白,20","2,安其拉,18"])# 2.2- 将RDD的数据结构转换成二维结构new_rdd = init_rdd.map(lambda line: (int(line.split(",")[0]),line.split(",")[1],int(line.split(",")[2])))# 将RDD转成DataFrame:方式一# schema方式一schema = StructType()\.add('id',IntegerType(),False)\.add('name',StringType(),False)\.add('age',IntegerType(),False)# schema方式二schema = StructType([StructField('id',IntegerType(),False),StructField('name',StringType(),False),StructField('age',IntegerType(),False)])# schema方式三schema = "id:int,name:string,age:int"# schema方式四schema = ["id","name","age"]init_df = spark.createDataFrame(data=new_rdd,schema=schema)# 将RDD转成DataFrame:方式二"""toDF:中的schema既可以传List,也可以传字符串形式的schema信息"""# init_df = new_rdd.toDF(schema=["id","name","age"])init_df = new_rdd.toDF(schema="id:int,name:string,age:int")# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源sc.stop()spark.stop()
四 . 操作DataFrame
SQL方式:
df.createTempView('视图名称'): 创建一个临时的视图(表名)
df.createOrReplaceTempView('视图名称'): 创建一个临时的视图(表名),如果视图存在,直接替换
临时视图,仅能在当前这个Spark Session的会话中使用
df.createGlobalTempView('视图名称'): 创建一个全局视图,运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用
DSL方式:
-
show():用于展示DF中数据, 默认仅展示前20行
-
参数1:设置默认展示多少行 默认为20
-
参数2:是否为阶段列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置)
-
-
printSchema():用于打印当前这个DF的表结构信息
-
select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样
-
filter()和 where():用于对数据进行过滤操作, 一般在spark SQL中主要使用where
-
groupBy():用于执行分组操作
-
orderBy():用于执行排序操作