spark中Rdd依赖和SparkSQL介绍--学习笔记

1,RDD的依赖

1.1概念

rdd的特性之一

相邻rdd之间存在依赖关系(因果关系)

窄依赖

每个父RDD的一个Partition最多被子RDD的一个Partition所使用

父rdd和子rdd的分区是一对一(多对一)

触发窄依赖的算子

map(),flatMap(),filter()

宽依赖

父RDD的一个partition会被子rdd的多个Partition所使用
父rdd和子rdd的分区是一对多
触发宽依赖的算子:
grouBy(),groupByKey,sortBy(),SortByKey(),reduceByKey,distinct()

1.3DAG图计算

DAG又叫做有向无环图

管理rdd依赖关系,保证rdd按照依赖关系进行数据的顺序计算

会根据rdd的依赖关系将计算过程分成多个计算步骤,每个计算步骤成为一个fasse

在计算的rdd依赖关系中,一旦发生了宽依赖就会进行数据才分生成新的stage

1.4Spark术语

app-应用程序(一个py文件/一个交互式页面)

job->作业(调用action算子时会触发job)

stage->计算步骤,由DAG图根据宽依赖产生新的stage

task->任务,有多少个分区数,就有多少个task任务,task任务是以线程方式执行

1.5为什么划分stage计算步骤

spark的task的任务是以线程方式并行计算

线程方式并行计算会有资源竞争导致计算不准确问题

通过stage来解决计算不准确的问题

同一个stage中数据不会进行shuffle(重新洗牌),多个task是可以并行计算

不同stage之间是需要等待上一个stage执行完成后(获取所有数据),再执行下一个stage
如何划分stage?
对于窄依赖,partition的转换处理在说stage中完成计算,不划分(将窄依赖尽量放在同一个stage中,可以实现流水线计算)
对于宽依赖,只能在父EDD处理完成后,才能开始接下来的计算也就是说需要划分stage
遇到宽依赖就需要划分stage
在这里插入图片描述

2,Spark的运行流程(内核调度)

Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。

  • DAGScheduler
    • 根据rdd间的依赖关系,将提交的job划分成多个stage。
    • 对每个stage中的task进行描述(task编号,task执行的rdd算子)
  • TaskScheduler
    • 获取DAGScheduler提交的task
    • 调用SchedulerBackend获取executor的资源信息
    • 给task分配资源,维护task和executor对应关系
    • 管理task任务队列
    • 将task给到SchedulerBackend,然后由SchedulerBackend分发对应的executor执行
  • SchedulerBackend
    • 向RM申请资源
    • 获取executor信息
    • 分发task任务
      在这里插入图片描述

3,Spark的shuffle过程

spark shuffle的两个部分

  • shuffle write 写 map阶段,上一个stage得到最后的结果写入
  • shuffle read 读 reduce阶段,下一个stage拉取上一个stage进行合并
  • 会进行文件的读写,影响spark的计算速度

sortshuffle

  • 进行的是排序计算
  • bypass模式版本和普通模式版本
  • bypass模式版本不会排序,会进行hash操作
  • 普通模式版本会进行排序
  • 可以通过配置指定按照哪种模式执行

在这里插入图片描述
无论是hash还是排序都是将相同key值放在一起处理

  • [(‘a’,1),(‘b’,2),(‘a’,1)]
  • hash(key)%分区数,相同的key数据余数是相同的,会放一起,交给同一个分区进行处理
  • 按照key排序,相同key的数据也会放在一起 ,然后交给同一分区处理

3.1 sparkShuffle配置

spark.shuffle.file.buffer
(默认是32K)。将数据写到磁盘文件之前会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 可以将其调整为原来的2倍 3倍
spark.reducer.maxSizeInFlight
如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m,默认48M),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
spark.shuffle.io.maxRetries :
shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)
spark.shuffle.io.retryWait:
该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
spark.shuffle.memoryFraction=10
该参数代表了Executor 1G内存中,分配给shuffle read task进行聚合操作内存比例。
spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认为sort)

Hash:spark1.x版本的默认值,HashShuffleManager

Sort:spark2.x版本的默认值,普通机制。当shuffle read task 的数量小于等于200采用bypass机制
spark.shuffle.sort.bypassMergeThreshold=200

  • 根据task数量决定sortshuffle的模式
  • task数量小于等于200 就采用bypass task大于200就采用普通模式
    当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
pyspark --master yarn --name shuffle_demo --conf 'spark.shuffle.sort.bypassMergeThreshold=300'

代码中配置

SparkConf().set('spark.shuffle.sort.bypassMergeThreshold','300')# 将配置添加到sparkcontext中
# appName 就是计算任务名称指定和--name作用一样
# conf 参数就算是指定配置信息的
sc = SparkContext(master='yarn',appName='shuffle_demo',conf=conf)

4,spark并行度

4.1 资源并行度(物理并行)

资源并行度调整只能通过交互式,不能通过脚本,由executors节点数和cores核数决定

spark中cpu核心数据设置

  • –num-executors=2 设置executors数量
  • –executor-cores=2 设置每个executors中的cpu核心数,不能超过服务器cpu核心数

4.2数据并行度(逻辑并行)

由task数量决定,task由分区数决定。

为了保证task能充分利用cpu资源,实现并行计算,需要设置的task数量应该和资源并行度(cpu核心数)一致

  • task = cpu core 这样会导致计算快的task执行结束后,一些资源就会处于等待状态,浪费资源

    在实际公司中就要根据公司资源并行度设置分区数

  • 有的场景下公司会要求数据并行度大于资源并行度

  • 建议task数量是cpu core的2~3倍

  • 只有task足够多才能更好的利用资源,但是如果task很多的话,资源少,那么就会先执行一批后再执行下一批

4.3并行度设置

交互式模式

pyspark --master yarn --num-executors=3  --executor-cores=2

开发模式设置

spark-submit --master yarn --num-executors=3 --executor-cores=2  /root/python_spark/a.py

4.1spark调优

  • 并行度
    • 调整集群节点数和核心数
    • 调整数据分区数
  • shuffle
    • 调整缓冲区的大小 32kb 32M
    • 调整shuffle模式
    • 调整分配给read的过程聚合操作的内存大小
  • cache和checkpoint
    • 提升计算效率
    • 容错性
  • sql代码编写调优

5,sparkSql介绍

5.1什么是sparksql

是spark的一个模块

是Apache Spark用于处理数据结构化数据的模块

结构化数据

表数据 包含行,列字段

python ->DataFrame数据类型

java/scala - >Dataset数据类型

1.2特点

融合性

可以使用纯sql进行数据计算

可以使用DSL方式进行数据计算->将sql中的关键字替换成方法进行使用

统一数据访问

read->读取mysql/hdfs/es/kafka/hbase/文件数据转换成DataFrame数据类型

write->将计算结果保存到mysql/hdfs/es/kafka/hbase/文件兼容Hive

支持Hivesql转换成spark计算任务

标准化数据连接

可以使用三方工具(pycharm/datagrip)通过jdbc或odbc方式连接Spark Sql

5.2数据类型

三种数据类型

RDD:spark中最基础的数据类型,所有的组件的代码都是要转换成rdd任务进行执行,只是存储的数据值

Dataframe

sparksql中数据类型,结构化数据类型。

存储了数据值行数据,row对象

存储了表结构(schema)->schema对象

一条数据就是rdd中的一个元素

dataset类型->java/scale

一条数据就是一个dataframe

5.3DataFrame基本使用

from pyspark.sql import Row
from pyspark.sql.types import  *
from pyspark.sql import SparkSession#SparkSession类型
#SparkSession.builder  类名.属性名  -》返回builser类的对象ss = SparkSession.builder.getOrCreate()#row对象
row1 = Row(id = 1,name = '小明',age = 22)
row2 = Row(id = 2,name = '小红',age = 22)#创建schema对象(指定数据类型,方式一)
schemal1 = StructType().add('id',IntegerType(),True).\add('name',StringType(),False).\add(field='age',data_type=IntegerType(),nullable=True)#创建dataframe数据df对象
#data:接受的是一个结构化数据类型,二维数据类型[[],[]],[(),()],[{},{}]等都可以df1 = ss.createDataFrame(data=[row1,row2],schema=schemal1)df1.show()
# 结果
# +---+----+---+
# | id|name|age|
# +---+----+---+
# |  1|小明| 22|
# |  2|小红| 22|
# +---+----+---+
df1.printSchema()
# 结果
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = false)
#  |-- age: integer (nullable = true)
#指定数据类型(方式二)
schemal2 = 'id int ,name string,age int'data_list = [(1,'张三',20),(2,'李四',30)]
df2 = ss.createDataFrame(data=data_list,schema=schemal2)
df2.show()data_list2 = [{'id':1,'name':'阿三','age':5},{'id':2,'name':'王六','age':80}]
#不指定数据类型会自动创建
df3 = ss.createDataFrame(data_list2)
df3.show()
# 结果
# +---+---+----+
# |age| id|name|
# +---+---+----+
# |  5|  1|阿三|
# | 80|  2|王六|
# +---+---+----+
df3.printSchema()
# 结果
# root
#  |-- age: long (nullable = true)
#  |-- id: long (nullable = true)
#  |-- name: string (nullable = true)

5.4Rdd和Dataframe的相互转换

    from pyspark.sql import SparkSessionfrom  pyspark.sql.types import *#创建ss对象ss = SparkSession.builder.getOrCreate()#创建sc对象(ss可以通过sparkContext方法将自己转化为sc对象)#@property装饰器可以实现调用方法时通过属性方式来调用sc = ss.sparkContext#sc对象才能创建rdd对象#rdd数据结构时列表嵌套->二位数据结构rdd1 = sc.parallelize([[1,'张三',20],[2,'李四',34]])df1 = ss.createDataFrame(data=rdd1)df1.show()# +---+----+---+# | _1|  _2| _3|# +---+----+---+# |  1|张三| 20|# |  2|李四| 34|# +---+----+---+df1.printSchema()#创建schema对象(指定数据类型,方式一)schemal1 = StructType().add('id',IntegerType(),True).\add('name',StringType(),False).\add(field='age',data_type=IntegerType(),nullable=True)#将rdd结构的数据转换为dataFramedf2 = ss.createDataFrame(data=rdd1,schema=schemal1)print(type(df2))#结果<class 'pyspark.sql.dataframe.DataFrame'>df2.show()rdd1 = df2.rddprint(type(rdd1))#结果<class 'pyspark.rdd.RDD'>######################schemal2 = 'id int ,name string,age int'df3 = ss.createDataFrame(data=rdd1,schema=schemal2)df3.show()#schema:后面只需要传入列名列表不需要类型df4 = rdd1.toDF(schema=['id' ,'name','age'])print(type(df4))#<class 'pyspark.sql.dataframe.DataFrame'>df4.show()#结果# +---+----+---+# | id|name|age|# +---+----+---+# |  1|张三| 20|# |  2|李四| 34|# +---+----+---+new_rdd = df3.rddprint(type(new_rdd))#结果<class 'pyspark.rdd.RDD'>print(new_rdd.collect())#获取rdd中每个row对象元素中的id列的值#x->rdd_map = new_rdd.map(lambda x:x.id)print(rdd_map.collect())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/628426.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

suse ha集群多节点异常重启故障案例一则

关键词 suse linux hae 、pacemakeroracle、nfs crm_failcount、timeout、trace 一、问题现象 接故障反馈&#xff0c;某业务几套suse ha集群系统&#xff0c;在某天不同时间点&#xff0c;分别发生了oracle数据库及主机异常切换重启的故障现象&#xff0c;数据库切换重启期…

(亲测可行)关于提高IDEA运行速度的方案

1.作者IDEA软件版本和计算机内存 Ultimate 2022.1.2版IDEA&#xff0c;计算机内存为12GB 2.修改配置以提高IDEA运行速度的误区-调高堆内存 很多文章会教调配置的内存&#xff0c;但大多是让你调高堆内存&#xff0c;比如会让你调高-Xms -Xmx &#xff0c;这两种对应的是最…

Java实现城市桥梁道路管理系统 JAVA+Vue+SpringBoot+MySQL

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询城市桥梁4.2 新增城市桥梁4.3 编辑城市桥梁4.4 删除城市桥梁4.5 查询单个城市桥梁 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的城市桥梁道路管理系统&#xff0c;支持…

LabVIEW在金属铜大气腐蚀预测评价系统中的应用

为了应对电子设备和仪器中金属铜因大气腐蚀带来的挑战&#xff0c;开发一种基于LabVIEW平台的先进预测评价系统。这个系统的设计宗旨是准确预测并评估在不同室内外环境中金属铜的腐蚀状况。我们团队在LabVIEW的强大数据处理和图形化编程支持下&#xff0c;结合实际的大气腐蚀数…

Vue实战:两种方式创建Vue项目

文章目录 一、实战概述二、实战步骤&#xff08;一&#xff09;安装Vue CLI脚手架1、从Node.js官网下载LTS版本2、安装Node.js到指定目录3、配置Node.js环境变量4、查看node版本5、查看npm版本6、安装Vue Cli脚手架7、查看Vue Cli版本 &#xff08;二&#xff09;命令行方式构建…

Linux 入门命令大全汇总 + Linux 集锦大全 【20240115】

文章目录 Linux 入门命令大全汇总Linux 集锦大全更多信息 Linux 入门命令大全汇总 别有一番风趣的alias 刚刚好合适的 apropos 命令 迷你计算器 bc 可看黄道吉日的 cal 全文可查看&#xff1a; Linux入门命令大全全文 Linux 集锦大全 linux终端中最漂亮的几款字体介绍及…

计算机网络安全教程(第三版)课后简答题答案大全[6-12章]

目录 第 6 章 网络后门与网络隐身 第 7 章 恶意代码分析与防治 第 8 章 操作系统安全基础 第 9 章 密码学与信息加密 第 10 章 防火墙与入侵检测 第 11 章 IP安全与Web安全 第 12 章 网络安全方案设计 链接&#xff1a;计算机网络安全教程(第三版)课后简答题答案大全[1-5…

设计模式 代理模式(静态代理 动态代理) 与 Spring Aop源码分析 具体是如何创建Aop代理的

代理模式 代理模式是一种结构型设计模式&#xff0c;它通过创建一个代理对象来控制对真实对象的访问。这种模式可以用于提供额外的功能操作&#xff0c;或者扩展目标对象的功能。 在代理模式中&#xff0c;代理对象与真实对象实现相同的接口&#xff0c;以便在任何地方都可以使…

系统性学习vue-组件及脚手架

书接上文 Vue组件及脚手架 初始化脚手架说明步骤 分析脚手架结构render函数修改默认配置ref属性props配置mixin 混入/混合定义混合局部混合全局混合 插件scoped样式安装less-loader 浏览器的本地存储 webStoragelocalStroage 本地存储sessionStorage 会话存储 组件自定义事件绑…

【方案】世微AP5127平均电流型LED降压恒流IC 12-50V /6V2.5A双色LED灯

这是一款双色切换的LED灯方案&#xff0c;12-50V 降压恒流,输出&#xff1a;6V 2.5A ​ 这是一款PWM工作模式 , 高效率、 外围简单、内置功率管&#xff0c;适用于 输入的 高 精度降压 LED 恒流驱动芯片。输出大功率可 达 25W&#xff0c;电流 2.5A。 可实现全亮/半亮功能切换…

重新认识Word——页眉页脚

重新认识Word——页眉页脚 节设置页脚第X页&#xff0c;共Y页 奇偶页不同页眉包含章节号清除页眉横线 我们之前已经全面的构建了我们的文章&#xff0c;现在我们来了解一下&#xff0c;我们毕业论文的页眉&#xff08;页面信息&#xff09;页脚&#xff08;页码&#xff09;的设…

遥测终端机选择要点:功能、稳定性与成本的综合考量

在当今的智能化时代&#xff0c;遥测终端机作为数据采集、传输和处理的关键设备&#xff0c;广泛应用于水利、气象、环保等领域。然而&#xff0c;面对市场上琳琅满目的遥测终端机产品&#xff0c;如何选择一款性能优良、稳定性高且成本合理的设备成为摆在用户面前的一大难题。…

Dockerfile的ADD和COPY

文章目录 环境ADD规则校验远程文件checksum添加Git仓库添加私有Git仓库ADD --link COPYCOPY --parent 使用ADD还是COPY&#xff1f;参考 环境 RHEL 9.3Docker Community 24.0.7 ADD ADD 指令把 <src> 的文件、目录、或URL链接的文件复制到 <dest> 。 ADD 有两种…

C++核心编程之类和对象---C++面向对象的三大特性--多态

目录 一、多态 1. 多态的概念 2.多态的分类&#xff1a; 1. 静态多态&#xff1a; 2. 动态多态&#xff1a; 3.静态多态和动态多态的区别&#xff1a; 4.动态多态需要满足的条件&#xff1a; 4.1重写的概念&#xff1a; 4.2动态多态的调用&#xff1a; 二、多态 三、多…

散列函数,哈希表hash table

附上一句话&#xff1a;我知道大家可能曾经了解过这个散列表了&#xff0c;我发现&#xff0c;如果多看几个相关的视频&#xff0c;从不同的表述方式和不同的理解角度来理解这个问题&#xff0c;我会明白的更透彻&#xff0c;也有更多新的收获&#xff0c;尤其是对这个算法的应…

【PostgreSQL】安装和常用命令教程

PostgreSQL window安装教程 window安装PostgreSQL 建表语句&#xff1a; DROP TABLE IF EXISTS student; CREATE TABLE student (id serial NOT NULL,name varchar(100) NOT NULL,sex varchar(5) NOT NULL,PRIMARY KEY (id) );INSERT INTO student (id, name, sex) VALUES (…

无人超市系统的设计与实现:从需求分析到实际应用

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

Python超详细基础文件操作(详解版)

一、文件操作 1. 文件打开与关闭 1.1 打开文件 在Python中&#xff0c;你可以使用 open() 函数来打开文件。 以下是一个简单的例子&#xff1a; # 打开文件&#xff08;默认为只读模式&#xff09; file_path example.txt with open(file_path, r) as file:# 执行文件操作…

C语言天花板——指针(进阶2)

好久不见了各位&#xff0c;甚是想念啊&#xff01;&#xff01;&#xff01;&#x1f3b6;&#x1f3b6;&#x1f3b6; 文章接上次的指针(进阶1)(http://t.csdnimg.cn/c39SJ)&#xff0c;我们继续发车咯&#x1f697;&#x1f697;&#x1f697; 五、函数指针 上次我们只是浅…

网卡唯一标识你了解吗?MAC地址详解

本文内容&#xff1a; MAC地址概述 MAC地址组成 单播、组播、广播MAC地址 本地管理和全球管理MAC地址 一、MAC地址概述 MAC地址&#xff08;Media Access Control Address&#xff09;的全称叫做媒体访问控制地址&#xff0c;也称作局域网地址&#xff0c;以太网地址或者物…