All Transformation operators on an RDD are lazily evaluated; that is, they do not compute their results right away. Instead, they only remember the transformations applied to a base dataset (for example, a file). The transformations are actually computed only when an action requires a result to be returned to the Driver. This design lets Spark run more efficiently.
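A quick way to see this laziness in spark-shell (a minimal sketch, assuming a running SparkContext named `sc`): defining the transformation returns immediately without touching the data; only the action at the end triggers a job.

```scala
val nums = sc.parallelize(1 to 100)   // builds an RDD; no job runs yet
val squared = nums.map(x => x * x)    // transformation: only the lineage is recorded
squared.take(5)                       // action: this is when Spark actually computes
```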
Common Transformation operators
Examples of common Transformation operators
- map(func): returns a new RDD formed by passing each element of the source RDD through the function func. map is a one-to-one operation: for every element that goes in, exactly one element comes out.
```scala
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

# Multiply each element by 10 and return a new RDD, rdd2
scala> val rdd2 = rdd1.map(_*10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25

scala> rdd2.collect
res1: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

# Append a string to each element, returning a new RDD of String
scala> val rdd3 = rdd1.map(_+"@map.com")
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at map at <console>:25

scala> rdd3.collect
res3: Array[String] = Array(1@map.com, 2@map.com, 3@map.com, 4@map.com, 5@map.com, 6@map.com, 7@map.com, 8@map.com, 9@map.com, 10@map.com)
```
- filter(func): filtering. Returns a new RDD consisting of the input elements for which func returns true; the element type of the RDD does not change.
scala> val rdd1 = sc.parallelize(Array("乔峰","段誉","虚竹","鸠摩智","达摩祖师"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24# filter中为true的会被保留,会false的会被过滤scala> val rdd2 = rdd1.filter(!_.contains("摩"))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at :25scala> rdd2.collectres4: Array[String] = Array(乔峰, 段誉, 虚竹)
- flatMap(func): flattening. Similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element).
scala> val rdd1 = sc.parallelize(Array("say you say me say it together","good good study day day up"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at :24# 进去一条,出来多条,是一对多的转换scala> val rdd2 = rdd1.flatMap(_.split(" "))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at :25scala> rdd2.collectres5: Array[String] = Array(say, you, say, me, say, it, together, good, good, study, day, day, up)
Examples of set-operation Transformation operators
- union(otherRDD): returns a new RDD containing the union of the source RDD and the argument RDD; it requires two RDDs.
```scala
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

# Union of the two RDDs
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[11] at union at <console>:27

scala> rdd3.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 6)
```
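Note that union keeps duplicates, as the overlapping values 2 through 5 above show. If true set semantics are needed, chaining the distinct() operator described below removes them; a minimal sketch:

```scala
// union keeps duplicates; distinct() (covered below) removes them
val unioned = rdd1.union(rdd2).distinct()
unioned.collect()   // elements 1 through 6, in no particular order
```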
- subtract(otherRDD): returns a new RDD containing the elements of the source RDD that are not in the argument RDD (set difference); it requires two RDDs.
```scala
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd3 = rdd1.subtract(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at subtract at <console>:27

# The difference rdd1 - rdd2 is "1"
scala> rdd3.collect
res7: Array[Int] = Array(1)

# The difference rdd2 - rdd1 is "6"
scala> val rdd4 = rdd2.subtract(rdd1)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at subtract at <console>:27

scala> rdd4.collect()
res8: Array[Int] = Array(6)
```
- intersection(otherRDD): returns a new RDD containing the intersection of the source RDD and the argument RDD; it requires two RDDs.
```scala
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

# Intersection of the two RDDs, returned as a new RDD
scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at intersection at <console>:27

scala> rdd3.collect()
res9: Array[Int] = Array(4, 3, 5, 2)
```
- distinct(): returns a new RDD with the duplicate elements of the source RDD removed; it only needs one RDD.
```scala
scala> val rdd1 = sc.parallelize(Array(1,1,1,2,2,2,3,3,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

# Deduplicate within a single RDD
scala> val rdd2 = rdd1.distinct()
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at distinct at <console>:25

scala> rdd2.collect()
res10: Array[Int] = Array(1, 3, 2)
```
The underlying idea (as the Java code below shows, with a Scala sketch of the same approach after it) is: mapToPair + reduceByKey + map =>
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;

/**
 * distinct: deduplicate the elements of an RDD
 */
public class distinctOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("distinct");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList(
                "a", "a", "a", "a", "b", "b", "b", "b"
        ));

        /**
         * The "manual" way to deduplicate an RDD takes three steps:
         * Step 1: turn the RDD into a (K, V) RDD, with the element as K and 1 as V
         * Step 2: aggregate the (K, V) RDD by key
         * Step 3: keep only the key of the resulting RDD
         */
        // [(a,1),(a,1),(a,1),(a,1),(b,1),(b,1),(b,1),(b,1)]
        JavaPairRDD<String, Integer> mapToPairRDD = rdd1.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        // Aggregate the values for each key
        // [(a,4),(b,4)]
        JavaPairRDD<String, Integer> reduceRDD = mapToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Keep only the key, drop the value
        JavaRDD<String> mapRDD = reduceRDD.map(new Function<Tuple2<String, Integer>, String>() {
            @Override
            public String call(Tuple2<String, Integer> tuple) throws Exception {
                return tuple._1;
            }
        });

        mapRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        System.out.println("-----------------------------------");

        // Use Spark's built-in distinct operator to deduplicate
        JavaRDD<String> distinctRDD = rdd1.distinct();
        distinctRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sc.stop();
    }
}
```
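The same three-step idea written in spark-shell Scala (a minimal sketch for comparison, not the exact source of RDD.distinct):

```scala
val rdd1 = sc.parallelize(Array("a", "a", "a", "a", "b", "b", "b", "b"))
val deduped = rdd1
  .map(x => (x, 1))              // step 1: element -> (element, 1)
  .reduceByKey((a, b) => a + b)  // step 2: aggregate by key, which collapses duplicates
  .map(_._1)                     // step 3: keep only the key
deduped.collect()                // Array(a, b), order may vary
```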
Grouping Transformation operators
- groupByKey([numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]). It is a relatively low-level operator.

```scala
scala> val rdd1 = sc.parallelize(List(("张军",1000),("李军",2500),("王军",3000),("张军",1500)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24

scala> val rdd2 = rdd1.groupByKey()
rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[31] at groupByKey at <console>:25

scala> rdd2.collect()
res11: Array[(String, Iterable[Int])] = Array((王军,CompactBuffer(3000)), (张军,CompactBuffer(1000, 1500)), (李军,CompactBuffer(2500)))
```
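To turn the grouped result into per-key totals, a mapValues over the Iterable works (a minimal sketch on the rdd2 above); reduceByKey, shown next, is usually preferred because it combines values before the shuffle instead of materializing every value per key.

```scala
val totals = rdd2.mapValues(_.sum)   // sum the grouped values for each key
totals.collect()                     // Array((王军,3000), (张军,2500), (李军,2500)), order may vary
```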
- reduceByKey(func, [numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values for each key are aggregated with the given reduce function. As with groupByKey, the number of reduce tasks can be set through the optional second parameter. Unlike groupByKey, it combines values locally within each partition before the shuffle.
scala> val rdd1 = sc.parallelize(Array(("red",10),("red",20),("red",30),("red",40),("red",50),("yellow",100),("yellow",100)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at :24# 按照key进行聚合操作scala> val rdd2 = rdd1.reduceByKey(_+_)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at :25scala> rdd2.collect()res12: Array[(String, Int)] = Array((yellow,200), (red,150))
- cogroup(otherRDD, [numTasks]): called on RDDs of type (K, V) and (K, W), returns an RDD of type (K, (Iterable[V], Iterable[W])).
scala> val rdd1 = sc.parallelize(Array(("张飞","丈八蛇矛"),("关羽","青龙偃月刀"),("吕布","方天画戟")))rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(("张飞",30),("关羽",35),("吕布",45),("刘备",42)))rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at parallelize at :24scala> val rdd3=rdd1.cogroup(rdd2)rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[Int]))] = MapPartitionsRDD[37] at cogroup at :27scala> rdd3.collect()res13: Array[(String, (Iterable[String], Iterable[Int]))] = Array((吕布,(CompactBuffer(方天画戟),CompactBuffer(45))), (关羽,(CompactBuffer(青龙偃月刀),CompactBuffer(35))), (张飞,(CompactBuffer(丈八蛇矛),CompactBuffer(30))), (刘备,(CompactBuffer(),CompactBuffer(42))))
Sorting Transformation operators
- sortBy(func, [ascending], [numTasks]): similar to sortByKey, but more flexible, since the sort key is computed by func.

```scala
scala> val rdd1 = sc.parallelize(Array(10,9,8,7,4,6,5,3,1,2))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> val rdd2 = rdd1.sortBy(x=>x, true)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at sortBy at <console>:25

scala> rdd2.collect()
res14: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

# A (K, V) RDD
scala> val rdd1 = sc.parallelize(Array(("张飞",30),("刘备",42),("关羽",32),("曹操",46),("公孙瓒",62)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24

scala> val rdd2 = rdd1.sortBy(tuple=>tuple._2, false)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[45] at sortBy at <console>:25

scala> rdd2.collect()
res15: Array[(String, Int)] = Array((公孙瓒,62), (曹操,46), (刘备,42), (关羽,32), (张飞,30))
```
- sortByKey([ascending], [numTasks]): called on an RDD of (K, V) pairs where K must implement Ordered; returns a (K, V) RDD sorted by key.

```scala
scala> val rdd1 = sc.parallelize(Array(("张飞",30),("刘备",42),("关羽",32),("曹操",46),("公孙瓒",62)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24

# To sort the same rdd1 by value, the pairs need to be swapped first and swapped back afterwards
scala> val rdd2 = rdd1.map(tuple=>tuple.swap).sortByKey(false).map(tuple=>tuple.swap)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at <console>:25

scala> rdd2.collect()
res16: Array[(String, Int)] = Array((公孙瓒,62), (曹操,46), (刘备,42), (关羽,32), (张飞,30))
```
Advanced Transformation operators
- mapPartitionsWithIndex(func): similar to mapPartitions, but func takes an extra integer parameter representing the partition index, so when running on an RDD of type T, func must have the type (Int, Iterator[T]) => Iterator[U]. It operates on each partition of the RDD and exposes the partition index, so the partition number is available inside func (a short mapPartitions sketch follows the example below).
- func: takes two parameters; the first is the partition index and the second is the iterator over that partition's elements.
scala> val rdd1 = sc.parallelize(List("son1","son2","son3","son4","son5","son6","son7","son8","son9","son10","son11","son12"),4)rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at :24scala> val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {iter.toList.map(x=> "【分区号为:"+index+", 值为:" + x+ "】").iterator})rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at :25scala> rdd2.collect()res0: Array[String] = Array(【分区号为:0, 值为:son1】, 【分区号为:0, 值为:son2】, 【分区号为:0, 值为:son3】, 【分区号为:1, 值为:son4】, 【分区号为:1, 值为:son5】, 【分区号为:1, 值为:son6】, 【分区号为:2, 值为:son7】, 【分区号为:2, 值为:son8】, 【分区号为:2, 值为:son9】, 【分区号为:3, 值为:son10】, 【分区号为:3, 值为:son11】, 【分区号为:3, 值为:son12】)
- aggregateByKey: covered in detail in a separate article; a brief sketch follows below.
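As a preview (a minimal sketch; the dedicated article explains the zero value, the within-partition function, and the cross-partition function in detail):

```scala
val pairs = sc.parallelize(Array(("a", 1), ("a", 2), ("b", 3)), 2)
// zero value 0; add values inside each partition, then add the partial results across partitions
val sums = pairs.aggregateByKey(0)(_ + _, _ + _)
sums.collect()   // Array((a,3), (b,3)), order may vary
```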