(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API

目录

一、PySpark

二、数据介绍

三、PySpark大数据开发实战

1、数据文件上传HDFS

2、导入模块及数据

3、数据统计与分析

①、计算演员参演电影数

②、依次罗列电影番位前十的演员

③、按照番位计算演员参演电影数

④、求每位演员所有参演电影中的最早、最晚上映时间及其相隔天数、年数

⑤、求每位演员所有电影中的评分最高值、最低值、电影数量、评分均值、标准差、方差、最高最低评分之差值

⑥、求参演大于等于10部电影的每位演员的平均评分,计算规则:去掉一个最高分和一个最低分,然后再计算电影平均分

⑦、求投票数在所有电影中排前80%、评分在所有电影中排前20%的电影信息

⑧、求美国、中国(含港澳台)、英国、法国、俄罗斯5个国家各个电影类型的上映电影数目

⑨、求各个地区评分最高的电影,若并列第一则以“|”拼接

⑩、统计从数据中最早年份到最晚年份的每月上映电影数量,若某个月份无电影上映则数量为0

4、下载统计结果

四、总结


一、PySpark

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。

PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

本文软件环境如下:

操作系统:CentOS Linux 7

Hadoop版本:3.1.3,安装教程可见我另一篇博客内容:Linux CentOS安装Hadoop3.1.3(单机版)详细教程

Spark版本:3.5.2,安装教程可见我另一篇博客内容:Linux CentOS安装PySpark3.5(单机版)详细教程及机器学习实战

Python版本:Python(Anaconda)3.11.4

PySpark基础学习可看 PySpark系列文章:

(一)PySpark3:安装教程及RDD编程

(二)PySpark3:SparkSQL编程

(三)PySpark3:SparkSQL40题

(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测


二、数据介绍

本文数据来自采集豆瓣网分类排行榜 (“https://movie.douban.com/chart”)中各分类类别所有电影的相关信息并存储为csv文件。

爬虫代码在我另一篇博客:豆瓣电影信息爬取与可视化分析

数据放在了百度云上:https://pan.baidu.com/s/1YWB2iEOsMmXHkEUFpY2_TA?pwd=ej3z

数据如下图所示,包含电影名、上映日期、上映地区、类型、豆瓣链接、参演演员、演员数、评分、打分人数,共有3357部电影:

三、PySpark大数据开发实战

1、数据文件上传HDFS

首先通过xftp上传linux服务器,然后通过以下命令上传至HDFS:

hdfs dfs -mkdir /data
hdfs dfs -mkdir /output
hdfs dfs -put film_info.csv /data

2、导入模块及数据

使用SparkSession.builder.config(conf=SparkConf()).getOrCreate()创建Spark会话。使用spark.read.csv()读取CSV文件,并设置header=True以识别首行为列名,inferSchema=True自动推断数据类型。

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window# 主程序:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()df = spark.read.csv("/data/film_info.csv", header=True, inferSchema=True)

3、数据统计与分析

①、计算演员参演电影数

以下代码中使用了spark sql进行统计,也可以通过DataFrame API进行统计:

df.groupBy("actor").agg(count("title").alias("act_film_num"))
# 按分隔符切分列表
df_split = df.withColumn("actors", F.split(df["actors"], "\|")) \.withColumn("types", F.split(df["types"], "\|")) \.withColumn("regions", F.split(df["regions"], "\|"))# 演员:拆分多行
df_exploded = df_split.withColumn("actor", F.explode(F.col("actors")))
df_exploded.drop(*["actors", "regions", "types"]).createOrReplaceTempView("actor_exploded")df1 = spark.sql('''select actor,count(*) as act_film_numfrom actor_exploded group by actor''')
df1.sort(df1["act_film_num"].desc()).repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result1.csv")

结果如下:

+-------------+------------+
|        actor|act_film_num|
+-------------+------------+
|       童自荣|          43|
|     户田惠子|          37|
|         林雪|          33|
|       张国荣|          32|
|       刘德华|          31|
|       周星驰|          31|
|         成龙|          31|
|       任达华|          31|
|         刘洵|          30|
|塞缪尔·杰克逊|          29|
|  汤姆·汉克斯|          29|
|       梁家辉|          28|
|       吴孟达|          28|
|       梁朝伟|          27|
|      斯坦·李|          27|
|       吴君如|          27|
|    威廉·达福|          27|
|       黄秋生|          27|
|       胡立成|          27|
|  布拉德·皮特|          26|
+-------------+------------+
only showing top 20 rows
②、依次罗列电影番位前十的演员

这一题考察了窗口函数、行转列等等。

# 定义窗口函数,按电影标题和演员顺序排序
windowSpec = Window.partitionBy("title").orderBy("actors")
# 添加序号列
df2 = df_exploded.withColumn("rank", F.row_number().over(windowSpec))
# 过滤出前10个演员
rank_num = 10
rank_num_list = [str(i + 1) for i in range(rank_num)]
# 将演员重新组合成单行
df2_tmp1 = df2.groupBy("title").pivot("rank", rank_num_list).agg(F.collect_list("actor"))
df2_tmp2 = df2_tmp1.select("title", *[F.col(f"{i + 1}")[0].alias(f"actor{i + 1}") for i in range(rank_num)])
df2_tmp2.repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result2.csv")

结果如下:

+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                   title|             actor1|               actor2|            actor3|         actor4|           actor5|                actor6|             actor7|               actor8|           actor9|          actor10|
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                 101忠狗|          罗德·泰勒|            凯特·鲍尔|           本·怀特|    丽莎·戴维斯| 贝蒂·洛乌·格尔森|         J·帕特·奥马利|      玛莎·温特沃思|      大卫·弗兰克海姆|弗莱德里克·沃洛克|        汤姆·康威|
|                   11:14|        亨利·托马斯|          布莱克·赫伦|       芭芭拉·赫希|  克拉克·格雷格|    希拉里·斯万克|           肖恩·海托西|      斯塔克·桑德斯|          科林·汉克斯|        本·福斯特|  帕特里克·斯威兹|
|                    2012|        约翰·库萨克|          阿曼达·皮特|     切瓦特·埃加福|      坦迪·牛顿|    奥利弗·普莱特|           汤姆·麦卡锡|        伍迪·哈里森|          丹尼·格洛弗|      连姆·詹姆斯|        摩根·莉莉|
|                    2046|             梁朝伟|               章子怡|              王菲|       木村拓哉|             巩俐|                刘嘉玲|               张震|               张曼玉|             董洁|      通猜·麦金泰|
|                    21克|            西恩·潘|          娜奥米·沃茨|本尼西奥·德尔·托罗|  夏洛特·甘斯布|      梅丽莎·里奥|         迈克尔·芬内尔|     

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58787.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringFactoriesLoader

1.什么是SPI (面试题) SPI全名Service Provider interface,翻译过来就是“服务提供接口”,再说简单就是提供某一个服务的接口, 提供给服务开发者或者服务生产商来进行实现。 Java SPI 是JDK内置的一种动态加载扩展点的实现。 这个机制在一…

Apifox 10月更新|测试步骤支持添加脚本和数据库操作、测试场景支持回收站、变量支持「秘密」类型

Apifox 新版本上线啦! 看看本次版本更新主要涵盖的重点内容,有没有你所关注的功能特性: 自动化测试模块能力持续升级 测试步骤支持添加「脚本」和「数据库操作」 测试场景和定时任务支持回收站内恢复 定时任务支持设置以分钟频率运行 导入…

「C/C++」C++标准库之#include<fstream>文件流

✨博客主页何曾参静谧的博客📌文章专栏「C/C」C/C程序设计📚全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

liunx网络套接字 | 实现基于tcp协议的echo服务

前言:本节讲述linux网络下的tcp协议套接字相关内容。博主以实现tcp服务为主线,穿插一些小知识点。以先粗略实现,后精雕细琢为思路讲述实现服务的过程。下面开始我们的学习吧。 ps:本节内容建议了解网络端口号的友友们观看哦。 目录…

uni-app 运行HarmonyOS项目

1. uni-app 运行HarmonyOS项目 文档中心 1.1. HarmonyOS端 1.1.1. 准备工作 (1)下载DevEco Studio开发工具。   (2)在 DevEco Studio 中打开任意一个项目(也可以新建一个空项目)。   (3&…

WPF+MVVM案例实战(十三)- 封装一个自定义消息弹窗控件(上)

文章目录 1、案例效果2、功能实现1、创建文件2、资源文件获取3、枚举实现3、弹窗实现1、界面样式实现2、功能代码实现4、总结1、案例效果 2、功能实现 1、创建文件 打开 Wpf_Examples 项目,我们在用户控件类库中创建一个窗体文件 SMessageBox.xaml,同时创建枚举文件夹 Enum…

Unity BesHttp插件修改Error log的格式

实现代码 找到插件的 UnityOutput.cs 然后按照需求替换为下面的代码即可。如果提示 void ILogOutput.Flush() { } 接口不存在,删除这行代码即可。 using Best.HTTP.JSON.LitJson; using System; using System.Collections.Generic; using UnityEngine; using Syst…

Python热化学固态化学模型模拟

🎯要点 使用热化学方式,从材料项目数据库获得热力学数据构建固态材料无机合成模拟模型。固态反应网络是热力学相空间的模型,使得能够纳入简单的反应动力学行为。反应坐标图可视为加权有向图,其表示出热力学相空间的密集连接模型。…

详解软件设计中分库分表的几种实现以及应用示例

详解软件设计中分库分表的几种实现以及应用示例https://mp.weixin.qq.com/s?__bizMzkzMTY0Mjc0Ng&mid2247485108&idx1&sn8b3b803c120c163092c70fa65fe5541e&chksmc266aaa1f51123b7af4d7a3113fe7c25daa938a04ced949fb71a8b7773e861fb93d907435386#rd

简缩极化模型+简缩极化求解用优化的方法,也需要保证方程和未知数个数

一个定标器可以得到一个复数矢量,4个实数方程 而模型中我们有,每个定标器有不同的A和φ (两个实数)和相同的R和δc (4个复数)

多浏览器同步测试工具的设计与实现

在做Web兼容测试时,测试人员往往需要在不同浏览器上重复执行相同的操作。 现有自动化录制手段,其实是后置的对比,效率与反馈都存在延迟,执行过程相对是黑盒的,过程中如果测试人员没细化到具体的校验点,即使…

Google Recaptcha V2 简单使用

最新的版本是v3,但是一直习惯用v2,就记录一下v2 的简单用法,以免将来忘记了 首先在这里注册你域名,如果是本机可以直接直接填 localhost 或127.0.0.1 https://www.google.com/recaptcha/about/ 这是列子 网站密钥:是…

【初识Linux】

寻不到花的折翼枯叶蝶,永远也看不见凋谢............................................................................. 文章目录 前言 一、【基本指令】 1、ls 2、pwd 3、cd 4. touch 5.mkdir 6.rmdir 7、rm 8.man 9.cp 10、mv 11、cat 12、tac 13、more 14、le…

操作系统知识要点

一.操作系统的特性 1.并发性 在多道程序环境下,并发性是指在一段时间内,宏观上有多个程序同时运行,但实际上在单CPU的运行环境,每一个时刻只有一个程序在执行。 因此,从微观上来说,各个程序是交替、轮流…

jenkins搭建及流水线配置

1.安装docker curl https://mirrors.aliyun.com/repo/Centos-7.repo >> CentOS-Base-Aliyun.repomv CentOS-Base-Aliyun.repo /etc/yum.repos.d/yum -y install yum-utils device-mapper-persistent-data lvm2yum-config-manager --add-repo http://mirrors.aliyun.com/…

混沌接口压测利器Fortio:从TCP/UDP到gRPC,全方位覆盖云原生应用性能测试

#作者: 西门吹雪 文章目录 Fortio 安装docker 安装:MacOS安装:linux安装:对于http负载生成最重要的标志:Fortio server 功能 TCPUDPgRPC负载测试gRPC 负载测试在k8s或者容器中使用fortio进行压测fortio 直接在docker中作为sidecar使用 Fortio是一个微服…

【笔记】数据结构与算法

参考链接:数据结构(全) 参考链接:数据结构与算法学习笔记 一些PPT的整理,思路很不错,主要是理解角度吧,自己干啃书的时候结合一下会比较不错 0.总论 1.数据 注:图是一种数据结构!!…

leetcode - 684. 冗余连接

684. 冗余连接 解决思路 大致上的思路就是将元素加入到 并查集 中,那么在遍历到边的时候先去判断的边的两个端点的 根节点 是否相等,如果相等,那么就代表此刻把这条边加上去就形成了环【可以这么理解,如果形成了环,那…

【力扣打卡系列】二叉树·灵活运用递归

坚持按题型打卡&刷&梳理力扣算法题系列,语言为go,Day16 相同的树 题目描述 解题思路 边界条件,其中一个节点为空,return 只有p和q均为空才返回true,因此可以简写为pqreturn,先判断节点值是否一样&…

创建一个基于SSM框架的药品商超管理系统

创建一个基于SSM(Spring Spring MVC MyBatis)框架的药品商超管理系统是一个涉及多个步骤的过程。以下是一个详细的开发指南,包括项目结构、数据库设计、配置文件、Mapper接口、Service层、Controller层和前端页面的示例。 1. 需求分析 明…