1,SparkSession对象操作
from pyspark.sql import SparkSessionfrom pyspark import SparkConffrom pyspark.sql import functions as F"""创建ss对象时可以指定一些参数如果参数在脚本中不生效,就需要通过saprk-submit指令中进行设置spark sql 的分区数是由catalyst引擎的优化器决定发生shuffle过程(遇到宽依赖算子时)分区数会调整为200个,200个分区对应者200个task任务可以通过spark.sql.shuffffle.partitions调整shuffle过程中的分区数(根据实际业务情况调整)"""# conf = SparkConf().set('driver-mimory','2g').set('num-executors','3')conf = SparkConf().set('spark.sql.shuffle.partitions','6')ss = SparkSession.\builder.\master('yarn').\appName('yarn_demo').\config(conf=conf).\getOrCreate()# 创建sc对象sc= ss.sparkContext#读取hdfs上的文件数据转换成rdd对象rdd1 = sc.textFile('/test/stu.txt')rdd_split = rdd1.map(lambda x:x.split(','))print(rdd1.take(10))df1 = rdd_split.toDF(schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')df1.show()#通过cast()修改字段类型,格式为df.列名.cast('修改后的列名')df_select4 = df1.select(df1.id.cast('int'),df1.name,df1['age'].cast('int'),df1['gender'],df1['major'],df1['birthday'])# print(df_select4.collect())df_groupby= df_select4.groupby('gender').agg(F.avg('age').alias('avg_age'))df_groupby.show()
2,数据源和格式
1.1数据读取
from pyspark.sql import SparkSession,functions as Fss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()print('--------------------txt格式文件----------------')#将读取到的数据保存到value列中df1 = ss.read.text(paths='/test/words.txt')# df1.show(truncate=False)# df1.printSchema()df_txt =df1.select(F.split('value',',')[0].alias('id'),F.split('value', ',')[1].alias('name'),F.split('value', ',')[2].alias('age'),F.split('value', ',')[3].alias('gender'),)# df_txt.show()# df_txt.printSchema()print('--------------------csv格式文件----------------')#path:文件路径#sep:分隔符,默认时逗号# schema:表结构,列名,类型# header:加载第一行列名信息#inferSchema:自动解析表结构df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')df_csv.show()#另一种书写方式ss.read.load(path= '/test/stu.csv',format='csv',schema='name string,age int,gender string,phone string,email string,city string,address string').show()# ss.read.format('csv')print('--------------------json格式文件----------------------')df_json = ss.read.json(path = '/test/x0.json')df_json.show(truncate=False)print('--------------------mysql格式文件----------------------')df_mysql = ss.read.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8',table='test',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})df_mysql.show()print('--------------------orc格式文件读取----------------------')df_orc = ss.read.orc('file:///export/server/spark/examples/src/main/resources/users.orc')df_orc.show()print('--------------------parquet格式文件读取----------------------')df_parquet = ss.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet')df_parquet.show()
1.2数据写入
"""ss.write.text/json/csv/jdbc()mode:写入模式overwrite:覆盖写,append:追加写"""from pyspark.sql import SparkSession,functions as Fss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([[1,'张三',20,'男'],[2,'王五',21,'女']],schema='id int,name string,age int,gender string')df.show()print('-----------------text文件---------------')#对字符串进行处理,以字符串类型保存到value字段中df_text = df.select(F.concat_ws(',','id','name','age','gender').alias('value'))df_text.show()# path:目录路径 按照分区数据写入到目录下的文件中# df_text.write.text(path='/test/data_test')# df_text.write.save(path='/test/data_test',format='text',mode='append')df_text.write.mode('overwrite').format('text').save(path='/test/data_test')print('-----------------csv文件---------------')#header:是否将列名写入df.write.csv(path='/test/data_csv',mode='overwrite',header=True)print('-----------------json文件---------------')df.write.mode('overwrite').format('json').save(path='/test/data_json')print('-----------------mysql表文件---------------')#参数说明#table:表不存在的话会自动创建#mode:写入的模式有两种overwrite和append,需要指定,不指定第一遍创建可以成功第二遍创建会失败df.write.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=utf-8',table='test2',mode='append',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
3,自定义函数
业务中需求无法是使用内置函数处理数据时,可以来自己定义函数实现需求处理
3.1分类
- UDF函数
- 一对一关系, df中的一行数据经过函数处理返回一行计算结果
- concat()/concat_ws()/split()…
- 可以自定义
- UDAF函数 聚合函数
- 多对一关系, df中的多行数据经过函数处理返回一行计算结果
- sum()/avg()/count()…
- 可以自定义
- UDTF函数
- 一对多关系, df中的一行数据经过函数处理返回多行计算结果
- explode() -> 爆炸函数, 接受容器类型(array or map type), 将容器中的元素拆分成多行
3.1UDTF函数
from pyspark.sql import SparkSession,functions as Fss = SparkSession.builder.getOrCreate()#读取文件df1 = ss.read.text(paths='/test/words.txt')df1.show(truncate=False)#对value字段中的字符串数据以逗号进行分割,返回列表df_split = df1.select(F.split('value',',').alias('words_list'))df_split.show(truncate=False)#使用udtf函数对列表进行拆分df3 = df_split.select(F.explode('words_list').alias('word'))df3.show()df3.groupby('word').count().orderBy('count',ascending=False).show()#sql方式df1.createTempView('test')res_df = ss.sql('select split(value,',') from test')res_df.show()res_df.printSchema()
3.2,UDF函数使用
自定义udf函数需要啊先注册才能够使用
两种注册方式:
普通注册:
import refrom pyspark.sql import SparkSession, functions as Ffrom pyspark.sql.types import StringType, ArrayTypess = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')df_csv.show()# 需求获取用户名称公司名称信息def get_emial(x):# print('x的值是'+x)# 通过正则表达式获取想要的部分res = re.match("(.*?)@(.*?)\.(.*)", x)# print(res)name = res.group(1)company = res.group(2)# 返回列表return [name, company]# 方式一:普通注册email_func = ss.udf.register(name='new_func', f=get_emial, returnType=ArrayType(StringType()))# 在df对象中使用自定义函数new_df = df_csv.select('name', 'age', email_func('email'))new_df.show()# Sql方式df_csv.createTempView('stu')sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu")sql_df.show()
装饰器注册方式
UDF只能在DSL方式中使用
import refrom pyspark.sql import SparkSession, functions as Ffrom pyspark.sql.types import *ss = SparkSession.builder.master('local[*]').appName('local_demo').getOrCreate()df_csv = ss.read.csv(path='/test/stu.csv', sep=',', inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')# 需求获取用户名称公司名称信息#步骤一:定义函数#步骤2,将自定义@F.udf(returnType=ArrayType(StringType()))def get_emial(x):# print('x的值是'+x)# 通过正则表达式获取想要的部分res = re.match("(.*?)@(.*?)\.(.*)", x)# print(res)name = res.group(1)company = res.group(2)# 返回列表return [name, company]# 在df对象中使用自定义函数new_df = df_csv.select('name', 'age', get_emial('email'))new_df.show()# # Sql方式,装饰器方式不能在sql方式中使用# df_csv.createTempView('stu')# sql_df = ss.sql("select name,age,new_func(email)[0] as user_name,new_func(email)[1] as company from stu")# sql_df.show()
4,UDAF函数
注意:UDAF函数需要借助pandas中的series类型进行操作
UDAF函数中多行数就是pandas中的series类型数据
pandas介绍:
pandas是python中一个数据分析包,需要通过pip install pandas进行安装
4.1pandas有两种数据类型:Series和DataFrame
import pandas as pd#创建series对象s1 = pd.Series(data=[1,2,3,4])#不指定索引时默认生成0,1,2,3,4print(s1)#指定行索引 index=s2 = pd.Series(data=(5,6,7,8),index=['a','b','c','d'])print(s2)print(type(s2))print(type(s1))#获取具体值#根据行索引获取对应位置的值print(s1[0])#通过key获取值print(s2['a'])#使用聚合函数print(s1.sum())print(s1.mean())# print(s1.cumsum())#获取行索引print(s1.index)
4.2dataFrame对象操作
import pandas as pd#创建对象
df = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23]])
print(df)#指定行列索引
df2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]],index=['a','b','c'],columns=['id','name','age'])
print(df2)
#获取df中的数据值
#通过df[列名]->获取列数据
print(df2['id'])
print(df2.age)
#得到一个df对象
print(df2[['id','name']])
"""
loc:通过索引标签值获取数据
iloc:通过索引下标值获取数据
"""
#获取行数据
print(df2.loc['a'])
print(df2.iloc[0])#获取列数据
print(df2.loc[:,'id'])
print(df2.loc[:,['id','name']])
print(df2.iloc[:,0])
print(df2.iloc[:,[0,2]])
#获取行列数据
print(df2.loc['b','name'])
print(df2.iloc[1,1])#聚合函数
print(df2.sum())
print(df2['age'].mean())#分组聚合
print(df2.groupby('id')['age'].sum())
4.3pandas和spark的df相互转换
import pandas as pddf2 = pd.DataFrame(data=[[1,'张三',12],[2,'李四',23],[3,'w',12]],index=['a','b','c'],columns=['id','name','age'])from pyspark.sql import SparkSession# 创建ss对象ss = SparkSession.builder.getOrCreate()
spark_df = ss.createDataFrame(data=df2,schema='id int,name string,age int')spark_df.show()#saprk的df对象转换成pandas的df对象
new_pandas_dfd = spark_df.toPandas()
print(new_pandas_dfd)
4.4UDAF函数使用
注意点:需要安装pyspark模块
pyspark代码是会转换成java代码, 而pandas是python中特有的模块, java中没有此模块
自定义UDAF函数只能通过装饰器方式注册
自定义UDAF函数只能在DSL方式中使用
import pandas as pd
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()df_csv = ss.read.csv(path ='/test/stu.csv',sep=',',inferSchema=True,schema='name string,age int,gender string,phone string,email string,city string,address string')df_csv.groupby('gender').agg(F.mean('age').alias('avg_age')).show()
#手写聚合函数mean()
"""
注意点:
①:需要指定自定义函数的参数类型,pandas的series类型
②:需要指定自定义函数的返回值类型python的类型
"""
@F.pandas_udf(returnType=FloatType())
def avg_age(age:pd.Series) ->float:print('age的值',age)res = age.mean()return res#第三步在df对象中使用udaf函数
df_csv.select(avg_age('age')).show()#sql方式
#将自定义rdaf函数
new_func = ss.udf.register('new_func',avg_age)
df_csv.createTempView('stu')
ss.sql("select gender,new_func(age) from stu group by gender").show()