1,DataFrame的基本使用
DSL方法–DataFrame方法
其中包括的方法有:select(),selectExpr(),groupby()/groupBy() where ,orderBy(),sort(),limit(),withColumn(),
from pyspark.sql import SparkSession#创建ss对象ss = SparkSession.builder.getOrCreate()# 创建sc对象sc= ss.sparkContext#读取hdfs上的文件数据转换成rdd对象rdd1 = sc.textFile('/test/stu.txt')rdd_split = rdd1.map(lambda x:x.split(','))print(rdd1.take(10))df1 = rdd_split.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')df1.show()df1.printSchema()print('----------------select-----------------------------')#查询所有列数据df1_select = df1.select('*')df1_select.show()#查询指定条目数据print(df1_select.take(5))#查询指定列数据#通过三种方式获取列 '列名' df.列名 df['列名']df_select2 = df1_select.select('id',df1['name'],df1.gender)df_select2.show()#查询列时修改显示列名df_select3 = df1.select(df1['name'].alias('stu_name'),df1.gender.alias('stu_gender'))#通过cast()修改字段类型,格式为df.列名.cast('修改后的列名')df_select4 = df1.select(df1.id.cast('int'),df1.name,df1['age'].cast('int'),df1['gender'],df1['major'],df1['birthday'])#打印数据的数据结构df_select4.printSchema()print('-----------------selectExpr------------------------------')#selectExpr(),写入字符串表达式 和sql代码类似df_select5 = df1.selectExpr('id as user_id','name','gender as stu_gender')df_select5.show()#通过cast修改字段类型df_select6 = df1.selectExpr('cast(id as int)','cast(age as int)')df_select6.printSchema()print('-----------------where------------------------------')#过滤满足条件的数据,减少行数df_where1 = df_select4.where('age>22')df_where1.show()#过滤查询单个过滤条件三种方式df_where0 = df_select4.where('age>22')df_where0.show()df_where2 = df_select4.where(df_select4['name'] == '王刚')df_where2.show()df_where3 = df_select4.where(df_select4.name == '王刚')df_where3.show()#多个过滤条件两种过滤方式df_where4 = df_select4.where('age >22 and name = "王刚"')df_where4.show()df_where5 = df_where4.where((df_select4['age'] > 22) & (df_where4['name']=='王刚'))df_where5.show()# print('---------------------groupBy-------------------------------')# #计算不同性别的平均年龄# #聚合函数获取列名 ->'列名'# #df.toDF(列名1,列名2……)修改列名注意点:写所有的列名,如果不修改也需要写原列名# df_groupby1 = df_select4.groupby('gender').mean('age').toDF('gender','avg_age')## #第二种修改列名的方式# #df_groupby1.select('gender',df_groupby1['avg(age)'].alias('avg_age2')).show()## df_groupby1.show()### df_groupby2 = df_select4.groupby('major','gender').avg('age').toDF('majot','gnder','avg_age')# df_groupby2.show()## print('-------------------------groupby/groupBy where -------------------')# #分组后加where条件# df_group3_where = df_select4.groupby('major','gender').mean('age').where('avg(age)>21.5')# df_group3_where.show()print('---------------------orderBy/sort 排序 操作---------------------')#对出生日期进行降序排序df_orderby1= df_select4.orderBy('birthday',ascending=False)df_orderby1.show()df_orderby2= df_select4.orderBy('birthday',ascending=True)df_orderby2.show()#对多列进行排序操作df_orderby3 = df_select4.orderBy(['age','birthday'],ascending=[False,True])df_orderby3.show()df_sort1= df_select4.sort('age',ascending=True)df_sort1.show()from pyspark.sql.functions import *#方法来自于导入模块df_sort3 = df_select4.sort(desc('age'),asc('birthday'))df_sort3.show()print('--------------limit----------------------------------')#打印展示指定条目数局的df对象"""limit和show区别:limit生成新的df行数据由num确定show展示的一个行数据,不是df中的行数 df有100行数据,可以通过show方法展示其中给你前30行数据"""#默认展示二十行数据df_select4.show(n=30)#truncate:是否进行折叠省略,默认折叠True,False不折叠/不省略df_select4.show(truncate=False) #不省略df_limit = df_select4.limit(num=10)df_limit.show()print('-----------withColumn新增列--------------------')#第一个参数:新列名#第二个参数:新列的计算逻辑df_withcolumn = df_select4.withColumn('name_age',df_select4['age']+10)df_withcolumn.show()#新增一列常熟列F.lit()中from pyspark.sql import functions as Fdf_withcolumn2 = df_select4.withColumn('new_age',F.lit('10'))df_withcolumn2.show()#withColumnRenamed#第一个参数员列名#第二个参数:新列名"""修改列名方式①df.select(df['列名'].alias('新列明'))②df.toDF(列名1,列名2,……)③df.withColumnRenamed(原列名,新列名)"""df_withcolumnRenamed = df_select4.withColumnRenamed('age','new_age1')df_withcolumnRenamed.show()
sql语句的方式
其中包括的执行步骤为:
①先创建ss对象,再根据ss.sparkContext创建sc对象
②通过sc获取hdfs上的数据
③在对数据进行格式转换后,再将数据转为dataFrame结构
④再通过df创建一张表,给定表名
⑤在根据sql语句查询自己需要的数据
from pyspark.sql import SparkSession#创建ss对象ss = SparkSession.builder.getOrCreate()# 创建sc对象sc= ss.sparkContext#读取hdfs上的文件数据转换成rdd对象rdd1 = sc.textFile('/test/stu.txt')rdd_split = rdd1.map(lambda x:x.split(','))print(rdd1.take(10))df1 = rdd_split.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')df1.show()df1.printSchema()#第一步:将需要处理的df对象映射成一张表 命名一个表名#表名随意df1.createTempView('stu')#第二部:执行sql语句df2 = ss.sql('select cast(id as string),name as user_name,gender,age from stu where age>22')df2.show()
2,DSL方法–join
#join关联方法# 多个df的关联操作from pyspark.sql import SparkSessionfrom pyspark.sql.types import *# 生成SparkSession对象ss = SparkSession.builder.getOrCreate()# SparkSession中可以通过sparkContext获取sparkContext对象sc = ss.sparkContext# 生成rddrdd1 = sc.parallelize([[1, 'zhangsan', 20],[2, 'lisi', 20],[3, 'wangwu', 22]])rdd2 = sc.parallelize([[3, 'zhaoliu', 20],[4, 'xiaoming', 21],[5, 'itcast', 22]])# 定义schema信息schema_type = StructType(). \add('id', IntegerType()). \add('name', StringType()). \add('age', IntegerType(), False)# toDF 将二维rdd数据转为dataframe数据df1 = rdd1.toDF(schema_type)df2 = rdd2.toDF(schema_type)df1.show()df2.show()#on:关联条件如果两个dff的字段名相同可以直接使用列名 ->可以有多个关联条件#how:关联方式,默认是inner,内连接#df_join中只有id相同的关联成功,且只显示关联成功的df_innerjoin = df1.join(df2,on='id',how='inner')df_innerjoin.show()#df_join中有两个关联字段df_join2 = df1.join(df2,on=df1.id == df2.id)df_join2.show()#结果# | id| name|age| id| name|age|# +---+------+---+---+-------+---+# | 3|wangwu| 22| 3|zhaoliu| 20|#左连接 how = 'left'df_leftjoin = df1.join(df2,on=['id'],how='left')df_leftjoin.show()#如果不指定how那么默认是innerjoin 内连接df_leftjoin2 = df1.join(df2,on=df1.id == df2.id,how='left')df_leftjoin2.show()#右连接df_right_join = df1.join(df2,on=['name'],how='right')df_right_join.show()#满外连接how = fulldf_full_joinn = df1.join(df2,on='id',how='full')df_full_joinn.show()#交叉连接,笛卡尔积(3X3=9条数据)df_cross_join = df1.crossJoin(df2)df_cross_join.show()
3,DSL-数据清洗
其中包括的方法有:toDF 将二维rdd数据转为dataframe数据,null值处理填充fillna(),删除dropna(),去重dropDuplicates()
#join关联方法# 多个df的关联操作from pyspark.sql import SparkSessionfrom pyspark.sql.types import *# 生成SparkSession对象ss = SparkSession.builder.getOrCreate()# SparkSession中可以通过sparkContext获取sparkContext对象sc = ss.sparkContext# 生成rddrdd1 = sc.parallelize([[1, 'zhangsan', 20],[2, 'lisi', 20],[3, 'wangwu', 22]])rdd2 = sc.parallelize([[3, 'zhaoliu', 20],[4, 'xiaoming', 21],[5, 'itcast', 22]])# 定义schema信息schema_type = StructType(). \add('id', IntegerType()). \add('name', StringType()). \add('age', IntegerType(), False)# toDF 将二维rdd数据转为dataframe数据df1 = rdd1.toDF(schema_type)df2 = rdd2.toDF(schema_type)df1.show()df2.show()df_innerjoin = df1.join(df2,on='id',how='left')df_innerjoin.show()#df.drop()df_drop = df_innerjoin.drop('name')df_drop.show()#删除多列数据df_drop2 = df_innerjoin.drop('id','name')df_drop2.show()#null处理:#①填充,使用值对null值进行替换# ②删除,删除null值行数据#填充null值#常数值->常数值类型是什么就是对df中对应的类型列进行填充,常熟值是数值类型,对df数值列进行填充df_fillnal1 = df_innerjoin.fillna(100)df_fillnal1.show()df_fillnam2 = df_innerjoin.na.fill('小明')df_fillnam2.show()df_fillnal3 = df_innerjoin.toDF('id','name','age','name_2','age_2').fillna(value={'age_2':100,'name_2':'小明'})df_fillnal3.show()df_fillnal4 = df_innerjoin.toDF('id','name','age','name_2','age_2').fillna(value=100,subset=['name_2','age_2'])df_fillnal4.show()#删除数据行#hoe:any,任意列值为null就要删除行数据 all所有列值为null才删除行数据df_dropna1 = df_innerjoin.toDF('id','name','age','name_2','age_2').dropna(how='any')df_dropna1.show()df_dropn2 = df_innerjoin.toDF('id','name','age','name_2','age_2').dropna(how='all')df_dropn2.show()df_dropna3 = df_innerjoin.toDF('id','name','age','name_2','age_2').dropna(thresh=2)df_dropna3.show()#交叉连接,笛卡尔积(3X3=9条数据)df_cross_join = df1.crossJoin(df2)df_cross_join.show()#去重->去除重复的行数据#subset:指定去重的列,不指定是对所有列进行去重df_dropDuplicates1 = df_cross_join.dropDuplicates()df_dropDuplicates1.show()#指定subset[]df_dropDuplicates2 = df_cross_join.toDF('id','name','age','id_2','name_2','age_2').dropDuplicates(subset=['id','name','age'])df_dropDuplicates2.show()
4,内置函数
4.1字符串函数
包含的方法有:concat(),concat_ws(),split(),substring(),substring_index(),replace(),regexp_extract()
# DF的内置函数# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致# todo:导入内置函数模块,as命名别称F 使用模块时可以用F表示from pyspark.sql import SparkSession, functions as F# 创建SparkSession对象ss = SparkSession.builder.getOrCreate()# 使用sparkcontext读取hdfs上的文件数据sc = ss.sparkContext# 将读取的文件数据转化为rddrdd = sc.textFile('hdfs://node1:8020/test/stu.txt')res1 = rdd.collect()print(f'rdd文件数据:{res1}')# 将每行字符串数据切割转为二维rddrdd_split = rdd.map(lambda x: x.split(','))res2 = rdd_split.collect()print(f'切割后的rdd数据:{res2}')# 将rdd转为df数据df = rdd_split.toDF(schema='id string,name string,gender string,age string ,birthday string,major string,hobby string,create_time string')# 查看转化的df数据df.show()print('---------------------concat/concat_ws 拼接----------')#concat(列名1,列名2,……)df_concat1 = df.select(F.concat('name',df.gender,df['age']))df_concat1.show()#conat_ws(拼接符,列名1,列名2)df_concatws2 = df.select(F.concat_ws('-','name',df.gender,df['age']))df_concatws2.show()print('---------splilt----------------')#split(列名,分隔符),以指定分隔符对字符串列的数据进行分割操作,返回列表# def_split = df.select(F.split('birthday','-').alias('new_col'))def_split = df.select(F.split('birthday','-')[0].alias('year'),F.split('birthday','-')[1].alias('month'),F.split('birthday','-')[2].alias('day'))def_split.show()df_split2 = df.select(F.split('birthday','-',limit=11))df_split2.show()df_split3 = df.select(F.split('birthday','-',limit=2))print('-------------------substring/substring_index截取字符串-------------')#substring(列名,起始位置,截取长度) ->从起始位置截取指定长度的子串#其实位置从1开始,0,1的效果是一样的df.select(F.substring('birthday',0,4)).show()df.select(F.substring('birthday',1,4)).show()#substring_index(列名,截取符,第n个截取符号)#n:正数,从左往右数截取符,截取左侧部分#n:负数,从右往左数截取符df.select(F.substring_index('birthday','-',1)).show()df.select(F.substring_index('birthday','-',2)).show()df.select(F.substring_index('birthday','-',-1)).show()df.select(F.substring_index('birthday','-',-2)).show()
6.2日期时间函数
# DF的内置函数# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致# todo:导入内置函数模块,as命名别称F 使用模块时可以用F表示from pyspark.sql import SparkSession, functions as F# 创建SparkSession对象ss = SparkSession.builder.getOrCreate()# 使用sparkcontext读取hdfs上的文件数据sc = ss.sparkContext# 将读取的文件数据转化为rddrdd = sc.textFile('hdfs://node1:8020/test/stu.txt')res1 = rdd.collect()# print(f'rdd文件数据:{res1}')# 将每行字符串数据切割转为二维rddrdd_split = rdd.map(lambda x: x.split(','))res2 = rdd_split.collect()# print(f'切割后的rdd数据:{res2}')# 将rdd转为df数据df = rdd_split.toDF(schema='id string,name string,gender string,age string ,birthday string,major string,hobby string,create_time string')# 查看转化的df数据# df.show()print('--------------------------------获取当前时间-------------------------')#获取年月日df.select(F.current_date()).show()#获取年月日时分秒毫秒#truncate=False 取消省略df.select(F.current_timestamp()).show(truncate=False)#获取时间戳df.select(F.unix_timestamp()).show()print('--------------------------------日期时间转换-------------------------')#date_format(日期时间类型列,格式)#y 年 M:月 d:天 H:时 m:分 s:秒df.select(F.date_format('birthday','MM/dd/yyyy')).show()print('--------------------------------#时间戳转换成日期时间-------------------------')#from_unixtime(时间戳类型列,格式)df_timestamp = df.select(F.current_timestamp())df.select(F.from_unixtime('create_time','yyyy-MM-dd')).show(truncate=False)print('--------------------------------时间取值-------------------------')#获取年df.select(F.year('birthday')).show()# 获取月份month()df.select(F.month('birthday')).show()#获取日df.select(F.dayofmonth('birthday')).show()#获取当前年份第多少天df.select(F.dayofyear('birthday')).show()#获取当前星期第多少天0-6,1是周日df.select(F.dayofweek('birthday')).show()#获取当前月1的最后一天日期df.select(F.last_day('birthday')).show()print('--------------------------------#时间加减-------------------------')#date_sub(日期时间类型列,天数) 减少多少天df.select(F.date_sub(F.current_date(),10)).show()#date_add(日期时间类型列,天数)加多少天df.select(F.date_add(F.current_date(),10)).show()#datediff(结束日期,开始日期)做差,获取天数差值df.select(F.datediff(F.current_date(),'birthday')).show()
6.3聚合函数
# DF的内置函数# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致# todo:导入内置函数模块,as命名别称F 使用模块时可以用F表示from pyspark.sql import SparkSession, functions as F# 创建SparkSession对象ss = SparkSession.builder.getOrCreate()# 使用sparkcontext读取hdfs上的文件数据sc = ss.sparkContext# 将读取的文件数据转化为rddrdd = sc.textFile('hdfs://node1:8020/test/stu.txt')res1 = rdd.collect()# print(f'rdd文件数据:{res1}')# 将每行字符串数据切割转为二维rddrdd_split = rdd.map(lambda x: x.split(','))res2 = rdd_split.collect()# print(f'切割后的rdd数据:{res2}')# 将rdd转为df数据df = rdd_split.toDF(schema='id string,name string,gender string,age string ,birthday string,major string,hobby string,create_time string')# 查看转化的df数据# df.show()print('--------------------------------agg聚合函数-------------------------')#functions模块中的聚合函数要结合agg函数一起使用df_agg = df.groupby('gender').agg(F.sum(df.age.cast('int')).alias('sum_data'))df_agg.show()df_select = df.select(df.id.cast('int'),df.name,df['gender'],df['age'].cast('int'),df.birthday,df.major,df.hobby,df.create_time)#聚合函数#对某列进行聚合操作,返回一个值df_select.select(F.sum('age')).show()#分组聚合# count()不需要写列名df_select.groupby('gender').count().show()#内置函数中的聚合函数可以和agg函数结合使用#对一列进行分组聚合操作df_select.groupby('gender').agg(F.count('id').alias('cnt')).show()#对多列进行分组聚合操作#round(列名,位数) ->保留几位小数df_select.groupby('gender').agg(F.count('id').alias('cnt'),F.round(F.avg('age'),2).alias('avg_age'),F.max('age').alias('max_age')).show()#{key:value}->key:聚合字段名,value:聚合函数名df_select.groupby('gender').agg({'id':'count','age':'mean'}).show()