Spark学习笔记
前言:今天是温习 Spark 的第 3 天啦!主要梳理了 Spark 核心数据结构:RDD(弹性分布式数据集),包括RDD持久化,checkpoint机制,spark两种共享变量以及spark内核调度原理,希望对大家有帮助!
Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!
喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"
文章目录
- Spark学习笔记
- 5. RDD持久化[掌握]
- (1)为什么使用缓存
- (2)如何进行缓存
- (3)何时缓存数据
- 6. Checkpoint机制[掌握]
- (1) 为什么要检查点
- (2)如何进行检查点
- (3)检查点机制有哪些作用
- (4) 如何实现spark的容错
- (5)持久化和检查点的区别
- (6)持久化和检查点并存
- 7.两种共享变量[掌握]
- (1)累加器
- (2)广播变量
- 8. Spark的内核调度
- (1) RDD依赖
- (2) DAG
- (3) Job的调度流程
(本节的所有数据集放在我的资源下载区哦,感兴趣的小伙伴可以自行下载:
最全面的SparkCore系列案例数据集
)
5. RDD持久化[掌握]
(1)为什么使用缓存
- 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算
- 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency
- 使用经验:一次缓存可以多次使用
(2)如何进行缓存
- spark中提供cache方法
- spark中提供persist方法
_15_acheOrpersist.py# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':print('PySpark join Function Program')# TODO:1、创建应用程序入口SparkContext实例对象conf = SparkConf().setAppName("miniProject").setMaster("local[*]")sc = SparkContext.getOrCreate(conf)# TODO: 2、从本地文件系统创建RDD数据集x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])y = sc.parallelize([(1001, "sales"), (1002, "tech")])# TODO:3、使用join完成联合操作join_result_rdd = x.join(y)print(join_result_rdd.collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]print(x.leftOuterJoin(y).collect())print(x.rightOuterJoin(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]# 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)join_result_rdd.cache()# join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)# 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识join_result_rdd.collect()# 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count==============================> 4040端口出现绿点print(join_result_rdd.count())print(join_result_rdd.first())time.sleep(600)sc.stop()
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('zhangliu', None))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
2
(1001, ('zhangsan', 'sales'))
(3)何时缓存数据
- rdd来之不易
- 经过很长依赖链计算
- 经过shuffle
- rdd被使用多次
- 缓存cache或persist
问题1:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质
内存在重启之后没有数据了,磁盘也会数据丢失
注意:缓存会将依赖链进行保存的
问题2:如何解决基于cache或persist的存储在易失介质的问题?
引入checkpoint检查点机制
将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制
checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算
什么是元数据?
管理数据的数据
比如,数据大小,位置等都是元数据
6. Checkpoint机制[掌握]
(1) 为什么要检查点
为什么有检查点机制?
- 1-因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题
- 2-Spark的容错问题?
- 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint
(2)如何进行检查点
如何使用检查点机制?
- 1-指定数据保存在哪里?:sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
- 2-对谁缓存?:算子
- 3-rdd1.checkpoint() :斩断依赖关系进行检查点
- 4-检查点机制触发方式:action算子可以触发
- 5-后续的计算过程:Spark机制直接从checkpoint中读取数据
(3)检查点机制有哪些作用
检查点机制那些作用?
- 将数据和元数据保存在HDFS中
- 后续执行rdd的计算直接基于checkpoint的rdd
- 起到了容错的作用
(4) 如何实现spark的容错
面试题:如何实现Spark的容错?
- 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据
- 2-否则查看checkpoint是否保存数据
- 3-否则根据依赖关系重建RDD
(5)持久化和检查点的区别
- 1-存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs
- 2-生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在
- 3-依赖关系:缓存保存依赖关系,检查点斩断依赖关系链
_16_checkpoint.py
# -*- coding: utf-8 -*-
# Program function:checkpoint RDDfrom pyspark import SparkContext, SparkConf
import os
import timefrom pyspark.storagelevel import StorageLevelos.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONif __name__ == '__main__':print('PySpark checkpoint Program')# TODO:1、创建应用程序入口SparkContext实例对象conf = SparkConf().setAppName("miniProject").setMaster("local[*]")sc = SparkContext.getOrCreate(conf)# TODO: 2、RDD的checkpointsc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")# TODO: 3、调用集合RDD中函数处理分析数据fileRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")# TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发fileRDD.checkpoint()print(fileRDD.count())# TODO: 再次执行count函数, 此时从checkpoint读取数据print(fileRDD.count())time.sleep(100)print('停止 PySpark SparkSession 对象')# 关闭SparkContextsc.stop()
2
2
停止 PySpark SparkSession 对象
(6)持久化和检查点并存
先cache 再 checkpoint测试
- 1-读取数据文件
- 2-设置检查点目录
- 3-rdd.checkpoint() 和rdd.cache()
- 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快
- 5-如果对rdd实现unpersist
- 6-从checkpoint中读取rdd的数据
- 7-通过action可以查看时间
_17_acheCheckpoint.py
# -*- coding: utf-8 -*-
# Program function:cache&checkpoint RDDfrom pyspark import SparkContext, SparkConf
import os
import timefrom pyspark.storagelevel import StorageLevelos.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONif __name__ == '__main__':print('PySpark cache&checkpoint Program')# TODO:1、创建应用程序入口SparkContext实例对象conf = SparkConf().setAppName("miniProject").setMaster("local[*]")sc = SparkContext.getOrCreate(conf)# TODO: 2、RDD的checkpointsc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")# TODO: 3、调用集合RDD中函数处理分析数据fileRDD = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")# TODO: 调用checkpoint和cache函数,将RDD进行容错,需要RDD中Action函数触发print("=======1-同时做cache和Perisist========")fileRDD.cache()fileRDD.checkpoint()print("=======2-启动Job1跑正常任务,启动Job2就会先从Cache读取数据,Web页面可以看到ProcessLocal========")fileRDD.count()# TODO: 再次执行count函数, 此时从checkpoint读取数据fileRDD.count()print("=======3-启动一个Job发现查询数据从checkpoint的hdfs中查找========")# TODO:释放cache之后如果在查询数据从哪里读取? 答案是checkpoint的hdfs的数据中。fileRDD.unpersist(True)fileRDD.count()time.sleep(100)print('停止 PySpark SparkSession 对象')# 关闭SparkContextsc.stop()
7.两种共享变量[掌握]
(1)累加器
- 1-原理
- 在Driver端和exeutor端可以共享Executor执行计算的结果
- 2-不使用累加器
- python本地集合可以直接得到结果
- 但是在分布式集合中得不到累加的
- 3-使用累加器
- acc=sc.accumulate(10),10是初始值
- acc.add(num)
- print(acc.value)通过value获取累加器的值
(2)广播变量
- 1-广播变量不是在每个Task拥有一份变量,而是每个节点的executor一份副本
- 2-广播变量通过本地的executor从blockmanager中过去driver上面变量的副本(计算资源+计算程序)
8. Spark的内核调度
(1) RDD依赖
- RDD依赖
- 为什么设计依赖?
- 1-为了实现Spark的容错,rdd1-rdd2-rdd3-rdd4
- 2-并行计算,划分依赖、
- 为什么划分宽窄依赖?
- 为了加速并行计算
- 窄依赖可以并行计算,如果是宽依赖无法并行计算
- 依赖的划分
- 窄依赖:*父 RDD 与子 RDD 间的分区是一对一的*
- 宽依赖:划分Stage
- *父 RDD 中的分区可能会被多个子 RDD 分区使用*
- 如何区分宽窄依赖?
- 比如map。filter,flatMap 窄依赖,无需进行shuffle
- 比如reduceByKey(合并多个窄依赖),groupByKey,宽依赖(shuffle)
- 不能说:一个子RDD依赖于多个父rdd,该种情况无法判断
(2) DAG
什么是DAG?
- 有向无环图
- DAG如何划分Stage?
- 一个Dag就是一个Job,一个Dag是由Action算子进行划分
- 一个Job下面有很多Stage,根据宽依赖Shuffle依赖划分Stage
- 一个Spark应用程序包括Job、Stage及Task:
- 第一:Job是以Action方法为界,遇到一个Action方法则触发一个Job;一个Job就是dag
- 第二:Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
- 第三:Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
(3) Job的调度流程
-
1-用户代码编写: 用户根据需求编写 Spark 应用程序,包括定义 RDD、转换操作和行动操作等。
-
2-DAG 构建: Spark 将用户编写的代码进行解析,并构建出一个有向无环图(DAG),该图表示了任务之间的依赖关系。DAG 由一系列的阶段(Stage)组成,每个阶段包含一组可以并行执行的任务。
-
3-Stage 划分: 根据任务之间的依赖关系,Spark 将 DAG 进一步划分为不同的阶段。一个阶段包含一组可以在无需 shuffle 的情况下并行执行的任务。
-
4-Task 划分: 对于每个阶段,Spark 将其划分为一系列的任务(Task),每个任务对应于一个 RDD partition 的处理。任务的划分是根据数据的分区方式和计算的转换操作来确定的。
-
5-资源分配: Spark 根据集群的资源情况,将任务分配给可用的 Executor,以便在集群中并行执行。
-
6-DAG 调度: Spark 根据阶段之间的依赖关系,按照拓扑顺序调度阶段的执行。每个阶段的任务会在 Executor 上启动,并且会根据需要进行数据的 shuffle 操作。
-
7-任务执行: Executor 在分配到的资源上并行执行任务。每个任务会根据用户编写的转换操作对 RDD 进行处理,并将结果传递给下一个阶段的任务。
-
8-结果输出: 最后一个阶段完成后,Spark 将最终的结果返回给用户代码,或者将结果写入外部存储系统,如 HDFS、数据库等。