《深入理解Spark RDD缓存机制》(第4天)

文章目录

  • 前言
  • 一、小试牛刀:解剖RDD缓存机制?
    • 1. 什么是Spark RDD缓存策略
      • 1.1 为什幺RDD要做缓存
      • 1.2 缓存相关API:
      • 1.3 缓存案例解析:
      • 1.4 图解缓存效果:
    • 2. 什么是checkpoint缓存
      • 2.1 为什么要做checkpoint缓存
      • 2.2 checkpoint相关API:
      • 2.3 checkpoint案例解析
    • 3. 缓存和checkpoint的区别
      • 3.1 案例解析
  • 二、打铁趁热:面试题思考
    • 1. cache缓存和checkpoint检查点的区别
    • 2. 既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?
  • 总结


前言

Apache Spark是一个大规模数据处理框架,它提供了高效、快速和通用的数据处理能力。在Spark中,弹性分布式数据集(RDD, Resilient Distributed Dataset)是一个核心概念,而RDD的缓存机制则是确保Spark性能高效的关键因素之一。本文将通过’案例’,'图文’等解析方式深入探讨Spark RDD的缓存机制。


一、小试牛刀:解剖RDD缓存机制?

1. 什么是Spark RDD缓存策略

  • 在Spark中,RDD的缓存机制允许我们将计算的结果存储在内存中,从而避免在后续的计算中重复计算相同的RDD。这对于迭代计算、机器学习等场景尤为重要,可以显著提高计算效率。
    在这里插入图片描述

1.1 为什幺RDD要做缓存

  • 当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。
  • 主要作用: 提升Spark程序的计算效率
  • 注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此缓存的数据是不太稳定可靠。
    由于是临时存储,可能会存在丢失,所以缓存操作,并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存失效后,可以全部重新计算且缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用“count算子” ,因为运行效率高

1.2 缓存相关API:

设置缓存的API: rdd.cache(): 将RDD的数据缓存储内存中rdd.persist(缓存的级别/位置): 将RDD的数据存储在指定位置手动清理缓存API:rdd.unpersist()
默认情况下,当整个Spark应用程序执行完成后,缓存数据会自动失效,会被自动删除缓存的级别/位置:DISK_ONLY: 只存储在磁盘DISK_ONLY_2: 只存储在磁盘,并且有2个副本DISK_ONLY_3: 只存储在磁盘,并且有3个副本MEMORY_ONLY: 只存储在内存中MEMORY_ONLY_2: 只存储在内存中,并且有2个副本MEMORY_AND_DISK: 存储在内存和磁盘中,先放在内存,再放在磁盘MEMORY_AND_DISK_2: 存储在内存和磁盘中,先放在内存,再放在磁盘,并且有2个副本OFF_HEAP: Executor进程的堆外内存工作中最常用的是: MEMORY_AND_DISK和MEMORY_AND_DISK_2

1.3 缓存案例解析:

# 导包
import os
import timeimport jieba
from pyspark import SparkConf, SparkContext, StorageLevel# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def get_topN_keyword(etlRDD, n):r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \.filter(lambda word: word not in ('.', '+', '的')) \.map(lambda word: (word, 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r1)def get_topN_search(etlRDD, n):r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r2)# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.数据输入textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')# 3.数据处理(切分,转换,分组聚合)etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(lambda line_list: len(line_list) >= 6)# 去除搜索内容两端的 [ ]etlRDD = etlRDD.map(lambda line_list:[line_list[0],line_list[1],line_list[2][1:-1],line_list[3],line_list[4],line_list[5]])# 不加缓存# etlRDD.count()# 7.TODO: cache添加缓存,注意: 只能把缓存添加内存!相对用的少# etlRDD.cache().count()# 8.TODO: persist添加缓存,注意: 可以修改缓存级别etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2).count()# 4.数据输出# 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤# 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词get_topN_keyword(etlRDD, 10)# 8.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放etlRDD.unpersist()# 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据# 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容get_topN_search(etlRDD, 5)# 6.为了方便查看页面,可以让程序多睡会儿time.sleep(500)# 5.关闭资源sc.stop()

1.4 图解缓存效果:

  • 无缓存的DAG流程图显示:
    在这里插入图片描述

  • 有缓存的DAG流程图显示:
    在这里插入图片描述

  • cache基于内存
    在这里插入图片描述

  • persist可以修改缓存级别: 同时基于内存和磁盘
    在这里插入图片描述

2. 什么是checkpoint缓存

Checkpoint缓存,或称Checkpoint机制,是Apache Spark中用于确保数据一致性和容错性的一种技术。在不同的系统中,其实现方式和用途略有不同,但核心思想是一致的:确保关键数据或中间计算结果被安全地存储,以便在系统崩溃或需要恢复时能够重新使用。

2.1 为什么要做checkpoint缓存

  • RDD缓存主要是将数据存储在内存中,是临时存储,不太稳定,它主要是用来提升程序运行效率的。RDD的checkpoint(检查点)主要是将数据存储在HDFS上,是持久化存储。而HDFS存储数据有3副本的机制,让数据更加安全可靠。

  • checkpoint认为使用磁盘或者HDFS存储数据之后,数据非常的安全可靠,因此checkpoint会将RDD间的依赖关系给删除/丢弃掉。因此如果checkpoint的数据真的出现了问题,是无法在从头开始计算。

  • checkpoint主要作用: 提高程序的容错性
    注意事项: checkpoint可以将数据存储在磁盘或者HDFS上,主要是将数据存储在HDFS上。

2.2 checkpoint相关API:

	sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径rdd.checkpoint(): 对指定RDD启用checkpointrdd.count(): 触发checkpoint

2.3 checkpoint案例解析

# 导包
import os
import timeimport jieba
from pyspark import SparkConf, SparkContext, StorageLevel# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def get_topN_keyword(etlRDD, n):r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \.filter(lambda word: word not in ('.', '+', '的')) \.map(lambda word: (word, 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r1)def get_topN_search(etlRDD, n):r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r2)# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.数据输入textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')# 3.数据处理(切分,转换,分组聚合)etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(lambda line_list: len(line_list) >= 6)# 去除搜索内容两端的 [ ]etlRDD = etlRDD.map(lambda line_list:[line_list[0],line_list[1],line_list[2][1:-1],line_list[3],line_list[4],line_list[5]])# 不加缓存# etlRDD.count()# 7.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性sc.setCheckpointDir('hdfs://node1:8020/ckpt')# 8.TODO: 添加检查点checkpointetlRDD.checkpoint()etlRDD.count()# 4.数据输出# 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤# 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词get_topN_keyword(etlRDD, 10)# 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据# 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容get_topN_search(etlRDD, 5)# 6.为了方便查看页面,可以让程序多睡会儿time.sleep(500)# 5.关闭资源sc.stop()
  • 没有设置检查点正常的DAG执行流图:
    在这里插入图片描述
  • 设置检查点后:
    在这里插入图片描述

3. 缓存和checkpoint的区别

  • 面试题:Spark提供了两种持久化方案。一种为缓存操作,一种为checkpoint方案。请问有什么区别呢?
1- 数据存储位置不同缓存: 存储在内存或者磁盘 或者 堆外内存中checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上2- 数据生命周期:缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除3- 血缘关系:缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行4- 主要作用不同:缓存: 提高Spark程序的运行效率和容错性checkpoint检查点: 提高Spark程序的容错性和高可用,高可靠性
  • 思考:既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?
在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。使用顺序如下: 在代码中设置缓存和checkpoint检查点,然后再一同使用Action算子触发!!! 
使用count算子触发实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。
再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,
后续还是优先从缓存中读取

在这里插入图片描述

3.1 案例解析

# 导包
import os
import timeimport jieba
from pyspark import SparkConf, SparkContext, StorageLevel# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def get_topN_keyword(etlRDD, n):r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \.filter(lambda word: word not in ('.', '+', '的')) \.map(lambda word: (word, 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r1)def get_topN_search(etlRDD, n):r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]), 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r2)# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.数据输入textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')# 3.数据处理(切分,转换,分组聚合)etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(lambda line_list: len(line_list) >= 6)# 去除搜索内容两端的 [ ]etlRDD = etlRDD.map(lambda line_list:[line_list[0],line_list[1],line_list[2][1:-1],line_list[3],line_list[4],line_list[5]])# 不加缓存# etlRDD.count()# 7.TODO: persist添加缓存,注意: 可以修改缓存级别etlRDD.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2)# 8.TODO: 先拿着sc对象设置检查点保存位置, 建议用hdfs,这样能利用hdfs的高可靠高可用性sc.setCheckpointDir('hdfs://node1:8020/ckpt')etlRDD.checkpoint()etlRDD.count()# TODO:触发缓存和检查点etlRDD.count()# 4.数据输出# 需求一: 统计每个 关键词 出现了多少次, 最终展示top10数据 注意:'.', '+', '的'  都需要过滤# 伪SQL:select 关键词 ,count(*)  from  搜狗表 group by 关键词get_topN_keyword(etlRDD, 10)# 7.TODO: 如果不想用缓存,可以使用unpersist释放缓存,给哪个rdd加的,就给哪个释放# etlRDD.unpersist()# 需求二: 统计每个用户 每个 搜索内容 点击的次数, 最终展示top5数据# 伪SQL:select 用户,搜索内容,count(*)  from  搜狗表 group by 用户,搜索内容get_topN_search(etlRDD, 5)# 6.为了方便查看页面,可以让程序多睡会儿time.sleep(500)# 5.关闭资源sc.stop()
  • DAG有向无环图:
    在这里插入图片描述

二、打铁趁热:面试题思考

1. cache缓存和checkpoint检查点的区别

1- 数据存储位置不同缓存: 存储在内存或者磁盘 或者 堆外内存中checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上2- 数据生命周期:缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除3- 血缘关系:缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行4- 主要作用不同:缓存: 提高Spark程序的运行效率和容错性checkpoint检查点: 提高Spark程序的容错性和高可用,高可靠性

2. 既然持久化的方案有两种,那么在生产环境中应该使用哪种方案呢?

在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。使用顺序如下: 在代码中设置缓存和checkpoint检查点,然后再一同使用Action算子触发!!! 
使用count算子触发实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。
再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,
后续还是优先从缓存中读取

总结

本文主要通过案例和图文的方式详解了Spark RDD 数据持久化的2种方案,重点思考项目中该采取什么方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/30941.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

七彩影视双端新版本源码 支持PC+WAP+APP三端 对接苹果CMS后台

下载地址:七彩影视双端新版本源码 支持PCWAPAPP三端 对接苹果CMS后台 适合做影视类,高端大气

强化学习——基本概念

何为强化学习 机器学习的一大分支 强化学习(Reinforcement Learning)是机器学习的一种,它通过与环境不断地交互,借助环境的反馈来调整自己的行为,使得累计回报最大。强化学习要解决的是决策问题——求取当前状态下最…

群晖NAS本地部署并运行一个基于大语言模型Llama2的个人本地聊天机器人

前言 本文主要分享如何在群晖 NAS 本地部署并运行一个基于大语言模型 Llama 2 的个人本地聊天机器人并结合内网穿透工具发布到公网远程访问。本地部署对设备配置要求高一些,如果想要拥有比较好的体验,可以使用高配置的服务器设备. 目前大部分大语言模型的产品都是基于网络线上…

【ai】tx2-nx:安装深度学习环境及4.6对应pytorch

参考:https://www.waveshare.net/wiki/Jetson_TX2_NX#AI.E5.85.A5.E9.97.A8 英伟达2021年发布的的tritionserver 2.17 版本中,backend 有tensorflow1 和 onnxruntime ,他们都是做什么用的,作为backend 对于 triton 推理server意义是什么,是否应该有pytorch? Triton Infer…

PS给logo加白色描边

步骤1:打开你的Logo文件 步骤2:选择Logo层 在“图层”面板中找到你的Logo所在的图层。如果你的Logo是在背景图层上,可以将它转换为普通图层(右键点击背景图层,选择“从背景图层转换”)(此处也…

五大数据防泄漏系统排名|高效实用的防泄漏软件有哪些

在数字化时代,数据泄露已成为企业面临的重要安全挑战之一。为了有效应对这一挑战,企业需要借助先进的数据泄露防护系统来保护其敏感信息免受非法访问、使用和泄露。以下是五大备受推崇的数据泄露防护系统,它们各具特色,功能强大&a…

查看nginx安装/配置路径,一个服务器启动两个nginx

查看nginx安装/配置路径 查看nginx的pid: ps -ef | grep nginx查看pid对应服务的启动路径 ll /proc/2320/exe使用检查配置文件命令,查看配置文件位置 /usr/local/nginx/sbin/nginx -t一个服务启动两个nginx 拷贝一份程序,cpbin是我自己创…

阿里云服务器提醒漏洞要不要打补丁?

我们自己用的电脑一旦发现漏洞,往往是第一时间进行打补丁重启等等,但是作为服务器而言,往往没有这个习惯,为什么?因为害怕服务器打补丁以后,重启后出现打不开的情况,毕竟稳定的运行似乎在这种情…

java.io.eofexception:ssl peer shut down incorrectly

可能是因为 1)https设置 2)超时设置 FeignConfig.java package zwf.service;import java.io.IOException; import java.io.InputStream; import java.security.KeyStore;import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory;import org.apac…

北斗短报文终端在应急消防通信场景中的应用

在应对自然灾害和紧急情况时,北斗三号短报文终端以其全球覆盖、实时通信和精准定位的能力,成为应急消防通信的得力助手。它不仅能够在地面通信中断的极端条件下保障信息传递的畅通,还能提供精准的位置信息,为救援行动提供有力支持…

Adams 插件Plugin二次开发教程

通过cmd或python开发的Adams程序,可以通过执行cmd(python)命令的方式直接运行,也可以根据cmd教程中提供的创建菜单和对话框的方式调用这些程序,当然更合适的方式是通过插件的方式对二次开发的程序进行管理,…

Java程序员Python一小时速成

背景 由于最近要开发一些AI LLM(Large Language Model 大语言模型)应用程序,然后又想使用LangChain(LangChain 是一个用于构建和操作大语言模型(LLMs)的框架,旨在帮助开发者更方便地集成和使用…

Shell脚本、相关命令;重定向、管道符、变量相关命令讲解

目录 Shell脚本 概念 执行命令流程的交互区别 交互式 非交互式 Shell脚本应用场景 Shell的作用 Shell的作用 —— 命令解释器,“翻译官” 列出系统中全部解释器 实验 脚本的基本书写格式和执行命令 在子bash下执行脚本 指定解释器的方式执行脚本 指定…

代码讲解——ssm+jsp+maven项目目录结构说明

1 applicationContext.xml 应用上下文配置 2 db.properties 数据库配置 3 log4j.properties日志配置 4 mybatis-config.xml mybatis配置 5 springmvc.xml springmvc配置

万元主力机型该选什么固态硬盘,佰维NV7200、NV3500 的实用一定要让你知道

固态硬盘:变革存储技术,探索无尽可能 今年的固态市场价格一直是稳中上涨。 固态的价格上涨有技术上的因素,也有人工成本上的因素。好在国产固态技术的崛起,在固态价格上涨之下,依旧能选购到性价比和性能出众的型号。…

Elasticsearch搜索引擎(初级篇)

1.1 初识ElasticSearch | 《ElasticSearch入门到实战》电子书 (chaosopen.cn) 目录 第一章 入门 1.1 ElasticSearch需求背景 1.2 ElasticSearch 和关系型数据库的对比 1.3 基础概念 文档和字段 索引和映射 第二章 索引操作 2.0 Mapping映射属性 2.1 创建索引 DS…

【SEMI-e ·国际半导体深圳展】| 06月26-28日唯创知音语音芯片供应商 邀您来观展

世界聚焦半导体,产业规模空前!一场高端产业研学盛会即将如约而至。 SEMI-e 第六届2024国际半导体展深圳站,2024年06月26-28日将在深圳国际会展中心(宝安)开展,展会展出面积60000平方米,汇聚全国…

鄂州职业大学2024年成人高等继续教育招生简章

鄂州职业大学,作为一所享有盛誉的高等学府,一直以来都致力于为社会培养具备专业技能和良好素养的优秀人才。在成人高等继续教育领域,该校同样表现出色,为广大渴望继续深造、提升自身能力的成年人提供了宝贵的学习机会。 随着社会…

【C语言】12.指针与数组的关系

一、数组名的理解 #include <stdio.h> int main() {int arr[10] { 1,2,3,4,5,6,7,8,9,10 };printf("&arr[0] %p\n", &arr[0]);printf("arr %p\n", arr);return 0; }通过上述代码输出结果我们发现结果相同&#xff0c;因此我们得出结论&a…

李宏毅深度学习03——神经网络训练不起来怎么办

视频链接 如果Optimization失败的时候&#xff0c;怎么把梯度下降做的更好&#xff1f; 只考虑这种情况&#xff0c;不考虑overfitting 局部最小值&#xff08;local minima&#xff09;和鞍点&#xff08;saddle point&#xff09; 为什么Optimization会失败&#xff1f; …