Common Action Operators
- reduce(func): aggregates all elements of the RDD with the function func, first combining the data within each partition and then combining the partial results across partitions (a further sketch follows the example below).
scala> val rdd1 = sc.makeRDD(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24

// Sum the numbers 1 to 100
scala> rdd1.reduce(_+_)
res4: Int = 5050
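Because the per-partition results are merged in no particular order, the function passed to reduce should be commutative and associative. A minimal sketch using max instead of addition, assuming the same spark-shell session:

// reduce accepts any commutative, associative function, not only addition
sc.makeRDD(1 to 100, 4).reduce((a, b) => math.max(a, b))   // returns 100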
- collect(): returns all elements of the dataset as an array to the driver program. Because the whole result is pulled into driver memory, it is usually called only after operations such as filter that leave a sufficiently small dataset (see the sketch after the example below).
scala> val rdd1 = sc.makeRDD(1 to 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24

scala> rdd1.collect()
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
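A minimal sketch of the usual pattern, shrinking the data before collecting it (same session assumed):

// Filter down to a handful of elements before pulling them back to the driver
sc.makeRDD(1 to 100).filter(_ % 10 == 0).collect()
// Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)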
- count(): returns the number of elements in the RDD.
scala> val rdd1 = sc.makeRDD(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24

scala> rdd1.count()
res6: Long = 100
- first(): returns the first element of the RDD.
scala> val rdd1 = sc.makeRDD(5 to 20)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24

scala> rdd1.first()
res7: Int = 5
- take(num): returns an array of the first num elements of the RDD; with num = 1 it is equivalent to first().
scala> val rdd1 = sc.makeRDD(1 to 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24

scala> rdd1.take(5)
res8: Array[Int] = Array(1, 2, 3, 4, 5)
- takeOrdered(n): returns an array of the first n elements of the RDD after sorting (ascending by default); a custom Ordering can also be supplied, as sketched after the example below.
scala> val rdd1 = sc.makeRDD(Array(1,4,6,2,5,8,3,6,9))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at makeRDD at <console>:24

scala> rdd1.takeOrdered(5)
res9: Array[Int] = Array(1, 2, 3, 4, 5)
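takeOrdered also takes an implicit Ordering that can be passed explicitly; a minimal sketch against the same rdd1, returning the five largest elements by reversing the default ordering:

// Reverse the implicit ordering to get the five largest elements
rdd1.takeOrdered(5)(Ordering[Int].reverse)   // Array(9, 8, 6, 6, 5)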
- aggregate(zeroValue)(seqOp, combOp): similar to the key-value transformation aggregateByKey, it likewise takes three arguments: the initial value zeroValue, the intra-partition function seqOp, and the inter-partition function combOp; the difference is that aggregate is an action and triggers execution (the result below is worked through after the example).
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:24

scala> import scala.math._
import scala.math._

scala> rdd1.aggregate(4)(max(_,_),_+_)
res10: Int = 14
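The result 14 arises because zeroValue takes part in each partition's seqOp and once more in combOp: partition (1, 2, 3) gives max(4, 1, 2, 3) = 4, partition (4, 5, 6) gives max(4, 4, 5, 6) = 6, and the inter-partition step computes 4 + 4 + 6 = 14. As a further sketch on the same rdd1, using 0 as the initial value and addition for both functions simply sums all the elements:

// Each partition is summed first, then the partial sums (plus zeroValue) are added
rdd1.aggregate(0)(_ + _, _ + _)   // returns 21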
- countByKey(): whereas count() above returns the number of elements in an RDD, countByKey() works on (K, V) RDDs and returns a (K, Long) map giving the number of elements for each key. Since the whole map is loaded into driver memory, it is only suitable when the number of distinct keys is small (see the note after the example below).
scala> val rdd1 = sc.parallelize(Array(("a",100),("a",200),("a",300),("b",1),("b",10),("c",1000)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at :27# Key为"a"的有3个,key为"b"的有两个,key为以的有"1"个,与value无关scala> rdd1.countByKey()res11: scala.collection.Map[String,Long] = Map(b -> 2, a -> 3, c -> 1)
- countByValue(): counts how many times each distinct element occurs; on a (K, V) RDD the whole (k, v) pair is treated as one element (see the additional sketch after the example below).
scala> val rdd1 = sc.parallelize(Array(("a",100),("a",200),("a",200),("b",10),("b",10),("c",1000)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at :27scala> rdd1.countByValue()res12: scala.collection.Map[(String, Int),Long] = Map((c,1000) -> 1, (a,200) -> 2, (b,10) -> 2, (a,100) -> 1)
- foreach(func): applies the function func to every element of the RDD for its side effects; nothing is returned. Note that func runs on the executors, so in cluster mode any println output appears in the executor logs rather than on the driver (an accumulator-based sketch follows the example below).
scala> val rdd1 = sc.makeRDD(1 to 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:27

scala> rdd1.foreach(println)
1
2
3
4
5
6
7
8
9
10
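Because func runs on the executors, mutating a local driver-side variable inside foreach does not work as expected; the usual way to aggregate a side effect is an accumulator. A minimal sketch, assuming the same rdd1 and Spark 2.x or later:

// Accumulator updates made on the executors are merged back on the driver
val acc = sc.longAccumulator("sum")
rdd1.foreach(x => acc.add(x))
acc.value   // 55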
- foreachPartition(func): like foreach, but func receives an iterator over an entire partition at a time. This matters when the side effect needs an external resource such as a database connection: instead of opening and closing a connection for every element, one connection can serve a whole partition, which is far more efficient and reduces GC pressure (a JDBC-style sketch follows the example below).
scala> val rdd1 = sc.makeRDD(1 to 10, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at <console>:27

// Only one connection is created per partition
scala> rdd1.foreachPartition(iter => {println("open database connection");iter.foreach(x=>{println("build SQL and insert into database");x});println("close database connection")})
open database connection
build SQL and insert into database
build SQL and insert into database
build SQL and insert into database
close database connection
open database connection
build SQL and insert into database
build SQL and insert into database
build SQL and insert into database
build SQL and insert into database
close database connection
open database connection
build SQL and insert into database
build SQL and insert into database
build SQL and insert into database
close database connection
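A minimal sketch of the real pattern the simulation above stands for, assuming a hypothetical JDBC URL, credentials, and a table t(x INT); only standard java.sql calls are used:

import java.sql.DriverManager

rdd1.foreachPartition { iter =>
  // One connection per partition, opened on the executor (URL and credentials are placeholders)
  val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO t(x) VALUES (?)")
  try {
    iter.foreach { x =>
      stmt.setInt(1, x)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}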