目录
- 前言
- DataFrame使用总结
- DataFrame的构建
- 方法1:通过列表构建
- 方法2:通过Row对象构建
- 方法3:通过表Schema构建
- 方法4:rdd结合字符串构建
- DataFrame的方法
PySpark实战笔记系列第五篇
- 10-用PySpark建立第一个Spark RDD(PySpark实战笔记系列第一篇)
- 11-pyspark的RDD的变换与动作算子总结(PySpark实战笔记系列第二篇))
- 12-pyspark的RDD算子注意事项总结(PySpark实战笔记系列第三篇)
- 13-pyspark的共享变量用法总结(PySpark实战笔记系列第四篇)
- 14-pyspark的DataFrame使用总结(PySpark实战笔记系列第五篇)
前言
在Spark中,除了RDD这种数据容器外,另一种一种更容易操作的一个分布式数据容器DataFrame,它更像传统关系型数据库的二维表,除了包括数据自身以外,还包括数据的结构信息(Schema),可以利用类似SQL的语言来进行数据访问。
DataFrame可以从多种数据来源上进行构建,比如结构化数据文件、Hive中的表、外部数据库或现有RDD。
DataFrame使用总结
DataFrame的构建
方法1:通过列表构建
列表的元素是元组,这个数据结构可以代表一种二维数据。然后利用spark.createDataFrame()方法来构建,示例如下:
import findspark
findspark.init()
#############################################
from pyspark.sql
import SparkSession
spark = SparkSession.builder \.master("local[2]") \.appName("DataFrameDemo") \.getOrCreate();
############################################
a = [('Jack', 32),('Smith', 33)]
df = spark.createDataFrame(a)
#[Row(_1='Jack', _2=32), Row(_1='Smith', _2=33)]
print(df.collect())
df.show()15
# +-----+---+
# | _1| _2|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+# 指定列名
df2 = spark.createDataFrame(a, ['name', 'age'])
#[Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df2.collect())
df2.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|3
# +-----+---+
方法2:通过Row对象构建
到DataFrame对象是由Row这个数据结构构成的,因此也可以**用Row,然后利用spark.createDataFrame() 方法 **来创建DataFrame对象。示例如下:
# 通用的开头
# ......
#################################################
from pyspark.sql import Row a = [('Jk', 32),('Sm', 33)]
rdd = sc.parallelize(a)
# 创建包含列名的Row
RMes= Row('name', 'age')
# rdd对象的元素进行映射,转换成一个RMes对象,并返回一个新RDD对象
rmes = rdd.map(lambda r: RMes(*r))
df = spark.createDataFrame(rmes)
# [Row(name='Jk', age=32), Row(name='Sm', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jk | 32|
# |Sm | 33|
# +-----+---+
方法3:通过表Schema构建
上述两个方法都没能给定每个字段的类型,比如列名name是字符串类型,而列名age是数值类型。而通过用StructType方法创建了一个表Schema则可以实现,类似定义数据库中的表结构。再利用spark.createDataFrame()方法来创建DataFrame对象。示例如下:
# 通用的开头
# ......
#################################################
from pyspark.sql.types import *a = [('Jk', 32),('Sm', 33)]
rdd = sc.parallelize(a)
# 用StructType方法创建了一个表Schema
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True)])
# 创建DataFrame
df = spark.createDataFrame(rdd, schema)
# Row(name='Jk', age=32), Row(name='Sm', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jk | 32|
# |Sm | 33|
# +-----+---+
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- age: integer (nullable = true)
方法4:rdd结合字符串构建
借助StructType方法可以创建类型化的DataFrame对象,但是操作起来有点繁琐。下面示例一个简单一点的方法,同样可以创建具备字段类型的DataFrame对象。
# 通用的开头
# ......
#################################################
a = [('Jk', 32),('Sm', 33)]
rdd = sc.parallelize(a)
# 创建DataFrame:使用一个字符串对表结构中的字段类型进行定义
df = spark.createDataFrame(rdd, "name:string, age:int")
# Row(name='Jk', age=32), Row(name='Sm', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jk | 32|
# |Sm | 33|
# +-----+---+
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- age: integer (nullable = true)
DataFrame的方法
方法名 | 使用形式 | 参数说明 | 作用 | 示例 |
---|---|---|---|---|
show | df.show() | 用表格的方式对数据进行打印 | ||
printSchema | df.printSchema() | 打印df对象的Schema结构定义 | ||
createOrReplaceTempView | df.createOrReplaceTempView(viewName) | viewName:string,指定创建的表名 | 将DataFrame映射为一个数据库表 | |
select | df.select(colNames) | colNames:string,需要读取的列名称 | 读取指定列的信息 | |
selectExpr | df.selectExpr(*colMes) | 对列进行计算 | ||
agg | df.agg(dictMes) | dictMes:dict,其中key为列名,value为聚合的函数名 | 对DataFrame的指定列按照指定的方式进行聚合 | |
describe | df.describe(listMes) | listMes:默认None,描述统计的列名称 | 对指定字段进行描述统计 | |
summary | df.summary() | 给出DataFrame对象的概览统计结果 | ||
filter | df.filter(filterCondtion) | filterCondtion:指定的过滤条件逻辑关系式 | 对数据进行过滤 | |
join | df.join(otherDF,join_key,how=“inner”) | 第三个参数默认为inner,其他选项为:inner,cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi,left_anti | 多个DataFrame进行关联 | |
distinct | df.distinct() | 对DataFrame对象中的数据进行去重操作 | ||
drop | df.drop(*col_names) | col_names:需要删除的列名称 | 删除某些不需要的数据列信息(从列上移除) | |
exceptAll | df.exceptAll(otherDF) | 从一个DataFrame的数据中移除掉另外一个DataFrame中的数据(从行上移除) | ||
intersectAll | df.intersectAll(otherDF) | 求两个DataFrame的交集 | df1.intersectAll(df2).sort(“C1”,“C2”).show() | |
na.fill | df.na.fill(dictMes) | dictMes:dict,指定列空值的替换方式 | 对空值进行替换操作 | |
toJSON | df.toJSON() | 将DataFrame转成JSON格式 | ||
withColumn | df.withColumn(colName,df) | 增加计算列 | ||
withColumnRenamed | df.withColumnRenamed(old,new) | 修改列名 | 同上 | |
write.csv | df.write.csv(csvfilepath,mode) | df.write.csv函数给定的参数是数据存储的路径,而不是文件名。 | 将DataFrame数据以csv格式进行存储 |
参考文档:
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.html
- 《Python大数据处理库PySpark实战》
博主写博文就是方便对自己所学所做的事做一备份记录或回顾总结。欢迎留言,沟通学习。
刚开始接触,请多指教,欢迎留言交流!