(一)PySpark3:安装教程及RDD编程(非常详细)

目录

一、pyspark介绍

二、PySpark安装

三、RDD编程

1、创建RDD

2、常用Action操作

①collect

②take

③takeSample

④first

⑤count

⑥reduce

⑦foreach

⑧countByKey

⑨saveAsTextFile

3、常用Transformation操作

①map

②filter

③flatMap

④sample

⑤distinct

⑥subtract

⑦union

⑧intersection

⑨cartesian

⑩sortBy

⑪zip

⑫zipWithIndex

4、常用Transformation操作(键值对)

①reduceByKey

②groupByKey

③sortByKey

④join / leftOuterJoin / rightOuterJoin

⑤cogroup

⑥subtractByKey

⑦foldByKey

5、分区操作

①glom

②HashPartitioner

③mapPartitions / mapPartitionsWithIndex

④coalesce

⑤repartition

⑥partitionBy

6、缓存操作

①cache

②persist

③checkpoint

7、共享变量

①broadcast

②accumulator

四、总结


一、pyspark介绍

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。


PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

前置知识:

1、熟悉Spark RDD原理,了解RDD常用算子
2、具有Python编码能力,熟悉Python中numpy, pandas库的基本用法。
3、了解机器学习算法原理,如逻辑回归、决策树等等
4、需要安装:JDK、Anaconda

二、PySpark安装

首先安装spark,本文使用的安装文件为:spark-3.2.1-bin-hadoop3.2。即Spark版本为3.2.1,Hadoop可不安装,对本文后续代码运行无影响。

百度云链接如下:

链接:https://pan.baidu.com/s/1GmPZBoBtSZWJtPHqm-DhwA?pwd=bcm5 
提取码:bcm5

将下载的安装文件解压到指定目录即可,例如我的目录:D:\bigdata\spark-3.2.1-bin-hadoop3.2

配置系统变量:

此电脑-右键点击“属性”-高级系统设置-环境变量-系统变量

#系统变量新建
SPARK_HOME D:\bigdata\spark-3.2.1-bin-hadoop3.2 #换成你的解压目录
PYSPARK_DEIVER_PYTHON_OPTS notebook
PYSPARK_DEIVER_PYTHON ipython
PYTHONPATH %SPARK_HOME%\python\lib\py4j;%SPARK_HOME%\python\lib\pyspark#path添加
%SPARK_HOME%\bin

修改配置文件:
在解压路径目录conf下,复制文件spark-env.sh.template,修改文件名为spark-env.sh

修改配置文件spark-env.sh,在文件末尾添加以下代码:

#D:\Anaconda3换成你的anaconda安装目录
export PYSPARK_PYTHON=D:\Anaconda3
export PYSPARK_DRIVER_PYTHON=D:\Anaconda3
export PYSPARK_SUBMIT_ARGS='--master local[*]'
#local[*]  是利用所有的资源

以上步骤完成,spark已经安装完成。

接下来在Anaconda创建虚拟环境,安装相关python库。需要注意,Python安装的pyspark版本必须与前面安装的spark版本一致。

#创建虚拟环境
conda create -n spark python=3.8#进入虚拟环境
conda activate spark#安装相关包
pip install pyspark==3.2.1 findspark pyhive notebook pandas

三、RDD编程

第一步,初始化Spark环境,创建一个Spark应用程序:

import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)

以上代码中,首先创建一个SparkConf对象,用于配置Spark应用程序。通过setAppName设置应用程序的名称为"test",通过setMaster设置运行模式为本地模式,使用4个本地线程。

随后,创建一个SparkContext对象,它是与Spark集群通信的主要入口点。一旦SparkContext被创建,就可以使用它来执行各种分布式计算任务。

1、创建RDD

RDD(Resilient Distributed Dataset):是 Spark 中的核心数据结构,代表分布在集群节点上的不可变、弹性(可容错)、可并行计算的数据集。RDD 可以分为多个分区,每个分区可以在集群中的不同节点上进行并行处理。

①用textFile方法加载本地或者集群文件系统中的数据

#从本地文件系统中加载数据
file = "./data/test.txt"
rdd = sc.textFile(file,3)#从集群文件系统中加载数据
file = "hdfs://localhost:9000/user/data/test.txt"
#也可以省去hdfs://localhost:9000
rdd = sc.textFile(file,3)

②用parallelize方法将Driver中的数据结构并行化成RDD。

rdd = sc.parallelize(range(1,11),2)

2、常用Action操作

其主要特点如下:

触发计算:Action操作是Spark计算的触发点,当调用 Action操作时,Spark将执行整个RDD的计算流程,并生成最终结果。这与Transformation操作的惰性计算形成对比。

输出结果:Action操作生成非惰性结果,即它们会立即执行计算并返回实际的结果。可以将计算结果返回到本地驱动程序,也可以将结果写入外部存储系统(如 HDFS、数据库等),另外还可以将结果缓存到驱动程序或本地内存中(但对于大型数据集来说可能会导致内存问题)。

①collect

collect()是一个action操作,用于从RDD中收集所有元素到Driver节点,形成一个本地的数据集(数组)。

rdd = sc.parallelize(range(1,11),2)
rdd.collect()

运行结果:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

注意:collect()操作将整个RDD中的数据收集到Driver节点的内存中,在大规模数据集上执行该操作可能导致内存不足或性能问题。
因此,collect()操作适用于小规模的结果集,用于调试和查看数据。对于大规模数据集,更常见的做法是使用转换操作和行动操作组合来实现分布式计算,最后将结果写到外部存储或以其他方式处理。

②take

rdd.take(n)用于获取RDD中的前n个元素。不同于collect(),take(n)只取前n个元素,因此在处理大规模数据时更为高效。

rdd = sc.parallelize(range(1,11),2)
rdd.take(4)

输出结果:

[1, 2, 3, 4]
takeSample

rdd.takeSample(withReplacement, num, seed=None)用于从RDD中获取指定数量的随机样本。

参数说明:
withReplacement:布尔值,表示是否允许采样时元素的重复抽取。如果为 True,则允许重复抽取;如果为 False,则不允许。
num:要获取的样本数量。
seed:可选的种子值,用于控制随机数生成。如果提供了相同的种子值,多次调用takeSample将产生相同的样本。

rdd = sc.parallelize(range(1,11),2)
rdd.takeSample(False,5,0)

输出结果:

[8, 9, 2, 6, 4]
first

first()用于获取RDD中的第一个元素,对于大型数据集的性能较好。

rdd = sc.parallelize(range(1,11),2)
rdd.first()

输出结果:

1
⑤count

count()用于获取RDD中元素的总数量,以了解数据规模。

rdd = sc.parallelize(range(1,11),2)
rdd.count()

输出结果:

10
⑥reduce

reduce()用于对RDD中的元素进行规约操作,两两结合进行某种操作后继续与下一个元素结合,直到规约成一个最终的结果。reduce()通常用于执行可以并行化的可交换和可结合的操作,例如对数字进行加法或求和。这样的操作可以在每个分区上并行执行,然后合并结果。

#计算0+1+2+3+4+5+6+7+8+9
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)

输出结果:

45
⑦foreach

rdd.foreach()用于对RDD中的每个元素应用指定的函数,与map不同,foreach 是一个行动操作,它会在每个分区上并行地对每个元素执行给定的函数。

rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

输出结果:

45

在以上代码中,通过sc.accumulator()创建了一个累加器,并初始化其值为0。然后使用rdd.foreach()对RDD中的每个元素执行匿名函数,该函数将元素的值累加到累加器中。由于累加器是在分布式环境中共享的,因此每个节点上的累加器都能够更新。

countByKey

rdd.countByKey()用于统计 (key, value) 对的RDD中每个key的出现次数。

pairRdd = sc.parallelize([("hello",1),("world",4),("hello",9),("something",16)]) 
pairRdd.countByKey()

输出结果:

defaultdict(int, {'hello': 2, 'world': 1, 'something': 1})
saveAsTextFile

saveAsTextFile()用于将RDD的内容保存为文本文件,即将分布式数据集的结果写入到本地文件系统或分布式文件系统(如HDFS)中。

#saveAsTextFile保存rdd成text文件到本地
text_file = "./test/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()

3、常用Transformation操作

Transformation操作是对RDD进行变换的操作,它们不会立即执行,而是构建了一个表示要在RDD上执行的操作的执行计划。Transformation操作是为了支持分布式计算而设计的。它们在整个集群上并行运行,并利用RDD的不可变性和分区的概念来实现高效的分布式处理。通过构建逻辑执行计划,Spark可以优化计算并在整个集群上分布计算任务,以提高性能。

其主要特点如下:

惰性计算:当应用一个Transformation操作时,Spark只是记录了该操作的存在,并没有实际执行计算。实际的计算将会在Action操作触发时进行。

生成新的RDD:由于RDD一旦创建就不能被修改,所以Transformation操作通常生成一个新的RDD,而不是修改原始的RDD。

窄/宽依赖:Transformation操作可以分为窄依赖和宽依赖。
窄依赖指每个父分区中的数据仅依赖于该父分区的数据,例如map操作。
宽依赖指某个父分区的数据可能依赖于多个父分区的数据,例如groupByKey和reduceByKey操作,这会导致数据的重新分区,因此可能引起数据的Shuffle。

①map

rdd.map()用于对RDD中的每个元素应用一个指定的函数,并返回一个包含应用函数后结果的RDD。rdd.map()接受一个函数作为参数(可以是lambda匿名函数),该函数将被应用到RDD中的每个元素。

rdd = sc.parallelize(range(10),3)
rdd.map(lambda x:x**2).collect()

输出结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
②filter

rdd.filter()用于对RDD中的元素进行过滤,并返回新RDD,其中包含满足指定条件的原始RDD中的元素。

rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()

输出结果:

[6, 7, 8, 9]
flatMap

rdd.flatMap()即在rdd.map()的基础上将所有的结果扁平化为一个新的RDD。

rdd = sc.parallelize(["hello world","hello China"])
print(rdd.map(lambda x:x.split(" ")).collect())
print(rdd.flatMap(lambda x:x.split(" ")).collect())

输出结果:

[['hello', 'world'], ['hello', 'China']]
['hello', 'world', 'hello', 'China']
sample

rdd.sample()用于从RDD每个分区按照比例随机抽样一部分元素,生成新的RDD。

参数如下:

withReplacement:是否放回抽样。true-有放回,false-无放回
fraction:期望样本的大小作为RDD大小的一部分。fraction范围在[0,1],即表示选择每个元素的概率。fraction大于1时,即表示选择每个元素的期望次数。
seed:随机数生成器的种子。

rdd = sc.parallelize(range(10),2)#每个元素被抽到的概率为0.5,但输出的元素不一定是5个
print(rdd.sample(withReplacement=False, fraction=0.5,seed=0).collect())#每个元素被抽到的期望次数是2,但输出的元素不一定是20个
print(rdd.sample(withReplacement=True, fraction=2,seed=0).collect())

输出结果:

[1, 4, 5, 7]
[0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 4, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 8, 9]

⑤distinct

rdd.distinct()对原始RDD进行去重。

rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()

输出结果:

[4, 1, 5, 2, 3]
⑥subtract

rdd1.subtract(rdd2)用于计算两个RDD的差集,返回在rdd1中但不在rdd2中的元素。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.subtract(rdd2).collect()

输出结果:

[1, 2, 3]
union

rdd1.union(rdd2)用于计算两个RDD的并集,需要注意返回结果中可能带有重复元素

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.union(rdd2).collect()

输出结果:

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8]
intersection

rdd1.intersection(rdd2)用于计算两个RDD的交集。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.intersection(rdd2).collect()

输出结果:

[4, 5]
cartesian

rdd1.cartesian(rdd2)用于计算两个RDD的笛卡尔积。

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 'c'])
rdd1.cartesian(rdd2).collect()

输出结果:

[(1, 'a'),(1, 'b'),(1, 'c'),(2, 'a'),(2, 'b'),(2, 'c'),(3, 'a'),(3, 'b'),(3, 'c')]
sortBy

rdd.sortBy()用于对RDD中的元素按照指定的排序键进行排序。

参数如下:

rdd.sortBy(keyfunc, ascending=True, numPartitions=None)
keyfunc用于从 RDD 的每个元素中提取用于排序的键,可以是lambda匿名函数。
ascending表示排序的顺序。 True为升序,False为降序。
numPartitions表示返回结果RDD的分区数。

data = [(1, 'apple'), (3, 'orange'), (2, 'banana'), (4, 'grape')]
rdd = sc.parallelize(data)
rdd.sortBy(lambda x: x[0], ascending=True,numPartitions=2).collect()

输出结果:

[(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'grape')]
⑪zip

rdd1.zip(rdd2)按照拉链方式将两个RDD中的元素一对一地合并成元组,效果类似python的zip函数,需要两个RDD具有相同的分区,每个分区元素数量相同。

rdd1 = sc.parallelize([1, 2, 3, 4, 5],2)
rdd2 = sc.parallelize(['a', 'b', 'c', 'd', 'e'],2)
rdd1.zip(rdd2).collect()

输出结果:

[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
zipWithIndex

rdd.zipWithIndex()用于将RDD中的每个元素与其在 RDD 中的索引位置一对一地合并成元组。

rdd = sc.parallelize([10, 20, 30, 40, 50])
rdd.zipWithIndex().collect()

输出结果:

[(10, 0), (20, 1), (30, 2), (40, 3), (50, 4)]

4、常用Transformation操作(键值对)

PairRDD中的元素是键值对,Spark提供了针对键值对的一系列转换和操作,使得对数据进行分组、聚合和排序等操作更加方便。

reduceByKey

rdd.reduceByKey()用于对RDD进行聚合的Transformation操作,将具有相同键的所有值根据提供的聚合函数进行合并。

参数如下:
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
func: 聚合函数,接受两个参数,用于将相同键的值进行合并。
numPartitions: 可选参数,用于指定返回结果RDD的分区数。
partitionFunc: 可选参数,用于指定分区函数,默认为哈希分区函数。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
rdd.reduceByKey(lambda x, y: x + y).collect()

输出结果:

[('y', 6), ('x', 6), ('z', 4)]
groupByKey

rdd.groupByKey()用于对PairRDD进行分组的Transformation操作,将相同键的所有值放在一个迭代器中并返回新RDD,适用于不涉及值的聚合操作,只需按键进行分组的情况。不过groupByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。另外,生成的结果是以键值对形式的迭代器,存在大量数据时可能导致内存溢出。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
print(rdd.groupByKey().collect())for key, values in rdd.groupByKey().collect():print(f"{key}: {list(values)}")

输出结果:

[('y', <pyspark.resultiterable.ResultIterable object at 0x00000259F5816C10>), ('x', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C790>), ('z', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C490>)]
y: [5, 1]
x: [3, 2, 1]
z: [4]
sortByKey

rdd.sortByKey()用于对PairRDD进行按键排序的Transformation操作,按照键进行排序,并返回一个新RDD。sortByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。在数据量庞大时,可能会影响性能,因为需要将数据在不同分区间移动以进行排序。

参数如下:
ascending:表示排序的顺序。True为升序(默认),False为降序。

data = [(3, 'x'), (5, 'y'), (2, 'z'), (4, 's')]
rdd = sc.parallelize(data)
rdd.sortByKey().collect()

输出结果:

[(2, 'z'), (3, 'x'), (4, 's'), (5, 'y')]
join / leftOuterJoin / rightOuterJoin

rdd1.join(rdd2)用于对两个PairRDD进行连接的Transformation操作,根据键将两个 PairRDD 中的元素进行连接。类似SQL中的inner join。

rdd1.leftOuterJoin(rdd2)、rdd1.rightOuterJoin(rdd2)分别是左关联、右关联。如果另一侧PairRDD 中没有匹配的键,则对应位置的值为None。

data1 = [('Tom', 18), ('Jerry', 19), ('Alice', 17)]
data2 = [('Tom', 'male'), ('Bob', 'male'), ('Alice', 'female')]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
print(rdd1.join(rdd2).collect())
print(rdd1.leftOuterJoin(rdd2).collect())
print(rdd1.rightOuterJoin(rdd2).collect())

输出结果:

[('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Jerry', (19, None)), ('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Tom', (18, 'male')), ('Bob', (None, 'male')), ('Alice', (17, 'female'))]
cogroup

rdd1.cogroup(rdd2)用于对两个PairRDD进行先后两次分组连接的Transformation操作,相当于对rdd1、rdd2分别进行goupByKey,再对两个结果进行groupByKey。

rdd1 = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
rdd2 = sc.parallelize([("a", 3), ("b", 4)])
[(x, tuple(map(list, y))) for x, y in sorted(list(rdd1.cogroup(rdd2).collect()))]

输出结果:

[('a', ([1, 2], [3])), ('b', ([3], [4]))]
subtractByKey

rdd1.subtractByKey(rdd2)用于对两个PairRDD求差集的Transformation操作,即返回在rdd1中而不在rdd2中的元素。

rdd1 = sc.parallelize([("x", 1), ("y", 2), ("z", 3)])
rdd2 = sc.parallelize([("x", 3), ("y", 4)])
rdd1.subtractByKey(rdd2).collect()

输出结果:

[('z', 3)]
foldByKey

foldByKey的操作和reduceByKey类似,但是foldByKey可以提供一个初始值

data = [('x', 1), ('x', 2), ('x', 3),  ('y', 1), ('y', 2), ('z', 1)]
rdd = sc.parallelize(data)
print(rdd.foldByKey(0, lambda x, y: x + y).collect())
print(rdd.foldByKey(1, lambda x, y: x + y).collect())

输出结果:

[('y', 3), ('x', 6), ('z', 1)]
[('y', 5), ('x', 8), ('z', 2)]

5、分区操作

RDD分区操作主要分为调整分区与转换分区操作。

调整分区操作用于调整已有RDD的分区结构,不改变数据的物理位置,仅影响分区元数据,通常性能开销较小。例如:分区数调整,即调整现有RDD的分区数,但不移动数据。

转换分区操作改变了RDD的分区结构,通常是在数据上执行Transformation操作,产生一个新的RDD,其分区数可能发生变化。例如通过指定的分区数或者使用一些具体的分区算法,重新组织数据分区。在数据的重新组织过程中可能涉及跨分区的数据移动,通常伴随着性能开销。

①glom

rdd.glom()用于将每个分区的数据转换为一个数组,是Transformation操作。适用于需要对每个分区进行整体操作的场景。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
rdd.glom().collect()

输出结果:

[[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
②HashPartitioner

rdd.HashPartitioner()用于对PairRDD进行哈希分区,即根据键的哈希值将数据划分到不同的分区中。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.partitionBy(2)for partition, data in enumerate(rdd2.glom().collect()):print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]
mapPartitions / mapPartitionsWithIndex

rdd.mapPartitions()是Transformation操作,用于对RDD的每个分区执行一个自定义映射函数,该函数可以处理分区内的所有元素,而不是一次仅处理一个元素。mapPartitions能够减少通信开销,因为映射操作是在每个分区内进行的,适用于需要对整个分区进行批量操作的场景,而不适用于需要考虑跨分区元素之间关系的场景。

rdd.mapPartitionsWithIndex()类似于mapPartitions,但提供了分区索引信息,允许更细粒度的控制。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
print(rdd.mapPartitions(lambda x:(i * 2 for i in x)).collect())
print(rdd.mapPartitionsWithIndex(lambda index,x:((index, i*2) for i in x)).collect())

输出结果:

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[(0, 2), (0, 4), (0, 6), (1, 8), (1, 10), (1, 12), (2, 14), (2, 16), (2, 18), (2, 20)]
④coalesce

coalesce()用于减少分区数的Transformation操作,可以尽量避免数据迁移,提升效率。

参数如下:

coalesce(numPartitions, shuffle=False)
numPartitions:新的分区数。
shuffle:是否进行数据洗牌,默认为False。当设置为 True 时,将触发数据洗牌操作,否则只是简单地减小分区数。

rdd = sc.parallelize(range(20),10)
print(rdd.glom().collect())
print(rdd.coalesce(2,shuffle=False).glom().collect())
print(rdd.coalesce(2,shuffle=True).glom().collect())

输出结果:

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15], [16, 17], [18, 19]]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
[[0, 1, 4, 5, 6, 7, 12, 13, 16, 17], [2, 3, 8, 9, 10, 11, 14, 15, 18, 19]]
⑤repartition

rdd.repartition()用于重新分区的Transformation操作,可以增加或减少分区数,通过shuffle来重新组织数据。允许动态调整RDD的分区数,可在数据分布不均匀时提高计算性能。

对比:coalesce在已有分区基础上尽量减少数据shuffle,而repartition会创建新分区并且使用full shuffle。

rdd = sc.parallelize(range(25),25)
print(rdd.glom().collect())
print(rdd.repartition(5).glom().collect())
print(rdd.coalesce(5).glom().collect())

输出结果:

[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [10], [11], [12], [13], [14], [15], [16], [17], [18], [19], [20], [21], [22], [23], [24]]
[[6, 8, 11, 15, 20, 21], [0, 9, 16, 18, 24], [2, 3, 7, 14, 19, 22], [1], [4, 5, 10, 12, 13, 17, 23]]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19], [20, 21, 22, 23, 24]]
⑥partitionBy

rdd.partitionBy()用于对PairRDD重新分区,是Transformation操作,可以根据指定的分区器对键值对数据进行重新分区,以更好地控制数据的分布。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
pair_rdd = sc.parallelize(data)
hash_partitioned_rdd = pair_rdd.partitionBy(2)
for partition, data in enumerate(hash_partitioned_rdd.glom().collect()):print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]

6、缓存操作

如果多个任务在计算过程中共享同一个RDD作为中间数据,通过对其进行缓存,将其存储在内存中,可以显著加快计算速度。但是对RDD的缓存并不会立即生效,而是在该RDD第一次被计算出来时才会进行缓存。在不再需要某个RDD时,可以使用unpersist来释放缓存,而这个操作是立即执行的。这样可以有效地管理内存资源,避免不必要的缓存。
另一方面,缓存在提高计算速度的同时,并不会切断RDD的血缘依赖关系。因为缓存的数据可能存在某些分区的节点发生故障的情况,例如内存溢出或者节点损坏。在这种情况下,可以根据血缘关系重新计算受影响分区的数据,确保计算的正确性。
如果需要切断血缘关系,可以使用checkpoint来设置检查点,将RDD保存到磁盘中。与缓存类似,对RDD进行checkpoint同样不会立即生效,而是在该RDD第一次被计算出来时才会保存成检查点。通常,checkpoint适用于一些计算代价非常高昂的中间结果,或者在重复计算结果不可保证完全一致的情况下(例如使用zipWithIndex算子)。
对RDD进行缓存是优化Spark计算性能的有效手段,但需要根据具体情况灵活运用,以确保计算的准确性和效率。

①cache

rdd.cache()用于将RDD的计算结果缓存到内存中,以便在后续操作中重用,可以显著提高迭代算法等需要多次使用同一数据集的性能。rdd.cache使用存储级别MEMORY_ONLY,意味着如果内存不足,Spark可能会根据缓存数据的大小和可用内存的情况进行动态调整,例如将一部分或全部缓存的数据移除,以腾出内存供其他操作使用。

a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_aprint(mean_a)

输出结果:

4999.5
②persist

rdd.persist()用于将RDD中间结果缓存到内存或磁盘中,以便在后续操作中重用。与cache不同,persist允许用户指定不同的存储级别,以更灵活地管理缓存。存储级别即不同的数据缓存的位置和策略,可以是MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK(默认)等。

rdd.persist()写入磁盘的文件是临时文件,应用执行完成后就会被删除,可以使用rdd.unpersist()立即释放缓存。

from  pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_aa.unpersist() 
print(mean_a)

输出结果:

4999.5
③checkpoint

rdd.checkpoint()用于将RDD中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,可以中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重新执行程序,减少开销。需要注意的是,checkpoint操作并不会马上被执行,而是在执行Action操作时才被触发。另外,checkpoint路径保存的文件是永久存在的,不会随着应用的结束而被删除。

sc.setCheckpointDir("./data/checkpoint/")
rdd = sc.parallelize(["a","b","c","d"],2)
rdd_idx = rdd.zipWithIndex() 
rdd_idx.checkpoint() 
rdd_idx.take(3)

输出结果:

[('a', 0), ('b', 1), ('c', 2)]

7、共享变量

共享变量主要用于在分布式计算中实现在任务之间共享数据,以提高性能和降低网络开销。广播变量(broadcast variables)和累加器(accumulators)是两个重要的分布式计算工具,用于在集群上共享数据和累积结果。

①broadcast

当需要在所有工作节点之间共享较小的只读数据集时,使用广播变量可以避免将该数据集多次传输到各个节点。这可以有效减少网络开销,提高性能。典型的应用场景包括在所有节点上使用相同的配置参数、字典或者映射表等。并且可以避免任务间重复传输,如果一个RDD需要在多个任务中使用,而且这个RDD的数据较小,使用广播变量可以避免在不同任务之间多次传输相同的数据。

#广播变量 broadcast 不可变,在所有节点可读
broads = sc.broadcast(100)
rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())
print(broads.value)

输出结果:

[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100
②accumulator

累加器主要用于执行在分布式任务中的“添加”和“合并”操作,通常用于聚合和计数等操作。
例如,可以用累加器来计算在整个集群上发生的某个特定事件的总次数;计算所有节点上某个变量的总和或平均值。

#累加器 只能在Driver上可读,在其它节点只能进行累加
total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)
rdd.foreach(lambda x:total.add(x))
total.value

输出结果:

45

四、总结

总的来说,PySpark适合初学者入门学习,由于python门槛不高,易于掌握,可以通过PySpark了解Spark的运行机制以及RDD算子的使用。但如果是需要几百台服务器才能运行的任务场景, PySpark的UDF(User Defined Functions)的性能差距肯定比不过Spark-Scala。

至于选什么语言,取决于业务需求。如果是处理简单的数据清洗聚合,且数据量非常大,用Scala会有性能优势,可以节约计算资源。如果需要处理较为复杂的算法模型,依赖于各种第三方包,那么使用Python会更好。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/655757.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】—— 信号的产生

本期&#xff0c;我们今天要将的是信号的第二个知识&#xff0c;即信号的产生。 目录 &#xff08;一&#xff09;通过终端按键产生信号 &#xff08;二&#xff09;调用系统函数向进程发信号 &#xff08;三&#xff09;由软件条件产生信号 &#xff08;四&#xff09;硬件…

会计分录的概念和应用

目录 一. 会计分录的概念二. 会计分录的分类三. 会计分录的应用 \quad 一. 会计分录的概念 \quad 会计分录是指对每笔经济业务列示其应借记和应贷记账户及其金额的一种记录。 会计分录的基本要素 ( 1 )账户及其所属明细账户名称(或会计科目及其所属明细科目名称) (2 )记账方向…

Linux系统——正则表达式

有一段时间本机访问量过高&#xff0c;如何查看日志提取出访问量前十的信息 1.使用提取命令&#xff08;cut、awk、sed&#xff09;提取出ip地址的那一列 2.使用sort按数字排序&#xff0c;将相同的地址整合到一起 3.使用uniq -c统计出数量 4.使用sort 数字 数字倒序排序 5.最…

【React教程】(2) React之JSX入门与列表渲染、条件渲染详细代码示例

目录 JSX环境配置基本语法规则在 JSX 中嵌入 JavaScript 表达式在 JavaScript 表达式中嵌入 JSXJSX 中的节点属性声明子节点JSX 自动阻止注入攻击在 JSX 中使用注释JSX 原理列表循环DOM Elements 列表渲染语法高亮 条件渲染示例1&#xff1a;示例2&#xff1a;示例3&#xff08…

Learn to Earn,Move星航计划第三期诚邀您探索编程和区块链的乐趣

*以下文章来源于MoveFuns &#xff0c;作者MoveFuns DAO 星航计划是一个 Web3 技术的公益计划,旨在引导更多的人加入开源社区,学习Move语言&#xff0c;了解Web3。本期星航计划由 MoveFuns Dao 发起&#xff0c;由Sui官方基金会支持&#xff0c;汇集了 Web3开发领域内的专业导…

FullStack之Django(1)开发环境配置

FullStack之Django(1)开发环境配置 author: Once Day date&#xff1a;2022年2月11日/2024年1月27日 漫漫长路&#xff0c;才刚刚开始… 全系列文档请查看专栏: FullStack开发_Once_day的博客-CSDN博客Django开发_Once_day的博客-CSDN博客 具体参考文档: The web framewor…

图灵之旅--ArrayList顺序表LinkedList链表栈Stack队列Queue

目录 线性表顺序表ArrayList简介ArrayList使用ArrayList的构造ArrayList常见操作ArrayList的遍历ArrayList的扩容机制利用ArrayList洗牌ArrayList的优缺点 链表链表的实现双向链表的实现 LinkedListLinkedList引入LinkedList的使用LinkedList的构造LinkedList的常用方法介绍Lin…

ArcGIS Pro如何新建字段

无论是地图制作还是数据分析&#xff0c;字段的操作是必不可少的&#xff0c;在某些时候现有的字段不能满足需求还需要新建字段&#xff0c;这里为大家讲解一下在ArcGIS Pro中怎么新建字段&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载的水…

pytorch安装教程(Anaconda + GPU)

可以去nvidia官网更新驱动 获取下载pytorch的命令地址&#xff1a;Start Locally | PyTorch 在这里可以找到旧版本的cuda的命令&#xff1a;Previous PyTorch Versions | PyTorch 如果使用conda没有安装成功的话&#xff0c;就使用pip&#xff1a;

ToF传感器在移动机器人中的作用

原创 | 文 BFT机器人 在日新月异的机器人技术领域&#xff0c;技术的无缝整合正引领着人类与机器交互方式的革新潮流。ToF传感器作为变革性创新的一个例子&#xff0c;对移动机器人更好地感知周围环境起到了决定性的作用。 ToF传感器与激光雷达技术在创建深度图方面有着异曲同…

大模型视觉理解能力更进一步,谷歌提出全新像素级对齐模型PixelLLM

论文题目&#xff1a;Pixel Aligned Language Models 论文链接&#xff1a;https://arxiv.org/abs/2312.09237 项目主页&#xff1a;Pixel Aligned Language Models 近一段时间以来&#xff0c;大型语言模型&#xff08;LLM&#xff09;在计算机视觉领域中也取得了巨大的成功&a…

Unity 观察者模式(实例详解)

文章目录 简介示例1 - 简单的文本更新通知示例2 - 多观察者监听游戏分数变化示例3 - 事件系统实现观察者模式示例4 - 泛型观察者和可序列化的事件系统示例5 - 使用C#委托简化版 简介 在Unity中实现观察者模式&#xff0c;我们可以创建一个Subject&#xff08;目标/主题&#x…

前端面试题-js部分-数组去重-数组扁平化-伪数组转数组-面向对象的继承方式(ES5)

前端面试题-js部分-数组去重-数组扁平化-伪数组转数组-面向对象的继承方式ES5 数组去重数组扁平化伪数组转换为数组面向对象的继承方式&#xff08;ES5&#xff09; 数组去重 1.利用es6 set 去重 Set 类型不允许有值重复 let arr1 [1, 2, 4, 3, 5, 7, 1, 8, 2, 4, 9]console.…

【郑益慧】模拟电子技术:7.Mos管的工作原理

Mos管的工作原理 Mos管的出现&#xff0c;几乎不怎么耗电。因此在集成电路中起了非常大的作用 在某些方面确实比晶体三极管强。 基本原理&#xff1a;依靠电场效应来控制。 电场效应几乎是没有电流的&#xff0c;没有电流几乎是没有功率的。 从控制上来说&#xff0c;消耗…

华为——NGFW Module安装在集群交换机上,二层双机负载分担部署,交换机重定向引流

NGFW Module安装在集群交换机上&#xff0c;二层双机负载分担部署&#xff0c;交换机重定向引流 业务需求 如图1所示&#xff0c;两台交换机集群组网&#xff0c;两块NGFW Module分别安装在两台交换机的1号槽位组成双机负载分担组网。NGFW Module工作在二层&#xff0c;也就是…

Stable Diffusion结构解析-以图像生成图像!

手把手教你入门绘图超强的AI绘画&#xff0c;用户只需要输入一段图片的文字描述&#xff0c;即可生成精美的绘画。给大家带来了全新保姆级教程资料包 &#xff08;文末可获取&#xff09; AIGC专栏3——Stable Diffusion结构解析-以图像生成图像&#xff08;图生图&#xff0c…

【活动回顾】CSDN 1024 程序员节城市站系列活动·成都站 - 圆满结束!

文章目录 前言一、活动介绍二、精彩分享内容及活动议程2.1、1024 活动限量周边大放送2.2、《COC 成都社区情况和活动介绍》2.3、CSDN 创始人蒋涛寄语2.4、《AI 重构世界》2.5、《新技术助力企业降本增效》2.6、现场互动情况2.7、《探索开源世界&#xff0c;开拓创新思路》2.8、…

用C#实现最小二乘法(用OxyPlot绘图)

最小二乘法介绍✨ 最小二乘法&#xff08;Least Squares Method&#xff09;是一种常见的数学优化技术&#xff0c;广泛应用于数据拟合、回归分析和参数估计等领域。其目标是通过最小化残差平方和来找到一组参数&#xff0c;使得模型预测值与观测值之间的差异最小化。 最小二…

电商API接口的应用|电商跨境电商商品采集高效解决方案

电商API接口的应用|电商跨境电商商品采集高效解决方案 面对数十万亿元的跨境电商市场&#xff0c;以阿里巴巴国际站为代表的跨境电商数字平台&#xff0c;在政策、需求以及供应链的驱动下&#xff0c;为中小企业提供了全产业链、全供应链一体化综合服务&#xff0c;让越来越多…

使用plotly dash 画3d圆柱(Python)

plotly3D &#xff08;3d charts in Python&#xff09;可以画3维图形 在做圆柱的3D装箱项目&#xff0c;需要装箱的可视化&#xff0c;但是Mesh &#xff08;3d mesh plots in Python&#xff09;只能画三角形&#xff0c;所以需要用多个三角形拼成一个圆柱&#xff08;想做立…