2023_Spark_实验十五:SparkSQL进阶操作

实验目标
  1. 通过实践掌握Spark SQL中复杂查询(包括子查询、窗口函数、联接等)的实现方式。
  2. 了解如何通过合理的数据分区和缓存策略进行性能优化。
  3. 实现一个基于Spark SQL的ETL数据处理流程,应用相关优化技巧。
实验背景

在本实验中,学员将使用Spark SQL处理一个典型的企业级大数据处理场景:从日志文件和交易数据中提取信息、清洗数据、进行复杂查询,并优化查询性能。

实验内容
  1. 环境准备

    • 配置并启动一个Spark集群,确保每个学员有一个可用的Spark环境。
    • 准备实验数据:模拟一份包含交易记录、用户信息和产品数据的日志文件,以及对应的CSV格式数据文件。
  2. 实验步骤

    • 使用Spark SQL进行数据加载(加载CSV、JSON等数据格式)。
    • 对加载的数据进行基本清洗与转换。
    • 编写并优化SQL查询,使用窗口函数、JOIN操作和子查询。
    • 对查询过程进行性能分析,并采用缓存、分区等优化策略。

实验数据如下:

users.csv

user_id,name,age,gender
1,李静,62,M
2,梁静,64,M
3,梁静,46,M
4,赵伟,59,M
5,徐丽娟,32,F
6,赵伟,23,M
7,王伟,46,F
8,徐涛,63,M
9,梁强,23,M
10,吴晓,18,M
11,周波,53,M

transactions.csv

transaction_id,user_id,amount,transaction_date
1,486,429.85170924871284,2024-10-09
2,102,736.1594138169264,2024-08-19
3,758,958.0420403336467,2024-05-02
4,156,137.85335989595777,2024-10-17
5,436,962.1964461356514,2023-12-28
6,10,472.1597363615911,2024-07-25
7,349,247.35900107583026,2023-11-26
8,901,349.2802498314715,2024-05-26

实验代码
package SparkSQLimport org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, to_date}
import org.apache.spark.sql.{SparkSession, functions => F}/*** @projectName SparkLearning2023  * @package SparkSQL  * @className SparkSQL.SparkSQLAdvancedExp  * @description SparkSQL进阶案例实验分析用户消费习惯 * @author pblh123* @date 2024/11/14 22:24* @version 1.0*/object SparkSQLAdvancedExp {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Spark SQL Advanced Operations").master("local[2]").config("spark.sql.warehouse.dir", "tmp/spark-warehouse").enableHiveSupport().getOrCreate()// 设置日志级别为ERROR,避免冗余日志信息spark.sparkContext.setLogLevel("ERROR")// 加载CSV文件val dfUsers = spark.read.option("header", "true").csv("datas/sparksqldemo/users/users.csv")val dfTransactions = spark.read.option("header", "true").csv("datas/sparksqldemo/transactions/transactions.csv")// 执行操作,查看缓存是否有效dfUsers.show(3,0)dfTransactions.show(3,false)dfUsers.printSchema()dfTransactions.printSchema()// 去除重复数据val dfUsersClean = dfUsers.dropDuplicates()val dfTransactionsClean = dfTransactions.dropDuplicates()// 填充空值val dfUsersFilled = dfUsersClean.na.fill(Map("age" -> "0", "gender" -> "unknown"))// 使用filter去掉amount为null或空字符串的行val dfTransactionsNoNull = dfTransactionsClean.filter(dfTransactionsClean("amount").isNotNull && dfTransactionsClean("amount") =!= "")// 列转换:将年龄转换为整数类型val dfUsersWithAge = dfUsersFilled.withColumn("age", dfUsersFilled("age").cast("int"))val dfTransactioncc = dfTransactionsNoNull.withColumn("amount", dfTransactionsNoNull("amount").cast("double")).withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd"))dfUsersWithAge.printSchema()dfTransactioncc.printSchema()// 使用缓存缓存数据val dfUsersCached = dfUsersWithAge.cache()val dfTransactionsCached = dfTransactioncc.persist()// 执行操作,查看缓存是否有效dfUsersCached.show(3,0)dfTransactionsCached.show(3,0)//    将dataframe注册成临时视图,供sparksql使用dfUsersWithAge.createOrReplaceTempView("user")dfTransactioncc.createOrReplaceTempView("trans")// 获取每个用户的总交易金额(子查询)val totalSpentQuery ="""|SELECT user_id,|(SELECT SUM(amount) FROM trans WHERE trans.user_id = user.user_id) AS total_spent|FROM user|order by total_spent desc""".stripMarginval dfTotalSpent = spark.sql(totalSpentQuery)dfTotalSpent.show(3,0)// 定义窗口函数:按金额排序// col("amount").desc 降序排序,col("amount") 升序排序val windowSpec = Window.partitionBy("user_id").orderBy(col("amount").desc)// 为每个用户根据交易金额排序val dfWithRank = dfTransactioncc.withColumn("rank", F.row_number().over(windowSpec)).select("user_id", "transaction_id", "amount", "rank")dfWithRank.show(3,0)// 使用窗口函数为每个用户的交易记录分配行号val dfWithRank2 = dfTransactioncc.withColumn("rank", row_number().over(windowSpec))// 筛选出每个用户前5条记录val top5Df = dfWithRank2.filter(col("rank") <= 5).select("user_id", "transaction_id", "amount", "rank")// 显示结果top5Df.show(10,0)// 内联接(JOIN)操作:将用户与Top5交易数据联接val dfUserTransactions = dfUsersWithAge.join(top5Df, "user_id")dfUserTransactions.show(3,0)// 查看查询的执行计划dfUserTransactions.explain(true)dfUserTransactions.coalesce(1).write.parquet("datas/sparksqldemo/usersTrans")//    关闭sparkspark.stop()}}
代码执行过程图

    // 加载CSV文件val dfUsers = spark.read.option("header", "true").csv("datas/sparksqldemo/users/users.csv")val dfTransactions = spark.read.option("header", "true").csv("datas/sparksqldemo/transactions/transactions.csv")// 执行操作,查看缓存是否有效dfUsers.show(3,0)dfTransactions.show(3,false)dfUsers.printSchema()dfTransactions.printSchema()

    // 列转换:将年龄转换为整数类型val dfUsersWithAge = dfUsersFilled.withColumn("age", dfUsersFilled("age").cast("int"))val dfTransactioncc = dfTransactionsNoNull.withColumn("amount", dfTransactionsNoNull("amount").cast("double")).withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd"))dfUsersWithAge.printSchema()dfTransactioncc.printSchema()

    val dfTotalSpent = spark.sql(totalSpentQuery)dfTotalSpent.show(3,0)// 定义窗口函数:按金额排序// col("amount").desc 降序排序,col("amount") 升序排序val windowSpec = Window.partitionBy("user_id").orderBy(col("amount").desc)// 为每个用户根据交易金额排序val dfWithRank = dfTransactioncc.withColumn("rank", F.row_number().over(windowSpec)).select("user_id", "transaction_id", "amount", "rank")dfWithRank.show(3,0)

    // 使用窗口函数为每个用户的交易记录分配行号val dfWithRank2 = dfTransactioncc.withColumn("rank", row_number().over(windowSpec))// 筛选出每个用户前5条记录val top5Df = dfWithRank2.filter(col("rank") <= 5).select("user_id", "transaction_id", "amount", "rank")// 显示结果top5Df.show(10,0)// 内联接(JOIN)操作:将用户与Top5交易数据联接val dfUserTransactions = dfUsersWithAge.join(top5Df, "user_id")dfUserTransactions.show(3,0)

    // 查看查询的执行计划dfUserTransactions.explain(true)

任务清单:

1. 获取近三个月消费额大于所有用户同期消费平均值的用户的前三笔销售额清单

2. 将每个月每个用户的的总消费额数据存储到MySQL中表userTmonth保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60710.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大模型研究报告 | 2024年中国金融大模型产业发展洞察报告|附34页PDF文件下载

随着生成算法、预训练模型、多模态数据分析等AI技术的聚集融合&#xff0c;AIGC技术的实践效用迎来了行业级大爆发。通用大模型技术的成熟推动了新一轮行业生产力变革&#xff0c;在投入提升与政策扶植的双重作用下&#xff0c;以大模型技术为底座、结合专业化金融能力的金融大…

MySQL联合索引(abc)命中测试

1.建表 mysql创建一张表&#xff0c;表名&#xff1a;‘test_models’ id列为 主键&#xff0c;int类型 &#xff0c;自增a,b,c,d,e 全部是int&#xff08;11&#xff09;为&#xff08;a,b,c&#xff09;添加一个联合索引 index_abc 执行语句&#xff1a;创建表 CREATE TA…

Gin 框架入门(GO)-1

1 介绍 Gin 是一个 Go (Golang) 编写的轻量级 http web 框架&#xff0c;运行速度非常快&#xff0c;Gin 最擅长的就是 Api 接口的高并发。 2 Gin 环境搭建 1.下载并安装 gin go get -u github.com/gin-gonic/gin 2.将 gin 引入到代码中&#xff1a; import "github.co…

VUE3+Three.js搭建教程

一、创建VUE项目工程 1、方法一 使用下面命令行快速创建vue项目&#xff0c;Please pick a preset这里我们选择使用VUE3 vue create projectName 创建时可能会遇到的报错 错误原因&#xff1a;当前使用的node版本未全局安装vue-cli脚手架&#xff0c;使用下面命令安装后再使…

【React】状态管理之Redux

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 状态管理之Redux引言1. Redux 的核心概念1.1 单一数据源&#xff08;Single Sou…

自己动手写Qt Creator插件

文章目录 前言一、环境准备1.先看自己的Qt Creator IDE的版本2.下载源码 二、使用步骤1.参考原本的插件2.编写自定义插件1.cmakelist增加一个模块2.同理&#xff0c;qbs文件也增加一个3.插件源码 三、效果总结 前言 就目前而言&#xff0c;Qt Creator这个IDE&#xff0c;插件比…

React Native 全栈开发实战班 -React Native 基础

本课程旨在帮助学员系统掌握 React Native 全栈开发技能&#xff0c;从基础入门到实战项目开发。课程将分为多个模块&#xff0c;第一部分将聚焦于 React Native 的基础知识&#xff0c;包括开发环境搭建、React Native 简介与特点&#xff0c;以及项目结构解析。 第一部分&am…

Leetcode:118. 杨辉三角——Java数学法求解

题目——Leetcode:118. 杨辉三角 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]]示例 2: 输入: numRow…

Linux中.NET读取excel组件,不会出现The type initializer for ‘Gdip‘ threw an exception异常

组件&#xff0c;可通过nuget安装&#xff0c;直接搜名字&#xff1a; ExcelDataReader using ConsoleAppReadFileData.Model; using ExcelDataReader; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Task…

Robot | 用 RDK 做一个小型机器人(更新中)

目录 前言架构图开发过程摄像头模型转换准备校准数据使用 hb_mapper makertbin 工具转换模型 底版开发 结语 前言 最近想开发一个小型机器人&#xff0c;碰巧看到了 RDK x5 发布了&#xff0c;参数对于我来说非常合适&#xff0c;就买了一块回来玩。 外设也是非常丰富&#xf…

jenkins使用cli发行uni-app到h5

官网文档HBuilderX 文档 首先确定是否存在环境变量 正常情况cmd中执行cli 如果提示 cli 不是内部或外部命令&#xff0c;也不是可运行的程序或批处理文件。请先配置环境变量 Freestyle Project项目在Build Steps中增加Execute Windows batch command命令如下 d: cd D:\devsof…

基于Zynq FPGA对雷龙SD NAND的测试

一、SD NAND特征 1.1 SD卡简介 雷龙的SD NAND有很多型号&#xff0c;在测试中使用的是CSNP4GCR01-AMW与CSNP32GCR01-AOW。芯片是基于NAND FLASH和 SD控制器实现的SD卡。具有强大的坏块管理和纠错功能&#xff0c;并且在意外掉电的情况下同样能保证数据的安全。 其特点如下&…

【Java语言】String类

在C语言中字符串用字符可以表示&#xff0c;可在Java中有单独的类来表示字符串&#xff08;就是String&#xff09;&#xff0c;现在我来介绍介绍String类。 字符串构造 一般字符串都是直接赋值构造的&#xff0c;像这样&#xff1a; 还可以这样构造&#xff1a; 图更能直观的…

【神经科学学习笔记】基于分层嵌套谱分割(Nested Spectral Partition)模型分析大脑网络整合与分离的局部指标(二)

前言 1.学习背景 前几天笔者学习使用NSP (Network Segregation and Partnership) 算法计算大脑整合分离的全局指标&#xff0c;现在要在之前学习的基础上再来玩玩局部指标。 局部指标的计算主要在两个层面上进行&#xff1a;第一个层面是针对每个独立ROI的指标计算&#xff0…

WPF-控件的属性值的类型转化

控件的属性值需要转成int、double进行运算的&#xff0c;可以使用一下方法 页面代码 <StackPanel Margin"4,0,0,0" Style"{StaticResource Form-StackPanel}"> <Label Content"替换后材料增加金额&#xff…

Python3.11.9下载和安装

一、Python3.11.9下载和安装 1、下载 下载地址&#xff1a;https://www.python.org/downloads/windows/ 选择版本下载&#xff0c;例如&#xff1a;Python 3.11.9 - April 2, 2024 2、安装 双击exe安装 3、配置环境变量 pathD:\Program Files\python3.11.9 pathD:\Progr…

大模型学习笔记------BLIP模型的再思考

大模型学习笔记------BLIP模型的再思考 1、BLIP推理---如何进行“图生文”2、BLIP推理---如何进行视觉问答&#xff08;Visual Question Answering, VQA&#xff09;3、BLIP推理---如何进行图文检索&#xff08;Image-text retrieval&#xff09;任务4、总结 上一篇文章上文中讲…

超全面!一文带你快速入门HTML,CSS和JavaScript!

作为一名后端程序员&#xff0c;在开发过程中避免不了和前端打交道&#xff0c;所以就要了解一些前端的基础知识&#xff0c;比如三剑客HTML,CSS,JavaScript&#xff0c;甚至有必要学习一下Vue、React等前端主流框架。 学习文档&#xff1a;https://www.w3school.com.cn/ 一…

PcVue + SQL Grid : 释放数据的无限潜力

探秘PcVue系列&#xff1a;E3 PcVue SQL Grid : 释放数据的无限潜力 探秘PcVue之SQL 什么是SQL Grid&#xff1f; SQL Grid用于通过简单的sql查询语句&#xff0c;实现数据的查询和显示。结构化查询语句&#xff08;SQL&#xff09;可以帮助SCADA软件用户提高连接性以及发送和…

使用 Umami 部署博客分析工具

Umami 简介 Umami 是一款开源且注重隐私的网站分析工具&#xff0c;可替代 Google Analytics。它提供网站流量和用户行为等见解&#xff0c;但不使用 Cookie 或收集个人数据&#xff0c;符合隐私法规。Umami 轻巧易用&#xff0c;可自行托管。 如果你有自己的博客&#xff0c;…