RDD的依赖与分区

1 宽依赖和窄依赖

RDD从具体的依赖的角度讲,有窄依赖和宽依赖2种情况。

窄依赖:指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter等都会产生窄依赖;

宽依赖:指一个父RDD的Partition会被多个子RDD的一个Partition所使用,如groupByKey,reduceByKey等操作都会产生宽依赖。

总结:如果父RDD的一个Partition被一个子RDD的Partition使用就是窄依赖,否则就是宽依赖。如果子RDD的Partition对父RDD的Partition依赖的数量不会随着RDD数据规模的改变而改变就是窄依赖,否则就是宽依赖

特别说明:对join操作有两种情况,如果说join操作的时候每个partition仅仅和已知的Partition进行join,这次是join操作就是窄依赖;其它情况(input not co-partitioned 会产生shuffle操作,而co-partitioned是哪几个固定的Partition进行join)的join操作就是宽依赖;

因为是确定的partition数量的依赖关系,所有就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)

2 RDD根据依赖关系构成Stage

RDD基于不同的依赖关系构成了Stage,我们到底能不能把所有的RDD放在一个任务中运行:

假设一:Spark一开始是将所有相关联的RDD构成一个Stage,上图中的ABCDEFG 这些RDD都放在一个Stage中,groupByKey,join等操作,其中的严重问题是,需要挨个执行,产生了大量的中间数据(中间数据需要被存储起来,下一步才会执行,导致内存无法释放)。

从执行逻辑图的角度来看在每个RDD中不同的Partition是独立的,这个是数据分片的一个基本特征,也就是在RDD内部每个Partition数据彼此之间不会干扰。假设G是最后一个RDD,为最后一个RDD每个Partition分配一个具体的任务。最后一个RDD有3个Partition,为每个Partition分配一个task。这个时候第一个数据分片来自B和F,B的数据来自A的3个分片,这样做的话有很大的问题:①耗性能,重复计算:task太大,而且遇到shuffle级别操作的时候就必须计算依赖的RDD的所有Partition,而且都发送在一个task中计算。而且第2,3个Partition还要重复计算②存储浪费;从后往前的依赖关系,看哪些RDD进行cache,如果从G的角度看,3个分片各自算各自的,这也是数据存储的浪费。

上述假设,核心问题都是在遇到shuffle依赖(宽依赖)的时候,无法进行pipeline。则采取在有shuffle依赖的时候断开处理,原因是浪费内存、重复计算、任务太大、不方便管理所以要采用现在从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到该Stage中的方式。

回溯,血统从计算的角度说到底都是pipeline,每个Stage里面的Task数量是由该Stage中最后一个RDD的Partition的数量所决定的。最后一个Stage里面的任务的类型是ResultTask,前面其它所有的Stage里面的任务的类型多是ShuffleMapTask。

我们的计算表面上看是数据在流动,实质上是算子在流动:有两层含义①集群计算角度:数据不动代码动②在一个Stage内部算子为何会流动(pipeline)?首先是算子合并也就是,也就是所谓的函数式编程,执行的时候会最终进行函数的展开从而把一个Stage内部的多个算子合并成为一个大算子(其内部包含当前Stage所有算子对数据的计算逻辑);其次是由于Tranformation操作的lazy特征!!!在具体算子交给集群的executor计算之前首先会通过DAGScheduler进行算子的优化(基于数据本地性的pipeline)那就不会产生中间结果

注意:shuffle依赖一般是k-v的形式,并且k不可以是数组

3 RDD依赖源码

spark的宽依赖(narrow dependency)和窄依赖(wide dependency)继承于Dependency。Dependency是个抽象类,只包含一个rdd,是它依赖的parentRDD.

Dependency一个很重要的要求是,子RDD可以为其每个partition根据dependency找到它所对应的父RDD的partition,或者是找到计算的数据来源,所以每个实现的Dependency都要提供根据partitionID获取parentRDD的partition的方法。

3.1 窄依赖

NarrowDependency中子RDD的每个分区依赖少量(一个或多个)parent RDD分区,即parent RDD的partition至多被子RDD的某个partition使用一次。

窄依赖继承于Dependency并定义了一个获取parent rdd的方法,下面看窄依赖的子类: OneToOneDependency和RangeDependency。

OneToOneDependency

OneToOneDependency是一对一依赖关系,子RDD的每个partition依赖单个parentRdd的一个partition。常见的OneToOneDependency有map, filter, join等。源码如下:

实现了getParents方法:子RDD以及父RDD之间,每个partition是对应(如果子RDD中存在的话)的,所以两个RDD中的对应的partition应该具有相同的partitionId。

此类的Dependency中parent中的partitionId与childRDD中的partitionId是一对一的关系,也就是partition本身范围不会改变,一个parition经过transform还是一个partition,虽然内容发生了变化,所以可以在local完成,此类场景通常像mapreduce中只有map的场景。

RangeDependency

rangeDependency是子rdd的每个partition依赖多个父parentRdd的一个partition。常见的RangeDependency有union。

几个私有变量解释如下:Rdd:父RDD;inStart:父RDDpartition的起始位置;outStart:UnionRDD的起始位置;length:父RDD partition的数量。父RDD中的partition通常是子RDD中,连续的某块partition区间的父partition,所以对应关系应该是parentPartitionId = childPartitionId - childStart + parentStart。

重写了getParents方法:parentRDD在最终的rdd的位置[outStart, outStart + parentRDDLength]

该依赖关系仍然是一一对应,但是parentRDD中的某个区间的partitions对应到childRDD中的某个区间的partitions。典型的操作是union,多个parentRDD合并到一个childRDD,所以将每个parentRDD都对应到childRDD中的一个区间。需要注意的是:union不会把多个partition合并成一个partition,而是的简单的把多个RDD中的partitions放到一个RDD里面,partition不会发生变化

它仅仅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多个RDD合成一个RDD,这些RDD是被拼接而成,即每个parent RDD的Partition的相对顺序不会变,只不过每个parent RDD在UnionRDD中的Partition的起始位置不同

3.2 宽依赖

宽依赖,是在shuffle stage的时候的依赖关系,是划分Stage的重要标志,依赖首先要求是PariRdd即k,v的形式才能做shuffle。宽依赖只有ShuffleDependency一个实现。每个Shuffle过程会有一个Id,ShuffleDependency可以根据这个ShuffleId去获得所依赖的partition的数据,所以ShuffleDependency所需要记录的就是要能够通过ShuffleId去获得需要的数据

ShuffleDependency类有3个泛型参数,K代表键类型,V代表值类型,而C则代表Combiner的类型。因为Shuffle过程对键值型数据才有意义,所以ShuffleDependency对父RDD的泛型类型有限制,必须是Product2[K,V]或者其子类,Product2在Scala中代表两个元素的笛卡尔积。

构造方法参数说明:①partitioner:分区器②serializer:闭包序列化器,SparkEnv中已经创建,为JavaSerializer。③keyOrdering:可选的对键类型K排序的排序规则。④aggregator:可选的Map端数据聚合逻辑。⑤mapSideCombine:指定是否启用Map数据预聚合。

还会调用SparkContext.newShuffleId()方法分配一个新的Shuffle ID,以及调用ShuffleManager的registerShuffle方法注册该Shuffle,返回Shuffle句柄(ShuffleHandle)

 

4、Partitioner分区器

4.1 Partitioner

Partitioner是一个抽象类,用于处理key-value类型的RDD,按照key进行元素的划分

只有两个方法:numPartitions获取分区个数和getPartition(key: Any)根据Key值得到分区ID

在Partitioner的伴生对象中有defaultPartitioner方法,HashPartitioner 是Spark默认的分区器,除非RDD已经指定了一个分区器;对于分区数量,如果设置了配置项spark.default.parallelism,那么使用该配置,否则使用上游分区的最大数目

4.2 HashPartitioner

根据上图源码可知:传入的参数partitions决定总的分区数;重写的numPartitions方法也只是简单返回该值;重写的getPartition实际上是调用了Utils工具类的nonNegativeMod方法,将以key的hashcode和numPartitions作为参数

nonNegativeMod方法将对key的hashCode和numPartitions进行取模运算,得到key对应的分区索引。使用哈希和取模的方式,可以方便地计算出下游RDD的各个分区将具体处理哪些key。由于上游RDD所处理的key的哈希值在取模后很可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器

reduceByKey,aggregateByKey,join内部默认都是使用HashPartitioner

4.3 RangePartitioner

HashPartitioner的实现原理可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据,这是不希望的。而RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内

RangePartitioner分区执行原理概述:

1.计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1e6的样本的数据量。

2.根据sampleSize和分区数量计算每个分区的数据抽样样本数量最大值sampleSizePrePartition。

3.根据以上两个值进行水塘抽样,返回RDD的总数据量,分区中总元素的个数和每个分区的采样数据。

4.计算出数据量较大的分区通过RDD.sample进行重新抽样。

5.通过抽样数组 candidates: ArrayBuffer[(K, wiegth)]计算出分区边界的数组BoundsArray

6.在取数据时,如果分区数小于128则直接获取,如果大于128则通过二分法,获取当前Key属于那个区间,返回对应的BoundsArray下标即为partitionsID。

RangePartitioner分区器的主要作用就是:将一定范围内的数映射到某一个分区内,所以它的实现中分界的方法rangeBounds尤为重要

private var rangeBounds: Array[K] 获取对应分区的边界。每个Range内的数据进入一个分区。首先当Partitioner大于1,那么给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每个RDD分区至少抽取20条数据,sampleSize是初步的负载均衡;sampleSizePerPartition是由于依赖的父RDD数据时不均匀的,有些Partition数据量会很大,有些Partition数据量会很小。其中乘3的目的是保证数据量特别小的分区能够抽取到足够的数据,同时保证数据量非常大的分区能够进行二次抽样。

然后确定各partition对应的边界,即rangeBounds。首先,利用 RangePartitioner伴生对象的sketch()方法对输入的RDD的每一个 partition进行抽样,抽样方法采取的是水塘抽样(Reservoir Sampling),可以在不知道总size的情况下进行抽样,特别适用于数据在内存存不下的情况。

因为分区只需要对key进行操作,所以RangePartitioner.sketch的第一个参数是rdd.map(_._1)。该函数返回值是val (numItems, sketched) ,其中numItems相当于记录rdd元素的总数;而sketched的类型是Array[(Int, Long, Array[K])],记录的是分区的编号、该分区中总元素的个数以及从父RDD中每个分区采样的数据。

sketch函数对父RDD中的每个分区进行采样,并记录下分区的ID和分区中数据总和。reservoirSampleAndCount函数就是典型的水塘抽样实现,唯一不同的是该算法还记录下i的值,这个就是该分区中元素的总和。

回到RangePartitioner方法中,如果获取的数据分布不均匀,则边界方法rangeBounds会再次抽样,但是只对抽象数少于要求的partition进行sample,其他抽样好的不会

最后获取到每个partition中每个样本和对应的weight( 类似candidates += ((key, weight))),weight为partition中元素数量与抽样数量的比值,对于重新抽样的,则为1。

最后通过RangePartitioner伴生对象的determineBounds()方法进行边界确定,获得边界值组成的数组Array[K];  Array[K]被赋值给rangeBounds,即各partition对应的边界

determineBounds主要对 candidates先按key进行排序,然后获取总抽样元素除以partition大小即为每个partition理论的大小,即代码中的step。

然后再对排序好的ordered进行遍历,当所代表的权重大于step的整数倍时,返回此时的key,作为划分条件。然后依次类推,获得每个partition的边界key。

总结:RangePartitioner是采取抽样的策略,每个partition理论的是抽取20个元素,实际采用水塘抽样(Reservoir Sampling)时为了避免抽样少于期望,会乘以3.然后再用determineBounds对抽样数据进行排序,weight是每个key所代表的抽样数量,再按weight确定每个partition接近理论的边界,并进行返回,即为partitionid(getPartition返回值)。

getPartition查找某个元素应该所属的partitionid时,如果partition数量过大,会采取二分查找。

4.4 水塘抽样算法(Reservoir Sampling)

水塘抽样是一系列的随机算法,其目的在于从包含n个项目的集合S中选取k个样本,其中n为一很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况。

首先假设要从一个不知道行数的文本中等概率抽取一行,要如何做的?定义取出的行号为num,第一次以第一行作为取出行 num,而后第二次以二分之一概率决定是否用第二行替换 num,第三次以三分之一的概率决定是否以第三行替换 num……,以此类推。得出结论,在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数,算法结束。

将上面的条件变为,k为任意整数的情况,即要求最终返回的元素有k个,这就是水塘抽样(Reservoir Sampling)问题。要求是:取到第n个元素时,前n个元素被留下的几率相等,即k/n。

算法同上面思路类似,将1/n换乘k/n即可。在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于k/n,替换池中任意一个为第n个数。大于k/n,继续保留前面的数。直到数据流结束,返回此k个数。但是为了保证计算机计算分数额准确性,一般是生成一个0到n的随机数,跟k相比,道理是一样的。

用伪代码表示如下所示

从S中抽取首k项放入「水塘」中

对于每一个S[j]项(j ≥ k):

   随机产生一个范围0到j的整数r

   若 r < k 则把水塘中的第r项换成S[j]项

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473675.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

爬虫案列:京东商城长裤信息获取

1、创建Scrapy项目 使用全局命令startproject创建项目&#xff0c;创建新文件夹并且使用命令进入文件夹&#xff0c;创建一个名为jingdong的Scrapy项目。 [python] view plaincopy scrapy startproject jingdong 2.使用项目命令genspider创建Spider [python] view plaincopy …

ACwing 2. 01背包问题(DP)

文章目录1. 题目2. 解题1. 题目 有 N 件物品和一个容量是 V 的背包。每件物品只能使用一次。 第 i 件物品的体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输出最大价值。 输入格式 …

Redis-Scrapy分布式爬虫:当当网图书为例

Scrapy-Redis分布式策略&#xff1a; Scrapy_redis在scrapy的基础上实现了更多&#xff0c;更强大的功能&#xff0c;具体体现在&#xff1a; reqeust去重&#xff0c;爬虫持久化&#xff0c;和轻松实现分布式 假设有四台电脑&#xff1a;Windows 10、Mac OS X、Ubuntu 16.04、…

Saprk排序

1、基础排序算子sortBy和sortByKey 在Spark中存在两种对RDD进行排序的函数&#xff0c;分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序&#xff0c;它是从Spark0.9.0之后才引入的。而sortByKey函数是对PairRDD进行排序&#xff0c;也就是有Key和Value的RDD。下面…

ACwing 3. 完全背包问题(DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包&#xff0c;每种物品都有无限件可用。 第 i 种物品的体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输出最大价值。…

Crontab定时任务访问url实例

以下操作均是在ubuntu 下操作的&#xff1a; 1、进入crontab文件的编写状态&#xff1a; crontab -e 2、第一次进入编写crontab文件的界面&#xff0c;系统会提示选择相应的编辑器&#xff0c;一般我们选择vi编辑器就可以了&#xff1a;选择/usr/bin/vim.tiny 12345Select a…

ACwing 4. 多重背包问题 I(DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件&#xff0c;每件体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使物品体积总和不超过背包容量&#xff0c;且价值总和最大。 输出最大价值。 输入格式…

数据算法与结构基本知识

数据结构与算法作用 没有看过数据结构和算法&#xff0c;有时面对问题可能会没有任何思路&#xff0c;不知如何下手去解决&#xff1b;大部分时间可能解决了问题&#xff0c;可是对程序运行的效率和开销没有意识&#xff0c;性能低下&#xff1b;有时会借助别人开发的利器暂时…

Master HA源码解析

1、Master HA概述 Spark在生产上做HA一般采用的是通过zookeeper的方式&#xff0c;配置3个master的话是比较可靠的方式。采用zookeeper做HA的话zookeeper会保存整个Spark程序运行时候的元数据&#xff08;包括Workers&#xff0c;Drivers&#xff0c;Applications&#xff0c;…

DNS坑爹呢?!

昨天下午3点多&#xff0c;大量网民反映无法上网。多家DNS服务商通过微博透露&#xff0c;在1月21日下午3点20分左右&#xff0c;全国所有通用顶级域的根出现异常&#xff0c;导致部分国内网民无法访问.com域名网站&#xff0c;对中国互联网造成严重影响。 昨天下午有事出去&am…

数据结构顺序表基本流程

生活中很多事物是有顺序关系的&#xff0c;如班级座位从前到后是按排的顺序&#xff0c;从左到右是按列的顺序&#xff0c;可以很方便的定位到某一个位置&#xff0c;但如果座位是散乱的&#xff0c;就很难定位。 在程序中&#xff0c;经常需要将一组&#xff08;通常是同为某…

Spark2.x RPC解析

1、概述 在Spark中很多地方都涉及网络通信&#xff0c;比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。Spark 2.0 之后&#xff0c;master 和worker 之间完全不使用akka 通信&#xff0c;改用netty实现。因为使用Akka…

LeetCode 1629. 按键持续时间最长的键

文章目录1. 题目2. 解题1. 题目 LeetCode 设计了一款新式键盘&#xff0c;正在测试其可用性。测试人员将会点击一系列键&#xff08;总计 n 个&#xff09;&#xff0c;每次一个。 给你一个长度为 n 的字符串 keysPressed &#xff0c;其中 keysPressed[i] 表示测试序列中第 …

数据结构中的栈

整理衣服时&#xff0c;先放冬天的衣服&#xff0c;后放夏天的衣服&#xff0c;这样夏天的衣服就在上面&#xff0c;方便夏季取用。 栈&#xff08;stack&#xff09;&#xff0c;有些地方称为堆栈&#xff0c;是一种容器&#xff0c;可存入数据元素、访问元素、删除元素&…

数据结构中的队列

生活中很多时候需要排队来维持秩序&#xff0c;如等公交、取票、办理银行业务等。 队列&#xff08;queue&#xff09;是只允许在一端进行插入操作&#xff0c;而在另一端进行删除操作的线性表。 队列是一种先进先出的&#xff08;First In First Out&#xff09;的线性表&am…

SparkContext解析

1、SparkContext概述 Spark的程序编写是基于SparkContext的&#xff0c;体现在2方面&#xff1a;①Spark编程的核心基础&#xff08;RDD&#xff09;&#xff0c;第一个RDD是由SparkContext创建的&#xff1b;②Spark程序的调度优化也是基于SparkContext&#xff0c;RDD在一开…

LeetCode 1630. 等差子数组

文章目录1. 题目2. 解题1. 题目 如果一个数列由至少两个元素组成&#xff0c;且每两个连续元素之间的差值都相同&#xff0c;那么这个序列就是 等差数列 。更正式地&#xff0c;数列 s 是等差数列&#xff0c;只需要满足&#xff1a;对于每个有效的 i &#xff0c; s[i1] - s[…

LeetCode 1631. 最小体力消耗路径(DFS + 二分查找)

文章目录1. 题目2. 解题1. 题目 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights &#xff0c;其中 heights[row][col] 表示格子 (row, col) 的高度。 一开始你在最左上角的格子 (0, 0) &#xff0c;且你希望去最右下角的格子 (rows-1, columns-1) &…

Spark资源调度分配

1、任务调度与资源调度 任务调度&#xff1a;是指通过DAGScheduler&#xff0c;TaskScheduler&#xff0c;SchedulerBackend等进行的作业调度。 资源调度&#xff1a;是指应用程序获取资源。 任务调度是在资源调度的基础上&#xff0c;没有资源调度&#xff0c;那么任务调度…

两个栈实现队列与两个队列实现栈

1. 两个栈实现队列 实现一 思路 s1是入栈的&#xff0c;s2是出栈的。 入队列&#xff0c;直接压到s1是就行了出队列&#xff0c;先把s1中的元素全部出栈压入到s2中&#xff0c;弹出s2中的栈顶元素&#xff1b;再把s2的所有元素全部压回s1中 实现二 思路 s1是入栈的&#xff0c…