【Spark精讲】一文讲透Spark RDD

MapReduce的缺陷

MR虽然在编程接口的种类和丰富程度上已经比较完善了,但这些系统普遍都缺乏操作分布式内存的接口抽象,导致很多应用在性能上非常低效 。 这些应用的共同特点是需要在多个并行操 作之间重用工作数据集 ,典型的场景就是机器学习和图应用中常用的迭代算法 (每一步对数据 执行相似的函数) 。

RDD

RDD是只读的。

RDD五大属性:①分区、②依赖、③计算函数、④分区器、⑤首选运行位置。

RDD 则是直接在编程接口层面提供了一种高度受限的共享内存模型,如图下图所示。 RDD 是 Spark 的核心数据结构,全称是弹性分布式数据集 (Resilient Distributed Dataset),其本质是一种分布式的内存抽象,表示一个只读的数据分区( Partition)集合 。一个 RDD 通常只能通过其他的 RDD转换而创建。 RDD 定义了各种丰富的转换操作(如 map、 join和 filter等),通过这些转换操作,新的 RDD 包含了如何从其他 RDD 衍生所必需的信息,这些信息构成了 RDD 之间的依赖关系( Dependency) 。 依赖具体分为两种, 一种是窄依赖, RDD 之间分区是一一对应的;另一种是宽依赖,下游 RDD 的每个分区与上游 RDD (也称之为父 RDD)的每个分区都有关,是多对多的关系 。 窄依赖中的所有转换操作可以通过类似管道(Pipeline)的方式全部执行,宽依赖意味着数据需要在不同节点之间 Shuffle 传输 。

RDD计算的时候会通过一个 compute函数得到每个分区的数据。 若 RDD是通过已有的文件系统构建的,则 compute 函数读取指定文件系统中的数据;如果 RDD 是通过其他 RDD 转换而来的,则 compute 函数执行转换逻辑,将其他 RDD 的数据进行转换。 RDD 的操作算子包括两 类, 一类是 transformation,用来将 RDD 进行转换,构建 RDD 的依赖关系;另一类称为 action, 用来触发 RDD 的计算,得到 RDD 的相关计算结果或将 RDD 保存到文件系统中。

在 Spark 中, RDD 可以创建为对象 ,通过对象上的各种方法调用来对 RDD 进行转换 。 经过一系列的 transformation逻辑之后,就可以调用 action来触发 RDD 的最终计算。 通常来讲, action 包括多种方式,可以 是 向应用程序返回结果( show、 count 和 collect等),也可以是向存 储系统保存数据(saveAsTextFile等)。 在Spark中,只有遇到 action,才会真正地执行 RDD 的计算(注:这被称为惰性计算,英文为 LazyEvqluation),这样在运行时可以通过管道的方式传输多个转换 。

总结而言,基于 RDD 的计算任务可描述为从稳定的物理存储(如分布式文件系统 HDFS) 中加载记录,记录被传入由一组确定性操作构成的 DAG (有向无环图),然后写回稳定存储。 RDD还可以将数据集缓存到内存中,使得在多个操作之间可以很方便地重用数据集。 总的来讲,RDD 能够很方便地支持 MapReduce 应用、关系型数据处理、流式数据处理(Stream Processing) 和迭代型应用(图计算、机器学习等)。

在容错性方面,基于 RDD 之间的依赖, 一个任务流可以描述为 DAG。 在实际执行的时候, RDD 通过 Lineage 信息(血缘关系)来完成容错,即使出现数据分区丢失,也可以通过 Lineage 信息重建分区。 如果在应用程序中多次使用同一个 RDD,则可以将这个 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据 Lineage 信息得到分区的数据,在后续其他地方用到这个 RDD 的时候,会直接从缓存处读取而不用再根据 Lineage信息计算,通过重用达到提升性能的目的 。 虽然 RDD 的 Lineage 信息可以天然地实现容错(当 RDD 的某个分区数据计算失败或丢 失时,可以通过 Lineage信息重建),但是对于长时间迭代型应用来说,随着迭代的进行,RDD 与 RDD之间的 Lineage信息会越来越长,一旦在后续迭代过程中出错,就需要通过非常长的 Lineage信息去重建,对性能产生很大的影响。 为此,RDD 支持用 checkpoint机制将数据保存到持久化的存储中,这样就可以切断之前的 Lineage信息,因为 checkpoint后的 RDD不再需要知道它的父 RDD ,可以从 checkpoint 处获取数据。

DAG

顾名思义,DAG 是一种“图”,图计算模型的应用由来已久,早在上个世纪就被应用于数据库系统(Graph databases)的实现中。任何一个图都包含两种基本元素:节点(Vertex)和边(Edge),节点通常用于表示实体,而边则代表实体间的关系。

DAG,有向无环图,Directed Acyclic Graph的缩写,常用于建模。Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。
基础概念
介绍DAGScheduler中的一些概念,有助于理解后续流程。

  • Job:调用RDD的一个action,如count,即触发一个Job,spark中对应实现为ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob维护Job
  • Stage:代表一个Job的DAG,会在发生shuffle处被切分,切分后每一个部分即为一个Stage,Stage实现分为ShuffleMapStage和ResultStage,一个Job切分的结果是0个或多个ShuffleMapStage加一个ResultStage
  • TaskSet:一组Task
  • Task:最终被发送到Executor执行的任务,和stage的ShuffleMapStage和ResultStage对应,其实现分为ShuffleMapTask和ResultTask

把 DAG 图反向解析成多个阶段,每个阶段中包含多个任务,每个任务会被任务调度器分发给工作节点上的 Executor 上执行。

Web UI上DAG举例 

Checkpoint

RDD的依赖

checkpoint先了解一下RDD的依赖,比如计算wordcount:

scala>  sc.textFile("hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd").flatMap(_.split("\\\t")).map((_,1)).reduceByKey(_+_);
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:28scala> res0.toDebugString
res1: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []+-(2) MapPartitionsRDD[3] at map at <console>:28 []|  MapPartitionsRDD[2] at flatMap at <console>:28 []|  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd MapPartitionsRDD[1] at textFile at <console>:28 []|  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd HadoopRDD[0] at textFile at <console>:28 []

1、在textFile读取hdfs的时候就会先创建一个HadoopRDD,其中这个RDD是去读取hdfs的数据key为偏移量value为一行数据,因为通常来讲偏移量没有太大的作用所以然后会将HadoopRDD转化为MapPartitionsRDD,这个RDD只保留了hdfs的数据。
2、flatMap 产生一个RDD MapPartitionsRDD
3、map 产生一个RDD MapPartitionsRDD
4、reduceByKey 产生一个RDD ShuffledRDD

如何建立checkPoint

1、首先需要用sparkContext设置hdfs的checkpoint的目录,如果不设置使用checkpoint会抛出异常:

scala> res0.checkpoint
org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContextscala> sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")

sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")
执行了上面的代码,hdfs里面会创建一个目录:
/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d

2、然后执行checkpoint

scala> res0.checkpoint
1

发现hdfs中还是没有数据,说明checkpoint也是个transformation的算子。

scala> res0.count()
INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://leen:8020/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4, new parent is RDD 5
res5: Long = 73689
1
2
3
hive > dfs -du -h /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4;
147    147    /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/_partitioner
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00000
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00001

但是执行的时候相当于走了两次流程,前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中,如下:

rdd.cache()
rdd.checkpoint()
rdd.collect

在源码中,在checkpoint的时候强烈建议先进行cache,并且当你checkpoint执行成功了,那么前面所有的RDD依赖都会被销毁,如下:

 /*** Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint* directory set with `SparkContext#setCheckpointDir` and all references to its parent* RDDs will be removed. This function must be called before any job has been* executed on this RDD. It is strongly recommended that this RDD is persisted in* memory, otherwise saving it on a file will require recomputation.*/def checkpoint(): Unit = RDDCheckpointData.synchronized {// NOTE: we use a global lock here due to complexities downstream with ensuring// children RDD partitions point to the correct parent partitions. In the future// we should revisit this consideration.if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}}

RDD依赖被销毁

scala> res0.toDebugString
res6: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []|  ReliableCheckpointRDD[5] at count at <console>:30 []

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/588076.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apollo自动驾驶:改变交通运输的游戏规则

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 ChatGPT体验地址 文章目录 前言1. Apollo缓存层2. 本地状态管理库3. 离线同步和冲突解决4. 离线数据同步和离线优先策略结论 &#x1f4f2;&#x1f50c; 构建离线应用&#xff1a;Apollo…

ssm基于web 的个人时间管理系统+vue论文

基于web 的个人时间管理系统的设计与实现 摘要 当下&#xff0c;正处于信息化的时代&#xff0c;许多行业顺应时代的变化&#xff0c;结合使用计算机技术向数字化、信息化建设迈进。传统的个人时间信息管理模式&#xff0c;采用人工登记的方式保存相关数据&#xff0c;这种以人…

【数据结构】排序之交换排序(冒泡 | 快排)

交换目录 1. 前言2. 交换排序3. 冒泡排序3.1 分析3.2 代码实现 4. 快速排序4.1 hoare版本4.1.1 分析4.1.2 hoare版本代码 4.2 挖坑法4.2.1 分析4.2.2 挖坑法代码实现 4.3 前后指针版本4.3.1 分析4.3.2 前后指针版本代码实现 1. 前言 在之前的博客中介绍了插入排序&#xff0c;…

Linux基础知识学习2

tree命令的使用 可以看到dir2目录下的这些文件&#xff0c;要想显示dir2的具体结构&#xff0c;可用tree命令 mv命令 它可以实现两个功能 1.将文件移动到另一个目录中 2.对某一个文件进行重命名 1.将文件移动到另一个目录中 这里将dir1中的2.txt移动到他的子目录dir3中 执行…

“C语言与人生:手把手教你玩转C语言数组,从此编程无难题“

各位少年&#xff0c;我是博主那一脸阳光&#xff0c;由我来给大家介绍C语言的数组的详解。 在C语言中&#xff0c;数组是一种极其重要的数据结构&#xff0c;它允许我们存储和管理相同类型的一系列相关数据。通过理解并熟练掌握数组的使用&#xff0c;开发者能够高效地处理大量…

超真实随身WiFi测评,你确定不看一下?随身WiFi靠谱吗? 看完这篇文章你就懂了?随身WiFi真实评测

用了一年多的格行随身wifi&#xff0c;屏幕都磨花了。直接看图&#xff0c;都是自己实测&#xff01; 设备是去年买的&#xff0c;到现在也快1年了&#xff0c;一直有朋友蹲后续&#xff0c;现在把后续给大家&#xff01;到底是大牌子&#xff0c;确定是不跑路的随身wifi&…

Vue独立组件开发-递归组件

文章目录 一、前言二、实现三、总结四、最后 一、前言 递归组件就是指组件在模板中调用自己。 二、实现 开启递归组件的必要条件&#xff0c;就是在组件中设置一个 name 选项。 <template><div><my-component></my-component></div> </te…

提取 PE 文件的各种信息

前段时间项目需要实现对 Windows PE 文件版本信息的提取&#xff0c;如文件说明、文件版本、产品名称、版权、原始文件名等信息。获取这些信息在 Windows 下当然有一系列的 API 函数供调用&#xff0c;简单方便。 我们先看一下PE文件结构&#xff0c;PE文件由DOS首部&#xff0…

2023-12-21 LeetCode每日一题(美丽塔 II)

2023-12-21每日一题 一、题目编号 2866. 美丽塔 II二、题目链接 点击跳转到题目位置 三、题目描述 给你一个长度为 n 下标从 0 开始的整数数组 maxHeights 。 你的任务是在坐标轴上建 n 座塔。第 i 座塔的下标为 i &#xff0c;高度为 heights[i] 。 如果以下条件满足&a…

linux驱动(一):led

本文主要探讨210的led驱动相关知识。 驱动 操作系统驱动硬件的代码,驱动上层是系统调用API,下层是硬件 宏内核&#xff1a;内核整体上为一个过程实现,运行在同一地址空间,相互调用简单高效 微内核&#xff1a;功能为独立过程,过程间通过IPC通信 …

【华为OD机试真题2023CD卷 JAVAJS】测试用例执行计划

华为OD2023(C&D卷)机试题库全覆盖,刷题指南点这里 测试用例执行计划 时间限制:1s 空间限制:256MB 限定语言:不限 题目描述: 某个产品当前迭代周期内有N个特性()需要进行覆盖测试,每个特性都被评估了对应的优先级,特性使用其ID作为下标进行标识。 设计了M个测试用…

在Linux运行LaTeX

共有三个步骤1. 装LaTexTeX Live - TeX Users Group 下载对应版本安装包安装 文件比较大&#xff0c;这步花的时间多一点&#xff0c;不过也不会太多&#xff0c;感觉5分钟十分钟的样子吧 2. 装TexStidio 这一步是安装一个类似在windows系统下的TaTeX GUI软件 图标是这样3. 配置…

Tensorflow2.X的GPU版框架最快最稳搭建方法

一、环境基础 Windows10以上 已装Anaconda 支持GPU 二、搭建步骤 1. 在Anaconda中创建并进入虚拟环境 conda create -n envname python3.8 conda activate envname 注意&#xff1a;envname 替换为你自己想命名的&#xff0c;下文将以“Ljdenv”出现 2.安…

【实用工具】Tmux使用指南

Tmux 三个重要概念 session&#xff08;会话&#xff09;、window&#xff08;窗口&#xff09;、pane&#xff08;面板&#xff09; 一个会话可以有多个窗口&#xff0c;一个窗口可以划分为多个面板 注意在tmux中使用快捷命令的话&#xff0c;需要加上前缀ctrlb 关于session的…

2024 GMF|The Sandbox 为创作者赋能的新时代

以新的 GMF 模型和专门的参与池奖励来开启 2024 年吧。 11 月 3 日&#xff0c;我们在香港全球创作者日上宣布&#xff0c;The Sandbox 已为所有创作者分配了100,000,000 SAND&#xff0c;将通过 GMF 进行分发。作为首次启动的建设者挑战&#xff0c;我们准备了专门的 SAND 参与…

linux 防火墙查看放行端口,追加放行端口命令

linux 查看防火墙已经放行端口列表 firewall-cmd --list-ports 运行结果如下&#xff1a; linux 追加防火墙经放行端口&#xff08;如追加443&#xff09; firewall-cmd --zonepublic --add-port443/tcp --permanent 亲测有效&#xff01;

数据结构 模拟实现LinkedList单向不循环链表

目录 一、链表的简单介绍 二、链表的接口 三、链表的方法实现 &#xff08;1&#xff09;display方法 &#xff08;2&#xff09;size得到单链表的长度方法 &#xff08;3&#xff09;addFirst头插方法 &#xff08;4&#xff09;addLast尾插方法 &#xff08;5&#xf…

美团到店终端从标准化到数字化的演进之路

总第580篇 | 2023年第032篇 本文整理自美团技术沙龙第76期《大前端研发协同效能提升与实践》。前端团队在产研多角色协同形式上存在不同阶段&#xff0c;而大前端多技术栈在各阶段都有其独特的实践&#xff0c;同时又有类似的演进路线。本文从到店终端团队移动端和前端技术栈持…

Linux学习第48天:Linux USB驱动试验:保持热情,保持节奏,持续学习是作为一个技术人员应有的基本素质和要求

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 最近更新的速度和频率大不如以前&#xff0c;主要原因还是自己有些懈怠了。学习是一个持续努力的过程&#xff0c;一旦中断&#xff0c;再想保持以往的状态可能要…

轻量封装WebGPU渲染系统示例<55>- 顶点数据更新

当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/feature/material/src/voxgpu/sample/VertexUpdateTest.ts 当前示例运行效果: ​​​​​​​ 此示例基于此渲染系统实现&#xff0c;当前示例TypeScript源码如下: export class VertexUpdateTest {pr…