spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

0185edac07ea52a24acd2f58ef1d4375.png
目录
天小天:(一)Spark Streaming 算子梳理 — 简单介绍streaming运行逻辑
天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions
天小天:(三)Spark Streaming 算子梳理 — transform算子
天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream
天小天:(五)Spark Streaming 算子梳理 — foreachRDD
天小天:(六)Spark Streaming 算子梳理 — glom算子
天小天:(七)Spark Streaming 算子梳理 — repartition算子
天小天:(八)Spark Streaming 算子梳理 — window算子

前言

本文主要讲解repartiion的作用及原理。

作用

repartition用来调整父RDD的分区数,入参为调整之后的分区数。由于使用方法比较简单,这里就不写例子了。

源码分析

接下来从源码的角度去分析是如何实现重新分区的。

DStream

/*** Return a new DStream with an increased or decreased level of parallelism. Each RDD in the* returned DStream has exactly numPartitions partitions.*/def repartition(numPartitions: Int): DStream[T] = ssc.withScope {this.transform(_.repartition(numPartitions))}

从方法中可以看到,实现repartition的方式是通过Dstreamtransform算子之间调用RDD的repartition算子实现的。

接下来就是看看RDD的repartition算子是如何实现的。

RDD

/*** Return a new RDD that has exactly numPartitions partitions.** Can increase or decrease the level of parallelism in this RDD. Internally, this uses* a shuffle to redistribute data.** If you are decreasing the number of partitions in this RDD, consider using `coalesce`,* which can avoid performing a shuffle.** TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.*/def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}

首先可以看到RDDrepartition的实现是调用时coalesce方法。其中入参有两个第一个是numPartitions为重新分区后的分区数量,第二个参数为是否shuffle,这里的入参为true代表会进行shuffle。

接下来看下coalesce是如何实现的。

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] = withScope {require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")if (shuffle) {// 是否经过shuffle,repartition是走这个逻辑/** Distributes elements evenly across output partitions, starting from a random partition. */// distributePartition是shuffle的逻辑,// 对迭代器中的每个元素分派不同的key,shuffle时根据这些key平均的把元素分发到下一个stage的各个partition中。val distributePartition = (index: Int, items: Iterator[T]) => {var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)items.map { t =>// Note that the hash code of the key will just be the key itself. The HashPartitioner// will mod it with the number of total partitions.position = position + 1(position, t)}} : Iterator[(Int, T)]// include a shuffle step so that our upstream tasks are still distributednew CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), // 为每个元素分配key,分配的逻辑为distributePartitionnew HashPartitioner(numPartitions)), // ShuffledRDD 根据key进行混洗numPartitions,partitionCoalescer).values} else {// 如果不经过shuffle之间返回CoalescedRDDnew CoalescedRDD(this, numPartitions, partitionCoalescer)}}

从源码中可以看到无论是否经过shuffle最终返回的都是CoalescedRDD。其中区别是经过shuffle需要为每个元素分配key,并根据key将所有的元素平均分配到task中。

CoalescedRDD

private[spark] class CoalescedRDD[T: ClassTag](@transient var prev: RDD[T], // 父RDDmaxPartitions: Int, // 最大partition数量,这里就是重新分区后的partition数量partitionCoalescer: Option[PartitionCoalescer] = None // 重新分区算法,入参默认为None)extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependenciesrequire(maxPartitions > 0 || maxPartitions == prev.partitions.length,s"Number of partitions ($maxPartitions) must be positive.")if (partitionCoalescer.isDefined) {require(partitionCoalescer.get.isInstanceOf[Serializable],"The partition coalescer passed in must be serializable.")}override def getPartitions: Array[Partition] = {// 获取重新算法,默认为DefaultPartitionCoalescerval pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())// coalesce方法是根据传入的rdd和最大分区数计算出每个新的分区处理哪些旧的分区pc.coalesce(maxPartitions, prev).zipWithIndex.map {case (pg, i) => // pg为partitionGroup即旧的partition组成的集合,集合里的partition对应一个新的partitionval ids = pg.partitions.map(_.index).toArraynew CoalescedRDDPartition(i, prev, ids, pg.prefLoc) //组成一个新的parititon}}override def compute(partition: Partition, context: TaskContext): Iterator[T] = {// 当执行到这里时分区已经重新分配好了,这部分代码也是执行在新的分区的task中的。// 新的partition取出就的partition对应的所有partition并以此调用福rdd的迭代器执行next计算。partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>firstParent[T].iterator(parentPartition, context)}}override def getDependencies: Seq[Dependency[_]] = {Seq(new NarrowDependency(prev) {def getParents(id: Int): Seq[Int] =partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices})}override def clearDependencies() {super.clearDependencies()prev = null}/*** Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,* then the preferred machine will be one which most parent splits prefer too.* @param partition* @return the machine most preferred by split*/override def getPreferredLocations(partition: Partition): Seq[String] = {partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq}
}

对于CoalescedRDD来讲getPartitions方法是最核心的方法。旧的parition对应哪些新的partition就是在这个方法里计算出来的。具体的算法是在DefaultPartitionCoalescercoalesce方法体现出来的。

compute方法是在新的task中执行的,即分区已经重新分配好,并且拉取父RDD指定parition对应的元素提供给下游迭代器计算。

图示

写下来用两张图解释下是如何repartition

无shuffle

f64a60fdf06c5d550098d544ab9b95bf.png

有shuffle

4e759bdcdeda0607f968367be31f6c98.png

总结

以上repartition的逻辑基本就已经介绍完了。其中DefaultPartitionCoalescer中重新分区的算法逻辑并没有展开说。这里以后如果有时间会再写一篇详细介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/533412.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

550什么意思_布草知识 | 都是羽绒,为什么价格大不同?

为什么羽绒的价格有些便宜有些这么贵呢?让小编来告诉你!市面上羽绒制品这么多,从几百到上万都有,中间的区别到底在哪里呢?接下来,我们就来说说,它们到底贵在哪里~01暖不暖,看蓬松度羽…

可调整大小的div_气液增压缸运行速度的调整以及压力的调节方式

点击蓝字 关注我们增压缸的行程及吨位绝对了设备整体速度,以下的调整只是在原基础上起到微调的作用。(1)气液增压缸时间调整:增压缸四个步骤动作是依靠时间继电器来控制的,可根据不同的产品的需求,通过时间继电器来调整每个步骤的…

pytest测试实战 电子书_电子书丨Selenium 3+Python 3自动化测试项目实战:从菜鸟到高手...

▊《Selenium 3Python 3自动化测试项目实战:从菜鸟到高手》田春成 著电子书售价:39.5元2019年9月出版Selenium是目前非常流行的一种自动化测试工具。本书基于Python 3语言讲述了新的Selenium 3的基本理论与操作,涉及各种高级应用,…

phpcms移动端和pc端_移动端调试大法

文章:樊秀宝(北京中心—小易F8技术小组)排版:suny在日常项目中的开发中,接触移动端开发的小伙伴们免不了要和移动端调试打交道。本文总结了常用的移动端调试方法,欢迎大家学习和补充。01谷歌浏览器谷歌浏览器是我们前端开发中必不…

redis 中一个字段 修改map_CTO 指名点姓让我带头冲锋,熬了一个通宵,终于把Redis中7千万个Key删完了...

由于有一条业务线不理想,高层决定下架业务。对于我们技术团队而言,其对应的所有服务器资源和其他相关资源都要释放。释放了 8 台应用服务器;1 台 ES 服务器;删除分布式定时任务中心相关的业务任务;备份并删除 MySQL 数…

太阳花图片_长寿花扔水里,光长叶不开花?赶紧加点营养液

养个花可不简单,春天一到还得操心换盆、换土,如果你像偷懒的话,还不如养些能水培的花,给它一杯水就够了,实在太省心啦!铜钱草铜钱草实在太好养了,摘一枝放在水杯里就能活,还挺有意境…

hystrix 全局熔断_跟我学Spring Cloud(Finchley版)14Feign使用Hystrix

Feign默认已经整合了Hystrix,本节详细探讨Feign使用Hystrix的具体细节。服务降级1 加配置,默认Feign是不启用Hystrix的,需要添加如下配置启用Hystrix,这样所有的Feign Client都会受到Hystrix保护!feign:hystrix:enable…

ubuntu 改屏幕分辨率命令_Ubuntu被曝严重漏洞!!!

GitHub安全研究员Kevin Backhouse发现的一个Ubuntu系统大漏洞。无需系统密码,就能添加新的sudo用户、获取root权限,事后还能删除不留痕迹。这种攻击方法非常简单,Backhouse在官方博客中写道:“使用终端中的一些简单命令&#xff0…

swag您的装置不支持_一件充满意境的中国风水墨粒子、电子屏风交互装置

不久前有人留言怎么用粒子做水墨,今天投石科技给大家分享个水墨粒子装置作品案例,大家可以发挥自己的想象去做中国风的一些东西,希望能对大家有些帮助吧。《墨迹》这是一个数字山水画的交互装置,它通过摄像头捕捉手部运动进行互动…

弱电工程集成商_弱电工程楼宇自控系统基础知识培训资料

前言:弱电行业里面楼宇自控系统是非常难的一个子系统,涉及到很多其他专业,楼宇自控系统的设计一般为厂家设计,但是也有系统集成商来设计的,楼宇自控系统主要学习它的控制原理,学习完以后学习DDC箱子的绘制&…

删除单元格_VBA(实验1)用VBA 删除某列空单元格的3种方法:删除法,转移到其他列方法,数组方法...

1 要解决的问题:删除某列中的空单元格/空行暂时只实现了删除一列中的空行,并没有实现多行的判断空行和删除方法。----之后再做更复杂的1.1 需求分析用VBA删除如下内容,解决思路都不同删除1列的空行(本文要做的)删除整个…

winpe制作u盘启动盘_怎么制作u盘启动盘 u盘启动盘制作方法【介绍】

使用u盘装系统时就需要先将u盘制作成一个启动u盘,这样才能够通过u盘启动装系统操作,那么 如何制作u盘启动盘 呢?为此,今天我们就为小伙伴们详细的介绍 怎样制作u盘启动盘 的操作。制作u盘启动盘准备工作:① 、准备一个空间容量大…

插入空行_如何一键插入表格空行,这个方法才最高级!

100万职场人都在看后台回复礼包领199元职场干货很久很久之前,小可教过大家如何一键删除空行,回顾请戳→《如何一键删除表格空行,这个方法才最高级!》这次,小可反过来,教大家如何一键插入很多空行&#xff0…

的控制台主题_【12.11最新版】芯片机/大气层主题软件NXThemesInstaller

Switch的主题的安装和管理主要通过自制软件——NXThemesInstaller软件地址:https://github.com/exelix11/SwitchThemeInjector本文只传了工具,主题需要自行去下载,可以按照自己喜欢的更换!!教程简单概括如下这是最常见…

数据卡片_E015 如何批量汇总工作簿数据,形成独立工作簿信息卡片

Hi,How are you doing?我是职场编码(CodeVoc)。在E000中,我们介绍了Node.js、Ruby、Electron等工具下载安装。这期,给你演示一下由Electron联合Ruby制作的小工具。知乎视频​www.zhihu.com借助Electron官方Demo&#…

redis哨兵模式没有切换主机_Redis哨兵(Sentinel)模式

Redis哨兵(Sentinel)模式在这里插入图片描述一、主从复制高可用当我们使用主从复制出现的问题手动故障转移写能力和存储能力受限主从复制 -master 宕机故障处理主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,…

mysql rpm 安装6_linux6.5 RPM方式安装 mysql5.6

步骤一、检查下linux是不是已经安装了MySQL# rpm -qa|grep mysqlmysql-libs-5.1.71-1.el6.x86_64# rpm -e --nodeps mysql-libs-5.1.71-1.el6.x86_64 //卸载# find / -name mysql//有mysql文件夹的话,把mysql的文件夹删掉步骤二、下载需要的安装包,下载地…

阿里云rds for mysql平台介绍_阿里云RDS for MySQL 快速入门——笔记

1初始化配置1.1设置白名单创建RDS实例后,需要设置RDS实例的白名单,以允许外部设备访问该RDS实例。默认的白名单只包含默认IP地址127.0.0.1,表示任何设备均无法访问该RDS实例。设置白名单包括两种操作:设置IP白名单:添加…

mysql数据库下载压缩包_mysql 8.0.22 zip压缩包版(免安装)下载、安装配置步骤详解...

大家好,今天我在学习 MySQL 8.0.22安装及配置遇到了一些问题,特地将我整个安装过程分享出来希望可以帮助不会安装的小伙伴😜。参考链接第一步 MySQL的下载进入MySQL官网下载,按下图所示步骤操作,耐心等待下载完成就可以…

mysql 5.5 主从同步问题_MySQL 5.5 主从复制异步、半同步以及注意事项详解

大纲一、前言二、Mysql 基础知识三、Mysql 复制(Replication)四、Mysql 复制(Replication)类型五、Mysql 主从复制基本步骤六、Mysql 主从复制(异步)七、Mysql 主从复制(半同步)八、Mysql 复制工具九、Mysql 复制注意事项十、Mysql 复制过滤一、前言从这一篇博客开始我们就来学…