Spark重温笔记(三):Spark在企业中为什么能这么强?——持久化、Checkpoint机制、共享变量与内核调度原理全攻略“

Spark学习笔记

前言:今天是温习 Spark 的第 3 天啦!主要梳理了 Spark 核心数据结构:RDD(弹性分布式数据集),包括RDD持久化,checkpoint机制,spark两种共享变量以及spark内核调度原理,希望对大家有帮助!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Spark学习笔记
      • 5. RDD持久化[掌握]
        • (1)为什么使用缓存
        • (2)如何进行缓存
        • (3)何时缓存数据
      • 6. Checkpoint机制[掌握]
        • (1) 为什么要检查点
        • (2)如何进行检查点
        • (3)检查点机制有哪些作用
        • (4) 如何实现spark的容错
        • (5)持久化和检查点的区别
        • (6)持久化和检查点并存
      • 7.两种共享变量[掌握]
        • (1)累加器
        • (2)广播变量
      • 8. Spark的内核调度
        • (1) RDD依赖
        • (2) DAG
        • (3) Job的调度流程

(本节的所有数据集放在我的资源下载区哦,感兴趣的小伙伴可以自行下载:最全面的SparkCore系列案例数据集

5. RDD持久化[掌握]

(1)为什么使用缓存
  • 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算
  • 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency
  • 使用经验:一次缓存可以多次使用
(2)如何进行缓存
  • spark中提供cache方法
  • spark中提供persist方法
_15_acheOrpersist.py# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':print('PySpark join Function Program')# TODO:1、创建应用程序入口SparkContext实例对象conf = SparkConf().setAppName("miniProject").setMaster("local[*]")sc = SparkContext.getOrCreate(conf)# TODO: 2、从本地文件系统创建RDD数据集x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])y = sc.parallelize([(1001, "sales"), (1002, "tech")])# TODO:3、使用join完成联合操作join_result_rdd = x.join(y)print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]print(x.leftOuterJoin(y).collect())print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]# 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)join_result_rdd.cache()# join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)# 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识join_result_rdd.collect()# 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count==============================> 4040端口出现绿点print(join_result_rdd.count())print(join_result_rdd.first())time.sleep(600)sc.stop()
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('zhangliu', None))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
2
(1001, ('zhangsan', 'sales'))
(3)何时缓存数据
  • rdd来之不易
  • 经过很长依赖链计算
  • 经过shuffle
  • rdd被使用多次
  • 缓存cache或persist

问题1:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质

  • 内存在重启之后没有数据了,磁盘也会数据丢失

  • 注意:缓存会将依赖链进行保存的

  • 问题2:如何解决基于cache或persist的存储在易失介质的问题?

  • 引入checkpoint检查点机制

  • 将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制

  • checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算

  • 什么是元数据?

  • 管理数据的数据

  • 比如,数据大小,位置等都是元数据

6. Checkpoint机制[掌握]

(1) 为什么要检查点

为什么有检查点机制?

  • 1-因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题
  • 2-Spark的容错问题?
    • 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint
(2)如何进行检查点

如何使用检查点机制?

  • 1-指定数据保存在哪里?:sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
  • 2-对谁缓存?:算子
  • 3-rdd1.checkpoint() :斩断依赖关系进行检查点
  • 4-检查点机制触发方式:action算子可以触发
  • 5-后续的计算过程:Spark机制直接从checkpoint中读取数据
(3)检查点机制有哪些作用

检查点机制那些作用?

  • 将数据和元数据保存在HDFS中
  • 后续执行rdd的计算直接基于checkpoint的rdd
  • 起到了容错的作用
(4) 如何实现spark的容错

面试题:如何实现Spark的容错?

  • 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据
  • 2-否则查看checkpoint是否保存数据
  • 3-否则根据依赖关系重建RDD
(5)持久化和检查点的区别
  • 1-存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs
  • 2-生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在
  • 3-依赖关系:缓存保存依赖关系,检查点斩断依赖关系链
_16_checkpoint.py
# -*- coding: utf-8 -*-
# Program function:checkpoint RDDfrom pyspark import SparkContext, SparkConf
import os
import timefrom pyspark.storagelevel import StorageLevelos.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONif __name__ == '__main__':print('PySpark checkpoint Program')# TODO:1、创建应用程序入口SparkContext实例对象conf = SparkConf().setAppName("miniProject").setMaster("local[*]")sc = SparkContext.getOrCreate(conf)# TODO: 2、RDD的checkpointsc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")# TODO: 3、调用集合RDD中函数处理分析数据fileRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")# TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发fileRDD.checkpoint()print(fileRDD.count())# TODO: 再次执行count函数, 此时从checkpoint读取数据print(fileRDD.count())time.sleep(100)print('停止 PySpark SparkSession 对象')# 关闭SparkContextsc.stop()
2
2
停止 PySpark SparkSession 对象  
(6)持久化和检查点并存

先cache 再 checkpoint测试

  • 1-读取数据文件
  • 2-设置检查点目录
  • 3-rdd.checkpoint() 和rdd.cache()
  • 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快
  • 5-如果对rdd实现unpersist
  • 6-从checkpoint中读取rdd的数据
  • 7-通过action可以查看时间
_17_acheCheckpoint.py
# -*- coding: utf-8 -*-
# Program function:cache&checkpoint RDDfrom pyspark import SparkContext, SparkConf
import os
import timefrom pyspark.storagelevel import StorageLevelos.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONif __name__ == '__main__':print('PySpark cache&checkpoint Program')# TODO:1、创建应用程序入口SparkContext实例对象conf = SparkConf().setAppName("miniProject").setMaster("local[*]")sc = SparkContext.getOrCreate(conf)# TODO: 2、RDD的checkpointsc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")# TODO: 3、调用集合RDD中函数处理分析数据fileRDD = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")# TODO: 调用checkpoint和cache函数,将RDD进行容错,需要RDD中Action函数触发print("=======1-同时做cache和Perisist========")fileRDD.cache()fileRDD.checkpoint()print("=======2-启动Job1跑正常任务,启动Job2就会先从Cache读取数据,Web页面可以看到ProcessLocal========")fileRDD.count()# TODO: 再次执行count函数, 此时从checkpoint读取数据fileRDD.count()print("=======3-启动一个Job发现查询数据从checkpoint的hdfs中查找========")# TODO:释放cache之后如果在查询数据从哪里读取? 答案是checkpoint的hdfs的数据中。fileRDD.unpersist(True)fileRDD.count()time.sleep(100)print('停止 PySpark SparkSession 对象')# 关闭SparkContextsc.stop()

7.两种共享变量[掌握]

(1)累加器
  • 1-原理
    • 在Driver端和exeutor端可以共享Executor执行计算的结果
  • 2-不使用累加器
    • python本地集合可以直接得到结果
    • 但是在分布式集合中得不到累加的
  • 3-使用累加器
    • acc=sc.accumulate(10),10是初始值
    • acc.add(num)
    • print(acc.value)通过value获取累加器的值
(2)广播变量
  • 1-广播变量不是在每个Task拥有一份变量,而是每个节点的executor一份副本
  • 2-广播变量通过本地的executor从blockmanager中过去driver上面变量的副本(计算资源+计算程序)

8. Spark的内核调度

(1) RDD依赖
  • RDD依赖
  • 为什么设计依赖?
    • 1-为了实现Spark的容错,rdd1-rdd2-rdd3-rdd4
    • 2-并行计算,划分依赖、
  • 为什么划分宽窄依赖?
    • 为了加速并行计算
    • 窄依赖可以并行计算,如果是宽依赖无法并行计算
  • 依赖的划分
    • 窄依赖:*父 RDD 与子 RDD 间的分区是一对一的*
    • 宽依赖:划分Stage
      • *父 RDD 中的分区可能会被多个子 RDD 分区使用*
    • 如何区分宽窄依赖?
      • 比如map。filter,flatMap 窄依赖,无需进行shuffle
      • 比如reduceByKey(合并多个窄依赖),groupByKey,宽依赖(shuffle)
      • 不能说:一个子RDD依赖于多个父rdd,该种情况无法判断
(2) DAG

什么是DAG?

  • 有向无环图
  • DAG如何划分Stage?
    • 一个Dag就是一个Job,一个Dag是由Action算子进行划分
    • 一个Job下面有很多Stage,根据宽依赖Shuffle依赖划分Stage
  • 一个Spark应用程序包括Job、Stage及Task:
    • 第一:Job是以Action方法为界,遇到一个Action方法则触发一个Job;一个Job就是dag
    • 第二:Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
    • 第三:Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
(3) Job的调度流程
  • 1-用户代码编写: 用户根据需求编写 Spark 应用程序,包括定义 RDD、转换操作和行动操作等。

  • 2-DAG 构建: Spark 将用户编写的代码进行解析,并构建出一个有向无环图(DAG),该图表示了任务之间的依赖关系。DAG 由一系列的阶段(Stage)组成,每个阶段包含一组可以并行执行的任务。

  • 3-Stage 划分: 根据任务之间的依赖关系,Spark 将 DAG 进一步划分为不同的阶段。一个阶段包含一组可以在无需 shuffle 的情况下并行执行的任务。

  • 4-Task 划分: 对于每个阶段,Spark 将其划分为一系列的任务(Task),每个任务对应于一个 RDD partition 的处理。任务的划分是根据数据的分区方式和计算的转换操作来确定的。

  • 5-资源分配: Spark 根据集群的资源情况,将任务分配给可用的 Executor,以便在集群中并行执行。

  • 6-DAG 调度: Spark 根据阶段之间的依赖关系,按照拓扑顺序调度阶段的执行。每个阶段的任务会在 Executor 上启动,并且会根据需要进行数据的 shuffle 操作。

  • 7-任务执行: Executor 在分配到的资源上并行执行任务。每个任务会根据用户编写的转换操作对 RDD 进行处理,并将结果传递给下一个阶段的任务。

  • 8-结果输出: 最后一个阶段完成后,Spark 将最终的结果返回给用户代码,或者将结果写入外部存储系统,如 HDFS、数据库等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/767524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

牛客题霸-SQL篇(刷题记录二)

本文基于前段时间学习总结的 MySQL 相关的查询语法,在牛客网找了相应的 MySQL 题目进行练习,以便加强对于 MySQL 查询语法的理解和应用。 由于涉及到的数据库表较多,因此本文不再展示,只提供 MySQL 代码与示例输出。 以下内容是…

HarmonyOS应用开发实战 - Api9 拍照、拍视频、选择图片、选择视频、选择文件工具类

鸿蒙开发过程中,经常会进行系统调用,拍照、拍视频、选择图库图片、选择图库视频、选择文件。今天就给大家分享一个工具类。 1.话不多说,先展示样式 2.设计思路 根据官方提供的指南开发工具类,基础的拍照、拍视频、图库选照片、选…

使用Python进行自动化测试Selenium与PyTest的结合【第150篇—自动化测试】

👽发现宝藏 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 使用Python进行自动化测试:Selenium与PyTest的结合 在软件开发中,自…

线程与进程的爱恨情仇???

线程与进程的爱恨情仇??? 一:有了进程,为什么还需要线程???二:线程三:线程和进程的区别与联系四:创建线程or创建进程 一:有了进程,为什么还需要线程&#xff…

css盒子模型及浮动

内容(content)、内边距(padding)、边框(border)、外边距(margin) oder:1px solid red; 边框的粗细 边框的样式(虚线还是实线) 边框的颜色 border中也有一些属性可以直接调某一个方向上的边框的粗细,样式,颜色 border-left\bord…

2024/3/24 LED点阵屏

显示原理: 类似矩阵键盘,逐行or逐列扫描 74HC595是串行 寄存器 感觉就是三转八寄存器 并行:同时输出;串行:一位一位输出 先配置74HC595,重新进行位声明 sbit RCKP3^5; //RCLK sbit SCKP3^6; …

芒果YOLOv8改进116:即插即用:集中特征金字塔Centralized Feature Pyramid 高效涨点改进

💡🚀🚀🚀本博客 改进源代码改进 适用于 YOLOv8 按步骤操作运行改进后的代码即可 该专栏完整目录链接: 芒果YOLOv8深度改进教程 🚀🚀🚀 文章目录 一、Centralized Feature Pyramid论文理论部分 + YOLOv8代码改进论文创新论文贡献论文网络部分实验对比2. YOLOv8 …

淘宝|天猫|京东|1688主流电商平台的实时数据返回接口|附Python实例

导读:随着淘宝/天猫直通车功能升级,很多功能越来越白盒化,越来越简化,更方便用户的操作,只需一键即可看出淘宝/天猫直通车存在的问题。淘宝/天猫直通车千人千面后有了实时数据工具,下面通过一个案例告诉大家…

23. UE5 RPG制作属性面板(一)

随着角色的属性越来越多,我们不能每次都进行showdebug abilitysystem进行查看,而且玩家也需要查看角色属性,所以需要一个查看玩家角色属性的面板。 在前面,我们创建三种类型的属性 Primary Attributes(主要属性&#…

Spring-Cloud原理详解

Spring Cloud 是一套基于Spring Boot实现的云应用开发工具集,它为快速构建分布式系统提供了全面的解决方案,大大简化了在分布式系统中常见的诸如服务注册与发现、配置中心、熔断器、服务路由、负载均衡、全链路监控、服务追踪等问题的解决过程。下面是对…

Java面试题:生产者消费者问题与工厂方法模式;线程池与观察者模式;ThreadLocal 与单例模式

Java 并发编程与设计模式综合面试题解析 在软件开发中,Java 并发编程和设计模式是两个非常关键的领域,它们可以提高程序的性能、可维护性和灵活性。本文将详细解析三道综合性的 Java 面试题,这些题目将涵盖 Java 设计模式、内存管理、多线程…

软件测试面试问题总结—CTO面试

有一家公司技术面一面和二面都过了,在CTO三面的时候折掉了,反思了下,确实CTO面试的时候问的问题比较宏观,我的回答都比较浅,现在再总结下答案。 1、除了自动化测试,怎么提高测试效率? &#xff…

常见的OOM 问题的 6 种场景

今天跟大家一起聊聊线上服务出现 OOM 问题的 6 种场景,希望对你会有所帮助。 一、堆内存 OOM 堆内存 OOM 是最常见的 OOM 了。 出现堆内存 OOM 问题的异常信息如下: java.lang.OutOfMemoryError: Java heap space此 OOM 是由于 JVM 中 heap 的最大值,已经不能满足需求了…

资深测试总结,性能测试-常见并发问题+解决总结(最全)

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 1、并发测试的定义…

Spring线程池ThreadPoolTaskExecutor的使用

Spring线程池ThreadPoolTaskExecutor的使用

指针和引用的权限

权限的放大,缩小,平移 适用于指针和引用 1.权限的放大(不允许) const int a 5;//const,对变量声明只读特性,保护变量值以防被修改 int* y &a;//把只读的赋给可改的,会报错 2.权限平移(允许&…

lavarel的php程序是顺序执行,用pdo mysql连接池好像没有什么用啊。没有办法挂起等待啊,为什么要用连接池,应用场景是什么

Laravel 的 PHP 程序确实是基于请求-响应模式,每个请求都是顺序执行的。这意味着一旦一个请求开始处理,它会按照代码的顺序执行,直到完成并返回响应。因此,从表面上看,使用 PDO 或 MySQL 连接池在 Laravel 中可能看起来…

vscode用SSH远程开发c语言

vscode配置远程 这里我使用虚拟机进行展示,首先需要你的虚拟机安装好ssh 没安装好就执行下面的命令安装并开启服务 sudo apt-get install ssh sudo service ssh start ps -e | grep sshvscode安装 remote-ssh扩展 点击左下角的远程连接,我这里已经连接…

【Hive】HIVE运行卡死没反应

Hive运行卡死 再次强调 hive:小兄弟,没想到吧,咱可不是随便的人。😄 那么,这次又遇见了hadoop问题,问题描述是这样的。 hive> insert into test values(1, nucty, 男); Query ID atguigu_202403241754…

图论基础|695. 岛屿的最大面积、1020. 飞地的数量、130. 被围绕的区域

695. 岛屿的最大面积 力扣题目链接(opens new window) 给你一个大小为 m x n 的二进制矩阵 grid 。 岛屿 是由一些相邻的 1 (代表土地) 构成的组合,这里的「相邻」要求两个 1 必须在 水平或者竖直的四个方向上 相邻。你可以假设 grid 的四个边缘都被 0&#xff0…