【pyspark速成专家】11_Spark性能调优方法2

目录

​编辑

二,Spark任务UI监控

三,Spark调优案例


二,Spark任务UI监控

Spark任务启动后,可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。

该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况,包括任务进度,耗时分析,存储分析,shuffle数据量大小等。

最常查看的页面是 Stages页面和Excutors页面。

Jobs: 每一个Action操作对应一个Job,以Job粒度显示Application进度。有时间轴Timeline。

Stages: Job在遇到shuffle切开Stage,显示每个Stage进度,以及shuffle数据量。可以点击某个Stage进入详情页,查看其下面每个Task的执行情况以及各个partition执行的费时统计。

Storage:

监控cache或者persist导致的数据存储大小。

Environment:
显示spark和scala版本,依赖的各种jar包及其版本。

Excutors : 监控各个Excutors的存储和shuffle情况。

SQL: 显示各种SQL命令在那些Jobs中被执行。

三,Spark调优案例

下面介绍几个调优的典型案例:

1,资源配置优化

2,利用缓存减少重复计算

3,数据倾斜调优

4,broadcast+map代替join

5,reduceByKey/aggregateByKey代替groupByKey

1,资源配置优化

下面是一个资源配置的例子:

优化前:

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 8 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

优化后:这里主要减小了 executor-cores数量,一般设置为1~4,过大的数量可能会造成每个core计算和存储资源不足产生OOM,也会增加GC时间。 此外也将默认分区数调到了1600,并设置了2G的堆外内存。

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.default.parallelism=1600 \
--conf spark.sql.shuffle.partitions=1600 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g\
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

2, 利用缓存减少重复计算

%%time
# 优化前:
import math 
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
print(mean)%%time 
# 优化后: 
import math 
from  pyspark.storagelevel import StorageLevel
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
rdd_data.unpersist()
print(mean)

3, 数据倾斜调优

%%time 
# 优化前: 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect()) %%time 
# 优化后: 
import random 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)
rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect())  #作者按:此处仅示范原理,单机上该优化方案难以获得性能优势

4, broadcast+map代替join

该优化策略一般限于有一个参与join的rdd的数据量不大的情况。

%%time 
# 优化前:rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))print(rdd_students.collect())%%time # 优化后:
rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
ages = rdd_age.collect()
broads = sc.broadcast(ages)def get_age(it):result = []ages = dict(broads.value)for x in it:name = x[0]age = ages.get(name,0)result.append((x[0],age,x[1]))return iter(result)rdd_students = rdd_gender.mapPartitions(get_age)print(rdd_students.collect())

5,reduceByKey/aggregateByKey代替groupByKey

groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的合并操作,大大减少了shuffle的数据量。

%%time 
# 优化前:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
names = rdd_names.collect()
print(names)%%time 
# 优化后:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)names = rdd_names.collect()
print(names)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/17862.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VSCODE中F12无法跳转,快捷键设置F12和insert混淆了

异常现象 最近用新电脑(华为)的时候,发现VSCODE经常按F12无法跳转,在快捷键设置当中,也是设置成功的; 此时重新去快捷键设置,会发现按 F12变为了Insert 解决方法 华为笔记本的Fx按键&#x…

LeetCode/NowCoder-栈和队列OJ练习

孜孜不倦:孜孜:勤勉,不懈怠。指工作或学习勤奋不知疲倦。💓💓💓 目录 说在前面 题目一:括号匹配问题 题目二:用队列实现栈 题目三:用栈实现队列 题目四:设…

这款网站测试工具,炫酷且强大!【送源码】

随着互联网的普及和发展,Web 应用程序的数量也越来越多,各种网络问题也是层出不穷,因而监测这些 Web 应用程序的性能和可用性变得非常重要。 今天的文章,了不起和大家分享一款十分好用的的网站分析项目 - Web-Check。 项目简介 …

华为WLAN实验继续-2,多个AP如何部署

----------------------------------------如果添加新的AP,如何实现多AP的服务----------- 新增加一个AP2启动之后发现无法获得IP地址 在AP2上查看其MAC地址,并与将其加入到AC中去 打开AC,将AP2的MAC加入到AC中 sys Enter system view, re…

HttpClient cookie爬虫记录

记录一次java语言使用httpclient爬取网站接口数据的经历 需要用到的依赖&#xff1a; httpclient和httpcore是封装了http请求的工具类 jsoup可以将返回的网页html找到你需要的xml节点&#xff0c;很方便 <dependency><groupId>org.apache.httpcomponents</gr…

Java面试八股之++操作符是线程安全的吗

操作符是线程安全的吗 操作符本身在Java中并不是线程安全的。这个操作实际上包含三个步骤&#xff1a;读取变量的值、将值加1、然后将新值写回内存。在多线程环境下&#xff0c;如果多个线程同时对同一个变量执行操作&#xff0c;就可能出现竞态条件&#xff08;race conditio…

中科驭数驭云、超低时延网络案例双双入选第七届数字中国建设峰会数字化转型典型应用案例

5月24日-25日&#xff0c;第七届数字中国建设峰会在福州召开。在“数字赋能民营经济专业工作会议”上&#xff0c;中关村云计算产业联盟发布了《2024中小企业数字化转型典型应用案例集》&#xff0c;中科驭数驭云、超低时延网络两大方案入选。 作为国内领先的DPU芯片及解决方案…

vue中在mounted使用$refs获取不到DOM元素

vue中在mounted使用$refs获取不到DOM元素 前言解决方案1、通过使用$nextTick来获取2、updated中获取 前言 在使用ref的时候&#xff0c;在mounted中通过$ref获取节点是获取不到报undefined this.$refs.xx 为 undefined 解决方案 在mounted钩子中加载回来的数据不会在这个阶段更…

商品上线搜索服务

文章目录 1.引入检索页面1.确保search目录和list.html都成功引入2.修改list.html&#xff0c;增加命名空间3.后端编写接口 SearchController.java4.测试访问 2.带条件分页检索1.前端要求返回数据的格式2.构建vo&#xff0c;SearchResult.java3.SkuInfoService.java 购买用户根据…

电脑无法远程桌面连接,关于电脑无法建立远程桌面连接的问题分析与解决方案

在信息化快速发展的今天&#xff0c;远程桌面连接已成为许多企业和个人用户进行远程办公、技术支持以及数据管理的必备工具。然而&#xff0c;当电脑无法建立远程桌面连接时&#xff0c;可能会对用户的工作和日常生活造成极大的不便。本文将深入分析电脑无法远程桌面连接的原因…

详解Java ThreadLocal

个人博客 详解Java ThreadLocal | iwts’s blog Java ThreadLocal ThreadLocal提供了线程内存储变量的能力&#xff0c;这些变量不同之处在于每一个线程读取的变量是对应的互相独立的。通过get和set方法就可以得到当前线程对应的值。 TreadLocal存储模型 ThreadLocal的静态…

Oracle EBS API创建AP发票报错:ZX_TAX_STATUS_NOT_EFFECTIVE和ZX_REGIME_NOT_EFF_IN_SUBSCR-

背景 由创建国外业务实体财务未能提供具体国家地区会计税制&#xff0c;而是实施人员随便选择其它国外国家地区会计税制。导致客户化创建AP发票程序报错&#xff1a;UNEXPECTED TAX ERROR-导入时出现意外的税务错误ZX_TAX_STATUS_NOT_EFFECTIVE-ZX_REGIME_NOT_EFF_IN_SUBSCR-ZX…

如何克隆非默认分支

直接git clone下来的我们知道是默认分支&#xff0c;那如何克隆其他分支呢&#xff1a; 比如这个&#xff0c;我们想克隆AdvNet。 我们可以在本地文件夹打开Git Bash 依次输入&#xff1a; git clone --branch AdvNet https://github.com/wgcban/SemiCD.git cd SemiCD git b…

计算机操作系统体系结构

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天给大家讲讲操作系统。 当今的操作系统趋向于越来越复杂&#xff0c;因为它们提供许多服务&#xff0c;并支持各种硬件和软件资源&#xff08;请参见“操作系统思想&#xff1a;尽量保持简单”&#xff0…

267 基于matlab的信号处理GUI人机交互

基于matlab的信号处理GUI人机交互&#xff0c;利用GUI功能完成包括振幅调制AM&#xff08;Amplitude Modulation&#xff09;&#xff0c;双边带调幅信号DSB&#xff08;double sideband&#xff09;&#xff0c;单边带信号SSB&#xff08;single sideband &#xff09;&#x…

《TCP/IP网络编程》(第十一章)进程间通信

进程间通信意味着两个不同的进程间可以交换数据&#xff0c;它使得不同的进程能够协同工作&#xff0c;实现复杂的系统功能。 1.通过管道实现进程间通信 下图是基于 管道&#xff08;PIPE&#xff09; 的进程间通信结构模型 管道不属于进程的资源&#xff0c;属于操作系统的资…

数字化工厂怎么收集,处理数据?

数字化工厂的数据收集与处理 数字化工厂是现代化工厂&#xff0c;利用数字技术和数据分析提高效率和优化流程。数据分析作为数字化工厂的核心技术&#xff0c;对数据的获取与处理至关重要。在数字化工厂中&#xff0c;数据的来源包括企业内部信息系统、物联网信息以及外部信息&…

美光EMMC芯片丝印型号查询 8LK17/D9PSK, OXA17/JY997

问题说明 最近在使用美光EMMC的时候&#xff0c;发现通过芯片丝印查询不到 芯片的规格说明书&#xff1b; 经过查阅资料&#xff0c;发现美光的EMMC芯片 “由于空间限制&#xff0c;FBGA 封装组件具有与部件号不同的缩写部件标记”&#xff0c;需要通过官网查询丝印的FBGA cod…

Autosar Dcm配置-特定NRC实现方式-基于ETAS软件

文章目录 前言工具配置代码编写总结 前言 项目开发过程中&#xff0c;诊断服务一般客户需求或系统需求都会有特定NRC(一般为NRC22-条件不满足)&#xff0c;也就会有特定的条件&#xff0c;需要手动加代码实现。本文介绍ETAS工具中配置的接口及简单实现。 工具配置 对于每一个…