【spark operator】spark operator动态分配executor

背景:

之前在使用spark operator的时候必须指定executor的个数,在将任务发布到spark operator后,k8s会根据指定的个数启动executor,但是对于某些spark sql可能并不需要用到那么多executor,在此时executor的数量就不好控制了。而executor的多少代表了集群资源的多少,如果不提前指定,executor能够动态扩展那将是最好的策略。在查询了资料后,得知spark3.0已经支持了executor的动态分配。而且用法也很简单,所以在之前的spark operator发布k8s的基础上,又做了动态生成executor的功能。

本文参考之前一篇的文章,做了部分修改以支持该功能。

【Java Kubernates】Java调用kubernates提交Yaml到SparkOperator-CSDN博客文章浏览阅读1.1k次,点赞18次,收藏18次。最终我选择了fabric8io,因为我们需要使用k8s的自定义资源sparkApplication,对于自定义资源,kubernetes-client/java需要创建各个k8s对象的pojo,比较麻烦。这里提一下,我在重新使用spark operator的时候,发现原来官方的google的spark operator镜像已经不能拉取了,貌似是google发现它的两个镜像存在漏洞,所以关闭了开源镜像。目前查询框架使用的是trino,但是trino也有其局限性,需要准备一个备用的查询框架。https://blog.csdn.net/w8998036/article/details/135821058?spm=1001.2014.3001.5501

一 删除yaml中executor指定个数的配置

//测试spark 3.0的动态分配instanceprivate static String buildSparkApplicationYAMLDynamic(String taskName, String sparkImage, String sparkJarFile, String mainClass, String instance,String driverCpu, String driverMemory, String executorCpu, String executorMemory, String dynamicSQLQuery) {return String.format("apiVersion: \"sparkoperator.k8s.io/v1beta2\"\n" +"kind: SparkApplication\n" +"metadata:\n" +"  name: %s\n" +"  namespace: spark-app\n" +"spec:\n" +"  type: Scala\n" +"  mode: cluster\n" +"  image: \"%s\"\n" +"  imagePullPolicy: Always\n" +"  imagePullSecrets: [\"harbor\"]\n" +"  mainClass: \"%s\"\n" +"  mainApplicationFile: \"%s\"\n" +"  sparkVersion: \"3.3.1\"\n" +"  restartPolicy:\n" +"    type: Never\n" +"  volumes:\n" +"    - name: nfs-spark-volume\n" +"      persistentVolumeClaim:\n" +"        claimName: sparkcode\n" +"  driver:\n" +"    cores: %s\n" +"    coreLimit: \"1200m\"\n" +"    memory: \"%s\"\n" +"    labels:\n" +"      version: 3.3.1\n" +"    serviceAccount: spark-svc-account\n" +"    volumeMounts:\n" +"      - name: nfs-spark-volume\n" +"        mountPath: \"/app/sparkcode\"\n" +"    env:\n" +"      - name: AWS_REGION\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: AWS_REGION\n" +"      - name: AWS_ACCESS_KEY_ID\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: AWS_ACCESS_KEY_ID\n" +"      - name: AWS_SECRET_ACCESS_KEY\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: AWS_SECRET_ACCESS_KEY\n" +"      - name: MINIO_ENDPOINT\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: MINIO_ENDPOINT\n" +"      - name: MINIO_HOST\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: MINIO_HOST\n" +"      - name: BUCKET_NAME\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: BUCKET_NAME\n" +"  executor:\n" +"    cores: %s\n" +
############去除该配置#############################################################//"    instances: %s\n" +
##################################################################################"    memory: \"%s\"\n" +"    labels:\n" +"      version: 3.3.1\n" +"    volumeMounts:\n" +"      - name: nfs-spark-volume\n" +"        mountPath: \"/app/sparkcode\"\n" +"    env:\n" +"      - name: AWS_REGION\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: AWS_REGION\n" +"      - name: AWS_ACCESS_KEY_ID\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: AWS_ACCESS_KEY_ID\n" +"      - name: AWS_SECRET_ACCESS_KEY\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: AWS_SECRET_ACCESS_KEY\n" +"      - name: MINIO_ENDPOINT\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: MINIO_ENDPOINT\n" +"      - name: MINIO_HOST\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: MINIO_HOST\n" +"      - name: BUCKET_NAME\n" +"        valueFrom:\n" +"          secretKeyRef:\n" +"            name: minio-secret\n" +"            key: BUCKET_NAME\n" +"  sparkConf:\n" +"    spark.query.sql: \"%s\"",taskName,sparkImage,mainClass,sparkJarFile,driverCpu,driverMemory,executorCpu,executorMemory,dynamicSQLQuery);}

二 配置动态参数

//测试spark3.0 动态分配executor的instancepublic static void sparkQueryForFhcS3DynamicExecutor() throws Exception{System.out.println("=========================1");String warehouseLocation = new File("spark-warehouse").getAbsolutePath();System.out.println("===========================2");String metastoreUri = "thrift://wuxdihadl09b.seagate.com:9083";SparkConf sparkConf = new SparkConf();sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");sparkConf.set("fs.s3a.access.key", "apPeWWr5KpXkzEW2jNKW");sparkConf.set("spark.hadoop.fs.s3a.path.style.access", "true");sparkConf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "true");sparkConf.set("fs.s3a.secret.key", "cRt3inWAhDYtuzsDnKGLGg9EJSbJ083ekuW7PejM");sparkConf.set("fs.s3a.endpoint", "wuxdimiov001.seagate.com:9000"); // 替换为实际的 S3 存储的地址sparkConf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");sparkConf.set("spark.sql.metastore.uris", metastoreUri);sparkConf.set("spark.sql.warehouse.dir", warehouseLocation);sparkConf.set("spark.sql.catalogImplementation", "hive");sparkConf.set("hive.metastore.uris", metastoreUri);#####################################################添加动态参数#######################        //#总开关,是否开启动态资源配置,根据工作负载来衡量是否应该增加或减少executor,默认falsesparkConf.set("spark.dynamicAllocation.enabled", "true");//#spark3新增,之前没有官方支持的on k8s的Dynamic Resouce Allocation。启用shuffle文件跟踪,此配置不会回收保存了shuffle数据的executorsparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true");//#启用shuffleTracking时控制保存shuffle数据的executor超时时间,默认使用GC垃圾回收控制释放。如果有时候GC不及时,配置此参数后,即使executor上存在shuffle数据,也会被回收。sparkConf.set("spark.dynamicAllocation.shuffleTracking.timeout", "60s");//#动态分配最小executor个数,在启动时就申请好的,默认0sparkConf.set("spark.dynamicAllocation.minExecutors", "1");//#动态分配最大executor个数,默认infinitysparkConf.set("spark.dynamicAllocation.maxExecutors", "10");//#动态分配初始executor个数默认值=spark.dynamicAllocation.minExecutorssparkConf.set("spark.dynamicAllocation.initialExecutors", "2");//#当某个executor空闲超过这个设定值,就会被kill,默认60ssparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "60s");//#当某个缓存数据的executor空闲时间超过这个设定值,就会被kill,默认infinitysparkConf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "240s");//#任务队列非空,资源不够,申请executor的时间间隔,默认1s(第一次申请)sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "3s");//#同schedulerBacklogTimeout,是申请了新executor之后继续申请的间隔,默认=schedulerBacklogTimeout(第二次及之后)sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "30s");//#开启推测执行,对长尾task,会在其他executor上启动相同task,先运行结束的作为结果sparkConf.set("spark.specution", "true");#######################################################################################       //Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");long zhenyang2 =  System.currentTimeMillis();SparkSession sparkSession = SparkSession.builder().appName("Fhc Spark Query").config(sparkConf).enableHiveSupport().getOrCreate();System.out.println("sparkSession create cost:"+(System.currentTimeMillis()-zhenyang2));System.out.println("==============================3");// 获取 SparkConf 对象String exesql = sparkSession.sparkContext().getConf().get("spark.query.sql");System.out.println("==============================3.1:"+exesql);System.out.println("Hive Metastore URI: " + sparkConf.get("spark.sql.metastore.uris"));System.out.println("Hive Warehouse Directory: " + sparkConf.get("spark.sql.warehouse.dir"));System.out.println("SHOW DATABASES==============================3.1:"+exesql);sparkSession.sql("SHOW DATABASES").show();long zhenyang3 =  System.currentTimeMillis();Dataset<Row> sqlDF = sparkSession.sql(exesql);System.out.println("sparkSession sql:"+(System.currentTimeMillis()-zhenyang3));System.out.println("======================4");//System.out.println("===========sqlDF count===========:"+sqlDF.count());//sqlDF.show();long zhenyang5 =  System.currentTimeMillis();List<Row> jaList= sqlDF.javaRDD().collect();System.out.println("rdd collect cost:"+(System.currentTimeMillis()-zhenyang5));System.out.println("jaList list:"+jaList.size());List<TaskListModel> list = new ArrayList<TaskListModel>();long zhenyang4 =  System.currentTimeMillis();AtomicInteger i = new AtomicInteger(0);jaList.stream().forEachOrdered(result -> {i.incrementAndGet();//System.out.println("serial_num is :"+result.getString(1));});System.out.println("for each times:"+i.get());System.out.println("SparkDemo foreach cost:"+(System.currentTimeMillis()-zhenyang4));System.out.println("=========================5");sparkSession.close();sparkSession.stop();}

三 发布一,二中的程序(逻辑见前面的博客文章)

四 测试

首先提交一个简单sql: select * from cimarronbp_n.p_vbar_metric_summary limit 10

查看k8s spark operator生成的pod

根据pod启动的时间可以看出,先生成了2个executor,在16s后又生成了1个,最后完成,可以看出executor确实根据任务的执行情况动态生成了。而之前文章中的executor 20个是同一时间生成的

再测试一个join的sql

select distinct t1.serial_num,t1.trans_seq,t2.state_name,t2.p_vbar_metric_summary,t1.event_date from cimarronbp_n.p_vbar_metric_summary t1 left join cimarronbp_n.p_vbar_metric_summary t2 on t1.serial_num = t2.serial_num AND t1.trans_seq = t2.trans_seq where t1.event_date='20231204' and t1.family='2TJ' and t1.operation='CAL2'

查看k8s spark operator生成的pod

executor从2个到3个,3个到4个,是动态的!

五 本文参考链接

「Spark从精通到重新入门(二)」Spark中不可不知的动态资源分配-阿里云开发者社区资源是影响 Spark 应用执行效率的一个重要因素。Spark 应用中真正执行 task 的组件是 Executor,可以通过spark.executor.instances 指定 Spark 应用的 Executor 的数量。在运行过程中,无论 Executor上是否有 task 在执行,都会被一直占有直到此 Spark 应用结束。icon-default.png?t=N7T8https://developer.aliyun.com/article/832482

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/731867.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python快速入门系列-1

Python快速入门系列 第一章: Python简介1.1 Python的历史与发展1.2 Python的优势与特点1.2.1 易学易用1.2.2 动态类型1.2.3 丰富的标准库与第三方库1.2.4 面向对象与函数式编程1.2.5 广泛应用领域 1.3 Python的应用领域 第一章: Python简介 1.1 Python的历史与发展 Python是一…

sizeof和strlen的详细万字解读

sizeof和strlen的对比 sizeof不是函数 侧面证明sizeof不是函数 如果是函数 应该需要有括号 不能落下来 strlen 只针对字符串 包含头文件 string.h 并且这个是个函数 随机数值 sizeof里面有表达式的话 表达式里面是不参与计算的 下面的s求出的是4 就是因为是不参与计算的 不…

AI绘画StableDiffusion实操教程:冰霜旗袍美女

前几天分享了StableDiffusion的入门到精通教程&#xff1a;AI绘画&#xff1a;Stable Diffusion 终极炼丹宝典&#xff1a;从入门到精通 但是还有人就问&#xff1a;安装是安装好了&#xff0c;可是为什么生成的图片和你生成的图片差距那么远呢&#xff1f; 怎么真实感和质感…

pytorch CV入门3-预训练模型与迁移学习

专栏链接&#xff1a;https://blog.csdn.net/qq_33345365/category_12578430.html 初次编辑&#xff1a;2024/3/7&#xff1b;最后编辑&#xff1a;2024/3/8 参考网站-微软教程&#xff1a;https://learn.microsoft.com/en-us/training/modules/intro-computer-vision-pytorc…

GitHub会员充值

GitHub是一个基于Web的代码托管平台&#xff0c;为开发者提供了协作、版本控制和代码管理的工具。它允许个人和团队共同协作开发软件项目&#xff0c;并提供了许多功能&#xff0c;使得代码的管理和维护更加容易 版本控制系统&#xff1a; GitHub使用Git作为其版本控制系统。Gi…

基于Springboot的高校宣讲会管理系统。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的高校宣讲会管理系统。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring Spri…

基于Java的开放实验室管理系统(Vue.js+SpringBoot)

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 实验室类型模块2.2 实验室模块2.3 实验管理模块2.4 实验设备模块2.5 实验订单模块 三、系统设计3.1 用例设计3.2 数据库设计 四、系统展示五、样例代码5.1 查询实验室设备5.2 实验放号5.3 实验预定 六、免责说明 一、摘…

Charles抓包工具使用

Charles简介 Charles是一款基于HTTP协议的代理服务器和HTTP监视器&#xff0c;通过将自己设置为电脑或浏览器的网络访问代理&#xff0c;能够截取请求和请求结果&#xff0c;从而达到分析抓包的目的。它允许开发者查看所有连接互联网的HTTP通信&#xff0c;包括请求、响应和HTT…

人工智能|机器学习——Canopy聚类算法(密度聚类)

1.简介 Canopy聚类算法是一个将对象分组到类的简单、快速、精确地方法。每个对象用多维特征空间里的一个点来表示。这个算法使用一个快速近似距离度量和两个距离阈值T1 > T2 处理。 Canopy聚类很少单独使用&#xff0c; 一般是作为k-means前不知道要指定k为何值的时候&#…

专题一 - 双指针 - leetcode 202. 快乐数 | 简单难度

leetcode 202. 快乐数 leetcode 202. 快乐数 | 简单难度1. 题目详情1. 原题链接2. 基础框架 2. 解题思路1. 题目分析2. 算法原理3. 时间复杂度 3. 代码实现4. 知识与收获 leetcode 202. 快乐数 | 简单难度 1. 题目详情 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」…

简单BFF架构设计

又到周五了有了一个小时的闲暇时间简单写点东西&#xff0c;介绍一个简单的BFF的架构。BFF:Backends For Frontends,其实现在是个比较常见的前端架构设计的方案&#xff0c;其最大的优势便在于前端可以高度自由的在Node层做一些server端才可以做的东西&#xff0c;比如SSR、登录…

SSH安全协议介绍

知识改变命运&#xff0c;技术就是要分享&#xff0c;有问题随时联系&#xff0c;免费答疑&#xff0c;欢迎联系&#xff01; SSH&#xff08;Secure Shell&#xff0c;安全外壳&#xff09;是一种网络安全协议&#xff0c;通过加密和认证机制实现安全的访问和文件传输等业…

气象数据免费下载(超级好用)

你是不是做实验经常性的需要一些气象数据&#xff0c;例如PM2.5、相对湿度、月均温度等等…… 但是当你开始寻找数据时就遇到困难了&#xff0c;由于权限、数据网站之类的麻烦你会花费大量无用时间&#xff0c;甚至有时候一无所获得不偿失&#xff0c;这就很头疼了&#xff01;…

服务器配置禁止IP直接访问,只允许域名访问

联网信息系统需设置只允许通过域名访问&#xff0c;禁止使用IP地址直接访问&#xff0c;建议同时采用云防护技术隐藏系统真实IP地址且只允许云防护节点IP访问服务器&#xff0c;提升网络安全防护能力。 一、Nginx 修改配置文件nginx.conf&#xff0c;在server段里插入正则表达式…

智昊电气推出RCL-0923U型光伏并网点电压自动控制装置/分布式光伏并网点电压自动控制设备/电压控制器

一&#xff1a;行业背景分析 在新型电网的发展的业态下&#xff0c;随着以光伏&#xff0c;风电等可再生能源为代表的新型能源的大量并网接入&#xff0c;配电网从单向电网走向双向电网&#xff0c;从无源电网走向有源电网的演进。但新能源在并网过程对大电网带来诸多影响&…

2024甘肃事业单位报名流程,注意,超全超详细!

✔️报名时间&#xff1a;3月8日9:00-3月12日18:00 ✔️资格审查&#xff1a;3月8日9:00-3月13日18:00 ✔️报名缴费&#xff1a;3月8日9:30-3月14日18:00 ✔️打印准考证&#xff1a;4月16日9:00-4月21日9:00 ✔️笔试时间&#xff1a;2024年4月21日上午 8:30-10:00 职业能力倾…

RT-DETR优化改进:特征融合篇 | GELAN(广义高效层聚合网络)结构来自YOLOv9

🚀🚀🚀本文改进:使用GELAN改进架构引入到RT-DETR 🚀🚀🚀RT-DETR改进创新专栏:http://t.csdnimg.cn/vuQTz 🚀🚀🚀学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; 🚀🚀🚀RT-DETR模型创新优化,涨点技巧分享,科研小助手; 1.YOLOv9介绍 论…

【vue.js】文档解读【day 3】 | 列表渲染

如果阅读有疑问的话&#xff0c;欢迎评论或私信&#xff01;&#xff01; 文章目录 列表渲染v-forv-for 与对象在 v-for 里使用范围值template 上的 v-forv-for与v-if通过key管理状态组件上使用v-for数组变化侦测 列表渲染 v-for 在我们想要渲染出一个数组中的元素时&#xf…

C# 中 Math.Round 数学函数

在 C# 中&#xff0c;Math.Round 是一个数学函数&#xff0c;用于对一个浮点数进行四舍五入操作。它接受一个浮点数作为输入&#xff0c;并返回一个最接近输入值的整数或指定小数位数的浮点数。 Math.Round 方法有多个重载&#xff0c;其中最常用的重载有以下两种形式&#xf…

C语言数据结构之二叉堆

愿你千山暮雪 海棠依旧 不为岁月惊扰平添忧愁 &#x1f3a5;前期回顾-二叉树 &#x1f525;数据结构专栏 期待小伙伴们的支持与关注&#xff01;&#xff01;&#xff01; 目录 前期回顾 二叉堆的概念及结构 二叉堆的创建 顺序表的结构声明 顺序表的创建与销毁 二叉堆的插入 …