Spark Paimon 中为什么我指定的分区没有下推

背景

最近在使用 Paimon 的时候遇到了一件很有意思的事情,写的 SQL 居然读取的数据不下推,明明是分区表,但是却全量扫描了。
目前使用的版本信息如下:
Spark 3.5.0
Paimon 0.6.0
paimon的建表语句如下:

CREATE TABLE `table_demo`(`user_id` string COMMENT 'from deserializer' )
PARTITIONED BY ( `dt` string COMMENT '日期, yyyyMMdd', `hour` string COMMENT '小时, HH')
ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe' 
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler' 
WITH SERDEPROPERTIES ( 'serialization.format'='1')
LOCATION'xxxx'
TBLPROPERTIES ('bucket'='50', 'bucketing_version'='2', 'bukect-key'='user_id', 'file.format'='parquet', 'merge-engine'='partial-update', 'partial-update.ignore-delete'='true', 'primary-key'='user_id', 'transient_lastDdlTime'='1701679855', 'write-only'='false')

查询的SQL如下:

select * from 
table_demo
where dt =20231212
and hour =10
limit 100;

注意我们这里写的dt是整数类型,而表中定义的是字符串类型

结论及解决方法

结论

具体的原因是Spark DSv2中的规则 V2ScanRelationPushDown.pushDownFilters 对于 Cast类型转换表达式不会传递到DataSource端,所以只会在读取完Source转换进行过滤,
这种情况下,对于文件的读取IO会增大,但是对于shuffle等操作是不会有性能的影响的。

解决方法

对于分区字段来说,我们在写SQL对分区字段进行过滤的时候,保持和分区字段类型一致

分析

错误写法分析

针对于错误的写法,也就是导致读取全量数据的写法,我们分析一下,首先是类型转换阶段,在Spark中,对于类型不匹配的问题,spark会用规则进行转换,具体的规则是
CombinedTypeCoercionRule,
在日志中可以看到:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule ==='GlobalLimit 100                                                                                   'GlobalLimit 100+- 'LocalLimit 100                                                                                     +- 'LocalLimit 100+- 'Project [*]                                                                                        +- 'Project [*]
!      +- 'Filter ((dt#520 = 20231212) AND (hour#521 = 10))                                                   +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))+- SubqueryAlias spark_catalog.default.table_demo                                                      +- SubqueryAlias spark_catalog.default.table_demo+- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo                          +- RelationV2[user_id#497,dt#520, hour#521] spark_catalog.default.table_demo

通过以上规则我们可以看到 过滤条件(dt#520 = 20231212) AND (hour#521 = 10) 转换为了 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)

接着再经过以下规则:V2ScanRelationPushDown的洗礼,我们可以看到如下日志:

12-13 13:52:58 763  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour)
Post-Scan Filters: (cast(dt#520 as int) = 20231212),(cast(hour#521 as int) = 10)
12-13 13:52:58 723  INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour)
12-13 13:52:58 823  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Output: user_id#497, dt#520, hour#52112-13 13:52:58 837  INFO (org.apache.spark.sql.catalyst.rules.PlanChangeLogger:60) - 
=== Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===InsertIntoHadoopFsRelationCommand ], Overwrite, [user_id,  dt, hour]                                                                        InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour] +- WriteFiles                                                                                                                                    +- WriteFiles+- Repartition 1, true                                                                                                                           +- Repartition 1, true+- GlobalLimit 100                                                                                                                               +- GlobalLimit 100+- LocalLimit 100                                                                                                                                +- LocalLimit 100
!            +- Filter ((isnotnull(dt#520) AND isnotnull(hour#521)) AND ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)))                   +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))
!               +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo table_demo                                                                     +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo   table_demo

这里只有过滤条件 isnotnull(dt#520) AND isnotnull(hour#521) 被下推到了 DataSource。
从现象来看,确实分区的过滤条件没有推到DataSource端, 我们来分析一下该规则的数据流:

V2ScanRelationPushDown.pushDownFilters||\/
PushDownUtils.pushFilters||\/
DataSourceStrategy.translateFilterWithMappin||\/
translateLeafNodeFilter

具体到translateLeafNodeFilter 方法:

  private def translateLeafNodeFilter(predicate: Expression,pushableColumn: PushableColumnBase): Option[Filter] = predicate match {case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>Some(sources.EqualTo(name, convertToScala(v, t)))case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>Some(sources.EqualTo(name, convertToScala(v, t)))...case _ => None

这里没有对Cast表达式进行处理,所以说最后返回的就是不能下推的处理,而 Paimon datasouce那边,具体的类为PaimonBaseScanBuilder

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {

这里传进来的filters实参 就不存在 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10) 这个过滤条件,所以就不会下推到Paimon中去

其实不仅仅是对于Paimon Source, 其他的source也会有这个问题。

正确学法分析

正确的SQL如下:

select * from 
table_demo
where dt ='20231212'
and hour ='10'
limit 100;

运行如上SQL,我们可以看到如下日志:

12-14 14:22:42 328  INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour),EqualTo(dt,20231212),EqualTo(hour,10)
12-14 14:22:42 405  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour), EqualTo(dt,20231212), EqualTo(hour,10)
Post-Scan Filters: === Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]                       InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]+- WriteFiles                                                                                                            +- WriteFiles+- Repartition 1, true                                                                                                   +- Repartition 1, true+- GlobalLimit 100                                                                                                       +- GlobalLimit 100+- LocalLimit 100                                                                                                        +- LocalLimit 100
!            +- Filter ((isnotnull(dt#1330) AND isnotnull(hour#1331)) AND ((dt#1330 = 20231212) AND (hour#1331 = 10)))               +- RelationV2[user_id#1307,  dt#1330, hour#1331] table_demo
!               +- RelationV2[user_id#1307,  dt#1330, hour#1331] spark_catalog.ad_dwd.table_demo table_demo                           

可以看到经过了规则转换 所有的过滤条件都下推到了DataSource了,但是具体的下推还得在DataSource进一步处理才能保证真正的下推

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/222330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

苍穹外卖项目笔记(12)— 数据统计、Excel报表

前言 代码链接: Echo0701/take-out⁤ (github.com) 1 工作台 需求分析和设计 产品原型 工作台是系统运营的数据看板,并提供快捷操作入口,可以有效提高商家的工作效率 接口设计 ① 今日数据接口: ② 订单管理接口&#xff1…

LeetCode Hot100 146.LRU缓存

题目: 请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类: LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中,则返回关键字的值&…

计算机组成原理—中央处理器CPU

文章目录 CPU的功能与架构CPU的组成运算器控制器 指令执行过程指令流程指令执行方案 数据通路单总线结构专用通路结构 硬布线控制器设计硬布线执行流程硬布线CU内部怎么设计微操作的组合电路 总结 微程序控制器设计微程序的基本理念微程序的基本结构 微指令设计微程序CU设计 指…

Python文本信息解析:从基础到高级实战‘[pp]]‘[

更多Python学习内容:ipengtao.com 大家好,我是彭涛,今天为大家分享 Python文本信息解析:从基础到高级实战,全文3600字,阅读大约10分钟。 文本处理是Python编程中一项不可或缺的技能,覆盖了广泛的…

Spark 单机搭建实战指南

摘要:本文将详细介绍如何在单台机器上搭建 Spark 分布式计算框架,涵盖环境准备、安装配置、运行测试等多个方面,帮助读者轻松上手 Spark 开发。 一、引言 Apache Spark 是一个开源的分布式计算系统,提供了强大的数据处理和分析能力…

2019年第八届数学建模国际赛小美赛D题安全选举的答案是什么解题全过程文档及程序

2019年第八届数学建模国际赛小美赛 D题 安全选举的答案是什么 原题再现: 随着美国进入一场关键性的选举,在确保投票系统的完整性方面进展甚微。2016年总统大选期间,唐纳德特朗普因被指控受到外国干涉而入主白宫,这一问题再次成为…

资产侦查灯塔系统ARL部署

在docker和docker-compose都安装好的前提下进行部署 随便创建一个目录 mkdir docker_arl 切换到该目录 cd docker_arl 下面步骤是安装pip,如果已安装可以直接跳到wget命令下载灯塔系统文件 (但是我不确定pip版本是否有影响,你也可以将命…

GAN的原理分析与实例

为了便于理解,可以先玩一玩这个网站:GAN Lab: Play with Generative Adversarial Networks in Your Browser! GAN的本质:枯叶蝶和鸟。生成器的目标:让枯叶蝶进化,变得像枯叶,不被鸟准确识别。判别器的目标&…

Java中的链表

文章目录 前言一、链表的概念及结构二、单向不带头非循坏链表的实现2.1打印链表2.2求链表的长度2.3头插法2.4尾插法2.5任意位置插入2.6查找是否包含某个元素的节点2.7删除第一次出现这个元素的节点2.8删除包含这个元素的所以节点2.9清空链表单向链表的测试 三、双向不带头非循坏…

【Python】人工智能-机器学习——不调库手撕深度网络分类问题

1. 作业内容描述 1.1 背景 数据集大小150该数据有4个属性,分别如下 Sepal.Length:花萼长度(cm)Sepal.Width:花萼宽度单位(cm)Petal.Length:花瓣长度(cm)Petal.Width:花瓣宽度(cm)category:类别&#xff0…

【STM32】STM32学习笔记-GPIO输入(07)

00. 目录 文章目录 00. 目录01. 按键简介02. 传感器模块简介03. 光敏电阻传感器04. 按键电路图05. C语言数据类型06. C语言宏定义07. C语言typedef08. C语言结构体09. C语言枚举10. 附录 01. 按键简介 按键:常见的输入设备,按下导通,松手断开…

量子技术将如何重构我们的生活

薛定谔的猫与量子世界的奥秘 在量子世界中,“薛定谔的猫”成为一个令人费解而神秘的概念,描述了生死叠加的状态。你能想通吗?想不通很正常,因为这是量子世界,是物理学最前沿的研究领域。在19世纪末,经典物理…

TCP/IP详解——ARP 协议

文章目录 一、ARP 协议1. ARP 数据包格式2. ARP 工作过程3. ARP 缓存4. ARP 请求5. ARP 响应6. ARP 代理7. ARP 探测IP冲突8. ARP 协议抓包分析9. ARP 断网攻击10. 总结 一、ARP 协议 ARP(Address Resolution Protocol)协议工作在网络层和数据链路层之间…

ImportError: cannot import name ‘BaseQuery‘ from ‘flask_sqlalchemy‘

ImportError: cannot import name ‘BaseQuery’ from flask_sqlalchemy’报错 原-报错的导入 from flask_sqlalchemy import SQLAlchemy as BaseQuery现-成功的导入 from flask_sqlalchemy.query import Query as BaseQuery

CCF编程能力等级认证GESP—C++2级—20230923

CCF编程能力等级认证GESP—C2级—20230923 单选题(每题 2 分,共 30 分)判断题(每题 2 分,共 20 分)编程题 (每题 25 分,共 50 分)⼩杨的 X 字矩阵数字⿊洞 答案及解析单选题判断题编程题1编程题…

使用Docker本地安装部署Draw.io绘图工具并实现远程访问协作办公

前言 提到流程图,大家第一时间可能会想到Visio,不可否认,VIsio确实是功能强大,但是软件为收费,并且因为其功能强大,导致安装需要很多的系统内存,并且是不可跨平台使用。所以,今天给…

单元测试二(理论)-云计算2023.12-云南农业大学

文章目录 一、单选题1、三次握手、四次挥手发生在网络模型的哪一层上?2、互联网Internet的拓扑结构是什么?3、以下哪一种网络设备是工作在网络层的?4、以下哪种关于分组交换网络的说法是错误的?5、以下哪种协议是在TCP/IP模型中的…

电脑开机出现:CLIENT MAD ADDR (网卡启动系统)的解决办法

文章目录 前言步骤1、确定情况2、对症下药——关闭网卡启动 补充1、关于BIOS2、关于PXE 前言 最近给旧电脑重装系统安了下开发环境和常用软件啥的,之前还好好启动的电脑,开机突然需要额外加载一个页面,虽然最后正常启动了不影响使用&#xf…

linux 内核同步互斥技术之自旋锁

自旋锁 自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,本处理器自旋等待(也称为忙等待)。 进程、软中断和硬中断都可以使用自旋…

Qt5 CMake环境配置

Qt5 CMake环境配置 设置Qt路径 有两种方法 Qt5_DIR,使用这个变量,必须把路径设置到Qt5Config.cmake所在文件夹,也就是安装目录下的lib/cmake/Qt5CMAKE_PREFIX_PATH,只需要设置到安装目录就可以了,这个目录就是bin、…