SparkHiveSQL中Join操作的谓词下推?

前言:

SparkSQL和HiveSQL的Join操作中也有谓词下推?今天就通过大神的文章来了解下。同样,如有冒犯,请联系。

正文

上文简要介绍了Join在大数据领域中的使用背景以及常用的几种算法-broadcast hash join 、shuffle hash join以及sort merge join等,对每一种算法的核心应用场景也做了相关介绍,这里再重点说明一番:大表与小表进行join会使用broadcast hash join,一旦小表稍微大点不再适合广播分发就会选择shuffle hash join,最后,两张大表的话无疑选择sort merge join。
好了,问题来了,说是这么一说,但到底选择哪种算法归根结底是SQL执行引擎干的事情,按照上文逻辑,SQL执行引擎肯定要知道参与Join的两表大小,才能选择最优的算法喽!那么斗胆问一句,怎么知道两表大小?衡量两表大小的是物理大小还是纪录多少抑或两者都有?其实,这是另一门学问-基于代价优化(Cost Based Optimization,简称CBO),它不仅能够解释Join算法的选择问题,更重要的,它还能确定多表联合Join场景下的Join顺序问题。
是不是对CBO很期待呢?好吧,这里先刨个坑,下一个话题我们再聊。那今天要聊点什么呢?Join算法选择、Join顺序选择确实对Join性能影响极大,但,还有一个很重要的因素对Join的性能至关重要,那就是Join算法优化!无论是broadcast hash join、shuffle hash join还是sort merge join,都是最基础的join算法,有没有什么优化方案呢?还真有,这就是今天要聊的主角-Runtime Filter(下文简称RF)

RF预备知识:bloom filter

RF说白了是使用bloomfilter对参与join的表进行过滤,减少实际参与join的数据量。为了下文详细解释整个流程,有必要先解释一下bloomfilter这个数据结构(对之熟悉的看官可以绕道)。Bloom Filter使用位数组来实现过滤,初始状态下位数组每一位都为0,如下图所示:
在这里插入图片描述
假如此时有一个集合S = {x1, x2, … xn},Bloom Filter使用k个独立的hash函数,分别将集合中的每一个元素映射到{1,…,m}的范围。对于任何一个元素,被映射到的数字作为对应的位数组的索引,该位会被置为1。比如元素x1被hash函数映射到数字8,那么位数组的第8位就会被置为1。下图中集合S只有两个元素x和y,分别被3个hash函数进行映射,映射到的位置分别为(0,3,6)和(4,7,10),对应的位会被置为1:
在这里插入图片描述
现在假如要判断另一个元素是否是在此集合中,只需要被这3个hash函数进行映射,查看对应的位置是否有0存在,如果有的话,表示此元素肯定不存在于这个集合,否则有可能存在。下图所示就表示z肯定不在集合{x,y}中:
在这里插入图片描述

RF算法理论

为了更好地说明整个过程,这里使用一个SQL示例对RF算法进行完整讲解,SQL:

select item.name, order.* 
from order , item 
where order.item_id = item.id 
and item.category = ‘book’

,其中order为订单表,item为商品表,两张表根据商品id字段进行join,该SQL意为取出商品类别为书籍的所有订单详情。假设商品类型为书籍的商品并不多,join算法因此确定为broadcast hash join。整个流程如下图所示:
在这里插入图片描述
Step 1:将item表的join字段(item.id)经过多个hash函数映射处理为一个bloomfilter(如果对bloomfilter不了解,自行google)
Step 2:将映射好的bloomfilter分别广播到order表的所有partition上,准备进行过滤
Step 3:以Partition2为例,存储进程(比如DataNode进程)将order表中join列(order.item_id)数据一条一条读出来,使用bloomfilter进行过滤。淘汰该订单数据不是书籍相关商品的订单,这条数据直接跳过;否则该条订单数据有可能是待检索订单,将该行数据全部扫描出来。
Step 4:将所有未被bloomfilter过滤掉的订单数据,通过本地socket通信发送到计算进程(impalad)。
Step 5:再将所有书籍商品数据广播到所有Partition节点与step4所得订单数据进行真正的hashjoin操作,得到最终的选择结果。

RF算法分析

上面通过一个SQL示例简单演示了整个RF算法在broadcast hash join中的操作流程,根据流程对该算法进行一下理论层次分析:

  • RF本质:通过谓词(
    bloomfilter)下推,在存储层通过bloomfilter对数据进行过滤,可以从三个方面实现对Join的优化。其一,如果可以跳过很多记录,就可以减少了数据IO扫描次数。这点需要重点解释一下,许多朋友会有这样的疑问:既然需要把数据扫描出来使用BloomFilter进行过滤,为什么还会减少IO扫描次数呢?这里需要关注一个事实:大多数表存储行为都是列存,列之间独立存储,扫描过滤只需要扫描join列数据(而不是所有列),如果某一列被过滤掉了,其他对应的同一行的列就不需要扫描了,这样减少IO扫描次数。其二,减少了数据从存储层通过socket(甚至TPC)发送到计算层的开销,其三,减少了最终hash
    join执行的开销。
  • RF代价:对照未使用RF的Broadcast Hash
    Join来看,前者主要增加了bloomfilter的生成、广播以及大表根据bloomfilter进行过滤这三个开销。通常情况下,这几个步骤在小表较小的情况下代价并不大,基本可以忽略。
  • RF优化效果:基本取决于bloomfilter的过滤效果,如果大量数据被过滤掉了,那么join的性能就会得到极大提升;否则性能提升就会有限。
  • RF实现:和常见的谓词下推(’=‘,’>’,’<‘等)一样,RF实现需要在计算层以及存储层分别进行相关逻辑实现,计算层要构造bloomfilter并将bloomfilter下传到存储层,存储层要实现使用该bloomfilter对指定数据进行过滤。

RF效果验证

事实上,RF这个东东的优化效果是在组内同事何大神做impala on parquet以及impala on kudu的基准对比测试的时候分析发现的。实际测试中,impala on parquet 比之impala on kudu性能有明显优势,目测至少10倍性能提升。同一SQL解析引擎,不同存储引擎,性能竟然天壤之别!为了分析具体原因,同事就使用impala的执行计划分析工具对两者的执行计划分别进行了分析,才透过蛛丝马迹发现前者使用了RF,而后者并没有(当然可能还有其他因素,但RF肯定是原因之一)。
简单复盘一下这次测试吧,基准测试使用TPCDS测试,数据规模为1T,本文使用测试过程中的一个典型SQL(Q40)作为示例对RF的神奇功效进行回放演示。下图是Q40的对比性能,直观上来看RF可以直接带来40x的性能提升,40倍哎,这到底是怎么做到的?
在这里插入图片描述
先来简单看看Q40的SQL语句,如下所示,看起来比较复杂,核心涉及到3个表(catalog_sales join date_dim 、catalog_sales join warehouse 、catalog_sales join item)的join操作:

select  w_state  ,i_item_id  ,
sum(case when (cast(d_date as date) < cast (1998-04-08as date))  then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_before  ,
sum(case when (cast(d_date as date) >= cast (1998-04-08as date))  then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_after  
from  catalog_sales 
left outer join catalog_returns on  (catalog_sales.cs_order_number = catalog_returns.cr_order_number  
and catalog_sales.cs_item_sk = catalog_returns.cr_item_sk)  ,
warehouse  ,item  ,date_dim  
where  i_current_price between 0.99 and 1.49  
and item.i_item_sk = catalog_sales.cs_item_sk  
and catalog_sales.cs_warehouse_sk = warehouse.w_warehouse_sk  
and catalog_sales.cs_sold_date_sk = date_dim.d_date_sk  
and date_dim.d_date between1998-03-09and1998-05-08group by  w_state,i_item_id  
order by w_state,i_item_id  limit 100;

典型的星型结构,其中catalog_sales是事实表,其他表为纬度表。本次分析选择其中catalog_sales join item这个纬度的join。因为对比测试中两者的SQL解析引擎都是使用impala,所以SQL执行计划基本都相同。在此基础上,来看看执行计划中单个执行节点在执行catalog_sales join item操作时由先到后的主要阶段耗时,其中只贴出来重要耗时阶段(Q40中Join算法为shuffle hash join,与上文所举broadcast hash join示例略有不同,不过不影响结论):

实验项目impala on kudu(without runtime filter)impala on kudu(without runtime filter)
total time43s996ms2s385ms
bloomfilter生成Filter 0 arrival: 857ms
Filter 1 arrival: 879ms
Filter 2 arrival: 939ms
大表scan扫描HDFS_SCAN_NODE (id=0):(Total: 3s479ms)
– RowsRead: 72.01M
RowsReturned: 72.01M
– RowsReturnedRate: 20.69 M/s
HDFS_SCAN_NODE (id=0):(Total: 2s011ms)
– RowsRead: 72.01M
RowsReturned: 35.92K
– RowsReturnedRate: 17.86 K/sec
Filter 0 (1.00 MB):
– Rows processed: 72.01M
– Rows rejected: 71.43M
– Rows total: 72.01M
Filter 1 (1.00 MB):
– Rows processed: 49.15K
– Rows rejected: 126
– Rows total: 49.15K
Filter 2 (1.00 MB):
– Rows processed: 584.38K
– Rows rejected: 548.46K
– Rows total: 584.38K
数据加载计算进程内存DataStreamSender (dst_id=11):(Total: 15s984ms)
– NetworkThroughput(*): 298.78 MB/sec
– OverallThroughput: 100.85 MB/sec
– RowsReturned: 72.01M– SerializeBatchTime: 10s567ms
TransmitDataRPCTime: 5s395ms
DataStreamSender (dst_id=11):(Total: 10.725ms)
– NetworkThroughput(*): 244.06 MB/sec
– OverallThroughput: 71.23 MB/sec
– RowsReturned: 35.92K
SerializeBatchTime: 7.544ms
TransmitDataRPCTime: 3.130ms
Hash JoinHASH_JOIN_NODE (id=5): (Total: 19s104ms
– BuildPartitionTime: 862.560ms
– BuildRows: 8.99M
– BuildRowsPartitioned: 8.99M
– BuildTime: 373.855ms
– ……
– ProbeRows: 90.00M
– ProbeRowsPartitioned: 0 (0)
ProbeTime: 17s628ms
– RowsReturned: 90.00M
– RowsReturnedRate: 985.85 K/s
– SpilledPartitions: 0 (0)
– UnpinTime: 960.000ns
HASH_JOIN_NODE (id=6): (Total: 21.707ms)
– BuildPartitionTime: 3.487ms
– BuildRows: 18.81K (18814)
– BuildRowsPartitioned: 18.81K
– BuildTime: 646.817us
– ……
– ProbeRows: 85.28K (85278)
– ProbeRowsPartitioned: 0 (0)
ProbeTime: 6.396ms
– RowsReturned: 85.27K
– RowsReturnedRate: 38.88 K/s
– SpilledPartitions: 0 (0)
– UnpinTime: 915.000ns

经过对两种场景执行计划的解析,可以基本验证上文所做的基本理论结果:
1. 确认经过RF之后大表的数据量得到大量滤除,只剩下少量数据参与最终的HashJoin。参见第二行大表scan扫描结果,未使用rf的返回结果有7千万行+纪录,而经过RF过滤之后满足条件的只有3w+纪录。3万相比7千万,性能优化效果自然不言而喻。
2. 经过RF滤除之后,少量数据经过网络从存储进程加载到计算进程内存的网络耗时大量减少。参见第三行“数据加载到计算进程内存”,前者耗时15s,后者耗时仅仅11ms。主要耗时分为两部分,其中数据序列化时间占到2/3-10s左右,数据经过RPC传输时间占另外1/3 -5s左右。
3. 最后,经过RF滤除之后,参与到最终Hash Join的数据量大幅减少,Hash Join耗时前者是19s,后者是21ms左右。主要耗时在于大表Probe Time,前者消耗了17s左右,而后者仅需6ms。

说好的谓词下推呢?
讲真,刚开始接触RF的时候觉得这简直是一个实实在在的神器,崇拜之情溢于言表。然而,经过一段时间的探索消化,直至把这篇文章写完,也就是此时此刻,忽然觉得它并不高深莫测,说白了就是一个谓词下推,不同的是这里的谓词稍微奇怪一点,是一个bloomfilter而已。

提到谓词下推,这里再引申一下下。以前经常满大街听到谓词下推,然而对谓词下推却总感觉懵懵懂懂,并不明白的很真切。经过RF的洗礼,现在确信有了更进一步的理解。这里拿出来和大家交流交流。个人认为谓词下推有两个层面的理解:

  • 其一是逻辑执行计划优化层面的说法,比如SQL语句:select * from order ,item where item.id =order.item_id and item.category =‘book’,正常情况语法解析之后应该是先执行Join操作,再执行Filter操作。通过谓词下推,可以将Filter操作下推到Join操作之前执行。即将where item.category = ‘book’下推到 item.id = order.item_id之前先行执行。

  • 其二是真正实现层面的说法,谓词下推是将过滤条件从计算进程下推到存储进程先行执行,注意这里有两种类型进程:计算进程以及存储进程。计算与存储分离思想,这在大数据领域相当常见,比如最常见的计算进程有SparkSQL、Hive、impala等,负责SQL解析优化、数据计算聚合等,存储进程有HDFS(DataNode)、Kudu、HBase,负责数据存储。正常情况下应该是将所有数据从存储进程加载到计算进程,再进行过滤计算。谓词下推是说将一些过滤条件下推到存储进程,直接让存储进程将数据过滤掉。这样的好处显而易见,过滤的越早,数据量越少,序列化开销、网络开销、计算开销这一系列都会减少,性能自然会提高。

写到这里,忽然意识到笔者在上文出现了一个很严重的认知错误:RF机制并不仅仅是一个简单的谓词下推,它的精髓在于提出了一个重要的谓词-bloomfilter。当前对RF支持的系统并不多,笔者只知道目前唯有Impala on Parquet进行了支持。Impala on Kudu虽说Impala支持,但Kudu并不支持。SparkSQL on Parqeut中虽有存储系统支持,无奈计算引擎-SparkSQL目前还不支持。

转自:http://hbasefly.com/2017/04/10/bigdata-join-2/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/456344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

六种方式实现生产者消费者(未完)

2019独角兽企业重金招聘Python工程师标准>>> 一、利用Object对象是wait和notify\notifyAll package com.jv.parallel.consumerandproducer.objectwait;public class Car {private volatile int flag 0;public void showConsumer(){System.out.println("I am a…

SQL中基于代价的优化

还记得笔者在上篇文章无意中挖的一个坑么&#xff1f;如若不知&#xff0c;强烈建议看官先行阅读前面两文&#xff0d;《SparkSQL Join原理》和《Join中竟然也有谓词下推?》 第一篇文章主要分析了大数据领域Join的三种基础算法以及各自的适用场景&#xff0c;第二篇文章在第一…

Java Map 怎样实现Key 的唯一性?

大家都知道。在Map和Set不可存在反复元素&#xff1f; 可是对于内部的细节我们并不了解。今天我们就一块来 探讨一下&#xff01; 1 对于 HashMap HashSet 他们的底层数据结构的实现是&#xff1a;维护了一张 HashTable 。容器中的元素所有存储在Hashtable 中。他们再加入…

win10下安装pyspark及碰到的问题

文章目录前言安装过程Q1总结&#xff1a;前言 最近由于工作需要&#xff0c;需要了解下pyspark&#xff0c;所以就在win10环境下装了下&#xff0c;然后在pycharm中使用的时候碰到了一些问题。整个过程可谓是一波三折。下面一一道来。 安装过程 安装过程就不详细说了&#x…

解决AttributeError AttributeError: 'NoneType' object has no attribute 'filename'

原因忘记上传文件 表单需要加属性 enctype"multipart/form-data" 否则报错&#xff01;AttributeError AttributeError: NoneType object has no attribute filename enctype"multipart/form-data是设置表单的MIME编码。默认情况&#xff0c;这个编码格式是ap…

SQLAlchemy()分页器paginate方法

Flask的数据分页示例 用法&#xff1a; 1&#xff0c;首先写数据获取的视图函数&#xff0c;就像这样&#xff1a; # 首页 blog_bp.route(/, endpointindex) def index():#获取页数page request.args.get(page,1)paginate Article.query.paginate(pageint(page),per_page3)…

开源中国 2014 年源创会年度计划

时光总是从敲代码的指尖不经意地滑过&#xff0c;转眼2014年已快过去一半&#xff0c;OSC依然心怀着最初的梦想。 源创会&#xff0c;oscer的线下快乐大本营&#xff0c;我们仍在继续...... 聆听技术大牛讲解最前沿的技术&#xff0c;和同道中人切磋IT秘籍&#xff0c;吃点心侃…

互联网金融行业申请评分卡(A卡)简介

文章目录前言基本概念1、信用违约风险的基本概念什么是信用违约风险&#xff1a;组成部分违约的主体个贷中常用的违约定义M0&#xff0c;M1&#xff0c;M2的定义2、申请评分卡的重要性和特性信贷场景中的评分卡申请评分卡的概念为什么要开发申请评分卡评分卡的特性 &#xff08…

Flask的csrf_token的用法

在flask当中&#xff0c;flask-wtf模块时携带csrf校验的&#xff0c;只是需要开启&#xff1b; 如果不开启校验就不需要校验&#xff0c;但是那样不安全。 Csrf是针对与post请求的跨域限制&#xff0c;get请求没有作用 csrf_token的开启 在flask中开启csrf保护 from flask_…

dotty编译器语法特性之一交叉类型,联合类型和文本单例类型

2019独角兽企业重金招聘Python工程师标准>>> ###翻译&#xff1a;http://dotty.epfl.ch/docs/reference/intersection-types.html #交叉类型 trait Resettable {def reset(): this.type } trait Growable[T] {def add(x: T): this.type } def f(x: Resettable &…

【转】Zookeeper 安装和配置

转自&#xff1a;http://coolxing.iteye.com/blog/1871009 Zookeeper的安装和配置十分简单, 既可以配置成单机模式, 也可以配置成集群模式. 下面将分别进行介绍. 单机模式 1. 配置 点击这里下载zookeeper的安装包之后, 解压到合适目录. 进入zookeeper目录下的conf子目录, 创建z…

一分钟精通Flask-Bootstrap的使用

要想在程序中集成Bootstrap&#xff0c;显然要对模板做所有必要的改动。不过&#xff0c;更简单的方法是使用一个名为Flask-Bootstrap 的Flask 扩展&#xff0c;简化集成的过程。 安装&#xff1a; Flask-Bootstrap 使用pip安装&#xff1a; pip install flask_bootstrap Fl…

linux生产环境下安装anaconda总结

前言&#xff1a; 工作中&#xff0c;常常要在新的linux生产服务器中安装自己的集成python环境&#xff0c;这种情况下有一点需要注意&#xff1a;不能覆盖生产服务器中的python环境&#xff08;也就是自己的python环境要和系统的python环境分开&#xff09;。一般情况下系统自…

FSF 称 DRM 被用于锁定、控制和监视用户

自由软件基金会正在督促美国政府废除DMCA中保护DRM的反规避条款。DMCA的1201条款禁止绕过DRM保护的内容和设备。 自由软件基金会的Donald Robertson在致函美国版权办公室的信&#xff08;PDF&#xff09;中指出&#xff0c;技术保护措施和数字限制管理&#xff08;即DRM&#x…

改数据库表结构类型两种方法

alter table user change password password varchar(128) not null; alter table user modify column password varchar(128) not null;

申请评分卡(A卡)的开发过程(1)

前言&#xff1a; 本篇文章上接《申请评分卡简介》&#xff0c;有需要的童鞋可以参考下&#xff1a;https://blog.csdn.net/qq_16633405/article/details/107744921 下面介绍下A卡的开发步骤。 开发过程 1、评分卡模型开发步骤&#xff1a; 1、立项&#xff1a;场景&#…

E: 无法获得锁 /var/lib/dpkg/lock-frontend - open (11: 资源暂时不可用) E: 无法获取 dpkg 前端锁 (/var/lib/dpkg/lock-front

解决&#xff1a; E: 无法获得锁 /var/lib/dpkg/lock-frontend - open (11: 资源暂时不可用) E: 无法获取 dpkg 前端锁 (/var/lib/dpkg/lock-frontend)&#xff0c;是否有其他进程正占用它&#xff1f; 方法&#xff1a; 重新启动虚拟机服务器 再在黑屏终端中重新尝试输入su…

聚类效果评估指标总结

前言 实际工作中经常会用到一些聚类算法对一些数据进行聚类处理&#xff0c;如何评估每次聚类效果的好坏&#xff1f;可选的方法有1、根据一些聚类效果的指标来评估&#xff1b;2、直接打点。今天就主要总结下这段时间了解的聚类效果评估指标。废话少说&#xff0c;直接上干货…

{%extends bootstrap/base.html%}的添加,使得其他block无法继承

仙说{%extends "bootstrap/base.html"%}用法&#xff1a; 在base.html中调用一次即可&#xff0c;并且 {%extends "bootstrap/base.html"%} 要放在 最后头&#xff01;&#xff01;最后头&#xff01;最后头&#xff01; base中不用再添加 {% block cont…

运用Nginx代理和UWSGI将Flask项目部署在Linux中 详细步骤

nginx: 安装可以参照的路径: http://nginx.org/en/linux_packages.html#Ubuntu 启动Nginx nginx [ -c configpath] 默认配置目录&#xff1a;/etc/nginx/nginx.conf 查看进程&#xff1a; ps -ef |grep nginx 控制Nginx nginx -s xxxstop 快速关闭quit …