1、数据源介绍
数据内容
字段说明
2、分析需求
-
数据清洗需求
清洗需求:1) 将客户id(CustomerID)不为0的数据保留下来: CustomerID != 02) 将商品描述(Description)不为空的数据保留下来: Description !=''3) 将日期(InvoiceDate)格式转换为yyyy-MM-dd HH:mm 原有格式: 12/1/2010 8:26转换为: 2010-12-01 08:26 需求分析: 原有日期时间字符串格式 -> 时间戳 -> 新的日期时间字符串格式原有日期时间字符串格式 -> 时间戳:unix_timestamp时间戳 -> 新的日期时间字符串格式:from_unixtime格式化解释链接: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
-
指标统计需求
需求一: 统计各个国家有多少的客户量需求二: 效率最高的10个国家需求三: 统计各个国家的总销售额分布情况需求四: 销售最高10个商品需求五: 商品描述的热门关键词TOP300需求六: 统计退货订单数最多的10个国家需求七: 商品的平均单价与销量的关系需求八: 月销售额随时间的变化趋势需求九: 日销售随时间的变化趋势需求十: 各国的购买订单量和退货订单量的关系
3、数据清洗
核心函数:
withColumn(参数1,参数2): 用来产生新列,或者用来覆盖旧列
参数1:(新)列的名称。如果列的名称与已经存在的字段名称相同,那么会覆盖原有的字段
参数1:列数据的来源
unix_timestamp(参数1,参数2): 日期时间字符串格式 -> 时间戳
参数1:日期时间
参数2:日期格式 例如: M/d/yyyy H:mm
from_unixtime(参数1,参数2): 时间戳 -> 日期时间字符串格式
参数1:时间戳
参数2:日期格式 例如: yyyy-MM-dd HH:mm
参考代码:
# 导包import osfrom pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkSession对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入init_df = spark.read.csv(path='file:///export/data/spark_project/anli/xls/E_Commerce_Data.csv',sep=',',header=True,inferSchema=True,encoding='utf8')# 验证数据类型和数据内容以及数据条数init_df.printSchema()init_df.show(5)print(f"清洗转换之前数据量: {init_df.count()}")# 3.数据处理 ETL: 抽取 清洗+转换 加载# 1) 将客户id(CustomerID)不为0的数据保留下来: CustomerID != 0# 2) 将商品描述(Description)不为空的数据保留下来: Description !=''clear_df = init_df.where("CustomerID != 0 and Description !='' ")# 3) 将日期(InvoiceDate)格式转换为yyyy-MM-dd HH:mm# 伪sql: select from_unixtime(unix_timestamp('12/1/2010 8:26','M/d/yyyy H:mm'),'yyyy-MM-dd HH:mm');etl_df = clear_df.withColumn('InvoiceDate',F.from_unixtime(F.unix_timestamp('InvoiceDate','M/d/yyyy H:mm'),'yyyy-MM-dd HH:mm'))# 验证记录数和数据内容print(f"清洗转换之后数据量: {etl_df.count()}")etl_df.show(5)# 4.数据输出etl_df.write.csv(path='hdfs://node1:8020/xls_output',sep='\001',header=True,encoding='utf8')print('数据已经成功保存到hdfs的xls_output目录中')# 5.关闭资源spark.stop()
4、功能实现
准备工作
-
读取清洗后的数据
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 需求一: 统计各个国家有多少的客户量def xu_qiu1():pass# 需求五: 商品描述的热门关键词TOP300def xu_qiu5():pass# 需求九: 日销售随时间的变化趋势def xu_qiu9():pass# 需求十: 各国的购买订单量和退货订单量的关系def xu_qiu10():pass# 创建main函数if __name__ == '__main__':# 1.创建SparkSession对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入# 注意: 因为上一步ETL结果存储到了hdfs中,所以本次直接从hdfs读取已经etl后的干净数据init_df = spark.read.csv(path='hdfs://node1:8020/xls_output',sep='\001',header=True,inferSchema=True,encoding='utf8')# 3.验证数据# 验证数据类型和数据内容以及数据条数init_df.printSchema()init_df.show(5)print(f"用于数据分析的数据量: {init_df.count()}")# 4.数据分析,10个需求,主要完成其中的1,5,9,10# 可以采用sql和dsl两种方式去完成# 为了方便统一在sql中的表名,可以提前创建一个视图init_df.createTempView('xls')# 需求多,建议大家采样函数思想,分别把对应功能代码都封装成函数# 需求一: 统计各个国家有多少的客户量xu_qiu1()# 需求五: 商品描述的热门关键词TOP300xu_qiu5()# 需求九: 日销售随时间的变化趋势xu_qiu9()# 需求十: 各国的购买订单量和退货订单量的关系xu_qiu10()# 5.关闭资源spark.stop()
功能实现
如果运行sparksql,报count_distinct找不到,那么是因为pyspark版本原因导致。解决办法如下:
1- 检查自己3台机器的pyspark版本是否是3.1.2版本
pip list | grep pyspark
2-如果不是3.1.2版本,那么先卸载pyspark
命令: pip uninstall pyspark
3- 重新安装3.1.2版本pyspark
命令: pip install -i Simple Index pyspark==3.1.2
-
需求一:统计各个国家有多少的客户量
-
大白话:统计每个国家有多少个客户
-
# 需求一:统计各个国家有多少的客户量def xuqiu_1():# SQL方式spark.sql("""selectCountry,count(distinct CustomerID) as cntfrom xlsgroup by Country""").show()# DSL方式init_df.groupBy('Country').agg(F.countDistinct('CustomerID').alias('cnt')).show()
-
需求二:销量最高的10个国家
-
大白话:统计每个国家的销售的数量,取出前10个
-
-
需求三:统计各个国家的总销售额分布情况
-
大白话:统计每个国家的销售额(购买数量 * 单价),不需要过滤掉退货订单
-
-
需求四:销售最高10个商品
-
大白话:统计每个商品的销售的数量,取出前10个商品
-
-
需求五:商品描述的热门关键词TOP300
-
大白话:统计每个关键词出现了多少次,取出前300个关键词
-
# 需求五:商品描述的热门关键词TOP300def xuqiu_5():# SQLspark.sql("""selectword,count(1) as cntfrom xlslateral view explode(split(Description,' ')) t as wordgroup by wordorder by cnt desclimit 300""").show()# DSLinit_df.withColumn('word', F.explode(F.split('Description', ' '))).groupBy('word').agg(F.count('word').alias('cnt')).orderBy('cnt', ascending=False).limit(300).show()
-
需求六:统计退货订单数最多的10个国家
-
大白话:统计每个国家的退货的订单数量,取出前10个国家
-
selectCountry,count(distinct InvoiceNo) as cntfrom xlswhere InvoiceNo like 'C%' -- 过滤出退货订单group by Countryorder by cnt desclimit 10
-
需求七:商品的平均单价与销量的关系
-
大白话:统计每个商品的平均单价以及销售数量
-
-
需求八:月销售额随时间的变化趋势
-
大白话:统计每个月的销售额(购买数量 * 单价),不需要过滤掉退货订单
-
-
需求九:日销售随时间的变化趋势
-
大白话:统计每天的销售数量和销售额,不需要过滤掉退货订单
-
# 需求九:日销售随时间的变化趋势def xuqiu_9():# SQLspark.sql("""selectsubstr(InvoiceDate,1,10) as day,sum(Quantity) as total_num, -- 总销售数量sum(Quantity * UnitPrice) as total_money -- 总销售金额from xlsgroup by dayorder by day""").show(10)print("-" * 30)# DSL"""F.expr(参数):执行参数的表达式,得到Column对象"""init_df.withColumn('day', F.substring("InvoiceDate", 1, 10)).groupBy('day').agg(F.sum("Quantity").alias("total_num"),# 错误写法:F.sum("Quantity * UnitPrice").alias("total_money")# 下面四个是正确的写法,任意选择一个F.sum(init_df["Quantity"] * init_df["UnitPrice"]).alias("total_money"),F.sum(init_df.Quantity * init_df.UnitPrice).alias("total_money"),F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("total_money"),F.sum(F.expr("Quantity * UnitPrice")).alias("total_money")).orderBy('day').show(10)
-
需求十:各国的总购买订单量和退货订单量的关系
-
大白话:统计每个国家的购买订单量(总的订单数量,包含退货订单) 以及退货订单量
-
# 需求十:各国的总购买订单量和退货订单量的关系def xuqiu_10():# SQL方式spark.sql("""selectCountry,count(distinct InvoiceNo) as total_num, -- 总购买订单量count(distinct if(InvoiceNo like 'C%',InvoiceNo,null)) as return_num -- 退货订单量from xlsgroup by Country""").show()# DSL方式init_df.groupBy('Country').agg(F.countDistinct("InvoiceNo").alias("total_num"),F.countDistinct(F.expr("if(InvoiceNo like 'C%',InvoiceNo,null)")).alias("return_num")).show()
可能遇到的错误:
原因: sum无法识别字段相乘的字符串内容。会当成一个字段名处理
解决办法: 转变成Column对象。下面四个是正确的写法,任意选择一个
1- F.sum(init_df["Quantity"] * init_df["UnitPrice"]).alias("total_money")
2- F.sum(init_df.Quantity * init_df.UnitPrice).alias("total_money")
3- F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("total_money")
4- F.sum(F.expr("Quantity * UnitPrice")).alias("total_money")