目录
- 一、浅语
- 二、三种数据处理方式比较
- 2.1 RDD
- 2.2 DataFrame
- 2.3 Spark SQL
- 三、三种方法的创建方式
- 3.1 创建RDD
- 3.2 创建DataFrame
- 3.2.1 创建sqlContext
- 3.2.2 定义Schema
- 3.2.3 创建DataFrame
- 3.3 创建SparkSQL
- 3.3.1 登录临时表
- 3.3.2 使用sparkSQL
- 四、三种方法显示部分字段
- 4.1 使用RDD选取显示部分字段
- 4.2 使用DataFrame选取显示字段
- 4.2.1 select方法输入[字段名]
- 4.2.2 select方法输入[dataframe名称].[字段名]
- 4.2.3 select方法输入[dataframe别名].[字段名]
- 4.2.4 通过dataframe中括号方式
- 4.3 使用SparkSQL选取显示字段
- 五、三种方法增加计算字段
- 5.1 RDD增加计算字段
- 5.2 DataFrame增加计算字段
- 5.3 SparkSQL增加计算字段
- 六、三种方法筛选数据
- 6.1 RDD筛选数据
- 6.2 DataFrame筛选数据
- 6.2.1 使用多个filter
- 6.2.2 使用单个filter
- 6.2.3 使用[dataframe 名称].[字段名]指定条件
- 6.2.4 使用[]指定筛选条件
- 6.3 SparkSQL筛选数据
- 七、三种方法按单个字段给数据排序
- 7.1 RDD:takeOrdered方法
- 7.2 DataFrame
- 7.3 SQparkSQL
- 八、三种方法按多个字段排序
- 8.1 RDD
- 8.2 DataFrame
- 8.3 SparkSQL
- 九、三种方法显示不重复的数据
- 9.1 RDD
- 9.2 DataFrame
- 9.3 SparkSQL
- 十、三种方法分组统计数据
- 10.1 RDD:map/reduce
- 10.2 DataFrame
- 10.2.1 crosstab:长表变宽表
- 10.3 SparkSQL
- 参考资料
一、浅语
上一篇(《pyspark RDD相关常用函数使用案例》) 对pyspark的一些常用函数做了梳理,这篇主要是针对RDD、DataFrame、SparkSql三种实现同一功能需要的方式做一梳理,通过实际动手,体会不同方式在数据处理过程中的差异性、便利性。
import findspark
findspark.init()
from pyspark.sql import SparkSession# 创建 Spark 会话
spark = SparkSession.builder \.appName("Test PySpark") \.master("local[*]") \.getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'
二、三种数据处理方式比较
2.1 RDD
- rdd的数据</font color-“green”>只能使用位置来指定每一个字段。
- rdd功能最强,能完成所有spark功能。
2.2 DataFrame
- Spark DataFrame被创建时</font color=“green”>必须定义Schema,定义每个字段名和数据类型。
- 定义了很多类似SQL的方法
- 比起RDD更容易使用
2.3 Spark SQL
- 有DataFrame派生出来,所以</font color=“green”>必须先创建DataFrame,进而转化使用。
- 最简单
三、三种方法的创建方式
3.1 创建RDD
# 读取本地数据文件
mRDD = sc.textFile("test.txt")
print('查看数据的行数:',mRDD.count())
# 按照空格分隔数据字段
sRDD = mRDD.map(lambda x:x.split(' '))
print('查看分隔后的结果:',sRDD.collect())
查看数据的行数: 8
查看分隔后的结果: [['yellow', '1', 'F'], ['blue', '2', 'M'], ['yellow', '3', 'F'], ['black', '4', 'F'], ['red', '5', 'M'], ['red', '5', 'M'], ['blue', '3', 'M'], ['blue', '7', 'M']]
3.2 创建DataFrame
3.2.1 创建sqlContext
sqlContext = SparkSession.builder.getOrCreate()
3.2.2 定义Schema
from pyspark.sql import Row
sRows = sRDD.map(lambda x:Row(color=x[0],num = int(x[1]),sex = x[2]))
# 查看schema
sRows.collect()
[Row(color='yellow', num=1, sex='F'),Row(color='blue', num=2, sex='M'),Row(color='yellow', num=3, sex='F'),Row(color='black', num=4, sex='F'),Row(color='red', num=5, sex='M'),Row(color='red', num=5, sex='M'),Row(color='blue', num=3, sex='M'),Row(color='blue', num=7, sex='M')]
3.2.3 创建DataFrame
# 创建DataFrame¶
df = sqlContext.createDataFrame(sRows)
# 使用.printSchema()查看DataFrame的schema
df.printSchema()
root|-- color: string (nullable = true)|-- num: long (nullable = true)|-- sex: string (nullable = true)
# 查看数据
df.show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 3| M|
| blue| 7| M|
+------+---+---+
### 为DataFrame创建别名
dataf = df.alias('dataf')
dataf.show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 3| M|
| blue| 7| M|
+------+---+---+
3.3 创建SparkSQL
3.3.1 登录临时表
dataf.registerTempTable('temp_tb')
D:\bigdataenv\spark-3.5.0-bin-hadoop3\python\pyspark\sql\dataframe.py:329: FutureWarning: Deprecated in 2.0, use createOrReplaceTempView instead.warnings.warn("Deprecated in 2.0, use createOrReplaceTempView instead.", FutureWarning)
3.3.2 使用sparkSQL
查看数据使用show()方法,默认显示前20行数据
sqlContext.sql('select * from temp_tb').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 3| M|
| blue| 7| M|
+------+---+---+
四、三种方法显示部分字段
4.1 使用RDD选取显示部分字段
rdd = sRDD.map(lambda x:(x[0],x[2],x[1]))
rdd.take(2)
[('yellow', 'F', '1'), ('blue', 'M', '2')]
4.2 使用DataFrame选取显示字段
下面四种方法显示的结果相同。
4.2.1 select方法输入[字段名]
df.select('color','sex').show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.2.2 select方法输入[dataframe名称].[字段名]
df.select(df.color,df.sex).show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.2.3 select方法输入[dataframe别名].[字段名]
dataf.select(dataf.color,dataf.sex).show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.2.4 通过dataframe中括号方式
df[df['color'],df['sex']].show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.3 使用SparkSQL选取显示字段
sqlContext.sql('select color,sex from temp_tb').show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
五、三种方法增加计算字段
5.1 RDD增加计算字段
sRDD.map(lambda x:(x[0],x[1],x[2],10-int(x[1]))).collect()
[('yellow', '1', 'F', 9),('blue', '2', 'M', 8),('yellow', '3', 'F', 7),('black', '4', 'F', 6),('red', '5', 'M', 5),('red', '5', 'M', 5),('blue', '3', 'M', 7),('blue', '7', 'M', 3)]
5.2 DataFrame增加计算字段
df.select('color','num','sex',10-df['num']).show()
+------+---+---+----------+
| color|num|sex|(10 - num)|
+------+---+---+----------+
|yellow| 1| F| 9|
| blue| 2| M| 8|
|yellow| 3| F| 7|
| black| 4| F| 6|
| red| 5| M| 5|
| red| 5| M| 5|
| blue| 3| M| 7|
| blue| 7| M| 3|
+------+---+---+----------+
# 为计算字段取一个别名
df.select('color','num','sex',(10-df['num']).alias('diff_num')).show()
+------+---+---+--------+
| color|num|sex|diff_num|
+------+---+---+--------+
|yellow| 1| F| 9|
| blue| 2| M| 8|
|yellow| 3| F| 7|
| black| 4| F| 6|
| red| 5| M| 5|
| red| 5| M| 5|
| blue| 3| M| 7|
| blue| 7| M| 3|
+------+---+---+--------+
5.3 SparkSQL增加计算字段
sqlContext.sql('select color,num,sex,10-num as diff_num from temp_tb').show()
+------+---+---+--------+
| color|num|sex|diff_num|
+------+---+---+--------+
|yellow| 1| F| 9|
| blue| 2| M| 8|
|yellow| 3| F| 7|
| black| 4| F| 6|
| red| 5| M| 5|
| red| 5| M| 5|
| blue| 3| M| 7|
| blue| 7| M| 3|
+------+---+---+--------+
六、三种方法筛选数据
6.1 RDD筛选数据
sRDD.filter(lambda x:int(x[1])>2).collect()
[['yellow', '3', 'F'],['black', '4', 'F'],['red', '5', 'M'],['red', '5', 'M'],['blue', '3', 'M'],['blue', '7', 'M']]
6.2 DataFrame筛选数据
四种筛选方式,执行结果相同。
6.2.1 使用多个filter
df.filter("color='blue'").filter('num=2').show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.2.2 使用单个filter
df.filter("color='blue' and num=2 ").show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.2.3 使用[dataframe 名称].[字段名]指定条件
注意:
- 必须使用“&”,不能使用“and”
- 必须使用“==”,不能使用“=”
df.filter((df.color=='blue') & (df.num==2)).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.2.4 使用[]指定筛选条件
df.filter((df['color']=='blue') & (df['num']==2)).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.3 SparkSQL筛选数据
sqlContext.sql("""select * from temp_tb where color='blue' and num=2 """).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
七、三种方法按单个字段给数据排序
7.1 RDD:takeOrdered方法
takeOrdered(num,key=None):
- num:要显示的项数
- key:使用lambda语句设置要排序的字段
# 升序示例
sRDD.takeOrdered(3,key=lambda x:int(x[1]))
[['yellow', '1', 'F'], ['blue', '2', 'M'], ['yellow', '3', 'F']]
# 降序示例
sRDD.takeOrdered(3,key=lambda x: -1 * int(x[1]))
[['blue', '7', 'M'], ['red', '5', 'M'], ['red', '5', 'M']]
7.2 DataFrame
# 升序
df.orderBy('num').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
| blue| 3| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 7| M|
+------+---+---+
# 降序
df.orderBy('num',ascending=0).show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
7.3 SQparkSQL
# 升序
sqlContext.sql('select * from temp_tb order by num').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
| blue| 3| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 7| M|
+------+---+---+
# 降序sqlContext.sql('select * from temp_tb order by num desc').show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
八、三种方法按多个字段排序
8.1 RDD
# 先num降序,color升序
sRDD.takeOrdered(3,key = lambda x: (-1*x[1],x[0]))
[['black', '4', 'F'], ['blue', '2', 'M'], ['blue', '3', 'M']]
8.2 DataFrame
df.orderBy(['num','color'],ascending=[0,1]).show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
df.orderBy(df.num.desc(),df.color).show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
8.3 SparkSQL
sqlContext.sql("select * from temp_tb order by num desc , color ").show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
九、三种方法显示不重复的数据
9.1 RDD
sRDD.map(lambda x:x[2]).distinct().collect()
['F', 'M']
sRDD.map(lambda x:(x[1],x[2])).distinct().collect()
[('3', 'F'),('4', 'F'),('5', 'M'),('3', 'M'),('7', 'M'),('1', 'F'),('2', 'M')]
9.2 DataFrame
df.select('sex').distinct().show()
+---+
|sex|
+---+
| F|
| M|
+---+
df.select('num','sex').distinct().show()
+---+---+
|num|sex|
+---+---+
| 2| M|
| 3| F|
| 4| F|
| 1| F|
| 5| M|
| 3| M|
| 7| M|
+---+---+
9.3 SparkSQL
sqlContext.sql("select distinct sex from temp_tb").show()
+---+
|sex|
+---+
| F|
| M|
+---+
sqlContext.sql("select distinct num,sex from temp_tb").show()
+---+---+
|num|sex|
+---+---+
| 2| M|
| 3| F|
| 4| F|
| 1| F|
| 5| M|
| 3| M|
| 7| M|
+---+---+
十、三种方法分组统计数据
10.1 RDD:map/reduce
在RDD中进行数据的分组统计,必须使用map/reduce
# 单字段:eg:按照sex分组统计
sRDD.map(lambda x:(x[2],int(x[1]))).reduceByKey(lambda x,y:x+y).collect()
[('F', 8), ('M', 22)]
# 多字段
sRDD.map(lambda x:((x[2],x[0]),int(x[1]))).reduceByKey(lambda x,y:x+y).collect()
[(('F', 'yellow'), 4),(('M', 'blue'), 12),(('F', 'black'), 4),(('M', 'red'), 10)]
10.2 DataFrame
df.select(['sex','num']).groupBy('sex').sum().show()
+---+--------+
|sex|sum(num)|
+---+--------+
| F| 8|
| M| 22|
+---+--------+
df.select(['sex','color','num']).groupBy(['sex','color']).sum().orderBy(['sex','color']).show()
+---+------+--------+
|sex| color|sum(num)|
+---+------+--------+
| F| black| 4|
| F|yellow| 4|
| M| blue| 12|
| M| red| 10|
+---+------+--------+
10.2.1 crosstab:长表变宽表
df.crosstab('color','sex').show()
+---------+---+---+
|color_sex| F| M|
+---------+---+---+
| yellow| 2| 0|
| red| 0| 2|
| black| 1| 0|
| blue| 0| 3|
+---------+---+---+
10.3 SparkSQL
sqlContext.sql('select sex,sum(num) from temp_tb group by sex').show()
+---+--------+
|sex|sum(num)|
+---+--------+
| F| 8|
| M| 22|
+---+--------+
参考资料
《Python+Spark 2.0+Hadoop机器学习与大数据实战》, 林大贵,清华大学出版社,2017-12,9787302490739
