Spark RDD算子

Spark RDD算子

转换算子(Transformation Operators)

类别算子名称简要介绍
映射类算子map对RDD中的每个元素进行操作,返回一个新的RDD
flatMap类似于map,但每个输入元素可映射到0或多个输出元素
mapPartitions对RDD的每个分区中的元素进行操作,返回一个新的RDD
mapPartitionsWithIndex类似于mapPartitions,但提供了分区索引
mapValues对键值对RDD中的每个值进行操作,返回一个新的键值对RDD
flatMapValues对键值对RDD中的每个值进行操作,每个值可映射到0或多个输出元素
过滤类算子filter过滤出满足条件的元素,返回一个新的RDD
distinct去除重复元素,返回一个新的RDD
分组类算子groupBy根据指定的函数对RDD中的元素进行分组,返回一个新的RDD
groupByKey对键值对RDD中的值进行分组,返回一个新的键值对RDD
reduceByKey对具有相同键的值进行合并,返回一个新的键值对RDD
aggregateByKey使用指定的聚合函数和中间结果进行聚合,返回一个新的键值对RDD
combineByKey类似于aggregateByKey,但允许更灵活的聚合过程
cogroup对两个或多个键值对RDD进行分组,返回一个新的RDD
排序类算子sortByKey根据键对键值对RDD进行排序,返回一个新的键值对RDD
sortBy根据指定的函数对RDD进行排序,返回一个新的RDD
repartitionAndSortWithinPartitions对RDD进行重新分区并在每个分区内排序,返回一个新的RDD
连接类算子union合并两个RDD,返回一个新的RDD
intersection计算两个RDD的交集,返回一个新的RDD
subtract从一个RDD中移除另一个RDD中的元素,返回一个新的RDD
cartesian计算两个RDD的笛卡尔积,返回一个新的RDD
join根据键连接两个键值对RDD,返回一个新的键值对RDD
leftOuterJoin左外连接两个键值对RDD,返回一个新的键值对RDD
rightOuterJoin右外连接两个键值对RDD,返回一个新的键值对RDD
fullOuterJoin全外连接两个键值对RDD,返回一个新的键值对RDD
分区类算子sample随机采样RDD中的元素,返回一个新的RDD
randomSplit随机划分RDD中的元素,返回一个RDD数组
pipe将RDD的每个分区作为外部进程的输入,返回一个新的RDD
coalesce减少RDD的分区数,返回一个新的RDD
repartition增加或减少RDD的分区数,并进行数据洗牌,返回一个新的RDD
partitionBy根据指定的分区器对键值对RDD进行分区,返回一个新的键值对RDD

行动算子(Action Operators)

类别算子名称简要介绍
聚合类算子reduce对RDD中的元素进行归约操作,返回一个结果
fold类似于reduce,但带有初始值
aggregate使用指定的聚合函数和中间结果对RDD进行聚合,返回一个结果
计数类算子count计算RDD中的元素数量,返回一个长整型结果
countByKey计算每个键的出现次数,返回一个键值对Map
countByValue计算每个值的出现次数,返回一个值对Map
收集类算子collect将RDD的所有元素收集到驱动程序,返回一个数组
collectAsMap将键值对RDD的所有元素收集到驱动程序,返回一个Map
take获取RDD的前n个元素,返回一个列表
takeSample随机获取RDD的样本,返回一个列表
takeOrdered获取排序后的RDD的前n个元素,返回一个列表
获取类算子first获取RDD的第一个元素,返回一个元素
top获取排序后的RDD的前n个元素,返回一个列表
遍历类算子foreach对RDD中的每个元素执行操作,不返回结果
foreachPartition对RDD的每个分区中的元素执行操作,不返回结果
保存类算子saveAsTextFile将RDD保存到文本文件中
saveAsSequenceFile将RDD保存为序列文件
saveAsObjectFile将RDD保存为对象文件
saveAsHadoopFile将RDD保存为Hadoop文件
saveAsNewAPIHadoopFile将RDD保存为新API的Hadoop文件
saveAsHadoopDataset将RDD保存为Hadoop数据集

一、转换算子(Transformation Operators)

1、逐条处理

// val rdd2: RDD[U] = rdd.map(f: T=>U)
val data = Array("word1", "word2", "word3")
// 将单词 word 映射为元组 (word, 1)
val rdd: RDD[(String, Int)] = sc.parallelize(data).map(word => (word, 1))

2、扁平化处理

val sentences = List("Hello world", "Apache Spark is awesome", "FlatMap example")
// 将一个句子拆分为多个单词,1=>多,并将所有的结果元素合并为一个新的 RDD
val rdd: RDD[String] = sc.parallelize(sentences).flatMap(word => word.split(" "))

3、分区内逐行处理

【✔ 】:以分区为单位(分区不变)逐行处理数据
map VS mapPartitions
1、数量:前者一进一出IO数量一致,后者多进多出IO数量不一定一致
2、性能:前者多分区逐条处理,后者各分区并行逐条处理更佳,常时间占用内存易导致OOM,内存小不推荐
3、类型:两者入口和出口类型都必须一致,后者进出都必须是迭代器

// 在同一个 mapPartitions 操作中进行过滤和转换操作,可以减少对数据的多次遍历,从而提高性能。
mapParitions( 
// ≈ 子查询
it.filter(...) // 谓词下缀
)// 分别使用 mapPartitions 和 filter 进行转换和过滤操作,增加计算开销。
mapParitions(...)
fielter(...)     // where
// 一个包含整数的 RDD,过滤出偶数并将其乘以 2。
val data = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val rdd: RDD[Int] = sc.parallelize(data, 3) // 创建3个分区
// 使用 mapPartitions 进行过滤和转换
val resultRDD: RDD[Int] = rdd.mapPartitions { it =>val filtered = it.filter(_ % 2 == 0) // 过滤偶数filtered.map(_ * 2) // 将每个偶数乘以 2
}// 【分区内逐行处理】:以分区为单位(分区不变)逐行处理数据,并追加分区编号
val rdd2: RDD[U] = rdd.mapPartitionsWithIndex(f:(Int,Iterator[T])=>Iterator[U][,preservePar:Boolean])
// mapPartitionsWithIndex 功能同mapPartitions,区别在于追加了分区编号,一般用于去掉表头和分析调试

4、转内存数组

分区的数据转为同类型的内存数组,分区不变 rdd:RDD[T]

// val rdd2: RDD[Array[T]] = rdd.glom();
// 创建一个包含 1 到 100 的整数 RDD,分为 5 个分区
val rdd = sc.parallelize(1 to 100, 5)
// 使用 glom() 将每个分区的元素合并成一个数组
val glommedRDD = rdd.glom()
// 输出每个分区合并后的数组
glommedRDD.collect().foreach { partition =>println("Partition contains: " + partition.mkString(", "))
}

5、数据过滤

过滤规则 f:T=>Boolean,保留满足条件数据,分区不变,不推荐用
【数据可能倾斜】某些分区的数据量远远超过了其他分区,造成数据分布不均匀

// val rdd2: RDD[T] = rdd.filter(f:T=>Boolean)
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 filter 过滤出偶数
val evenRDD = rdd.filter(num => num % 2 == 0)

6、数据分组

同键同组同区,同区可多组;打乱shuffle,按f:T=>K规则,分区不变,【数据可能倾斜】

// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K)
// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, partioner:Partitioner)
// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, numPartitions:Int)// 定义一个自定义分区器,根据奇偶数进行分区
class OddEvenPartitioner(numPartitions: Int) extends Partitioner {override def numPartitions: Int = numPartitionsoverride def getPartition(key: Any): Int = {val k = key.toStringif (k == "even") 0 else 1}
}	
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 groupBy 方法,根据奇偶数进行分组
val groupedRDD = rdd.groupBy(num => if (num % 2 == 0) "even" else "odd", new OddEvenPartitioner(2))

7、数据抽样

函数名:sample
参数:
withReplacement:Boolean 是否有放回抽样
fraction:Double 抽样率
seed:Long 随机种子,默认为当前时间戳(一般缺省)
无放回抽样: sample(false, 0.4) => 抽样40%的数据,40条左右
有放回抽样: sample(true, 0.4) => 每条数据被抽取的概率为40% (可能有重复的元素)

// val rdd2: RDD[T] = rdd.sample(withReplacement:Boolean, fraction:Double, seed:Long)
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 sample 方法,从 RDD 中有放回随机抽样抽中每个数概率为0.9 
val sampledRDD = rdd.sample(withReplacement = true, fraction = 0.9)

8、数据去重

采用该方法去重,数据规模比较大的情况下,数据压力比较大,因为数据需要在不同的分区间比较
一般采用分组的方式,将去重字段作为分组字段,在不同的分区内并行去重

val rdd2: RDD[T] = rdd.distinct()
// numPartitions: Int 设定去重后的分区数
// 隐式参数 order: Ordering[T] 用于指定元素类型 T 的排序方式,以便在对元素进行去重时进行比较。
val rdd2: RDD[T] = rdd.distinct(numPartitions: Int)(implicit order: Ording[T] = null)

9、数据排序

处理数据f:T=>K,升降序asc: Boolean,分区数numPartitions:Int
默认排序前后分区一致,【有shuffle】,除非重新设定 numPartitions
全局排序,多分区间交换数据,效率较低。优化见 PairRDD
若:K为基本类型,则无需提供第二参数列表中的隐式参数 ord: Ordering[K]
若:K为自定义类型,则必须提供第二参数

case class Student(stu_id: Int, stu_name: String, stu_class: String)
val stuData = Seq(Student(1, "李明", "ccc"),Student(2, "小刚", "bbb"),Student(3, "小红", "aaa"),Student(4, "张三", "aaa")
)

sortBy

// val rdd: RDD[T] = rdd.sortBy(f:T=>K,asc:Boolean,numPartitions:Int)(implicit ord: Ordering[K], ctag: ClassTag[K])val rdd = sc.parallelize(stuData)
// 使用 sortBy 方法对 RDD 按照 stu_class 进行排序,升序,指定分区数为 2
val sortedRdd: RDD[Student] = rdd.sortBy(stu => stu.stu_class, ascending = true, 2)// 按照班级升序排序,如果班级相同则按照学生 ID 降序
val sortedRdd: RDD[Student] = rdd.sortBy(s => s, ascending = true, numPartitions = 2)(Ordering.by(s => (s.stu_class, -s.stu_id)),ClassTag(classOf[Student])    // 显式地传递了 ClassTag 参数 ClassTag(classOf[Student]),以确保在运行时能获取 Student 类型信息。
)

sortByKey

//  按照班级升序排序,如果班级相同则按照学生 ID 降序
val rdd = sc.parallelize(stuData)
// 将 RDD 转换为键值对 RDD,其中键是 (stu_class, -stu_id)
val pairRDD: RDD[((String, Int), Student)] = rdd.map(student => ((student.stu_class, -student.stu_id), student))
// 使用 sortByKey 方法对键进行排序
val sortedPairRDD: RDD[((String, Int), Student)] = pairRDD.sortByKey()
// 取出排序后的 Student 对象
val sortedRDD: RDD[Student] = sortedPairRDD.map(_._2)

10、交并补差

多个类型 RDD[T]:纵向
交并差操作:数据类型一致,根据元素 equals 认定是否相同
【自定义类型】:必须重写 equals 方法,因为默认等值判断 == 判断地址
拉链操作:要求分区数和分区内的数据量一致

val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 3, 4, 6, 4, 3, 33, 31))
val rdd2: RDD[Int] = sc.makeRDD(Seq(2, 2, 4, 5, 1, 22, 3, 1))
// 【求交集】:重载可重设分区数numPartitions: Int,或定义分区规则par: Partitioner[T]
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T])
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T], numPartitions:Int)
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T], par:Partitioner[T])
// 【求并集】:不去重
val rdd3: RDD[T] = rdd1.union(rdd2:RDD[T])
// 【求差集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T])
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T], numPartitions:Int)
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T], par:Partitioner[T])

11、拉链操作

val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 3, 4, 6, 4, 3, 33, 31))
val rdd2: RDD[Int] = sc.makeRDD(Seq(2, 2, 4, 5, 1, 22, 3, 1))val rdd2: RDD[(T,U)] = rdd1.zip(rdd2:RDD[U])
val rdd2: RDD[(T,Long)] = rdd1.zipWithIndex()
val rdd2: RDD[(T,Long)] = rdd1.zipWithUniqueId()
// 有三个重载:1+1,1+2,1+3
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A])(f:(Iterator[T],Iterator[A])=>Iterator[V])
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A],preserveParitioning:Boolean)(f:(Iterator[T],Iterator[A])=>Iterator[V])

12、键值映射类算子

map

case class Student(stu_id: Int, stu_name: String, stu_class: String)
val stuData = Seq(Student(1, "李明", "ccc"),Student(2, "小刚", "bbb"),Student(3, "小红", "aaa"),Student(4, "张三", "aaa")
)
val rdd = sc.parallelize(stuData)// 使用 map 对 RDD 中的每个 Student 对象的 stu_id 增加 1
val resultRdd1 = rdd.map(stu => stu.copy(stu_id = stu.stu_id + 1))// Student 对象转换为一个包含 (stu_id, stu_name) 元组的 RDD
val resultRdd2 = rdd.map(stu => (stu.stu_id, stu.stu_name))

mapPartitions

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2)
val resultRDD = rdd.mapPartitions(iter => {// 对每个分区的元素求和val sum = iter.sum// 返回求和结果作为新的分区Iterator(sum)
})
resultRDD.collect().foreach(println)	// 输出为6 15

13、键值分组聚合类

reduceByKey

// reduceByKey + foldByKey + aggregateByKey 都调用 combineByKeyClassTag
// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同
// group by + combiner + reduce
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V, numPartitions:Int)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(partitioner:Partitioner, f:(V,V)=>V)val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))
val totalScoresRDD = scoresRDD.reduceByKey((score1, score2) => score1 + score2)
// 简写 val totalScoresRDD = scoresRDD.reduceByKey(_+_)
totalScoresRDD.collect().foreach(println)	// 输出(Alice,240) (Bob,185)

foldByKey

// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同,带初值
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,numPartitions:Int)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,partitioner:Partitioner)(inParOp:(V,V)=>V)
// 数据同上 reduceByKey
val initialScores = 0
val totalScoresRDD = scoresRDD.foldByKey(initialScores)((score1, score2) => score1 + score2)

aggregateByKey

// 与 reduceByKey 和 foldByKey 不同,aggregateByKey 允许指定一个初始值和两个聚合函数,可以对每个键的值进行更复杂的聚合操作。
// 【✔ 按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,numPartitions:Int)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,partitioner:Partitioner)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val rddPair2: RDD[(String, Float)] = rddPair.aggregateByKey(0.0f)(_+_,_+_)val resultRDD = pairRdd.aggregateByKey(zeroValue)((acc, value) => {// 对每个分区中的值进行聚合操作// 初始值为 zeroValue// 返回值将作为每个分区聚合的结果// 这个函数在每个分区内执行// acc 是聚合的累加器,value 是当前处理的值// 返回一个新的累加器},(acc1, acc2) => {// 对多个分区的聚合结果进行合并操作// acc1 和 acc2 是两个分区的聚合结果// 返回合并后的结果}
)
val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))
val zeroValue = (0, 0) // 初始值为元组 (sum, count)
val resultRDD = scoresRDD.aggregateByKey(zeroValue)((acc, score) => {// 分区内聚合函数,计算每个学生的总成绩和数量(acc._1 + score, acc._2 + 1)},(acc1, acc2) => {// 分区间聚合函数,合并每个学生的总成绩和数量(acc1._1 + acc2._1, acc1._2 + acc2._2)}
)
// 每个学生的总成绩和数量
resultRDD.foreach(println)	// (Alice,(240,3)) (Bob,(185,2))

combineByKey

// 【按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,numPartitions:Int)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,partitioner:Partitioner,mapSideCombine:Boolean,serializer:Serializer)val resultRDD = pairRDD.combineByKey((value) => {// 创建组合器函数,用于将每个值转换为另一种形式(通常是一个累加器或缓冲区)// 返回值将作为每个键的第一个值的处理结果},(acc, value) => {// 合并值函数,用于将每个值与累加器或缓冲区进行合并// 返回值将作为每个键的聚合结果},(acc1, acc2) => {// 合并累加器函数,用于合并两个累加器或缓冲区// 返回值将作为多个分区的聚合结果}
)
val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val resultRDD = scoresRDD.combineByKey((score) => (score, 1), // 创建组合器,初始值为 (成绩, 1),表示总成绩和数量(acc, score) => (acc._1 + score, acc._2 + 1), // 合并值,累加总成绩和数量(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 合并累加器,合并总成绩和数量
)// 计算平均成绩
val avgScoresRDD = resultRDD.mapValues(sumCount => sumCount._1.toDouble / sumCount._2)avgScoresRDD.collect().foreach(println)

groupByKey

// groupByKey 会将具有相同键的元素分组在一起,但是这个操作可能会导致数据倾斜,因为它会将所有具有相同键的元素放在同一个分区中。
// 【✔ 按键分组】
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey()
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(numPartitions:Int)
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(partitioner:Partitioner)val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val groupedRDD = scoresRDD.groupByKey()groupedRDD.collect().foreach { case (name, scores) =>println(s"$name: ${scores.mkString(", ")}")
}

14、关联聚合

groupWith

// groupWith 对两个 RDD 中具有相同键的元素进行分组,返回一个新的 RDD,其中每个键对应的值是一个包含两个 RDD 中相同键的元素组成的元组。
// 【多数据集分组】:1VN 同键同组,不同RDD值进入TupleN的不同Iterable
-------------------------------------------------------------------------------
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.groupWith(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2],Iterable[V3])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)],otherC: RDD[(K,V3)])val scoresRDD1 = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val scoresRDD2 = sc.parallelize(Seq(("Alice", "Good"),("Bob", "Excellent"),("Alice", "Good"),("Bob", "Excellent"),("Alice", "Good")
))val groupedRDD = scoresRDD1.groupWith(scoresRDD2)groupedRDD.collect().foreach { case (name, (scores1, scores2)) =>println(s"$name: (${scores1.mkString(", ")}) | (${scores2.mkString(", ")})")
}

cogroup

// 重载 1+1 1+2 1+3,追加再分区操作
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],numPartitions:Int)
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],partitioner:Partitioner)
// 示例同groupWith,两者区别:cogroup可对分组后的元素进行更复杂的操作,如在分组后对每个键的值进行聚合计算。

15、关联操作

【关联操作】:1V1
横向,根据键做关联
重载:numPartitions:Int 或 partitioner:Partitioner

val pairRdd: RDD[(K, (V, V1))] = pairRdd1.join(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (V, Option[V1]))] = pairRdd1.leftOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), V1)] = pairRdd1.rightOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), Option[V1])] = pairRdd1.fullOuterJoin( pairRdd3:RDD[(K,V1)])// 创建两个 RDD
val rdd1 = sc.parallelize(Seq((1, "Alice"),(2, "Bob"),(3, "Charlie")
))
val rdd2 = sc.parallelize(Seq((1, 25),(3, 30),(4, 35)
))
// join:内连接
val innerJoinRDD = rdd1.join(rdd2)
// leftOuterJoin:左外连接
val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
// rightOuterJoin:右外连接
val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
// fullOuterJoin:全外连接
val fullOuterJoinRDD = rdd1.fullOuterJoin(rdd2)

二、行动算子(Action Operators)

【返回】所有元素分别在分区间和分区内执行【聚集】操作的结果
reduce & fold 分区内和分区间执行相同操作,且类型与元素类型一致
aggregate 分区内和分区间执行不同操作,且类型与元素类型不一致

val rst:T = rdd.reduce(f:(T,T)=>T)
val rst:T = rdd.fold(init:T)(f:(T,T)=>T)
val rst:U = rdd.aggregate(init:U)(f:(U,T)=>U,f:(U,T)=>U)
val array:Array[T] = rdd.collect()	// 返回包含数据集中所有元素的数组val rst:Long = rdd.count()	// 返回数据集中元素数量val rst:Map[K,Long] = pairRdd.countByKey()	// 返回一个包含键值对的 Map,其中键是 RDD 中的键,值是对应键出现的次数。val rst:T = rdd.max()	// 返回数据集中最大值val rst:T = rdd.min()	// 返回数据集中最小值val rst:T = rdd.first()	// 返回数据集中的第一个元素val array:Array[T] = rdd.take(num:Int)	// 返回数据集中的前 num 个元素val array:Array[T] = rdd.takeOrdered(num:Int)(implicit ord:Ordering[T])	// 返回排序后数据集中的前 num 个元素rdd.foreach(f:T=>Unit)	// 遍历迭代
rdd.foreachPartition(f:Iterable[T]=>Unit) // 写数据库操作首选

持久化至文本文件,重载追加压缩功能

import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}
import io.airlift.compress.lzo.LzopCodec
rdd.saveAsTextFile("out_path",classOf[BZip2Codec])
rdd.saveAsTextFile(path:String)
rdd.saveAsTextFile(path:String,codec: Class[_ <: CompressionCodec])
rdd.saveAsObjectFile(path:String)

三、Spark RDD 并行度控制

默认的并行度:200

分区数的体现方式

  • 分区数 numPartitions | numSlices: Int

    • 示例:val rdd = sc.parallelize(data, numPartitions = 5)

    • 分区逻辑

      partitionIndex = fieldName.hashCode() % numPartitions
      
    • 扩展随机字段0 ~ numPartitions

  • 分区器 partitioner: Partitioner (针对键值对 RDD)

    • 默认的分区器HashPartitioner
再分区算子

用于对数据进行重新分配。

  • coalescecoalesce(numPartitions: Int, shuffle: Boolean)
    • 用于减少分区数,通常在不需要洗牌时使用。
    • 示例:val coalescedRDD = rdd.coalesce(2, shuffle = false)
  • repartitionrepartition(numPartitions: Int)
    • 等同于 coalesce(numPartitions, true),用于增加或减少分区数,并进行数据洗牌。
    • 示例:val repartitionedRDD = rdd.repartition(4)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/25897.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在VMware虚拟机上安装win10 跳过 通过microsoft登录

在VMware虚拟机上安装win10 跳过 “通过microsoft登录” 配置虚拟机&#xff0c;将网卡断开&#xff0c; 具体操作&#xff1a; 虚拟机/设置/硬件/网络适配器/设备状态&#xff0c;取消已连接和启动时连接的两个对号&#xff0c; 再把虚拟机重启&#xff0c;然后就可以跳过这个…

通过技术优化财务规划报告,重塑企业体验

财务报告使企业的管理层能够及时、准确、清晰且一致地了解整个企业的财务业绩和风险机遇。它促进了企业内部利益相关者之间的沟通&#xff0c;从而支持基于数据驱动的洞察力提升和战略决策。但财务报告往往需要占用大量的时间来运行和准备&#xff0c;且可能使最终结论偏离核心…

什么是PV操作

PV操作是一种在操作系统中用于同步和互斥的机制,它基于信号量(Semaphore)的概念。在并发编程中,多个进程或线程可能会同时访问共享资源,PV操作可以用来确保这些访问是同步的,以防止竞态条件和数据不一致的问题。 PV操作包括两个原子操作: P操作(Proberen,测试):这…

使用 C# 学习面向对象编程:第 4 部分

C# 构造函数 第 1 部分仅介绍了类构造函数的基础知识。 在本课中&#xff0c;我们将详细讨论各种类型的构造函数。 属性类型 默认构造函数构造函数重载私有构造函数构造函数链静态构造函数析构函数 请注意构造函数的一些基本概念&#xff0c;并确保你的理解非常清楚&#x…

从入门到精通:进程间通信

引言 在现代操作系统中&#xff0c;进程是程序运行的基本单位。为了实现复杂的功能&#xff0c;多个进程常常需要进行通信。进程间通信&#xff08;Inter-Process Communication, IPC&#xff09;是指多个进程之间进行数据交换的一种机制。IPC的主要目的包括数据传输、资源共享…

WDF驱动开发-电源策略(三)

多组件设备的 KMDF 驱动程序只能将请求发送到处于活动状态的组件。 通常&#xff0c;驱动程序将 I/O 队列分配给组件或组件集。 首先考虑分配给单个组件的队列。 驱动程序在组件变为活动状态时启动队列&#xff0c;并在组件空闲时停止队列。 因此&#xff0c;当 KMDF 调用队列…

生成式人工智能重置:从初期热潮到战略扩展

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

PyTorch学习8:多分类问题

文章目录 前言一、说明二、示例1.步骤2.示例代码 总结 前言 介绍如何利用PyTorch中Softmax 分类器实现多分类问题。 一、说明 1.多分类问题的输出是一个分布&#xff0c;满足和为1. 2.Softmax 分类器 3.损失函数&#xff1a;交叉熵损失 torch.nn.CrossEntropyLoss() 二、…

运维开发详解:DevOps 理念下的高效运维实践

目录 前言 1、 运维开发的核心概念 2、 运维开发的技术栈 3、运维开发的实践案例 4、 运维开发的挑战与机遇 5、 运维开发的未来发展趋势 6、运维开发概念 7、运维开发的角色 8、成为一名优秀的运维开发工程师 9、总结 前言 随着互联网业务的快速发展&#xff0c;传…

虚拟化 之一 详解 jailhouse 架构及原理、软硬件要求、源码文件、基本组件

Jailhouse 是一个基于 Linux 实现的针对创建工业级应用程序的小型 Hypervisor&#xff0c;是由西门子公司的 Jan Kiszka 于 2013 年开发的&#xff0c;并得到了官方 Linux 内核的支持&#xff0c;在开源社区中获得了知名度和吸引力。 Jailhouse Jailhouse 是一种轻量级的虚拟化…

微软如何打造数字零售力航母系列科普13 - Prime Focus Technologies在NAB 2024上推出CLEAR®对话人工智能联合试点

Prime Focus Technologies在NAB 2024上推出CLEAR对话人工智能联合试点 彻底改变您与内容的互动方式&#xff0c;从内容的创建到分发 洛杉矶&#xff0c;2024年4月9日/PRNewswire/-媒体和娱乐&#xff08;M&E&#xff09;行业人工智能技术解决方案的先驱Prime Focus Techn…

架构师如何评估团队成员的成熟度

评估团队成员的成熟度是一个涉及观察、沟通和反馈的过程。以下是一些方法和步骤&#xff0c;可以帮助你评估团队成员的成熟度&#xff0c;无论是在技术能力、还是职业发展方面&#xff1a; 设定评估标准&#xff1a;首先&#xff0c;明确你希望评估的成熟度方面&#xff0c;比…

人工智能在医学领域的应用及技术实现

欢迎来到 Papicatch的博客 目录 &#x1f349;引言 &#x1f349; 医学影像分析 &#x1f348;技术实现 &#x1f34d;数据准备 &#x1f34d;模型构建 &#x1f34d;模型训练 &#x1f34d;模型评估 &#x1f34d;应用部署 &#x1f348;示例代码 &#x1f349; 基因…

操作系统真象还原:内存管理系统

第8章-内存管理系统 这是一个网站有所有小节的代码实现&#xff0c;同时也包含了Bochs等文件 8.1 Makefile简介 8.1.1 Makefile是什么 8.1.2 makefile基本语法 make 给咱们提供了方法&#xff0c;可以在命令之前加个字符’&#xff20;’&#xff0c;这样就不会输出命令本身…

微信小程序使用 “云函数“ 获取 “openid“

文章目录 1.前期准备2.具体操作步骤 1.前期准备 必须使用云开发已经配置好云开发 2.具体操作步骤 1.进入小程序开发工具→在云函数目录上右键→选中新建云函数 创建结束&#xff0c;自动上传&#xff08;必须确认已经上传才生效&#xff09; 2.进入对应页面的js文件&#…

QT 信号和槽 信号关联到信号示例 信号除了可以绑定槽以外,信号还可以绑定信号

信号除了可以关联到槽函数&#xff0c;还可以关联到类型匹配的信号&#xff0c;实现信号的接力触发。上个示例中因为 clicked 信号没有参数&#xff0c;而 SendMsg 信号有参数&#xff0c;所以不方便直接关联。本小节示范一个信号到信号的关联&#xff0c;将按钮的 clicked 信号…

【优化过往代码】关于vue自定义事件的运用

【优化过往代码】关于vue自定义事件的运用 需求说明过往代码优化思路优化后代码&#xff08;Vue2&#xff09;遇到问题记录 Vue2官方自定义指令说明文档 Vue3官方自定义指令说明文档 需求说明 进入某些页面需要加载一些外部资源&#xff0c;并在资源加载完后进行一些处理&…

51单片机数码管显示的计数器,按键按下暂定,再次按下继续。(按键功能使用中断实现)

1、功能描述 数码管显示的计数器&#xff0c;按键按下暂定&#xff0c;再次按下继续。&#xff08;按键功能使用中断实现&#xff09; 2、实验原理 按键与中断&#xff1a;使用单片机的外部中断功能来检测按键动作&#xff0c;实现非阻塞的按键检测。 中断服务程序&…

十四、OpenAI之助手API(Asistants API)

助手API允许你在自己的应用系统中构建一个AI助手。助手有指令&#xff0c;能利用模型、工具和文件响应用户的查询。助手API目前支持3种类型的工具&#xff1a;代码交互&#xff0c;文件搜索和函数调用。 你可以使用助手后台探索助手的能力&#xff0c;或通过这个指南的大纲一步…

【栈】2751. 机器人碰撞

本文涉及知识点 栈 LeetCode2751. 机器人碰撞 现有 n 个机器人&#xff0c;编号从 1 开始&#xff0c;每个机器人包含在路线上的位置、健康度和移动方向。 给你下标从 0 开始的两个整数数组 positions、healths 和一个字符串 directions&#xff08;directions[i] 为 ‘L’ …