03-240605
1. 行动算子-1
-
reduce
聚合
格式:
def reduce(f: (T, T) => T): T
例子:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1,2,3,4)) // TODO - 行动算子 //reduceval i: Int = rdd.reduce(_+_)println(i)
输出结果:
10
-
collect
采集
格式:
def collect(): Array[T]
例子:
// collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组val ints: Array[Int] = rdd.collect()println(ints.mkString(","))
输出结果:
1,2,3,4
-
count
计数
格式:
def count(): Long
例子:
// count : 数据源中数据的个数val cnt = rdd.count()println(cnt)
运行结果:
4
-
first
获取数据源的第一个数据
格式:
def first(): T
例子:
// first : 获取数据源中数据的第一个val first = rdd.first()println(first)
输出结果:
1
-
take
获取数据源的N个数据
格式:
def take(num: Int): Array[T]
例子:
// take : 获取N个数据val ints: Array[Int] = rdd.take(3)println(ints.mkString(","))
输出结果:
1,2,3
-
takeOrdered
数据排序后.再取第N个数据
格式:
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
例子:
// takeOrdered : 数据排序后,取N个数据val rdd1 = sc.makeRDD(List(4,2,3,1))val ints1: Array[Int] = rdd1.takeOrdered(3)println(ints1.mkString(","))
输出结果:
1,2,3
-
aggregate
给定初始值,初始值参与分区内与分区间的计算
格式:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
例子:
val rdd = sc.makeRDD(List(1,2,3,4),2)//10 + 13 + 17 = 40// aggregateByKey : 初始值只会参与分区内计算// aggregate : 初始值会参与分区内计算,并且和参与分区间计算val result = rdd.aggregate(10)(_+_._+_)println(result)
输出结果:
40
-
fold
折叠操作,aggregate的简化版操作
格式:
def fold(zeroValue: T)(op: (T, T) => T): T
例子:
//10 + 13 + 17 = 40// aggregateByKey : 初始值只会参与分区内计算// aggregate : 初始值会参与分区内计算,并且和参与分区间计算//val result = rdd.aggregate(10)(_+_, _+_)val result = rdd.fold(10)(_+_)println(result)
输出结果:
40
-
countByKey 与 countByValue
都是统计每种Key或者Value出现的个数
格式:
def countByKey(): Map[K, Long]
例子:
val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3)))//val intToLong: collection.Map[Int, Long] = rdd.countByValue()//println(intToLong)val stringToLong: collection.Map[String, Long] = rdd.countByKey()println(stringToLong)
输出结果:
Map(a -> 3)
-
WordCount 不同的实现方式:
运用9种不同的方式实现WordCount
-
使用groupBy:
// groupBydef wordcount1(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val group: RDD[(String, Iterable[String])] = words.groupBy(word=>word)val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)}
-
使用groupByKey:
// groupByKeydef wordcount2(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()val wordCount: RDD[(String, Int)] = group.mapValues(iter=>iter.size)}
-
使用reduceByKey:
// reduceByKeydef wordcount3(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)}
-
使用aggregateByKey
// aggregateByKeydef wordcount4(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)}
-
使用foldByKey:
// foldByKeydef wordcount5(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_+_)}
-
使用combineByKey:
// combineByKeydef wordcount6(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: RDD[(String, Int)] = wordOne.combineByKey(v=>v,(x:Int, y) => x + y,(x:Int, y:Int) => x + y)}
-
使用countByKey:
// countByKeydef wordcount7(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordOne = words.map((_,1))val wordCount: collection.Map[String, Long] = wordOne.countByKey()}
-
使用countByValue:
// countByValuedef wordcount8(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))val wordCount: collection.Map[String, Long] = words.countByValue()}
-
使用reduce:
def wordcount91011(sc : SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))val words = rdd.flatMap(_.split(" "))// 【(word, count),(word, count)】// word => Map[(word,1)]val mapWord = words.map(word => {mutable.Map[String, Long]((word,1))})val wordCount = mapWord.reduce((map1, map2) => {map2.foreach{case (word, count) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}map1})println(wordCount)}
2. 序列化
算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。
-
RDD序列化
案例:
object Spark01_RDD_Serial {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))val search = new Search("h")//search.getMatch1(rdd).collect().foreach(println)search.getMatch2(rdd).collect().foreach(println)sc.stop()}// 查询对象// 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测class Search(query:String){def isMatch(s: String): Boolean = {s.contains(this.query)}// 函数序列化案例def getMatch1 (rdd: RDD[String]): RDD[String] = {rdd.filter(isMatch)}// 属性序列化案例def getMatch2(rdd: RDD[String]): RDD[String] = {val s = queryrdd.filter(x => x.contains(s))}} }
输出结果:
-
Kryo序列化框架
Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
了解一下就行
案例:
def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SerDemo").setMaster("local[*]")// 替换默认的序列化机制.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册需要使用 kryo 序列化的自定义类.registerKryoClasses(Array(classOf[Searcher]))val sc = new SparkContext(conf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)val searcher = new Searcher("hello")val result: RDD[String] = searcher.getMatchedRDD1(rdd)result.collect.foreach(println)} } case class Searcher(val query: String) {def isMatch(s: String) = {s.contains(query)}def getMatchedRDD1(rdd: RDD[String]) = {rdd.filter(isMatch) }def getMatchedRDD2(rdd: RDD[String]) = {val q = queryrdd.filter(_.contains(q))}
Kryo绕过了Java的序列化机制,Kryo比Java序列化小,适合大数据传输、存储
-
RDD 血缘关系
toDebugString查看血缘关系
多个连续的RDD的依赖关系,称之为血缘关系
演示:
关于如何将RDD间的关系保存下来:
血缘关系演示:
-
RDD的依赖关系
dependencies查看依赖关系
OneToOne依赖(窄依赖)
窄依赖我们形象的比喻为独生子女。
Shuffle依赖(宽依赖):
宽依赖我们形象的比喻为多生。
-
RDD 阶级划分
-
RDD 任务划分
源码演示:
-
RDD 的持久化
这样的复用在底层不是很好用:
应该这样:
放在内存中 mapRDD.cache()
放在磁盘中 mapRDD.persist()
Cache缓存:
-
RDD CheckPoint 检查点
checkpoint 需要落盘,需要指定检查点保存路径
检查点路径保存的文件,当作业执行完毕后,不会被删除
一般保存路径都是在分布式存储系统: HDFS
-
checkpoint、Cache、Persist的区别:
以上三个都可以存储,关于他们的区别:
cache : 将数据临时存储在内存中进行数据重用
会在血缘关系中添加新的依赖。一旦出现问题,可以重新读取数据
persist : 将数据临时存储在硬盘文件中进行数据重用
涉及到磁盘IO,性能较低,但是数据安全
如果作业执行完毕,临时保存的数据文件就会丢失
checkpoint : 将数据长久地保存在磁盘文件中进行数据重用
涉及到磁盘IO,性能较低,但是数据安全
为了保证数据安全,所以一般情况下,会独立执行作业
为了能够提高效率,一般情况下,是需要和cache联合使用
执行过程中,会切断血缘关系,重新建立新的血缘关系
checkpoint等同于改变数据源