【Python】PySpark

前言

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark对Python语言的支持,重点体现在Python第三方库:PySpark

PySpark是由Spark官方开发的Python语言第三方库。

Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。

在这里插入图片描述

基础准备

安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。

pip install pyspark或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)# 打印PySpark的运行版本
print(sc.version)# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行需要Java环境,推荐jdk8

PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。

PySpark的编程,主要分为如下三大步骤:

在这里插入图片描述

数据输入

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

在这里插入图片描述

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list/tuple/set/dict/str转换为PySpark的RDD对象

# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1, 2, 3])    
rdd2 = sc.parallelize((1, 2, 3))    
rdd3 = sc.parallelize({1, 2, 3})    
rdd4 = sc.parallelize({'key1': 'value1', 'key2': 'value2'}) 
rdd5 = sc.parallelize('hello')  # 输出RDD的内容,需要使用collect()
print(rdd1.collect())   # [1, 2, 3]
print(rdd2.collect())   # [1, 2, 3]
print(rdd3.collect())   # [1, 2, 3]
print(rdd4.collect())   # ['key1', 'key2']
print(rdd5.collect())   # ['h', 'e', 'l', 'l', 'o']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

注意:

  • 字符串会被拆分出一个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象。

先提前预备一个txt文件

hello
python
day
# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.textFile('E:\\code\\py-space\\8.27\\hello.txt')# 输出RDD的内容,需要使用collect()
print(rdd.collect())    # ['hello', 'python', 'day']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据计算

RDD对象内置丰富的:成员方法(算子)

map算子

将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数
# (T) -> U 表示的是方法的定义:()表示无需传入参数,(T)表示传入1个参数
# T是泛型的代称,在这里表示 任意类型
# U是泛型的代称,在这里表示 任意类型# (T) -> U : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型和传入参数类型一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# 通过map方法将全部数据乘以10,传入参数为函数
rdd2 = rdd.map(lambda x: x * 10)# 输出RDD的内容,需要使用collect()
print(rdd2.collect())   # [10, 20, 30, 40, 50, 60]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

由于map()的返回值还是RDD对象,可以继续在尾部进行链式调用

rdd3 = rdd.map(lambda x: x * 10).map(lambda x: x + 9)

flatMap算子

对RDD执行map操作,然后进行解除嵌套操作。

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(['a b c', 'd e f'])# 输出RDD的内容,需要使用collect()
print(rdd.map(lambda x: x.split(' ')).collect())    # [['a', 'b', 'c'], ['d', 'e', 'f']]
print(rdd.flatMap(lambda x:x.split(' ')).collect())   # ['a', 'b', 'c', 'd', 'e', 'f']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey算子

针对KV型(二元元组)RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,返回值类型和传入参数类型要求一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])# 输出RDD的内容,需要使用collect()
print(rdd.reduceByKey(lambda a, b: a+b).collect())  # [('b', 3), ('a', 2)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey中的聚合逻辑是:比如有[1,2,3,4,5],然后聚合函数是:lambda a,b: a+b

在这里插入图片描述

注意:reduceByKey中接收的函数,只负责聚合,不理会分组;分组是自动by key来分组的

filter算子

过滤想要的数据进行保留。

rdd.filter(func)
# func: (T) -> bool
# 传入一个参数任意类型,返回值必须是True/False,返回是True的数据被保留,False的数据被丢弃

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# 输出RDD的内容,需要使用collect()
print(rdd.filter(lambda x: x % 2 == 0).collect())  # [2, 4, 6]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

distinct算子

对RDD数据进行去重,返回新的RDD

rdd.distinct() # 无需传参

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 3, 2, 6])# 输出RDD的内容,需要使用collect()
print(rdd.distinct().collect())  # [6, 1, 2, 3]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

sortBy算子

对RDD数据进行排序,基于你指定的排序依据。

rdd.sortKey(func, ascending=False, numPartitions=1)
# func: (T) -> U:告知按照RDD中的哪个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending:True升序,False降序
# numPartitions:用多少分区排序,全局排序需要设置为1

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('Aiw', 9), ('Tom', 6), ('Jack', 8), ('Bolb', 5)])# 输出RDD的内容,需要使用collect()
print(rdd.sortBy(lambda x: x[1], ascending=False,numPartitions=1).collect())  # [('Aiw', 9), ('Jack', 8), ('Tom', 6), ('Bolb', 5)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据输出

collect算子

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。

rdd.collect()
# 返回值是一个List

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3])rdd_list: list = rdd.collect()print(rdd_list)   # [1, 2, 3]
print(type(rdd_list))   # <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduce算子

对RDD数据集按照你传入的逻辑进行聚合

rdd.reduce(func)
# func:(T, T) -> T
# 传入2个参数,1个返回值,要求返回值和参数类型一致

在这里插入图片描述

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))print(rdd.reduce(lambda a, b: a+b))   # 45# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

take算子

取RDD的前N个元素,组合成List进行返回。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd_take: list = rdd.take(3)print(rdd_take)   # [1, 2, 3]
print(type(rdd_take))   # <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

count算子

计算RDD有多少条数据,返回值是一个数字。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd_count: int = rdd.count()print(rdd_count)   # 9
print(type(rdd_count))   # <class 'int'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

saveAsTextFile算子

将RDD的数据写入文本文件中。支持本地写出、HDFS等文件系统。

注意事项:

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/Hadoop-3.0.0'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd.saveAsTextFile('./8.27/output') # 运行之前确保输出文件夹不存在,否则报错# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

上述代码输出结果,输出文件夹内有多个分区文件

修改RDD分区为1个

方式一:SparkConf对象设置属性全局并行度为1:

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')
# 设置属性全局并行度为1
conf.set('spark.default.parallelism','1')
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

方式二:创建RDD的时候设置(parallelize方法传入numSlices参数为1)

rdd = sc.parallelize(range(1, 10), numSlices=1)
rdd = sc.parallelize(range(1, 10), 1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/55381.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW是如何控制硬件的?

概述 工程 师 和 科学 家 可以 使用 LabVIEW 与 数千 种 不同 的 硬件 设备 无缝 集成&#xff0c; 并 通过 方便 的 功能 和 跨 所有 硬件 的 一致 编 程 框架 帮助 节省 开发 时间。 内容 通过更简单的系统集成节省开发时间 连接到任何硬件 NI 硬件 第三方硬件 快速找到…

ubuntu18.04复现yolo v8之最终章,realsenseD435i+yolo v8完美运行

背景&#xff1a;上一篇博客我们已经为复现yolov8配置好了环境&#xff0c;如果前面的工作顺利进行&#xff0c;我们已经完成了90%&#xff08;学习类程序最难的是环境配置&#xff09;。 接下来将正式下载yolov8的相关代码&#xff0c;以及进行realsenseD435i相机yolo v8的de…

【学习FreeRTOS】第16章——FreeRTOS事件标志组

1.事件标志组简介 事件标志位&#xff1a;用一个位&#xff0c;来表示事件是否发生 事件标志组是一组事件标志位的集合&#xff0c; 可以简单的理解事件标志组&#xff0c;就是一个整数。 事件标志组的特点&#xff1a; 它的每一个位表示一个事件&#xff08;高8位不算&…

linux入门详解

文章目录 一、引言1.1 开发环境1.2 生产环境1.3 测试环境1.4 操作系统的选择 二、Linux介绍2.1 Linux介绍2.2 Linux的版本2.3 Linux和Windows区别 三、Linux安装3.1 安装VMware3.2 安装Xterm3.3 在VMware中安装Linux3.3.1 选择安装方式3.3.2 指定镜像方式3.3.3 选择操作系统类型…

《C语言编程环境搭建》工欲善其事 必先利其器

C语言编译器 GCC 系列 GNU编译器套装(英语&#xff1a;GNU Compiler Collection&#xff0c;缩写为GCC)&#xff0c;指一套编程语言编译器&#xff0c;常被认为是跨平台编译器的事实标准。原名是&#xff1a;GNU C语言编译器(GNU C Compiler)。 MinGW 又称mingw32 &#xff0c…

DevOps中的持续测试优势和工具

持续测试 DevOps中的持续测试是一种软件测试类型&#xff0c;它涉及在软件开发生命周期的每个阶段测试软件。持续测试的目标是通过早期测试和经常测试来评估持续交付过程的每一步的软件质量。 DevOps中的持续测试流程涉及开发人员、DevOps、QA和操作系统等利益相关者。 持续…

组件库的使用和自定义组件

目录 一、组件库介绍 1、什么是组件 2、组件库介绍 3、arco.design 二、组件库的使用 1、快速上手 2、主题定制 3、暗黑模式 4、语言国际化 5、业务常见问题 三、自定义组件 2、组件开发规范 3、示例实践guide-tip 4、业务组件快速托管 一、组件库介绍 1、什么是…

9个python自动化脚本,PPT批量生成缩略图、添加图片、重命名

引言 最近一番在整理资料&#xff0c;之前买的PPT资源很大很多&#xff0c;但归类并不好&#xff0c;于是一番准备把这些PPT资源重新整理一下。统计了下&#xff0c;这些PPT资源大概有2000多个&#xff0c;一共30多G&#xff0c;一个一个手动整理这个投入产出比也太低了。 作为…

openGauss学习笔记-49 openGauss 高级特性-索引推荐

文章目录 openGauss学习笔记-49 openGauss 高级特性-索引推荐49.1 单query索引推荐49.2 虚拟索引49.3 workload级别索引推荐 openGauss学习笔记-49 openGauss 高级特性-索引推荐 openGauss的索引推荐的功能&#xff0c;共包含三个子功能&#xff1a;单query索引推荐、虚拟索引…

数据结构基础:P3-树(上)----编程作业02:List Leaves

本系列文章为浙江大学陈越、何钦铭数据结构学习笔记&#xff0c;系列文章链接如下&#xff1a; 数据结构(陈越、何钦铭)学习笔记 文章目录 一、题目描述二、整体思路与实现代码 一、题目描述 题目描述&#xff1a; 给定一棵树&#xff0c;按照从上到下、从左到右的顺序列出所有…

【填坑向】MySQL常见报错及处理系列(ERROR! The server quit without updating PID file)

本系列其他文章 【填坑向】MySQL常见报错及处理系列&#xff08;Communications link failure & Access denied for user ‘root‘‘localhost‘&#xff09;_AQin1012的博客-CSDN博客翻一下大致的意思就是默认会按照如下的顺序读取配置文件&#xff0c;我上面贴出的配置文…

⛳ Docker 安装 MySQL

&#x1f38d;目录 ⛳ Docker 安装 MySQL&#x1f69c; 一、搜索 mysql , 查看版本&#x1f3a8; 二、拉取mysql镜像&#x1f463; 三、建立容器的挂载文件&#x1f9f0; 四、创建mysql配置文件&#xff0c;my.conf&#x1f3ed; 五、根据镜像产生容器&#x1f381; 六、远程连…

Java基础 数据结构一【栈、队列】

什么是数据结构 数据结构是计算机科学中的一个重要概念&#xff0c;用于组织和存储数据以便有效地进行访问、操作和管理。它涉及了如何在计算机内存中组织数据&#xff0c;以便于在不同操作中进行查找、插入、删除等操作 数据结构可以看作是一种数据的组织方式&#xff0c;不…

[NLP]深入理解 Megatron-LM

一. 导读 NVIDIA Megatron-LM 是一个基于 PyTorch 的分布式训练框架&#xff0c;用来训练基于Transformer的大型语言模型。Megatron-LM 综合应用了数据并行&#xff08;Data Parallelism&#xff09;&#xff0c;张量并行&#xff08;Tensor Parallelism&#xff09;和流水线并…

docker-maven-plugin直接把镜像推到私有仓库

接着上篇 推送到本地docker 我们已经把服务做成镜像推到docker&#xff0c;也可以通过docker login 私有地址&#xff0c;去push。麻烦 直接上代码 1、pom改动 <properties><docker.registry>eco-registry.XXX.com</docker.repostory><docker.registry…

项目---日志系统

目录 项目系统开发环境核心技术日志系统介绍为什么需要日志系统? 日志系统框架设计日志系统模块划分代码实现通用工具实现日志等级模块实现日志消息模块实现格式化模块实现落地模块实现日志器模块同步日志器异步日志器缓冲区实现异步工作器实现 回归异步日志器模块建造者模式日…

KVM创建虚拟机可访问外网+可使用Xshell等工具连接

创建虚拟机时使用桥接网络模块即可&#xff0c;如下&#xff1a; 1、创建一个存储卷(虚拟机的磁盘) 2、创建虚拟机时选择网络 3、系统安装完成后配置固定IP地址 vi /etc/sysconfig/network-scripts/ifcfg-eth0ONBOOTyes BOOTPROTOstatic IPADDR16.32.15.60 GATEWAY16.32.15.2…

C++--两个数组的dp问题(2)

1.交错字符串 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 给定三个字符串 s1、s2、s3&#xff0c;请判断 s3 能不能由 s1 和 s2 交织&#xff08;交错&#xff09; 组成。 两个字符串 s 和 t 交织 的定义与过程如下&#xff0c;其中每个字符串都…

SpringBoot实现文件上传和下载笔记分享(提供Gitee源码)

前言&#xff1a;这边汇总了一下目前SpringBoot项目当中常见文件上传和下载的功能&#xff0c;一共三种常见的下载方式和一种上传方式&#xff0c;特此做一个笔记分享。 目录 一、pom依赖 二、yml配置文件 三、文件下载 3.1、使用Spring框架提供的下载方式 3.2、通过IOUti…

NIO原理浅析(一)

IO简介 摘抄了下维基百科对IO的定义&#xff0c;Input/Output&#xff0c;输入和输出&#xff0c;通常指数据在存储器或者其他周边设备之间的输出和输入&#xff0c;输入是系统接收到信号或者数据&#xff0c;输出则是从系统发送的信号或数据。 Java IO 读写原理 Java中文件…