RDD介绍

RDD设计背景

在实际应用中,存在许多迭代式计算,这些应用场景的共同之处是 : 不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入.
而目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销;
如果能将结果保存在内存当中,就可以大量减少IO.
RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,
不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的落地存储,大大降低了数据复制、磁盘IO和序列化开销,最终加快计算速度.

RDD概念

RDD,弹性分布式数据集.
一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合;
一个RDD可以分成多个分区,每个分区就是一个数据集片段(HDFS上的块);
一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算.
弹性:既可以存储在内存又可以存储在磁盘
分布式:可以被分成多个分区,不同的分区可以被保存到不同的节点上进行并行计算
数据集:本质上是一个只读的分区记录集合
RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合;
不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupBy)而创建得到新的RDD.RDD提供了一组丰富的操作以支持常见的数据运算,分为'行动'(Action)'转换'(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系.
两类操作的主要区别是:转换操作(比如map、filter、groupBy、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果).

RDD执行过程

Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作.
RDD典型的执行过程如下:
1. 读入外部数据源(或者内存中的集合)创建RDD;
2. RDD经过一系列的'Transformation'操作,每一次都会产生不同的RDD,供给下一个'Transformation'使用;
3. 最后一个RDD经'Action'操作进行处理,并输出到外部数据源(或者变成Scala/JAVA集合或变量).
需要说明的是,RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的'Action'操作,
对于'Action'之前的所有'Transformation'操作,Spark只是记录下'Transformation'操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算.

在这里插入图片描述

从输入中逻辑上产生了A和C两个RDD,经过一系列'Transformation'操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生,Spark只是记录了RDD之间的生成和依赖关系.
也就当F要进行输出时,是当F进行'Action'操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算.

在这里插入图片描述

血缘关系

上诉一系列处理称为一个'血缘关系(Lineage)',即DAG拓扑排序的结果.
采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,
而且不用担心有过多的中间数据,因为具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理.
同时,这种通过血缘关系就把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;
相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑.

RDD特性

总体而言,Spark采用RDD以后能够实现高效计算的主要原因如下:
(1) 高效的容错性
现有的分布式共享内存、键值存储、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,即在节点之间会发生大量的数据传输,这对于数据密集型应用而言会带来很大的开销.
而在RDD的设计中,数据只读,不可修改,如果需要修改数据,必须从父RDD转换到子RDD,由此在不同RDD之间建立了血缘关系;所以,RDD是一种天生具有容错机制的特殊集合,
不需要通过数据冗余的方式(比如详细的记录操作的日志)实现容错,而只需通过RDD父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无需回滚整个系统,这样就避免了数据复制的高开销,
而且重算过程可以在不同节点之间并行进行,实现了高效的容错;
此外,RDD提供的转换操作都是一些粗粒度的操作(比如map、filter和join),RDD依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(比如对哪个数据项进行了修改),这就大大降低了数据密集型应用中的容错开销.
(2) 中间结果持久化到内存
数据在内存中的多个RDD操作之间进行传递,不需要落地到磁盘上,避免了不必要的读写磁盘开销.
(3) 存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化开销.

RDD的依赖关系

RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖; RDD中的依赖关系分为窄依赖(Narrow Dependency)、宽依赖(Wide Dependency).
宽依赖: 一个父RDD的一个分区对应一个子RDD的多个分区; 一对多,伴有shuffle过程.
窄依赖: 一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区;一对一或多对一.
总结:如果父RDD的一个分区只被一个子RDD的一个分区所使用就是窄依赖,否则就是宽依赖.

在这里插入图片描述

窄依赖典型的操作包括map、filter、union等,宽依赖典型的操作包括groupByKey、sortByKey等;
对于连接(join)操作,可以分为两种情况:
1.对输入进行协同划分,属于窄依赖.
协同划分(co-partitioned)是指多个父RDD的某一分区的所有'键(key)'落在子RDD的同一个分区内,不会产生同一个父RDD的某一分区落在子RDD的两个分区的情况.
2.对输入做非协同划分,属于宽依赖.
对于窄依赖的RDD,可以以流水线的方式计算所有父分区,不会造成网络之间的数据混合;
对于宽依赖的RDD,则通常伴随着Shuffle操作,即首先需要计算好所有父分区数据,然后在节点之间进行Shuffle.

阶段划分(stage)

Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,
具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算.
例如,假设从HDFS中读入数据生成3个不同的RDD(即A、C和E),通过一系列转换操作后再将计算结果保存回HDFS;
对DAG进行解析时,在依赖图中进行反向解析,由于从RDD A到RDD B的转换以及从RDD B和F到RDD G的转换,都属于宽依赖,因此,在宽依赖处断开后可以得到三个阶段,即阶段1、阶段2和阶段3.
可以看出,在阶段2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,
比如,分区7通过map操作生成的分区9,可以不用等待分区8到分区10这个转换操作的计算结束,而是继续进行union操作,转换得到分区13,这样流水线执行大大提高了计算的效率.

在这里插入图片描述

由上述论述可知,把一个DAG图划分成多个'stage'以后,每个阶段都代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合;
每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给Executor运行.

Spark算子

RDD支持两种类型的操作:
Transformation:从一个RDD转换为一个新的RDD.
Action:基于一个数据集进行运算(引起Job运算),并返回RDD.
例如,map是一个Transformation操作,map将数据集的每一个元素按指定的函数转换为一个RDD返回;reduce是一个action操作.
Spark的所有Transformation操作都是懒执行,它们并不立马执行,而是先记录对数据集的一系列Transformation操作;这种设计让Spark的运算更加高效.
例如,对一个数据集map操作之后使用reduce只返回结果,而不返回庞大的map运算的结果集.
默认情况下,每个转换的RDD在执行不同Action操作时都会重新计算;即使两个Action操作会使用同一个转换的RDD,该RDD也会重新计算.
除非使用persist方法或cache方法将RDD缓存到内存,这样在下次使用这个RDD时将会提高计算效率,也支持将RDD持久化到硬盘上或在多个节点上复制.

Transformation算子

下面列出了Spark常用的transformation操作,详细的细节请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java).map(func)
将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD.filter(func)
使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回.flatMap(func)
类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项.mapPartitions(func)
类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>.
即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作.mapPartitionsWithIndex(func)
类似于mapPartitions,但是需要提供给func一个整型值,这个整型值是分区的索引,所以当处理T类型的RDD时,func的格式必须为(Int, Iterator<T>) => Iterator<U>.union(otherDataset)
返回原数据集和参数指定的数据集合并后的数据集;
使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同;
该操作不进行去重操作,返回的结果会保存所有元素;如果想去重,可以使用distinct().intersection(otherDataset)
返回两个数据集的交集.distinct([numTasks]))
将RDD中的元素进行去重操作.groupByKey([numTasks])
操作(K,V)格式的数据集,返回(K, Iterable)格式的数据集.
注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKey或aggregateByKey计算效率会更高.
注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数.reduceByKey(func, [numTasks])
使用给定的func,将(K,V)对格式的数据集中key相同的值进行聚集,其中func的格式必须为(V,V) => V,可选参数numTasks可以指定reduce任务的数目.aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])(K,V)格式的数据按key进行聚合操作,聚合时使用给定的合并函数和一个初始值,返回一个(K,U)对格式数据;
需要指定的三个参数:zeroValue为在每个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量;
seqOp用于在每个分区中,相同的key中V类型的值合并到zeroValue创建的U类型的变量中;combOp是对重新分区后两个分区中传入的U类型数据的合并函数.sortByKey([ascending], [numTasks])
(K,V)格式的数据集,其中K已实现了Ordered,经过sortByKey操作返回排序后的数据集,指定布尔值参数ascending来指定升序或降序排列.join(otherDataset, [numTasks])
用于操作两个键值对格式的数据集,操作两个数据集(K,V)(K,W)返回(K,(V, W))格式的数据集,通过leftOuterJoin、rightOuterJoin、fullOuterJoin完成外连接操作.cogroup(otherDataset, [numTasks])
用于操作两个键值对格式数据集(K,V)(K,W),返回数据集格式为(K,(Iterable, Iterable)).这个操作也称为groupWith.
对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器.cartesian(otherDataset)
对类型为T和U的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集;即对两个RDD内的所有元素进行笛卡尔积操作.pipe(command, [envVars])
以管道(pipe)方式将RDD的各个分区(partition)使用shell命令处理(比如一个 Perl或 bash脚本),
RDD的元素会被写入进程的标准输入(stdin),将进程返回的一个字符串型 RDD(RDD of strings),以一行文本的形式写入进程的标准输出(stdout)中.coalesce(numPartitions)
把RDD的分区数降低到通过参数numPartitions指定的值,在得到的更大一些数据集上执行操作,会更加高效.repartition(numPartitions)
随机地对RDD的数据重新洗牌(Reshuffle),从而创建更多或更少的分区,以平衡数据,总是对网络上的所有数据进行洗牌(shuffles).repartitionAndSortWithinPartitions(partitioner)
根据给定的分区器对RDD进行重新分区,在每个结果分区中,按照key值对记录排序,这在每个分区中比先调用repartition再排序效率更高,因为它可以将排序过程在shuffle操作的机器上进行.

Action算子

下面列出了Spark支持的常用的action操作,详细请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java).reduce(func)
使用函数func聚集数据集中的元素,这个函数func输入为两个元素,返回为一个元素;这个函数应该符合结合律和交换率,这样才能保证数据集中各个元素计算的正确性.collect()
在驱动程序中,以数组的形式返回数据集的所有元素;通常用于filter或其它产生了大量小数据集的情况.count()
返回数据集中元素的个数.first()
返回数据集中的第一个元素,类似于take(1).take(n)
返回数据集中的前n个元素,类似于sql中的limit.takeOrdered(n,[ordering])
返回RDD按自然顺序或自定义顺序排序后的前n个元素.saveAsTextFile(path)
将数据集中的元素以文本文件或文本文件集合的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中;
Spark将在每个元素上调用toString方法,将数据元素转换为文本文件中的一行记录.saveAsSequenceFile(path) (Java and Scala)
将数据集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中;
该操作只支持对实现了Hadoop的Writable接口的键值对RDD进行操作,在Scala中,还支持隐式转换为Writable的类型(Spark包括了基本类型的转换,例如Int、Double、String等).saveAsObjectFile(path) (Java and Scala)
将数据集中的元素以简单的Java序列化的格式写入指定的路径;这些保存该数据的文件,可以使用SparkContext.objectFile()进行加载.countByKey()
仅支持对(K,V)格式的键值对类型的RDD进行操作;返回(K,Int)格式的Hashmap,(K,Int)为每个key值对应的记录数目.foreach(func)
对数据集中每个元素使用函数func进行处理.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/15480.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为何程序员35岁就开始被嫌弃了?程序员该如何避免中年危机?

文章目录 一、为何程序员35岁就开始被嫌弃了&#xff1f;1、技术更新迅速2、职业发展瓶颈3、成本考虑4、年龄歧视5、市场供需变化6、个人因素 二、程序员该如何避免中年危机&#xff1f;1、持续学习与技能更新2、拓展技术广度与深度3、提升软技能4、关注行业趋势与市场变化5、建…

vue3 input输入框输入限制(数字)

输入框限制输入的内容格式&#xff0c;如&#xff08;金额&#xff0c;数字&#xff09; 金额限制小数点后2位数 <el-input placeholder"请填写费用" v-model"formMoney.total_money" keyup"formMoney.total_money checkPrice(formMoney.total_…

20240521(代码整洁和测试入门学习)

测试: 1.测试工程师、测试工具开发工程师、自动化测试工程师。 python&#xff1a; 1、发展背景和优势&#xff1b; 2、开始多需的工具 interpreter(解释器) refactor(重构) 2、变量和注释的基础语法 3、输入输出 i 1 for i in range(1, 11): print(i, end ) 不换行打印…

jupyter notebook 实现联邦学习模型

联邦学习(Federated Learning)是一种机器学习框架,它允许多个参与方(例如,移动设备或服务器)在本地数据集上训练模型,而无需将数据集中到一个位置。这有助于保护数据隐私,并允许在分布式环境中进行模型训练。 要在Jupyter Notebook中实现联邦学习模型,你可以遵循以下…

性能大爆炸!为你的Matomo换一个高性能的环境!

随着我的 Matomo 越来越大&#xff0c;功能需求的增多&#xff0c;插件也变得越来越多&#xff0c;使用传统的LNMP架构或者LAMP架构都会发现性能正在急剧下级&#xff0c;为此&#xff0c;我们发现了使用FrankenPHP&#xff08;以下简称FPHP&#xff09;的方案 首先&#xff0…

Android kotlin协程

说明 可代替线程整异步可控制&#xff0c;灵活 &#xff08;控制优先级&#xff0c;内存占用等&#xff09;速度快 效率高有数量上限 使用 runBlocking 一般用于测试 不建议使用GlobalScope.launch 全局的 生命周期跟随application 不建议使用CoroutineScope(job) 用 基本使…

樱花下落的速度是每秒5厘米,我们的心又该以什么速度去接近呢

樱花下落的速度是每秒五厘米。5年前第一次接触秒速五厘米的时候&#xff0c;我还在念初中&#xff0c;那时候的我尚且理解不了作品里的太多东西&#xff0c;只是为那辆列车隔开了明里和贵树感到悲伤&#xff0c;为他们二人那段无疾而终的感情感到遗憾。五年后再一次重温&#x…

GEE批量导出逐日、逐月、逐季节和逐年的遥感影像(以NDVI为例)

影像导出 1.逐日数据导出2.逐月数据导出3.季节数据导出4.逐年数据导出 最近很多小伙伴们私信我&#xff0c;问我如何高效导出遥感数据&#xff0c;从逐日到逐季度&#xff0c;我都有一套自己的方法&#xff0c;今天就来和大家分享一下&#xff01;   &#x1f50d;【逐日导出…

Scala 入门介绍和环境搭建

一、简介 Scala 是一门以 Java 虚拟机&#xff08;JVM&#xff09;为运行环境并将面向对象和函数式编程的最佳特性结合在一起的静态类型编程语言 (静态语言需要提前编译&#xff0c;如&#xff1a;Java、c、c 等&#xff0c;动态语言如&#xff1a;js)Scala 是一门多范式的编程…

【介绍下Pwn,什么是Pwn?】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

CSS3文字与字体

文字与字体 @font-face 用途:定义一种自定义字体,使其可以在网页中使用。通过@font-face规则,可以指定字体名称、来源(通常是URL)以及字体的各种变体(如常规、粗体、斜体等)。 @font-face {font-family: MyCustomFont;src: url(mycustomfont.woff2) format(woff2

冯喜运:5.25黄金价格和原油价格加速看跌?未来如何走势?

【黄金消息面分析】&#xff1a;本周黄金市场经历剧烈波动&#xff0c;金价创下五个半月来最糟糕的单周表现&#xff0c;尽管周五因美元下跌小幅回升。美联储的鹰派立场和美国经济数据强劲削弱了降息预期&#xff0c;导致金价承压。然而&#xff0c;分析师对未来金价走势看法不…

Rolla‘s homework:Image Processing with Python Final Project

对比学习Yolo 和 faster rcnn 两种目标检测 要求 Image Processing with Python Final Project Derek TanLoad several useful packages that are used in this notebook:Image Processing with Python Final Project Project Goals: • Gain an understanding of the object …

leetcode 1049.最后一块石头的重量II

思路&#xff1a;01背包 其实这道题我们可以转化一下&#xff0c;乍一看有点像区间dp&#xff0c;很像区间合并那种类型。 但是&#xff0c;后来发现&#xff0c;这道题的精髓在于你如何转成背包问题。我们可以把这个石头分成两堆&#xff0c;然后求出来这两堆的最小差值就行…

使用git生成SSH公钥,并设置SSH公钥

1、在git命令行里输入以下命令 ssh-keygen -t rsa 2、按回车&#xff0c;然后会看到以下字眼 Generating public/private rsa key pair. Enter file in which to save the key (/c/Users/xxx/.ssh/id_rsa) 例&#xff1a; 3、继续回车&#xff0c;然后会看到以下字眼 Enter…

【面试干货】数据库乐观锁,悲观锁的区别,怎么实现

【面试干货】数据库乐观锁&#xff0c;悲观锁的区别&#xff0c;怎么实现 1、乐观锁&#xff0c;悲观锁的区别2、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 1、乐观锁&#xff0c;悲观锁的区别 悲观锁&#xff08;Pessimistic Lo…

web前端框架设计第十课-组件

web前端框架设计第十课-组件 一.预习笔记 组件&#xff1a;Vue最强大的功能之一 1.局部组件注册 注意事项&#xff1a;template标签中只能有一个根元素 2.全局组件的注册 注意事项&#xff1a;组件名的大小写需要注意&#xff08;实践&#xff09; 3.案例&#xff08;查询框…

Vivado 使用教程(个人总结)

Vivado 是 Xilinx 公司推出的一款用于 FPGA 设计的集成开发环境 (IDE)&#xff0c;提供了从设计输入到实现、验证、调试和下载的完整流程。本文将详细介绍 Vivado 的使用方法&#xff0c;包括项目创建、设计输入、约束文件、综合与实现、仿真、调试、下载配置等步骤。 一、创建…

设计模式--责任链模式

责任链模式是一种行为设计模式&#xff0c;它允许将请求沿着处理者链进行发送。请求会沿链传递&#xff0c;直到某个处理者对象负责处理它。这种模式在许多应用场景中非常有用&#xff0c;例如在处理用户输入、过滤请求以及实现多级审核时。 应用场景 处理用户输入&#xff1…

kafka之consumer参数auto.offset.reset

Kafka的auto.offset.reset 参数是用于指定消费者在启动时如何处理偏移量&#xff08;offset&#xff09;的。这个参数有三个主要的取值&#xff1a;earliest、latest和none。 earliest&#xff1a; 当各分区下有已提交的offset时&#xff0c;从提交的offset开始消费&#xff1b…