通过门店销售明细表用PySpark得到每月每个门店的销冠和按月的同比环比数据

假设我在Amazon S3上有销售表的Parquet数据文件的路径,包含ID主键、门店ID、日期、销售员姓名和销售额,需要分别用PySpark的SparkSQL和Dataframe API统计出每个月所有门店和各门店销售额最高的人,不一定是一个人,以及他所在的门店ID和月总销售额。

使用DataFrame API实现:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, max, date_format, col
from pyspark.sql.window import Window# 初始化Spark会话
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()# 读取S3上的Parquet文件
df = spark.read.parquet("s3://path/to/sales/data")# 处理日期字段并计算每月各门店各销售员的销售额总和
sales_aggregated = df.withColumn("month", date_format(col("日期"), "yyyy-MM")) \.groupBy("门店ID", "month", "销售员姓名") \.agg(sum("销售额").alias("sales_total"))# 定义窗口规范(按门店和月份分区)
window_spec = Window.partitionBy("门店ID", "month")# 使用窗口函数计算最大销售额和月总销售额
result_df = sales_aggregated \.withColumn("max_sales", max("sales_total").over(window_spec)) \.withColumn("monthly_total", sum("sales_total").over(window_spec)) \.filter(col("sales_total") == col("max_sales")) \.select("month", "门店ID", "monthly_total", "销售员姓名", "sales_total") \.orderBy("month", "门店ID", "销售员姓名")# 显示结果
result_df.show()

使用SparkSQL实现:

# 注册DataFrame为临时视图
df.createOrReplaceTempView("sales_data")# 执行SQL查询
sql_result = spark.sql("""
WITH sales_aggregated AS (SELECT 门店ID,date_format(日期, 'yyyy-MM') AS month,销售员姓名,SUM(销售额) AS sales_totalFROM sales_dataGROUP BY 门店ID, date_format(日期, 'yyyy-MM'), 销售员姓名
)
SELECT month,门店ID,monthly_total,销售员姓名,sales_total
FROM (SELECT month,门店ID,销售员姓名,sales_total,MAX(sales_total) OVER (PARTITION BY 门店ID, month) AS max_sales,SUM(sales_total) OVER (PARTITION BY 门店ID, month) AS monthly_totalFROM sales_aggregated
) 
WHERE sales_total = max_sales
ORDER BY month, 门店ID, 销售员姓名
""")# 显示结果
sql_result.show()

说明:

  1. 数据准备:从S3读取Parquet文件,并使用date_format处理日期字段为年月格式。
  2. 聚合计算
    • 先按门店、月份和销售员分组,计算每个销售员当月的总销售额。
  3. 窗口函数
    • 使用窗口函数分别计算每个门店每月的最大销售额(用于识别最高销售员)和月总销售额。
  4. 结果过滤
    • 筛选出销售额等于当月最大销售额的记录(可能包含多个销售员)。
  5. 排序输出:按月份、门店ID和销售员姓名排序,确保结果有序。

两种实现方式均会输出以下列:

  • month:年月格式(yyyy-MM)
  • 门店ID:门店标识
  • monthly_total:该门店当月的总销售额
  • 销售员姓名:当月销售额最高的销售员
  • sales_total:该销售员当月的销售额(等于当月最高销售额)

假设我在Amazon S3上有销售表的Parquet数据文件的路径,包含ID主键、门店ID、日期、销售员姓名和销售额,需要分别用PySpark的SparkSQL和Dataframe API统计出按月统计的同比和环比数据,当前月如果不是月底的话,同比或环比数据需要取得上个月或者去年1日到对应的日期的总销售额值。

1. 使用DataFrame API实现

from pyspark.sql import SparkSession
from pyspark.sql import functions as Fspark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()# 读取Parquet数据
df = spark.read.parquet("s3://your-bucket/path/to/sales_data")# 获取当前日期信息
current_date = spark.sql("SELECT current_date()").first()[0]
current_year = current_date.year
current_month = current_date.month
current_day = current_date.day# 数据预处理
processed_df = (df.withColumn("date", F.col("date").cast("date")).withColumn("last_day", F.last_day("date")).withColumn("max_day", F.dayofmonth("last_day")).withColumn("cutoff_day", F.least(F.lit(current_day), F.col("max_day"))).filter(F.dayofmonth("date") <= F.col("cutoff_day"))
)# 按月聚合销售额
monthly_sales = (processed_df.groupBy(F.year("date").alias("year"), F.month("date").alias("month")).agg(F.sum("sales").alias("total_sales"))
)# 计算前月和去年同月信息
monthly_sales = (monthly_sales.withColumn("prev_month_year", F.when(F.col("month") == 1, F.col("year") - 1).otherwise(F.col("year"))).withColumn("prev_month_month", F.when(F.col("month") == 1, 12).otherwise(F.col("month") - 1)).withColumn("prev_year_year", F.col("year") - 1).withColumn("prev_year_month", F.col("month"))
)# 创建临时视图
monthly_sales.createOrReplaceTempView("monthly_sales")# 通过自连接获取比较数据
final_result = (monthly_sales.alias("curr").join(monthly_sales.alias("prev_month"),(F.col("curr.prev_month_year") == F.col("prev_month.year")) &(F.col("curr.prev_month_month") == F.col("prev_month.month")),"left").join(monthly_sales.alias("prev_year"),(F.col("curr.prev_year_year") == F.col("prev_year.year")) &(F.col("curr.prev_year_month") == F.col("prev_year.month")),"left").select(F.col("curr.year"),F.col("curr.month"),F.col("curr.total_sales"),F.col("prev_month.total_sales").alias("prev_month_sales"),F.col("prev_year.total_sales").alias("prev_year_sales"))
)# 计算增长率
final_result = final_result.withColumn("month_over_month",((F.col("total_sales") - F.col("prev_month_sales")) / F.col("prev_month_sales") * 100
).withColumn("year_over_year",((F.col("total_sales") - F.col("prev_year_sales")) / F.col("prev_year_sales") * 100
)# 显示结果
final_result.show()

2. 使用SparkSQL实现

# 注册预处理后的视图
processed_df.createOrReplaceTempView("processed_sales")# 执行SQL查询
sql_query = """
WITH monthly_sales AS (SELECTYEAR(date) AS year,MONTH(date) AS month,SUM(sales) AS total_salesFROM processed_salesGROUP BY YEAR(date), MONTH(date)
),
comparison_data AS (SELECTcurr.year,curr.month,curr.total_sales,prev_month.total_sales AS prev_month_sales,prev_year.total_sales AS prev_year_salesFROM monthly_sales currLEFT JOIN monthly_sales prev_monthON (curr.year = prev_month.year AND curr.month = prev_month.month + 1)OR (curr.month = 1 AND prev_month.month = 12 AND curr.year = prev_month.year + 1)LEFT JOIN monthly_sales prev_yearON curr.year = prev_year.year + 1 AND curr.month = prev_year.month
)
SELECTyear,month,total_sales,ROUND((total_sales - prev_month_sales) / prev_month_sales * 100, 2) AS mom_growth,ROUND((total_sales - prev_year_sales) / prev_year_sales * 100, 2) AS yoy_growth
FROM comparison_data
ORDER BY year, month
"""spark.sql(sql_query).show()

说明:

  1. 数据预处理

    • 转换日期类型并计算每个月的最后一天
    • 动态计算每个月的有效截止日期(考虑当前日期和月份长度)
    • 过滤出有效日期范围内的数据
  2. 聚合计算

    • 按年月分组计算总销售额
    • 使用自连接获取前月和去年同月的销售额数据
  3. 增长率计算

    • 环比增长率 = (本月销售额 - 上月销售额) / 上月销售额 * 100
    • 同比增长率 = (本月销售额 - 去年同期销售额) / 去年同期销售额 * 100
  4. 特殊处理

    • 自动处理月份边界(如1月的前月是去年12月)
    • 处理NULL值避免除零错误
    • 动态适应不同月份的天数差异

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/78395.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostgreSQL 常用日志

PostgreSQL 常用日志详解 PostgreSQL 提供了多种日志类型&#xff0c;用于监控数据库活动、排查问题和优化性能。以下是 PostgreSQL 中最常用的日志类型及其配置和使用方法。 一、主要日志类型 日志类型文件位置主要内容用途服务器日志postgresql-<日期>.log服务器运行…

MySQL 存储过程:解锁数据库编程的高效密码

目录 一、什么是存储过程?二、创建存储过程示例 1:创建一个简单的存储过程示例 2:创建带输入参数的存储过程示例 3:创建带输出参数的存储过程三、调用存储过程调用无参数存储过程调用带输入参数的存储过程调用带输出参数的存储过程四、存储过程中的流控制语句示例 1:使用 …

基于STM32的物流搬运机器人

功能&#xff1a;智能循迹、定距夹取、颜色切换、自动跟随、自动避障、声音夹取、蓝牙遥控、手柄遥控、颜色识别夹取、循迹避障、循迹定距…… 包含内容&#xff1a;完整源码、使用手册、原理图、视频演示、PPT、论文参考、其余资料 资料只私聊

pg_jieba 中文分词

os: centos 7.9.2009 pg: 14.7 pg_jieba 依赖 cppjieba、limonp pg_jieba 下载 su - postgreswget https://github.com/jaiminpan/pg_jieba/archive/refs/tags/vmaster.tar.gzunzip ./pg_jieba-master cd ~/pg_jieba-mastercppjieba、limonp 下载 su - postgrescd ~/pg_jie…

基于Python+Flask的MCP SDK响应式文档展示系统设计与实现

以下是使用Python Flask HTML实现的MCP文档展示系统&#xff1a; # app.py from flask import Flask, render_templateapp Flask(__name__)app.route(/) def index():return render_template(index.html)app.route(/installation) def installation():return render_templa…

【“星睿O6”AI PC开发套件评测】GPU矩阵指令算力,GPU带宽和NPU算力测试

【“星睿O6”AI PC开发套件评测】GPU矩阵指令算力&#xff0c;GPU带宽和NPU算力测试 安谋科技、此芯科技与瑞莎计算机联合打造了面向AI PC、边缘、机器人等不同场景的“星睿O6”开发套件 该套件异构集成了Armv9 CPU核心、Arm Immortalis™ GPU以及安谋科技“周易”NPU 开箱和…

【Go语言】RPC 使用指南(初学者版)

RPC&#xff08;Remote Procedure Call&#xff0c;远程过程调用&#xff09;是一种计算机通信协议&#xff0c;允许程序调用另一台计算机上的子程序&#xff0c;就像调用本地程序一样。Go 语言内置了 RPC 支持&#xff0c;下面我会详细介绍如何使用。 一、基本概念 在 Go 中&…

11、Refs:直接操控元素——React 19 DOM操作秘籍

一、元素操控的魔法本质 "Refs是巫师与麻瓜世界的连接通道&#xff0c;让开发者能像操控魔杖般精准控制DOM元素&#xff01;"魔杖工坊的奥利凡德先生轻抚着魔杖&#xff0c;React/Vue的refs能量在杖尖跃动。 ——以神秘事务司的量子纠缠理论为基&#xff0c;揭示DOM…

MinIO 教程:从入门到Spring Boot集成

文章目录 一. MinIO 简介1. 什么是MinIO&#xff1f;2. 应用场景 二. 文件系统存储发展史1. 服务器磁盘&#xff08;本地存储&#xff09;2. 分布式文件系统(如 HDFS、Ceph、GlusterFS)3. 对象存储&#xff08;如 MinIO、AWS S3&#xff09;4.对比总结5.选型建议6.示例方案 三.…

电竞俱乐部护航点单小程序,和平地铁俱乐部点单系统,三角洲护航小程序,暗区突围俱乐部小程序

电竞俱乐部护航点单小程序开发&#xff0c;和平地铁俱乐部点单系统&#xff0c;三角洲护航小程序&#xff0c;暗区突围俱乐部小程序开发 端口包含&#xff1a; 超管后台&#xff0c; 老板端&#xff0c;打手端&#xff0c;商家端&#xff0c;客服端&#xff0c;管事端&#x…

基于 IPMI + Kickstart + Jenkins 的 OS 自动化安装

Author&#xff1a;Arsen Date&#xff1a;2025/04/26 目录 环境要求实现步骤自定义 ISO安装 ipmitool安装 NFS定义 ks.cfg安装 HTTP编写 Pipeline 功能验证 环境要求 目标服务器支持 IPMI / Redfish 远程管理&#xff08;如 DELL iDRAC、HPE iLO、华为 iBMC&#xff09;&…

如何在SpringBoot中通过@Value注入Map和List并使用YAML配置?

在SpringBoot开发中&#xff0c;我们经常需要从配置文件中读取各种参数。对于简单的字符串或数值&#xff0c;直接使用Value注解就可以了。但当我们需要注入更复杂的数据结构&#xff0c;比如Map或者List时&#xff0c;该怎么操作呢&#xff1f;特别是使用YAML这种更人性化的配…

短信验证码安全实战:三网API+多语言适配开发指南

在短信服务中&#xff0c;创建自定义签名是发送通知、验证信息和其他类型消息的重要步骤。万维易源提供的“三网短信验证码”API为开发者和企业提供了高效、便捷的自定义签名创建服务&#xff0c;可以通过简单的接口调用提交签名给运营商审核。本文将详细介绍如何使用该API&…

RabbitMQ和Seata冲突吗?Seata与Spring中的事务管理冲突吗

1. GlobalTransactional 和 Transactional 是否冲突&#xff1f; 答&#xff1a;不冲突&#xff0c;它们可以协同工作&#xff0c;但作用域不同。 Transactional: 这是 Spring 提供的注解&#xff0c;用于管理单个数据源内的本地事务。在你当前的 register 方法中&#xff0c…

一台服务器已经有个python3.11版本了,如何手动安装 Python 3.10,两个版本共存

环境&#xff1a; debian12.8 python3.11 python3.10 问题描述&#xff1a; 一台服务器已经有个python3.11版本了&#xff0c;如何手动安装 Python 3.10&#xff0c;两个版本共存 解决方案&#xff1a; 1.下载 Python 3.10 源码&#xff1a; wget https://www.python.or…

c++中的enum变量 和 constexpr说明符

author: hjjdebug date: 2025年 04月 23日 星期三 13:40:21 CST description: c中的enum变量 和 constexpr说明符 文章目录 1.Q:enum 类型变量可以有,--操作吗&#xff1f;1.1补充: c/c中enum的另一个细微差别. 2.Q: constexpr 修饰的函数,要求传入的参数必需是常量吗&#xff…

postman工具

postman工具 进入postman官网 www.postman.com/downloads/ https://www.postman.com/downloads/ https://www.postman.com/postman/published-postman-templates/documentation/ae2ja6x/postman-echo?ctxdocumentation Postman Echo is a service you can use to test your …

Spring和Spring Boot集成MyBatis的完整对比示例,包含从项目创建到测试的全流程代码

以下是Spring和Spring Boot集成MyBatis的完整对比示例&#xff0c;包含从项目创建到测试的全流程代码&#xff1a; 一、Spring集成MyBatis示例 1. 项目结构 spring-mybatis-demo/ ├── src/ │ ├── main/ │ │ ├── java/ │ │ │ └── com.example/…

【数据可视化-24】巧克力销售数据的多维度可视化分析

🧑 博主简介:曾任某智慧城市类企业算法总监,目前在美国市场的物流公司从事高级算法工程师一职,深耕人工智能领域,精通python数据挖掘、可视化、机器学习等,发表过AI相关的专利并多次在AI类比赛中获奖。CSDN人工智能领域的优质创作者,提供AI相关的技术咨询、项目开发和个…

c语言-分支结构

以下是我初学C语言的笔记记录&#xff0c;欢迎留言补充 一&#xff0c;分支结构分为几个 两个&#xff0c;一个是if语句&#xff0c;一个是Switch语句 二&#xff0c;if语句 &#xff08;1&#xff09;结构体 int main() {if()//判断条件{//表达式}else if()//判断条件{//表达式…