目录
Spark简介
1 什么是Spark
2 Spark特点
3 Spark分布式环境安装
3.1 Spark HA的环境安装
3.2 动态增删一个worker节点到集群
4 Spark核心概念
5 Spark案例
5.2 Master URL
5.3 spark日志的管理
5.4 WordCount案例程序的执行过程
6 Spark作业运行架构图(standalone模式)
7 RDD操作
7.1 RDD初始化
7.2 RDD操作
7.3 transformation转换算子
7.3 action行动算子
8 高级排序
8.1 普通的排序
8.2 二次排序
8.3 分组TopN
8.4 优化分组TopN
9 持久化操作
9.1 为什要持久化
9.2 如何进行持久化
9.3 持久化策略
9.4 如何选择持久化策略
10 共享变量
10.1 概述
10.2 broadcast广播变量
10.3 accumulator累加器
10.4 自定义累加器
SparkCore基础
1 什么是Spark
Spark是一个通用的可扩展的处理海量数据集的计算引擎。
Spark集成离线计算,实时计算,SQL查询,机器学习,图计算为一体的通用的计算框架。
2 Spark特点
(1)快:相比给予MR,官方表明基于内存计算spark要快mr100倍,基于磁盘计算spark要快mr10倍
快的原因:①基于内存计算,②计算和数据的分离 ③基于DAGScheduler的计算划分 ④只有一次的Shuffle输出操作
(2)易用:Spark提供超过80多个高阶算子函数,来支持对数据集的各种各样的计算,使用的时候,可以使用java、scala、python、R,非常灵活易用。
(3)通用:在一个项目中,既可以使用离线计算,也可以使用其他比如,SQL查询,机器学习,图计算等等,而这时Spark最强大的优势
(4)到处运行
3 Spark分布式环境安装
(1)下载解压,添加环境变量
(2)修改配置文件
spark的配置文件,在$SPARK_HOME/conf目录下
①拷贝slaves和spark-env.sh文件 :cp slaves.template slaves和cp spark-env.sh.template spark-env.sh
②修改slaves配置,配置spark的从节点的主机名,spark中的从节点叫做worker,主节点叫做Master。vim slaves
bigdata02
bigdata03
③修改spark-env.sh文件,添加如下内容
export JAVA_HOME=/opt/jdk
export SCALA_HOME=/home/refuel/opt/mouldle/scala
export SPARK_MASTER_IP=bigdata01
export SPARK_MASTER_PORT=7077 ##rpc通信端口,类似hdfs的9000端口,不是50070
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/refuel/opt/mouldle/hadoop/etc/hadoop
(3)同步spark到其它节点中
3.1 Spark HA的环境安装
有两种方式解决单点故障,一种基于文件系统FileSystem(生产中不用),还有一种基于Zookeeper(使用)。 配置基于Zookeeper的一个ha是非常简单的,只需要在spark-env.sh中添加一句话即可。
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"
spark.deploy.recoveryMode设置成 ZOOKEEPER spark.deploy.zookeeper.urlZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢复状态的目录,缺省为 /spark。因为ha不确定master在bigdata01上面启动,所以将export SPARK_MASTER_IP=bigdata01和export SPARK_MASTER_PORT=7077注释掉
3.2 动态增删一个worker节点到集群
(1)上线一个节点:不需要在现有集群的配置上做任何修改,只需要准备一台worker机器即可,可和之前的worker的配置相同。
(2)下线一个节点:kill或者stop-slave.sh都可以
4 Spark核心概念
ClusterManager:在Standalone(依托于spark集群本身)模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager。
Worker:从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的控制,启动的进程叫Container。
Driver:运行Application的main()函数并创建SparkContext(是spark中最重要的一个概念,是spark编程的入口,作用相当于mr中的Job)。
Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。
SparkContext:整个应用的上下文,控制应用的生命周期,是spark编程的入口。
RDD:弹性式分布式数据集。Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。
DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。 DAGScheduler就是Spark的大脑,中枢神经
TaskScheduler:将任务(Task)分发给Executor执行。
Stage:一个Spark作业一般包含一到多个Stage。
Task :一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。 task的个数由rdd的partition分区决定
Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
Actions:操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。SparkEnv内创建并包含如下一些重要组件的引用。
MapOutPutTracker:负责Shuffle元信息的存储
BroadcastManager:负责广播变量的控制与元信息的存储。
BlockManager:负责存储管理、创建和查找块。
MetricsSystem:监控运行时性能指标信息。
SparkConf:负责存储配置信息。作用相当于hadoop中的Configuration。
5 Spark案例
pom文件的依赖配置如下
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- scala去除,因为spark-core包里有了scala的依赖<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency> --><!-- sparkcore --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.2</version></dependency><!-- sparksql --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.2</version></dependency><!-- sparkstreaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.2</version></dependency></dependencies>
注意:入口类为SparkContext,java版本的是JavaSparkContext,scala的版本就是SparkContext;SparkSQL的入口有SQLContext、HiveContext;SparkStreaming的入口又是StreamingContext。
java版本
public class JavaSparkWordCountOps {public static void main(String[] args) {//step 1、创建编程入口类SparkConf conf = new SparkConf();conf.setMaster("local[*]");conf.setAppName(JavaSparkWordCountOps.class.getSimpleName());JavaSparkContext jsc = new JavaSparkContext(conf);//step 2、加载外部数据 形成spark中的计算的编程模型RDDJavaRDD<String> linesRDD = jsc.textFile("E:/hello.txt");// step 3、对加载的数据进行各种业务逻辑操作---转换操作transformationJavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {public Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split("\\s+")).iterator();}});//JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());System.out.println("-----经过拆分之后的rdd数据----");wordsRDD.foreach(new VoidFunction<String>() {public void call(String s) throws Exception {System.out.println(s);}});System.out.println("-----word拼装成键值对----");JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}});//JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));pairsRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});System.out.println("------按照相同的key,统计value--------------");JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {int i = 1 / 0; //印证出这些转换的transformation算子是懒加载的,需要action的触发return v1 + v2;}});//JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey((v1, v2) -> v1 + v2);retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});}
}
scala版本
object SparkWordCountOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCount")val sc = new SparkContext(conf)//load data from fileval linesRDD:RDD[String] = sc.textFile("E:/hello.txt")val wordsRDD:RDD[String] = linesRDD.flatMap(line => line.split("\\s+"))val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))val ret = pairsRDD.reduceByKey((v1, v2) => v1 + v2)ret.foreach(t => println(t._1 + "---" + t._2))sc.stop()}
}
5.2 Master URL
master-url通过sparkConf.setMaster来完成。代表的是spark作业的执行方式,或者指定的spark程序的cluster-manager的类型。
master | 含义 |
---|---|
local | 程序在本地运行,同时为本地程序提供一个线程来处理 |
local[M] | 程序在本地运行,同时为本地程序分配M个工作线程来处理 |
local[*] | 程序在本地运行,同时为本地程序分配机器可用的CPU core的个数工作线程来处理 |
local[M, N] | 程序在本地运行,同时为本地程序分配M个工作线程来处理,如果提交程序失败,会进行最多N次的重试 |
spark://ip:port | 基于standalone的模式运行,提交撑到ip对应的master上运行 |
spark://ip1:port1,ip2:port2 | 基于standalone的ha模式运行,提交撑到ip对应的master上运行 |
yarn/启动脚本中的deploy-mode配置为cluster | 基于yarn模式的cluster方式运行,SparkContext的创建在NodeManager上面,在yarn集群中 |
yarn/启动脚本中的deploy-mode配置为client | 基于yarn模式的client方式运行,SparkContext的创建在提交程序的那台机器上面,不在yarn集群中 |
5.3 spark日志的管理
(1)全局管理:项目classpath下面引入log4j.properties配置文件进行管理
# 基本日志输出级别为INFO,输出目的地为console
log4j.rootCategory=INFO, consolelog4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# 输出配置的是spark提供的日志级别
log4j.logger.org.spark_project.jetty=INFO
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
(2)局部管理 :就是在当前类中进行日志的管理。
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.spark_project").setLevel(Level.WARN)
5.4 WordCount案例程序的执行过程
当deploy-mode为client模式的时候,driver就在我们提交作业的本机,而spark的作业对应的executor在spark集群中运行。
在上图中可以发现相邻两个rdd之间有依赖关系,依赖分为宽依赖和窄依赖。
窄依赖:rdd中的partition中的数据只依赖于父rdd中的一个partition或者常数个partition。常见的窄依赖操作有:flatMap,map,filter,coalesce等
宽依赖:rdd中的partition中的数据只依赖于父rdd中的所有partition。常见的宽依赖操作有reduceByKey,groupByKey,join,sortByKey,repartition等
rdd和rdd之间的依赖关系构成了一个链条,这个链条称之为lineage(血缘)
6 Spark作业运行架构图(standalone模式)
①启动spark集群:通过spark的start-all.sh脚本启动spark集群,启动了对应的Master进程和Worker进程
②Worker启动之后向Master进程发送注册信息
③Worker向Master注册成功之后,worker要不断的向master发送心跳包,去监听主节点是否存在
④Driver向Spark集群提交作业,就是向Master提交作业,申请运行资源
⑤Master收到Driver的提交请求,向Worker节点指派相应的作业任务,就是在对应的Worker节点上启动对应的executor进程
⑥Worker节点接收到Master节点启动executor任务之后,就启动对应的executor进程,向master汇报成功启动,可以接收任务
⑦executor进程启动之后,就像Driver进程进行反向注册,告诉Driver谁可以执行spark任务
⑧Driver接收到注册之后,就知道向谁发送spark作业,那么这样在spark集群中就有一组独立的executor进程为该Driver服务
⑨DAGScheduler根据编写的spark作业逻辑,将spark作业分成若干个阶段Stage(基于Spark的transformation里是否有shuffle Dependency),然后为每一个阶段组装一批task组成taskSet(task里面包含了序列化之后的我们编写的spark transformation),然后将这些DAGScheduler组装好的taskSet,交给taskScheduler,由taskScheduler将这些任务发给对应的executor
⑩executor进程接收到了Driver发送过来的taskSet之后,进行反序列化,然后将这些task封装进一个叫tasksunner的线程中,然后放到本地线程池中调度我们的作业的执行。
7 RDD操作
7.1 RDD初始化
RDD的初始化,原生api提供的2中创建方式:
①是读取文件textFile
②加载一个scala集合parallelize。
当然,也可以通过transformation算子来创建的RDD。
7.2 RDD操作
RDD操作算子的分类,基本上分为两类:transformation和action,当然更加细致的分,可以分为输入算子,转换算子,缓存算子,行动算子。
输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。 如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。
7.3 transformation转换算子
(1)map
rdd.map(func):RDD,对rdd集合中的每一个元素,都作用一次该func函数,之后返回值为生成元素构成的一个新的RDD。
(2)flatMap
rdd.flatMap(func):RDD ==>rdd集合中的每一个元素,都要作用func函数,返回0到多个新的元素,这些新的元素共同构成一个新的RDD。
map操作是一个一到一的操作,flatMap操作是一个1到多的操作
(3)filter
rdd.filter(func):RDD ==> 对rdd中的每一个元素操作func函数,该函数的返回值为Boolean类型,保留返回值为true的元素,共同构成一个新的RDD,过滤掉哪些返回值为false的元素。
(4)sample
rdd.sample(withReplacement:Boolean, fraction:Double [, seed:Long]):RDD ===> 抽样,sample抽样不是一个精确的抽样。一个非常重要的作用,就是来看rdd中数据的分布情况,根据数据分布的情况,进行各种调优与优化,防止数据倾斜。
withReplacement:抽样的方式,true有放回抽样, false为无返回抽样
fraction: 抽样比例,取值范围就是0~1
seed: 抽样的随机数种子,有默认值,通常也不需要传值
(5)union
rdd1.union(rdd2),联合rdd1和rdd2中的数据,形成一个新的rdd,其作用相当于sql中的union all。
(6)join
join就是sql中的inner join。
注意:要想两个RDD进行连接,那么这两个rdd的数据格式,必须是k-v键值对的,其中的k就是关联的条件,也就是sql中的on连接条件。
RDD1的类型[K, V], RDD2的类型[K, W]
内连接 :val joinedRDD:RDD[(K, (V, W))] = rdd1.join(rdd2)
左外连接 :val leftJoinedRDD:RDD[(K, (V, Option[W]))] = rdd1.leftOuterJoin(rdd2)
右外连接 :val rightJoinedRDD:RDD[(K, (Option[V], W))] = rdd1.rightOuterJoin(rdd2)
全连接 :val fullJoinedRDD:RDD[(K, (Option[V], Option[W]))] = rdd1.fullOuterJoin(rdd2)
(7)groupByKey
rdd.groupByKey(),按照key进行分组,如果原始rdd的类型时[(K, V)] ,那必然其结果就肯定[(K, Iterable[V])],是一个shuffle dependency宽依赖shuffle操作,但是这个groupByKey不建议在工作过程中使用,除非非要用,因为groupByKey没有本地预聚合,性能较差,一般我们能用下面的reduceByKey或者combineByKey或者aggregateByKey代替就尽量代替。
(8)reduceByKey
rdd.reduceByKey(func:(V, V) => V):RDD[(K, V)] :在scala集合中学习过一个reduce(func:(W, W) => W)操作,是一个聚合操作,这里的reduceByKey按照就理解为在groupByKey(按照key进行分组[(K, Iterable[V])])的基础上,对每一个key对应的Iterable[V]执行reduce操作。
同时reduceByKey操作会有一个本地预聚合的操作,所以是一个shuffle dependency宽依赖shuffle操作。
(9)sortByKey
按照key进行排序
(10)combineByKey
这是spark最底层的聚合算子之一,按照key进行各种各样的聚合操作,spark提供的很多高阶算子,都是基于该算子实现的。
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {...
}
createCombiner: V => C, 相同的Key在分区中会调用一次该函数,用于创建聚合之后的类型,为了和后续Key相同的数据进行聚合;mergeValue: (C, V) => C, 在相同分区中基于上述createCombiner基础之上的局部聚合;mergeCombiners: (C, C) => C) 将每个分区中相同key聚合的结果在分区间进行全局聚合
(11)aggregateByKey
aggregateByKey[U:
ClassTag](zeroValue
:
U)(seqOp
:
(U, V)
=
> U, combOp
:
(U, U)
=
> U)
:
RDD[(K, U)]
和combineByKey都是一个相对底层的聚合算子,可以完成系统没有提供的其它操作,相当于自定义算子。aggregateByKey底层使用combineByKeyWithClassTag来实现,所以本质上二者没啥区别,区别就在于使用时的选择而已。
aggregateByKey更为简单,但是如果聚合前后数据类型不一致,建议使用combineByKey;同时如果初始化操作较为复杂,也建议使用combineByKey。
7.3 action行动算子
这些算子都是在rdd上的分区partition上面执行的,不是在driver本地执行。
(1)foreach
用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)
(2)count
统计该rdd中元素的个数
(3)take(n)
返回该rdd中的前N个元素,如果该rdd的数据是有序的,那么take(n)就是Top N
(4)first
take(n)中比较特殊的一个take(1)(0)
(5)collect
将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。
(6)reduce
reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。
(7)countByKey
统计key出现的次数
(8)saveAsTextFile
保存到文件,本质上是saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]
(9)saveAsObjectFile和saveAsSequenceFile
saveAsObjectFile本质上是saveAsSequenceFile
(10)saveAsHadoopFile和saveAsNewAPIHadoopFile
这二者的主要区别就是OutputFormat的区别,接口org.apache.hadoop.mapred.OutputFormat,
抽象类org.apache.hadoop.mapreduce.OutputFormat 所以saveAshadoopFile使用的是接口OutputFormat,saveAsNewAPIHadoopFile使用的抽象类OutputFormat,建议使用后者。
8 高级排序
8.1 普通的排序
(1)sortByKey
object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val stuRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))//按照学生身高进行降序排序val height2Stu = stuRDD.map(stu => (stu.height, stu))//注意:sortByKey是局部排序,不是全局排序,如果要进行全局排序,// 必须将所有的数据都拉取到一台机器上面才可以val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)sorted.foreach{case (height, stu) => println(stu)}sc.stop()}
}case class Student(id:Int, name:String, age:Int, height:Double)
(2)sortBy
这个sortBy其实使用sortByKey来实现,但是比sortByKey更加灵活,因为sortByKey只能应用在k-v数据格式上,而这个sortBy可以应在非k-v键值对的数据格式上面。
val sortedBy = stuRDD.sortBy(stu => stu.height,ascending = true,numPartitions = 1)(new Ordering[Double](){override def compare(x: Double, y: Double) = y.compareTo(x)},ClassTag.Double.asInstanceOf[ClassTag[Double]])
sortedBy.foreach(println)
sortedBy的操作,除了正常的升序,分区个数以外,还需需要传递一个将原始数据类型,提取其中用于排序的字段;并且提供用于比较的方式,以及在运行时的数据类型ClassTag标记型trait。
(3)takeOrdered
takeOrdered也是对rdd进行排序,但是和上述的sortByKey和sortBy相比较,takeOrdered是一个action操作,返回值为一个集合,而前两者为transformation,返回值为rdd。如果我们想在driver中获取排序之后的结果,那么建议使用takeOrdered,因为该操作边排序边返回。其实是take和sortBy的一个结合体。
takeOrdered(n),获取排序之后的n条记录
//先按照身高降序排序,身高相对按照年龄升序排 ---> 二次排序
stuRDD.takeOrdered(3)(new Ordering[Student](){override def compare(x: Student, y: Student) = {var ret = y.height.compareTo(x.height)if(ret == 0) {ret = x.age.compareTo(y.age)}ret}
}).foreach(println)
8.2 二次排序
所谓二次排序,指的是排序字段不唯一,有多个,共同排序
object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val personRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()}
}case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//对学生的身高和年龄依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret}
}
8.3 分组TopN
在分组的情况之下,获取每个组内的TopN数据
object GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key进行分组//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}
}
8.4 优化分组TopN
上述在编码过程当中使用groupByKey,我们说着这个算子的性能很差,因为没有本地预聚合,所以应该在开发过程当中尽量避免使用,能用其它代替就代替。
(1)使用combineByKey优化1
object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}def createCombiner(info:String): ArrayBuffer[String] = {val ab = new ArrayBuffer[String]()ab.append(info)ab}def mergeValue(ab:ArrayBuffer[String], info:String): ArrayBuffer[String] = {ab.append(info)ab}def mergeCombiners(ab:ArrayBuffer[String], ab1: ArrayBuffer[String]): ArrayBuffer[String] = {ab.++:(ab1)}
}
此时这种写法和上面的groupByKey性能一模一样,没有任何的优化。
(2)使用combineByKey的优化2
object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 3) {ab.take(3)} else {ab}}
}
9 持久化操作
9.1 为什要持久化
一个RDD如果被多次操作,为了提交后续的执行效率,我们建议对该RDD进行持久化操作。
9.2 如何进行持久化
rdd.persist()/cache()就完成了rdd的持久化操作,我们可以将该rdd的数据持久化到内存,磁盘,等等。
如果我们已经不再对该rdd进行一个操作,而此时程序并没有终止,可以卸载已经持久化的该rdd数据,rdd.unPersist()。
9.3 持久化策略
可以通过persist(StoreageLevle的对象)来指定持久化策略,eg:StorageLevel.MEMORY_ONLY。
持久化策略 | 含义 |
---|---|
MEMORY_ONLY(默认) | rdd中的数据以未经序列化的java对象格式,存储在内存中。如果内存不足,剩余的部分不持久化,使用的时候,没有持久化的那一部分数据重新加载。这种效率是最高,但是是对内存要求最高的。 |
MEMORY_ONLY_SER | 就比MEMORY_ONLY多了一个SER序列化,保存在内存中的数据是经过序列化之后的字节数组,同时每一个partition此时就是一个比较大的字节数组。 |
MEMORY_AND_DISK | 和MEMORY_ONLY相比就多了一个,内存存不下的数据存储在磁盘中。 |
MEMEORY_AND_DISK_SER | 比MEMORY_AND_DISK多了个序列化。 |
DISK_ONLY | 就是MEMORY_ONLY对应,都保存在磁盘,效率太差,一般不用。 |
xxx_2 | 就是上述多个策略后面加了一个_2,比如MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2等等,就多了一个replicate而已,备份,所以性能会下降,但是容错或者高可用加强了。所以需要在二者直接做权衡。如果说要求数据具备高可用,同时容错的时间花费比从新计算花费时间少,此时便可以使用,否则一般不用。 |
HEAP_OFF(experimental) | 使用非Spark的内存,也即堆外内存,比如Tachyon,HBase、Redis等等内存来补充spark数据的缓存。 |
9.4 如何选择持久化策略
(1)如果要持久化的数据是可以在内存中进行保存,那么毫无疑问,选择MEMEORY_ONLY,因为这种方式的效率是最高的,但是在生成中往往要进行缓存的数据量还是蛮大的,而且因为数据都是未经序列化的java对象,所以很容易引起频繁的gc。
(2)如果上述满足不了,就退而求其次,MEMORY_ONLY_SER,这种方式增加的额外的性能开销就是序列化和反序列化,经过反序列化之后的对象就是纯java对象,因此性能还是蛮高的。
(3)如果还是扛不住,再退而求其次,MEMOEY_AND_DISK_SER,因为到这一步的话,那说明对象体积确实很多,为了提交执行效率,应该尽可能的将数据保存在内存,所以就对数据进行序列化,其次在序列化到磁盘。
(4)一般情况下DISK_ONLY,DISK_SER不用,效率太低,有时候真的不容从源头计算一遍。
(5)一般情况下我们都不用XXX_2,代备份的种种持久化策略,除非程序对数据的安全性要求非常高,或者说备份的对性能的消耗低于从头再算一遍,我们可以使用这种xxx_2以外,基本不用。
10 共享变量
10.1 概述
如果transformation使用到Driver中的变量,在executor中执行的时候,就需要通过网络传输到对应的executor,如果该变量很大,那么网络传输一定会成为性能的瓶颈。Spark就提供了两种有限类型的共享变量:累加器和广播变量
10.2 broadcast广播变量
广播变量:为每个task都拷贝一份变量,将变量包装成为一个广播变量(broadcast),只需要在executor中拷贝一份,在task运行的时候,直接从executor调用即可,相当于局部变量变成成员变量,性能就得到了提升。
val num:Any = xxxval numBC:Broadcast[Any] = sc.broadcast(num)调用:val n = numBC.value注意:该num需要进行序列化。
10.3 accumulator累加器
累加器的一个好处是,不需要修改程序的业务逻辑来完成数据累加,同时也不需要额外的触发一个action job来完成累加,反之必须要添加新的业务逻辑,必须要触发一个新的action job来完成,显然这个accumulator的操作性能更佳!
构建一个累加器val accu = sc.longAccumuator()累加的操作accu.add(参数)获取累加器的结果val ret = accu.value
val conf = new SparkConf()
.setAppName("AccumulatorOps")
.setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/data.txt")
val words = lines.flatMap(_.split("\\s+"))//统计每个单词出现的次数
val accumulator = sc.longAccumulatorval rbk = words.map(word => {if(word == "is")accumulator.add(1)(word, 1)
}).reduceByKey(_+_)
rbk.foreach(println)
println("================使用累加器===================")
println("is: " + accumulator.value)Thread.sleep(1000000)
sc.stop()
注意:累加器的调用,在action之后被调用,也就是说累加器必须在action触发之后;多次使用同一个累加器,应该尽量做到用完即重置;尽量给累加器指定name,方便我们在web-ui上面进行查看
10.4 自定义累加器
自定义一个类继承AccumulatorV2,重写方法
/*自定义累加器IN 指的是accmulator.add(sth.)中sth的数据类型OUT 指的是accmulator.value返回值的数据类型*/
class MyAccumulator extends AccumulatorV2[String, Map[String, Long]] {private var map = mutable.Map[String, Long]()/*** 当前累加器是否有初始化值* 如果为一个long的值,0就是初始化值,如果为list,Nil就是初始化值,是map,Map()就是初始化值*/override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, Map[String, Long]] = {val accu = new MyAccumulatoraccu.map = this.mapaccu}override def reset(): Unit = map.clear()//分区内的数据累加 is: 5, of:4override def add(word: String): Unit = {if(map.contains(word)) {val newCount = map(word) + 1map.put(word, newCount)} else {map.put(word, 1)}
// map.put(word, map.getOrElse(word, 0) + 1)}//多个分区间的数据累加override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {other.value.foreach{case (word, count) => {if(map.contains(word)) {val newCount = map(word) + countmap.put(word, newCount)} else {map.put(word, count)}
// map.put(word, map.getOrElse(word, 0) + count)}}}override def value: Map[String, Long] = map.toMap
}
注册使用:
object _08AccumulatorOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("$AccumulatorOps").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data.txt")val words = lines.flatMap(_.split("\\s+"))//注册val myAccu = new MyAccumulator()sc.register(myAccu, "myAccu")//统计每个单词出现的次数val pairs = words.map(word => {if(word == "is" || word == "of" || word == "a")myAccu.add(word)(word, 1)})val rbk = pairs.reduceByKey(_+_)rbk.foreach(println)println("=============累加器==========")myAccu.value.foreach(println)Thread.sleep(10000000)sc.stop()}
}