Spark-SQL 项目

一、项目概述

(一)实验目标

  1. 统计有效数据条数:筛选出uid、phone、addr三个字段均无空值的记录并计数。
  2. 提取用户数量最多的前 20 个地址:按地址分组统计用户数,按降序排序后取前 20 名。

(二)数据说明

  1. 数据格式
    • 输入数据为 JSON 格式,字段包括uid(用户 ID)、phone(电话号码)、addr(地址)。
    • 数据特点:部分记录存在格式不规范问题(如单引号混用、字段值缺失、地址格式不统一,例如 “江苏省 苏州”“广东省 中山” 等),需先清洗转换。
    • 示例数据

json

{"uid":"1000166111","phone":"17703771999","addr":"河南省 南阳"}

{"uid":"1000432103","phone":"15388889881","addr":"云南省 昆明"}

  1. 有效数据定义
    同时满足以下条件的记录:
    • uid不为空(非null且非空字符串);
    • phone不为空(非null且非空字符串);
    • addr不为空(非null且非空字符串)。

二、实验准备

(一)环境配置

  1. 软件依赖
    • Spark 3.x+(需启用 Hive 支持以使用get_json_object函数);
    • 编程语言:Scala/Python(本文以 Scala 为例,Python 代码可通过 PySpark 实现);
    • 配置文件:确保spark.sql.warehouse.dir指向 HDFS 或本地路径(如hdfs://node01:9000/user/hive/warehouse)。
  2. 数据准备
    • 将 JSON 数据保存为文件(如user_data.json),确保每行一个 JSON 对象;
    • 若存在格式错误(如单引号),先用文本处理工具(如sed 's/\'/"/g')统一为双引号。

三、数据处理流程

(一)数据读取与格式转换

1. 读取原始数据

使用 Spark 的 JSON 数据源直接加载数据,自动推断 Schema:

scala

val rawDF = spark.read.json("path/to/user_data.json")

rawDF.printSchema() // 检查字段是否正确解析(可能因格式问题导致字段类型为String)

2. 字段提取与清洗

通过get_json_object函数(Spark SQL 内置函数)解析 JSON 字段,处理不规范格式:

scala

// 方法1:Spark SQL语句(推荐,清晰易读)

rawDF.createOrReplaceTempView("raw_data")

val parsedDF = spark.sql("""

SELECT

get_json_object(raw_data.data, '$.uid') AS uid, -- 提取uid

get_json_object(raw_data.data, '$.phone') AS phone, -- 提取phone

trim(get_json_object(raw_data.data, '$.addr')) AS addr -- 提取addr并去除前后空格

FROM raw_data

""")

// 方法2:DataFrame API(适合编程式处理)

import org.apache.spark.sql.functions.expr

val parsedDF = rawDF.select(

expr("get_json_object(data, '$.uid')").as("uid"),

expr("get_json_object(data, '$.phone')").as("phone"),

expr("trim(get_json_object(data, '$.addr'))").as("addr")

)

(二)统计有效数据条数

1. 筛选有效数据

过滤掉任一字段为空的记录:

scala

val validDF = parsedDF.filter(

col("uid").isNotNull &&

col("phone").isNotNull &&

col("addr").isNotNull

)

或通过SQL语句:

spark.sql("SELECT * FROM parsed_data WHERE uid IS NOT NULL AND phone IS NOT NULL AND addr IS NOT NULL")

2. 计数

scala

val validCount = validDF.count()

println(s"有效数据条数:$validCount")

或通过SQL返回结果:

spark.sql("SELECT COUNT(*) AS valid_data_count FROM valid_data").show()

(三)统计用户数量最多的前 20 个地址

1. 分组聚合

按addr分组,统计每个地址的用户数(直接使用count(*),因uid唯一,也可count(DISTINCT uid),需根据业务需求选择):

scala

val addrGroupDF = validDF.groupBy("addr").count().withColumnRenamed("count", "user_count")

2. 排序与筛选

按用户数降序排序,取前 20 条:

scala

val top20Addresses = addrGroupDF.orderBy(desc("user_count")).limit(20)

top20Addresses.show(false) // 展示结果,地址不换行

3. SQL 完整实现

spark.sql("""

SELECT

addr,

COUNT(*) AS user_count-- 或COUNT(DISTINCT uid)去重统计

FROM valid_data

GROUP BY addr

ORDER BY user_count DESC

LIMIT 20

""").show()

五、扩展与优化建议

(一)数据清洗增强

  1. 地址标准化:使用正则表达式或自定义函数清洗地址(如 “江苏省苏州” 统一为 “江苏省苏州市”);
  2. 手机号格式校验:添加正则表达式过滤无效手机号(如^1[3-9]\d{9}$)。

(二)性能优化

  1. 分区与缓存:对大数据集使用repartition分区,对高频访问的中间表(如validDF)调用cache();
  2. 列式存储:将结果数据保存为 Parquet 格式(validDF.write.parquet("output/valid_data")),提升后续查询效率。

(三)结果输出

将最终结果导出到 HDFS、本地文件或数据库:

scala

top20Addresses.write

.mode("overwrite")

.csv("output/top20_addresses") // 保存为CSV文件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/78373.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis的ZSet对象底层原理——跳表

我们来聊聊「跳表(Skip List)」,这是一个既经典又优雅的数据结构,尤其在 Redis 中非常重要,比如 ZSet(有序集合)底层就用到了跳表。 🌟 跳表(Skip List)简介 …

2025深圳中兴通讯安卓开发社招面经

2月27号 中兴通讯一面 30多分钟 自我介绍 聊项目 我的优缺点,跟同事相比,有什么突出的地方 Handler机制,如何判断是哪个消息比较耗时 设计模式:模板模式 线程的状态 线程的开启方式 线程池原理 活动的启动模式 Service和Activity…

【Castle-X机器人】二、智能导览模块安装与调试

持续更新。。。。。。。。。。。。。。。 【Castle-X机器人】智能导览模块安装与调试 二、智能导览模块安装与调试2.1 智能导览模块安装2.2 智能导览模块调试2.2.1 红外测温传感器测试2.2.2 2D摄像头测试 二、智能导览模块安装与调试 2.1 智能导览模块安装 使用相应工具将智能…

深入理解二叉树遍历:递归与栈的双重视角

二叉树的遍历前序遍历中序遍历后续遍历总结 二叉树的遍历 虽然用递归的方法遍历二叉树实现起来更简单,但是要想深入理解二叉树的遍历,我们还必须要掌握用栈遍历二叉树,递归其实就是利用了系统栈去遍历。特此记录一下如何用双重视角去看待二叉…

Qt Creator中自定义应用程序的可执行文件图标

要在Qt Creator中为你的应用程序设置自定义可执行文件图标,你需要按照以下步骤操作: Windows平台设置方法 准备图标文件: 创建一个.ico格式的图标文件(推荐使用256x256像素,包含多种尺寸) 可以使用在线工…

Windows11系统中GIT下载

Windows11系统中GIT下载 0、GIT背景介绍0.0 GIT概述0.1 GIT诞生背景0.2 Linus Torvalds 的设计目标0.3 Git 的诞生(2005 年)0.4 Git 的后续发展0.5 为什么 Git 能成功? 1、资源下载地址1.1 官网资源1.2 站内资源 2、安装指导3、验证是否下载完…

react的fiber 用法

在 React 里,Fiber 是 React 16.x 及后续版本采用的协调算法,它把渲染工作分割成多个小任务,让 React 可以在渲染过程中暂停、恢复和复用任务,以此提升渲染性能与响应能力。在实际开发中,你无需直接操作 Fiber 节点&am…

FPGA前瞻篇-数字电路基础-逻辑门电路设计

模拟信号: 一条随时间连续变化、平滑波动的曲线,比如正弦波。 数字信号: 一条只有高低两个状态(0和1),跳变清晰的方波曲线。 在 IC 或 FPGA 的逻辑设计中,我们通常只能处理数字信号&#xff0…

RabbitMQ 基础概念(队列、交换机、路由键、绑定键、信道、连接、虚拟主机、多租户)介绍

本文是博主在梳理 RabbitMQ 知识的过程中,将所遇到和可能会遇到的基础知识记录下来,用作梳理 RabbitMQ 的整体架构和功能的线索文章,通过查找对应的知识能够快速的了解对应的知识而解决相应的问题。 文章目录 一、RabbitMQ 是什么&#xff1f…

机器学习第一篇 线性回归

数据集:公开的World Happiness Report | Kaggle中的happiness dataset2017. 目标:基于GDP值预测幸福指数。(单特征预测) 代码: 文件一:prepare_for_traning.py """用于科学计算的一个库…

Java面试高频问题(29-30)

二十九、全链路压测:数据隔离与流量 关键技术点 1. 流量染色:通过Header注入X-Test-TraceId标识压测流量 2. 影子库表:通过ShardingSphere实现数据隔离 3. 熔断降级:压测流量触发异常时自动切换回生产数据源 数据隔离方案对比 …

Python常用的第三方模块之数据分析【pdfplumber库、Numpy库、Pandas库、Matplotlib库】

【pdfplumber库】从PDF文件中读取内容 import pdfplumber #打开PDF文件 with pdfplumber.open(DeepSeek从入门到精通(20250204).pdf) as pdf:for i in pdf.pages: #遍历页print(i.extract_text()) #extract_text()方法提取内容print(f----------------第{i.page_number}页结束…

长短板理论——AI与思维模型【83】

一、定义 长短板理论思维模型,也被称为木桶原理,是指一只木桶能盛多少水,并不取决于最长的那块木板,而是取决于最短的那块木板。该理论将木桶视为一个整体系统,各个木板代表着系统的不同组成部分或要素,强…

2025蓝桥省赛c++B组第二场题解

前言 这场的题目非常的简单啊,至于为什么有第二场,因为当时河北正在刮大风被迫停止了QwQ,个人感觉是历年来最简单的一场,如果有什么不足之处,还望补充。 试题 A: 密密摆放 【问题描述】 小蓝有一个大箱子&#xff0…

【数据结构与算法】从完全二叉树到堆再到优先队列

完全二叉树 CBT 设二叉树的深度为 h , 若非最底层的其他各层的节点数都达到最大个数 , 最底层 h 的所有节点都连续集中在左侧的二叉树叫做 完全二叉树 . 特点 对任意节点 , 其右分支下的叶子节点的最底层为 L , 则其左分支下的叶子节点的最低层一定是 L 或 L 1 .完全二叉树…

Leetcode:1. 两数之和

题目 给定一个整数数组 nums 和一个整数目标值 target,请你在该数组中找出 和为目标值 target 的那 两个 整数,并返回它们的数组下标。 你可以假设每种输入只会对应一个答案,并且你不能使用两次相同的元素。 你可以按任意顺序返回答案。 示…

flume整合kafka

需求一: 启动flume 启动kafka消费者,验证数据写入成功 新增测试数据 需求二: 启动Kafka生产者 启动Flume 在生产者中写入数据

Hbase集群管理与实践

一、HBase集群搭建实战 1.1 环境规划建议 硬件配置基准(以10节点集群为例): 角色CPU内存磁盘网络HMaster4核16GBSSD 200GB(系统盘)10GbpsRegionServer16核64GB124TB HDD(JBOD)25GbpsZooKeeper4核8GBSSD 500GB10Gbps1.2 关键配置项示例(hbase-site.xml) <configu…

STM32 开发 - stm32f10x.h 头文件(内存映射、寄存器结构体与宏、寄存器位定义、实现点灯案例)

概述 STM32F10x.h 是 STM32F1 系列微控制器的核心头文件&#xff0c;提供了所有外设寄存器的定义和内存映射 一、内存映射 #define PERIPH_BASE ((uint32_t)0x40000000)#define APB1PERIPH_BASE PERIPH_BASE #define APB2PERIPH_BASE (PERIPH_BASE 0x…

QEMU源码全解析 —— 块设备虚拟化(23)

接前一篇文章:QEMU源码全解析 —— 块设备虚拟化(22) 本文内容参考: 《趣谈Linux操作系统》 —— 刘超,极客时间 《QEMU/KVM源码解析与应用》 —— 李强,机械工业出版社 特此致谢! QEMU启动过程中的块设备虚拟化 上一回解析了qcow2格式对应的qcow2_open函数,本回解…