04-240606Spark笔记
1.行动算子-2
-
save相关算子:
格式:
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit
例子:
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3))) rdd.saveAsTextFile("output")rdd.saveAsObjectFile("output1")// saveAsSequenceFile方法要求数据的格式必须为K-V类型rdd.saveAsSequenceFile("output2")
输出结果:
-
foreach
格式:
def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
例子:
val rdd = sc.makeRDD(List(1,2,3,4)) //foreach 其实是Driver端内存集合的循环遍历方法rdd.collect().foreach(println) //Driverprintln("***************")// foreach 其实是Executor端内存数据打印rdd.foreach(println) // Executor// 算子 : Operator(操作)// RDD的方法和Scala集合对象的方法不一样// 集合对象的方法都是在同一个节点的内存中完成的。// RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行// 为了区分不同的处理效果,所以将RDD的方法称之为算子。// RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。
输出结果:
2. 序列化
2.1 闭包检测
-
闭包检测
因为Driver需要给两个Executor共享User方法,共享就需要序列化
案例:
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List[Int]()) val user = new User() // SparkException: Task not serializable// NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User // RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能// 闭包检测rdd.foreach(num => {println("age = " + (user.age + num))}) sc.stop() }//class User extends Serializable {// 样例类在编译时,会自动混入序列化特质(实现可序列化接口)//case class User() {class User {var age : Int = 30}
-
RDD 的分区器
自己来写分区器:
def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(("nba", "xxxxxxxxx"),("cba", "xxxxxxxxx"),("wnba", "xxxxxxxxx"),("nba", "xxxxxxxxx"),),3)val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner ) partRDD.saveAsTextFile("output") sc.stop()}
自定义的分区器:
class MyPartitioner extends Partitioner{// 分区数量override def numPartitions: Int = 3 // 根据数据的key值返回数据所在的分区索引(从0开始)override def getPartition(key: Any): Int = {key match {case "nba" => 0case "wnba" => 1case _ => 2}}}
* 自定义分区器 * 1. 继承Partitioner * 2. 重写方法
输出结果:
-
RDD 文件读取与保存
案例1:
def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf) val rdd = sc.textFile("output1")println(rdd.collect().mkString(",")) val rdd1 = sc.objectFile[(String, Int)]("output2")println(rdd1.collect().mkString(",")) val rdd2 = sc.sequenceFile[String, Int]("output3")println(rdd2.collect().mkString(",")) sc.stop()}
输出结果:
案例2:
def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(("a", 1),("b", 2),("c", 3))) rdd.saveAsTextFile("output1")rdd.saveAsObjectFile("output2")rdd.saveAsSequenceFile("output3") sc.stop()}
输出结果:
1. 数据结构:
-
累加器
累加器用来把 Executor 端变量信息聚合到 Driver 端。
![image-20240605202228850](E:\Files2\Typictures\image-20240605202228850.png
Acc,累加器可以把Excutor端的数据返回到Driver中去:
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(1,2,3,4)) // reduce : 分区内计算,分区间计算//val i: Int = rdd.reduce(_+_)//println(i)var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum) sc.stop() }
-
系统累加器
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(1,2,3,4)) // 获取系统累加器// Spark默认就提供了简单数据聚合的累加器val sumAcc = sc.longAccumulator("sum") //sc.doubleAccumulator//sc.collectionAccumulator rdd.foreach(num => {// 使用累加器sumAcc.add(num)}) // 获取累加器的值println(sumAcc.value) sc.stop() }
累加器的一些特殊情况:
少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 一般情况下,累加器会放置在行动算子进
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List(1,2,3,4)) // 获取系统累加器// Spark默认就提供了简单数据聚合的累加器val sumAcc = sc.longAccumulator("sum") //sc.doubleAccumulator//sc.collectionAccumulator val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num}) // 获取累加器的值// 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行// 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行// 一般情况下,累加器会放置在行动算子进行操作mapRDD.collect()mapRDD.collect()println(sumAcc.value) sc.stop() }
-
自定义累加器
分布式共享只写变量
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) val rdd = sc.makeRDD(List("hello", "spark", "hello")) // 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc") rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)}) // 获取累加器累加的结果println(wcAcc.value) sc.stop() }/*自定义数据累加器:WordCount 1. 继承AccumulatorV2, 定义泛型IN : 累加器输入的数据类型 StringOUT : 累加器返回的数据类型 mutable.Map[String, Long] 2. 重写方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] { private var wcMap = mutable.Map[String, Long]() // 判断是否初始状态override def isZero: Boolean = {wcMap.isEmpty} override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()} override def reset(): Unit = {wcMap.clear()} // 获取累加器需要计算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)} // Driver合并多个累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { val map1 = this.wcMapval map2 = other.value map2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}} // 累加器结果override def value: mutable.Map[String, Long] = {wcMap}}
-
广播变量
实现原理:
广播变量用来高效分发较大的对象。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务
分别发送。
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3))) // val rdd2 = sc.makeRDD(List( // ("a", 4),("b", 5),("c", 6) // ))val map = mutable.Map(("a", 4),("b", 5),("c", 6)) // join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)//joinRDD.collect().foreach(println)// (a, 1), (b, 2), (c, 3)// (a, (1,4)),(b, (2,5)),(c, (3,6))rdd1.map {case (w, c) => {val l: Int = map.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println) sc.stop() }
join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
Spark 中的广播变量就可以将闭包的数据保存到Executor的内存中
Spark 中的广播变量不能更改 : 分布式共享只读变量
封装广播变量1
案例:
def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6)) // 封装广播变量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map) rdd1.map {case (w, c) => {// 方法广播变量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println) sc.stop() }