SparkCore基础

目录

 

                                             Spark简介

1 什么是Spark

2 Spark特点

3 Spark分布式环境安装

3.1 Spark HA的环境安装

3.2 动态增删一个worker节点到集群

4 Spark核心概念

5 Spark案例

5.2  Master URL

5.3 spark日志的管理

5.4 WordCount案例程序的执行过程

6 Spark作业运行架构图(standalone模式)

7 RDD操作

7.1 RDD初始化

7.2 RDD操作

7.3 transformation转换算子

7.3 action行动算子

8 高级排序

8.1 普通的排序

8.2 二次排序

8.3 分组TopN

8.4 优化分组TopN

9 持久化操作

9.1 为什要持久化

9.2 如何进行持久化

9.3 持久化策略

9.4 如何选择持久化策略

10 共享变量

10.1 概述

10.2 broadcast广播变量

10.3 accumulator累加器

10.4 自定义累加器


                                             SparkCore基础

什么是Spark

Spark是一个通用的可扩展的处理海量数据集的计算引擎。

Spark集成离线计算,实时计算,SQL查询,机器学习,图计算为一体的通用的计算框架。

2 Spark特点

(1)快:相比给予MR,官方表明基于内存计算spark要快mr100倍,基于磁盘计算spark要快mr10倍

快的原因:①基于内存计算,②计算和数据的分离 ③基于DAGScheduler的计算划分 ④只有一次的Shuffle输出操作

(2)易用:Spark提供超过80多个高阶算子函数,来支持对数据集的各种各样的计算,使用的时候,可以使用java、scala、python、R,非常灵活易用。

(3)通用:在一个项目中,既可以使用离线计算,也可以使用其他比如,SQL查询,机器学习,图计算等等,而这时Spark最强大的优势

(4)到处运行

3 Spark分布式环境安装

(1)下载解压,添加环境变量

(2)修改配置文件

spark的配置文件,在$SPARK_HOME/conf目录下

①拷贝slaves和spark-env.sh文件 :cp slaves.template slaves和cp spark-env.sh.template spark-env.sh

②修改slaves配置,配置spark的从节点的主机名,spark中的从节点叫做worker,主节点叫做Master。vim slaves

bigdata02
bigdata03

③修改spark-env.sh文件,添加如下内容

export JAVA_HOME=/opt/jdk
export SCALA_HOME=/home/refuel/opt/mouldle/scala
export SPARK_MASTER_IP=bigdata01
export SPARK_MASTER_PORT=7077 ##rpc通信端口,类似hdfs的9000端口,不是50070
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/refuel/opt/mouldle/hadoop/etc/hadoop

(3)同步spark到其它节点中

3.1 Spark HA的环境安装

有两种方式解决单点故障,一种基于文件系统FileSystem(生产中不用),还有一种基于Zookeeper(使用)。 配置基于Zookeeper的一个ha是非常简单的,只需要在spark-env.sh中添加一句话即可。

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"

spark.deploy.recoveryMode设置成 ZOOKEEPER spark.deploy.zookeeper.urlZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢复状态的目录,缺省为 /spark。因为ha不确定master在bigdata01上面启动,所以将export SPARK_MASTER_IP=bigdata01和export SPARK_MASTER_PORT=7077注释掉

3.2 动态增删一个worker节点到集群

(1)上线一个节点:不需要在现有集群的配置上做任何修改,只需要准备一台worker机器即可,可和之前的worker的配置相同。

(2)下线一个节点:kill或者stop-slave.sh都可以

4 Spark核心概念

ClusterManager:在Standalone(依托于spark集群本身)模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager。

Worker:从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的控制,启动的进程叫Container。

Driver:运行Application的main()函数并创建SparkContext(是spark中最重要的一个概念,是spark编程的入口,作用相当于mr中的Job)。

Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。

SparkContext:整个应用的上下文,控制应用的生命周期,是spark编程的入口。

RDD:弹性式分布式数据集。Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。

DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。 DAGScheduler就是Spark的大脑,中枢神经

TaskScheduler:将任务(Task)分发给Executor执行。

Stage:一个Spark作业一般包含一到多个Stage。

Task :一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。 task的个数由rdd的partition分区决定

Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

Actions:操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。SparkEnv内创建并包含如下一些重要组件的引用。

MapOutPutTracker:负责Shuffle元信息的存储

BroadcastManager:负责广播变量的控制与元信息的存储。

BlockManager:负责存储管理、创建和查找块。

MetricsSystem:监控运行时性能指标信息。

SparkConf:负责存储配置信息。作用相当于hadoop中的Configuration。

5 Spark案例

pom文件的依赖配置如下

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- scala去除,因为spark-core包里有了scala的依赖<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency>  --><!-- sparkcore --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.2</version></dependency><!-- sparksql --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.2</version></dependency><!-- sparkstreaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.2</version></dependency></dependencies>

注意:入口类为SparkContext,java版本的是JavaSparkContext,scala的版本就是SparkContext;SparkSQL的入口有SQLContext、HiveContext;SparkStreaming的入口又是StreamingContext。

java版本

public class JavaSparkWordCountOps {public static void main(String[] args) {//step 1、创建编程入口类SparkConf conf = new SparkConf();conf.setMaster("local[*]");conf.setAppName(JavaSparkWordCountOps.class.getSimpleName());JavaSparkContext jsc = new JavaSparkContext(conf);//step 2、加载外部数据 形成spark中的计算的编程模型RDDJavaRDD<String> linesRDD = jsc.textFile("E:/hello.txt");// step 3、对加载的数据进行各种业务逻辑操作---转换操作transformationJavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {public Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split("\\s+")).iterator();}});//JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());System.out.println("-----经过拆分之后的rdd数据----");wordsRDD.foreach(new VoidFunction<String>() {public void call(String s) throws Exception {System.out.println(s);}});System.out.println("-----word拼装成键值对----");JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}});//JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));pairsRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});System.out.println("------按照相同的key,统计value--------------");JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {int i = 1 / 0; //印证出这些转换的transformation算子是懒加载的,需要action的触发return v1 + v2;}});//JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey((v1, v2) -> v1 + v2);retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});}
}

scala版本

object SparkWordCountOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCount")val sc = new SparkContext(conf)//load data from fileval linesRDD:RDD[String] = sc.textFile("E:/hello.txt")val wordsRDD:RDD[String] = linesRDD.flatMap(line => line.split("\\s+"))val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))val ret = pairsRDD.reduceByKey((v1, v2) => v1 + v2)ret.foreach(t => println(t._1 + "---" + t._2))sc.stop()}
}

5.2  Master URL

master-url通过sparkConf.setMaster来完成。代表的是spark作业的执行方式,或者指定的spark程序的cluster-manager的类型。

master含义
local程序在本地运行,同时为本地程序提供一个线程来处理
local[M]程序在本地运行,同时为本地程序分配M个工作线程来处理
local[*]程序在本地运行,同时为本地程序分配机器可用的CPU core的个数工作线程来处理
local[M, N]程序在本地运行,同时为本地程序分配M个工作线程来处理,如果提交程序失败,会进行最多N次的重试
spark://ip:port基于standalone的模式运行,提交撑到ip对应的master上运行
spark://ip1:port1,ip2:port2基于standalone的ha模式运行,提交撑到ip对应的master上运行
yarn/启动脚本中的deploy-mode配置为cluster基于yarn模式的cluster方式运行,SparkContext的创建在NodeManager上面,在yarn集群中
yarn/启动脚本中的deploy-mode配置为client基于yarn模式的client方式运行,SparkContext的创建在提交程序的那台机器上面,不在yarn集群中

5.3 spark日志的管理

(1)全局管理:项目classpath下面引入log4j.properties配置文件进行管理

# 基本日志输出级别为INFO,输出目的地为console
log4j.rootCategory=INFO, consolelog4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# 输出配置的是spark提供的日志级别
log4j.logger.org.spark_project.jetty=INFO
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

(2)局部管理 :就是在当前类中进行日志的管理。

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.spark_project").setLevel(Level.WARN)

5.4 WordCount案例程序的执行过程

当deploy-mode为client模式的时候,driver就在我们提交作业的本机,而spark的作业对应的executor在spark集群中运行。

在上图中可以发现相邻两个rdd之间有依赖关系,依赖分为宽依赖和窄依赖。

窄依赖:rdd中的partition中的数据只依赖于父rdd中的一个partition或者常数个partition。常见的窄依赖操作有:flatMap,map,filter,coalesce等

宽依赖:rdd中的partition中的数据只依赖于父rdd中的所有partition。常见的宽依赖操作有reduceByKey,groupByKey,join,sortByKey,repartition等

rdd和rdd之间的依赖关系构成了一个链条,这个链条称之为lineage(血缘)

6 Spark作业运行架构图(standalone模式)

①启动spark集群:通过spark的start-all.sh脚本启动spark集群,启动了对应的Master进程和Worker进程

②Worker启动之后向Master进程发送注册信息

③Worker向Master注册成功之后,worker要不断的向master发送心跳包,去监听主节点是否存在

④Driver向Spark集群提交作业,就是向Master提交作业,申请运行资源

⑤Master收到Driver的提交请求,向Worker节点指派相应的作业任务,就是在对应的Worker节点上启动对应的executor进程

⑥Worker节点接收到Master节点启动executor任务之后,就启动对应的executor进程,向master汇报成功启动,可以接收任务

⑦executor进程启动之后,就像Driver进程进行反向注册,告诉Driver谁可以执行spark任务

⑧Driver接收到注册之后,就知道向谁发送spark作业,那么这样在spark集群中就有一组独立的executor进程为该Driver服务

⑨DAGScheduler根据编写的spark作业逻辑,将spark作业分成若干个阶段Stage(基于Spark的transformation里是否有shuffle Dependency),然后为每一个阶段组装一批task组成taskSet(task里面包含了序列化之后的我们编写的spark transformation),然后将这些DAGScheduler组装好的taskSet,交给taskScheduler,由taskScheduler将这些任务发给对应的executor

⑩executor进程接收到了Driver发送过来的taskSet之后,进行反序列化,然后将这些task封装进一个叫tasksunner的线程中,然后放到本地线程池中调度我们的作业的执行。

7 RDD操作

7.1 RDD初始化

RDD的初始化,原生api提供的2中创建方式:

①是读取文件textFile

②加载一个scala集合parallelize。

当然,也可以通过transformation算子来创建的RDD。

7.2 RDD操作

RDD操作算子的分类,基本上分为两类:transformation和action,当然更加细致的分,可以分为输入算子,转换算子,缓存算子,行动算子。

输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。

运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。 如果数据需要复用,可以通过Cache算子,将数据缓存到内存。

输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。

7.3 transformation转换算子

(1)map

rdd.map(func):RDD,对rdd集合中的每一个元素,都作用一次该func函数,之后返回值为生成元素构成的一个新的RDD。

(2)flatMap

rdd.flatMap(func):RDD ==>rdd集合中的每一个元素,都要作用func函数,返回0到多个新的元素,这些新的元素共同构成一个新的RDD。

map操作是一个一到一的操作,flatMap操作是一个1到多的操作

(3)filter

rdd.filter(func):RDD ==> 对rdd中的每一个元素操作func函数,该函数的返回值为Boolean类型,保留返回值为true的元素,共同构成一个新的RDD,过滤掉哪些返回值为false的元素。

(4)sample

rdd.sample(withReplacement:Boolean, fraction:Double [, seed:Long]):RDD ===> 抽样,sample抽样不是一个精确的抽样。一个非常重要的作用,就是来看rdd中数据的分布情况,根据数据分布的情况,进行各种调优与优化,防止数据倾斜。

withReplacement:抽样的方式,true有放回抽样, false为无返回抽样

fraction: 抽样比例,取值范围就是0~1

seed: 抽样的随机数种子,有默认值,通常也不需要传值

(5)union

rdd1.union(rdd2),联合rdd1和rdd2中的数据,形成一个新的rdd,其作用相当于sql中的union all。

(6)join

join就是sql中的inner join。

注意:要想两个RDD进行连接,那么这两个rdd的数据格式,必须是k-v键值对的,其中的k就是关联的条件,也就是sql中的on连接条件。

RDD1的类型[K, V], RDD2的类型[K, W]

内连接 :val joinedRDD:RDD[(K, (V, W))] = rdd1.join(rdd2)

左外连接 :val leftJoinedRDD:RDD[(K, (V, Option[W]))] = rdd1.leftOuterJoin(rdd2)

右外连接 :val rightJoinedRDD:RDD[(K, (Option[V], W))] = rdd1.rightOuterJoin(rdd2)

全连接 :val fullJoinedRDD:RDD[(K, (Option[V], Option[W]))] = rdd1.fullOuterJoin(rdd2)

(7)groupByKey

rdd.groupByKey(),按照key进行分组,如果原始rdd的类型时[(K, V)] ,那必然其结果就肯定[(K, Iterable[V])],是一个shuffle dependency宽依赖shuffle操作,但是这个groupByKey不建议在工作过程中使用,除非非要用,因为groupByKey没有本地预聚合,性能较差,一般我们能用下面的reduceByKey或者combineByKey或者aggregateByKey代替就尽量代替。

(8)reduceByKey

rdd.reduceByKey(func:(V, V) => V):RDD[(K, V)] :在scala集合中学习过一个reduce(func:(W, W) => W)操作,是一个聚合操作,这里的reduceByKey按照就理解为在groupByKey(按照key进行分组[(K, Iterable[V])])的基础上,对每一个key对应的Iterable[V]执行reduce操作。

同时reduceByKey操作会有一个本地预聚合的操作,所以是一个shuffle dependency宽依赖shuffle操作。

(9)sortByKey

按照key进行排序

(10)combineByKey

这是spark最底层的聚合算子之一,按照key进行各种各样的聚合操作,spark提供的很多高阶算子,都是基于该算子实现的。

def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] =  {...
}

createCombiner: V => C, 相同的Key在分区中会调用一次该函数,用于创建聚合之后的类型,为了和后续Key相同的数据进行聚合;mergeValue: (C, V) => C, 在相同分区中基于上述createCombiner基础之上的局部聚合;mergeCombiners: (C, C) => C) 将每个分区中相同key聚合的结果在分区间进行全局聚合

(11)aggregateByKey

aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]和combineByKey都是一个相对底层的聚合算子,可以完成系统没有提供的其它操作,相当于自定义算子。aggregateByKey底层使用combineByKeyWithClassTag来实现,所以本质上二者没啥区别,区别就在于使用时的选择而已。

aggregateByKey更为简单,但是如果聚合前后数据类型不一致,建议使用combineByKey;同时如果初始化操作较为复杂,也建议使用combineByKey。

7.3 action行动算子

这些算子都是在rdd上的分区partition上面执行的,不是在driver本地执行。

(1)foreach

用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)

(2)count

统计该rdd中元素的个数

(3)take(n)

返回该rdd中的前N个元素,如果该rdd的数据是有序的,那么take(n)就是Top N

(4)first

take(n)中比较特殊的一个take(1)(0)

(5)collect

将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。

(6)reduce

reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。

(7)countByKey

统计key出现的次数

(8)saveAsTextFile

保存到文件,本质上是saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]

(9)saveAsObjectFile和saveAsSequenceFile

saveAsObjectFile本质上是saveAsSequenceFile

(10)saveAsHadoopFile和saveAsNewAPIHadoopFile

这二者的主要区别就是OutputFormat的区别,接口org.apache.hadoop.mapred.OutputFormat,

抽象类org.apache.hadoop.mapreduce.OutputFormat   所以saveAshadoopFile使用的是接口OutputFormat,saveAsNewAPIHadoopFile使用的抽象类OutputFormat,建议使用后者。

8 高级排序

8.1 普通的排序

(1)sortByKey

object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val stuRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))//按照学生身高进行降序排序val height2Stu = stuRDD.map(stu => (stu.height, stu))//注意:sortByKey是局部排序,不是全局排序,如果要进行全局排序,// 必须将所有的数据都拉取到一台机器上面才可以val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)sorted.foreach{case (height, stu) => println(stu)}sc.stop()}
}case class Student(id:Int, name:String, age:Int, height:Double)

(2)sortBy

这个sortBy其实使用sortByKey来实现,但是比sortByKey更加灵活,因为sortByKey只能应用在k-v数据格式上,而这个sortBy可以应在非k-v键值对的数据格式上面。

val sortedBy = stuRDD.sortBy(stu => stu.height,ascending = true,numPartitions = 1)(new Ordering[Double](){override def compare(x: Double, y: Double) = y.compareTo(x)},ClassTag.Double.asInstanceOf[ClassTag[Double]])
sortedBy.foreach(println)

sortedBy的操作,除了正常的升序,分区个数以外,还需需要传递一个将原始数据类型,提取其中用于排序的字段;并且提供用于比较的方式,以及在运行时的数据类型ClassTag标记型trait。

(3)takeOrdered

takeOrdered也是对rdd进行排序,但是和上述的sortByKey和sortBy相比较,takeOrdered是一个action操作,返回值为一个集合,而前两者为transformation,返回值为rdd。如果我们想在driver中获取排序之后的结果,那么建议使用takeOrdered,因为该操作边排序边返回。其实是take和sortBy的一个结合体。

takeOrdered(n),获取排序之后的n条记录

//先按照身高降序排序,身高相对按照年龄升序排 ---> 二次排序
stuRDD.takeOrdered(3)(new Ordering[Student](){override def compare(x: Student, y: Student) = {var ret = y.height.compareTo(x.height)if(ret == 0) {ret = x.age.compareTo(y.age)}ret}
}).foreach(println)

8.2 二次排序

所谓二次排序,指的是排序字段不唯一,有多个,共同排序

object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val personRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()}
}case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//对学生的身高和年龄依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret}
}

8.3 分组TopN

在分组的情况之下,获取每个组内的TopN数据

object GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key进行分组//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}
}

8.4 优化分组TopN

上述在编码过程当中使用groupByKey,我们说着这个算子的性能很差,因为没有本地预聚合,所以应该在开发过程当中尽量避免使用,能用其它代替就代替。

(1)使用combineByKey优化1

object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}def createCombiner(info:String): ArrayBuffer[String] = {val ab = new ArrayBuffer[String]()ab.append(info)ab}def mergeValue(ab:ArrayBuffer[String], info:String): ArrayBuffer[String] = {ab.append(info)ab}def mergeCombiners(ab:ArrayBuffer[String], ab1: ArrayBuffer[String]): ArrayBuffer[String] = {ab.++:(ab1)}
}

此时这种写法和上面的groupByKey性能一模一样,没有任何的优化。

(2)使用combineByKey的优化2

object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 3) {ab.take(3)} else {ab}}
}

 

9 持久化操作

9.1 为什要持久化

一个RDD如果被多次操作,为了提交后续的执行效率,我们建议对该RDD进行持久化操作。

9.2 如何进行持久化

rdd.persist()/cache()就完成了rdd的持久化操作,我们可以将该rdd的数据持久化到内存,磁盘,等等。

如果我们已经不再对该rdd进行一个操作,而此时程序并没有终止,可以卸载已经持久化的该rdd数据,rdd.unPersist()。

9.3 持久化策略

可以通过persist(StoreageLevle的对象)来指定持久化策略,eg:StorageLevel.MEMORY_ONLY。

持久化策略含义
MEMORY_ONLY(默认)rdd中的数据以未经序列化的java对象格式,存储在内存中。如果内存不足,剩余的部分不持久化,使用的时候,没有持久化的那一部分数据重新加载。这种效率是最高,但是是对内存要求最高的。
MEMORY_ONLY_SER就比MEMORY_ONLY多了一个SER序列化,保存在内存中的数据是经过序列化之后的字节数组,同时每一个partition此时就是一个比较大的字节数组。
MEMORY_AND_DISK和MEMORY_ONLY相比就多了一个,内存存不下的数据存储在磁盘中。
MEMEORY_AND_DISK_SER比MEMORY_AND_DISK多了个序列化。
DISK_ONLY就是MEMORY_ONLY对应,都保存在磁盘,效率太差,一般不用。
xxx_2就是上述多个策略后面加了一个_2,比如MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2等等,就多了一个replicate而已,备份,所以性能会下降,但是容错或者高可用加强了。所以需要在二者直接做权衡。如果说要求数据具备高可用,同时容错的时间花费比从新计算花费时间少,此时便可以使用,否则一般不用。
HEAP_OFF(experimental)使用非Spark的内存,也即堆外内存,比如Tachyon,HBase、Redis等等内存来补充spark数据的缓存。

9.4 如何选择持久化策略

(1)如果要持久化的数据是可以在内存中进行保存,那么毫无疑问,选择MEMEORY_ONLY,因为这种方式的效率是最高的,但是在生成中往往要进行缓存的数据量还是蛮大的,而且因为数据都是未经序列化的java对象,所以很容易引起频繁的gc。

(2)如果上述满足不了,就退而求其次,MEMORY_ONLY_SER,这种方式增加的额外的性能开销就是序列化和反序列化,经过反序列化之后的对象就是纯java对象,因此性能还是蛮高的。

(3)如果还是扛不住,再退而求其次,MEMOEY_AND_DISK_SER,因为到这一步的话,那说明对象体积确实很多,为了提交执行效率,应该尽可能的将数据保存在内存,所以就对数据进行序列化,其次在序列化到磁盘。

(4)一般情况下DISK_ONLY,DISK_SER不用,效率太低,有时候真的不容从源头计算一遍。

(5)一般情况下我们都不用XXX_2,代备份的种种持久化策略,除非程序对数据的安全性要求非常高,或者说备份的对性能的消耗低于从头再算一遍,我们可以使用这种xxx_2以外,基本不用。

10 共享变量

10.1 概述

如果transformation使用到Driver中的变量,在executor中执行的时候,就需要通过网络传输到对应的executor,如果该变量很大,那么网络传输一定会成为性能的瓶颈。Spark就提供了两种有限类型的共享变量:累加器和广播变量

10.2 broadcast广播变量

广播变量:为每个task都拷贝一份变量,将变量包装成为一个广播变量(broadcast),只需要在executor中拷贝一份,在task运行的时候,直接从executor调用即可,相当于局部变量变成成员变量,性能就得到了提升。

val num:Any = xxxval numBC:Broadcast[Any] = sc.broadcast(num)调用:val n = numBC.value注意:该num需要进行序列化。

10.3 accumulator累加器

累加器的一个好处是,不需要修改程序的业务逻辑来完成数据累加,同时也不需要额外的触发一个action job来完成累加,反之必须要添加新的业务逻辑,必须要触发一个新的action job来完成,显然这个accumulator的操作性能更佳!

构建一个累加器val accu = sc.longAccumuator()累加的操作accu.add(参数)获取累加器的结果val ret = accu.value
val conf = new SparkConf()
.setAppName("AccumulatorOps")
.setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/data.txt")
val words = lines.flatMap(_.split("\\s+"))//统计每个单词出现的次数
val accumulator = sc.longAccumulatorval rbk = words.map(word => {if(word == "is")accumulator.add(1)(word, 1)
}).reduceByKey(_+_)
rbk.foreach(println)
println("================使用累加器===================")
println("is: " + accumulator.value)Thread.sleep(1000000)
sc.stop()

注意:累加器的调用,在action之后被调用,也就是说累加器必须在action触发之后;多次使用同一个累加器,应该尽量做到用完即重置;尽量给累加器指定name,方便我们在web-ui上面进行查看

10.4 自定义累加器

自定义一个类继承AccumulatorV2,重写方法

/*自定义累加器IN 指的是accmulator.add(sth.)中sth的数据类型OUT 指的是accmulator.value返回值的数据类型*/
class MyAccumulator extends AccumulatorV2[String, Map[String, Long]] {private var map = mutable.Map[String, Long]()/*** 当前累加器是否有初始化值* 如果为一个long的值,0就是初始化值,如果为list,Nil就是初始化值,是map,Map()就是初始化值*/override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, Map[String, Long]] = {val accu = new MyAccumulatoraccu.map = this.mapaccu}override def reset(): Unit = map.clear()//分区内的数据累加 is: 5, of:4override def add(word: String): Unit = {if(map.contains(word)) {val newCount = map(word) + 1map.put(word, newCount)} else {map.put(word, 1)}
//        map.put(word, map.getOrElse(word, 0) + 1)}//多个分区间的数据累加override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {other.value.foreach{case (word, count) => {if(map.contains(word)) {val newCount = map(word) + countmap.put(word, newCount)} else {map.put(word, count)}
//            map.put(word, map.getOrElse(word, 0) + count)}}}override def value: Map[String, Long] = map.toMap
}

注册使用:

object _08AccumulatorOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("$AccumulatorOps").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data.txt")val words = lines.flatMap(_.split("\\s+"))//注册val myAccu = new MyAccumulator()sc.register(myAccu, "myAccu")//统计每个单词出现的次数val pairs = words.map(word => {if(word == "is" || word == "of" || word == "a")myAccu.add(word)(word, 1)})val rbk = pairs.reduceByKey(_+_)rbk.foreach(println)println("=============累加器==========")myAccu.value.foreach(println)Thread.sleep(10000000)sc.stop()}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1320. 二指输入的的最小距离(动态规划)

文章目录1. 题目2. 解题1. 题目 二指输入法定制键盘在 XY 平面上的布局如上图所示&#xff0c;其中每个大写英文字母都位于某个坐标处&#xff0c; 例如字母 A 位于坐标 (0,0)&#xff0c;字母 B 位于坐标 (0,1)&#xff0c;字母 P 位于坐标 (2,3) 且字母 Z 位于坐标 (4,1)。 …

SparkStreaming基础

目录 SparkStreaming基础 1 流式计算 1.1 常见的离线和流式计算框架 2 SparkStreaming简介 2.1 核心概念DStream 2.2 工作原理 2.3 Storm&#xff0c;SparkStreaming和Flink的对比 2.4 如何选择流式处理框架 3 SparkStreaming实时案例 3.1 StreamingContext和Receiver…

【Kaggle微课程】Natural Language Processing - 1. Intro to NLP

文章目录1. 使用 spacy 库进行 NLP2. Tokenizing3. 文本处理4. 模式匹配练习&#xff1a;食谱满意度调查1 在评论中找到菜单项2 对所有的评论匹配3 最不受欢迎的菜4 菜谱出现的次数learn from https://www.kaggle.com/learn/natural-language-processing 1. 使用 spacy 库进行…

【Kaggle微课程】Natural Language Processing - 2.Text Classification

文章目录1. bag of words2. 建立词袋模型3. 训练文本分类模型4. 预测练习&#xff1a;1. 评估方法2. 数据预处理、建模3. 训练4. 预测5. 评估模型6. 改进learn from https://www.kaggle.com/learn/natural-language-processing NLP中的一个常见任务是文本分类。这是传统机器学…

Django框架—富文本编辑器

借助富文本编辑器&#xff0c;网站的编辑人员能够像使用offfice一样编写出漂亮的、所见即所得的页面此处以tinymce为例&#xff0c;其它富文本编辑器的使用也是类似的在虚拟环境中安装包 pip install django-tinymce2.6.0安装完成后&#xff0c;可以使用在Admin管理中&#xf…

Python基础(二)--数据类型,运算符与流程控制

目录 Python基础&#xff08;二&#xff09;--数据类型&#xff0c;运算符与流程控制 1 数据类型 1.1 Python中的数据类型 1.2 整数类型&#xff08;int&#xff09; 1.3 布尔类型 1.4 浮点类型 1.5 复数类型 1.6 类型转换 2 运算符 2.1 算术运算符 2.2 布尔运算符 …

【Kaggle微课程】Natural Language Processing - 3. Word Vectors

文章目录1. 词嵌入 Word Embeddings2. 分类模型3. 文档相似度练习&#xff1a;1. 使用文档向量训练模型2. 文本相似度learn from https://www.kaggle.com/learn/natural-language-processing 1. 词嵌入 Word Embeddings 参考博文&#xff1a;05.序列模型 W2.自然语言处理与词…

Django搜索工具——全文检索

全文检索不同于特定字段的模糊查询&#xff0c;使用全文检索的效率更高&#xff0c;并且能够对于中文进行分词处理haystack&#xff1a;全文检索的框架&#xff0c;支持whoosh、solr、Xapian、Elasticsearc四种全文检索引擎&#xff0c;点击查看官方网站whoosh&#xff1a;纯Py…

LeetCode 787. K 站中转内最便宜的航班(Dijkstra最短路径 + 优先队列)

文章目录1. 题目2. 解题1. 题目 有 n 个城市通过 m 个航班连接。每个航班都从城市 u 开始&#xff0c;以价格 w 抵达 v。 现在给定所有的城市和航班&#xff0c;以及出发城市 src 和目的地 dst&#xff0c;你的任务是找到从 src 到 dst 最多经过 k 站中转的最便宜的价格。 如…

Windows Phone 资源管理与换肤思考

Windows Phone 资源管理与换肤思考 原文 Windows Phone 资源管理与换肤思考 新入手一台Windows 8的笔记本&#xff0c;安装了VS2013后&#xff0c;终于又可以开发WP了。公司暂时不愿意开发WP&#xff0c;那么咱就自行研究吧&#xff01; 在没有WP开发环境的时候&#xff0c;曾经…

Django完成异步工具——celery

情景&#xff1a;用户发起request&#xff0c;并等待response返回。在本些views中&#xff0c;可能需要执行一段耗时的程序&#xff0c;那么用户就会等待很长时间&#xff0c;造成不好的用户体验&#xff0c;比如发送邮件、手机验证码等使用celery后&#xff0c;情况就不一样了…

Python基础(三)--序列

Python基础&#xff08;三&#xff09;--序列 1 序列相关的概念 1.1 什么是序列 序列是一种可迭代对象&#xff0c;可以存储多个数据&#xff0c;并提供数据的访问。 序列中的数据称为元素&#xff0c;Python内置的序列类型有&#xff1a;列表&#xff08;list&#xff09;…

项目上线最后工作——布署环境

当项目开发完成后&#xff0c;需要将项目代码放到服务器上&#xff0c;这个服务器拥有固定的IP&#xff0c;再通过域名绑定&#xff0c;就可以供其它人浏览&#xff0c;对于python web开发&#xff0c;可以使用wsgi、apache服务器&#xff0c;此处以wsgi为例进行布署服务器首先…

Python基础(四)--字典与集合

Python基础&#xff08;四&#xff09;--字典与集合 1 字典 1.1 什么是字典 字典提供的是一种映射存储的方式。字典分为两个部分&#xff0c;一个是键&#xff08;key&#xff09;&#xff0c;一个是key所关联的值&#xff08;value&#xff09;。&#xff0c;一个键关联&am…

[Kaggle] Spam/Ham Email Classification 垃圾邮件分类(spacy)

文章目录1. 导入包2. 数据预览2. 特征组合3. 建模4. 训练5. 预测练习地址&#xff1a;https://www.kaggle.com/c/ds100fa19 相关博文&#xff1a; [Kaggle] Spam/Ham Email Classification 垃圾邮件分类&#xff08;RNN/GRU/LSTM&#xff09; [Kaggle] Spam/Ham Email Classifi…

电商网站(Django框架)—— 大纲内容与基本功能分析

1. 项目架构 2. 数据库表结构 3. 数据库读写分离 4. Django读写分离配置 新建utils/db_router.py 课后阅读资料 http://python.usyiyi.cn/documents/django_182/topics/db/multi-db.html 5. 用户认证模型 注意&#xff1a; AUTH_USER_MODEL配置参数要在第一次迁移数据库之…

Python基础(五)--函数

目录 Python基础&#xff08;五&#xff09;--函数 1 函数的作用 1.1 函数定义与调用 1.2 函数的作用 1.3 空语句 2 参数与返回值 2.1 函数的参数 2.2 函数的返回值 2.3 返回多个值 3 参数的默认值 3.1 可选参数 3.2 参数的默认值 4 位置参数与关键字参数 4.1 关键…

LeetCode 1024. 视频拼接(动态规划/贪心)

文章目录1. 题目2. 解题2.1 动态规划2.2 贪心1. 题目 你将会获得一系列视频片段&#xff0c;这些片段来自于一项持续时长为 T 秒的体育赛事。这些片段可能有所重叠&#xff0c;也可能长度不一。 视频片段 clips[i] 都用区间进行表示&#xff1a;开始于 clips[i][0] 并于 clip…

电商网站(Django框架)—— 思维导图

1.用户模块&#xff1a;注册、登录、激活、退出、个人中心、地址 2.商品模块&#xff1a;首页、详情、列表、搜索 3.购物车&#xff1a; 增加、删除、修改、查询 4. 订单模块&#xff1a;确认订单页面、提交订单&#xff08;下单&#xff09;、请求支付、查询支付结果、评论 5.…

Python基础(六)--类与对象

目录 Python基础&#xff08;六&#xff09;--类与对象 1 类与对象的基本概念 1.1 什么是对象 1.2 什么是类 1.3 类与对象的关系 2 定义与初始化 2.1 类的定义 2.2 对象的初始化 2.3 动态增加属性方法 3 类成员 3.1 类属性与实例属性 3.2 类方法与实例方法 3.3 静态…