大数据分析与应用实验任务九

大数据分析与应用实验任务九

实验目的

  • 进一步熟悉pyspark程序运行方式;

  • 熟练掌握pysaprkRDD基本操作相关的方法、函数,解决基本问题。

实验任务

进入pyspark实验环境,打开命令行窗口,输入pyspark,完成下列任务:

在实验环境中自行选择路径新建以自己姓名拼音命名的文件夹,后续代码中涉及的文件请保存到该文件夹下(需要时文件夹中可以创建新的文件夹)。

一、参考书中相应代码,练习RDD持久性、分区及写入文件(p64、67、80页相应代码)。
1.持久化

迭代计算经常需要多次重复使用同一组数据。下面就是多次计算同一个RDD的例子。

listlzy=["Hadoop","Spark","Hive","Darcy"]
rddlzy=sc.parallelize(listlzy)
print(rddlzy.count())#行动操作,触发一次真正从头到尾的计算
print(','.join(rddlzy.collect()))#行动操作,触发一次真正从头到尾的计算

image-20231123112954850

一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。针对上面的实例,增加持久化语句以后的执行过程如下:

listlzy=["Hadoop","Spark","Hive","Darcy"]
rdd=sc.parallelize(listlzy)
rdd.cache()#会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
print(rdd.count())#第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
print(','.join(rdd.collect()))#第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd

image-20231123113247048

2.分区
  • 设置分区的个数

    在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:

    sc.textFile(path,partitionNum)
    

    其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。下面是一个分区的实例。

    listlzy=[5,2,0,1,3,1,4]
    rddlzy=sc.parallelize(listlzy,2)//设置两个分区
    

    image-20231123113420685

  • 使用repartition方法重新设置分区个数

    通过转换操作得到新RDD时,直接调用repartition方法即可。例如

datalzy=sc.parallelize([1,2,3,4,5],2)
len(datalzy.glom().collect())#显示datalzy这个RDD的分区数量
rdd = datalzy.repartition(1) #对 data 这个 RDD 进行重新分区
len(rdd.glom().collect()) #显示 rdd 这个 RDD 的分区数量

image-20231123113519540

  • 自定义分区方法

    下面是一个实例,要求根据 key 值的最后一位数字将 key 写入到不同的文件中,比如,10 写入到 part-00000,11 写入到 part-00001,12 写入到 part-00002。打开一个 Linux 终端,使用 vim 编辑器创建一个代码文件“/root/Desktop/luozhongye/TestPartitioner.py”,输入以下代码:

    from pyspark import SparkConf, SparkContextdef MyPartitioner(key):print("MyPartitioner is running")print('The key is %d' % key)return key % 10def main():print("The main function is running")conf = SparkConf().setMaster("local").setAppName("MyApp")sc = SparkContext(conf=conf)data = sc.parallelize(range(10), 5)data.map(lambda x: (x, 1)).partitionBy(10, MyPartitioner).map(lambda x: x[0]).saveAsTextFile("file:///root/Desktop/luozhongye/partitioner")if __name__ == '__main__':main()

使用如下命令运行 TestPartitioner.py:

cd /root/Desktop/luozhongye
python3 TestPartitioner.py

或者,使用如下命令运行 TestPartitioner.py:

cd /root/Desktop/luozhongye
spark-submit TestPartitioner.py

程序运行后会返回如下信息:

image-20231123114351343

3.文件数据写入
  • 把 RDD 写入到文本文件中
textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")

其中wordlzy.txt的内容

Hadoop is good
Spark is fast
Spark is better
luozhongye is handsome

image-20231123115053912

Spark 采用惰性机制。可以使用如下的“行动”类型的操作查看 textFile 中的内容:

textFile.first()

正因为 Spark 采用了惰性机制,在执行转换操作的时候,即使输入了错误的语句,pyspark 也不会马上报错,而是等到执行“行动”类型的语句启动真正的计算时,“转换”操作语句中的错误才会显示出来,比如:

textFile = sc.textFile("file:///root/Desktop/luozhongye/wordcount/word123.txt")

image-20231123115210529

  • 把 RDD 写入到文本文件中

可以使用 saveAsTextFile()方法把 RDD 中的数据保存到文本文件中。下面把 textFile 变量中的内容再次写回到另外一个目录 writeback 中,命令如下:

textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")

进入到“/root/Desktop/luozhongye/writeback”目录查看

cd /root/Desktop/luozhongye/writeback
ls

image-20231123115411983

二、逐行理解并运行4.4.2实例“文件排序”。

新建多个txt文件file1.txt 、file2.txt 、file3.txt ,其内容分别如下:

33
37
12
40
4
16
39
5
1
45
25

要求读取所有文件中的整数,进行排序后,输出到一个新的文件中,输出的内容为每行两个整数,第一个整数为第二个整数的排序位次,第二个整数为原待排序的整数。

实现上述功能的代码文件“/root/Desktop/luozhongye/FileSort.py”的内容如下:

#!/usr/bin/env python3 
from pyspark import SparkConf, SparkContextindex = 0def getindex():global indexindex += 1return indexdef main():conf = SparkConf().setMaster("local[1]").setAppName("FileSort")sc = SparkContext(conf=conf)lines = sc.textFile("file:///root/Desktop/luozhongye/file*.txt")index = 0result1 = lines.filter(lambda line: (len(line.strip()) > 0))result2 = result1.map(lambda x: (int(x.strip()), ""))result3 = result2.repartition(1)result4 = result3.sortByKey(True)result5 = result4.map(lambda x: x[0])result6 = result5.map(lambda x: (getindex(), x))result6.foreach(print)result6.saveAsTextFile("file:///root/Desktop/luozhongye/sortresult")if __name__ == '__main__':main()

image-20231123232005765

三、完成p96实验内容3,即“编写独立应用程序实现求平均值问题”,注意每位同学自己修改题目中的数据。

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个字段是学生名字,第二个字段是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。

数学成绩.txt:

小罗 110
小红 107
小新 100
小丽 99

英语成绩.txt:

小罗 95 
小红 81 
小新 82
小丽 76

政治成绩.txt:

小罗 65 
小红 71 
小新 61 
小丽 66

408成绩.txt:

小罗 100
小红 103
小新 94
小丽 110

实现代码如下:

from pyspark import SparkConf, SparkContext# 初始化Spark配置和上下文
conf = SparkConf().setAppName("AverageScore")
sc = SparkContext(conf=conf)# 读取数学成绩文件
math_rdd = sc.textFile("数学成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 读取英语成绩文件
english_rdd = sc.textFile("英语成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 读取政治成绩文件
politics_rdd = sc.textFile("政治成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 读取408成绩文件
computer_rdd = sc.textFile("408成绩.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 合并所有成绩数据
all_scores_rdd = math_rdd.union(english_rdd).union(politics_rdd).union(computer_rdd)# 计算每个学生的成绩总和和成绩数量
sum_count_rdd = all_scores_rdd.combineByKey(lambda value: (value, 1),lambda acc, value: (acc[0] + value, acc[1] + 1),lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))# 计算平均成绩
average_scores_rdd = sum_count_rdd.mapValues(lambda x: x[0] / x[1])# 输出到新文件
average_scores_rdd.saveAsTextFile("平均成绩")# 关闭Spark上下文
sc.stop()

image-20231123233508387

实验心得

在这次实验中,我进一步熟悉了使用PySpark进行大数据处理和分析的方法,并深入了解了PySpark RDD的基本操作。学会了分区、持久化、数据写入文件,并解决实际问题。这次实验让我对PySpark有了更深入的理解,并增强了我处理和分析大数据的能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/165570.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis入门教程

1. 什么是NoSql NoSQL一词最早出现于1998年,是Carlo Strozzi开发的一个轻量、开源、不提供SQL功能的关系数据库。2009年,Last.fm的Johan Oskarsson发起了一次关于分布式开源数据库的讨论,来自Rackspace的Eric Evans再次提出了NoSQL的概念&am…

编程参考 - C++ Code Review: 一个计算器的项目

GitHub - jroelofs/calc: Toy Calculator Toy Calculator 1,拿到一个project,第一眼看,没有配置文件,说明没有引入持续集成系统,continuous integration system。 2,然后看cmake文件,使用的子…

使用Python的turtle模块绘制钢铁侠图案

1.1引言: 在Python中,turtle模块是一个非常有趣且强大的工具,它允许我们以一个可视化和互动的方式学习编程。在本博客中,我们将使用turtle模块来绘制钢铁侠的图案。通过调用各种命令,我们可以引导turtle绘制出指定的图…

CrystalDiskInfo/CrystalDiskMark/DiskGenius系统迁移

CrystalDiskInfo 主要用于看硬盘的各种信息,包括但不限于硬盘通电时间、通电次数、硬盘好坏状态 CrystalDiskMark 主要用于测试硬盘的读写速度、连续读写速度 DiskGenius 主要用于通过U盘装操作系统后进行,磁盘分区,更改磁盘名、隐藏部分…

案例014:Java+SSM+uniapp+mysql基于微信小程序的健身管理系统

文末获取源码 开发语言:Java 框架:SSM JDK版本:JDK1.8 数据库:mysql 5.7 开发软件:eclipse/myeclipse/idea Maven包:Maven3.5.4 小程序框架:uniapp 小程序开发软件:HBuilder X 小程序…

【机器学习 | ARIMA】经典时间序列模型ARIMA定阶最佳实践,确定不来看看?

🤵‍♂️ 个人主页: AI_magician 📡主页地址: 作者简介:CSDN内容合伙人,全栈领域优质创作者。 👨‍💻景愿:旨在于能和更多的热爱计算机的伙伴一起成长!!&…

SpringBoot:ch02 配置文件(日志)

前言 简单介绍 Spring Boot 中常见的配置文件类型&#xff0c;如 application.properties 和 application.yml 等&#xff0c;并说明它们各自的特点和用途。 一、前期准备 1、新建项目&#xff0c;结构如下 2、添加依赖 <?xml version"1.0" encoding"UTF…

单片机语音芯片开发要解决的问题

在单片机语音芯片开发过程中&#xff0c;可能会遇到多种问题&#xff0c;这些问题可能来自于技术层面&#xff0c;也可能来自于芯片本身的设计和应用层面。下面让我们具体从芯片的功耗、语音识别的准度、芯片的尺寸和芯片的可靠性四个方面开展讨论。 1.芯片的功耗问题 首先&a…

【AIGC重塑教育】AI大爆发的时代,未来的年轻人怎样获得机会和竞争力?

目录 AI浪潮来袭 AI与教育 AI的优势 延伸阅读 推荐语 ​作者&#xff1a;刘文勇 来源&#xff1a;IT阅读排行榜 本文摘编自《AIGC重塑教育&#xff1a;AI大模型驱动的教育变革与实践》&#xff0c;机械工业出版社出版 AI浪潮来袭 这次&#xff0c;狼真的来了。 AI正迅猛地…

81基于matlab GUI的图像处理

基于matlab GUI的图像处理&#xff0c;功能包括图像颜色处理&#xff08;灰度图像、二值图像、反色变换、直方图、拉伸变换&#xff09;&#xff1b;像素操作&#xff08;读取像素、修改像素&#xff09;、平滑滤波&#xff08;均值平滑、高斯平滑、中值平滑&#xff09;、图像…

Java多线程之线程安全问题

文章目录 一. 线程安全概述1. 什么是线程安全问题2. 一个存在线程安全问题的程序 二. 线程不安全的原因和线程加锁1. 案例分析2. 线程加锁2.1 理解加锁2.2 synchronized的使用2.3 再次分析案例 3. 线程不安全的原因 三. 线程安全的标准类 一. 线程安全概述 1. 什么是线程安全问…

基于C#实现赫夫曼树

赫夫曼树又称最优二叉树&#xff0c;也就是带权路径最短的树&#xff0c;对于赫夫曼树&#xff0c;我想大家对它是非常的熟悉&#xff0c;也知道它的应用场景&#xff0c;但是有没有自己亲手写过&#xff0c;这个我就不清楚了&#xff0c;不管以前写没写&#xff0c;这一篇我们…

【LeetCode刷题笔记】DFSBFS(二)

994. 腐烂的橘子(树/图的BFS问题) 解题思路: 多源BFS ,首选找到 所有的腐烂的橘子 ,放入队列中,然后进行 BFS 广搜,广搜的 层数 - 1 就是所需要花费的分钟数。 在最开始先扫描一遍二维数组,将所有的 腐烂的橘子 加入 队列 ,同时统计新鲜橘子的数量 <

Blender烘焙AO操作及对应的python代码

&#xff08;一&#xff09;Blender软件操作 1. 导入模型&#xff08;这里省略&#xff09; 2. 材质设置 模型使用的所有材质都需要删除Surface Shader&#xff0c;没有其他多余的计算&#xff0c;可以大量缩短烘焙时间。删除之后的只留下一个材质输出节点&#xff0c;如图所…

CentOS Stream 9系统Cgroup问题处理

安装docker容器启动失败 之前适配过Ubuntu系统的容器&#xff0c;由于版本比较高&#xff0c;没有挂载Cgroup的路径。这次使用Centos Stream 9系统安装docker容器时也遇到了这个情况。由于处理方式有些不一样&#xff0c;所以记录一下。 这是docker容器启动过报错的输出日志。…

Haclon简介及数据类型

Haclon简介 HALCON是由德国MVtec公司开发的机器视觉算法包&#xff0c;它由一千多个各自独立的函数&#xff08;算子&#xff09;构成&#xff0c;其中除了包含各类滤波、色彩以及几何、数学转换、形态学计算分析、图像校正&#xff0c;目标分类辨识、形状搜寻等基本的图像处理…

C/C++文件操作————写文件与读文件以及通讯录的改进 (保姆级教学)

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力&#xff0c;一起奔赴大厂。 目录 1.前言 2.写文件函数与读文件函数 …

多个JDK版本可以吗:JDK17、JDK19、JDK1.8轻松切换(无坑版)小白也可以看懂

多个版本JDK切换 多个JDK&#xff1a;JDK17、JDK19、JDK1.8轻松切换&#xff08;无坑版&#xff09;小白也可以看懂 提示&#xff1a;看了网上很多教程&#xff0c;5w观看、32w观看、几千观看的&#xff0c;多多少少带点坑&#xff0c;这里我就把踩过的坑都给抹了 文章目录 多…