摸鱼大数据——Spark Structured Steaming——新零售数据分析案例

1、数据源介绍

数据内容

字段说明

2、分析需求

  • 数据清洗需求

 清洗需求:1) 将客户id(CustomerID)不为0的数据保留下来: CustomerID != 02) 将商品描述(Description)不为空的数据保留下来: Description !=''3) 将日期(InvoiceDate)格式转换为yyyy-MM-dd HH:mm 原有格式: 12/1/2010 8:26转换为: 2010-12-01 08:26 需求分析: 原有日期时间字符串格式 -> 时间戳 -> 新的日期时间字符串格式原有日期时间字符串格式 -> 时间戳:unix_timestamp时间戳 -> 新的日期时间字符串格式:from_unixtime格式化解释链接: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
  • 指标统计需求

 需求一: 统计各个国家有多少的客户量需求二: 效率最高的10个国家需求三: 统计各个国家的总销售额分布情况需求四: 销售最高10个商品需求五: 商品描述的热门关键词TOP300需求六: 统计退货订单数最多的10个国家需求七: 商品的平均单价与销量的关系需求八: 月销售额随时间的变化趋势需求九: 日销售随时间的变化趋势需求十: 各国的购买订单量和退货订单量的关系

3、数据清洗

核心函数:       
       withColumn(参数1,参数2): 用来产生新列,或者用来覆盖旧列
            参数1:(新)列的名称。如果列的名称与已经存在的字段名称相同,那么会覆盖原有的字段
             参数1:列数据的来源
       unix_timestamp(参数1,参数2): 日期时间字符串格式 -> 时间戳
                参数1:日期时间
                参数2:日期格式  例如:  M/d/yyyy H:mm
       from_unixtime(参数1,参数2): 时间戳 -> 日期时间字符串格式
             参数1:时间戳
             参数2:日期格式  例如:  yyyy-MM-dd HH:mm

参考代码:

 # 导包import osfrom pyspark.sql import SparkSession,functions as F​# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 创建main函数if __name__ == '__main__':# 1.创建SparkSession对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()​# 2.数据输入init_df = spark.read.csv(path='file:///export/data/spark_project/anli/xls/E_Commerce_Data.csv',sep=',',header=True,inferSchema=True,encoding='utf8')# 验证数据类型和数据内容以及数据条数init_df.printSchema()init_df.show(5)print(f"清洗转换之前数据量: {init_df.count()}")# 3.数据处理  ETL: 抽取 清洗+转换 加载# 1) 将客户id(CustomerID)不为0的数据保留下来: CustomerID != 0# 2) 将商品描述(Description)不为空的数据保留下来: Description !=''clear_df = init_df.where("CustomerID != 0 and Description !='' ")# 3) 将日期(InvoiceDate)格式转换为yyyy-MM-dd HH:mm# 伪sql:  select from_unixtime(unix_timestamp('12/1/2010 8:26','M/d/yyyy H:mm'),'yyyy-MM-dd HH:mm');etl_df = clear_df.withColumn('InvoiceDate',F.from_unixtime(F.unix_timestamp('InvoiceDate','M/d/yyyy H:mm'),'yyyy-MM-dd HH:mm'))# 验证记录数和数据内容print(f"清洗转换之后数据量: {etl_df.count()}")etl_df.show(5)# 4.数据输出etl_df.write.csv(path='hdfs://node1:8020/xls_output',sep='\001',header=True,encoding='utf8')print('数据已经成功保存到hdfs的xls_output目录中')# 5.关闭资源spark.stop()

4、功能实现

准备工作
  • 读取清洗后的数据

 # 导包import osfrom pyspark.sql import SparkSession​# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'​# 需求一: 统计各个国家有多少的客户量def xu_qiu1():pass​# 需求五: 商品描述的热门关键词TOP300def xu_qiu5():pass​# 需求九: 日销售随时间的变化趋势def xu_qiu9():pass​# 需求十: 各国的购买订单量和退货订单量的关系def xu_qiu10():pass​# 创建main函数if __name__ == '__main__':# 1.创建SparkSession对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()​# 2.数据输入# 注意: 因为上一步ETL结果存储到了hdfs中,所以本次直接从hdfs读取已经etl后的干净数据init_df = spark.read.csv(path='hdfs://node1:8020/xls_output',sep='\001',header=True,inferSchema=True,encoding='utf8')​# 3.验证数据# 验证数据类型和数据内容以及数据条数init_df.printSchema()init_df.show(5)print(f"用于数据分析的数据量: {init_df.count()}")​# 4.数据分析,10个需求,主要完成其中的1,5,9,10# 可以采用sql和dsl两种方式去完成# 为了方便统一在sql中的表名,可以提前创建一个视图init_df.createTempView('xls')​# 需求多,建议大家采样函数思想,分别把对应功能代码都封装成函数# 需求一: 统计各个国家有多少的客户量xu_qiu1()# 需求五: 商品描述的热门关键词TOP300xu_qiu5()# 需求九: 日销售随时间的变化趋势xu_qiu9()# 需求十: 各国的购买订单量和退货订单量的关系xu_qiu10()​# 5.关闭资源spark.stop()​
功能实现

如果运行sparksql,报count_distinct找不到,那么是因为pyspark版本原因导致。解决办法如下:

1- 检查自己3台机器的pyspark版本是否是3.1.2版本

pip list | grep pyspark

2-如果不是3.1.2版本,那么先卸载pyspark

命令: pip uninstall pyspark

3- 重新安装3.1.2版本pyspark

命令: pip install -i Simple Index pyspark==3.1.2

  • 需求一:统计各个国家有多少的客户量

    • 大白话:统计每个国家有多少个客户

 # 需求一:统计各个国家有多少的客户量def xuqiu_1():# SQL方式spark.sql("""selectCountry,count(distinct CustomerID) as cntfrom xlsgroup by Country""").show()# DSL方式init_df.groupBy('Country').agg(F.countDistinct('CustomerID').alias('cnt')).show()
  • 需求二:销量最高的10个国家

    • 大白话:统计每个国家的销售的数量,取出前10个

  • 需求三:统计各个国家的总销售额分布情况

    • 大白话:统计每个国家的销售额(购买数量 * 单价),不需要过滤掉退货订单

  • 需求四:销售最高10个商品

    • 大白话:统计每个商品的销售的数量,取出前10个商品

  • 需求五:商品描述的热门关键词TOP300

    • 大白话:统计每个关键词出现了多少次,取出前300个关键词

 # 需求五:商品描述的热门关键词TOP300def xuqiu_5():# SQLspark.sql("""selectword,count(1) as cntfrom xlslateral view explode(split(Description,' ')) t as wordgroup by wordorder by cnt desclimit 300""").show()# DSLinit_df.withColumn('word', F.explode(F.split('Description', ' '))).groupBy('word').agg(F.count('word').alias('cnt')).orderBy('cnt', ascending=False).limit(300).show()

  • 需求六:统计退货订单数最多的10个国家

    • 大白话:统计每个国家的退货的订单数量,取出前10个国家

 selectCountry,count(distinct InvoiceNo) as cntfrom xlswhere InvoiceNo like 'C%' -- 过滤出退货订单group by Countryorder by cnt desclimit 10
  • 需求七:商品的平均单价与销量的关系

    • 大白话:统计每个商品的平均单价以及销售数量

  • 需求八:月销售额随时间的变化趋势

    • 大白话:统计每个月的销售额(购买数量 * 单价),不需要过滤掉退货订单

  • 需求九:日销售随时间的变化趋势

    • 大白话:统计每天的销售数量和销售额,不需要过滤掉退货订单

 # 需求九:日销售随时间的变化趋势def xuqiu_9():# SQLspark.sql("""selectsubstr(InvoiceDate,1,10) as day,sum(Quantity) as total_num, -- 总销售数量sum(Quantity * UnitPrice) as total_money -- 总销售金额from xlsgroup by dayorder by day""").show(10)print("-" * 30)# DSL"""F.expr(参数):执行参数的表达式,得到Column对象"""init_df.withColumn('day', F.substring("InvoiceDate", 1, 10)).groupBy('day').agg(F.sum("Quantity").alias("total_num"),# 错误写法:F.sum("Quantity * UnitPrice").alias("total_money")​# 下面四个是正确的写法,任意选择一个F.sum(init_df["Quantity"] * init_df["UnitPrice"]).alias("total_money"),F.sum(init_df.Quantity * init_df.UnitPrice).alias("total_money"),F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("total_money"),F.sum(F.expr("Quantity * UnitPrice")).alias("total_money")).orderBy('day').show(10)
  • 需求十:各国的总购买订单量和退货订单量的关系

    • 大白话:统计每个国家的购买订单量(总的订单数量,包含退货订单) 以及退货订单量

 # 需求十:各国的总购买订单量和退货订单量的关系def xuqiu_10():# SQL方式spark.sql("""selectCountry,count(distinct InvoiceNo) as total_num, -- 总购买订单量count(distinct if(InvoiceNo like 'C%',InvoiceNo,null)) as return_num -- 退货订单量from xlsgroup by Country""").show()# DSL方式init_df.groupBy('Country').agg(F.countDistinct("InvoiceNo").alias("total_num"),F.countDistinct(F.expr("if(InvoiceNo like 'C%',InvoiceNo,null)")).alias("return_num")).show()

可能遇到的错误:

原因: sum无法识别字段相乘的字符串内容。会当成一个字段名处理
解决办法: 转变成Column对象。下面四个是正确的写法,任意选择一个
    1- F.sum(init_df["Quantity"] * init_df["UnitPrice"]).alias("total_money")
    2- F.sum(init_df.Quantity * init_df.UnitPrice).alias("total_money")
    3- F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("total_money")
    4- F.sum(F.expr("Quantity * UnitPrice")).alias("total_money")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/48422.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

gds-linkqueue:泛型链式队列

类似于C的queue的泛型容器,初始化、销毁、清空、入队、出队、取队首/尾、队空。 ​​​​​​​ ​​​​​​​

对某次应急响应中webshell的分析

文章前言 在之前处理一起应急事件时发现攻击者在WEB应用目录下上传了webshell,但是webshell似乎使用了某种加密混淆手法,无法直观的看到其中的木马连接密码,而客户非要让我们连接webshell来证实此文件为后门文件且可执行和利用(也是很恼火&a…

fMATLAB中fill函数填充不同区域

只需获取填充区域的边缘信息,函数边缘越详细越好,然后调用fill函数。 fill函数能够根据指定的顶点坐标和填充颜色来绘制多边形或曲线形状,并在其内部填充指定的颜色。这使得在MATLAB中创建具有视觉吸引力的图形变得简单而高效。 fill函数的…

《0基础》学习Python——第二十讲__网路爬虫/<3>

一、用post请求爬取网页 同样与上一节课的get强求的内容差不多,即将requests.get(url,headershead)代码更换成requests.post(url,headershead),其余的即打印获取的内容,如果content-typejson类型的,打印上述代码的请求,则用一个命…

笔记:现代卷积神经网络之VGG

本文为李沐老师《动手学深度学习》笔记小结,用于个人复习并记录学习历程,适用于初学者 神经网络架构设计的模块化 然AlexNet证明深层神经网络卓有成效,但它没有提供一个通用的模板来指导后续的研究人员设计新的网络。 在下面的几个章节中&a…

【Vue】`v-if` 指令详解:条件渲染的高效实现

文章目录 一、v-if 指令概述二、v-if 的基本用法1. 基本用法2. 使用 v-else3. 使用 v-else-if 三、v-if 指令的高级用法1. 与 v-for 一起使用2. v-if 的性能优化 四、v-if 的常见应用场景1. 表单验证2. 弹窗控制 五、v-if 指令的注意事项 Vue.js 是一个用于构建用户界面的渐进式…

Flink调优详解:案例解析(第42天)

系列文章目录 一、Flink-任务参数配置 二、Flink-SQL调优 三、阿里云Flink调优 文章目录 系列文章目录前言一、Flink-任务参数配置1.1 运行时参数1.2 优化器参数1.3 表参数 二、Flink-SQL调优2.1 mini-batch聚合2.2 两阶段聚合2.3 分桶2.4 filter去重(了解&#xf…

【中项】系统集成项目管理工程师-第3章 信息技术服务-3.4服务标准化

前言:系统集成项目管理工程师专业,现分享一些教材知识点。觉得文章还不错的喜欢点赞收藏的同时帮忙点点关注。 软考同样是国家人社部和工信部组织的国家级考试,全称为“全国计算机与软件专业技术资格(水平)考试”&…

持续集成02--Linux环境更新/安装Java新版本

前言 在持续集成/持续部署(CI/CD)的旅程中,确保开发环境的一致性至关重要。本篇“持续集成02--Linux环境更新/安装Java新版本”将聚焦于如何在Linux环境下高效地更新或安装Java新版本。Java作为广泛应用的编程语言,其版本的更新对…

XLua原理(一)

项目中活动都是用xlua开发的,项目周更热修也是用xlua的hotfix特性来做的。现研究底层原理,对于项目性能有个更好的把控。 本文认为看到该文章的人已具备使用xlua开发的能力,只研究介绍下xlua的底层实现原理。 一.lua和c#交互原理 概括&…

用程序画出三角形图案

创建各类三角形图案 直角三角形&#xff08;左下角&#xff09; #include <iostream> using namespace std;int main() {int rows;cout << "输入行数: ";cin >> rows;for(int i 1; i < rows; i){for(int j 1; j < i; j){cout << &…

003uboot目录分析和两个阶段

我们都知道s3c2440是一个soc&#xff0c;内含cpu和各种控制器、片内的RAM&#xff0c;他的CPU是arm920t。 我们先来分析一下uboot原码的各个目录 1.uboot目录分析 board&#xff1a;board里存放的是支持各个开发板的文件&#xff0c;包括链接脚本 common: common目录中存放的…

graham 算法计算平面投影点集的凸包

文章目录 向量的内积&#xff08;点乘&#xff09;、外积&#xff08;叉乘&#xff09;确定旋转方向numpy 的 cross 和 outernp.inner 向量与矩阵计算示例np.outer 向量与矩阵计算示例 python 示例生成样例散点数据图显示按极角排序的结果根据排序点计算向量转向并连成凸包 基本…

set、map、multiset、multimap容器介绍和常用接口使用

文章目录 前言一、set容器二、multiset三、map四、multimap 前言 1、set、map、 multiset、 multimap都是基于红黑树实现的容器。 2、set、multiset都使用头文件#include<set>,map、multimap都是使用头文件#include<map> 一、set容器 1、set容器的介绍 C标准库中的…

pytest常用命令行参数解析

简介&#xff1a;pytest作为一个成熟的测试框架&#xff0c;它提供了许多命令行参数来控制测试的运行方式&#xff0c;以配合适用于不同的测试场景。例如 -x 可以用于希望出现错误就停止&#xff0c;以便定位和分析问题。–rerunsnum适用于希望进行失败重跑等个性化测试策略。 …

【BUG】已解决:AttributeError: ‘str‘ object has no attribute ‘get‘

已解决&#xff1a;AttributeError: ‘str‘ object has no attribute ‘get‘ 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司&#xff0c;热衷分享知识&#xff0c…

C++初学者指南-5.标准库(第一部分)--标准库查找算法

C初学者指南-5.标准库(第一部分)–标准库查找算法 文章目录 C初学者指南-5.标准库(第一部分)--标准库查找算法查找/定位一个元素findfind_iffind_if_notfind_last / find_last_if / find_last_if_notfind_first_of 查找范围内的子范围 search find_endstarts_withends_with 找到…

SpringBoot3 + Vue3 学习 Day 2

登入接口 和 获取用户详细信息的开发 学习视频登入接口的开发1、登入主逻辑2、登入认证jwt 介绍生成 JWT① 导入依赖② 编写代码③ 验证JWT 登入认证接口的实现① 导入 工具类② controller 类实现③ 存在的问题及优化① 编写拦截器② 注册拦截器③ 其他接口直接提供服务 获取用…

Web3D:WebGL为什么在渲染性能上输给了WebGPU。

WebGL已经成为了web3D的标配&#xff0c;市面上有N多基于webGL的3D引擎&#xff0c;WebGPU作为挑战者&#xff0c;在渲染性能上确实改过webGL一头&#xff0c;由于起步较晚&#xff0c;想通过这个优势加持&#xff0c;赶上并超越webGL仍需时日。 贝格前端工场为大家分享一下这…

Webstorm-恢复默认UI布局

背景 在使用Webstorm的时候,有时候进行个性化设置,如字体、界面布局等. 但是设置后的效果不理想,想要重新设置回原来的模样,却找不到设置项. 这里提供一种解决方案,恢复默认设置,即恢复到最初刚下载好后的设置. 操作步骤 步骤一:打开setting 步骤二:搜索Restore Default,找到…