Spark的内核调度

目录

概述

RDD的依赖

 DAG和Stage

 DAG执行流程图形成和Stage划分

 Stage内部流程

Spark Shuffle

Spark中shuffle的发展历程

优化前的Hash shuffle

 经过优化后的Hash shuffle

 Sort shuffle

Sort shuffle的普通机制

Job调度流程

Spark RDD并行度


概述

Spark内核调度任务:

1.构建DAG有向无环图

2.划分stage夹断

3.Driver底层的运转

4.分区的划分(线程)

的Spark内核调度的目的:尽可能用最少的资源高效地完成任务计算

RDD的依赖

RDD的依赖:一个RDD的形成可能由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系

Spark中,RDD之间的依赖关系,只要有两种类型:宽依赖和窄依赖

窄依赖:

作用:能够让Spark程序并行计算,也就是一个分区数据计算出现问题的时候,其它分区不受影响

特点:父RDD的分区和子RDD的分区是一对一关系,也就是父RDD分区的数据会整个被下游子RDD的分区接收

宽依赖:

作用:划分stage的重要依据,宽依赖也叫shuffle依赖

特点:父RDD的分区和子RDD的分区关系是一对多的关系,也就是父RDD的分区数据会被划成多份给到下游子RDD的多个分区做接收

注意:如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行,为了避免数据的不完整

算子中一般以ByKey结尾的会发生shuffle;另外是重分区算子会发生shuffle

 DAG和Stage

DAG:有向无环图,只要描述一段执行任务,从开始一直往下走,不允许出现回调操作

Spark应用程序中,遇到一个Action算子,就会触发一个JOB任务的产生

对于每个JOB的任务,都会产生一个DAG执行流程图,流程图的形成的层级关系如下:

层级关系:

1.一个spark应用程序→遇到一个Action算子,就会触发形成一个JOB任务

2.一个JOB任务只有一个DAG有向无环图

3.一个DAG有向无环图→有多个stage

4.一个stage→有多个Task线程

5.一个RDD→有多个分区

6.一个分区会被一个Task线程所处理

 DAG执行流程图形成和Stage划分

 1.spark应用程序遇到Action算子后,就会触发一个JOB任务的产生,JOB任务就会将它所依赖的算子全部加载进来,形成一个stage

2.接着从action算子从后往前回溯,遇到窄依赖就将算子放在同一个stage中,如果遇到宽依赖,就划分形成新的stage,最后一直到回溯完成

 Stage内部流程

 默认并行度值的确认:

1.使用textFile读取HDFS上的文件,因此RDD分区数=max(文件的block块数量,defaultminpartition),继续需要知道defaultminpartition的值是多少

2.defaultminpartition=min(spark.default.parallelism,2)取最小值,最终确认spark.default.parallelism的参数值就能最终确认RDD的分区数有多少个

spark.default.parallelism参数值的确认:

1.如果有父RDD,就取父RDD的最大分区数

2.如果没有父RDD,根据集群模式进行取值

        本地模式:机器的最大cpu核数

        Mesos:默认是8

        其它模式:所有执行节点上的核总数或2,以较大者为准

Spark Shuffle

Spark中shuffle的发展历程

1- 在1.1版本以前,Spark采用Hash shuffle (优化前 和 优化后)

2- 在1.1版本的时候,Spark推出了Sort Shuffle

3- 在1.5版本的时候,Spark引入钨丝计划(优化为主)

4- 在1.6版本的时候,将钨丝计划合并到sortShuffle中

5- 在2.0版本的时候,将Hash Shuffle移除,将Hash shuffle方案移植到Sort Shuffle

优化前的Hash shuffle

 存在的问题:

        上游(map端)的每个Task会产生与下游Task个数相等的小文件个数,导致上游有非常多的小文件,下游(reduce端)来拉取文件的时候,会有大量的网络IO和磁盘IO过程,因为要打开和读取多个小文件

 经过优化后的Hash shuffle

优化后的Hash shuffle:

变成了由每个Executor进程产生与下游Task个数相等的小文件数,这样可以大量减少小文件的产生,以及降低下游拉取文件时候的网络IO和磁盘IO过程

 Sort shuffle

 Sort shuffle分成了两种:普通机制和bypass机制,具体使用哪种由spark底层决定

Sort shuffle的普通机制

 普通机制的运行过程:

每个上游task线程处理数据,数据处理完以后,先放在内存中,接着对内存中的数据进行分区,排序,将内存中的数据溢写到磁盘,形成一个个小文件,溢写完成后,将多个小文件合并成一个大的磁盘文件,并且针对每个大的磁盘文件,提供一个索引文件,接着是下游Task根据索引文件来读取相应的数据

Sort shuffle的bypass机制 

bypass机制 :就是在普通机制的基础上,省略了排序的过程

bypass机制的触发条件:

1.上游的RDD数量不能超过100个

2.上游不能对数据进行提前聚合操作(因为提前聚合,需要先进行分组操作,而分组的操作实际上是有排序的操作)

Job调度流程

主要是讨论:在Driver内部,是如何调度任务

1.Driver进程启动后,底层PY4J创建SparkContext顶级对象,在创建该对象的进程中,还会创建另外两个对象,分别是:DAGScheduler和TaskScheduler

        DAGScheduler:DAG调度器,将Job任务形成DAG有向无环图和划分Stage的阶段

        TaskScheduler:Task调度器,将Task线程分配给到具体的Executor执行

2.一个saprk程序遇到一个action算子触发产生一个job任务,SparkContext将job任务给到DAG调度器,拿到job任务后,会将job任务形成有向无环图和划分stage阶段,并且确定每个stage有多少个Task线程,会将众多的Task线程放到TaskSet的集合中,DAG调度器将TaskSet集合给到Task调度器

3.Task调度器拿到TaskSet集合以后,将Task分配给到具体的Executor执行,底层是基于SchedulerBackend调度队列来实现的

4.Executor开始执行任务,并且Driver会监控各个Executor的执行状态,知道所有的Executor执行完成,就认为任务运行结束

5.Driver通知Namenote释放资源

Spark RDD并行度

整个Spark应用中,影响并行度的因素有以下两个原因:

        1.资源的并行度:Executor数量和CPU核数以及内存的大小

        2.数据的并行度:Task的线程和分区数量

一般将Task想层数量设置为CPU核数的2-3被,另外每个线程分配3-5GB的内存资源

说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。另外该参数对parallelize并行化本地集合创建的RDD不起作用。

import timefrom pyspark import SparkConf, SparkContext
import os# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':print("Spark入门案例: WordCount词频统计")# 1- 创建SparkContext对象conf = SparkConf()\.set("spark.default.parallelism", "5")\.setAppName('spark_wordcount_demo')\.setMaster('local[*]')# 设置并行度参数方式一# conf.set("spark.default.parallelism", "4")sc = SparkContext(conf=conf)# 2- 数据输入init_rdd = sc.textFile("file:///export/data/gz16_pyspark/01_spark_core/data/content.txt")# 3- 数据处理flatmap_rdd = init_rdd.flatMap(lambda line: line.split(" "))map_rdd = flatmap_rdd.map(lambda word: (word,1))# shuffle前分区数print("shuffle前分区数",map_rdd.getNumPartitions())result = map_rdd.reduceByKey(lambda agg,curr: agg+curr)# shuffle后分区数print("shuffle后分区数", result.getNumPartitions())# 4- 数据输出print(result.collect())# 5- 释放资源sc.stop()

通过parallelize构建得到RDD的分区情况(了解):

from pyspark import SparkConf, SparkContext
import os# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("并行化本地集合创建RDD")# 1- 创建SparkContext对象conf = SparkConf().setAppName('parallelize_rdd').setMaster('local[1]')# 设置并行度参数conf.set("spark.default.parallelism", 4)sc = SparkContext(conf=conf)# 2- 数据输入# 并行化本地集合得到RDDinit_rdd = sc.parallelize([1,2,3,4,5])# shuffle前分区数print("分区数", init_rdd.getNumPartitions())# 3- 数据处理# 4- 数据输出# 获取分区数print(init_rdd.getNumPartitions())# 获取具体分区内容print(init_rdd.glom().collect())# 5- 释放资源sc.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617510.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

强化学习应用(四):基于Q-learning的无人机物流路径规划研究(提供Python代码)

一、Q-learning简介 Q-learning是一种强化学习算法,用于解决基于马尔可夫决策过程(MDP)的问题。它通过学习一个价值函数来指导智能体在环境中做出决策,以最大化累积奖励。 Q-learning算法的核心思想是通过不断更新一个称为Q值的…

jetson orin nano 使用yolov8导出engine

1. 导出onnx 经过前面训练,得到了best.pt模型,现在想要使用tensorrt进行推理,需要先导出为onnx格式,再转化为engine格式。 yolo export modelbest.pt formatonnx opset12 simplifyTrue2.解决错误 在导出过程中,可能…

Android代码混淆

Android之代码混淆 代码混淆的作用设置混淆1. 在模块目录下的 build.gradle 文件中配置以下代码2. 在 proguard-rules.pro 文件中添加混淆规则 通用混淆规则常用匹配符常用命令注意事项如何查看是否已混淆 代码混淆的作用 1.令 APK 难以被逆向工程,即很大程度上增加…

开源项目CuteSqlite开发笔记(七):CuteSqlite释放BETA版本啦

经过大半年的开发,CuteSqlite程序代码不知不觉来到了6万行,有效行数4万行,CuteSqlite开发完成了一个小版本,进入下一个阶段,并于2024元旦释放BETA版本,有兴趣的朋友可以下载试用。 GitHub下载https://gith…

Handsfree_ros_imu:ROS机器人IMU模块的get_imu_rpy.py文件学习记录

上一篇博客写了关于Handsfree_ros_imu:ROS机器人IMU模块ARHS姿态传感器(A9)Liunx系统Ubuntu20.04学习启动和运行教程: https://blog.csdn.net/qq_54900679/article/details/135539176?spm1001.2014.3001.5502 这次带来get_imu_r…

池化、线性、激活函数层

一、池化层 池化运算是深度学习中常用的一种操作,它可以对输入的特征图进行降采样,从而减少特征图的尺寸和参数数量。 池化运算的主要目的是通过“收集”和“总结”输入特征图的信息来提取出主要特征,并且减少对细节的敏感性。在池化运算中…

ElasticSearch 学习9 spring-boot ,elasticsearch7.16.1实现中文拼音分词搜索

一、elasticsearch官网下载:Elasticsearch 7.16.1 | Elastic 二、拼音、ik、繁简体转换插件安装 ik分词:GitHub - medcl/elasticsearch-analysis-ik: The IK Analysis plugin integrates Lucene IK analyzer into elasticsearch, support customized d…

高质量训练数据助力大语言模型摆脱数据困境 | 景联文科技

目前,大语言模型的发展已经取得了显著的成果,如OpenAI的GPT系列模型、谷歌的BERT模型、百度的文心一言模型等。这些模型在文本生成、问答系统、对话生成、情感分析、摘要生成等方面都表现出了强大的能力,为自然语言处理领域带来了新的突破。 …

ROS2——launcher

在ROS2中,launcher 文件是通过Python构建的,它们的功能是声明用哪些选项或参数来执行哪些程序,可以通过 launcher 文件快速同时启动多个节点。一个 launcher 文件内可以引用另一个 launcher 文件。 使用 launcher 文件 ros2 launch 可以代替…

掌握 Vue 响应式系统,让数据驱动视图(上)

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

GitLab任意用户密码重置漏洞(CVE-2023-7028)

GitLab CVE-2023-7028 POC user[email][]validemail.com&user[email][]attackeremail.com 本文链接: https://www.黑客.wang/wen/47.html

Webhook端口中的自定义签名身份认证

概述 如果需要通过 Webhook 端口从交易伙伴处接收数据,但该交易伙伴可能对于安全性有着较高的要求,而不仅仅是用于验证入站 Webhook 要求的基本身份验证用户名/密码,或者用户可能只想在入站 Webhook 消息上增加额外的安全层。 使用 Webhook…

【数据采集与预处理】流数据采集工具Flume

目录 一、Flume简介 (一)Flume定义 (二)Flume作用 二、Flume组成架构 三、Flume安装配置 (一)下载Flume (二)解压安装包 (三)配置环境变量 &#xf…

环形链表[简单]

优质博文:IT-BLOG-CN 一、题目 给你一个链表的头节点head,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪next指针再次到达,则链表中存在环。为了表示给定链表中的环,评测系统内部使用整数pos来表示链…

数据结构中的一棵树

一、树是什么? 有根有枝叶便是树!根只有一个,枝叶可以有,也可以没有,可以有一个,也可以有很多。 就像这样: 嗯,应该是这样: 二、一些概念 1、高度 树有多高&#x…

MySQL之导入导出远程备份(详细讲解)

文章目录 一、Navicat导入导出二、mysqldump命令导入导出2.1导出2.2导入(使用mysqldump导入 包含t_log表的整个数据库) 三、LOAD DATA INFILE命令导入导出3.1设置;3.2导出3.3导入(使用单表数据导入load data infile的方式) 四、远程备份4.1导出4.2导入 一…

redis系列:01 数据类型及操作

redis的数据类型有哪些 string,list,set,sorted_set,hash 操作 sting: set name maliao get name exists name expire name 5 ttl name del name setex name 10 maliao 设置key和过期时间 setnx name maliao 当key不存在时才添加list: lpush letter a lpush le…

OpenCV-22高斯滤波

一、高斯函数的基础 要理解高斯滤波首先要直到什么是高斯函数,高斯函数是符合高斯分布的(也叫正态分布)的数据的概率密度函数。 高斯函数的特点是以x轴某一点(这一点称为均值)为对称轴,越靠近中心数据发生…

【Linux实用篇】Linux常用命令(1)

目录 1.1 Linux命令初体验 1.1.1 常用命令演示 1.1.2 Linux命令使用技巧 1.1.3 Linux命令格式 1.2 文件目录操作命令 1.2.1 ls 1.2.2 cd 1.2.3 cat 1.2.4 more 1.2.5 tail 1.2.6 mkdir 1.2.7 rmdir 1.2.8 rm 1.1 Linux命令初体验 1.1.1 常用命令演示 在这一部分中…

遥感影像-语义分割数据集:Landsat8云数据集详细介绍及训练样本处理流程

原始数据集详情 简介:该云数据集包括RGB三通道的高分辨率图像,在全球不同区域的分辨率15米。这些图像采集自Lansat8的五种主要土地覆盖类型,即水、植被、湿地、城市、冰雪和贫瘠土地。 KeyValue卫星类型landsat8覆盖区域未知场景水、植被、…