Spark算子介绍

Spark算子

文章目录

  • Spark算子
    • 一、转换算子
      • coalesce函数
      • repartition函数
      • flatMap——flatMap变换
      • sample——抽样
      • zip——联结
      • mapValues——对Value值进行变换
    • 二、行动Action算子
      • 数据运算类行动算子
        • reduce——Reduce操作
        • collect——收集元素
        • countByKey——按Key值统计Key/Value型RDD中的元素个数
        • countByValue——统计RDD中元素值出现的次数
        • foreach——逐个处理RDD元素
        • lookup——查找元素
        • take——获取前n个元素
        • takeSample——提取n个元素
        • takeOrdered——获取排序后的前n个元素
      • 存储型行动算子
        • saveAsObjectFile——存储为二进制文件
        • saveAsTextFile——存储为文本文件
        • saveAsNewAPIHadoopFile——存储为Hadoop文件
    • 三、缓存算子
      • cache——缓存RDD
      • checkpoint——建立RDD的检查点
      • persist——持久化RDD

一、转换算子

coalesce函数

返回一个经过简化到numPartitions个分区的新RDD。这会导致一个窄依赖,例如:你将1000个分区转换成100个分区,这个过程不会发生shuffle,相反如果10个分区转换成100个分区将会发生shuffle。然而如果你想大幅度合并分区,例如合并成一个分区,这会导致你的计算在少数几个集群节点上计算(言外之意:并行度不够)。为了避免这种情况,你可以将第二个shuffle参数传递一个true,这样会在重新分区过程中多一步shuffle,这意味着上游的分区可以并行运行。

注意:第二个参数shuffle=true,将会产生多于之前的分区数目,例如你有一个个数较少的分区,假如是100,调用coalesce(1000, shuffle = true)将会使用一个 HashPartitioner产生1000个分区分布在集群节点上。这个(对于提高并行度)是非常有用的。如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的partition数变多的

repartition函数

返回一个恰好有numPartitions个分区的RDD,可以增加或者减少此RDD的并行度。内部,这将使用shuffle重新分布数据,如果你减少分区数,考虑使用coalesce,这样可以避免执行shuffle

Repartition函数内部调用了coalesce函数 shuffle 为True

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}

flatMap——flatMap变换

算子函数格式:

flatMap[U](f:FlatMapFunction[T,U]):JavaRDD[U]

在前面我们已经了解到map变换是对原RDD中的每个元素进行一对一变换生成

新RDD,而flatMap不同的地方在于,它是对原RDD中的每个元素用指定函数f进行一

对多(这也是lat前缀的由来)的变换,然后将转换后的结果汇聚生成新RDD.

示例:

flatMap示例代码

scala>valrdd=sc,parallelize(0 to 3,1)//生成由0-3序列构成的RDDrdd:org.apache,spark.rdd.RDD[Int]=ParallelCollectionRDD[17] at parallelize at:21scala>val flatMappedRDD=rdd.flatMap(x=>0tox)//使用flatMap将每个原始变换为一个序列flatMappedRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[18] at flatMap at:23scala>flatMappedRDD.collect//显示新的RDDres0:Array[Int]=Array(0,0,1,0,1,2,0,1,2,3)

sample——抽样

算子函数格式:

sample(withReplacement:Boolean,fraction:Double,seed:Long):JavaRDD[T]

对原始RDD中的元素进行随机抽样,抽样后产生的元素集合构成新的RDD.

参数fraction 指定新集合中元素的数量占原始集合的比例.抽样时的随机数种子由seed指定.

参数withReplacement为false时,抽样方式为不放回抽样.

参数withReplacement为true时,抽样方式为放回抽样.

示例:

sample示例代码

1:scala>valrdd=sc.parallelize(0to9,1)//生成由0-9的序列构成的RDDrdd:org.apache.spark.rdd.RDD [Int1=ParallelCollectionRDD[5]at parallelize at:212:scala>rdd.sample(false,0.5).collect//不放回抽样一半比例的元素生成新的RDDres4:Array[Int]=Array(0,1,2,3,4,7)3:rdd.sample(false,0.5).collect//再次不放回抽样一半比例的元素生成新的RDDres7:Array [Int]=Array(0,1,3,6,8)4:scala>rdd.sample(false,0.8).collect//不放回抽样80%比例的元素生成新的RDDres8:Array[Int]=Array(0,1,2,5,6,8,9)5:scala>rdd.sample(true,0.5).co1lect//放回抽样一半比例的元素生成新的RDDres9:Array[Int]=Array(0,2,3,4,4,6,7,9)

zip——联结

算子函数格式:

zip[U](other:JavaRDDLike[U,_]):JavapairRDD(T,U]

输入参数为另一个RDD,zip变换生成由原始RDD的值为Key、输入参数RDD的值为Value依次配对构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD.

示例:

zip示例代码

1:scala>val rdd1=sc.parallelize(0 to 4,1)//构建原始RDDrdd_1:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[19]at parallelize at :212:scala>val rdd2=sc.parallelize(5 to 9,1)//构建输入参数RDDrdd_2:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD [20]at parallelize at :213:scala>rdd_1.zip(rdd_2).collect//对两个RDD进行联结res5:Array[(Int,Int)]=Array((0,5),(1,6),(2,7),(3,8),(4,9})

mapValues——对Value值进行变换

算子函数格式:

mapValues[u](f:Function[v,U]):JavapairRDD[K,U]

将Key/Value型RDD中的每个元素的Value值,使用输入参数函数f进行变换,生成新的RDD.

示例:

1:scala>val pairs=sc.parallelize(List("apple","banana","berry","cherry","cumquat","haw"),1).keyBy(_.1ength)//构建原始RDDpairs:org.apache.spark.rdd.RDDI(Int,String)]=MappedRDD[16]at keyBy at:122:scala>pairs.mapvalues(v=>v+""+V{0)).collect//生成将单词加单词首字母的RDDres0:Array[(Int,string)]=Array{(5,apple a),(6,banana b),(5,berry b),(6,cherry c),7,cumquat c),(3,haw h))

二、行动Action算子

数据运算类行动算子

reduce——Reduce操作

算子函数格式:

reduce(f:Function2[T,T,T]):T

对RDD中的每个元素依次使用指定的函数f进行运算,并输出最终的计算结果.

需要注意的是,Spark中的reduce操作与Hadoop中的reduce操作并不一样.在Hadoop中,reduce操作是将指定的函数作用在Key值相同的全部元素上.而Spark的reduce操作则是对所有元素依次进行相同的函数计算.

示例:

1:scala>val nums=sc.parallelize(0 to 9,5)//构建由数字0-9构成的RDDnums:org,apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[18]at parallelize at:122:scala>nums.reduce(_+_)//计算RDD中所有数字的和

collect——收集元素

算子函数格式:

co1lect():List[T]

collect的作用是以数组格式返回RDD内的所有元素.

示例:

1:scala>val data=sc.parallelize(List(1,2,3,4,5,6,7,8,9,0},2)//构建原始RDDdata;org.apache.spark.rdd.RDD[Int]=ParallelCo1lectionRDD[8]at parallelize at:122:scala>data.collect//显示原始RDD中的元素res0:Array[Int]=Array(l,2,3,4,5,6,7,8,9,0)

countByKey——按Key值统计Key/Value型RDD中的元素个数

算子函数格式:

countByKey():Map[K,Long]

计算Key/Value型RDD中每个Key值对应的元素个数,并以Map数据类型返回

统计结果.

示例:

1:scala>val pairRDD=sc.parallelize(List(("fruit","Apple"),("fruit","Banana"),{"fruit","Cherry "),{"vegetable","bean"),("vegetable","cucumber"),("vegetable","pepper")),2} //构建原始 RDDpairRDD:org.apache.spark.rdd.RDD [(String,String)]=Paralle1 Collection RDD[3 Jat parallelize at :122:sca1a>pairRDD.countByKey //统计原始RDD中每个物品类型下的物品数量res0:scala.collection.Map[String,Long]=Map(fruit->3,vegetable->3)

countByValue——统计RDD中元素值出现的次数

算子函数格式:

countByValue():Map[T,Long]

计算RDD中每个元素的值出现的次数,并以Map数据类型返回统计结果.

countByValue示例代码

1:scala>val num=sc.parallelize(List(1,1,1,2,2,3),2)//构建原始RDDnum:org.apache.spark.rdd.RDD [Int]=ParallelcollectionRDD[4]atparallelize at:122:scala>num.countByValue//统计原始RDD中每个数字出现的次数res0:scala.collection.Map[Int,Long]=Map(2->2,1->3,3->1)

foreach——逐个处理RDD元素

算子函数格式:

foreach(f:VoidFunction[(K,V)]):Unit

对RDD中的每个元素,使用参数f指定的函数进行处理.

示例:

1:scala>val words=sc.parallelize(List("A","B","C","D"),2)//构建原始 RDDwords;org.apache.spark.rdd,RDD[String]=ParallelCollectionRDD[9] at parallelize at :212:scala>words.foreach(x=>print1n(x+"is a letter."))/打印输出每个单词构造的一句话Cis a letter.Ais a letter.Dis a letter.Bis a letter.

lookup——查找元素

算子函数格式:

lookup(key:K):List[V]

在Key/Value型的RDD中,查找与参数key相同Key值的元素,并得到这些元素

的Value值构成的序列.

示例:

1:scala>val pairs=sc.parallelize(List("apple","banana","berry","cherry","cumcquat","haw"),1).keyBy (_.1ength)//构建原始RDDpairs:org.apache.spark.rdd.RDDt(Int,String)]=MapPartitionsRDD[13] at keyBy at:212:scala>pairs.collectres18:Array [(Int,String)]=Array((5,apple),(6,banana),(5,berry),(6,cherry),(7,cumcuat),(3,haw))3:scala>pairs.lookup(5)//查找长度为5的单词res19:Seq[string]=WrappedArray (apple,berry)

take——获取前n个元素

takeSample——提取n个元素

takeOrdered——获取排序后的前n个元素

存储型行动算子

saveAsObjectFile——存储为二进制文件

算子函数格式:

saveAsobjectPile(path:string):Unit

将RDD转换为序列号对象后,以Hadoop SequenceFile文件格式保存,保存路径由

参数path指定.

示例:

1:scala>val data=sc.parallelize(0to9,1)//构建0-9组成的RDDdata:org.apache.spark.rdd.RDD[Int]=Paralle1CollectionRDD[40]at parallelize at :122:scala>data.saveAsobjectFile("obj")//将RDD以SequenceFile文件格式保存,文件名为obj

saveAsTextFile——存储为文本文件

saveAsNewAPIHadoopFile——存储为Hadoop文件

三、缓存算子

为了提高计算效率,Spark采用了两个重要机制:

①基于分布式内存数据集进行运算,也就是我们已经熟知的RDD;

②变换算子的惰性执行(Lazy Evaluation),即RDD的变换操作并不是在运行到该行代码时立即执行,而仅记录下转换操作的操作对象.只有当运行到一个行动算子代码时,变换操作的计算逻辑才真正执行.

这两个机制帮助Spark提高了运算效率,但正如’硬币都有两面’一样,在带来提升性能的好处的同时,这两个机制也留下了隐患.

例如:

①如果在计算过程中,需要反复使用某个RDD,而该RDD需要经过多次变换才能得到,则每次使用该RDD时都需要重复这些变换操作,这种运算效率是很低的;

②在计算过程中数据存放在内存中,如果出现参与计算的某个节点出现问题,则存放在该节点内存中的RDD数据会发生损坏.如果损坏的也是需要经过多次变换才能得到的RDD,此时虽然可以通过再次执行计算恢复该RDD,但仍然要付出很大的代价.因此,Spark提供了一类缓存算子,以帮助用户解决此类问题.

cache——缓存RDD

算子函数格式:

cache():JavaRDD[T]

cache将RDD的数据持久化存储在内存中,其实现方法是使用后面我们会介绍的persist算子.当需要反复使用某RDD时,使用cache缓存后,可以直接从内存中读出,不再需要执行该RDD的变换过程.需要注意的是,这种缓存方式虽然可以提高再次使用某个RDD的效率,但由于cache后的数据仅仅存储在内存中,因此不能解决RDD出错时需要再次恢复运算的问题.而且cache保存的数据在Driver关闭后会被清除,因此不能被在其他Driver中启动的Spark程序使用.

示例:

1:scala>val num=sc.parallelize(0to9,1)//构建RDDnum:org,apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[7]at parallelize at:212:scala>val result=num.map(x=>x*x)//对原始RDD进行map变换result:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD [8]at map at:233:scala>result.cache//对新RDD进行缓存res19:result.type=MapPartitionsRDD[8 Jat map at :234:scala>result.count//统计新RDD中的元素个数res30:Long=105:scala>result.collect().mkstring(',")//再次使用新RDD,生成用逗号分隔的序列res31:String=0,1,4,9,16,25,36,49,64,81

checkpoint——建立RDD的检查点

算子函数格式:

checkpoint():Unit

对于需要很长时间才能计算出或者需要依赖很多其他RDD变化才能得到的RDD,如果在计算过程中出错,要从头恢复需要付出很大的代价.此时,可以利用checkpoint建立中间过程的检查点,Spark会将执行checkpoint操作的RDD持久化,以二进制文件的形式存放在指定的目录下.与cache不同的是,checkpoint保存的数据在Driver关闭后仍然以文件的形式存在,因此可以被其他Driver中的Spark程序使用.

示例:

1:scala>val rdd=sc.makeRDD(1to9,2)//构建原始RDDrdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]at makeRDD at :212:scala>val flatMapRDD=rdd.flatMap(x=>Seq(x,x))//对原始RDD做flatMap变换flatMapRDD:org.apache.spark.rdd.RDD [Int]=MappartitionsRDD[1]at flatMap at:233:scala>sc.setCheckpointDir("my_checkpoint")//指定checkpoint存放的目录4:scala>flatMapRDD.checkpoint()//建立 checkpoint5:scala>flatMapRDD.dependencies.head.rdd//显示变换后RDD的依赖res2:org.apache.spark.rdd.RDD(_]=ParallelcollectionRDD[0]at makeRDD at:216:scala>flatMapRDD.collect()//显示变换后的RDDres3:Array[Int]=Array(1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9)7:scala>flatMapRDD.dependencies.head.rdd//再次显示变换后RDD的依赖res4:org.apache.spark.rdd.RDD [_1=CheckpointRDD[2]at collect at:26

persist——持久化RDD

算子函数格式:

persist(newLeve1:storageLeve1):JavaRDD[T]

调用persist可对RDD进行持久化操作,利用参数newlevel可以指定不同的持久化方式,常用的持久化方式包括:

  • MEMORY_ONLY:仅在内存中持久化,且将RDD作为非序列化的Java对象存储在JVM中.这种方式比较轻量,是默认的持久化方式.

  • MEMORY_ONLY_SER:仅在内存中持久化,且将RDD作为序列化的Java对象存储(每个分区一个byte数组).这种方式比MEMORY_ONLY方式要更加节省空间,但会耗费更多的CPU资源进行序列化操作.

  • MEMORY_ONLY_2:仅在内存中持久化,且将数据复制到集群的两个节点中.

  • MEMORY_AND_DISK:同时在内存和磁盘中持久化,且将RDD作为非序列化的Java对象存储.

  • MEMORY_AND_DISK_SER:同时在内存和磁盘中持久化,且将RDD作为序列化的Java对象存储.

  • MEMORY_AND_DISK_2:同时在内存和磁盘中持久化,且将数据复制到集群的两个节点中.

persist示例代码

1:scala>val num=sc.parallelize(0to9,1)//构建RDDnum:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]at parallelize at :122:scala>num.getStorageLeve1//显示RDD当前的持久化状态res8:org.apache.spark.storage.Storagelevel=StorageLevel{false,false,false,false,1)3:scala>num.persist()//使用persist进行默认的MEMORY_ONLY持久化res9:num.type=ParallelCollectionRDD [5] at parallelize at:214:scala>num.getStorageLeve1//显示RDD新的持久化状态res10:org,apache,spark,storage.StorageLevel=StorageLevel(false,true,false,true,1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509520.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构实验之二叉树六:哈夫曼编码

题目描述 字符的编码方式有多种,除了大家熟悉的ASCII编码,哈夫曼编码(Huffman Coding)也是一种编码方式,它是可变字长编码。该方法完全依据字符出现概率来构造出平均长度最短的编码,称之为最优编码。哈夫曼编码常被用于数据文件压…

hdu3790最短路径问题 (Dijkstra算法)

最短路径问题 Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others) Total Submission(s): 32544 Accepted Submission(s): 9565Problem Description给你n个点,m条无向边,每条边都有长度d和花费p,给你起…

spark master web ui 端口8080被占用解决方法

spark master web ui 端口8080被占用解决方法 Spark master web ui 默认端口为8080,当系统有其它程序也在使用该接口时,启动master时也不会报错,spark自己会改用其它端口,自动端口号加1,但为了可以控制到指定的端口&a…

GDB调试工具使用教程(博客)

http://blog.csdn.net/haoel/article/details/2879

树-堆结构练习——合并果子之哈夫曼树

题目描述 在一个果园里,多多已经将所有的果子打了下来,而且按果子的不同种类分成了不同的堆。多多决定把所有的果子合成一堆。 每一次合并,多多可以把两堆果子合并到一起,消耗的体力等于两堆果子的重量之和。可以看出,…

DataFrame函数介绍

DataFrame函数 文章目录DataFrame函数DataFrame 的函数Action 操作dataframe的基本操作集成查询DataFrame 的函数 Action 操作 collect() ,返回值是一个数组,返回dataframe集合所有的行 collectAsList() 返回值是一个java类型的数组,返回dataframe集合…

GCC编译器和GDB调试器常用选项

GCC编译器 gcc hello.c -o hello #将hello.c编译成hello可执行文件 gcc -E hello.c -o hello.i #将hello.c 转换成预处理后的文件hello.igcc -S hello.c -o hello.S #将hello.c 转换成汇编文件 hello.Sgcc -c hello.c -o hello.…

树结构练习——判断给定森林中有多少棵树

题目描述 众人皆知,在编程领域中,C是一门非常重要的语言,不仅仅因为其强大的功能,还因为它是很多其他面向对象语言的祖先和典范。不过这世上几乎没什么东西是完美的,C也不例外,多继承结构在带来强大功能的同…

Spark RDD分区2G限制

Spark RDD分区2G限制 文章目录Spark RDD分区2G限制问题现象解决方法为什么2G限制个人思(yu)考(jian)问题现象 遇到这个问题时,spark日志会报如下的日志 片段1: 15/04/16 14:13:03 WARN scheduler.TaskSe…

hdu3790最短路径问题(迪杰斯特拉算法+详解+代码)

最短路径问题 Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others) Total Submission(s): 32544 Accepted Submission(s): 9565Problem Description给你n个点,m条无向边,每条边都有长度d和花费p,给你起…

T型知识结构

传统的知识结构,即仅有某一专业知识的结构。这是惟一的知识结构,或称线性结构。这种知识结构已远远不能适应形势对管理者的要求。新型的人才知识结构通常可分为三角形、宝塔形、衣架型、T型、H型、X型等。前三个类型一般是指专业技术人才,在某…

priority_queueint,vectorint,greaterint优先队列,按照从小到大

原网址&#xff1a; 优先队列 C优先队列的基本使用方法 在优先队列中&#xff0c;优先级高的元素先出队列。 标准库默认使用元素类型的<操作符来确定它们之间的优先级关系。 优先队列的第一种用法&#xff0c;也是最常用的用法&#xff1a; priority_queue<int>qi;通…

Spark stage如何划分

窄依赖和宽依赖 窄依赖&#xff1a; 指父RDD的每一个分区最多被一个子RDD的分区所用&#xff0c;表现为一个父RDD的分区对应于一个子RDD的分区&#xff0c;和两个父RDD的分区对应于一个子RDD 的分区。图中&#xff0c;map/filter和union属于第一类&#xff0c;对输入进行协同…

引出发射和什么是反射和Class类和Class实例、基本类型的字节码对象

引出发射和什么是反射 问题1: 1.对象有编译类型和运行类型Object obj new java.util.Date();编译类型: Object运行类型: java.util.Date需求:通过obj对象,调用java.util.Date类中的toLocaleString方法.obj.toLocaleString(); 此时编译报错, 编译时,会检查该编译类型中是…

GCC常用命令详解

GCC(GNU Compiler Collection)是Linux下最常用的C语言编译器&#xff0c;是GNU项目中符合ANSI C标准的编译系统,能够编译用C、C和Object C等语言编写的程序。同时它可以通过不同的前端模块来支持各种语言&#xff0c;如Java、Fortran、Pascal、Modula-3和Ada等。穿插一个玩笑&a…

判断给定森林中有多少棵树特别版

题目描述 众人皆知&#xff0c;在编程领域中&#xff0c;C是一门非常重要的语言&#xff0c;不仅仅因为其强大的功能&#xff0c;还因为它是很多其他面向对象语言的祖先和典范。不过这世上几乎没什么东 西是完美的&#xff0c;C也不例外&#xff0c;多继承结构在带来强大功能的…

Spark使用HanLP分词

Spark使用HanLP分词 将HanLP的data(包含词典和模型)放到hdfs上&#xff0c;然后在项目配置文件hanlp.properties中配置root的路径&#xff0c;比如&#xff1a;roothdfs://localhost:9000/tmp/ 实现com.hankcs.hanlp.corpus.io.IIOAdapter接口 public static class Hadoop…

获取类中的构造器

需求:通过反射来获取某一个类的构造器: 1):获取该类的字节码对象. 2):从该字节码对象中去找需要获取的构造器. ------------------------------------------------------------------------ Class类获取构造器方法: Constructor类:表示类中构造器的类型,Constructor的实例…

SparkStreaming Kafka 自动保存offset到zookeeper

SparkStreaming Kafka 自动保存offset到zookeeper 场景 spark使用的是1.6&#xff0c;SparkStreaming1.6时候使用的kafka jar包为0.8的&#xff0c;消费时候不记录消费到的信息&#xff0c;导致重复消费&#xff0c;故手动保存到zookeeper&#xff0c;SparkStreaming2.1.1时使…

数据结构实验之查找一:二叉排序树

题目描述 对应给定的一个序列可以唯一确定一棵二叉排序树。然而&#xff0c;一棵给定的二叉排序树却可以由多种不同的序列得到。例如分别按照序列{3,1,4}和{3,4,1}插入初始为空的二叉排序树&#xff0c;都得到一样的结果。你的任务书对于输入的各种序列&#xff0c;判断它们是否…