Spark的内核调度

目录

概述

RDD的依赖

 DAG和Stage

 DAG执行流程图形成和Stage划分

 Stage内部流程

Spark Shuffle

Spark中shuffle的发展历程

优化前的Hash shuffle

 经过优化后的Hash shuffle

 Sort shuffle

Sort shuffle的普通机制

Job调度流程

Spark RDD并行度


概述

Spark内核调度任务:

1.构建DAG有向无环图

2.划分stage夹断

3.Driver底层的运转

4.分区的划分(线程)

的Spark内核调度的目的:尽可能用最少的资源高效地完成任务计算

RDD的依赖

RDD的依赖:一个RDD的形成可能由一个或者多个RDD得到的,此时这个RDD和之前的RDD之间产生依赖关系

Spark中,RDD之间的依赖关系,只要有两种类型:宽依赖和窄依赖

窄依赖:

作用:能够让Spark程序并行计算,也就是一个分区数据计算出现问题的时候,其它分区不受影响

特点:父RDD的分区和子RDD的分区是一对一关系,也就是父RDD分区的数据会整个被下游子RDD的分区接收

宽依赖:

作用:划分stage的重要依据,宽依赖也叫shuffle依赖

特点:父RDD的分区和子RDD的分区关系是一对多的关系,也就是父RDD的分区数据会被划成多份给到下游子RDD的多个分区做接收

注意:如果有宽依赖,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行,为了避免数据的不完整

算子中一般以ByKey结尾的会发生shuffle;另外是重分区算子会发生shuffle

 DAG和Stage

DAG:有向无环图,只要描述一段执行任务,从开始一直往下走,不允许出现回调操作

Spark应用程序中,遇到一个Action算子,就会触发一个JOB任务的产生

对于每个JOB的任务,都会产生一个DAG执行流程图,流程图的形成的层级关系如下:

层级关系:

1.一个spark应用程序→遇到一个Action算子,就会触发形成一个JOB任务

2.一个JOB任务只有一个DAG有向无环图

3.一个DAG有向无环图→有多个stage

4.一个stage→有多个Task线程

5.一个RDD→有多个分区

6.一个分区会被一个Task线程所处理

 DAG执行流程图形成和Stage划分

 1.spark应用程序遇到Action算子后,就会触发一个JOB任务的产生,JOB任务就会将它所依赖的算子全部加载进来,形成一个stage

2.接着从action算子从后往前回溯,遇到窄依赖就将算子放在同一个stage中,如果遇到宽依赖,就划分形成新的stage,最后一直到回溯完成

 Stage内部流程

 默认并行度值的确认:

1.使用textFile读取HDFS上的文件,因此RDD分区数=max(文件的block块数量,defaultminpartition),继续需要知道defaultminpartition的值是多少

2.defaultminpartition=min(spark.default.parallelism,2)取最小值,最终确认spark.default.parallelism的参数值就能最终确认RDD的分区数有多少个

spark.default.parallelism参数值的确认:

1.如果有父RDD,就取父RDD的最大分区数

2.如果没有父RDD,根据集群模式进行取值

        本地模式:机器的最大cpu核数

        Mesos:默认是8

        其它模式:所有执行节点上的核总数或2,以较大者为准

Spark Shuffle

Spark中shuffle的发展历程

1- 在1.1版本以前,Spark采用Hash shuffle (优化前 和 优化后)

2- 在1.1版本的时候,Spark推出了Sort Shuffle

3- 在1.5版本的时候,Spark引入钨丝计划(优化为主)

4- 在1.6版本的时候,将钨丝计划合并到sortShuffle中

5- 在2.0版本的时候,将Hash Shuffle移除,将Hash shuffle方案移植到Sort Shuffle

优化前的Hash shuffle

 存在的问题:

        上游(map端)的每个Task会产生与下游Task个数相等的小文件个数,导致上游有非常多的小文件,下游(reduce端)来拉取文件的时候,会有大量的网络IO和磁盘IO过程,因为要打开和读取多个小文件

 经过优化后的Hash shuffle

优化后的Hash shuffle:

变成了由每个Executor进程产生与下游Task个数相等的小文件数,这样可以大量减少小文件的产生,以及降低下游拉取文件时候的网络IO和磁盘IO过程

 Sort shuffle

 Sort shuffle分成了两种:普通机制和bypass机制,具体使用哪种由spark底层决定

Sort shuffle的普通机制

 普通机制的运行过程:

每个上游task线程处理数据,数据处理完以后,先放在内存中,接着对内存中的数据进行分区,排序,将内存中的数据溢写到磁盘,形成一个个小文件,溢写完成后,将多个小文件合并成一个大的磁盘文件,并且针对每个大的磁盘文件,提供一个索引文件,接着是下游Task根据索引文件来读取相应的数据

Sort shuffle的bypass机制 

bypass机制 :就是在普通机制的基础上,省略了排序的过程

bypass机制的触发条件:

1.上游的RDD数量不能超过100个

2.上游不能对数据进行提前聚合操作(因为提前聚合,需要先进行分组操作,而分组的操作实际上是有排序的操作)

Job调度流程

主要是讨论:在Driver内部,是如何调度任务

1.Driver进程启动后,底层PY4J创建SparkContext顶级对象,在创建该对象的进程中,还会创建另外两个对象,分别是:DAGScheduler和TaskScheduler

        DAGScheduler:DAG调度器,将Job任务形成DAG有向无环图和划分Stage的阶段

        TaskScheduler:Task调度器,将Task线程分配给到具体的Executor执行

2.一个saprk程序遇到一个action算子触发产生一个job任务,SparkContext将job任务给到DAG调度器,拿到job任务后,会将job任务形成有向无环图和划分stage阶段,并且确定每个stage有多少个Task线程,会将众多的Task线程放到TaskSet的集合中,DAG调度器将TaskSet集合给到Task调度器

3.Task调度器拿到TaskSet集合以后,将Task分配给到具体的Executor执行,底层是基于SchedulerBackend调度队列来实现的

4.Executor开始执行任务,并且Driver会监控各个Executor的执行状态,知道所有的Executor执行完成,就认为任务运行结束

5.Driver通知Namenote释放资源

Spark RDD并行度

整个Spark应用中,影响并行度的因素有以下两个原因:

        1.资源的并行度:Executor数量和CPU核数以及内存的大小

        2.数据的并行度:Task的线程和分区数量

一般将Task想层数量设置为CPU核数的2-3被,另外每个线程分配3-5GB的内存资源

说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。另外该参数对parallelize并行化本地集合创建的RDD不起作用。

import timefrom pyspark import SparkConf, SparkContext
import os# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':print("Spark入门案例: WordCount词频统计")# 1- 创建SparkContext对象conf = SparkConf()\.set("spark.default.parallelism", "5")\.setAppName('spark_wordcount_demo')\.setMaster('local[*]')# 设置并行度参数方式一# conf.set("spark.default.parallelism", "4")sc = SparkContext(conf=conf)# 2- 数据输入init_rdd = sc.textFile("file:///export/data/gz16_pyspark/01_spark_core/data/content.txt")# 3- 数据处理flatmap_rdd = init_rdd.flatMap(lambda line: line.split(" "))map_rdd = flatmap_rdd.map(lambda word: (word,1))# shuffle前分区数print("shuffle前分区数",map_rdd.getNumPartitions())result = map_rdd.reduceByKey(lambda agg,curr: agg+curr)# shuffle后分区数print("shuffle后分区数", result.getNumPartitions())# 4- 数据输出print(result.collect())# 5- 释放资源sc.stop()

通过parallelize构建得到RDD的分区情况(了解):

from pyspark import SparkConf, SparkContext
import os# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("并行化本地集合创建RDD")# 1- 创建SparkContext对象conf = SparkConf().setAppName('parallelize_rdd').setMaster('local[1]')# 设置并行度参数conf.set("spark.default.parallelism", 4)sc = SparkContext(conf=conf)# 2- 数据输入# 并行化本地集合得到RDDinit_rdd = sc.parallelize([1,2,3,4,5])# shuffle前分区数print("分区数", init_rdd.getNumPartitions())# 3- 数据处理# 4- 数据输出# 获取分区数print(init_rdd.getNumPartitions())# 获取具体分区内容print(init_rdd.glom().collect())# 5- 释放资源sc.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617510.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

强化学习应用(四):基于Q-learning的无人机物流路径规划研究(提供Python代码)

一、Q-learning简介 Q-learning是一种强化学习算法,用于解决基于马尔可夫决策过程(MDP)的问题。它通过学习一个价值函数来指导智能体在环境中做出决策,以最大化累积奖励。 Q-learning算法的核心思想是通过不断更新一个称为Q值的…

vue表格插件vxe-table导出 excel

vxe-table 默认支持导出 CSV、HTML、XML、TXT格式的文件,不支持 xlsx 文件 要想导出 xlsx 文件,需要使用 vxe-table-plugin-export-xlsx 依赖 参考:https://cnpmjs.org/package/vxe-table-plugin-export-xlsx/v/2.1.0-beta 1.安装 npm inst…

jetson orin nano 使用yolov8导出engine

1. 导出onnx 经过前面训练,得到了best.pt模型,现在想要使用tensorrt进行推理,需要先导出为onnx格式,再转化为engine格式。 yolo export modelbest.pt formatonnx opset12 simplifyTrue2.解决错误 在导出过程中,可能…

Android代码混淆

Android之代码混淆 代码混淆的作用设置混淆1. 在模块目录下的 build.gradle 文件中配置以下代码2. 在 proguard-rules.pro 文件中添加混淆规则 通用混淆规则常用匹配符常用命令注意事项如何查看是否已混淆 代码混淆的作用 1.令 APK 难以被逆向工程,即很大程度上增加…

开源项目CuteSqlite开发笔记(七):CuteSqlite释放BETA版本啦

经过大半年的开发,CuteSqlite程序代码不知不觉来到了6万行,有效行数4万行,CuteSqlite开发完成了一个小版本,进入下一个阶段,并于2024元旦释放BETA版本,有兴趣的朋友可以下载试用。 GitHub下载https://gith…

Handsfree_ros_imu:ROS机器人IMU模块的get_imu_rpy.py文件学习记录

上一篇博客写了关于Handsfree_ros_imu:ROS机器人IMU模块ARHS姿态传感器(A9)Liunx系统Ubuntu20.04学习启动和运行教程: https://blog.csdn.net/qq_54900679/article/details/135539176?spm1001.2014.3001.5502 这次带来get_imu_r…

池化、线性、激活函数层

一、池化层 池化运算是深度学习中常用的一种操作,它可以对输入的特征图进行降采样,从而减少特征图的尺寸和参数数量。 池化运算的主要目的是通过“收集”和“总结”输入特征图的信息来提取出主要特征,并且减少对细节的敏感性。在池化运算中…

ElasticSearch 学习9 spring-boot ,elasticsearch7.16.1实现中文拼音分词搜索

一、elasticsearch官网下载:Elasticsearch 7.16.1 | Elastic 二、拼音、ik、繁简体转换插件安装 ik分词:GitHub - medcl/elasticsearch-analysis-ik: The IK Analysis plugin integrates Lucene IK analyzer into elasticsearch, support customized d…

高质量训练数据助力大语言模型摆脱数据困境 | 景联文科技

目前,大语言模型的发展已经取得了显著的成果,如OpenAI的GPT系列模型、谷歌的BERT模型、百度的文心一言模型等。这些模型在文本生成、问答系统、对话生成、情感分析、摘要生成等方面都表现出了强大的能力,为自然语言处理领域带来了新的突破。 …

Spring Boot集成Redis简单示例

要在Spring Boot中集成Redis&#xff0c;你可以使用Spring Data Redis库来简化操作。 下面是一个示例代码&#xff1a; 首先&#xff0c;在你的Spring Boot项目的pom.xml文件中添加以下依赖&#xff1a; <dependencies><!-- 其他依赖... --><dependency>&…

springboot启动加载数据库数据到内存

1、概述 一般来说&#xff0c;springboot工程环境配置放在properties文件中&#xff0c;启动的时候将工程中的properties/yaml文件的配置项加载到内存中。但这种方式改配置项的时候&#xff0c;需要重新编译部署&#xff0c;考虑到这种因素&#xff0c;今天介绍将配置项存到数…

ROS2——launcher

在ROS2中&#xff0c;launcher 文件是通过Python构建的&#xff0c;它们的功能是声明用哪些选项或参数来执行哪些程序&#xff0c;可以通过 launcher 文件快速同时启动多个节点。一个 launcher 文件内可以引用另一个 launcher 文件。 使用 launcher 文件 ros2 launch 可以代替…

掌握 Vue 响应式系统,让数据驱动视图(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

问答机器人prompt

def build_prompt(prompt_template, **kwargs): ‘’‘将 Prompt 模板赋值’‘’ prompt prompt_template for k, v in kwargs.items(): if isinstance(v, str): val v elif isinstance(v, list) and all(isinstance(elem, str) for elem in v): val ‘\n’.join(v) else: v…

人机协同中的偏序关系

偏序关系是指集合中的元素之间存在一种有限的、非全序的关系。在该关系下&#xff0c;元素之间可以进行比较&#xff0c;但不一定能够确定它们的相对顺序。 在人机协同中&#xff0c;偏序关系可以用来描述人和机器之间的合作关系、信息传递关系或任务分配关系。例如&#xff0c…

数据库面经---10则

数据库范式有哪些&#xff1a;​​​​​​​ 第一范式&#xff08;1NF&#xff09;&#xff1a; 数据表中的每一列都是不可分割的原子值。每一行数据在关系表中都有唯一标识&#xff0c;通常是通过主键来实现。第二范式&#xff08;2NF&#xff09;&#xff1a; 满足第一范式。…

GitLab任意用户密码重置漏洞(CVE-2023-7028)

GitLab CVE-2023-7028 POC user[email][]validemail.com&user[email][]attackeremail.com 本文链接&#xff1a; https://www.黑客.wang/wen/47.html

[论文笔记] PAI-Megatron中qwen和mistral合并到Megtron-LM

一、千问 关于tokenizer的改动: 1.1、更改build_tokenizer中tokenizer类的加载。 /mnt/nas/pretrain/code/Megatron-LM/megatron/tokenizer/__init__.py 或者 tokenizer.py 在build_tokenizer.py函数中: ​elif args.tokenizer_type == "QwenTokenizer":assert a…

Webhook端口中的自定义签名身份认证

概述 如果需要通过 Webhook 端口从交易伙伴处接收数据&#xff0c;但该交易伙伴可能对于安全性有着较高的要求&#xff0c;而不仅仅是用于验证入站 Webhook 要求的基本身份验证用户名/密码&#xff0c;或者用户可能只想在入站 Webhook 消息上增加额外的安全层。 使用 Webhook…

Servlet-基本概念

一、概念 根据百度百科&#xff1a;Servlet&#xff08;Server Applet&#xff09;是Java Servlet的简称&#xff0c;是用Java编写的服务器端程序&#xff0c;主要功能在于交互式地浏览和生成数据&#xff0c;生成动态Web内容。 加深理解&#xff1a; 上面提到的Web内容我们…