本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何商业交流,可随时联系。
1 Spark SQL 坚实后盾DataFrame
- DataFrame是一个分布式数据容器,更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。
- JSON schema自动推导
- Hive风格分区表自动识别
- 充分利用RCFile、ORC、Parquet等列式存储格式的优势,仅扫描查询真正涉及的列,忽略其余列的数据。
- 聚合统计函数支持
2 Spark SQL 源码包结构(溯本逐源)
主要分为4类:
- core模块:处理数据的输入输出,比如:把不同数据源(RDD,json,Parquet等)获取到数据,并将查询结果输出到DataFrame。
- catalyst模块:处理SQL语句的整个过程,包括解析,绑定,优化,物理计划等查询优化。
- hive模块:对hive数据进行处理。
- hive-ThriftServer:提供CLI以及JDBC和ODBC接口。
3 Spark SQL catalyst模块设计思路
(详细请参看我的SparkSQL源码解析内容)
catalyst主要组件有
- sqlParse => sql语句的语法解析
- Analyzer => 将不同来源的Unresolved Logical Plan和元数据(如hive metastore、Schema catalog)进行绑定,生成resolved Logical Plan
- optimizer => 根据OptimizationRules,对resolvedLogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimized Logical Plan
- Planner => LogicalPlan转换成PhysicalPlan
- CostModel => 根据过去的性能统计数据,选择最佳的物理执行计划
4 Hash Join的衍生(剑走偏锋)
4.1 Hash join 设计思路剖析(总领全局)
- 第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
- 第二步:根据buildIter Table的join key构建Hash Table,把每一行记录都存进HashTable,位于内存中。
- 第三步:扫描streamIter Table 每一行数据,使用相同的hash函数匹配 Hash Table中的记录,匹配成功之后再检查join key 是否相等,最后join在一起
- 总结 : hash join 只扫描两表一次,可以认为运算复杂度为o(a+b),效率非常高。笛卡尔集运算复杂度为a*b。另外,构建的Hash Table最好能全部加载在内存,效率最高,这就决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用。
4.2 broadcast Hash join 设计思路剖析(大表join极小表)
-
第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
-
第二步: 先把小表广播到所有大表分区所在节点,然后根据buildIter Table的join key构建Hash Table,把每一行记录都存进HashTable
-
第三步:扫描streamIter Table 每一行数据,使用相同的hash函数匹配 Hash Table中的记录,匹配成功之后再检查join key 是否相等,最后join在一起
-
总结 : hash join 只扫描两表一次,可以认为运算复杂度为o(a+b)。
-
调优
1 buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件2 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false3 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中4 streamIter的大小是buildIter三倍以上 复制代码
4.2 shuffle Hash join 设计思路剖析(大表join小表)
- 第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
- 第二步: 将具有相同性质的(如Hash值相同)join key 进行Shuffle到同一个分区。
- 第三步:先把小表广播到所有大表分区所在节点,然后根据buildIter Table的join key构建Hash Table,把每一行记录都存进HashTable
- 第四步:扫描streamIter Table 每一行数据,使用相同的hash函数匹配 Hash Table中的记录,匹配成功之后再检查join key 是否相等,最后join在一起
5 Sort Merge join (横行无敌)(大表join大表)
- 第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
- 第二步: 将具有相同性质的(如Hash值相同)join key 进行Shuffle到同一个分区。
- 第三步: 对streamIter 和 buildIter在shuffle read过程中先排序,join匹配时按顺序查找,匹配结束后不必重头开始,利用shuffle sort特性,查找性能解决了大表对大表的情形。
6 Spark Join 类型详解
6.0 准备数据集( Justin => 左表有,Rose =>右表有)
学习 Python中单引号,双引号,3个单引号及3个双引号的区别请参考:https://blog.csdn.net/woainishifu/article/details/76105667from pyspark.sql.types import * >>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)]
park.createDataFrame(rdd, schema)
df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ])
>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()+---+------+---+
| id| name|age|
+---+------+---+
| 1| Alice| 18|
| 2| Andy| 19|
| 3| Bob| 17|
| 4|Justin| 21|
| 5| Cindy| 20|
+---+------+---+>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)])
show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ])
>>> df2 = spark.createDataFrame(rdd2, schema2)
>>> df2.show()
+-----+------+
| name|height|
+-----+------+
|Alice| 160|
| Andy| 159|
| Bob| 170|
|Cindy| 165|
| Rose| 160|
+-----+------+
复制代码
6.1 inner join
-
inner join是一定要找到左右表中满足join key 条件的记录,join key都存在的情形。
df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()>>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()+---+-----+---+------+| id| name|age|height|+---+-----+---+------+| 1|Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 5|Cindy| 20| 165|+---+-----+---+------+ 复制代码
6.2 left outer join
-
left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,左表行Row不变,右表一行Row中所有字段都为null的记录。
-
要求:左表是streamIter,右表是buildIter
df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()+---+------+---+------+| id| name|age|height|+---+------+---+------+| 1| Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 4|Justin| 21| null|| 5| Cindy| 20| 165|+---+------+---+------+ 复制代码
6.3 right outer join
-
right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,右表行Row不变,左表一行Row中所有字段都为null的记录。
-
要求:右表是streamIter,左表是buildIter
df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()+----+-----+----+------+| id| name| age|height|+----+-----+----+------+|null| Rose|null| 160|| 1|Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 5|Cindy| 20| 165|+----+-----+----+------+ 复制代码
6.4 full outer join
-
full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter
-
左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,如果key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录。
-
如果keyA<keyB,说明右表中没有与左表rowA对应的记录,那么joinrowA与nullRow。
-
将rowA更新到左表的下一条记录;如果keyA>keyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB。
-
将rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录全部处理完。
>>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()+----+------+----+------+| id| name| age|height|+----+------+----+------+|null| Rose|null| 160|| 1| Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 4|Justin| 21| null|| 5| Cindy| 20| 165|+----+------+----+------+ 复制代码
6.5 left semi join
left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左表Row的记录,否则返回null。
6.6 left anti join
left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录
6.6 row_number().over()
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import *
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])df = spark.createDataFrame(rdd, schema)
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()+---+------+---+---+| id| name|age| rn|+---+------+---+---+| 1| Alice| 18| 1|| 1| Cindy| 20| 2|| 1|Justin| 21| 3|| 3| Bob| 17| 1|| 2| Andy| 19| 1|+---+------+---+---+df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()+---+------+---+---+| id| name|age| rn|+---+------+---+---+| 3| Bob| 17| 1|| 1| Alice| 18| 1|| 2| Andy| 19| 1|| 1| Cindy| 20| 2|| 1|Justin| 21| 3|+---+------+---+---+
复制代码
7 结语
一直想深入挖掘一下SparkSQL内部join原理,终于有时间详细的理一下 Shuffle Join 。作者还准备进一步研究Spark SQL 内核原理,敬请期待我的Spark SQL源码剖析系列。大数据商业实战社区微信公众号即将开启,敬请关注,谢谢!