Spark Job优化

1 Map端优化

1.1 Map端聚合

map-side预聚合,就是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。

RDD的话建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

SparkSQL本身的HashAggregte就会实现本地预聚合+全局聚合。

1.2 读取小文件优化

读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的Task元信息也会给Spark Driver的内存造成压力,带来单点问题。

设置参数:

spark.sql.files.maxPartitionBytes=128MB   默认128m

spark.files.openCostInBytes=4194304     默认4m

参数(单位都是bytes):

  • maxPartitionBytes:一个分区最大字节数。
  • openCostInBytes:打开一个文件的开销。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.map.MapSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解: DataSourceScanExec.createNonBucketedReadRDD()

FilePartition. getFilePartitions()

1)切片大小= Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

计算totalBytes的时候,每个文件都要加上一个open开销

defaultParallelism就是RDD的并行度

2)当(文件1大小+ openCostInBytes)+(文件2大小+ openCostInBytes)+…+(文件n-1大小+ openCostInBytes)+ 文件n <= maxPartitionBytes时,n个文件可以读入同一个分区,即满足: N个小文件总大小 (N-1*openCostInBytes <=  maxPartitionBytes的话。

1.3 增大map溢写时输出流buffer

1mapShuffle Write有一个缓冲区初始阈值5m,超过会尝试增加到2*当前使用内存。如果申请不到内存,则进行溢写。这个参数是internal,指定无效(见下方源码)。也就是说资源足够会自动扩容,所以不需要我们去设置。

2)溢写时使用输出流缓冲区默认32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率。

3Shuffle文件涉及到序列化,是采取的方式读写,默认按照每批次1万条去读写。设置得太低会导致在序列化时过度复制,因为一些序列化器通过增长和复制的方式来翻倍内部数据结构。这个参数是internal,指定无效(见下方源码)。

综合以上分析,我们可以调整的就是输出缓冲区的大小。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.map.MapFileBufferTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源码理解:

2 Reduce端优化

2.1 合理设置Reduce数

过多的cpu资源出现空转浪费,过少影响任务性能。关于并行度、并发度的相关参数介绍,在2.2.1中已经介绍过。

2.2 输出产生小文件优化

1Join后的结果插入新表

join结果插入新表,生成的文件数等于shuffle并行度,默认就是200份文件插入到hdfs上。

解决方式:

1)可以在插入表数据前进行缩小分区操作来解决小文件过多问题,如coalescerepartition算子

2)调整shuffle并行度。根据2.2.2的原则来设置。

2、动态分区插入数据

1)没有Shuffle的情况下。最差的情况下,每个Task中都有表各个分区的记录,那文件数最终文件数将达到  Task数量 * 表分区数。这种情况下是极易产生小文件的。

INSERT overwrite table A partition ( aa ) 

SELECT * FROM B;

2)有Shuffle的情况下,上面的Task数量 就变成了spark.sql.shuffle.partitions(默认值200)。那么最差情况就会有 spark.sql.shuffle.partitions * 表分区数。

当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

最理想的情况是根据分区字段进行shuffle,在上面的sql中加上distribute by aa。把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产生N个文件, 但是这种情况下也容易出现数据倾斜的问题。

解决思路:

结合第4章解决倾斜的思路,在确定哪个分区键倾斜的情况下,将倾斜的分区键单独拎出来:

将入库的SQL拆成(where 分区 !倾斜分区键 )和 (where 分区 = 倾斜分区键) 几个部分,非倾斜分区键的部分正常distribute by 分区字段,倾斜分区键的部分 distribute by随机数,sql如下:

//1.非倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa != 大key

distribute by aa;

//2.倾斜键部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa = 大key

distribute by cast(rand() * 5 as int);

案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.DynamicPartitionSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.3 增大reduce缓冲区,减少拉取次数

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB。

源码:BlockStoreShuffleReader.read()

2.4 调节reduce端拉取数据重试次数

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3:

2.5 调节reduce端拉取数据等待间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s。

综合2.32.42.5,案例实操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.ReduceShuffleTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2.6 合理利用bypass

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200)且不需要map端进行合并操作,则shuffle write过程中不会进行排序操作,使用BypassMergeSortShuffleWriter去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用SortShuffleManager时,如果确实不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

源码分析:SortShuffleManager.registerShuffle()

SortShuffleManager.getWriter()

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.BypassTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3 整体优化

3.1 调节数据本地化等待时长

在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 Task 数据本地化的级别,如果大部分都是 PROCESS_LOCAL、NODE_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 RACK_LOCAL、ANY,那么需要对本地化的等待时长进行调节,应该是反复调节,每次调节完以后,再来运行观察日志,看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短。

注意过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得 Spark 作业的运行时间反而增加了。

下面几个参数,默认都是3s,可以改成如下:

spark.locality.wait //建议6s、10s

spark.locality.wait.process //建议60s

spark.locality.wait.node //建议30s

spark.locality.wait.rack //建议20s

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.job.LocalityWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.2 使用堆外内存

1、堆外内存参数

讲到堆外内存,就必须去提一个东西,那就是去yarn申请资源的单位,容器。Spark  on yarn模式,一个容器到底申请多少内存资源。

一个容器最多可以申请多大资源,是由yarn参数yarn.scheduler.maximum-allocation-mb决定, 需要满足:

spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size

≤ yarn.scheduler.maximum-allocation-mb

参数解释:

  • spark.executor.memory:提交任务时指定的堆内内存。
  • spark.executor.memoryOverhead:堆外内存参数,内存额外开销。

默认开启,默认值为spark.executor.memory*0.1并且会与最小值384mb做对比,取最大值。所以spark on yarn任务堆内内存申请1个g,而实际去yarn申请的内存大于1个g的原因。

  • spark.memory.offHeap.size:堆外内存参数,spark中默认关闭,需要将spark.memory.enable.offheap.enable参数设置为true。

注意:很多网上资料说spark.executor.memoryOverhead包含spark.memory.offHeap.size,这是由版本区别的,仅限于spark3.0之前的版本。3.0之后就发生改变,实际去yarn申请的内存资源由三个参数相加。

测试申请容器上限:

yarn.scheduler.maximum-allocation-mb修改为7G,将三个参数设为如下大于7G,会报错:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

spark.memory.offHeap.size修改为1g后再次提交:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2、使用堆外缓存

使用堆外内存可以减轻垃圾回收的工作,也加快了复制的速度。

当需要缓存非常大的数据量时,虚拟机将承受非常大的GC压力,因为虚拟机必须检查每个对象是否可以收集并必须访问所有内存页。本地缓存是最快的,但会给虚拟机带来GC压力,所以,当你需要处理非常多GB的数据量时可以考虑使用堆外内存来进行优化,因为这不会给Java垃圾收集器带来任何压力。让JAVA GC为应用程序完成工作,缓存操作交给堆外。

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.job.OFFHeapCache spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

3.3 调节连接等待时长

在Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。

如果task在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。

在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长120s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致DAGScheduler反复提交几次stage,TaskScheduler反复提交几次task,大大延长了我们的Spark作业的运行时间。

为了避免长时间暂停(如GC)导致的超时,可以考虑调节连接的超时时长,连接等待时长需要在spark-submit脚本中进行设置,设置方式可以在提交时指定:

--conf spark.core.connection.ack.wait.timeout=300s

调节连接等待时长后,通常可以避免部分的XX文件拉取失败、XX文件lost等报错。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 1g --conf spark.core.connection.ack.wait.timeout=300s --class com.atguigu.sparktuning.job.AckWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/140172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于轻量级卷积神经网络CNN开发构建打架斗殴识别分析系统

在很多公共场合中&#xff0c;因为一些不可控因素导致最终爆发打架斗殴或者大规则冲突事件的案例层出不穷&#xff0c;基于视频监控等技术手段智能自动化地识别出已有或者潜在的危险行为对于维护公共场合的安全稳定有着重要的意义。本文的核心目的就是想要基于CNN模型来尝试开发…

Mathtype公式自动转Word自带公式

Mathtype公式自动转Word自带公式 前言/word技巧探索过程参考资料&#xff08;有效与无效&#xff09;全自动方案/代码/教程 前言/word技巧 word公式 用ALT号可以输入简单latex显示公式&#xff1b;复杂度&#xff0c;需要引入latex包的不行&#xff1b;显示不出来的话按一下en…

CentOS指令学习

目录 一、常用命令 1、ls 2、cd_pwd 3、touch_mkdir_rmdir_rm 4、cp_mv 5、whereis_which_PATH 6、find 7、grep 8、man_help 9、关机与重启 二、压缩解压 1、zip_unzip 2、gzip_gunzip 3、tar 三、其他指令 1、查看用户登录信息 2、磁盘使用情况 3、查看文件…

74HC165 并入串出

/******************************************************** 程序名&#xff1a;main.C 版 本&#xff1a;Ver1.0 芯 片&#xff1a;AT89C51或STC89C51 晶 体&#xff1a;片外12MHz 编 程: Joey 日 期&#xff1a;2023-11-13 描 述&#xff1a;通过 74HC165 对 16 按键…

Radius是什么意思? 安当加密

Radius是什么意思&#xff1f; RADIUS&#xff08;Remote Authentication Dial In User Service&#xff09;是一种远程用户拨号认证系统&#xff0c;它由RFC 2865和RFC 2866定义&#xff0c;是应用最广泛的AAA&#xff08;Authentication、Authorization、Accounting&#xf…

Codeforces Round 788 (Div. 2) E. Hemose on the Tree(树上构造)

题目 t(t<5e4)组样例&#xff0c;每次给定一个数p&#xff0c; 表示一棵节点数为的树&#xff0c; 以下n-1条边&#xff0c;读入树边 对于n个点和n-1条边&#xff0c;每个点需要赋权&#xff0c;每条边需要赋权&#xff0c; 权值需要恰好构成[1,2n-1]的排列 并且当你赋…

《网络协议》04. 应用层(DNS DHCP HTTP)

title: 《网络协议》04. 应用层&#xff08;DNS & DHCP & HTTP&#xff09; date: 2022-09-05 14:28:22 updated: 2023-11-12 06:55:52 categories: 学习记录&#xff1a;网络协议 excerpt: 应用层、DNS、DHCP、HTTP&#xff08;URI & URL&#xff0c;ABNF&#xf…

【数据结构初阶】顺序表SeqList

描述 顺序表我们可以把它想象成在一个表格里面填数据&#xff0c;并对数据做调整&#xff1b; 那我们的第一个问题是&#xff1a;怎么样在创建出足够的空间呢&#xff1f; 我们可以去堆上申请&#xff0c;用一个指针指向一块空间&#xff0c;如果申请的空间不够&#xff0c;我…

Linux 部署Sentinel控制台

Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件&#xff0c;主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 1.版本选择 SpringCloudAlibaba SpringClo…

已解决:java.net.BindException: 地址已在使用

解决zookeeper报错&#xff1a;java.net.BindException: 地址已在使用&#xff0c;是因为端口被占用。显示Starting zookeeper ... STARTED&#xff0c;jps没有QuorumPeerMain进程。 问题截图&#xff1a; 看似Starting zookeeper ... STARTED&#xff0c;实则集群并没有启动…

2023.11.9 IDEA 配置 Lombok

目录 什么是 Lombok 如何使用 Lombok Lombok 的 Data 注解 什么是 Lombok Lombok 是一个 Java 库&#xff0c;能自动插入编译器并构建工具&#xff0c;简化 Java 开发它通过注解实现这一目的&#xff0c;可用来帮助开发人员消除 Java 的冗长代码&#xff0c;尤其是对于简单…

终端安全/SOC安全/汽车信息安全大课来袭-共计204节课

在近两年的时间里&#xff0c;我投入了大量的心血和精力&#xff0c;不仅创作了数千篇精美的图片&#xff0c;还编写了超过1000篇文章&#xff0c;以及数百篇内容丰富的PPT。经过这番努力我终于成功地构建出两套系统化的学习课程&#xff0c;它们分别是“Trustzone/TEE/安全从入…

什么是 CASB,在网络安全中的作用

数字化转型正在稳步攀升&#xff0c;组织现在越来越关注在线生产力系统和协作平台&#xff0c;各行各业的企业都采用了不同的云基础设施服务模式。云基础架构提供按需服务&#xff0c;可提高易用性、访问控制、内容协作和减少内部存储资源&#xff0c;以及许多其他好处。迁移到…

go学习之接口知识

文章目录 接口1.接口案例代码展示2.基本介绍3.基本语法4.应用场景介绍5.注意事项和细节6.接口编程经典案例7.接口与继承之间的比较8.面向对象编程--多态1&#xff09;基本介绍2&#xff09;快速入门3&#xff09;接口体现多态的两种形式 9.类型断言1&#xff09;先看一个需求2&…

登顶request模块

华子目录 Requests介绍安装requests模块常用方法常用属性实例引入各种请求方式基于get请求带参数的get请求推荐写法 基于post请求添加headers信息content获取二进制数据bytes类型获取json数据第一种方式第二种方式 response响应状态码判断 高级操作会话维持通过cookie维持会话通…

【Vue3】scoped 和样式穿透

我们使用很多 vue 的组件库&#xff08;element-plus、vant&#xff09;&#xff0c;在修改样式的时候需要进行其他操作才能成功更改样式&#xff0c;此时就用到了样式穿透。 而不能正常更改样式的原因就是 scoped 标记。 scoped 的渲染规则&#xff1a; <template>&l…

U-Mail邮件中继,让海外邮件沟通更顺畅

在海外&#xff0c;电子邮件是人们主要的通信工具&#xff0c;尤其是商务往来沟通&#xff0c;企业邮箱是标配。这主要是因为西方国家互联网发展较早&#xff0c;在互联网早期&#xff0c;电子邮件技术较为成熟&#xff0c;大家都用电子邮件交流&#xff0c;于是这成了一种潮流…

Android 基本属性绘制文本对象FontMetrics

FontMetrics对象 它以四个基本坐标为基准&#xff0c;分别为&#xff1a; ・FontMetrics.top ・FontMetrics.ascent ・FontMetrics.descent ・FontMetrics.bottom 如图: 要点如下&#xff1a; 1. 基准点是baseline 2. Ascent是baseline之上至字符最高处的距离 3. Descent是ba…

RT-Thread:嵌入式实时操作系统的设计与应用

RT-Thread&#xff08;Real-Time Thread&#xff09;是一个开源的嵌入式实时操作系统&#xff0c;其设计和应用在嵌入式领域具有重要意义。本文将从RT-Thread的设计理念、核心特性&#xff0c;以及在嵌入式系统中的应用等方面进行探讨&#xff0c;对其进行全面的介绍。 首先&a…

SMART PLC MODBUSTCP速度测试

SMART PLC MODBUSTCP通信详细介绍请参看下面文章链接: S7-200SMART PLC ModbusTCP通信(多服务器多从站轮询)_matlab sumilink 多个modbustcp读写_RXXW_Dor的博客-CSDN博客文章浏览阅读6.4k次,点赞5次,收藏10次。MBUS_CLIENT作为MODBUS TCP客户端通过S7-200 SMART CPU上的…