Spark八:Spark性能优化

Spark性能调优

Spark调优的方法,包括RDD使用、文件读取,partition
学习资料:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ

一、Spark调优之RDD算子调优

1.1 RDD复用

在对RDD进行计算时,要避免相同的算子和计算逻辑下对RDD进行重复的计算。

1.2 尽早filter

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。

1.3 读取大量小文件-用wholeTextFiles

当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为RDD的一个元素。
也可以将多个完整的文本文件一次性读取为一个pairRDD,其中键是文件名,值是文件内容。

// 这种读法,如果是传递目录,则将目录下所有的文件读取作为RDD,文件路径支持通配符
val input:RDD[String] = sc.textFile("dir/*.log") 

使用wholeTextFiles,返回值为RDD[(String, String)],其中Key是文件的名称,Value是文件的内容。举例:

val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

1.4 mapPartition和foreachPartition

1.4.1 mapPartitions

map(_...)表示每一个元素
mapPartitions(_...)表示每个分区的数据组成迭代器
普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。

如果是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。
如果是mapPartition算子,由于一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收所有的partition数据,效率比较高。

比如,当要把RDD中所有数据通过JDBC写入数据,如果使用map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions,那么针对一个分区的数据,只需要建立一个数据库连接。

mapPartition算子缺点:对于普通的map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出

因此:mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的

在项目中,应该首先估算一下RDD的数据量、每个partition的数据量,以及分配给每个Executor的内存资源,如果资源允许,可以考虑使用mapPartitions算子代替map。

1.4.2 foreachPartition

rdd.foreach(_...)表示每一个元素
rdd.foreachPartition(_...)表示每个分区的数据组成的迭代器

在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能。
同mapPartition,如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition算子。

与mapPartitions算子非常相似,foreachPartition是将RDD的每个分区作为遍历对象,一次处理一个分区的数据。


使用foreachPartition 算子后,可以获得以下的性能提升:

  1. 对于我们写的function函数,一次处理一整个分区的数据;
  2. 对于一个分区内的数据,创建唯一的数据库连接;
  3. 只需要向数据库发送一次SQL语句和多组参数;

在生产环境中,全部都会使用foreachPartition算子完成数据库操作。foreachPartition算子存在一个问题,与mapPartitions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。

1.5 filter+coalesce/repartition(减少分区)

在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如下图所示:
在这里插入图片描述

上图有两个问题:

  1. 每个partition的数据量变小,如果按照之前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;
  2. 每个partition的数据量不一样,会导致后面的每个task处理每个partition数据的时候,每个task要处理的数据量不同,这很有可能导致数据倾斜问题。

针对上面的问题,可以使用coalesce算子。

repartitioncoalesce都可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认情况下不进行shuffle,但是可以通过参数进行设置。
假设我们希望将原本的分区个数A通过重新分区变为B,那么有以下几种情况:

  1. A>B:
    • A与B相差不大:此时使用coalesce即可,无需shuffle过程。
    • A与B相差大:可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为true,即启动shuffle过程。
  2. A<B:
    • 此时使用repartition即可,如果使用coalesce需要将shuffle设置为true,否则coalesce无效。

可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。

注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。

1.6 并行度设置

Spark作业中的并行度指各个stage的task的数量。
如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费。
Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍

之所以没有推荐task数量与CPU core总数相等,是因为task的执行时间不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的情况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会立刻执行下一个task,降低了资源的浪费,同时提升了Spark作业运行的效率。

// Spark作业并行度的设置
val conf = new SparkConf().set("spark.default.parallelism", "500")

1.7 repartition/coalesce调节并行度

Spark 中有并行度的调节策略,但是,并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage生效


Spark SQL自己会默认根据hive表对应的HDFS文件的split个数自动设置Spark SQL所在的那个stage的并行度,用户自己通 spark.default.parallelism 参数指定的并行度,只会在没Spark SQL的stage中生效。

由于Spark SQL所在stage的并行度无法手动设置,如果数据量较大,并且此stage中后续的transformation操作有着复杂的业务逻辑,而Spark SQL自动设置的task数量很少,这就意味着每个task要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑。
这就可能表现为第一个有Spark SQL的stage速度很慢,而后续的没有Spark SQL的stage运行速度非常快。

为了解决Spark SQL无法设置并行度和task数量的问题,我们可以使用repartition算子,使用前后对比图如下:
在这里插入图片描述
Spark SQL的并行度和task数量没有办法改变,但是对于Spark SQL查询出来的RDD,立刻使用repartition算子,重新进行分区。从repartition之后的RDD操作,由于不再涉及Spark SQL,因此stage的并行度就会等于手动设置的值,这样就避免了Spark SQL所在的stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。使用repartition算子的前后对比如上图所示。

1.8 reduceByKey本地预聚合

reduceByKey相较于普通的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。
使用reduceByKey对性能的提升:

  1. 本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
  2. 本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;
  3. 本地聚合后,在reduce端进行数据缓存的内存占用减少;
  4. 本地聚合后,在reduce端进行聚合的数据量减少。

基于reduceByKey的本地聚合特征,我们应该考虑使用reduceByKey代替其他的shuffle算子,例如groupByKey。

groupByKey与reduceByKey的运行原理如下图1和图2所示:
groupByKey不会进行map端的聚合,而是将所有map端的数据shuffle到reduce端,然后在reduce端进行数据的聚合操作。由于reduceByKey有map端聚合的特性,使得网络传输的数据量减小,因此效率要明显高于groupByKey。
在这里插入图片描述

1.9 使用持久化+checkpoint

Spark持久化在大部分情况下是没有问题的,但是有时候会数据丢失,如果数据一旦丢失,就需要对丢失的数据重新及逆行计算,计算后再缓存和使用。为了避免数据的丢失,可以选择对RDD进行checkpoint,也就是将数据据持久化一份到容错的文件系统上(HDFS)

一个RDD缓存并checkpoint后,一旦发现缓存丢失,会优先查看checkpoint数据存不存在。如果有,就会使用checkpoint数据,而不用重新计算。也即是说,checkpoint可以视为cache的保障机制,如果cache失败,就使用checkpoint的数据。

使用checkpoint的优缺点

优点:提高Spark作业的可靠性,一旦缓存出现问题,不需要重新计算数据
缺点:checkpoint时需要将数据写入HDFS等文件系统,对性能消耗较大。

1.10 使用广播变量

默认情况下,task中的算子如果使用了外部变量,每个task会获得一份变量的副本,这就造成了内存的极大消耗。
另一方面,如果后续对RDD进行持久化,可能就无法将RDD写入内存,只能写入磁盘,磁盘IO将会严重消耗性能;
另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。

假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,如果使用了广播变量, 那么每个Executor保存一个副本,一共消耗400M内存,内存消耗减少了25倍。


原理:
在初始阶段,广播变量只在Driver中有一份副本。
task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;
之后此Executor的所有task都会直接从本地的BlockManager中获取变量。

广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少。

val 广播变量名= sc.broadcast(会被各个Task用到的变量,即需要广播的变量)
广播变量名.value//获取广播变量

1.11 使用Kryo序列化

默认情况下,Spark使用Java的序列化机制。
Java的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable接口即可,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大

Spark官方宣称Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了

Kryo序列化注册方式代码如下:

public class MyKryoRegistrator implements KryoRegistrator{@Overridepublic void registerClasses(Kryo kryo){kryo.register(StartupReportLogs.class);}
}

配置Kryo序列化方式的代码如下:

//创建SparkConf对象
val conf = new SparkConf().setMaster().setAppName()
//使用Kryo序列化库
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化库中注册自定义的类集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610135.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机科学速成课【学习笔记】(3)——布尔逻辑和逻辑门

本集课程的B站链接 3. 布尔逻辑 和 逻辑门-Boolean Logic & Logic Gates_哔哩哔哩_bilibili3. 布尔逻辑 和 逻辑门-Boolean Logic & Logic Gates是【计算机科学速成课】[40集全/精校] - Crash Course Computer Science的第3集视频&#xff0c;该合集共计40集&#xff…

当需要视频监控技术升级时,应该如何操作呢?

在当今社会&#xff0c;信息技术的飞速发展为我们提供了前所未有的安全和管理工具&#xff0c;其中视频监控系统无疑是其中一项引人注目的成果。 视频监控系统不仅提高了安全性和生产效率&#xff0c;同时也为社会的可持续发展提供了有力支持。 客户案例 工业生产管理优化 广…

将dumpbin从Visual Studio中抠出来,并使用dumpbin查看exe和dll库的依赖关系

目录 1、初步说明 2、在开发的机器上使用dumpbin工具查看dll库的依赖关系 3、将dumpbin.exe从Visual Studio中抠出来 3.1、找到dumpbin.exe文件及其依赖的dll文件 3.2、在cmd中运行dumpbin&#xff0c;提示找不到link.exe文件 3.3、再次运行dumpbin.exe提示找不到mspdb10…

了解DC电源模块的基本参数及选择方法

BOSHIDA 了解DC电源模块的基本参数及选择方法 DC电源模块是一种用来提供稳定直流电源的设备&#xff0c;常被应用在电子产品测试、实验室设备等领域。了解DC电源模块的基本参数和选择方法有助于正确选择和使用合适的模块。 1. 输出电压范围&#xff1a;DC电源模块通常有固定的…

mybatisplus快速入门-个人理解版

mybatisplus快速入门 1.快速入门1.1准备开发环境-idea2019.2.1版第一步&#xff1a;新建工程第二步&#xff1a;导入依赖 1.2创建数据库和表创建库表添加数据 1.3编写代码进行测试第一步&#xff1a;配置application.yml第二步&#xff1a;添加实体类第三步&#xff1a;添加map…

【算法】链表-20240109

这里写目录标题 一、141. 环形链表二、876. 链表的中间结点三、面试题 02.01. 移除重复节点 一、141. 环形链表 简单 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中…

Excel如何插入行?4个简单方法轻松完成!

“我在使用Excel表格工作时&#xff0c;需要插入一些行来填写数据。但是我不知道应该如何操作&#xff0c;有没有朋友可以支支招呀&#xff1f;” Excel是办公室工作中不可或缺的工具&#xff0c;它强大的数据处理能力使得工作变得高效。因此很多用户在使用电脑时或许都会用到E…

vue倒计时60秒改变按钮状态效果demo(整理)

你可以使用Vue的计时器和绑定状态的方法来实现这个功能。 首先&#xff0c;在data中添加一个计时器countdown&#xff0c;初始值为0。 data() {return {countdown: 0} }<template><div><button click"startCountdown" :disabled"countdown > …

基于Jackson封装的JSON、Properties、XML、YAML 相互转换的通用方法

文章目录 一、概述二、思路三、实现四、测试 一、概述 我们在 yaml转换成JSON、MAP、Properties 通过引入 实现了JSON、Properties、XML、YAML文件的相互转换&#xff0c;具体封装的类、方法如下&#xff1a; 上面的实现&#xff0c;定义了多个类、多个方法&#xff0c;使用…

FineBI实战项目一(2):案例架构说明及数据准备

1 系统架构 基于MySQL搭建数据仓库基于Kettle进行数据处理帆软FineBI基于MySQL搭建的数据仓库进行数据分析 2 数据流程图 通过Kettle将MySQL业务系统数据库中&#xff0c;将数据抽取出来&#xff0c;然后装载到MySQL数据仓库中。编写SQL脚本&#xff0c;对MySQL数据仓库中的数…

Qt之有趣的数字钟

一.效果 基于网络代码修改,支持时、分、秒;支持滑动、翻页和旋转。 二.实现 #include <QtCore> #include <QPainter> #include <QAction> #include <QWidget> #include <QMainWindow> #include <QTimer> #include <QKeyEvent> #…

MySQL性能调优---BKA

1.BKA原理介绍 MySQL 5.6版本开始增加了提高表join性能的Batched Key Access (BKA)算法。BKA是对于多表join语句&#xff0c;当MySQL使用索引访问第二个join表的时候&#xff0c;使用一个join buffer来收集第一个操作对象生成的相关列值。BKA构建好key后&#xff0c;批量传给引…

HTML5网站小游戏源码系统:各种各样小游戏集合,你想要的这里都有+完整的安装代码包以及搭建教程

现如今&#xff0c;科技的不断发展&#xff0c;HTML5技术逐渐成为网页游戏开发的主流。为了满足广大游戏爱好者的需求&#xff0c;罗峰给大家推荐一款基于HTML5的网站小游戏源码系统。这款系统集成了众多经典小游戏&#xff0c;涵盖了各种类型&#xff0c;无论您是寻找休闲益智…

0基础学习VR全景平台篇第136篇:720VR全景,认识无人机

上课&#xff01;全体起立~ 大家好&#xff0c;欢迎观看蛙色官方系列全景摄影课程&#xff01; 无人驾驶飞行器&#xff0c;简称“无人机”&#xff0c;英文缩写为“UAV”。是利用无线电控制设备和自备程序控制操纵的不载人飞机。 &#xff08;一&#xff09;无人机介绍 较…

Linux 网络层收发包流程及 Netfilter 框架浅析

1. 前言 本文主要对 Linux 系统内核协议栈中网络层接收&#xff0c;发送以及转发数据包的流程进行简要介绍&#xff0c;同时对 Netfilter 数据包过滤框架的基本原理以及使用方式进行简单阐述。 内容如有理解错误而导致说明错误的地方&#xff0c;还请指正。如存在引用而没有添…

Shopee买家通系统助力虾皮买手号轻松获取

Shopee买家通系统可以进行虾皮买手号的全自动注册。这款先进的软件目前覆盖了菲律宾、泰国、马来西亚、越南、巴西、印度尼西亚等多个国家&#xff0c;为用户提供了便捷、高效的注册途径。 想要注册虾皮买家号号&#xff0c;首先需要准备一个支持接收短信的手机号。因为虾皮买家…

MATLAB读取图片并转换为二进制数据格式

文章目录 前言一、MATLAB 文件读取方法1、文本文件读取2、二进制文件读取3、 图像文件读取4、其他文件读取 二、常用的图像处理标准图片链接三、MATLAB读取图片并转换为二进制数据格式1、matlab 源码2、运行结果 前言 本文记录使用 MATLAB 读取图片并转换为二进制数据格式的方…

React入门 - 04(从编写一个简单的 TodoList 说起)

继上一节我们已经对 React组件和 ”JSX语法“有了大概的了解&#xff0c;这一节我们继续在 react-demo这个工程里编写代码。这一节我们来简单实现一个 TodoList来更加了解编写组件的一些细节。 1、在编辑器中打开 react-demo这个工程 2、打开 index.js文件&#xff0c;将组件 …

Spark---RDD(Key-Value类型转换算子)

文章目录 1.RDD Key-Value类型1.1 partitionBy1.2 reduceByKey1.3 groupByKeyreduceByKey和groupByKey的区别分区间和分区内 1.4 aggregateByKey获取相同key的value的平均值 1.5 foldByKey1.6 combineByKey1.7 sortByKey1.8 join1.9 leftOuterJoin1.10 cogroup 1.RDD Key-Value…

鸿蒙南向开发—PWM背光(OpenHarmony技术)

背光驱动模型也是基于HDF框架开发的&#xff0c;整个框架如下&#xff1a; 现在以RK3568为例&#xff0c;来看看PWM背光整个驱动&#xff0c;这里使用的是PWM占空比控制的背光&#xff0c;默认基于hdf的pwm驱动已经OK&#xff01; 需要注意的是&#xff1a;这里是基于HDF实现的…