PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

import json
from pyspark.sql import SparkSessiondef load_config(config_path):with open(config_path, 'r') as f:return json.load(f)def main(config_path, base_s3_path):# 初始化SparkSession,配置S3和Excel支持spark = SparkSession.builder \.appName("DataExportJob") \.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \.getOrCreate()# 配置S3访问(根据实际环境配置)spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")config = load_config(config_path)for template in config['templates']:label = template['label']sql_template = template['sql_template']parameters_list = template['parameters']for params in parameters_list:# 验证参数数量是否匹配placeholders = sql_template.count('{')if len(params) != placeholders:raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")# 替换SQL中的占位符formatted_sql = sql_template.format(*params)df = spark.sql(formatted_sql)# 生成文件名参数部分param_str = "_".join(params)base_filename = f"{label}_{param_str}"# 定义输出路径output_paths = {'parquet': f"{base_s3_path}/parquet/{base_filename}",'csv': f"{base_s3_path}/csv/{base_filename}",'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"}# 写入Parquetdf.write.mode('overwrite').parquet(output_paths['parquet'])# 写入CSV(自动生成header)df.write.mode('overwrite') \.option("header", "true") \.csv(output_paths['csv'])# 写入Excel(使用spark-excel包)df.write.format("com.crealytics.spark.excel") \.option("header", "true") \.option("inferSchema", "true") \.mode("overwrite") \.save(output_paths['excel'])spark.stop()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser()parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')args = parser.parse_args()main(args.config, args.s3_path)

配置文件示例(config.json)

{"templates": [{"label": "sales_report","sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'","parameters": [["202301", "north"],["202302", "south"]]},{"label": "user_activity","sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id","parameters": [["2023-01-01"],["2023-01-02"]]}]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel├── sales_report_202301_north.xlsx├── sales_report_202302_south.xlsx└── user_activity_2023-01-01.xlsx

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/69204.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每日Attention学习19——Convolutional Multi-Focal Attention

每日Attention学习19——Convolutional Multi-Focal Attention 模块出处 [ICLR 25 Submission] [link] UltraLightUNet: Rethinking U-shaped Network with Multi-kernel Lightweight Convolutions for Medical Image Segmentation 模块名称 Convolutional Multi-Focal Atte…

2. K8S集群架构及主机准备

本次集群部署主机分布K8S集群主机配置主机静态IP设置主机名解析ipvs管理工具安装及模块加载主机系统升级主机间免密登录配置主机基础配置完后最好做个快照备份 2台负载均衡器 Haproxy高可用keepalived3台k8s master节点5台工作节点(至少2及以上)本次集群部署主机分布 K8S集群主…

游戏引擎学习第89天

回顾 由于一直没有渲染器,终于决定开始动手做一个渲染器,虽然开始时并不确定该如何进行,但一旦开始做,发现这其实是正确的决定。因此,接下来可能会花一到两周的时间来编写渲染器,甚至可能更长时间&#xf…

链式结构二叉树(递归暴力美学)

文章目录 1. 链式结构二叉树1.1 二叉树创建 2. 前中后序遍历2.1 遍历规则2.2 代码实现图文理解 3. 结点个数以及高度等二叉树结点个数正确做法: 4. 层序遍历5. 判断是否完全二叉树 1. 链式结构二叉树 完成了顺序结构二叉树的代码实现,可以知道其底层结构…

Kubernetes 中 BGP 与二层网络的较量:究竟孰轻孰重?

如果你曾搭建过Kubernetes集群,就会知道网络配置是一个很容易让人深陷其中的领域。在负载均衡器、服务通告和IP管理之间,你要同时应对许多变动的因素。对于许多配置而言,使用二层(L2)网络就完全能满足需求。但边界网关协议(BGP)—— 支撑互联网运行的技术 —— 也逐渐出…

Linux提权--John碰撞密码提权

​John the Ripper​(简称 John)是一个常用的密码破解工具,可以通过暴力破解、字典攻击、规则攻击等方式,尝试猜解用户密码。密码的弱度是提权攻击中的一个重要因素,如果某个用户的密码非常简单或是默认密码&#xff0…

大数据学习之Spark分布式计算框架RDD、内核进阶

一.RDD 28.RDD_为什么需要RDD 29.RDD_定义 30.RDD_五大特性总述 31.RDD_五大特性1 32.RDD_五大特性2 33.RDD_五大特性3 34.RDD_五大特性4 35.RDD_五大特性5 36.RDD_五大特性总结 37.RDD_创建概述 38.RDD_并行化创建 演示代码: // 获取当前 RDD 的分区数 Since ( …

[创业之路-286]:《产品开发管理-方法.流程.工具 》-1- IPD两个跨职能团队的组织

IPD(集成产品开发)中的两个重要跨职能组织是IPMT(集成产品管理团队)和PDT(产品开发团队)。 在IPD(集成产品开发)体系中,IRB(投资评审委员会)、IPM…

DeepSeek 提示词之角色扮演的使用技巧

老六哥的小提示:我们可能不会被AI轻易淘汰,但是会被“会使用AI的人”淘汰。 在DeepSeek的官方提示库中,有“角色扮演(自定义人设)”的提示词案例。截图如下: 在“角色扮演”的提示词案例中,其实…

第二个Qt开发实例:在Qt中利用GPIO子系统和sysfs伪文件系统实现按钮(Push Button)点击控制GPIO口(效果为LED2灯的灭和亮)

引言 本文承接博文 https://blog.csdn.net/wenhao_ir/article/details/145420998 里的代码,在那里面代码的基础上添加上利用sysfs伪文件系统实现按钮(Push Button)点击控制GPIO口的代码,进而实现LED2灯的灭和亮。 最终的效果是点击下面的LED按钮实现LED…

登山第十七梯:矩形拟合——无惧噪声

文章目录 一 摘要 二 资源 三 内容 (文章末尾提供源代码) 一 摘要 目前,获取点集的矩形拟合结果的主要方法是计算其最小外包直立矩形或者旋转矩形。这些方法简单、易用,在数据质量良好的情况下能够较好的贴合矩形形状。然而,在数据缺失时,最小外包围盒方法将会…

57. Uboot图形化界面配置

一、Uboot图形化配置方法 1、通过终端配置。 2、进入到uboot的源码根目录下。 3、首先默认配置 make mx6ull_alientek_emmc_defconfig //默认配置 4、输入make menuconfig。打开图形化配置界面。 5、注意,新电脑需要安装ncurses库。sudo apt-get install libncurs…

kalman滤波器C++设计仿真实例第三篇

1. 仿真场景 水面上有条船在做匀速直线航行,航行过程中由于风和浪的影响,会有些随机的干扰,也就是会有些随机的加速度作用在船身上,这个随机加速度的均方差大约是0.1,也就是说方差是0.01。船上搭载GPS设备,…

(2025|ICLR,音频 LLM,蒸馏/ALLD,跨模态学习,语音质量评估,MOS)音频 LLM 可作为描述性语音质量评估器

Audio Large Language Models Can Be Descriptive Speech Quality Evaluators 目录 1. 概述 2. 研究背景与动机 3. 方法 3.1 语音质量评估数据集 3.2 ALLD 对齐策略 4. 实验结果分析 4.1 MOS 评分预测(数值评估) 4.2 迁移能力(在不同…

stm32生成hex文件详解

1.产生的map文件干啥的? 2.组成情况??? 废话少说,直接上代码具体内容况: Component: ARM Compiler 5.06 update 7 (build 960) Tool: armlink [4d3601]Section Cross Referencesstartup_stm32f103xe.o(S…

百度热力图数据获取,原理,处理及论文应用6

目录 0、数据简介0、示例数据1、百度热力图数据日期如何选择1.1、其他实验数据的时间1.2、看日历1.3、看天气 2、百度热力图几天够研究?部分文章统计3、数据原理3.1.1 ** 这个比较重要,后面还会再次出现。核密度的值怎么理解?**3.1.2 Csv->…

[转]Java面试近一个月的面试总结

本文是在学习中的总结,欢迎转载但请注明出处:http://blog.csdn.net/pistolove/article/details/46753275 前言 打算换个工作,近一个月面试了不少的公司,下面将一些面试经验和思考分享给大家。另外校招也快要开始了,为…

学习threejs,tga格式图片文件贴图

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️TGA图片1.2 ☘️THREE.Mesh…

MSPFN 代码复现

1、环境配置 conda create -n MSPFN python3.9 conda activate MSPFN pip install opencv-python pip install tensorflow pip install tqdm pip install matplotlib2、train 2.1 创建数据集 2.1.1 数据集格式 |--rainysamples |--file1: |--file2:|--fi…

20240206 adb 连不上手机解决办法

Step 1: lsusb 确认电脑 usb 端口能识别设备 lsusb不知道设备有没有连上,就插拔一下,对比观察多了/少了哪个设备。 Step 2: 重启 adb server sudo adb kill-serversudo adb start-serveradb devices基本上就可以了~ Reference https://b…