http://ihoge.cn/2018/IntroductionToSpark.html
Spark的基本架构
当单机没有足够的能力和资源来执行大量信息的计算(或者低延迟计算),这时就需要一个集群或一组机器将许多机器的资源集中在一起,使我们可以使用全部累积的在一起的计算和存储资源。现在只有一组机器不够强大,你需要一个框架来协调他们之间的工作。 Spark是一种工具,可以管理和协调跨计算机集群执行数据任务。
Spark用于执行任务的机器集群可以由Spark的Standalone,YARN或Mesos等集群管理器进行管理。然后,我们向这些集群管理器提交Spark应用程序,这些集群管理器将资源授予我们的应用程序,以便我们完成我们的工作。
1. Spark Application
Spark应用程序由一个驱动程序进程和一组执行程序进程组成。Driver进程运行main()函数,位于集群中的一个节点上,它负责三件事:维护Spark应用程序的相关信息;回应用户的程序或输入;分配和安排Executors之间的工作。驱动程序过程是绝对必要的 - 它是Spark应用程序的核心,并在应用程序的生命周期中保留所有相关信息。
Executor负责实际执行Driver分配给他们的工作。这意味着,每个Executor只有两个任务:执行由驱动程序分配给它的代码,并将该执行程序的计算状态报告给驱动程序节点。
群集管理器控制物理机器并为Spark应用程序分配资源。这可以是几个核心集群管理员之一:Spark的Standalone,YARN或Mesos。这意味着可以同时在群集上运行多个Spark应用程序。
在前面的插图中,左侧是我们的driver,右侧是四个executors。在该图中,我们删除了群集节点的概念。用户可以通过配置指定有多少执行者应该落在每个节点上。
- Spark有一些集群管理器,负责调度可用资源。
- 驱动程序进程负责执行执行程序中的驱动程序命令,以完成我们的任务。
2. Spark’s APIs
尽管我们的executor大多会一直运行Spark代码。但我们仍然可以通过Spark的语言API用多种不同语言运行Spark代码。大多数情况下,Spark会在每种语言中提供一些核心“concepts”,并将不同语言的代码译成运行在机器集群上的Spark代码。
Spark有两套基本的API:低级非结构化(Unstructured)API和更高级别的结构化(Structured)API。
3. SparkSession
我们通过驱动程序来控制Spark应用程序。该驱动程序进程将自身作为名为SparkSession并作为唯一的接口API对象向用户开放。 SparkSession实例是Spark在群集中执行用户定义操作的方式。 SparkSession和Spark应用程序之间有一对一的对应关系。在Scala和Python中,变量在启动控制台时可用作spark。让我们看下简单的Scala和/或Python中的SparkSession。
4. Dataframe
DataFrame是最常见的Structured API
(结构化API),只是表示有类型的包含行和列的数据表
。一个简单的比喻就是一个带有命名列的电子表格。其根本区别在于,当电子表格位于一台计算机上某个特定位置时,Spark DataFrame可以跨越数千台计算机。将数据放在多台计算机上的原因无非有两种:数据太大而无法放在一台计算机上,或者在一台计算机上执行计算所需的时间太长。
DataFrame概念并不是Spark独有的。 R和Python都有相似的概念。但是,Python / R DataFrame(有一些例外)存在于一台机器上,而不是多台机器上。这限制了您可以对python和R中给定的DataFrame执行的操作与该特定机器上存在的资源进行对比。但是,由于Spark具有适用于Python和R的Spark’s Language APIs
,因此将Pandas(Python)DataFrame转换为Spark DataFrame和R DataFrame转换为Spark DataFrame(R)非常容易。
注意
Spark有几个核心抽象:Datasets, DataFrames, SQL Tables,和弹性分布式数据集(RDD)。这些抽象都表示分布式数据集合,但它们有不同的接口来处理这些数据。最简单和最有效的是DataFrames,它可以用于所有语言。
5. Partitions
为了允许每个执行者并行执行工作,Spark将数据分解成称为分区的块。分区是位于集群中的一台物理机上的一组行。 DataFrame的分区表示数据在执行过程中如何在整个机器群中物理分布。如果你有一个分区,即使你有数千个执行者,Spark也只会有一个分区。如果有多个分区,但只有一个执行程序Spark仍然只有一个并行性,因为只有一个计算资源。
值得注意的是,使用DataFrames,我们不会(大部分)操作 手动分区(基于个人)。我们只需指定物理分区中数据的高级转换,并且Spark确定此工作将如何在集群上实际执行。较低级别的API确实存在(通过弹性分布式数据集接口)。
6. Transformations
在Spark中,核心数据结构是不可改变的,这意味着一旦创建它们就不能更改。起初,这可能看起来像一个奇怪的概念,如果你不能改变它,你应该如何使用它?为了“更改”DataFrame,您必须指示Spark如何修改您所需的DataFrame。这些说明被称为转换
。
转换操作没有返回输出,这是因为我们只指定了一个抽象转换,并且Spark不会在转换之前采取行动,直到我们执行一个动作。Transformations是如何使用Spark来表达业务逻辑的核心。Spark有两种类型的Transformations,一种是窄依赖转换关系,一种是宽依赖转换关系。
宽依赖指输入分区对多输出分区起作用(多个孩子)。这被称为shuffle,Spark将在群集之间交换分区。对于窄依赖转换,Spark将自动执行称为流水线的操作,这意味着如果我们在DataFrame上指定了多个过滤器,它们将全部在内存中执行。当我们执行shuffle时,Spark会将结果写入磁盘。
7. Lazy Evaluation
Lazy Evaluation意味着Spark将等到执行计算指令图的最后时刻。在Spark中,我们不是在表达某些操作时立即修改数据,而是建立起来应用于源数据的转换计划。Spark将把原始DataFrame转换计划编译为一个高效的物理计划,该计划将在群集中尽可能高效地运行。这为最终用户带来了巨大的好处,因为Spark可以优化整个数据流从端到端。这方面的一个例子就是所谓的“predicate pushdown” DataFrames。如果我们构建一个大的Spark作业,但在最后指定了一个过滤器,只需要我们从源数据中获取一行,则执行此操作的最有效方法就是访问我们需要的单个记录。 Spark实际上会通过自动推低滤波器来优化这一点。
8. Actions
转换使我们能够建立我们的逻辑计划。为了触发计算,我们需要一个动作操作。一个动作指示Spark计算一系列转换的结果。
在指定我们的操作时,我们开始了一个Spark作业,它运行我们的过滤器转换(一个窄依赖转换),然后是一个聚合(一个宽依赖转换),它在每个分区的基础上执行计数,然后一个collect将我们的结果带到各自语言的本地对象。我们可以通过检查Spark UI(http://localhost:4040)来看到所有这些,Spark UI是一个包含在Spark中的工具,它允许我们监视集群上运行的Spark作业。
9. Dataframe & SQL
Spark SQL是Spark为结构化和半结构化数据处理设计的最受欢迎的模块之一。 Spark SQL允许用户使用SQL或可在Java,Scala,Python和R中使用的DataFrame和Dataset API来查询Spark程序中的structured data。由于DataFrame API提供了一种统一的方法来访问各种的数据源(包括Hive datasets,Avro,Parquet,ORC,JSON和JDBC),用户能够以相同方式连接到任何数据源,并将这些多个数据源连接在一起。 Spark SQL使用Hive meta store为用户提供了与现有Hive数据,查询和UDF完全兼容的功能。用户可以无缝地 在Spark上无需修改即可运行其当前的Hive工作负载。
Spark SQL也可以通过spark-sql shell来访问,现有的业务工具可以通过标准的JDBC和ODBC接口进行连接。
现在我们通过一个示例并在DataFrame和SQL中进行跟踪。不管语言如何,以完全相同的方式启动相同的转换。您可以在SQL或DataFrames(R,Python,Scala或Java)中表达业务逻辑,并且在实际执行代码之前,Spark会将该逻辑编译计划优化并最终生成最优的物理计划。 Spark SQL允许您作为用户将任何DataFrame注册为表或视图(临时表),并使用纯SQL查询它。编写SQL查询或编写DataFrame代码之间没有性能差异 都“编译”到我们在DataFrame代码中指定的相同底层计划。
通过一个简单的方法调用就可以将任何DataFrame制作成表格或视图。
With SQl
With DataFrame
现在有7个步骤将我们带回源数据。您可以在这些DataFrame的解释计划中看到这一点。以上图解说明了我们在“代码”中执行的一系列步骤。真正的执行计划(解释中可见的执行计划)将与下面的执行计划有所不同,因为在物理执行方面进行了优化,然而,该执行计划与任何计划一样都是起点。这个执行计划是一个有向无环图(DAG)的转换,每个转换产生一个新的不可变DataFrame,我们在这个DataFrame上调用一个动作来产生一个结果。
1. 第一步是读取数据。但是Spark实际上并没有读取它(Lazy Evaluation)
2. 第二步是我们的分组,在技术上,当我们调用groupBy时,我们最终得到了一个RelationalGroupedDataset,它是DataFrame的一个奇特名称,该DataFrame具有指定的分组,但需要用户在可以进一步查询之前指定聚合。
3. 因此第三步是指定聚合。我们使用总和聚合方法。这需要输入一列 表达式或简单的列名称。 sum方法调用的结果是一个新的dataFrame。你会看到它有一个新的模式,但它知道每个列的类型。(再次强调!)这里没有执行计算是非常重要的。这只是我们表达的另一种转换,Spark仅仅能够跟踪我们提供的类型信息。
4. 第四步是简化语言,我们使用withColumnRename给原始列重新定义新名称。当然,这不会执行计算 - 这只是另一种转换!
5. 第五步导入一个函数对数据进行排序,即desc函数。从destination_total列中找到的最大值。
6. 第六步,我们将指定一个限制。这只是说明我们只需要五个值。这就像一个过滤器,只是它按位置而不是按值过滤。可以肯定地说,它基本上只是指定了一定大小的DataFrame。
7. 最后一步是我们的行动!现在我们实际上开始收集上面的DataFrame结果的过程,Spark将以我们正在执行的语言返回一个列表或数组。现在我们看下它的解释计划。
虽然这个解释计划与我们确切的“概念计划”不符,但所有的部分都在那里。可以看到limit语句以及orderBy(在第一行)。你也可以看到我们的聚合是如何在partial_sum调用中的两个阶段发生的。这是因为数字列表是可交换的,并且Spark可以执行sum()并按分区进行划分。当然,我们也可以看到我们如何在DataFrame中读取数据。同时我们也可以将它写出到Spark支持的任何数据源中。例如,假设我们想要将这些信息存储在PostgreSQL等数据库中,或者将它们写入另一个文件。
本文永久地址,转载注明出处!
http://ihoge.cn/2018/IntroductionToSpark.html