PySpark 数据处理实战:从基础操作到案例分析

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

目录

一、手机号码流量统计案例

(一)需求分析

(二)代码实现

(三)代码解析

二、合同数据分析案例

(一)需求分析

(二)代码实现

(三)代码解析

三、日志分析案例

(一)需求分析

(二)jieba分词器

安装一下

使用

测试

(四)代码实现

(三)代码解析

四、常见错误及解决方法

五、总结


        在大数据处理领域,PySpark 作为强大的工具,能够高效地处理大规模数据。本文将通过几个实际案例,详细介绍 PySpark 在数据处理中的应用,包括数据清洗、统计分析等操作,帮助读者深入理解 PySpark 的使用方法和数据处理流程。

一、手机号码流量统计案例

(一)需求分析

        给定一组数据,要求计算每个手机号码的总流量(上行 + 下行),但需排除手机号码不正确以及数据长度不够的数据。数据长度不一致的数据指的是一行数据切割后的列数与其他数据列数不同的数据。

(二)代码实现

以下是实现该功能的 PySpark 代码:

import math
import os
import re
from collections.abc import Iterable# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("rdd的创建方式")sc = SparkContext(conf=conf)fileRdd = sc.textFile("../../datas/zuoye/HTTP_20130313143750.dat")print(fileRdd.count())filterRdd = fileRdd.filter(lambda line: len(re.split("\t+",line)) == 11 and re.fullmatch(r"1[3-9]\d{9}",re.split("\t+",line)[1]) is not None )print(filterRdd.count())mapRdd = filterRdd.map(lambda line:(re.split("\t+",line)[1],int(re.split("\t+",line)[-3])+int(re.split("\t+",line)[-2])))rsRdd = mapRdd.reduceByKey(lambda sum,num:sum+num)rsRdd.foreach(lambda x:print(x[0],str(round(x[1]/1024,2))+"MB"))# 使用完后,记得关闭sc.stop()

(三)代码解析

  1. 首先,配置了 PySpark 运行所需的环境变量,包括 JAVA_HOMEHADOOP_HOME 以及 Python 解析器路径。
  2. 通过 SparkConf 设置运行模式为本地(local[*])并指定应用名称,然后创建 SparkContext 对象。
  3. 使用 textFile 读取数据文件,得到 fileRdd
  4. 利用 filter 操作过滤数据,先检查数据长度是否为 11,再通过正则表达式验证手机号码格式是否正确,得到 filterRdd
  5. 对 filterRdd 进行 map 操作,提取手机号码和总流量。
  6. 通过 reduceByKey 按手机号码分组并计算总流量。
  7. 最后,使用 foreach 输出每个手机号码及其对应的总流量(转换为 MB 并保留两位小数)。

二、合同数据分析案例

(一)需求分析

        给定合同数据文件,包含合同 ID、客户 ID、合同类型、总金额、合同付款类型、注册时间、购买数量、合同签约时间、购买的产品、是否已经交货等字段。需要查询已交货和未交货的数量分别是多少、购买合同的总金额是多少以及分期付款占全部订单的比例。

(二)代码实现

以下是实现该功能的 PySpark 代码:

import os
import re# 导入pyspark模块
from pyspark import SparkContext, SparkConfclass Contract:def __init__(self,line):# 合同类型, 总金额,合同付款类型,是否已经交货tuple1 = re.split(",",line)self.contract_type=tuple1[2]self.contract_money=int(tuple1[3])self.pay_type=tuple1[4]self.isDelivery=tuple1[-1]def __repr__(self):return "合同类型:%s,总金额:%d,合同付款类型:%s,是否已经交货:%s" % (self.contract_type,self.contract_money,self.pay_type,self.isDelivery)if __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("合同分析")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)print(sc)mapRdd = sc.textFile("../../datas/zuoye/DEMO_CONTRACT.csv") \.filter(lambda line: line.find("合同ID") == -1) \.map(lambda line: Contract(line)) \"""1. 已交货和未交货的数量分别是多少2. 购买合同的总金额是多少3. 分期付款占全部订单的比例"""totalNum = mapRdd.count()deliverNum = mapRdd.filter(lambda contract:contract.isDelivery == '是').count()print("已交货和未交货的数量分别是:",deliverNum,totalNum-deliverNum)gouMaiMoney = mapRdd.filter(lambda contract:contract.contract_type=='购买合同') \.map(lambda contract:contract.contract_money).reduce(lambda sum,money:sum+money)gouMaiMoney2 = mapRdd.filter(lambda contract: contract.contract_type == '购买合同') \.map(lambda contract: contract.contract_money).sum()print("购买合同的总金额是:",gouMaiMoney2)# 第三问fenQiNum = mapRdd.filter(lambda contract:contract.pay_type=='分期付款').count()print("分期付款占全部订单的比例是:",fenQiNum/totalNum)# 使用完后,记得关闭sc.stop()

(三)代码解析

  1. 同样先配置环境变量并创建 SparkContext 对象。
  2. 定义了 Contract 类来封装合同数据的相关字段。
  3. 读取合同数据文件并进行过滤,排除标题行,然后将每行数据映射为 Contract 对象,得到 mapRdd
  4. 对于已交货和未交货数量的统计,先计算总订单数 totalNum,再通过过滤得到已交货订单数 deliverNum,进而得出未交货订单数。
  5. 计算购买合同总金额时,先过滤出购买合同类型的数据,然后提取金额并进行求和操作。
  6. 计算分期付款占比,先统计分期付款订单数 fenQiNum,再除以总订单数 totalNum

三、日志分析案例

(一)需求分析

  1. 统计热门搜索词 Top10,即统计用户搜索每个词出现的次数,然后降序排序取前 10。
  2. 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数,也就是计算所有用户在所有搜索过程中的最大、最小和平均点击次数。
  3. 统计一天每小时点击量并按照点击量降序排序,即统计每个小时点击的数据量并按降序排列。

(二)jieba分词器

汉语是需要分词的

python语言: Jieba 分词器

Java语言: IK 分词器(好久没更新过了)

安装一下

pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple/

没有自定版本,安装的就是最新的版本

使用

语法:jieba.cut(“语句”) / jieba.cut_for_search(“语句”)
全模式:将句子中所有可以组成词的词语都扫描出来, 速度非常快,但可能会出现歧义
jieba.cut("语句", cut_all=True)
精确模式:将句子最精确地按照语义切开,适合文本分析,提取语义中存在的每个词
jieba.cut("语句", cut_all=False)
搜索引擎模式:在精确模式的基础上,对长词再次切分,适合用于搜索引擎分词
jieba.cut_for_search("语句")

测试

import jieba
# 测试一下结巴分词器
str = "中华人民共和国"
list01 = jieba.cut(str, cut_all=True)
# 中华,中华人民,中华人民共和国,华人,人民,人民共和国,共和,共和国
print(",".join(list01))
# 中华人民共和国
list02 = jieba.cut(str, cut_all=False)
for ele in list02:print(ele)# 中华 华人 人民 共和 共和国 中华人民共和国  比全模式少多,比精确模式多,适用于搜索引擎
list03 = jieba.cut_for_search(str)
print(*list03)

(四)代码实现

以下是实现日志分析功能的 PySpark 代码:

import os
import re# 导入pyspark模块
from pyspark import SparkContext, SparkConf
import jiebaif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象conf = SparkConf().setMaster("local[*]").setAppName("")# 根据配置文件,得到一个 SC 对象,第一个 conf 是形参的名字,第二个 conf 是实参的名字sc = SparkContext(conf=conf)# print(sc)# 清洗数据print("===========清洗数据===========")fileRdd = sc.textFile("../../datas/sogou/sogou.tsv")print(fileRdd.count())print(fileRdd.first())listRdd = fileRdd.map(lambda line: re.split("\\s+", line))filterList = listRdd.filter(lambda l1: len(l1) == 6)# 这个结果只获取而来时间 uid 以及热词,热词将左右两边的[] 去掉了tupleRdd = filterList.map(lambda l1: (l1[0], l1[1], l1[2][1:-1]))# 求热词top10print("===========求热词top10===========")wordRdd = tupleRdd.flatMap(lambda t1: jieba.cut_for_search(t1[2]))filterRdd2 = wordRdd.filter(lambda word: len(word.strip()) != 0 and word != "的").filter(lambda word: re.fullmatch("[\u4e00-\u9fa5]+", word) is not None)# filterRdd2.foreach(print)result = filterRdd2.map(lambda word: (word, 1)).reduceByKey(lambda sum, num: sum + num).sortBy(keyfunc=lambda tup: tup[1], ascending=False).take(10)for ele in result:print(ele)# 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数print("===========统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数===========")def splitWord(tupl):li1 = jieba.cut_for_search(tupl[2])  # 中国 中华 共和国li2 = list()for word in li1:li2.append(((tupl[1], word), 1))return li2newRdd = tupleRdd.flatMap(splitWord)# newRdd.foreach(print)reduceByUIDAndWordRdd = newRdd.reduceByKey(lambda sum, num: sum + num)# reduceByUIDAndWordRdd.foreach(print)valList = reduceByUIDAndWordRdd.values()print(f"最大点击次数: {valList.max()}")print(f"最小点击次数: {valList.min()}")print(f"中位数: {valList.mean()}")  # 中位数print(f"平均点击次数: {valList.sum() / valList.count()}")# 统计一天每小时点击量并按照点击量降序排序print("===========统计一天每小时点击量并按照点击量降序排序===========")reductByKeyRDD = tupleRdd.map(lambda tup: (tup[0][0:2], 1)).reduceByKey(lambda sum, num: sum + num)sortRdd = reductByKeyRDD.sortBy(keyfunc=lambda tup: tup[1], ascending=False)listNum = sortRdd.take(24)for ele in listNum:print(ele)# 使用完后,记得关闭sc.stop()

(三)代码解析

  1. 配置环境变量后创建 SparkContext 对象。
  2. 定义 getWords 函数,用于将搜索词进行分词并构建 ((用户id,词), 1) 的格式。
  3. 读取日志数据文件,进行数据清洗,排除数据长度不足 6 的行和包含特定违禁词的行,然后提取相关字段得到 mapRdd
  4. 对于热词 Top10 的统计,先对热词进行分词,过滤掉特定词和非中文词,然后映射为 (词, 1) 格式,通过 reduceByKey 统计词频,最后按词频降序排序并取前 10。
  5. 统计最大、最小和平均点击次数时,先通过 flatMap 和 getWords 函数构建 ((用户id,词),点击次数) 格式的数据,过滤掉非中文词和特定词后,通过 reduceByKey 统计点击次数,再获取值并计算相关统计量。
  6. 统计一天每小时点击量时,先提取小时信息并映射为 (小时, 1) 格式,通过 reduceByKey 统计每小时点击量,最后按点击量降序排序并收集结果。

四、常见错误及解决方法

        在运行 PySpark 代码读取数据时,可能会遇到 Caused by: java.net.SocketException: Connection reset by peer: socket write error 错误。

Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:477)

        原因是连接数过多,一般在本地 Windows 运行 Spark 代码且读取数据过多,或者代码中使用了 take() 算子时容易出现。解决方法有两种:一是将数据量变小一点,只截取一部分进行测试;二是避免使用 take 算子。

五、总结

        通过以上三个案例,我们详细展示了 PySpark 在不同数据处理场景下的应用。从手机号码流量统计到合同数据分析,再到日志分析,涵盖了数据过滤、映射、分组求和、排序以及特定数据统计等常见操作。同时,也指出了在实际运行代码过程中可能遇到的错误及解决方法。希望读者能够通过这些案例,深入理解 PySpark 的使用技巧,在大数据处理工作中更加得心应手。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60348.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用GPT-SoVITS训练语音模型

1.项目演示 阅读单句话 1725352713141 读古诗 1725353700203 2.项目环境 开发环境:linux 机器配置如下:实际使用率百分之二十几, 3.开发步骤 1.首先是准备数据集,要求是wav格式,一到两个小时即可, 2.…

Python学习从0到1 day27 Python 高阶技巧 ③ 设计模式 — 单例模式

此去经年,再难同游 —— 24.11.11 一、什么是设计模式 设计模式是一种编程套路,可以极大的方便程序的开发最常见、最经典的设计模式,就是我们所学习的面向对象了。 除了面向对象外,在编程中也有很多既定的套路可以方便开发,我们称之为设计模…

3.2 软件需求:面对过程分析模型

面对过程分析模型 1. 需求分析的模型概述1.1 面对过程分析模型-结构化分析方法1.2 结构化分析的过程 2. 功能模型:数据流图初步2.1 加工2.2 外部实体(数据源点/终点)2.3 数据流2.4 数据存储2.5 注意事项 3. 功能模型:数据流图进阶…

Android Studio 运行模拟器无法打开avd

问题:已经下载了HAXM 打开模拟器时还是提示未下载HAXM,无法打开avd 解决方案: 控制面板 -> 启动或关闭Windows功能,打开图下两项,后重启电脑重启Android Studio:

Qt文件系统-二进制文件读写

实例功能概述 除了文本文件之外,其他需要按照一定的格式定义读写的文件都称为二进制文件。每种格式的二进制文件都有自己的格式定义,写入数据时按照一定的顺写入,读出时也按照相应的顺读出。例如地球物理中常用的SEG-Y格式文件,必…

ARXML汽车可扩展标记性语言规范讲解

ARXML: Automotive Extensible Markup Language (汽车可扩展标记语言) xmlns: Xml name space (xml 命名空间) xsd: Xml Schema Definition (xml 架构定义) 1、XML与HTML的区别,可扩展。 可扩展,主要是…

游戏引擎学习第六天

这节讲的内容比较多: 参考视频:https://www.bilibili.com/video/BV1apmpYVEQu/ XInput 是微软提供的一个 API,用于处理 Windows 平台上 Xbox 控制器(包括有线和无线)及其他游戏控制器的输入。它为开发者提供了一组函数,用于查询控…

vivado+modelsim: xxx is not a function name

xxx is not a function name vivado问题:xxx is not a function name原因 vivado问题:xxx is not a function name 在写verilog modelsim仿真时,遇到error:xxx is not a function name。 原因 该变量xxx在仿真文件里,如下图红框所示&#…

云计算在教育领域的应用

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 云计算在教育领域的应用 云计算在教育领域的应用 云计算在教育领域的应用 引言 云计算概述 定义与原理 发展历程 云计算的关键技…

立体工业相机提升工业自动化中的立体深度感知

深度感知对仓库机器人应用至关重要,尤其是在自主导航、物品拾取与放置、库存管理等方面。 通过将深度感知与各种类型的3D数据(如体积数据、点云、纹理等)相结合,仓库机器人可以在错综复杂环境中实现自主导航,物品检测…

模拟鼠标真人移动轨迹算法-易语言

一.简介 鼠标轨迹算法是一种模拟人类鼠标操作的程序,它能够模拟出自然而真实的鼠标移动路径。 鼠标轨迹算法的底层实现采用C/C语言,原因在于C/C提供了高性能的执行能力和直接访问操作系统底层资源的能力。 鼠标轨迹算法具有以下优势: 模拟…

JavaWeb——Web入门(8/9)- Tomcat:基本使用(下载与安装、目录结构介绍、启动与关闭、可能出现的问题及解决方案、总结)

目录 基本使用内容 下载与安装 目录结构介绍 启动与关闭 启动 关闭 可能出现的问题及解决方案 问题一:启动时窗口一闪而过 问题二:端口号冲突 问题三:部署应用程序 总结 基本使用内容 Tomcat 服务器在 Java Web 开发中扮演着至关重…

PostgreSQL中如果有Left Join的时候索引怎么加

在PostgreSQL中,当你的查询包含多个LEFT JOIN和WHERE条件时,合理地添加索引可以显著提高查询性能。以下是一些具体的优化步骤和建议: 1. 分析查询 使用 EXPLAIN ANALYZE 命令分析你的查询,了解查询的执行计划,识别出连…

通过DNS服务器架构解释DNS请求过程

在前面的章节,这里,基于PCAP数据包和RFC文档详细介绍了DNS请求和响应的每个字段的含义。但是在现实的网络世界中,DNS请求和响应的数据包是怎么流动的,会经过哪些设备。本文将着重说明一下目前网络空间中DNS请求和响应的流动过程。 当前网络空间中比较常见DNS请求的流程如下…

aspose如何获取PPT放映页“切换”的“持续时间”值

aspose如何获取PPT放映页“切换”的“持续时间”值 项目场景问题描述问题1:从官方文档和资料查阅发现并没有对切换的持续时间进行处理的方法问题2:aspose的依赖包中,所有的关键对象都进行了混淆处理 解决方案1、找到ppt切换的持续时间对应的混…

GIT:如何查找已删除的文件的历史记录

首先你得知道文件的名称和路径 然后打开 gitlab,到项目中,仓库-> 文件 查找文件 复制文件名到可能存在过这个文件的分支当中,就能看到了

自动渗透测试与手动渗透测试

根据《渗透测试中发现的 5 种常见网络安全威胁》报告,渗透测试越来越受欢迎。预计到 2025 年,渗透测试市场规模将达到 45 亿美元。 什么是自动渗透测试? 自动化渗透测试工具可以快速有效地检查系统中是否存在已知的安全问题,即使…

使用elementUI实现表格行拖拽改变顺序,无需引入外部库

前言: 使用vue2element UI,且完全使用原生的拖拽事件,无需引入外部库。 如果表格数据量较大,或需要更多复杂功能,可以考虑使用 vuedraggable库,提供更多配置选项和拖拽功能。 思路: 1. 通过el-table的ro…

WPF Prism框架

Prism 是一个开源框架,专门用于开发可扩展、模块化和可测试的企业级 XAML 应用程序,适用于 WPF(Windows Presentation Foundation)和 Xamarin Forms 等平台。它基于 MVVM(Model-View-ViewModel)设计模式&am…

C#开发流程

注:检查数据库链接 设置搜索 1.新建模块文件夹 对应应用 右键-添加-新建文件夹 2.新建类 在新建模块下右键 新建-类,修改类名称 修改internal为public 新建所需字段,注意类型声明及必填设置 [SugarColumn(IsNullable false)]public strin…