RDD持久化、广播、累加器

1、持久化

RDD的持久化包括两个方面:①操作RDD的时候怎么保存结果,这个部分属于action算子的部分②在实现算法的时候要进行cache、persist,还有checkpoint进行持久化。

1.1 persist和cache

Spark稍微复杂一点的算法里面都会有persit的身影,因为spark默认情况下是放在内存中,比较适合高速的迭代,如一个Stage有步骤非常多,中间不会产生临时数据,对于高速迭代是非常好的事情,但是对于分布式文件系统风险非常高,容易出错,这个时候就涉及到容错,由于RDD有血统继承关系,后面的RDD如果数据分片出错或者RDD本身出错之后可以根据前面的依赖血统关系算出来,但是如果没有对父RDD进行persist或cache还是要从头开始做。

首先先看下StorageLevel类,里面设置了RDD的各种缓存级别,总共有12种,其实是它的多个构造参数的组合形成的。

cache源码如下:

由源码可知,cache方法实际上调用了无参数的persist方法,缓存级别为仅在内存中。

persist源码如下:

无参的persist方法,默认缓存级别为仅在内存中

其实有2种情况:①是我们之前对RDD调用了checkpoint方法,这个方法是把RDD存储到disk上,之后我们再调用persist(newLevel)方法也是不会报错的,他会做检查你是否执行过checkpoint方法(即isLocallyCheckpointed),如果是的话就会调用persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true),而这里LocalRDDCheckpointData.transformStorageLevel(newLevel)返回的缓存级别是disk级别,故不会报错②如果我们之前设置过RDD的缓存级别,现在再次调用此方法进行缓存级别设置,但是缓存级别与之前一样,程序也是不会报错的,因为里面调用了persist(newLevel, allowOverride = false)方法

这个persist方法,适用于之前我们设置过了RDD的缓存级别,现在想要修改RDD的缓存级别的情况,只需要把allowOverride设置为true

这一段程序也解释了上面第一种方法的第一个特殊情况为什么不会报错

总结:①cache方法其实是persist方法的一个特例:调用的是无参数的persist(),代表缓存级别是仅内存的情况②persist方法有三种,分为默认无参数仅内存级别的persist(),还有persist(newLevel):这个方法需要之前对RDD没有设置过缓存级别,persist(newLevel,allowOverride):这个方法适用于之前对RDD设置过缓存级别,但是想更改缓存级别的情况。③取消缓存统一使用unpersist()方法④persist是lazy级别的(前面的算子都是lazy的每执行,所以他肯定也要是lazy级别的),unpersist是eager级别的(即调用的时候会立即清除)

注意:①cache之后一定不能立即有其他算子,cache后有算子的话,它每次都会重新触发这个计算过程,cache不是一个action;②cache被gc清除的两种方式:unpersist强制销毁数据;会被后续的计算结果挤掉

缓存实现的原理:DiskStore磁盘存储和MemoryStore内存存储

DiskStore磁盘存储:spark会在磁盘上创建spark文件夹,命名为(spark-local-x年x月x日时分秒-随机数),block块都会存在这里,然后把block id映射成相应的文件路径,就可以存取文件了

MemoryStore内存存储:使用hashmap管理block就行了,block id作为key,MemoryEntry为value

1.2 做缓存的时机

(1)计算特别耗时

(2)计算链条很长,失败的时候会有很大的代价:假设900个步骤在第800个步骤缓存,801的步骤失败了就会在800个步骤开始恢复;

(3)shuffle之后:shuffle是进行分发数据,缓存之后假设后面失败就不需要重新shuffle;

(4)checkpoint之前:checkpoint是把整个数据放到分布是文件系统中或磁盘,checkpoint是在当前作业执行之后,再触发一个作业,恢复时前面的步骤就不需要计算

缓存是不一定可靠的,缓存在内存中不一定是可靠的,把数据缓存在内存中有可能会丢失,例如只缓存在内存中,而不同时放在内存和磁盘上,可能内存crash(奔溃),crash内存现在有一种办法就是用Tachyon做底层存储,但是使用checkpoint的数数据一定放在文件系统上,这个时候数据就不会丢失。假设缓存了100万个数据分片,开始缓存是成功的,由于内存的紧张在一些机器上把一些数据分片清理掉了,那这时候就需要重新计

(5)shuffle之前persist(不过框架已经默认帮我们把数据持久化到本地磁盘)

//cache实例
val cached=sc.textFile("G:\\Scala\\data\\README.md").flatMap(_.split(" ")).map(_=>(_,1)).reduceByKey(_+_,1).cache
cached.count

广播BroadCast

为什么需要广播?每个task运行,读取全局数据的时候每个task每次都要拷贝一个数据副本,他的好处就是状态一致性,不好的就是耗大量的内存,变量大就容易OOM,这种是非常严重的,必须全部放在内存上,不能一部分放磁盘上。

广播过去的全局只读不能修改的,广播到worker的executor内存中。广播变量不需要销毁,应用程序存在他就存在,sc销毁它也就销毁了。

总结:广播是由Driver发送给当前Application分配的所有Executor内存级别的全局只读变量,Executor中的线程池中的线程共享该全局变量,极大减少网络传输(否则每个task都要传输一次该变量),并极大的节省内存,减少OOM的可能,当然也隐形的提高CPU的有效工作(因为每次传CPU也很忙)

//广播实例:
val number =6
val broadcastVar = sc.broadcast(number ) //广播只能由broadcast广播
val data=sc.parallelize(1 to 10)//创建RDD,由Task使用广播变量
val bn=data.map(_*broadcastVar .value)

3 累加器Accumulator

为什么需要累加器?累加器的特征:是全局级别的,且Executor中的task只能修改(增加内容),只有driver可读,因为我们通过driver控制整个集群有必要知道整个集群的状态。(对于Executor只能修改但不可读,只对driver可读)。Executor修改一定不会彼此覆盖相当于加锁了。

因为他的特性,在记录集群状态的时候,尤其是全局唯一状态的时候至关重要,可以保存唯一的全局变量。

累加器原理:由于被driver控制,在实际task运行的时候,每次都可以保证只对driver可读获取全局唯一的状态对象

总结:①累加器是全局唯一的,每次操作只增不减②在executor中只能修改他,也就是只能增加他的值。

可以认为Broadcast是线程级别全局共享,累加器是executor全局共享

//累加器实例:
val acc = sc.accumulator(0) 
val data=sc.parallelize(1 to 10)
val result=data.foreach(item=>acc +=item)
println(result)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473679.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 网易-1. 分割环(前缀和 + 哈希)

文章目录1. 题目2. 解题1. 题目 小易有 n 个数字排成一个环,你能否将它们分成连续的两个部分(即在环上必须连续),使得两部分的和相等? 输入描述: 第一行数据组数 T ,对于每组数据 第一行数字 n ,表示数字…

RDD的依赖与分区

1 宽依赖和窄依赖 RDD从具体的依赖的角度讲,有窄依赖和宽依赖2种情况。 窄依赖:指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter等都会产生窄依赖; 宽依赖:指一个父RDD的Partition会被…

爬虫案列:京东商城长裤信息获取

1、创建Scrapy项目 使用全局命令startproject创建项目,创建新文件夹并且使用命令进入文件夹,创建一个名为jingdong的Scrapy项目。 [python] view plaincopy scrapy startproject jingdong 2.使用项目命令genspider创建Spider [python] view plaincopy …

ACwing 2. 01背包问题(DP)

文章目录1. 题目2. 解题1. 题目 有 N 件物品和一个容量是 V 的背包。每件物品只能使用一次。 第 i 件物品的体积是 vi,价值是 wi。 求解将哪些物品装入背包,可使这些物品的总体积不超过背包容量,且总价值最大。 输出最大价值。 输入格式 …

Redis-Scrapy分布式爬虫:当当网图书为例

Scrapy-Redis分布式策略: Scrapy_redis在scrapy的基础上实现了更多,更强大的功能,具体体现在: reqeust去重,爬虫持久化,和轻松实现分布式 假设有四台电脑:Windows 10、Mac OS X、Ubuntu 16.04、…

Saprk排序

1、基础排序算子sortBy和sortByKey 在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark0.9.0之后才引入的。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面…

ACwing 3. 完全背包问题(DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包,每种物品都有无限件可用。 第 i 种物品的体积是 vi,价值是 wi。 求解将哪些物品装入背包,可使这些物品的总体积不超过背包容量,且总价值最大。 输出最大价值。…

Crontab定时任务访问url实例

以下操作均是在ubuntu 下操作的: 1、进入crontab文件的编写状态: crontab -e 2、第一次进入编写crontab文件的界面,系统会提示选择相应的编辑器,一般我们选择vi编辑器就可以了:选择/usr/bin/vim.tiny 12345Select a…

ACwing 4. 多重背包问题 I(DP)

文章目录1. 题目2. 解题1. 题目 有 N 种物品和一个容量是 V 的背包。 第 i 种物品最多有 si 件,每件体积是 vi,价值是 wi。 求解将哪些物品装入背包,可使物品体积总和不超过背包容量,且价值总和最大。 输出最大价值。 输入格式…

数据算法与结构基本知识

数据结构与算法作用 没有看过数据结构和算法,有时面对问题可能会没有任何思路,不知如何下手去解决;大部分时间可能解决了问题,可是对程序运行的效率和开销没有意识,性能低下;有时会借助别人开发的利器暂时…

Master HA源码解析

1、Master HA概述 Spark在生产上做HA一般采用的是通过zookeeper的方式,配置3个master的话是比较可靠的方式。采用zookeeper做HA的话zookeeper会保存整个Spark程序运行时候的元数据(包括Workers,Drivers,Applications,…

DNS坑爹呢?!

昨天下午3点多,大量网民反映无法上网。多家DNS服务商通过微博透露,在1月21日下午3点20分左右,全国所有通用顶级域的根出现异常,导致部分国内网民无法访问.com域名网站,对中国互联网造成严重影响。 昨天下午有事出去&am…

数据结构顺序表基本流程

生活中很多事物是有顺序关系的,如班级座位从前到后是按排的顺序,从左到右是按列的顺序,可以很方便的定位到某一个位置,但如果座位是散乱的,就很难定位。 在程序中,经常需要将一组(通常是同为某…

Spark2.x RPC解析

1、概述 在Spark中很多地方都涉及网络通信,比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。Spark 2.0 之后,master 和worker 之间完全不使用akka 通信,改用netty实现。因为使用Akka…

LeetCode 1629. 按键持续时间最长的键

文章目录1. 题目2. 解题1. 题目 LeetCode 设计了一款新式键盘,正在测试其可用性。测试人员将会点击一系列键(总计 n 个),每次一个。 给你一个长度为 n 的字符串 keysPressed ,其中 keysPressed[i] 表示测试序列中第 …

数据结构中的栈

整理衣服时,先放冬天的衣服,后放夏天的衣服,这样夏天的衣服就在上面,方便夏季取用。 栈(stack),有些地方称为堆栈,是一种容器,可存入数据元素、访问元素、删除元素&…

数据结构中的队列

生活中很多时候需要排队来维持秩序,如等公交、取票、办理银行业务等。 队列(queue)是只允许在一端进行插入操作,而在另一端进行删除操作的线性表。 队列是一种先进先出的(First In First Out)的线性表&am…

SparkContext解析

1、SparkContext概述 Spark的程序编写是基于SparkContext的,体现在2方面:①Spark编程的核心基础(RDD),第一个RDD是由SparkContext创建的;②Spark程序的调度优化也是基于SparkContext,RDD在一开…

LeetCode 1630. 等差子数组

文章目录1. 题目2. 解题1. 题目 如果一个数列由至少两个元素组成,且每两个连续元素之间的差值都相同,那么这个序列就是 等差数列 。更正式地,数列 s 是等差数列,只需要满足:对于每个有效的 i , s[i1] - s[…

LeetCode 1631. 最小体力消耗路径(DFS + 二分查找)

文章目录1. 题目2. 解题1. 题目 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights ,其中 heights[row][col] 表示格子 (row, col) 的高度。 一开始你在最左上角的格子 (0, 0) ,且你希望去最右下角的格子 (rows-1, columns-1) &…