从Spark SQL 底层架构可以看到,我们写的SQL语句,经过一个优化器(Catalyst)处理,转化为可执行的RDD,提交给集群执行。
SQL到RDD中间经过了一个Catalyst,它便是Spark SQL的核心,是针对Spark SQL语句执行过程中的查询优化框架,基于Scala函数式编程结构。
1、SparkSql执行架构
Catalyst的工作流程是一条SQL语句生成执行引擎可识别的程序,就离不开解析(Parser)
、优化(Optimizer)
、执行(Execution)
这三大过程。而Catalyst优化器在执行计划生成和优化的工作时候,它离不开自己内部的五大组件,如下所示:
(1)Parser模块:将SparkSql字符串解析为一个抽象语法树/AST。
(2)Analyzer模块:该模块会遍历整个AST,并对AST上的每个节点进行数据类型的绑定以及函数绑定,然后根据元数据信息Catalog对数据表中的字段进行解析。
(3)Optimizer模块:该模块是Catalyst的核心,主要分为RBO和CBO两种优化策略,其中RBO是基于规则优化,CBO是基于代价优化。
(4)Planner模块:优化后的逻辑执行计划OptimizedLogicalPlan依然是逻辑的,并不能被Spark系统理解,此时需要将OptimizedLogicalPlan转换成physical plan(物理计划) 。
(5)CostModel模块:主要根据过去的性能统计数据,选择最佳的物理执行计划,这个过程的优化就是CBO(基于代价优化)。
2、SQL执行过程
为了更好的对整个过程进行理解,下面通过简单的实例进行解释。
2.1、 Parser阶段:未解析的逻辑计划
Parser简单说就是将SQL字符串切分成一个一个的Token,再根据一定语义规则解析成一颗语法树。Parser模块目前都是使用第三方类库ANTLR进行实现的,包括我们熟悉的Hive、Presto、SparkSQL等都是由ANTLR实现的。
在这个过程中,会判断SQL语句是否符合规范,比如select from where 等这些关键字是否写对。
2.2、Analyzer阶段:解析后的逻辑计划
通过解析后的逻辑计划基本有了骨架,此时需要基本的元数据信息来表达这些词素,最重要的元数据信息主要包括两部分:表的Scheme和基本函数信息,表的Scheme主要包括表的基本定义(列名、数据类型)、表的数据格式(Json、Text)、表的物理位置等,基本函数主要指类信息。
Analyzer会再次遍历整个语法树,对树上的每个节点进行数据类型绑定及函数绑定,比如people词素会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型的int的变量,sum被解析为特定的聚合函数。
2.3、Optimizer模块:优化过的逻辑计划
Optimizer优化模块是整个Catalyst的核心,上面提到优化器分为基于规则的优化(RBO)和基于代价优化(CBO)两种。基于规则的优化策略实际上就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,在进行相应的等价转换。下面介绍三种常见的规则:谓词下推(Predicate Pushdown) 、常量累加(Constant Folding) 、列值裁剪(Column Pruning) 。
-
谓词下推(Predicate Pushdown)
上图左边是经过解析后的语法树,语法树中两个表先做join,之后在使用age>10进行filter。join算子是一个非常耗时的算子,耗时多少一般取决于参与join的两个表的大小,如果能够减少参与join两表的大小,就可以大大降低join算子所需的时间。
谓词下推就是将过滤操作下推到join之前进行,之后再进行join的时候,数据量将会得到显著的减少,join耗时必然降低。 -
常量累加(Constant Folding)
常量累加就是比如计算x+(100+80)->x+180,虽然是一个很小的改动,但是意义巨大。如果没有进行优化的话,每一条结果都需要执行一次100+80的操作,然后再与结果相加。优化后就不需要再次执行100+80操作。
列值裁剪(Column Pruning)
列值裁剪是当用到一个表时,不需要扫描它的所有列值,而是扫描只需要的id,不需要的裁剪掉。这一优化一方面大幅度减少了网络、内存数据量消耗,另一方面对于列式存储数据库来说大大提高了扫描效率。
2.4、SparkPlanner模块:转化为物理执行计划
根据上面的步骤,逻辑执行计划已经得到了比较完善的优化,然而,逻辑执行计划依然没办法真正执行,他们只是逻辑上可行,实际上Spark并不知道如何去执行这个东西。比如join是一个抽象概念,代表两个表根据相同的id进行合并,然而具体怎么实现合并,逻辑执行计划并没有说明。
此时就需要将逻辑执行计划转化为物理执行计划,也就是将逻辑上可行的执行计划变为Spark可以真正执行的计划。比如join算子,Spark根据不同场景为该算子制定了不同的算法策略,有BroadcastHashJoin、ShuffleHashJoin以及SortMergejoin等,物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现,怎么挑选,下面简单说下:
实际上SparkPlanner对优化后的逻辑计划进行转换,是生成了多个可以执行的物理计划Physical Plan;
接着CBO(基于代价优化)
优化策略会根据Cost Model
算出每个Physical Plan的代价,并选取代价最小的 Physical Plan作为最终的Physical Plan。
以上的步骤合起来,就是Catalyst优化器!
2.5、执行物理计划
在最终真正执行物理执行计划之前,还要进行 Preparations 规则处理,将SQL转化为DAG,最后调用 SparkPlan 的 execute(),执行物理计划计算 RDD。
final def execute(): RDD[InternalRow] = executeQuery {if (isCanonicalizedPlan) {throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")}doExecute()}
3、Catalyst 的优化
针对提交给Spark的SQL语句引擎会进行自动优化,这里在总结下Catalyst优化器的两个重要的优化。
3.1、RBO:基于规则的优化
(1)标准优化规则
- 过滤推断前的算子优化-operatorOptimizationRuleSet。
- 过滤推断-Infer Filters。
- 过滤推断后的算子优化-operatorOptimizationRuleSet。
- 下推join的额外谓词-Push extra predicate through join。
- 算子下推(Operator push down)-Project、Join、Limit、列剪裁。
- 算子合并(Operator combine)-Repartition、Project、Window、Filter、Limit、Union。
- 常量折叠和强度消减(Constant folding and strength reduction)-Repartition、Window、Null、常量、In、Filter、整数类型、Like、Boolean、if/case、二义性、no-op、struct、取值操作(struct/array/map)、csv/json、Concat。
- analysis 阶段的收尾规则-Finish Analysis,比如EliminateSubqueryAliases实际是在Analyzer里定义的。
- 算子优化前-Union、Limit、数据库关系、子查询、算子的替代、聚合算子。
- 算子优化-operatorOptimizationBatch
- 依赖统计数的优化规则-Project、Filter、Join、Sort、Decimal、Aggregate、对象表达式、数据库关系、笛卡尔积、子查询、Float、Struct。
(2)其他特殊的优化规则
rule【规则】 | batch【表示一组同类的规则】 | strategy【迭代策略】 | 注释 |
---|---|---|---|
【算子下推】PushProjectionThroughUnion | Operator Optimization after Inferring Filters | fixedPoint | 将Project操作符推送到Union操作符的两侧。可安全下推的操作如下所示。Union:现在,Union就意味着Union ALL,它不消除重复行。因此,通过它下推Filter和Project是安全的。下推Filter是由另一个规则PushDownPredicates处理的。一旦我们添加了UNION DISTINCT,我们就无法下推Project了。 |
【算子下推】ReorderJoin | Operator Optimization after Inferring Filters | fixedPoint | 重新排列Join,并将所有条件推入Join,以便底部的条件至少有一个条件。如果所有Join都已具有至少一个条件,则Join的顺序不会更改。如果启用了星型模式检测,请基于启发式重新排序星型Join计划。 |
【算子下推】EliminateOuterJoin | Operator Optimization after Inferring Filters | fixedPoint | 1.消除outer join,前提是谓词可以限制结果集,以便消除所有空行:如果两侧都有这个谓词,full outer -> inner;如果右侧有这个谓词,left outer -> inner;如果左侧有这个谓词,right outer -> inner;当且仅当左侧有这个谓词,full outer -> left outer;当且仅当右侧有这个谓词,full outer -> right outer 2.如果outer join仅在流侧具有distinct,则移除outer join:SELECT DISTINCT f1 FROM t1 LEFT JOIN t2 ON t1.id = t2.id ==> SELECT DISTINCT f1 FROM t1。当前规则应该在谓词下推之前执行。 |
【算子下推】PushPredicateThroughJoin | Operator Optimization after Inferring Filters | fixedPoint | 针对Join+on和Join+where这两种情况进行的优化。优化的目的是为了把过滤条件尽量下推到数据源读取时,减少数据传输和join时的数据量,从而提升性能。要注意的是,若使用的是FullOuter,则在这两种情况下该规则都不会进行优化 |
【算子下推】PushDownPredicates | Operator Optimization after Inferring Filters | fixedPoint | 常规的运算符和Join谓词下推的统一版本。此规则提高了级联join(例如:Filter-Join-Join-Join)的谓词下推性能。大多数谓词可以在一次传递中下推。 |
【算子下推】LimitPushDown | Operator Optimization after Inferring Filters | fixedPoint | 下推UNION ALL和JOIN下的LocalLimit。 |
【列裁剪】ColumnPruning | Operator Optimization after Inferring Filters | fixedPoint | 试图消除查询计划中不需要的列读取。由于在Filter之前添加Project会和PushPredicatesThroughProject冲突,此规则将以以下模式删除Project p2:p1 @ Project(, Filter(, p2 @ Project(_, child))) if p2.outputSet.subsetOf(p2.inputSet) p2通常是按照这个规则插入的,没有用,p1无论如何都可以删减列。 |
【过滤推断】InferFiltersFromConstraints | Infer Filters | Once | 基于运算符的现有约束生成附加过滤器的列表,但删除那些已经属于运算符条件一部分或属于运算符子节点约束一部分的过滤器。这些筛选器当前插入到Filter运算符的和Join运算符任一侧的现有条件中。注意:虽然这种优化适用于许多类型的join,但它主要有利于Inner Join和LeftSemi Join。 |
【算子合并】CollapseRepartition | Operator Optimization after Inferring Filters | fixedPoint | 合并相邻的RepartitionOperation和RebalancePartitions运算符 |
【算子合并】CollapseProject | Operator Optimization after Inferring Filters | fixedPoint | 两个Project运算符合并为一个别名替换,在以下情况下,将表达式合并为一个表达式。1.两个Project运算符相邻时。2.当两个Project运算符之间有LocalLimit/Sample/Repartition运算符,且上层的Project由相同数量的列组成,且列数相等或具有别名时。同时也考虑到GlobalLimit(LocalLimit)模式。 |
【算子合并】CollapseWindow | Operator Optimization after Inferring Filters | fixedPoint | 折叠相邻的Window表达式。如果分区规格和顺序规格相同,并且窗口表达式是独立的,且属于相同的窗口函数类型,则折叠到父节点中。 |
【算子合并】CombineFilters | Operator Optimization after Inferring Filters | fixedPoint | 将两个相邻的Filter运算符合并为一个,将非冗余条件合并为一个连接谓词。 |
-其他- | -其他- | -其他- | -其他- |
val operatorOptimizationRuleSet =Seq(// Operator push downPushProjectionThroughUnion,ReorderJoin,EliminateOuterJoin,PushPredicateThroughJoin,PushDownPredicate,LimitPushDown,ColumnPruning,InferFiltersFromConstraints,// Operator combineCollapseRepartition,CollapseProject,CollapseWindow,CombineFilters,CombineLimits,CombineUnions,// Constant folding and strength reductionNullPropagation,ConstantPropagation,FoldablePropagation,OptimizeIn,ConstantFolding,ReorderAssociativeOperator,LikeSimplification,BooleanSimplification,SimplifyConditionals,RemoveDispensableExpressions,SimplifyBinaryComparison,PruneFilters,EliminateSorts,SimplifyCasts,SimplifyCaseConversionExpressions,RewriteCorrelatedScalarSubquery,EliminateSerialization,RemoveRedundantAliases,RemoveRedundantProject,SimplifyExtractValueOps,CombineConcats) ++extendedOperatorOptimizationRules
下面简单介绍优化的点:谓词下推、列裁剪、常量累加等
- 谓词下推案例:
select
*
from
t_table1 a
join
t_table2 b
on a.id=b.id
where a.age>20 and b.cid < 100
上面的语句会自动优化为如下所示:
select
*
from
(select * from t_table1 where age>20) a
join
(select * from t_table2 where cid<100) b
on a.id=b.id
就是在子查询阶段就提前将数据进行过滤,后期join的shuffle数据量就大大减少。
- 列裁剪案例:
select
a.name, a.age, b.cid
from
(select * from t_table1 where age>20) a
join
(select * from t_table2 where cid<100) b
on a.id=b.id
上面的语句会自动优化为如下所示:
select
a.name, a.age, b.cid
from
(select name, age, id from t_table1 where age>20) a
join
(select id, cid from t_table2 where cid<100) b
on a.id=b.id
就是提前将需要的列查询出来,其他不需要的列裁剪掉。
- 常量累加:
select 1+1 as id from t_table1
上面的语句会自动优化为如下所示:
select 2 as id from t_table1
就是会提前将1+1计算成2,再赋给id列的每行,不用每次都计算一次1+1。
3.2、CBO:基于代价的优化
就是在SparkPlanner对优化后的逻辑计划生成了多个可以执行的物理计划Physical Plan之后,多个物理执行计划基于Cost Model选取最优的执行耗时最少的那个物理计划。
4、RDD线程模型
RDD是基于内存模型的分布式数据对象集合,是一组分区数据。
线程模型中,一个Executor可以运行Task的个数取决于Executor的Core数量,Task是Spark独立的计算单元
,RDD计算时以分区数据为单位
,如下图所示。
执行一条SQL时,会生成一个Job,可以在Spark UI查看到。
(1)Stage划分
Spark任务会计算RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,而划分stage的依据便是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务,然后将这些task以taskSet的形式提交给TaskScheduler运行。
stage实际是由一组并行的task组成,stage切割规则:从后往前,遇到宽依赖就切割stage
。看两个 RDD 的分区之间,是不是一对一的关系,若是则为窄依赖,反之则为宽依赖
。
- 宽依赖
父 RDD 中每个分区的数据都可以被子 RDD 的多个分区使用(涉及到了shuffle),常用的算子有:join(非hash-partitioned)、groupByKey、partitionBy; - 窄依赖
父 RDD 中每个分区的数据最多只能被子 RDD 的一个分区使用,常用的算子有:map、filter、union、join(hash-partitioned)、mapPartitions。
(2)RDD分区
key-value数据默认使用HashParttioner分区,spark.default.parallelism参数用于设置RDD的初始分区数
,以及设置默认的Task数量,参数一般设置Executor的core总数的2-3倍。