八股文系列Spark

为什么Spark 比 MapReduce 更快

DAG相比hadoop的mapreduce在大多数情况下可以减少磁盘I/O次数

        mapreduce通常需要将计算的中间结果写入磁盘,然后还要再读取磁盘,从而导致了频繁的磁盘IO

        spark通常不需要将计算的中间结果写入磁盘,只有shuffle时或中间内存不足才会落盘,这得益于spark的RDD和DAG,中间结果已RDD的形式保存在内存中,且能从DAG中快速恢复,大大减少了磁盘IO

spark shuffle 的优化

        mapreduce在shuffle时默认进行排序,spark在shuffle时则只有部分场景才需要排序(bypass机制不需要排序),排序是非常耗时的,这样就可以加快shuffle速度

spark支持将需要反复用到的数据进行缓存

        所以对于下次再次使用此rdd时,不再再次计算,而是直接从缓存中获取,因此可以减少数据加载耗时,所以更适合需要迭代计算的机器学习算法

任务级别并行度上的不同

        mapreduce采用多进程模型,而spark采用了多线程模型,多进程模型的好处是便于细粒度控制每个任务占用的资源,但每次任务的启动都会消耗一定的启动时间,即mapreduce的map task 和reduce task是进程级别的,都是jvm进程,每次启动都需要重新申请资源,消耗不必要的时间,而spark task是基于线程模型的,通过复用线程池中的线程来减少启动,关闭task所需要的开销(多线程模型也有缺点,由于同节点上所有任务运行在一个进行中,因此,会出现严重的资源争用,难以细粒度控制每个任务占用资源)

Spark 的 shuffle介绍

        在 Spark 的源码中,负责 shuffle 过程的执行、计算和处理的组件主要就是ShuffleManager,也即 shuffle 管理器。在 Spark 1.2 以前,默认的shuffle计算引擎是 HashShuffleManager。该 ShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘 IO操作影响了性能。
     因此在Spark 1.2以后的版本中,默认的 ShuffleManager 改成了 SortShuffleManager。 SortShuffleManager 相较于 HashShuffleManager 来说,有了一定的改进。主要就在于,每个 Task 在进行 shuffle 操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并 (merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个stage的 shuffle read task 拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

未经优化的HashShuffleManager

        默认每个 Executor 只有 1 个CPU core,也就是说,无论这个 Executor 上分配多少个 task 线程,同一时间都只能执行一 个 task 线程。

shuffle write阶段

        是在一个stage结束计算之后,为了下一个 stage可以执行shuffle类的算子,而将每个task处理的数据按key进行“分类”。所 谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中

shuffle read阶段

        是一个stage刚开始时要做的事情。此时该stage的 每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个 task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可

        shuffle read 的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

生成文件数样例

下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件,例如:
        下一个stage总共有100个task, 那么当前stage的每个task都要创建100份磁盘文件。如果当前stage共有10个Executor,每个Executor执行5个Task,一共有50个task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。

优化后的HashShuffleManager

        这里说的优化,是指我们可以设置一个参数, spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说建议开启这个选项。

        开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内

        当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不 会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效 将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

生成文件数样例

        假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执 行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件, 所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数 量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建 100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager的普通运行机制

  1. 数据会先写入一个内存数据结构中,此 时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算 子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle 算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
  2. 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数 据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式 分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。 BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一 次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
  3. 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文 件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文 件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
生成文件数样例

        比如第一个stage 有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每 个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件

SortShuffleManager-bypass机制

bypass运行机制的触发条件

1、shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
2、算子不能有 map 端的预聚合操作(比如reduceByKey)。

  • 此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash 值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
  • 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的 磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经 优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:
  1. 磁盘写机制不同;
  2. 不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

Spark shuffle 相关的参数优化

spark.shuffle.file.buffer

默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数 据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k), 从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在 实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。 
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发 现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络 异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数 之内拉取还是没有成功,就可能会导致作业执行失败。 
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60 次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于 针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

默认值:5s

参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。 
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默 认是20%。 
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

默认值:sort
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort 和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的 版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。 
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排 序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行 排序,那么建议通过bypass机制或优化的HashShuffleManager来避 免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前 发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个 阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成 一个文件,并会创建单独的索引文件。 
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

默认值:false 
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启 consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。 
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以 尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启 consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高 出10%~30%。

Spark 与 MapReduce 的 shuffle 的区别

整体功能

        两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce(Spark 里可能是后续的一系列操作)。

实现流程

        两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine和 reduce的 records 必须先 sort。这样的好处在于 combine/reduce可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。以前 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行合并,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey的操作。在Spark 1.2之后,sort-based变为默认的Shuffle实现。

Spark 3.0的新特性对比Spark2.0

动态分区裁剪

        即Dynamic Partition Pruning,在3.0以前不支持动态分区,所谓的动态分区是针对分区表中多个表进行join的时候运行时(runtime)推断出来的信息,在on后面的条件语句满足一定的要求后就会进行自动动态分区裁剪优化。

3.0以前:

3.0以后:

自适应查询执行

        即Adaptive Query Execution,指对执行计划按照实际数据分布和组织情况,评估其执行所消耗的时间和资源,从而选择代价最小的计划执行。

  • 减少 Reducer 的数量
  • 将 Sort Merge Join 转换为 Broadcast Hash Join
  • 处理数据倾斜
3.0以前:

3.0以后:

加速器感知调度

        即Accelerator-aware Scheduling,在Spark3.0版本,支持在Standalone、YARN以及Kubernetes资源管理器下支持GPU,并且对现有正常的作业基本没影响。后续将支持TPU    

Spark 3.0 在 Kubernetes 上有更多的功能:
  • 支持使用 pod 模板来定制化 driver 和 executor 对应的 pods
  • 支持动态资源申请,资源空闲的时候,减少 executor 数量,资源紧张的时候,动态的加入一些 executor
  • 支持外置的 shuffle 服务,将 shuffle 服务放在独立的 pod 里,能够解耦成一个架构

谈谈你对RDD 的理解

        它翻译过来就叫做弹性分布式数据集,是一种数据结构,可以理解成是一个集合。在代码中的话,RDD 是一个抽象类。还有一个非常重要的特点: RDD是不保存数据的,仅仅封装了计算逻辑,也就是你直接打印 RDD 是看不见具体值的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/27496.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++ | const成员】一文了解类的 const数据成员、const成员函数、const对象、mutable 数据成员

😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀 🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C、数据结构、音视频🍭 ⏰发布时间⏰:2024-06-14 2…

新一代大核卷积反超ViT和ConvNet!同参数量下性能、精度、速度完胜

大核卷积网络是CNN的一种变体,也是深度学习领域的一种重要技术,它使用较大的卷积核来处理图像数据,以提高模型对视觉信息的理解和处理能力。 这种类型的网络能够捕捉到更多的空间信息,因为它的大步长和大感受野可以一次性覆盖图像…

C++语法08 数据类型之间的强制转换

目录 强制类型转换 强制类型转换格式 整型转换成浮点型 整型转换成浮点型其他写法 训练:糖果奖励 糖果奖励参考代码 浮点型转换成整型 浮点型转换成整型其他写法 训练:分离小数 分离小数参考代码 强制类型转换 强制类型转换,就是把…

如何应对生活中的不确定性:仁者安仁,知者利仁。

有较高自尊水平的人,接近于孔子说的:仁者。 ——— 有着稳定的高自尊,无论外在环境如何变化,对其影响都不大,他能够愉快地生活。 相反:一个人处于低自尊状态,就会活得很痛苦,对自己…

基于MCU平台的HMI开发的性能优化与实战(上)

随着汽车座舱智能化的不断演进,车内显示设备的数量显著增加,从传统的仪表盘和中控屏扩展至空调控制、扶手、副驾驶区域以及抬头显示(HUD)等多样化的显示单元。为了有效支持这些功能单元,同时控制整车成本,越…

手机在网状态-手机在网状态查询-手机在网站状态接口

查询手机号在网状态,返回正常使用、停机、未启用/在网但不可用、不在网(销号/未启用/异常)、预销户等多种状态 直连三大运营商,实时更新,可查询实时在网状态 高准确率-实时更新,准确率99.99% 接口地址&…

54.Python-web框架-Django-免费模板django-datta-able

1.Datta Able Django介绍 Detta Able Djiango是什么 Datta Able Django 是一个由AppSeed提供的开源Django管理面板,基于现代设计,为开发者提供了一流的功能和优雅的界面。它源自CodedThemes的高风格化Bootstrap 4模板——Datta Able Bootstrap Lite&…

python-基础篇-文件和异常

文章目录 文件和异常读写文本文件读写二进制文件读写JSON文件 文件和异常 实际开发中常常会遇到对数据进行持久化操作的场景,而实现数据持久化最直接简单的方式就是将数据保存到文件中。说到“文件”这个词,可能需要先科普一下关于文件系统的知识&#…

什么是快乐?

什么是快乐? What is Happiness? 1. 快乐不是追求外在的物质,而是内心的平静与满足。当我们学会感恩,懂得珍惜眼前的一切,心中自然会充满喜悦。快乐并非来自拥有更多,而是感受到已经拥有的足够。每一天都怀抱感激之情…

qt如何在linux平台上设置编译生成windows程序文件,跨平台?

在开始前刚好我有一些资料,是我根据网友给的问题精心整理了一份「qt的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!!QT本来目标就是跨平台&#xf…

Commons-Collections篇-CC4链分析

前言 因为 CommonsCollections4 除 4.0 的其他版本去掉了 InvokerTransformer 继承 Serializable,导致该方法无法序列化。 同时 CommonsCollections 4的版本 TransformingComparator 继承了 Serializable接口,而CommonsCollections 3里是没有的&#xf…

hrm人力管理系统源码(从招聘到薪酬的全过程人力管控系统)

一、项目介绍 一款全源码可二开,可基于云部署、私有部署的企业级数字化人力资源管理系统,涵盖了招聘、人事、考勤、绩效、社保、酬薪六大模块,解决了从人事招聘到酬薪计算的全周期人力资源管理,符合当下大中小型企业组织架构管理运…

Stringboot

一、概述 springboot是spring家族中的一个全新框架,用来简化spring程序的创建和开发过程。在以往我们通过SpringMVCSpringMybatis框架进行开发的时候,我们需要配置web.xml,spring配置,mybatis配置,然后整合在一起&…

django.db.utils.NotSupportedError: MySQL 8 or later is required (found 5.7.33).

django.db.utils.NotSupportedError: MySQL 8 or later is required (found 5.7.33). 一、原因分析 在新版的Django默认需要MySQL 8或更高版本,才能运行。 二、解决办法 1、升级mysql数据库版本 只需要将mysql版本升级到8.0,即可解决,当然这…

基于esp8266_点灯blinker_智能家居

文章目录 一 实现思路1 项目简介2 项目构成3 代码实现4 外壳部分 二 效果展示UI图片 一 实现思路 摘要:esp8266,mixly,点灯blinker,物联网,智能家居,3donecut 1 项目简介 1 项目效果 通过手机blinker app…

宝藏速成秘籍(3)选择排序法

一、前言 1.1、概念 选择排序法(Selection Sort)是一种简单直观的排序算法。它的基本思想是:每次从待排序的数组中选择最小(或最大)的元素,将其放在已排序部分的末尾,直到所有元素都排序完毕。…

Unet心电信号分割方法(Pytorch)

心血管疾病是一种常见病,严重影响人们的健康及日常生活。 近年来随着人们生活习惯的不断变化,心血管疾病对人们影响愈加明显,发病率呈现出逐年攀升的趋势,心血管疾病是中国城乡居民死亡的首要原因。心电图ECG已被广泛用于研究心跳…

光学雨量监测站:科技赋能,精准监测降水过程

TH-YJ3随着科技的不断进步,光学雨量监测站作为一种先进的降水监测设备,正逐渐在气象、水文、农业等领域发挥重要作用。光学雨量监测站以其高精度、高可靠性、实时性强的特点,为降水数据的收集和分析提供了强有力的支持,为相关领域…

Nginx负载均衡之长连接负载均衡

当客户端通过浏览器访问 HTTP 服务器时,HTTP 请求会通过 TCP 协议与 HTTP 服务器建立一条访问通道,当本次访问数据传输完毕后,该 TCP 连接会立即被断开,由于这个连接存在的时间很短,所以 HTTP 连接也被称为短连接。 …

Lua实现自定义函数面向对象编程

本文目录 1、引言2、原理3、实例4、层析验证 文章对应视频教程: 暂无,可以关注我的B站账号等待更新。 点击图片或链接访问我的B站主页~~~ 1、引言 在现代软件开发中,面向对象编程(OOP)已经成为一种广泛使用的编程范式…