pyspark入门基础详细讲解

1.前言介绍

学习目标:了解什么是Speak、PySpark,了解为什么学习PySpark,了解课程是如何和大数据开发方向进行衔接

使用pyspark库所写出来的代码,既可以在电脑上简单运行,进行数据分析处理,又可以把代码无缝迁移到成百上千的服务器集群上去做分布式计算。

为什么要学习pyspark呢?

总结

2.基础准备

学习目标:掌握pyspark库的安装,掌握pyspark执行环境入口对象的构建,理解pyspark的编程模型。

建议使用国内代理镜像网站下载更快。

 简化代码,本质上是同一个意思,链式结构,链式调用化简程序 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊

代码展示:
"""
演示获取pyspark的执行环境入库对象:SparkContext
并通过SparkContext对象获取当前PySpark的版本
"""# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象  setMaster是描写运行模式   setAppName是设置当前Spark任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 同一个意思,链式结构,链式调用化简程序
# 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

spark需要启动时间,所以代码的运行一小会,3.5.3就是当前spark的运行版本

这个sc非常非常重要哦,后续给大家讲解。

通过sc拿到数据输入,数据处理计算是通过RDD类对象的一系列成员方法来对数据进行计算,然后把结果对外进行输出

我们只需要记住后期写spark代码的三大步,把数据加载进来,对数据进行计算,把结果输出去

总结

3.数据输入

学习目标:理解RDD对象,掌握PySpark数据输入的2种方法。

RDD就和列表等数据容器差不多

python数据容器转RDD对象

parallelize成员方法把数据容器存入RDD对象

如果要查看RDD里面有什么内容,需要用collect()方法

字符串会把每一个字符都拆出来,存入RDD对象,字典仅有key被存入RDD对象

读取文件转RDD对象

总结

4.数据计算

map方法

学习目标:掌握RDD的map方法

map会把传入的每一个参数都返回一个值

你会发现报错了,报错的原因是spark没有找到python解释器

给他指定一条路径,这样就没有问题了。如果指定路径之后还是没有解决的,可能是因为pycharm版本太新,降低版本就行了,建议是pycharm3.10

对于简单函数我们可以使用lambda匿名函数。

结果是一样的

链式调用

总结

flatMap方法

学习目标:掌握RDD的flatMap方法对数据进行计算。

通过map,可以看到尽管我们把数据分成一个一个的,但是还是存在嵌套,依旧被嵌套在list当中

当我们使用了flatMap方法后,发现解除了嵌套

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize(["ikun 22","ikun 3","ik unh hhhh"])
# 需求:将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" ")) # 使用空格进行切分
print(rdd2.collect())

使用flatMap可以解除内部嵌套,语法与map一样

总结:

reduceByKey方法

学习目标:掌握RDD的reduceByKey方法    

 

二元元组指的是元组里面存储的只有两个元素

KV型的RDD一般是两个元素,把第一个元素当成key,第二个当成value,自动按照key分组,然后根据你传入的逻辑计算value

(v,v)->(v)  意思是传入两个相同类型的参数,返回一个返回值,类型和传入要求一致

自动分组并且组内求和

总结

可以完成按key进行分组,并且组内进行逻辑计算

练习案例1

学习目标:完成使用PySpark进行单词计数的案例

数据文件

取出所有的单词,flatMap是把单词一个一个取出来,map是把单词一行一行取出来,一行是一个列表。

把单词转换成二元元组

完整代码

"""
完成练习案例:单词计数统计
"""
from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.打印输出结果
print(result_rdd.collect())

filter方法

学习目标:掌握RDD的filter方法

True被保留,False被丢弃

总结

distinct方法

学习目标:掌握RDD的distinct方法

不需要传入参数,功能简单就是去重操作

总结

sortBy方法

学习目标:掌握RDD的sortBy方法进行内容的排序

接收函数传入参数并且有一个返回值

目前我们没有解除到分布式,就先写上numPartitions=1

之前写过一个读取文件,统计单词的个数,现在让我们对他进行排序

可以自己控制升序或者降序,True升序,False降序

from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.对结果进行排序
final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())

总结:

练习案例2

学习目标:完成练习案例2的开发

完整代码:

"""
完成练习案例:json商品统计
"""
# 1.各个城市销售额排名,从小到大
# 2.全部城市,有哪些商品类别在售卖
# 3.北京市有哪些商品类别在售卖
from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 需求1:城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/2222.txt")
# 1.2 取出一个个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))# 1.3 将一个个json字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'],int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结果:",result1_rdd.collect())
# 需求2:全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
# 2.2 对全部商品类别进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:",category_rdd.collect())
# 需求3:北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别
# 3.3 进行商品类别去重
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:",result3_rdd.collect())

5.数据输出

输出为python对象

学习目标:掌握将RDD的结果输出为python对象的各类方法

collect算子

reduce算子

reduce和reducebykey的区别是reducebykey是获取key然后组内计算,reduce是单纯的直接计算

take算子

就是取前N个元素

count算子

总结

from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备RDD
rdd = sc.parallelize([1,2,3,4,5])
# collect算子,输出RDD为list对象
rdd_list:list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两融合
num = rdd.reduce(lambda a,b: a+b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()

输出到文件中

学习目标:掌握将RDD的内容输出到文件中,了解如何更改RDD的分区数为1

报错了,原因是配置的问题,接下来我们给他配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

5. 类加载子系统

一、前言 前面我们了解了字节码文件的大致组成部分,那么 JVM 是如何加载 .class字节码文件的?加载到.class字节码文件后又做了哪些事情呢? 二、类加载子系统初步认识 首先类加载子系统作为虚拟机和外界的一个对接口,主要负责以…

AI 写作(六):核心技术与多元应用(6/10)

一、AI 写作的核心技术概述 AI 写作在当今数字化时代正发挥着越来越重要的作用。它不仅极大地提高了写作效率,还为不同领域带来了创新的可能性。 AI 写作的核心技术主要包括基于模板的文本生成和基于深度学习的文本生成。基于模板的文本生成通常依赖预先设定的模板…

显卡驱动版本过低怎么办?显卡驱动升级的方法

显卡驱动程序是计算机与显卡之间沟通的桥梁,它负责将操作系统发出的指令翻译成显卡可以理解的语言,从而确保图形显示的流畅与高效。当您遇到显卡驱动版本过低的问题时,升级驱动程序不仅能够提升电脑的图形处理能力,还能解决兼容性…

java导出word文件(手绘)

文章目录 代码细节效果图参考资料 代码细节 使用的hutool的WordUtil,WordUtil对poi进行封装,但是这一块的官方封装的很少,很多细节都没有。代码中是常见的绘制段落,标题、表格等常用api Word07Writer writer WordUtil.getWriter(…

UML的另一个主角——用例图

顺序图和类图已经出过单集,本贴要分享的是用例图。 类图https://blog.csdn.net/jsl123x/article/details/143526286?spm1001.2014.3001.5501顺序图https://jslhyh32.blog.csdn.net/article/details/134350587 目录 一.系统 二.参与者 1.主要参与者 2.次要参与…

《TCP/IP网络编程》学习笔记 | Chapter 4:基于TCP的服务器端/客户端(1)

《TCP/IP网络编程》学习笔记 | Chapter 4:基于TCP的服务器端/客户端(1) 《TCP/IP网络编程》学习笔记 | Chapter 4:基于TCP的服务器端/客户端(1)理解TCP和UDPTCP/IP协议栈TCP/IP协议的诞生背景链路层网络层T…

【基于PSINS工具箱】以速度为观测量的SINS/GNSS组合导航,UKF滤波

基于【PSINS工具箱】,提供一个MATLAB例程,仅以速度为观测量的SINS/GNSS组合导航(滤波方式为UKF) 文章目录 工具箱程序简述运行结果 代码程序讲解MATLAB 代码教程:使用UKF进行速度观测1. 引言与基本设置2. 初始设置3. U…

【Vue】Vue3.0(十七)Vue 3.0中Pinia的深度使用指南(基于setup语法糖)

上篇文章: 【Vue】Vue3.0(十一)Vue 3.0 中 computed 计算属性概念、使用及示例 🏡作者主页:点击! 🤖Vue专栏:点击! ⏰️创作时间:2024年11月10日15点23分 文章…

跨境云专线:构建高速、安全的全球业务网络

在企业出海加速的背景下,越来越多的企业需要在全球范围内部署业务,特别是在多个国家和地区之间进行数据传输。然而,跨境网络连接常常面临带宽不足、延迟高、数据安全性差等问题,这给企业的业务运营带来了巨大挑战。为了解决这些问…

分布式——BASE理论

简单来说: BASE(Basically Available、Soft state、Eventual consistency)是基于CAP理论逐步演化而来的,核心思想是即便不能达到强一致性(Strong consistency),也可以根据应用特点采用适当的方…

UE5.4 PCG 获取地形Layer

使用AttributeFilter:属性过滤器 节点 设置地形Layer名称和权重 效果:

使用wordpress搭建简易的信息查询系统

背景 当前有这样的一个需求,要实现让客户能够自助登录系统查询一些个人的信息,市面上没有特别符合我的需求的产品,经过一段时间的研究,想出了一个用wordpress实现简易信息查询系统,有两种方式。 方式一:使…

EasyUI弹出框行编辑,通过下拉框实现内容联动

EasyUI弹出框行编辑,通过下拉框实现内容联动 需求 实现用户支付方式配置,当弹出框加载出来的时候,显示用户现有的支付方式,datagrid的第一列为conbobox,下来选择之后实现后面的数据直接填充; 点击新增:新…

ssm079基于SSM框架云趣科技客户管理系统+jsp(论文+源码)_kaic

毕 业 设 计(论 文) 题目:客户管理系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本客户管理系统就是在这…

PICO+Unity 用手柄点击UI界面

如果UI要跟随头显,可将Canvas放置到XR Origin->Camera Offset->Main Camera下 1.Canvas添加TrackedDeviceGraphicRaycaster组件 2.EventSystem移动默认的Standard Input Module,添加XRUIInputModule组件 3.(可选)设置射线可…

apt镜像源制作-ubuntu22.04

# 安装必要的软件 sudo apt-get install -y apt-mirror # 编辑/etc/apt/mirror.list,添加以下内容 set base_path /var/spool/apt-mirror # 指定要镜像的Ubuntu发布和组件-null dir jammy-updates main restricted universe multiverse # 镜像的Ubuntu发布和组件的URL-n…

springboot初体验

目录 环境 controller 修改端口号 更改banner图标 运行结果 最核心的:自动装配 环境 jdk17springboot3.3.5maven3.8.2 controller controller层和启动类同级 package com.example.demo.controller; ​ import org.springframework.web.bind.annotation.RequestMapping;…

Q:警告无法解释导入PIL Pylance(reportMisssingIMports)

问题显示: 解决方法: 1.确认安装 Pillow:在 VS Code 的终端中运行以下命令,以确保环境中安装了 Pillow pip install pillow2.选择正确的解释器:在 VS Code 中,按下 CtrlShiftP,输入并选择 “P…

python中常见的8种数据结构之一数组的应用

在Python中,数组是一种常见的数据结构,用于存储一系列相同类型的元素。在实际应用中,数组可以用于解决各种问题。 以下是数组在Python中的一些常见应用: 1. 存储和访问数据:数组可以用于存储和访问一组数据。可以通过…

网络安全——下载并在kali虚拟机上启动Cobalt Strike

目录 一、下载 二、上传文件到kali虚拟机 三、启动服务端 四、启动客户端 一、下载 CobaltStrike4.8汉化版带插件-CSDN博客 下载并解压后 二、上传文件到kali虚拟机 1、打开并运行kali虚拟机,查看kali的ip地址 2、打开xshell,新建连接,连…