SparkSQL 之 Shuffle Join 内核原理及应用深度剖析-Spark商业源码实战

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何商业交流,可随时联系。

1 Spark SQL 坚实后盾DataFrame

  • DataFrame是一个分布式数据容器,更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。
  • JSON schema自动推导
  • Hive风格分区表自动识别
  • 充分利用RCFile、ORC、Parquet等列式存储格式的优势,仅扫描查询真正涉及的列,忽略其余列的数据。
  • 聚合统计函数支持

2 Spark SQL 源码包结构(溯本逐源)

主要分为4类:

  • core模块:处理数据的输入输出,比如:把不同数据源(RDD,json,Parquet等)获取到数据,并将查询结果输出到DataFrame。
  • catalyst模块:处理SQL语句的整个过程,包括解析,绑定,优化,物理计划等查询优化。
  • hive模块:对hive数据进行处理。
  • hive-ThriftServer:提供CLI以及JDBC和ODBC接口。

3 Spark SQL catalyst模块设计思路

(详细请参看我的SparkSQL源码解析内容)

catalyst主要组件有

  • sqlParse => sql语句的语法解析
  • Analyzer => 将不同来源的Unresolved Logical Plan和元数据(如hive metastore、Schema catalog)进行绑定,生成resolved Logical Plan
  • optimizer => 根据OptimizationRules,对resolvedLogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimized Logical Plan
  • Planner => LogicalPlan转换成PhysicalPlan
  • CostModel => 根据过去的性能统计数据,选择最佳的物理执行计划

4 Hash Join的衍生(剑走偏锋)

4.1 Hash join 设计思路剖析(总领全局)

  • 第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
  • 第二步:根据buildIter Table的join key构建Hash Table,把每一行记录都存进HashTable,位于内存中。
  • 第三步:扫描streamIter Table 每一行数据,使用相同的hash函数匹配 Hash Table中的记录,匹配成功之后再检查join key 是否相等,最后join在一起
  • 总结 : hash join 只扫描两表一次,可以认为运算复杂度为o(a+b),效率非常高。笛卡尔集运算复杂度为a*b。另外,构建的Hash Table最好能全部加载在内存,效率最高,这就决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用。

4.2 broadcast Hash join 设计思路剖析(大表join极小表)

  • 第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。

  • 第二步: 先把小表广播到所有大表分区所在节点,然后根据buildIter Table的join key构建Hash Table,把每一行记录都存进HashTable

  • 第三步:扫描streamIter Table 每一行数据,使用相同的hash函数匹配 Hash Table中的记录,匹配成功之后再检查join key 是否相等,最后join在一起

  • 总结 : hash join 只扫描两表一次,可以认为运算复杂度为o(a+b)。

  • 调优

     1 buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件2 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false3 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中4 streamIter的大小是buildIter三倍以上
    复制代码

4.2 shuffle Hash join 设计思路剖析(大表join小表)

  • 第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
  • 第二步: 将具有相同性质的(如Hash值相同)join key 进行Shuffle到同一个分区。
  • 第三步:先把小表广播到所有大表分区所在节点,然后根据buildIter Table的join key构建Hash Table,把每一行记录都存进HashTable
  • 第四步:扫描streamIter Table 每一行数据,使用相同的hash函数匹配 Hash Table中的记录,匹配成功之后再检查join key 是否相等,最后join在一起

5 Sort Merge join (横行无敌)(大表join大表)

  • 第一步:一般情况下,streamIter为大表,buildIter为小表,不用关心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
  • 第二步: 将具有相同性质的(如Hash值相同)join key 进行Shuffle到同一个分区。
  • 第三步: 对streamIter 和 buildIter在shuffle read过程中先排序,join匹配时按顺序查找,匹配结束后不必重头开始,利用shuffle sort特性,查找性能解决了大表对大表的情形。

6 Spark Join 类型详解

6.0 准备数据集( Justin => 左表有,Rose =>右表有)

学习 Python中单引号,双引号,3个单引号及3个双引号的区别请参考:https://blog.csdn.net/woainishifu/article/details/76105667from pyspark.sql.types import * >>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)]
park.createDataFrame(rdd, schema)
df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ]) 
>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()+---+------+---+
| id|  name|age|
+---+------+---+
|  1| Alice| 18|
|  2|  Andy| 19|
|  3|   Bob| 17|
|  4|Justin| 21|
|  5| Cindy| 20|
+---+------+---+>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)]) 
show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ]) 
>>> df2 = spark.createDataFrame(rdd2, schema2) 
>>> df2.show()
+-----+------+
| name|height|
+-----+------+
|Alice|   160|
| Andy|   159|
|  Bob|   170|
|Cindy|   165|
| Rose|   160|
+-----+------+
复制代码

6.1 inner join

  • inner join是一定要找到左右表中满足join key 条件的记录,join key都存在的情形。

      df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()>>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()+---+-----+---+------+| id| name|age|height|+---+-----+---+------+|  1|Alice| 18|   160||  2| Andy| 19|   159||  3|  Bob| 17|   170||  5|Cindy| 20|   165|+---+-----+---+------+   
    复制代码

6.2 left outer join

  • left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,左表行Row不变,右表一行Row中所有字段都为null的记录。

  • 要求:左表是streamIter,右表是buildIter

          df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()+---+------+---+------+| id|  name|age|height|+---+------+---+------+|  1| Alice| 18|   160||  2|  Andy| 19|   159||  3|   Bob| 17|   170||  4|Justin| 21|  null||  5| Cindy| 20|   165|+---+------+---+------+
    复制代码

6.3 right outer join

  • right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,右表行Row不变,左表一行Row中所有字段都为null的记录。

  • 要求:右表是streamIter,左表是buildIter

          df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()+----+-----+----+------+|  id| name| age|height|+----+-----+----+------+|null| Rose|null|   160||   1|Alice|  18|   160||   2| Andy|  19|   159||   3|  Bob|  17|   170||   5|Cindy|  20|   165|+----+-----+----+------+
    复制代码

6.4 full outer join

  • full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter

  • 左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,如果key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录。

  • 如果keyA<keyB,说明右表中没有与左表rowA对应的记录,那么joinrowA与nullRow。

  • 将rowA更新到左表的下一条记录;如果keyA>keyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB。

  • 将rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录全部处理完。

      >>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()+----+------+----+------+|  id|  name| age|height|+----+------+----+------+|null|  Rose|null|   160||   1| Alice|  18|   160||   2|  Andy|  19|   159||   3|   Bob|  17|   170||   4|Justin|  21|  null||   5| Cindy|  20|   165|+----+------+----+------+
    复制代码

6.5 left semi join

left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左表Row的记录,否则返回null。

6.6 left anti join

left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录

6.6 row_number().over()

from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import *
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])df = spark.createDataFrame(rdd, schema)
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()+---+------+---+---+| id|  name|age| rn|+---+------+---+---+|  1| Alice| 18|  1||  1| Cindy| 20|  2||  1|Justin| 21|  3||  3|   Bob| 17|  1||  2|  Andy| 19|  1|+---+------+---+---+df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()+---+------+---+---+| id|  name|age| rn|+---+------+---+---+|  3|   Bob| 17|  1||  1| Alice| 18|  1||  2|  Andy| 19|  1||  1| Cindy| 20|  2||  1|Justin| 21|  3|+---+------+---+---+
复制代码

7 结语

一直想深入挖掘一下SparkSQL内部join原理,终于有时间详细的理一下 Shuffle Join 。作者还准备进一步研究Spark SQL 内核原理,敬请期待我的Spark SQL源码剖析系列。大数据商业实战社区微信公众号即将开启,敬请关注,谢谢!

秦凯新 于深圳 201811200130

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/276878.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python标准库之csv(1)

1.Python处理csv文件之csv.writer() import csvdef csv_write(path,data):with open(path,w,encodingutf-8,newline) as f:writer csv.writer(f,dialectexcel)for row in data:writer.writerow(row)return True 调用上面的函数 data [[Name,Height],[Keys,176cm],[HongPing,1…

python自动化测试脚本可以测php吗_请对比分析一下php的自动化测试与python的自动化测试...

Unit Level Test: Python: doctest, nose PHP: PHPUnit Behaviour Driven Test (Cucumber-style): Python: Lettuce, Behave PHP: Behat Behaviour Driven Test (Spec-style): Python: spec PHP: PHPSpec Acceptance Test (Selenium): Python: 有官方的Selenium binding。 PHP:…

简单易变的CSS阴影效果

厌倦了在图片处理软件上给每张图片加上边框修饰&#xff1f;让CSS帮你一把吧&#xff01;嘿嘿&#xff0c;看看下面的几张效果图&#xff0c;边框都不是用图片做的&#xff0c;很方便吧&#xff1f; 文字块的应用效果 NARROW This is the text that goes in the middle. MEDIUM…

我用代码来给你们分析一个赚钱的技巧

2019独角兽企业重金招聘Python工程师标准>>> 赚钱是个俗气的话题&#xff0c;但又是人人都绕不开的事情。我今天来“科学”地触碰下这个话题。 谈赚钱&#xff0c;就会谈到理财、投资&#xff0c;谈到炒股。有这样一个笑话&#xff1a; 问&#xff1a;如何成为百万富…

idea中自动deployment的步骤

转载于:https://www.cnblogs.com/littlehb/p/11322666.html

python怎么编辑文件_如何使用python中的方法对文件进行修改文件名

在使用python语言中的方法操作文件时&#xff0c;打开方法可以直接使用open&#xff0c;但是对文件重命名需要调用os模块中的方法&#xff0c;删除文件也是工具/原料 python 编辑器 截图工具 台式机 方法/步骤 1 进入到python安装文件目录&#xff0c;新建txt文件kou.txt2 打开…

球迷必备Euro Cup Mobile 2008 !-dopod touch diamond试用之欧洲杯

欧洲杯从6月8日开始&#xff0c;到现在已经进行了半个多月了。到今天为止已经进入到了尾声&#xff0c;也到了激战正酣的时刻&#xff01;(相信在国足出线无望后大伙的目光都聚集到了欧洲杯上) 但是平时上班忙&#xff0c;晚上也没法熬夜看球&#xff0c;哥们心理着急呀。白天上…

【工具】switchhost

1.前提 主要功能切换host 2.下载路径 https://oldj.github.io/SwitchHosts/ 3.使用略&#xff08;太简单&#xff09;转载于:https://www.cnblogs.com/totoro-cat/p/9987101.html

C# ?. 判斷Null值

有一句代碼&#xff1a; Html.DisplayFor(modelItem > item.SellDate, "RegularDate") RegularDate.cshtml 內容如下&#xff1a; model System.DateTime Model.ToString("yyyy/MM/dd") 目的是將數據庫里的 DateTime 顯示為完整日期&#xff0c;如 2019…

MOSS站点的FORM认证修改小结

项目中&#xff0c;将moss站点修改成form认证的方法&#xff0c;园子里面已经很多了&#xff0c;我就不再重提&#xff0c;其中有1点有些文章没有提及&#xff0c;但是实际操作中又是比较重要的&#xff1a;在管理中心的web.config中添加roleManager之后&#xff0c;一定要将ht…

python中意外缩进是什么意思_如何处理python中的“意外缩进”?

慕工程0101907 Python在行的开头使用间距来确定代码块何时开始和结束。你可以得到的错误是&#xff1a;意外的缩进。这行代码在开始时比前一行有更多空格&#xff0c;但前一行不是子块的开头&#xff08;例如if / while / for语句&#xff09;。块中的所有代码行必须以完全相同…

HDU 1042 N!(高精度阶乘、大数乘法)

N! Time Limit: 10000/5000 MS (Java/Others) Memory Limit: 262144/262144 K (Java/Others)Total Submission(s): 100274 Accepted Submission(s): 30006 Problem Description Given an integer N(0 ≤ N ≤ 10000), your task is to calculate N!Input One N in one li…

设计模式学习笔记九:原型模式(Prototype Pattern)

1&#xff0e;概述 意图&#xff1a;我们将已经存在的对象作为原型&#xff0c;用户可以通过复制这些原型创建新的对象。 使用场合&#xff1a;当一个系统应该独立于产品的创建、构造和表示时&#xff0c;可以使用原型模式。在原型模式中&#xff0c;产品的创建和初始化…

Centos7上安装docker

步骤&#xff1a;1、Docker 要求 CentOS 系统的内核版本高于 3.10 &#xff0c;查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。通过 uname -r 命令查看你当前的内核版本2、使用 root 权限登录 Centos。确保 yum 包更新到最新。 &#xff08;这个可能需要几分钟的…

pythonista3安装stash_Pythonista下stash安装教程

前言 “StaSh is a serious attempt to implement a Bash-like shell for Pythonista.” StaSh是一个Pythonista环境下的仿shell程序&#xff0c;Sta来自于Pythonista的后三个字母&#xff0c;Sh即shell缩写。除了能完成shell的基本功能外&#xff0c;最主要的功能还有实现pip安…

通过java类的反射机制获取类的属性类型

import java.lang.reflect.Field;import java.lang.reflect.Method; Class<?> clsClass.forName(className);//通过类的名称反射类//Class<?> cls Object.getClass();Field field cls.getDeclaredField("name");//根据属性名称获取单个属性if (field…

建立合理的索引提高SQL Server的性能

在应用系统中,尤其在联机事务处理系统中,对数据查询及处理速度已成为衡量应用系统成败的标准。而采用索引来加快数据处理速度也成为广大数据库用户所接受的优化方法。 在良好的数据库设计基础上&#xff0c;能有效地使用索引是SQL Server取得高性能的基础&#xff0c;SQL Serv…

c++ map用法_Python的 5 种高级用法,效率提升没毛病

原创&#xff1a;机器之心(ID&#xff1a;almosthuman2014)任何编程语言的高级特征通常都是通过大量的使用经验才发现的。比如你在编写一个复杂的项目&#xff0c;并在 stackoverflow 上寻找某个问题的答案。然后你突然发现了一个非常优雅的解决方案&#xff0c;它使用了你从不…

非对称加密算法RSA加密传输数据python3源代码实现

2019独角兽企业重金招聘Python工程师标准>>> import rsa# RSA 算法规定&#xff1a; # 待加密的字节数不能超过密钥的长度值除以 8 再减去 11NBIT 4096 CAN_ENCODE_LEN NBIT // 8 - 11 PER_ENCODE_LEN CAN_ENCODE_LEN - (CAN_ENCODE_LEN % 2) PER_DECODE_LEN CA…

(Microsoft) Visual Studio LightSwitch

在蓝色小铺&#xff0c;听到了前辈 阿源哥哥提到 Visual Studio LightSwitch"号称" 可以快速开发桌面、云端的应用程序。http://www.microsoft.com/visualstudio/en-us/lightswitch &#xff08;这里也提供下载&#xff09; 原厂提供的图片&#xff1a; 跟「正…