【Spark精讲】RDD缓存源码分析

面试题:cache后面能不能接其他算子,它是不是action操作?

能,不是action算子。

源码解析

RDD调用cache或persist之后,会指定RDD的缓存级别,但只是在成员变量中记录了RDD的存储级别,并未真正地对RDD进行缓存。只有当RDD计算的时候才会对RDD进行缓存。

以HadoopRDD为例

    override def compute(split: Partition, context: TaskContext): Iterator[U] = {val partition = split.asInstanceOf[HadoopPartition]val inputSplit = partition.inputSplit.valuef(inputSplit, firstParent[T].iterator(split, context))}

调用的iterator方法

  /*** Internal method to this RDD; will read from cache if applicable, or otherwise compute it.* This should ''not'' be called by users directly, but is available for implementors of custom* subclasses of RDD.*/final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {getOrCompute(split, context)} else {computeOrReadCheckpoint(split, context)}}

继续看 getOrCompute方法:这里可以看到blockId的生成规则,可以确定block和partition是一一对应的

@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {override def name: String = "rdd_" + rddId + "_" + splitIndex
}

在executor端调用SparkEnv.get.blockManager.getOrElseUpdate()方法,

  /*** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.*/private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {val blockId = RDDBlockId(id, partition.index)var readCachedBlock = true// This method is called on executors, so we need call SparkEnv.get instead of sc.env.SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {readCachedBlock = falsecomputeOrReadCheckpoint(partition, context)}) match {case Left(blockResult) =>if (readCachedBlock) {val existingMetrics = context.taskMetrics().inputMetricsexistingMetrics.incBytesRead(blockResult.bytes)new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}} else {new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])}case Right(iter) =>new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])}}

再看BlockManager中的getOrElseUpdate方法,用来缓存数据的

  /*** Retrieve the given block if it exists, otherwise call the provided `makeIterator` method* to compute the block, persist it, and return its values.** @return either a BlockResult if the block was successfully cached, or an iterator if the block*         could not be cached.*/def getOrElseUpdate[T](blockId: BlockId,level: StorageLevel,classTag: ClassTag[T],makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {// Attempt to read the block from local or remote storage. If it's present, then we don't need// to go through the local-get-or-put path.get[T](blockId)(classTag) match {case Some(block) =>return Left(block)case _ =>// Need to compute the block.}// Initially we hold no locks on this block.doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {case None =>// doPut() didn't hand work back to us, so the block already existed or was successfully// stored. Therefore, we now hold a read lock on the block.val blockResult = getLocalValues(blockId).getOrElse {// Since we held a read lock between the doPut() and get() calls, the block should not// have been evicted, so get() not returning the block indicates some internal error.releaseLock(blockId)throw new SparkException(s"get() failed for block $blockId even though we held a lock")}// We already hold a read lock on the block from the doPut() call and getLocalValues()// acquires the lock again, so we need to call releaseLock() here so that the net number// of lock acquisitions is 1 (since the caller will only call release() once).releaseLock(blockId)Left(blockResult)case Some(iter) =>// The put failed, likely because the data was too large to fit in memory and could not be// dropped to disk. Therefore, we need to pass the input iterator back to the caller so// that they can decide what to do with the values (e.g. process them without caching).Right(iter)}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/594172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQL优化:物化视图

在前面一篇内容中,我们讲解了索引的创建。索引作为数据的目录,占用独立的存储空间,可以帮助我们提高查询的速度。 除了使用索引,还有一种方法可以加速查询,尤其是当我们的查询中使用了各种聚合函数,或者进…

[情商-6]:识别职场中、情侣之间的暗捧、暗拍、暗赞、暗赏,保持良好的关系

目录 1. 发现他们的优点和长处 2. 保持积极的态度 3. 赠送小礼物 4. 提供有用的建议 5. 维护他们的荣誉 6. 询问他们的意见!!! 7. 注意细节 8. 不要过度夸张 9. 尊重上司的权威 10. 保持适当的距离 前言: 技术人员常把…

pytest conftest定义一个fixtrue获取测试环境地址

方便全局切换地址 pytest.fixture() def config():data {测试环境: {A环境: 127.0.0.1,B环境: 127.0.0.2,C环境: 127.0.0.3,D环境: 127.0.0.4},}return data.get(测试环境, {}).get(A环境)import pytestdef test_case001(config):url http://str(config):8080/api/user/logi…

(2023|AABI,多模态信息瓶颈,变分近似,视觉语言模型可解释性)通过多模态信息瓶颈归因对图像文本表示的视觉解释

Visual Explanations of Image-Text Representations via Multi-Modal Information Bottleneck Attribution 公和众和号:EDPJ(添加 VX:CV_EDPJ 或直接进 Q 交流群:922230617 获取资料) 目录 0. 摘要 3. 通过多模态…

wy的leetcode刷题记录_Day70

wy的leetcode刷题记录_Day70 声明 本文章的所有题目信息都来源于leetcode 如有侵权请联系我删掉! 时间: 前言 目录 wy的leetcode刷题记录_Day70声明前言466. 统计重复个数题目介绍思路代码收获 70. 爬楼梯题目介绍思路代码收获 466. 统计重复个数 今天的每日一题…

Leetcode11-快乐数(202)

1、题目 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为: 对于一个正整数,每一次将该数替换为它每个位置上的数字的平方和。 然后重复这个过程直到这个数变为 1,也可能是 无限循环 但始终变不到 1。 如果这个过程 结果为 1…

8K自动化测试面试题分享(有答案,非常详细)

关于自动化测试面试,会问到哪些问题呢?给大家简单总结了一下,每一个都是学员反馈过来的企业真题,相信对大家有帮助,最近有面试机会的,快来背一下答案吧 1、你会封装自动化测试框架吗? 这个问得…

华为月薪25K的自动化测试工程师到底要会那些技能!

​前言 3年自动化测试软件测试工程师职业生涯中,我所经历过的项目都是以自动化测试为主的。由于自动化测试是一个广泛的领域,我将自己的经验整理了一下分享给大家,话不多说,直接上干货。 自动化测试的目标和实践选择合适的自动化…

zookeeper 常见客户端介绍和使用 zkCli、自带API、 zkClient、Curator

文章目录 一、Zookeeper的命令行使用二、Zookeeper自带API的使用2.1 引入API2.1 API简单使用 三、Zookeeper三方客户端zkClient的使用3.1 引入依赖3.2 简单的使用案例四、Curator 客户端框架4.1 引入依赖4.2 简单使用案例 一、Zookeeper的命令行使用 ZooKeeper解压后&#xff…

使用Helmfile 管理helm charts

官网 ## https://github.com/helmfile/helmfile## 参考 https://cloud.tencent.com/developer/article/1766822 介绍 使用helmfile时,我们首先得了解helm的使用,以及如何开发一个helm chart。 helm是kubernetes的包管理工具。在实际的使用场景中我们涉…

分布式(4)

目录 16.分布式缓存可能会存在哪些问题? 17.分布式限流了解过吗? 18.分布式定时任务怎么实现? 19.什么是分布式系统的副本一致性?有哪些? 20.在分布式系统中有哪些常见的一致性算法? 21.谈谈你对一致性…

【排序算法】归并排序与快速排序:深入解析与比较

文章目录 1. 引言2. 归并排序(Merge Sort)3. 快速排序(Quick Sort)4. 归并排序与快速排序的比较5. 结论 1. 引言 排序算法是计算机科学中最基本且至关重要的概念之一。它们不仅是理解更复杂算法和数据结构的基石,而且…

面试复盘5——后端开发——一面面经——大厂的面试果然干货满满

前言 本文主要用于个人复盘学习,因此为保障公平,所以本文不指出公司名,题目编号只是为了自己区别而已。对待面经,望读者还是更多从其中学习总结,而不是去碰原题。 面试岗位信息 后端开发秋招,上海某大中…

【Linux Shell】2. Shell 变量

文章目录 【 1. 变量命名规则 】【 2. 变量的使用 】【 3. 只读变量 】【 4. 删除变量 】【 5. 变量类型 】【 6. Shell 字符串 】6.1 字符串的分类6.2 字符串操作 【 7. Shell 数组 】7.1 定义数组7.2 读取数组7.3 获取数组的长度 【 8. Shell 注释 】8.1 单行注释8.2 多行注释…

机器学习的算法简单介绍-朴素贝叶斯算法

朴素贝叶斯网络(Naive Bayes Network)与贝叶斯网络(Bayesian Network)有一些不同之处,让我们来澄清一下这两个概念。 贝叶斯网络(Bayesian Network):贝叶斯网络是一种用于建模概率关…

Hi5 2.0 虚拟手与追踪器(Tracker)的位置修正

问题描述 使用环境与工具:Unity 2022.3.4fc1,steam VR(2.7.3),steamvrSDK(1.14.15),HTC vive pro专业版,Hi5 2.0数据手套 首先按照Hi5 2.0的使用说明(可参考:HI5 2.0 交…

windows 和linux 的区别

目前国内 Linux 更多的是应用于服务器上,而桌面操作系统更多使用的是 Windows。主要区别如下 比较层面WindowsLinux界面界面统一,外壳程序固定所有 Windows 程序菜单几乎一致,快捷键也几乎相同图形界面风格依发布版不同而不同,可…

Netty使用SSL实现双向通信加密

最近项目有个需求,TCP服务器实现基于证书通信加密,之前没做过,花了一些时间调研,今天整理下。 SSL(Secure Sockets Layer 安全套接字协议) 1、原理 算法原理 简而言之就是非对称加密算法 私钥自己持有,公钥发给对方,对方在发送信息的时候使用公钥进行加密数据,当接收到…

C# 数组相关操作

一。int[] 类型数组 1.求int[]数组中的最大值和最小值 int[] intArrnew int[]{ 1,2,3,4,5,-24,66};int a intArr.Max();int b intArr.Min();Console.WriteLine(a); //最大值为66Console.WriteLine(b); //最小值为-24 2.判断int[]数组中是否包含某个值 int[] intArrnew int[]…

IPA打包过程中的Invalid Bundle Structure错误如果解决

在iOS应用程序开发中,打包和发布应用程序是一个必要的步骤。有的时候在打包的过程中可能会遇到一些错误,其中一个比较常见的错误是"Invalid Bundle Structure"。这个错误通常意味着应用程序的文件结构不正确,而导致的无法成功打包应…