RDD算子(四)、血缘关系、持久化

1. foreach

分布式遍历每一个元素,调用指定函数

val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.foreach(println)

结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印,则是在Driver端执行。

RDD的方法之所以叫算子,就是为了与scala集合的方法区分开来, scala集合的方法是在同一个节点执行的,而RDD的算子则是在Executor(分布式节点)执行的。从计算的角度讲,RDD算子外部的操作都是在Driver端执行,算子内部的操作都是在Executor端执行。

2. 闭包检测

class User {var age : Int = 30
}
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

因为foreach内部的操作是在Executor上执行的,所以在Driver上创建的user需要传递给各个Executor,如果user没有序列化,则会报错

class User extends Serializable{var age : Int = 30
}

或者将User变为样例类,因为样例类在编译时会自动混入序列化特质(实现可序列化接口)

case class User {var age : Int = 30
}

如果把原始集合变为空,依然会报错,这是因为RDD算子中传递的函数会包含闭包操作(匿名函数,算子内用到了算子外的数据),所以会进行闭包检测,即检查里面的变量是否序列化

val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

 再看如下案例:

class Search(query:String) {def isMatch(s:String): Boolean {s.contains(query)}def getMatch1(rdd: RDD[String]): RDD[String] {rdd.filter(isMatch)}def getMatch2(rdd: RDD[String]): RDD[String] {rdd.filter(x => x.contains(query))}
}
val rdd = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
search.getMatch1(rdd).collect().foreach(println)

此时会报错Search类没有序列化,因为在rdd的filter算子内调用了query,而query作为类的构造参数,实际上是类的私有变量,isMatch方法相当于:

def isMatch(s:String): Boolean {s.contains(this.query)
}

this相当于类的对象,因此需要进行闭包检测。getMatch2也有类似的问题。除了将类序列化以及改为样例类之外,还可以将query赋给方法内部的临时变量:

def getMatch2(rdd: RDD[String]): RDD[String] {val s = queryrdd.filter(x => x.contains(s))
}

3. 依赖关系

 每个RDD会保存血缘关系(不会保存数据),这样提高了容错性,因为如果其中某个RDD转换到另一个RDD失败了,就可以根据血缘关系来重新读取。RDD保存依赖关系而示意图如下:

血缘关系展示代码:

val lines : RDD[String] = sc.textFile("datas")
println(lines.toDebugString)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.toDebugString)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.toDebugString)
println("******************")

查看依赖关系只需直接将代码中的toDebugString改为dependencies即可

val lines : RDD[String] = sc.textFile("datas")
println(lines.dependencies)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.dependencies)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.dependencies)
println("******************")

​​​​​​​

 一对一的依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的一个分区的数据,也叫窄依赖,而Shuffle依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的多个分区的数据,也叫宽依赖。

4. 阶段和任务划分

窄依赖中,分区有多少个,就有多少个任务,只有一个阶段;宽依赖中,有两个阶段,每个阶段的任务数等于分区数。

RDD阶段由有向无环图(DAG)表示

Application->Job->Stage->Task每一个层级是1对n的关系。

每个Application中可能会提交多个作业,一个作业会划分为多个阶段(阶段数=宽依赖个数+1),一个阶段可能因为多个分区而包含多个任务,一个阶段中的任务数=最后一个RDD的分区数。

5. cache和persist

如果对于同一份数据源,想做多个不同的功能(比如统计单词数以及根据单词分组),这些不同的功能在实现过程中有很多重复的步骤(比如很多相同的RDD转换),此时可能会引入性能问题。虽然看起来RDD转换的过程复用了,但是RDD不存储数据,只有逻辑,所以最终的行为算子会从头开始再读取相同的数据,比如下面代码:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

可以看到,在一行*上下,@都执行了,说明数据从头开始读取,从最开始的RDD再次执行。 

为了解决这种性能问题,可以对mapRDD里的数据进行缓存,要么缓存在内存中,要么缓存在磁盘中,得看具体情况,这就是RDD的持久化操作。使用cache方法缓存在内存中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.cache()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

放在内存中可能不安全,使用persist方法缓存在磁盘中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.persist(StorageLevel.DISK_ONLY)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

注意:持久化操作也是等到行动算子触发才会真正执行。持久化操作不一定都是为了重用才引入的,有些情况下,前面一些RDD转换操作耗时很长或者数据很重要的场合,也可以进行持久化操作,这样一旦中间出了问题,重新执行任务不至于再执行之前耗时很长的操作。

除了以上的cache和persist方法,还可以使用检查点(checkpoint)的方法进行持久化操作。checkpoint需要落盘,因此需要指定存储路径,之前的persist方法也要落盘,只不过它存储在临时路径,任务执行完就会删除,而checkpoint是永久化存储

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

但是checkpoint会单独再开启一个作业,因此效率可能更低。但是与cache联合执行,即先cache,再checkpoint,就不会开启新的作业。

另外,使用cache方法会改变RDD的血缘关系:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

 

可以看到,cache方法(其实persis方法也会) 在血缘关系中添加新的依赖(原来的依赖还保留)。但是checkpoint方法会改变原来的血缘关系,建立新的血缘关系(等同于数据源变了):

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

6. 自定义分区器

class MyPartitioner extends Partitioner {override def numPartitions : Int = n  //自定义,可以写死override def getPartition(key : Any) : Int = {key match {case "xxx" => 0case _ => 1}}}

分区器传给RDD:

val partitionRDD = rdd.partitionBy(new MyPartitioner)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/794491.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringMVC --- 老杜

1、什么是SpringMVC? SpringMVC是一个基于Java实现了MVC设计模式的请求驱动类型的轻量级Web框架,通过把Model,View,Controller分离,将web层进行职责解耦,把复杂的web应用分成逻辑清晰的及部分,…

Adobe Bridge 2024:连接创意,探索无限可能 mac/win版

Adobe Bridge 2024,作为Adobe家族中的一款强大的创意管理工具,再次革新了数字资产管理和工作流程优化的标准。这款软件不仅继承了Adobe Bridge一贯的直观界面和强大功能,更在多个方面进行了突破性的改进。 Bridge 2024软件获取 全面的资源管…

idea常用代码模板

1、非空判断 变量.null:if(变量 null)变量.nn:if(变量 ! null)变量.notnull:if(变量 ! null)ifn:if(xx null)inn:if(xx ! null) 2、遍历数组和集合 数组或集合变量.fori:for循环数组或集合变量.for&am…

突破编程_C++_网络编程(TCPIP 四层模型(传输层))

1 传输层的功能与作用 在 TCP/IP 四层模型中,传输层位于网络层之上和应用层之下,负责在源主机和目标主机之间提供端到端的可靠数据传输服务。传输层的主要功能与作用体现在以下几个方面: 分段与重组:由于网络层的数据包大小有限制…

内网穿透的应用-如何在Android Termux上部署MySQL数据库并实现无公网IP远程访问

文章目录 前言1.安装MariaDB2.安装cpolar内网穿透工具3. 创建安全隧道映射mysql4. 公网远程连接5. 固定远程连接地址 前言 Android作为移动设备,尽管最初并非设计为服务器,但是随着技术的进步我们可以将Android配置为生产力工具,变成一个随身…

labview如何创建2D多曲线XY图和3D图

1如何使用labview创建2D多曲线图 使用“索引与捆绑簇数组”函数将多个一维数组捆绑成一个簇的数组,然后将结果赋值给XY图,这样一个多曲线XY图就生成了。也可以自己去手动索引,手动捆绑并生成数组,结果是一样的 2.如何创建3D图 在…

pix2pix GAN

import os os.environ[TF_CPP_MIN_LOG_LEVEL] = 2#设置tensorflow的日志级别 from tensorflow.python.platform import build_info import tensorflow as tf import os # 用于处理文件系统路径的面向对象的库。pathlib 提供了 Path 类, #该类表示文件系统路径,并提供了很多方…

Vue2 —— 学习(一)

(二)简单案例 1.实现过程 容器设置 Vue 实例设置 2.实现结果 3.注意事项 (三)Vue 插件 ​编辑三、Vue 模板语法 (一)插值语法 {{ }}: (二)指令语法 v- 四、…

如何更新Code::blocks的MinGW

前言 LVGL V9版本更新了很多新特性,其中windows平台部分也进行了优化,如果你是用的是Code::blocks体验LVGL那么在编译时会不通过;因为如果你使用的是 Code::blocks 20.03并且使用内置的MinGW,那么就会因为MinGW版本过低遇到下面所…

babyAGI(8)-babyCoder5主程序逻辑

前期代码都以阅读完毕,接下来我们来看主程序逻辑,建议大家好好看看流程图,有个流程的影响 1. 创建任务 下面一段代码主要用来创建任务以及打印相关信息,调用了四个agents code_tasks_initializer_agent 初始化任务code_tasks_…

信息系统项目管理师——第18章项目绩效域管理(二)

项目工作绩效域 预期目标 高效且有数的项目绩效 2.适合项目和环境的项目过程 3.干系人适当的沟通和参与 4.对实物资源进行了有效管理 5.对采购进行了有效管理 6.有效处理了变更 7.通过持续学习和过程改进提高了团队能力 绩效要点 1.项目过程 2.项目制约因素 3.专注于工作过…

React - 连连看小游戏

简介 小时候经常玩连连看小游戏。在游戏中,当找到2个相同的元素就可以消除元素。 本文会借助react实现连连看小游戏。 实现效果 实现难点 1.item 生成 1. 每一个图片都是一个item,items数组的大小为size*size。 item对象包括grid布局的位置,…

【爬虫开发】爬虫从0到1全知识md笔记第4篇:Selenium课程概要,selenium的介绍【附代码文档】

爬虫开发从0到1全知识教程完整教程(附代码资料)主要内容讲述:爬虫课程概要,爬虫基础爬虫概述,,http协议复习。requests模块,requests模块1. requests模块介绍,2. response响应对象,3. requests模块发送请求,4. request…

入门用Hive构建数据仓库

在当今数据爆炸的时代,构建高效的数据仓库是企业实现数据驱动决策的关键。Apache Hive 是一个基于 Hadoop 的数据仓库工具,可以轻松地进行数据存储、查询和分析。本文将介绍什么是 Hive、为什么选择 Hive 构建数据仓库、如何搭建 Hive 环境以及如何在 Hi…

分解因数

描述 给出一个正整数 a&#xff0c;要求分解成若干个正整数的乘积&#xff0c;即 aa1a2a3…an&#xff0c;并且 1<a1≤a2≤a3≤…≤an&#xff0c;问这样的分解的方案种数有多少。注意到aa 也是一种分解。 输入描述 第 1 行是测试数据的组数 n(1≤n≤10)&#xff0c;后面…

AcWing 4199. 公约数(数学-约数)

给定两个正整数 a a a 和 b b b。 你需要回答 q q q 个询问。 每个询问给定两个整数 l , r l,r l,r&#xff0c;你需要找到最大的整数 x x x&#xff0c;满足&#xff1a; x x x 是 a a a 和 b b b 的公约数。 l ≤ x ≤ r l≤x≤r l≤x≤r。 输入格式 第一行包含两个…

【PaletX】ui组件使用

表单 当表单不是以component和template形式时&#xff0c;不需要patchValue重新赋值 srcObj用于赋值表单初始值 表单校验 优先级&#xff1a;输入过程中的校验 > 焦点离开后的校验 > 点击确定按钮后的校验 适用场景&#xff1a; 输入过程中的校验&#xff1a;焦点进入…

类与对象(一)

目录 一、类的引入和定义 二、类的访问限定符及封装 1&#xff09;访问限定符 2&#xff09;封装 三、类的作用域和实例化 1&#xff09;类的作用域 2&#xff09;实例化 四、类的大小 1&#xff09;类的大小计算方式 2&#xff09;特殊的类的大小 五、this指针 1&…

C++设计模式:观察者模式(三)

1、定义与动机 观察者模式定义&#xff1a;定义对象间的一种1对多&#xff08;变化&#xff09;的依赖关系&#xff0c;以便当一个对象&#xff08;Subject&#xff09;的状态发生比改变时&#xff0c;所有依赖于它的对象都得到通知并且自动更新 再软件构建过程中&#xff0c…

回溯算法|332.重新安排行程 51. N皇后 37. 解数独

332.重新安排行程 力扣题目链接 class Solution { private: // unordered_map<出发机场, map<到达机场, 航班次数>> targets unordered_map<string, map<string, int>> targets; bool backtracking(int ticketNum, vector<string>& result…