Spark SQL进阶

DataFrame详解

清洗相关API

  • 去重API
    在这里插入图片描述
  • 删除空缺值的API
    在这里插入图片描述
  • 替换缺失值的API
    在这里插入图片描述
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('sparksql_etl_api')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/clear_data.csv',sep=',',encoding='UTF-8',header="True",inferSchema=True)init_df.printSchema()# 3- 数据处理# 3.1- 删除重复数据:dropDuplicates"""dropDuplicates总结:用来删除重复数据。如果没有指定参数subset,那么要比对行中的所有字段内容,如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围"""init_df.dropDuplicates().show()init_df.dropDuplicates(subset=["id","name"]).show()# 指定不存在的字段会报错:AnalysisException: Cannot resolve column name "id2" among (id, name, age, address)# init_df.dropDuplicates(subset=["id2","name"]).show()print("-"*30)# 3.2- 删除缺失值数据:dropna"""dropna(thresh,subset):删除缺失值数据.1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除"""init_df.dropna().show()init_df.dropna(subset=["id","name"]).show()init_df.dropna(thresh=1,subset=["name","age","address"]).show()init_df.dropna(thresh=2,subset=["name","age","address"]).show()init_df.dropna(thresh=2).show()print("-" * 30)# 3.3- 替换缺失值数据:fillna"""fillna(value,subset):替换缺失值数据value:必须要传递参数.是用来填充缺失值的subset:限定缺失值替换范围注意:1-value如果不是字典,那么只会替换字段类型匹配的空值2-最常用的是value传递字典的形式"""init_df.fillna(value=999).show()init_df.fillna(value=999,subset=["id","name"]).show()init_df.fillna(value={"id":111,"name":"未知姓名","age":100,"address":"北京"}).show()# 4- 数据输出# 5- 释放资源spark.stop()
总结:
1- dropDuplicates总结:用来删除重复数据。如果没有指定参数subset,那么要比对行中的所有字段内容,
如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围2- dropna(thresh,subset):删除缺失值数据.2.1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据2.2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内2.3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除3- fillna(value,subset):替换缺失值数据3.1- value:必须要传递参数.是用来填充缺失值的3.1- subset:限定缺失值替换范围注意:3.1- value如果不是字典,那么只会替换字段类型匹配的空值3.2- 最常用的是value传递字典的形式

Spark SQL的Shuffle分区设置

Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行

​ Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小

如何调整shuffle分区数量? spark.sql.shuffle.partitions

方案一(不推荐):  直接修改spark的配置文件spark-defaults.conf。全局设置,默认值为200。设置为: spark.sql.shuffle.partitions     20方案二(常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候./spark-submit --conf "spark.sql.shuffle.partitions=20"方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除。优先级也是最高的。一般的使用场景是,当你的数据量未来不会发生太大的波动。sparkSession.conf.set('spark.sql.shuffle.partitions',20)
import timefrom pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':start = time.time()# 1- 创建SparkSession顶级对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_wordcount_demo')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.text(paths='hdfs://node1:8020/input/words.txt')# 创建临时视图init_df.createTempView('words')# init_df.show()# init_df.printSchema()# 3- 数据处理# 3.1- SQL方式:子查询spark.sql("""selectword,count(1) as cnt,max(word) as max_word,min(word) as min_wordfrom (selectexplode(split(value,' ')) as wordfrom words) group by word""").show()print("-"*50)# 3.2- SQL方式:侧视图spark.sql("""selectword,count(1) as cntfrom words-- 侧视图lateral view explode(split(value,' ')) t as wordgroup by word""").show()print("DSL运行结果")print("-" * 50)# 3.3- DSL方式一"""DSL方式总结:withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源"""init_df.select(F.explode(F.split('value',' ')).alias('word')).groupBy('word').count().show()print("-" * 50)init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').count().withColumnRenamed('count','cnt').show()print("-" * 50)# 3.3- DSL方式二init_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').agg(F.count('word').alias('cnt'),F.max('word').alias('max_word')).show()print("-" * 50)# 3.4- DSL方式三init_df.withColumn('word',F.explode(F.split('value', ' '))).groupBy('word').agg(F.count('word').alias('cnt')).show()run_time = time.time() - startprint("运行耗时:",run_time)time.sleep(1000)# 4- 数据输出# 5- 释放资源spark.stop()

数据写出操作

统一的输出语法:
在这里插入图片描述

对应的简写API格式如下,以CSV为例:
init_df.write.csv(path='存储路径',mode='模式',header=True,sep='\001',encoding='UTF-8'
)

输出到文件中 json csv orc text …

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("csv方式读取文件")# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions","1")\.appName('数据输出到文件系统')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/stu.txt',sep=' ',encoding='UTF-8',header="True",inferSchema=True)# 3- 数据处理result = init_df.where('age>=20')# 4- 数据输出result.show()result.printSchema()# 数据输出到文件系统:简单API"""常用参数说明:1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统2- mode:当输出目录中文件已经存在的时候处理办法2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path file:xxx already exists.3- sep:字段间的分隔符4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True5- encoding:文件输出的编码方式"""result.write.csv(path="file:///export/data/output/",mode='ignore',sep=',',header=True,encoding="UTF-8")# 数据输出到文件系统:复杂API"""设置mode,需要单独调用mode()方法"""result.write\.format('json')\.mode("overwrite")\.option("encoding","UTF-8")\.save('file:///export/data/output_json/')# 5- 释放资源spark.stop()
常用参数说明:1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统2- mode:当输出目录中文件已经存在的时候处理办法2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path 	file:xxx already exists.3- sep:字段间的分隔符4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True5- encoding:文件输出的编码方式

将结果数据基于JDBC方案, 输出到关系型数据库, 例如说: MySql

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("数据输出到数据库")# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions","1")\.appName('sparksql_database')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/stu.txt',sep=' ',encoding='UTF-8',header="True",inferSchema=True)# 3- 数据处理result = init_df.where('age>=20')# 4- 数据输出result.show()result.printSchema()# 数据输出到数据"""创建数据库命令:create database text character set utf8;"""result.write.jdbc(url='jdbc:mysql://node1:3306/text?useUnicode=true&characterEncoding=utf-8',table='student',mode='append',properties={ 'user' : 'root', 'password' : '123456' })# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述
可能出现的错误一:
在这里插入图片描述

原因:  缺少连接MySQL数据库的驱动数据库的驱动包, 一般都是一些Jar包如何放置【mysql-connector-java-5.1.41.jar】驱动包呢?  1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,目录位置: /export/server/spark/jars2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars  ....

可能出现的错误二:
在这里插入图片描述
在这里插入图片描述

原因:将中文输出到了数据表中
解决办法:
1- 数据库连接要加上:useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码character set utf8

常见DSL代码

分类格式含义示例
API/方法select查询字段select(‘id1’, ‘id2’)
where对数据过滤where(‘avg_score>3’)
groupBy对数据分组groupBy(‘userid’)
orderBy对数据排序orderBy(‘cnt’, ascending=False)
limit取前几条数据orderBy(‘cnt’, ascending=False).limit(1)
agg聚合操作,里面可以写多个聚合表达式agg(F.round(F.avg(‘score’), 2).alias(‘avg_score’))
show打印数据init_df.show()
printSchema打印数据的schema信息,也就是元数据信息init_df.printSchema()
alias对字段取别名F.count(‘movieid’).alias(‘cnt’)
join关联2个DataFrameetl_df.join(avg_score_dsl_df, ‘movieid’)
withColumn基于目前的数据产生一个新列init_df.withColumn(‘word’,F.explode(F.split(‘value’, ’ ')))
dropDuplicates删除重复数据init_df.dropDuplicates(subset=[“id”,“name”])
dropna删除缺失值init_df.dropna(thresh=2,subset=[“name”,“age”,“address”])
fillna替换缺失值init_df.fillna(value={“id”:111,“name”:“未知姓名”,“age”:100,“address”:“北京”})
first取DataFrame中的第一行数据
over创建一个窗口列
函数avg计算均值
count计数
col将字段包装成Column对象,一般用于对新列的包装
round保留小数位
desc降序排序
row_number行号。从1开始编号
窗口partitionBy对数据分区
orderBy对数据排序orderBy(F.desc(‘pv’))
1- 什么使用使用select实现聚合,什么时候使用groupBy+agg/select实现聚合?如果不需要对数据分组,那么可以直接使用select实现聚合;如果有分组操作,需要使用groupBy+agg/select,推荐使用agg2- first()总结如果某个DataFrame中只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较3- F.col()总结对于在计算过程中临时产生的字段,需要使用F.col()封装成Column对象
  • API/方法:是由DataFrame来调用
  • 函数:需要先通过import pyspark.sql.functions as F导入,使用F调用。Spark SQL内置提供的函数https://spark.apache.org/docs/3.1.2/api/sql/index.html
  • 窗口:需要先通过from pyspark.sql import Window导入

Spark SQL函数定义

窗口函数

在Spark SQL中使用窗口函数案例:
需求是找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('sparksql_win_function')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/cookie.txt',schema='cookie string,datestr string,pv int',sep=',',encoding='UTF-8')init_df.createTempView('win_data')init_df.show()init_df.printSchema()# 3- 数据处理# SQLspark.sql("""select cookie,datestr,pvfrom (selectcookie,datestr,pv,row_number() over (partition by cookie order by pv desc) as rnfrom win_data) tmp where rn<=3""").show()# DSL"""select:注意点,结果中需要看到哪几个字段,就要明确写出来"""init_df.select("cookie","datestr","pv",F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')).where('rn<=3').select("cookie","datestr","pv").show()# 4- 数据输出# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述

SQL函数分类

SQL函数,主要分为以下三大类:

  • UDF函数:用户自定义函数
    • 特点:一对一,输入一个得到一个
    • 例如:split() substr()
  • UDAF函数:用户自定义聚合函数
    • 特点:多对一,输入多个得到一个
    • 例如:sum() avg() count() min()
  • UDTF函数:用户自定义表数据生成函数
    • 特点:一对多,输入一个得到多个
    • 例如:explode()

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

有这么多的内置函数,为啥还需要自定义函数呢?为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数

在这里插入图片描述

1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件,Python可以开发UDF、UDAF函数

Spark SQL原生存在的问题:大量的序列化和反序列
在这里插入图片描述

虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

Spark原生自定义UDF函数

自定义函数流程:

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数注册到Spark SQL中注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】注册方式二:  udf对象 = F.udf(参数1,参数2)参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用说明: 如果通过方式二来注册函数,【仅能用在DSL中】注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

Pandas的UDF函数

Apache Arrow框架基本介绍

Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

​ Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

​ Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

前提:服务器上已经安装pyspark
然后执行
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]

如何使用呢?  默认不会自动启动的, 一般建议手动配置sparkSession.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)

在这里插入图片描述

基于Arrow完成Pandas DataFrame和Spark DataFrame互转

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("基于Arrow完成Pandas DataFrame和Spark DataFrame互转")# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('dataframe')\.master('local[*]')\.getOrCreate()# 手动开启Arrow框架spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)# 2- 数据输入init_df = spark.createDataFrame(data=[(1, '张三', '广州'), (2, '李四', '深圳')],schema='id int,name string,address string')# 3- 数据处理# sparksql dataframe -> pandas dataframepd_df = init_df.toPandas()print(type(pd_df),pd_df)new_pd_df = pd_df[pd_df['id']==2]# pandas dataframe -> sparksql dataframespark_df = spark.createDataFrame(data=new_pd_df)spark_df.show()spark_df.printSchema()# 4- 数据输出# 5- 释放资源spark.stop()

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

总结:
Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

基于Pandas完成UDF函数

基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

自定义函数流程:

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数包装成Spark SQL的函数注册方式一: udf对象 = spark.udf.register(参数1, 参数2)参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用注意: 如果编写的是UDAF函数,那么注册方式一需要配合注册方式三,一起使用注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用注册方式三: 语法糖写法  @F.pandas_udf(returnType=返回值Spark SQL的数据类型)  放置到对应Python的函数上面说明: 实际是方式一的扩展。仅能用在DSL中使用第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
自定义UDF函数

自定义Python函数的要求:SeriesToSeries

  • 表示:第一步中创建自定义Python函数的时候,输入参数的类型和返回值类型必须都是Pandas中的Series类型
  • 需求:完成a列和b列的求和计算操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F# 绑定指定的Python解释器
from pyspark.sql.types import IntegerTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('pandas_udf')\.master('local[*]')\.getOrCreate()# 手动开启Arrow框架spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)# 2- 数据输入init_df = spark.createDataFrame(data=[(1,2),(2,3),(3,4)],schema='num1 int,num2 int')init_df.createTempView('tmp')# 3- 数据处理# 3.1- 自定义Python函数"""1- num1:pd.Series用来限定输入的参数类型是Pandas中的Series对象2-  -> pd.Series用来限定返回值类型是Pandas中的Series对象"""def my_sum(num1:pd.Series, num2:pd.Series) -> pd.Series:return num1+num2# 3.2- 注册进SparkSQL。注册方式一dsl_my_sum = spark.udf.register('sql_my_sum',my_sum)# 3.3- 使用# SQLspark.sql("""selectnum1,num2,sql_my_sum(num1,num2) as resultfrom tmp""").show()# DSLinit_df.select("num1","num2",dsl_my_sum("num1", "num2").alias("result")).show()# 注册方式二dsl2_my_sum = F.pandas_udf(my_sum,IntegerType())# DSLinit_df.select("num1","num2",dsl2_my_sum("num1", "num2").alias("result")).show()# 注册方式三@F.pandas_udf(IntegerType())def my_sum_candy(num1:pd.Series, num2:pd.Series) -> pd.Series:return num1+num2# DSLinit_df.select("num1","num2",my_sum_candy("num1", "num2").alias("result")).show()# 4- 数据输出# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述

自定义UDAF函数

自定义Python函数的要求:Series To 标量

  • 表示:自定义函数的输入数据类型是Pandas中的Series对象,返回值数据类型是标量数据类型。也就是Python中的数据类型,例如:int、float、bool、list…
  • 需求:对某一列数据计算平均值的操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F# 绑定指定的Python解释器
from pyspark.sql.types import IntegerType, FloatTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('pandas_udaf')\.master('local[*]')\.getOrCreate()# 手动开启Arrow框架spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)# 2- 数据输入init_df = spark.createDataFrame(data=[(1,2),(2,3),(3,3)],schema='num1 int,num2 int')init_df.createTempView('tmp')# 3- 数据处理# 3.1- 自定义Python函数"""UDAF对自定义Python函数的要求:输入数据的类型必须是Pandas中的Series对象,返回值类型必须是Python中的标量数据类型"""@F.pandas_udf(returnType=FloatType())def my_avg(num2_col:pd.Series) -> float:print(type(num2_col))print(num2_col)# 计算平均值return num2_col.mean()# 3.2- 注册进SparkSQL。注册方式一dsl_my_avg = spark.udf.register('sql_my_avg',my_avg)# 3.3- 使用# SQLspark.sql("""selectsql_my_avg(num2) as resultfrom tmp""").show()# DSLinit_df.select(dsl_my_avg("num2").alias("result")).show()# 4- 数据输出# 5- 释放资源spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/613866.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

渗透线应用-取料呼叫FC(SCL源代码)

渗透线应用相关文章可以参考下面文章链接: https://rxxw-control.blog.csdn.net/article/details/135526725https://rxxw-control.blog.csdn.net/article/details/135526725渗透线小车控制 https://rxxw-control.blog.csdn.net/article/details/133611151

【算法】基础算法001之双指针

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.数组分块&#xf…

【JaveWeb教程】(20) MySQL数据库开发之 基本查询、条件查询、聚合函数、分组查询、排序查询、分页查询 详细代码示例讲解

目录 1. 数据库操作-DQL1.1 介绍1.2 语法1.3 基本查询1.4 条件查询1.5 聚合函数1.6 分组查询1.7 排序查询1.8 分页查询1.9 案例1.9.1 案例一1.9.2 案例二 在上次学习的内容中&#xff0c;我们讲解了&#xff1a; 使用DDL语句来操作数据库以及表结构&#xff08;数据库设计&…

C++学习笔记(三十二):c++ 堆内存与栈内存比较

本节对堆和栈内存进行描述。 应用程序启动后&#xff0c;操作系统将整个程序加载到内存&#xff0c;分配相应的物理ram&#xff0c;确保程序可以正常运行。堆和栈是ram中存在的两个区域。栈通常是一个预定义大小的内存区域&#xff0c;一般是2M字节左右。堆也是预定了默认值的…

12、JVM高频面试题

1、JVM的主要组成部分有哪些 JVM主要分为下面几部分 类加载器&#xff1a;负责将字节码文件加载到内存中 运行时数据区&#xff1a;用于保存java程序运行过程中需要用到的数据和相关信息 执行引擎&#xff1a;字节码文件并不能直接交给底层操作系统去执行&#xff0c;因此需要…

NumPy 数据操作实用指南:从基础到高效(下)

文章接上篇&#xff1a; In [53]: from PIL import Image In [60]: dog Image.open(./dog.jpg) dog . . . In [61]: dog_datanp.array(dog) # 图片数据是ndarray # 彩色照片三维&#xff1a;高度&#xff0c;宽度&#xff0c;像素&#xff08;表示不同颜色&#xff09;&…

C语言操作符与表达式详解

目录 操作符的分类&#xff1a; &#xff08;1&#xff09;算数操作符 &#xff08;2&#xff09;移位操作符 &#xff08;3&#xff09;位操作符 &#xff08;4&#xff09;赋值操作符 &#xff08;5&#xff09;单目操作符 &#xff08;6&#xff09;关系操作符 &…

CSS 选择器全攻略:从入门到精通(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

pytorch11:模型加载与保存、finetune迁移训练

目录 一、模型加载与保存1.1 序列化与反序列化概念1.2 pytorch中的序列化与反序列化1.3 模型保存的两种方法1.4 模型加载两种方法 二、断点训练2.1 断点保存代码2.2 断点恢复代码 三、finetune3.1 迁移学习3.2 模型的迁移学习3.2 模型微调步骤3.2.1 模型微调步骤3.2.2 模型微调…

Asp .Net Core 系列: 集成 CORS跨域配置

文章目录 什么是CORS?Asp .Net Core 种如何配置CORS?CorsPolicyBuilder类详解注册以及使用策略三种方式EnableCors 和 DisableCors 特性关于带证书与不带证书代码的实现跨源&#xff08;cross-origin&#xff09;不带请求证书(Credentials)跨源&#xff08;cross-origin&…

c++析构函数

析构函数的简述 1. 析构函数和构造函数类似&#xff0c;是c规定当对象的生命周期结束时&#xff0c;默认你会调用析构函数。 2. 同理&#xff0c;当我们不写析构函数的时候&#xff0c;编译器会自动生成一个空实现的析构函数。 3. 析构函数只能编译器自己调用&#xff0c;我们…

CSS 选择器全攻略:从入门到精通(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

JavaScript从入门到精通系列第三十一篇:详解JavaScript中的字符串和正则表达式相关的方法

文章目录 知识回顾 1&#xff1a;概念回顾 2&#xff1a;正则表达式字面量 一&#xff1a;字符串中正则表达式方法 1&#xff1a;split 2&#xff1a;search 3&#xff1a;match 4&#xff1a;replace 知识回顾 1&#xff1a;概念回顾 正则表达式用于定义一些字符串的…

代码随想录算法训练营第二天|977 有序数组的平方、209长度最小的子数组、59 螺旋矩阵||

977 有序数组的平方 题目链接&#xff1a;有序数组的平方 思路 暴力解法 很容易想到的就是按照题目的说明&#xff0c;先给非递减数组中的每个元素做平方&#xff0c;然后使用一个排序函数对齐进行排序即可。 class Solution { public:vector<int> sortedSquares(ve…

IntelliJ IDEA Java 连接 mysql 配置(附完整 demo)

下载 MySQL 驱动 从MySQL官网下载JDBC驱动的步骤如下&#xff1a; 1&#xff09;访问MySQL的官方网站&#xff1a;MySQL 2&#xff09;点击页面上方的"DOWNLOADS"菜单&#xff1b; 3&#xff09;在下载页面&#xff0c;找到"MySQL Community (GPL) Downloads…

C++内存管理机制(侯捷)笔记2

C内存管理机制&#xff08;侯捷&#xff09; 本文是学习笔记&#xff0c;仅供个人学习使用。如有侵权&#xff0c;请联系删除。 参考链接 Youtube: 侯捷-C内存管理机制 Github课程视频、PPT和源代码: https://github.com/ZachL1/Bilibili-plus 下面是第二讲allocator具体实…

11 双向链表

单链表的局限&#xff1a; 单链表的缺点&#xff1a;逆序访问单链表中的元素耗时大。&#xff08;时间复杂度&#xff1a;O&#xff09; 双向链表的定义 第0个节点【a1】的pre指针为NULL&#xff0c;要注意 插入操作&#xff1a; 删除操作&#xff1a; 初步实现双链表 代码&…

【Vue系列】Vue3快速构建项目,以及在已有代码情况首次打开如何初始化依赖项

欢迎来到《小5讲堂》 大家好&#xff0c;我是全栈小5。 这是是《前端》序列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识点的理解和掌…

一天一个设计模式---适配器模式

概念 适配器模式是一种结构型设计模式&#xff0c;用于将一个类的接口转换成客户端所期望的另一个接口。它允许不兼容的接口之间进行协同工作&#xff0c;使得原本由于接口不匹配而无法合作的类能够一起工作。 具体内容 适配器模式主要包括以下几个要素&#xff1a; 目标接…

VBA中类的解读及应用第八讲:实现定时器功能的自定义类事件

《VBA中类的解读及应用》教程【10165646】是我推出的第五套教程&#xff0c;目前已经是第一版修订了。这套教程定位于最高级&#xff0c;是学完初级&#xff0c;中级后的教程。 类&#xff0c;是非常抽象的&#xff0c;更具研究的价值。随着我们学习、应用VBA的深入&#xff0…