13.108.Spark 优化、Spark优化与hive的区别、SparkSQL启动参数调优、四川任务优化实践:执行效率提升50%以上

13.108.Spark 优化
1.1.25.Spark优化与hive的区别
1.1.26.SparkSQL启动参数调优
1.1.27.四川任务优化实践:执行效率提升50%以上

13.108.Spark 优化:

1.1.25.Spark优化与hive的区别

先理解spark与mapreduce的本质区别,算子之间(map和reduce之间多了依赖关系判断,即宽依赖和窄依赖。)
优化的思路和hive基本一致,比较大的区别就是mapreduce算子之间都需要落磁盘,而spark只有宽依赖才需要落磁盘,窄依赖不落磁盘。
在这里插入图片描述
在这里插入图片描述

1.1.26.SparkSQL启动参数调优

在这里插入图片描述

1)先对比结果:executors优化
Hive执行了30分钟(1800秒)的sql,没有优化过的SparkSQL执行需要,
最少化的Executor执行需要640秒(提高了Executor的并行度,牺牲了HDFS的吞吐量:5个core最合适),
最大化的Executor 281.634秒(最大限度的利用HDFS的吞吐量,牺牲Executor的并行度),
优化取中间值,253.189秒。

方案1:最少化 Fat executors

---------------------------------	Fat executors	--------------------------------------------------------------------------------
./spark-sql --master yarn \	# Fat executors (每个节点一个Executor)【优势:最佳吞吐量】
--num-executors 3 \			# 集群中的节点的数目 = 3
--executor-memory 30G \	# 每个节点的内存/每个节点的executor数目 = 30GB/1 = 30GB
--executor-cores 16 \		# 每个executor独占节点中所有的cores = 节点中的core的数目 = 16
--driver-memory 1G			# AM大约需要1024MB的内存和一个Executor
耗时:Time taken: 640 seconds

方案2:最大化Tiny executors

---------------------------------	Tiny executors	--------------------------------------------------------------------------------
./spark-sql --master yarn \	# Tiny executors [每个Executor一个Core]【优势:并行性】
--num-executors 48 \		# 集群中的core的总数 = 每个节点的core数目 * 集群中的节点数 = 16*3
--executor-memory 1.6G \	# 每个节点的内存/每个节点的executor数目 = 30GB/16 = 1.875GB
--executor-cores 1 \			# 每个executor一个core
--driver-memory 1G			# AM大约需要1024MB的内存和一个Executor
耗时:Time taken: 281.634 seconds
executor并发度只有45,task的并发度,1个executor 50左右,总数 18382

方案3:折中方案

---------------------------------	Balance between Fat (vs) Tiny	--------------------------------------------------------------------------------
./spark-sql --master yarn \	# Balance between Fat (vs) Tiny
--num-executors 8 \			# (16-1)*3/5 = 9 留一个executor给ApplicationManager => --num-executors = 9-1 = 8# 每个节点的executor的数目 = 9 / 3 = 3
--executor-memory 10G \	# 每个executor的内存 = 30GB / 3 = 10GB【默认分配的是8G,需要修改配置文件支持到10G。】# 计算堆开销 = 7% * 10GB = 0.7GB。因此,实际的 --executor-memory = 10 - 0.7 = 9.3GB
--executor-cores 5 \			# 给每个executor分配5个core,保证良好的HDFS吞吐。# 每个节点留一个core给Hadoop/Yarn守护进程 => 每个节点可用的core的数目= 16 - 1
--driver-memory 1G			
耗时:Time taken: 253 seconds

Task并行度优化
1.调整 Executors 下 每个stage的默认task数量,即设置Task 的并发度:

【当集群数量比较大时】
很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,
!【默认是一个HDFS block对应一个task(如果不设置那么可以通过第三种方案来优化!)】。
通常来说,Spark默认设置的数量是偏少的(比如就几十个task),
如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。
试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,
那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!
因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,
比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

30 G 16 core

/home/admin/bigdata/spark-2.2.0-bin-hadoop2.6/bin/spark-sql \
--master yarn \
--num-executors 16 \
--executor-memory 1G \
--executor-cores 10 \
--driver-memory 1G \
--conf spark.default.parallelism=450 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3	

1.1.27.四川任务优化实践:执行效率提升50%以上

一、四川的信息
账号:xxxxxx 密码: xxxxxxxx

一、事实表优化
1、**优化结果: 20 分钟左右,优化完成后 5 分钟左右。**数据量:5.8亿

2、原SQL:(spark不一定快)

drop table if exists dc_f_organization;
create table if not exists dc_f_organization (orgid int,orgcode string,yearmonth string ,zzdate string,orgname string,orglevel int,id int,orgtagging int, createdate timestamp
);insert into dc_f_organization
select a.orgid, .orgcode, a.yearmonth, a.zzdate, n.orgname, n.orglevel, n.id, n.orgtagging, n.createdate
from ( select o.id orgid, o.orgcode, d.yearmonth, d.zzdate from dc_d_organization o, dc_d_wddate ) aleft join dc_d_organization n on to_date(n.createdate)=a.zzdate and n.orgcode = a.orgcode;

3、优化方案:
– ############################## HIVE 执行:增加 block 的数量,提高Spark的并发度(当前任务文件比较小,设置了26;一般参考数量:300左右;) #################################
– (1) 单独执行笛卡尔积,
– 先拆分文件:(改用hive,拆分文件,增加并行度)
– 【耗时:101.586 seconds;结果文件数量 26】
– 检查文件块数量:hadoop fs -ls /user/hive/warehouse/test.db/dc_d_org_date 26 个block

set mapreduce.map.memory.mb=1024;
set mapred.max.split.size=524288;
set mapred.min.split.size.per.node=524288;
set mapred.min.split.size.per.rack=524288;
drop table if exists dc_d_org_date;
create table dc_d_org_date as select o.id orgid,o.orgcode,d.yearmonth,d.zzdate from dc_d_organization o CROSS JOIN dc_d_wddate d;
-- ##############################	SPARK 执行;参数:spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G	#################################
-- (2)【Spark:Time taken: 115.78 seconds;】
set spark.shuffle.consolidateFiles=true;
drop table if exists dc_f_organization;
create table if not exists dc_f_organization
(orgid int,orgcode string,YEARMONTH string ,ZZDATE string,ORGNAME string,orglevel int,id int,ORGTAGGING int, createdate timestamp);insert into dc_f_organization
select a.orgid,a.orgcode,a.YEARMONTH,a.ZZDATE,n.ORGNAME,n.orglevel,n.id,n.ORGTAGGING,n.createdate
from dc_d_org_date a
left join DC_D_ORGANIZATION n on to_date(n.CREATEDATE)=a.ZZDATE and n.orgcode = a.orgcode;

– ############################## 持续优化方向:将上述两者合并到一起在 spark 中执行 ##############################
问题:可能是因为文件太小,spark 分区命令没有生效。set spark.sql.shuffle.partitions=300;
注意:SPARK中笛卡尔积需要改成 CROSS JOIN,否则语法报错。

二、优化CUBE表

  1、优化结果:原来1小时左右,优化后26分钟。总结:shuffle时间:16分钟,数据量	35.2亿任务含有宽依赖(group)被分成2个stage✔采用方案 1:改用spark执行。提高并行度。执行参数:spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3Gstage 1 执行时间:11(partitions=300)stage 2 执行时间:15(partitions=200)设置分区数量,默认是200set spark.sql.shuffle.partitions=300;(理论上可以提高 stage 2 30%的速度,实际运行的时候可能会丢失executor,运行不稳定,不建议设置。)(原因可能是设置了虚拟核心数量。)方案 2:将case when的操作独立出一张表,去除部分重复扫描计算,减少cube阶段的计算量。抽取的时间增加了2分钟,节省的 shuffle 时间也是2分钟。没有意义。预处理时间:2-3分钟stage 1 执行时间:11stage 2 执行时间:13(节省的时间也是2-3分钟)方案 3:提高 shuffle 使用内存的占比 设置为60%执行参数:spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G --conf spark.storage.memoryFraction=0.3 --conf spark.shuffle.memoryFraction=0.5执行结果:效果不明显,多次执行时间也不太一致。方案 4:减少CUBE的维度数量, orgid 和 orgcode是一对一关系,可以去掉1个维度,计算完成之后再join执行结果:join 消耗的时间更久。2、采用的方案1:SPARK执行-- 执行参数 spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G-- set spark.sql.shuffle.partitions=300;drop table  if  exists dc_c_organization;create table if not exists dc_c_organization(YEARMONTH string,ZZDATE string,orgid int ,orgcode string,total int,provinceNum int,cityNum int,districtNum int, newDistrictNum int,townNum int,streetNum int,otherNum int,communityNum int,villageNum int,gridNum int);-- 如果用 hive 执行可以开启 combiner,map端先预聚合,减少reduce端的数据量和计算量,减少磁盘的IO和网络传输时间。-- set hive.map.aggr = true;-- set hive.groupby.mapaggr.checkinterval = 10000;-- ##############################	SPARK	#################################-- set spark.sql.shuffle.partitions=300;insert into dc_c_organizationselect  n.YEARMONTH,n.ZZDATE,n.orgid,n.orgcode,count(n.id) total,nvl(SUM(case when pt.displayname='省' then 1  else 0 end),0) AS provinceNum,nvl(SUM(case when pt.displayname='市' then 1  else 0 end),0) as cityNum,nvl(SUM(case when pt.displayname='县(区)' then 1  else 0 end),0) AS districtNum,(nvl(SUM(case when pt.displayname='县(区)'  then 1  else 0 end),0) -nvl(SUM(case when pt.displayname='县(区)' AND n.ORGTAGGING= 31 then 1  else 0 end),0)) as newDistrictNum,nvl(SUM(case when  ((n.ORGNAME LIKE '%乡%' OR n.ORGNAME LIKE '%镇%' OR n.ORGNAME LIKE '%乡镇%')) AND pt.displayname='乡镇(街道)' then 1  else 0 end),0) townNum,nvl(SUM(case when (n.ORGNAME LIKE '%街道%') AND pt.displayname='乡镇(街道)' then 1  else 0 end),0) streetNum,(nvl(SUM(case when pt.displayname='乡镇(街道)'then 1  else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE '%乡%' OR n.ORGNAME LIKE '%镇%' OR n.ORGNAME LIKE '%乡镇%') ) AND pt.displayname='乡镇(街道)' then 1  else 0 end),0)-nvl(SUM(case when (n.ORGNAME LIKE '%街道%' )  AND pt.displayname='乡镇(街道)' then 1  else 0 end),0)) otherNum,(nvl(SUM(case when pt.displayname='村(社区)' then 1  else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE '%村' OR n.ORGNAME LIKE '%村民委员会' OR n.ORGNAME LIKE '%农村工作中心站' OR n.ORGNAME LIKE '%村委会')) AND pt.displayname='村(社区)' then 1  else 0 end),0)) communityNum,nvl(SUM(case when ((n.ORGNAME LIKE '%村' OR n.ORGNAME LIKE '%村民委员会' OR n.ORGNAME LIKE '%农村工作中心站' OR n.ORGNAME LIKE '%村委会')) AND pt.displayname='村(社区)' then 1  else 0 end),0) villageNum,nvl(SUM(case when pt.displayname='片组片格'then 1  else 0 end),0) gridNumfrom dc_f_organization nleft join dc_d_property pt on n.orglevel = pt.idGROUP BY n.YEARMONTH,n.ZZDATE,n.orgid,n.orgcodeWITH CUBE;3、优化方案2:从业务逻辑上进行优化。(发现SQL逻辑中存在重复的计算)-- ############################	预处理:去除重复计算和减少CUBE的计算量	############################drop table if exists temp_dc_c_organization;create table temp_dc_c_organizationas selectn.yearmonth,n.zzdate,n.orgid,n.orgcode,n.id as id,case when pt.displayname='省' then 1  else 0 end as provincenum,case when pt.displayname='市' then 1  else 0 end as citynum,case when pt.displayname='县(区)' then 1  else 0 end as districtnum,case when pt.displayname='县(区)' and n.orgtagging= 31 then 1  else 0 end as old_districtnum,
【重复1case when ((n.orgname like '%乡%' or n.orgname like '%镇%' or n.orgname like '%乡镇%')) and pt.displayname='乡镇(街道)' then 1  else 0 end townnum,
【重复2case when (n.orgname like '%街道%') and pt.displayname='乡镇(街道)' then 1  else 0 end streetnum,case when pt.displayname='乡镇(街道)'then 1  else 0 end as total_streetnum_01,
【重复1case when ((n.orgname like '%乡%' or n.orgname like '%镇%' or n.orgname like '%乡镇%')) and pt.displayname='乡镇(街道)' then 1  else 0 end as total_streetnum_02,
【重复2case when (n.orgname like '%街道%') and pt.displayname='乡镇(街道)' then 1  else 0 end as total_streetnum_03,case when pt.displayname='村(社区)' then 1  else 0 end as communitynum_01,
【重复3case when ((n.orgname like '%村' or n.orgname like '%村民委员会' or n.orgname like '%农村工作中心站' or n.orgname like '%村委会')) and pt.displayname='村(社区)' then 1  else 0 end as communitynum_02,
【重复3case when ((n.orgname like '%村' or n.orgname like '%村民委员会' or n.orgname like '%农村工作中心站' or n.orgname like '%村委会')) and pt.displayname='村(社区)' then 1  else 0 end villagenum,case when pt.displayname='片组片格'then 1  else 0 end gridnumfromdc_f_organization nleft join dc_d_property pt on n.orglevel = pt.id;-- ############################	CUBE:节省的时间相当于预处理的时间。############################create table dc_c_organization_02as select  yearmonth,zzdate,orgid,count(id) total,sum(provincenum) as provincenum,sum(citynum) as citynum,sum(districtnum) as districtnum,sum(districtnum)-sum(old_districtnum) as newdistrictnum,sum(townnum) townnum,sum(streetnum) streetnum,sum(total_streetnum_01)-sum(townnum)-sum(streetnum) othernum,sum(communitynum_01)-sum(villagenum) communitynum,sum(villagenum) villagenum,sum(gridnum) gridnumfrom temp_dc_c_organization as ngroup by yearmonth, zzdate, orgid with cube;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67079.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【高性能计算】opencl语法及相关概念(四):结合opencv进行图像高斯模糊处理

目录 高斯模糊简介主函数:host端设备端函数:mywork.cl效果图对比 高斯模糊简介 高斯模糊是一种常用的图像处理技术,用于减少图像中的噪点和细节,并实现图像的平滑效果。它是基于高斯函数的卷积操作,通过对每个像素周围…

使用acme,自动续签免费的SSL,无忧http升级https

使用acme自动续签免费的SSL 安装acme.sh颁发域名将证书安装到nginx下配置nginx的ssl自动续签 这里只进行最简单的操作 安装acme.sh 进入你的用户目录,如果你使用root登陆,那么你的用户目录就是 /root/ curl https://get.acme.sh | sh -s emailmyexam…

Linux环境基础开发工具

xshellssh xshell--充当客户端,提供远程登录服务 yum 背景知识 在Linux下安装软件, 一个通常的办法是下载到程序的源代码, 并进行编译, 得到可执行程序. 但是这样太麻烦了, 于是有些人把一些常用的软件提前编译好, 做成软件包(可以理解成windows上的安装程序)放…

Elasticsearch:利用矢量搜索进行音乐信息检索

作者:Alex Salgado 欢迎来到音乐信息检索的未来,机器学习、矢量数据库和音频数据分析融合在一起,带来令人兴奋的新可能性! 如果你对音乐数据分析领域感兴趣,或者只是热衷于技术如何彻底改变音乐行业,那么本…

隧道结构健康监测系统,保障隧道稳定安全运行

隧道是地下隐蔽工程,会受到潜在、无法预知的地质因素影响,早期修建的隧道经常出现隧道拱顶开裂、地表沉降、隧道渗漏水、围岩变形、附近建筑物倾斜等隧道的健康问题变得日益突出,作为城市生命线不可或缺的一部分,为了确保隧道工程…

GraphQL渗透测试案例及防御办法

什么是GraphQL GraphQL 是一种 API 查询语言,旨在促进客户端和服务器之间的高效通信。它使用户能够准确指定他们在响应中所需的数据,从而有助于避免有时使用 REST API 看到的大型响应对象和多个调用。 GraphQL 服务定义了一个合约,客户端可…

计算机视觉与人工智能在医美人脸皮肤诊断方面的应用

一、人脸皮肤诊断方法 近年来,随着计算机技术和人工智能的不断发展,中医领域开始逐渐探索利用这些先进技术来辅助面诊和诊断。在皮肤望诊方面,也出现了一些现代研究,尝试通过图像分析技术和人工智能算法来客观化地获取皮肤相关的…

【工作笔记-0038】mongodb mongorestore 命令行导入 bson.gz数据

1. 导出的集合文件格式如下(也就是导出的表文件): 例如: D:\Files\xxxx集合名称.bson.gz 怎样导出,这里不做介绍,用 mongodb compass 或者 studio 3t 都可以 2. 下载命令行导入工具: 官方…

ZLMeidaKit在Windows上启动时:计算机中丢失MSVCR110.dll,以及rtmp推流后无法转换为flv视频流解决

场景 ZLMediaKit在Windows上实现Rtmp流媒体服务器以及模拟rtmp推流和http-flv拉流播放: ZLMediaKit在Windows上实现Rtmp流媒体服务器以及模拟rtmp推流和http-flv拉流播放_zlm流媒体服务器_霸道流氓气质的博客-CSDN博客 按照以上教程启动MediaServer.exe时提示&am…

Docker Storage

文章目录 存储持久化存储类型Volumes使用场景管理Volume挂载Volume备份恢复Volume Bind mounts使用场景挂载bind tmpfs挂载tmpfs 存储持久化 在容器中所有创建的文件都是存储在容器可写层 当容器不存在后数据不会持久化,并且如果另一个进程需要数据,很…

在 Amazon 搭建无代码可视化的数据分析和建模平台

现代企业常常会有利用数据分析和机器学习帮助解决业务痛点的需求。如制造业中,利用设备采集上来的数据做预测性维护,质量控制;在零售业中,利用客户端端采集的数据做渠道转化率分析,个性化推荐等。 亚马逊云科技开发者…

HTML5

写在前面 一、简单认识HTML 1.1 什么是网页【2023/08/31】 网站是指因特网上根据一定的规则,使用HTML等制作的用于展示特定内容相关的网页集合。 网页是网站中的一“页”,通常是HTML格式的文件,它要通过浏览器来阅读。 网页是构成网站的…

【微服务】服务发现和管理技术框架选型调研

选型背景 方案对比 结论 结合实际业务和开发需要,着重考虑性能可靠性、功能和社区支持程度三方面,认为Nacos更适合作为服务发现和管理的技术框架。具体理由如下: 性能更好,可靠性更高 经过阿里、APISIX、SpringCloudAlibaba,阿…

华为数通方向HCIP-DataCom H12-821题库(单选题:201-220)

第201题 BGP 协议用​​ beer default-route-advertise​​ 命令来给邻居发布缺省路由,那么以下关于本地 BGP 路由表变化的描述,正确的是哪一项? A、在本地 BGP 路由表中生成一条活跃的缺省路由并下发给路由表 B、在本地 BGP 路由表中生成一条不活跃的缺省路由&…

基于Citespace、vosviewer、R语言的文献计量学可视化分析技术及全流程文献可视化SCI论文高效写作

文献计量学是指用数学和统计学的方法,定量地分析一切知识载体的交叉科学。它是集数学、统计学、文献学为一体,注重量化的综合性知识体系。特别是,信息可视化技术手段和方法的运用,可直观的展示主题的研究发展历程、研究现状、研究…

vr健康管理服务情景化教学弥补现代医学教学中的诸多不足之处

高职高专临床医学院校以培养岗位胜任力为目的,该专业是一门专业性、实践性较强的医学学科,要求培养出来的学生具有较强的临床实践能力,医学生所学的全部知识,都应与实践相结合,解决临床的实际问题,为患者解…

ArrayList、LinkedList、Collections.singletonList、Arrays.asList与ImmutableList.of

文章目录 ListArrayListLinkedListArrayList与LinkedList的区别快速构建list集合Collections.singletonListArrays.asListImmutableList.of Java集合类型有三种:set(集)、list(列表)和map(映射),而List集合是很常用的一种集合类型, List 我…

2023年MySQL-8.0.34保姆级安装教程

重点放前面:演示环境为windows环境。 MySQL社区版本安装教程如下: 一、MySQL安装包下载二、安装配置设置三、配置环境变量 大体分为3个步骤:①安装包的下载;②安装配置设置;③配置环境变量 一、MySQL安装包下载 下载官…

架构设计基础设施保障IaaS存储

目录 1. 云硬盘2. 对象存储3. 表单上传案例4. 服务上传验证5. 云数据库6. 云数据库操作7. 服务连接云数据库8. 新一代原生数据库9 阿里云PolarDB生产最佳实践 1. 云硬盘 HDD(普通云盘) 特征: 性能一般, IOPS大概在数百左右。 应…

江苏移动基于OceanBase稳步创新推进核心数据库分布式升级

*本文首发自《中国电信业》 数字经济时代,数据库作为企业核心数据存储、处理、挖潜等方面的关键载体,重要性日益凸显。对于运营商而言,数据库具有行业用户数量多、访问数量多、业务复杂度高、数据安全性高、响应要求性高以及需要 7*24 小时服…