1、初识Spark
Spark是分布式的,主要基于内存的,适合迭代计算的大数据计算框架。注意基于内存:是优先考虑将数据放到内存中,因为在内存中具有更好的数据本地性,但是如果内存放不下也会放在磁盘上,或者部分数据放在磁盘上计算。所以Spark不仅能够计算内存放的下的数据,也可以计算内存中放不下的数据(Spark的真正生产环境,如果数据大于内存,需要考虑数据的放置策略以及性能调优的技巧)。
由于Spark底层是基于RDD(分布式弹性数据集)的抽象,所以不仅可以支持目前Spark已经支持的5种计算方式(流处理,SQL,图计算,机器学习等),还可以支持其他。
个人编写的程序会经过Driver驱动器,提交到集群中,在集群中的某些节点中运行。处理数据的来源有HDFS,HBase,Hive,传统的关系数据库,处理后的数据可以放到HDFS,HBase,Hive(使用数据仓库),DB,显示在客户端的输出端,s3等。
2、理解Spark核心的三个方面
2.1 分布式
分布式就是多台机器,当然Spark也可以像一个JVM进程一样Local模式,开发测试debug的时候在本地运行,但是生产环境一定是分布式多台机器运行。
分布式的多台机器运行,首先会有一个提交具体程序的Driver(或客户端),程序会
被提交给集群,集群中会有很多台机器,默认情况下,每台机器是一个节点。Spark程序提交到Spark集群中进行运行,程序本身会处理一批数据,分布式下,不同的节点会处理一部分数据,不同的节点之间的操作互不影响。这样分布式的处理会使得程序对数据的处理更节约时间。
分布式做并行化就相当于一个图书馆,有很多书,数据里面有书。如果是以前单机版本的,处理的时候就是线性的去数每一个书架。如果是分布式的,可能图书馆馆长(Cluster Manager)分配计算资源说找1000个人,每个人负责一个书架的书的数量计算,那这1000个人并行计算,速度非常快。这1000个人计算完之后就交给图书馆馆长,那他最后在进行统计。分布式由于应用了并行计算,所以肯定会处理更快。
2.2 主要基于内存
整个数据在进行计算的时候,肯定是希望数据是在内存中的,不希望在本地磁盘上,更不希望通过网络从远程机器上把数据抓过来,所以Spark优先考虑内存其实是对计算机资源最大化利用的物理机制。
每个节点的数据首先会被放于内存中,内存容量不足时,会被放到磁盘中。放于内存中的数据,经过第一个阶段的计算后,处理的结果可以继续在其他节点上进行下一个阶段的计算。这是迭代计算。
2.3 迭代计算
擅长迭代式计算是Spark真正的精髓,因为实际我们凡事对数据进行稍有价值的挖掘,或稍有复杂度的挖掘一定是对这个多步的计算。Spark天生就是适合分布式的主要基于内存的迭代式计算,当然也适合分布式基于磁盘的迭代式计算。
数据被存放在不同节点中,数据不移动,程序移动。程序在计算完第一个阶段后,进行shuffle,数据被移动到其他节点,shuffle过程的不同策略,导致第一个阶段处理的结果,例如某一个节点的数据会被分发到不同的节点,以便进行下一个阶段的计算。
3、Spark的架构中的基本组件
3.1 Driver
Driver是应用程序application运行的时候的核心,因为他负责了整个作业的调度,并且向master申请资源来完成具体作业。
运行Application的main()函数并创建SparkContext,本身是整个程序运行调度的核心,会有高层调度器DAGScheduler(把作业划分层几个小的阶段)和底层调度器TaskScheduler(每个阶段里面的任务该具体怎么去处理),还有schedulerbackend去管理整个集群为当前的程序分配的计算资源(本身就是executor)。
driver除了创建对象,也会向master注册当前的程序,如果注册没问题的话master会分配资源,下一步就是根据他的action触发这个job,job里面有一系列的RDD,从后往前回溯,如果发现是宽依赖的话就划分不同的Stage,把Stage提交给底层调度器TaskScheduler,TaskScheduler拿到这个任务的集合。因为一个Stage内部都是计算逻辑完全一样的任务,只是计算逻辑不一样而已,底层调度器就会根据数据的本地性把数据放到executor去执行。而且这个executor在任务运行结束或者出状况的时候肯定要向driver汇报,最后运行完毕的时候SparkContext关闭。
3.2 Application
应用程序application就是用户编写的spark代码打包后的jar包和相关依赖,包含了driver功能的代码,和分布在集群中多个节点的executor的代码。也就是应用程序有两个层面,一个是driver层面,一个是executor层面。driver是驱动executor工作的,executor是具体处理数据分片,内部是线程池并发的处理。driver层面的代码其实就是mian方法中new sparkConf然后配置,创建sparkContext,也就是sparkConf+sparkContext,基于sparkContext接下来就开始创建RDD了,这些代码是具体的业务实现,就是executor层面的代码
3.3 Cluster Manager
在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器ResourceManager。
在粗粒度的资源分配方式在,spark程序application的运行不依赖于Cluster Manager。也就是说spark应用程序注册给Cluster Manager,注册如果是成功的Cluster Manager就提前分配好了资源,运行过程中不需要Cluster Manager的参与。所以Cluster Manager可插拔。
3.4 Worker
从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的控制,启动的进程叫Container。
worker就是集群中任何可以运行application操作代码的节点。worker上是不会运行我们程序代码的,worker是管理当前节点内存CPU等资源的使用状况,会接收mater分配资源的指令,并通过executorRunner具体启动一个新进程,进程里面有executor。
worker管理当前NODE的资源并接受master指令来分配具体的计算资源Executor(在新的进程中分跑配)。他分配的时候会有一个ExecutorRunner,就是我们要分配一个新的进程来做计算的时候worker都会有一个ExecutorRunner,相当于一个Proxy管理具体新分配的进程,其实就是在ExecutorRunner中帮我们远程创建出新的进程。
Worker本身是个进程,不会向mater汇报当前机器的CPU,内存的等信息,worker发心跳主要只有一个作用workid,当前机器上的资源是我们应用程序在注册的时候,注册成功master就会给我们分配资源,分配的时候会记录这个资源。发心跳的时候不会汇报资源,只有在发生故障的时候说资源出现的情况。
3.5 Executor
执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。
executor是运行在worker节点上的为当前应用程序开启的一个进程里面的处理对象,这个对象负责具体task运行,是线程池并发执行和线程复用的方式。线程池中的每个线程可以运行一个任务,然后任务运行完回收到池子进行线程复用。(这就比Hadoop的MapReduce好多了,需要开启JVM执行完了其中一个Map或Reduce不能复用JVM,而且JVM比较重量级)。而spark默认在一个节点上为程序开启一个JVM进程,这个JVM进程里面是线程池的方式,通过线程处理具体的task任务。
一个worker默认会为当前应用程序开辟一个executor,当然可以配置多个。executor线程池中的线程运行task的时候,task肯定要从磁盘或者内存中读写数据。每个application都有自己独立的一批executor。
executor配置多少看情况,如只有一个executor处理作业,占据了大量的CPUcore,但是资源闲置,这是巨大的资源浪费,另外一方面由于CPUcore个数是有限的,而在特定个CPUcore的时候只有一个executor如果数据比较大的情况下容易内存溢出OOM,这个时候就要分成几个executor。
4 spark提交程序流程
Spark的driver的运行有2种模式,一种是Client模式,一种是cluster模式。默认是Client模式(因为Client模式的时候可以看见跟多交互式日志的信息,就是运行过程的信息),如果指定为模式cluster模式,这样真正的driver就会在worker中的一台机器上,在哪台有master决定。
首先构建Spark Application的运行环境(启动SparkContext),SparkContext里面最重要的是做3件事情:①创建DAGScheduler(划分Satge)②创建TaskScheduler(负责一个Stage内部作业运行)③创建SchedulerBackend(计算资源)。在实例化过程中向master注册当前应用程序,master接收注册,如果没有问题会为当前程序分配APPid并分配计算资源(从3个地方获取①spark-env.sh或spark-default.sh②saprk-submit的时候提供的参数③程序中saprkconf配置的参数)。
然后Cluster Manager接收用户提交的程序并发送指令给Worker为当前应用程序分配计算资源,每个Worker所在节点默认为当前应用程序分配一个Executor,在Executor中通过线程池并发执行。
然后Worker进程通过一个proxy为ExecutorRunner的对象实例远程启动ExecutorBackend进程,ExecutorBackend进程里面有Executor。
分配完资源之后,下一步就是通过action触发具体的job,这时候DAGScheduler会把job中的RDD构成的DAG划分成不同的Stage,每个Stage内部是一系列业务逻辑完全相同但是处理数据不同的Tasks,构成TaskSet。TaskSet会交给TaskScheduler和Schedulerbackend负责具体task的运行(遵循数据本地性)。每个Task会计算RDD中的一个Partition,基于Partition来执行具体我们定义的一系列同个Stage内部的函数,依次类推知道整个程序运行完成。
Task有两种类型:ResultTask和ShuffleMapTask:最后一个Stage中的task为ResultTask产生job的结果,其他前面的Stage中的task都是ShuffleMapTask为下一个阶段的Stage做数据准备。
总结:①首先构建Spark Application的运行环境(启动SparkContext),在实例化过程中向master注册当前应用程序,master接收注册,如果没有问题会为当前程序分配APPid并分配计算资源②master接收用户提交的程序并发送指令给Worker为当前应用程序分配计算资源③Worker进程通过ExecutorRunner的对象启动ExecutorBackend进程,ExecutorBackend进程里面有Executor④分配完资源之后,下一步就是通过action触发具体的job,这时候DAGScheduler会把job中的RDD构成的DAG划分成不同的Stage,每个Stage内部是一系列业务逻辑完全相同但是处理数据不同的Tasks,构成TaskSet。TaskSet会交给TaskScheduler和Schedulerbackend负责具体task的运行,每个Task会计算RDD中的一个Partition,基于Partition来执行具体我们定义的一系列同个Stage内部的函数,依次类推知道整个程序运行完成。