Spark RDD算子
转换算子(Transformation Operators)
类别 | 算子名称 | 简要介绍 |
---|---|---|
映射类算子 | map | 对RDD中的每个元素进行操作,返回一个新的RDD |
flatMap | 类似于map,但每个输入元素可映射到0或多个输出元素 | |
mapPartitions | 对RDD的每个分区中的元素进行操作,返回一个新的RDD | |
mapPartitionsWithIndex | 类似于mapPartitions,但提供了分区索引 | |
mapValues | 对键值对RDD中的每个值进行操作,返回一个新的键值对RDD | |
flatMapValues | 对键值对RDD中的每个值进行操作,每个值可映射到0或多个输出元素 | |
过滤类算子 | filter | 过滤出满足条件的元素,返回一个新的RDD |
distinct | 去除重复元素,返回一个新的RDD | |
分组类算子 | groupBy | 根据指定的函数对RDD中的元素进行分组,返回一个新的RDD |
groupByKey | 对键值对RDD中的值进行分组,返回一个新的键值对RDD | |
reduceByKey | 对具有相同键的值进行合并,返回一个新的键值对RDD | |
aggregateByKey | 使用指定的聚合函数和中间结果进行聚合,返回一个新的键值对RDD | |
combineByKey | 类似于aggregateByKey,但允许更灵活的聚合过程 | |
cogroup | 对两个或多个键值对RDD进行分组,返回一个新的RDD | |
排序类算子 | sortByKey | 根据键对键值对RDD进行排序,返回一个新的键值对RDD |
sortBy | 根据指定的函数对RDD进行排序,返回一个新的RDD | |
repartitionAndSortWithinPartitions | 对RDD进行重新分区并在每个分区内排序,返回一个新的RDD | |
连接类算子 | union | 合并两个RDD,返回一个新的RDD |
intersection | 计算两个RDD的交集,返回一个新的RDD | |
subtract | 从一个RDD中移除另一个RDD中的元素,返回一个新的RDD | |
cartesian | 计算两个RDD的笛卡尔积,返回一个新的RDD | |
join | 根据键连接两个键值对RDD,返回一个新的键值对RDD | |
leftOuterJoin | 左外连接两个键值对RDD,返回一个新的键值对RDD | |
rightOuterJoin | 右外连接两个键值对RDD,返回一个新的键值对RDD | |
fullOuterJoin | 全外连接两个键值对RDD,返回一个新的键值对RDD | |
分区类算子 | sample | 随机采样RDD中的元素,返回一个新的RDD |
randomSplit | 随机划分RDD中的元素,返回一个RDD数组 | |
pipe | 将RDD的每个分区作为外部进程的输入,返回一个新的RDD | |
coalesce | 减少RDD的分区数,返回一个新的RDD | |
repartition | 增加或减少RDD的分区数,并进行数据洗牌,返回一个新的RDD | |
partitionBy | 根据指定的分区器对键值对RDD进行分区,返回一个新的键值对RDD |
行动算子(Action Operators)
类别 | 算子名称 | 简要介绍 |
---|---|---|
聚合类算子 | reduce | 对RDD中的元素进行归约操作,返回一个结果 |
fold | 类似于reduce,但带有初始值 | |
aggregate | 使用指定的聚合函数和中间结果对RDD进行聚合,返回一个结果 | |
计数类算子 | count | 计算RDD中的元素数量,返回一个长整型结果 |
countByKey | 计算每个键的出现次数,返回一个键值对Map | |
countByValue | 计算每个值的出现次数,返回一个值对Map | |
收集类算子 | collect | 将RDD的所有元素收集到驱动程序,返回一个数组 |
collectAsMap | 将键值对RDD的所有元素收集到驱动程序,返回一个Map | |
take | 获取RDD的前n个元素,返回一个列表 | |
takeSample | 随机获取RDD的样本,返回一个列表 | |
takeOrdered | 获取排序后的RDD的前n个元素,返回一个列表 | |
获取类算子 | first | 获取RDD的第一个元素,返回一个元素 |
top | 获取排序后的RDD的前n个元素,返回一个列表 | |
遍历类算子 | foreach | 对RDD中的每个元素执行操作,不返回结果 |
foreachPartition | 对RDD的每个分区中的元素执行操作,不返回结果 | |
保存类算子 | saveAsTextFile | 将RDD保存到文本文件中 |
saveAsSequenceFile | 将RDD保存为序列文件 | |
saveAsObjectFile | 将RDD保存为对象文件 | |
saveAsHadoopFile | 将RDD保存为Hadoop文件 | |
saveAsNewAPIHadoopFile | 将RDD保存为新API的Hadoop文件 | |
saveAsHadoopDataset | 将RDD保存为Hadoop数据集 |
一、转换算子(Transformation Operators)
1、逐条处理
// val rdd2: RDD[U] = rdd.map(f: T=>U)
val data = Array("word1", "word2", "word3")
// 将单词 word 映射为元组 (word, 1)
val rdd: RDD[(String, Int)] = sc.parallelize(data).map(word => (word, 1))
2、扁平化处理
val sentences = List("Hello world", "Apache Spark is awesome", "FlatMap example")
// 将一个句子拆分为多个单词,1=>多,并将所有的结果元素合并为一个新的 RDD
val rdd: RDD[String] = sc.parallelize(sentences).flatMap(word => word.split(" "))
3、分区内逐行处理
【✔ 】:以分区为单位(分区不变)逐行处理数据
map VS mapPartitions
1、数量:前者一进一出IO数量一致,后者多进多出IO数量不一定一致
2、性能:前者多分区逐条处理,后者各分区并行逐条处理更佳,常时间占用内存易导致OOM,内存小不推荐
3、类型:两者入口和出口类型都必须一致,后者进出都必须是迭代器// 在同一个 mapPartitions 操作中进行过滤和转换操作,可以减少对数据的多次遍历,从而提高性能。 mapParitions( // ≈ 子查询 it.filter(...) // 谓词下缀 )// 分别使用 mapPartitions 和 filter 进行转换和过滤操作,增加计算开销。 mapParitions(...) fielter(...) // where
// 一个包含整数的 RDD,过滤出偶数并将其乘以 2。
val data = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val rdd: RDD[Int] = sc.parallelize(data, 3) // 创建3个分区
// 使用 mapPartitions 进行过滤和转换
val resultRDD: RDD[Int] = rdd.mapPartitions { it =>val filtered = it.filter(_ % 2 == 0) // 过滤偶数filtered.map(_ * 2) // 将每个偶数乘以 2
}// 【分区内逐行处理】:以分区为单位(分区不变)逐行处理数据,并追加分区编号
val rdd2: RDD[U] = rdd.mapPartitionsWithIndex(f:(Int,Iterator[T])=>Iterator[U][,preservePar:Boolean])
// mapPartitionsWithIndex 功能同mapPartitions,区别在于追加了分区编号,一般用于去掉表头和分析调试
4、转内存数组
分区的数据转为同类型的内存数组,分区不变 rdd:RDD[T]
// val rdd2: RDD[Array[T]] = rdd.glom();
// 创建一个包含 1 到 100 的整数 RDD,分为 5 个分区
val rdd = sc.parallelize(1 to 100, 5)
// 使用 glom() 将每个分区的元素合并成一个数组
val glommedRDD = rdd.glom()
// 输出每个分区合并后的数组
glommedRDD.collect().foreach { partition =>println("Partition contains: " + partition.mkString(", "))
}
5、数据过滤
过滤规则 f:T=>Boolean,保留满足条件数据,分区不变,不推荐用
【数据可能倾斜】某些分区的数据量远远超过了其他分区,造成数据分布不均匀
// val rdd2: RDD[T] = rdd.filter(f:T=>Boolean)
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 filter 过滤出偶数
val evenRDD = rdd.filter(num => num % 2 == 0)
6、数据分组
同键同组同区,同区可多组;打乱shuffle,按f:T=>K规则,分区不变,【数据可能倾斜】
// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K)
// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, partioner:Partitioner)
// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, numPartitions:Int)// 定义一个自定义分区器,根据奇偶数进行分区
class OddEvenPartitioner(numPartitions: Int) extends Partitioner {override def numPartitions: Int = numPartitionsoverride def getPartition(key: Any): Int = {val k = key.toStringif (k == "even") 0 else 1}
}
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 groupBy 方法,根据奇偶数进行分组
val groupedRDD = rdd.groupBy(num => if (num % 2 == 0) "even" else "odd", new OddEvenPartitioner(2))
7、数据抽样
函数名:
sample
参数:
withReplacement:Boolean
是否有放回抽样
fraction:Double
抽样率
seed:Long
随机种子,默认为当前时间戳(一般缺省)
无放回抽样:sample(false, 0.4)
=> 抽样40%的数据,40条左右
有放回抽样:sample(true, 0.4)
=> 每条数据被抽取的概率为40% (可能有重复的元素)
// val rdd2: RDD[T] = rdd.sample(withReplacement:Boolean, fraction:Double, seed:Long)
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 sample 方法,从 RDD 中有放回随机抽样抽中每个数概率为0.9
val sampledRDD = rdd.sample(withReplacement = true, fraction = 0.9)
8、数据去重
采用该方法去重,数据规模比较大的情况下,数据压力比较大,因为数据需要在不同的分区间比较
一般采用分组的方式,将去重字段作为分组字段,在不同的分区内并行去重
val rdd2: RDD[T] = rdd.distinct()
// numPartitions: Int 设定去重后的分区数
// 隐式参数 order: Ordering[T] 用于指定元素类型 T 的排序方式,以便在对元素进行去重时进行比较。
val rdd2: RDD[T] = rdd.distinct(numPartitions: Int)(implicit order: Ording[T] = null)
9、数据排序
处理数据f:T=>K,升降序asc: Boolean,分区数numPartitions:Int
默认排序前后分区一致,【有shuffle】,除非重新设定 numPartitions
全局排序,多分区间交换数据,效率较低。优化见 PairRDD
若:K为基本类型,则无需提供第二参数列表中的隐式参数 ord: Ordering[K]
若:K为自定义类型,则必须提供第二参数
case class Student(stu_id: Int, stu_name: String, stu_class: String)
val stuData = Seq(Student(1, "李明", "ccc"),Student(2, "小刚", "bbb"),Student(3, "小红", "aaa"),Student(4, "张三", "aaa")
)
sortBy
// val rdd: RDD[T] = rdd.sortBy(f:T=>K,asc:Boolean,numPartitions:Int)(implicit ord: Ordering[K], ctag: ClassTag[K])val rdd = sc.parallelize(stuData)
// 使用 sortBy 方法对 RDD 按照 stu_class 进行排序,升序,指定分区数为 2
val sortedRdd: RDD[Student] = rdd.sortBy(stu => stu.stu_class, ascending = true, 2)// 按照班级升序排序,如果班级相同则按照学生 ID 降序
val sortedRdd: RDD[Student] = rdd.sortBy(s => s, ascending = true, numPartitions = 2)(Ordering.by(s => (s.stu_class, -s.stu_id)),ClassTag(classOf[Student]) // 显式地传递了 ClassTag 参数 ClassTag(classOf[Student]),以确保在运行时能获取 Student 类型信息。
)
sortByKey
// 按照班级升序排序,如果班级相同则按照学生 ID 降序
val rdd = sc.parallelize(stuData)
// 将 RDD 转换为键值对 RDD,其中键是 (stu_class, -stu_id)
val pairRDD: RDD[((String, Int), Student)] = rdd.map(student => ((student.stu_class, -student.stu_id), student))
// 使用 sortByKey 方法对键进行排序
val sortedPairRDD: RDD[((String, Int), Student)] = pairRDD.sortByKey()
// 取出排序后的 Student 对象
val sortedRDD: RDD[Student] = sortedPairRDD.map(_._2)
10、交并补差
多个类型 RDD[T]:纵向
交并差操作:数据类型一致,根据元素 equals 认定是否相同
【自定义类型】:必须重写 equals 方法,因为默认等值判断 == 判断地址
拉链操作:要求分区数和分区内的数据量一致
val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 3, 4, 6, 4, 3, 33, 31))
val rdd2: RDD[Int] = sc.makeRDD(Seq(2, 2, 4, 5, 1, 22, 3, 1))
// 【求交集】:重载可重设分区数numPartitions: Int,或定义分区规则par: Partitioner[T]
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T])
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T], numPartitions:Int)
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T], par:Partitioner[T])
// 【求并集】:不去重
val rdd3: RDD[T] = rdd1.union(rdd2:RDD[T])
// 【求差集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T])
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T], numPartitions:Int)
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T], par:Partitioner[T])
11、拉链操作
val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 3, 4, 6, 4, 3, 33, 31))
val rdd2: RDD[Int] = sc.makeRDD(Seq(2, 2, 4, 5, 1, 22, 3, 1))val rdd2: RDD[(T,U)] = rdd1.zip(rdd2:RDD[U])
val rdd2: RDD[(T,Long)] = rdd1.zipWithIndex()
val rdd2: RDD[(T,Long)] = rdd1.zipWithUniqueId()
// 有三个重载:1+1,1+2,1+3
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A])(f:(Iterator[T],Iterator[A])=>Iterator[V])
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A],preserveParitioning:Boolean)(f:(Iterator[T],Iterator[A])=>Iterator[V])
12、键值映射类算子
map
case class Student(stu_id: Int, stu_name: String, stu_class: String)
val stuData = Seq(Student(1, "李明", "ccc"),Student(2, "小刚", "bbb"),Student(3, "小红", "aaa"),Student(4, "张三", "aaa")
)
val rdd = sc.parallelize(stuData)// 使用 map 对 RDD 中的每个 Student 对象的 stu_id 增加 1
val resultRdd1 = rdd.map(stu => stu.copy(stu_id = stu.stu_id + 1))// Student 对象转换为一个包含 (stu_id, stu_name) 元组的 RDD
val resultRdd2 = rdd.map(stu => (stu.stu_id, stu.stu_name))
mapPartitions
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2)
val resultRDD = rdd.mapPartitions(iter => {// 对每个分区的元素求和val sum = iter.sum// 返回求和结果作为新的分区Iterator(sum)
})
resultRDD.collect().foreach(println) // 输出为6 15
13、键值分组聚合类
reduceByKey
// reduceByKey + foldByKey + aggregateByKey 都调用 combineByKeyClassTag
// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同
// group by + combiner + reduce
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V, numPartitions:Int)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(partitioner:Partitioner, f:(V,V)=>V)val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))
val totalScoresRDD = scoresRDD.reduceByKey((score1, score2) => score1 + score2)
// 简写 val totalScoresRDD = scoresRDD.reduceByKey(_+_)
totalScoresRDD.collect().foreach(println) // 输出(Alice,240) (Bob,185)
foldByKey
// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同,带初值
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,numPartitions:Int)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,partitioner:Partitioner)(inParOp:(V,V)=>V)
// 数据同上 reduceByKey
val initialScores = 0
val totalScoresRDD = scoresRDD.foldByKey(initialScores)((score1, score2) => score1 + score2)
aggregateByKey
// 与 reduceByKey 和 foldByKey 不同,aggregateByKey 允许指定一个初始值和两个聚合函数,可以对每个键的值进行更复杂的聚合操作。
// 【✔ 按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,numPartitions:Int)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,partitioner:Partitioner)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val rddPair2: RDD[(String, Float)] = rddPair.aggregateByKey(0.0f)(_+_,_+_)val resultRDD = pairRdd.aggregateByKey(zeroValue)((acc, value) => {// 对每个分区中的值进行聚合操作// 初始值为 zeroValue// 返回值将作为每个分区聚合的结果// 这个函数在每个分区内执行// acc 是聚合的累加器,value 是当前处理的值// 返回一个新的累加器},(acc1, acc2) => {// 对多个分区的聚合结果进行合并操作// acc1 和 acc2 是两个分区的聚合结果// 返回合并后的结果}
)
val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))
val zeroValue = (0, 0) // 初始值为元组 (sum, count)
val resultRDD = scoresRDD.aggregateByKey(zeroValue)((acc, score) => {// 分区内聚合函数,计算每个学生的总成绩和数量(acc._1 + score, acc._2 + 1)},(acc1, acc2) => {// 分区间聚合函数,合并每个学生的总成绩和数量(acc1._1 + acc2._1, acc1._2 + acc2._2)}
)
// 每个学生的总成绩和数量
resultRDD.foreach(println) // (Alice,(240,3)) (Bob,(185,2))
combineByKey
// 【按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,numPartitions:Int)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,partitioner:Partitioner,mapSideCombine:Boolean,serializer:Serializer)val resultRDD = pairRDD.combineByKey((value) => {// 创建组合器函数,用于将每个值转换为另一种形式(通常是一个累加器或缓冲区)// 返回值将作为每个键的第一个值的处理结果},(acc, value) => {// 合并值函数,用于将每个值与累加器或缓冲区进行合并// 返回值将作为每个键的聚合结果},(acc1, acc2) => {// 合并累加器函数,用于合并两个累加器或缓冲区// 返回值将作为多个分区的聚合结果}
)
val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val resultRDD = scoresRDD.combineByKey((score) => (score, 1), // 创建组合器,初始值为 (成绩, 1),表示总成绩和数量(acc, score) => (acc._1 + score, acc._2 + 1), // 合并值,累加总成绩和数量(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 合并累加器,合并总成绩和数量
)// 计算平均成绩
val avgScoresRDD = resultRDD.mapValues(sumCount => sumCount._1.toDouble / sumCount._2)avgScoresRDD.collect().foreach(println)
groupByKey
// groupByKey 会将具有相同键的元素分组在一起,但是这个操作可能会导致数据倾斜,因为它会将所有具有相同键的元素放在同一个分区中。
// 【✔ 按键分组】
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey()
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(numPartitions:Int)
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(partitioner:Partitioner)val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val groupedRDD = scoresRDD.groupByKey()groupedRDD.collect().foreach { case (name, scores) =>println(s"$name: ${scores.mkString(", ")}")
}
14、关联聚合
groupWith
// groupWith 对两个 RDD 中具有相同键的元素进行分组,返回一个新的 RDD,其中每个键对应的值是一个包含两个 RDD 中相同键的元素组成的元组。
// 【多数据集分组】:1VN 同键同组,不同RDD值进入TupleN的不同Iterable
-------------------------------------------------------------------------------
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.groupWith(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2],Iterable[V3])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)],otherC: RDD[(K,V3)])val scoresRDD1 = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val scoresRDD2 = sc.parallelize(Seq(("Alice", "Good"),("Bob", "Excellent"),("Alice", "Good"),("Bob", "Excellent"),("Alice", "Good")
))val groupedRDD = scoresRDD1.groupWith(scoresRDD2)groupedRDD.collect().foreach { case (name, (scores1, scores2)) =>println(s"$name: (${scores1.mkString(", ")}) | (${scores2.mkString(", ")})")
}
cogroup
// 重载 1+1 1+2 1+3,追加再分区操作
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],numPartitions:Int)
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],partitioner:Partitioner)
// 示例同groupWith,两者区别:cogroup可对分组后的元素进行更复杂的操作,如在分组后对每个键的值进行聚合计算。
15、关联操作
【关联操作】:1V1
横向,根据键做关联
重载:numPartitions:Int 或 partitioner:Partitioner
val pairRdd: RDD[(K, (V, V1))] = pairRdd1.join(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (V, Option[V1]))] = pairRdd1.leftOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), V1)] = pairRdd1.rightOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), Option[V1])] = pairRdd1.fullOuterJoin( pairRdd3:RDD[(K,V1)])// 创建两个 RDD
val rdd1 = sc.parallelize(Seq((1, "Alice"),(2, "Bob"),(3, "Charlie")
))
val rdd2 = sc.parallelize(Seq((1, 25),(3, 30),(4, 35)
))
// join:内连接
val innerJoinRDD = rdd1.join(rdd2)
// leftOuterJoin:左外连接
val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
// rightOuterJoin:右外连接
val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
// fullOuterJoin:全外连接
val fullOuterJoinRDD = rdd1.fullOuterJoin(rdd2)
二、行动算子(Action Operators)
【返回】所有元素分别在分区间和分区内执行【聚集】操作的结果
reduce & fold 分区内和分区间执行相同操作,且类型与元素类型一致
aggregate 分区内和分区间执行不同操作,且类型与元素类型不一致
val rst:T = rdd.reduce(f:(T,T)=>T)
val rst:T = rdd.fold(init:T)(f:(T,T)=>T)
val rst:U = rdd.aggregate(init:U)(f:(U,T)=>U,f:(U,T)=>U)
val array:Array[T] = rdd.collect() // 返回包含数据集中所有元素的数组val rst:Long = rdd.count() // 返回数据集中元素数量val rst:Map[K,Long] = pairRdd.countByKey() // 返回一个包含键值对的 Map,其中键是 RDD 中的键,值是对应键出现的次数。val rst:T = rdd.max() // 返回数据集中最大值val rst:T = rdd.min() // 返回数据集中最小值val rst:T = rdd.first() // 返回数据集中的第一个元素val array:Array[T] = rdd.take(num:Int) // 返回数据集中的前 num 个元素val array:Array[T] = rdd.takeOrdered(num:Int)(implicit ord:Ordering[T]) // 返回排序后数据集中的前 num 个元素rdd.foreach(f:T=>Unit) // 遍历迭代
rdd.foreachPartition(f:Iterable[T]=>Unit) // 写数据库操作首选
持久化至文本文件,重载追加压缩功能
import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec} import io.airlift.compress.lzo.LzopCodec rdd.saveAsTextFile("out_path",classOf[BZip2Codec])
rdd.saveAsTextFile(path:String)
rdd.saveAsTextFile(path:String,codec: Class[_ <: CompressionCodec])
rdd.saveAsObjectFile(path:String)
三、Spark RDD 并行度控制
默认的并行度:200
分区数的体现方式:
-
分区数
numPartitions
|numSlices: Int
-
示例:
val rdd = sc.parallelize(data, numPartitions = 5)
-
分区逻辑
partitionIndex = fieldName.hashCode() % numPartitions
-
扩展随机字段:
0 ~ numPartitions
-
-
分区器
partitioner: Partitioner
(针对键值对 RDD)- 默认的分区器:
HashPartitioner
- 默认的分区器:
再分区算子
用于对数据进行重新分配。
- coalesce:
coalesce(numPartitions: Int, shuffle: Boolean)
- 用于减少分区数,通常在不需要洗牌时使用。
- 示例:
val coalescedRDD = rdd.coalesce(2, shuffle = false)
- repartition:
repartition(numPartitions: Int)
- 等同于
coalesce(numPartitions, true)
,用于增加或减少分区数,并进行数据洗牌。 - 示例:
val repartitionedRDD = rdd.repartition(4)
- 等同于