spark3 sql优化:同一个表关联多次,优化方案

目录

  • 1.合并查询
  • 2.使用 JOIN 条件的过滤优化
  • 3.使用 Map-side Join 或 Broadcast Join
  • 4.使用 Partitioning 和 Bucketing
  • 5.利用 DataFrame API 进行优化
  • 假设 A 和 B 已经加载为 DataFrame
  • Perform left joins with specific conditions
  • 6.使用缓存或持久化
  • 7.避免笛卡尔积
  • 总结

1.合并查询

如果在 SQL 中的多个 JOIN 操作是针对同一个表,只是条件不同,可以考虑将条件合并成一个查询,从而减少对同一表的多次扫描。例如,将多个 LEFT JOIN 转换成一个 JOIN,使用 CASE 或 FILTER 直接处理不同的关联条件。

优化前:

SELECT A.id, A.col1, A.col2, B1.col3 AS B1_col3, B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4

优化后:

SELECT A.id, A.col1, A.col2, MAX(CASE WHEN A.col1 = B.col4 THEN B.col3 END) AS B1_col3,MAX(CASE WHEN A.col2 = B.col4 THEN B.col3 END) AS B2_col3
FROM A
LEFT JOIN B ON A.id = B.id
GROUP BY A.id, A.col1, A.col2

2.使用 JOIN 条件的过滤优化

通过精简 JOIN 条件,尽量减少连接的行数。例如,如果 B 表中有索引列,可以直接根据索引列做筛选,而不依赖复杂的条件。

假设对 B 表进行的连接条件中,有部分条件可以通过过滤的方式提前应用,比如通过 WHERE 子句或者 JOIN 之前的 FILTER。

SELECT A.id, A.col1, A.col2, B1.col3 AS B1_col3, B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4
WHERE B1.col3 IS NOT NULL OR B2.col3 IS NOT NULL

3.使用 Map-side Join 或 Broadcast Join

在 Spark SQL 中,当其中一个表(比如 A)较小且能完全加载到内存时,Spark 会自动选择广播连接,即将小表广播到所有工作节点进行连接计算,而不是进行全表扫描。

如果你知道某个表的规模较小(例如 A),可以手动启用广播连接,减少 shuffle 的开销。

SELECT /*+ BROADCAST(A) */A.id, A.col1, A.col2, B1.col3 AS B1_col3, B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4

在这里,通过 /*+ BROADCAST(A) */ 强制 Spark 将 A 表广播到各个执行节点,从而避免了对大表 B 进行多次 shuffle。

广播条件:
A 表要相对较小,可以完全加载到内存中。
B 表较大,且 A 表的行数远小于 B。

4.使用 Partitioning 和 Bucketing

在分布式环境下,通过合理的分区和分桶设计,可以减少 JOIN 时的 shuffle 开销。尤其是对于大表,可以考虑对 A 或 B 表做分区(PARTITION BY)或分桶(BUCKET BY)。
– 对 B 表进行分桶(根据 id 或其他相关字段)

CREATE TABLE B (id INT,col3 STRING,col4 STRING
)
USING parquet
CLUSTERED BY (id) INTO 10 BUCKETS;

通过将表按某个字段进行分桶,Spark 在进行连接时能够减少数据的移动和重新分配。

5.利用 DataFrame API 进行优化

如果 SQL 性能不够高,可以尝试将查询转为 DataFrame API 编写,Spark DataFrame API 可能在某些复杂的连接和查询场景下更加高效。

假设 A 和 B 已经加载为 DataFrame

from pyspark.sql import functions as F

Perform left joins with specific conditions

df_A = spark.table("A")
df_B = spark.table("B")df_B1 = df_B.filter(df_B.col4.isNotNull()).select("id", "col3")
df_B2 = df_B.filter(df_B.col4.isNotNull()).select("id", "col3")df_result = df_A.join(df_B1, (df_A.id == df_B1.id) & (df_A.col1 == df_B1.col4), "left") \.join(df_B2, (df_A.id == df_B2.id) & (df_A.col2 == df_B2.col4), "left") \.select(df_A.id, df_A.col1, df_A.col2, df_B1.col3.alias("B1_col3"), df_B2.col3.alias("B2_col3"))df_result.show()

DataFrame API 可以对复杂的 JOIN 和条件执行更多优化,比如延迟执行和缓存策略。

6.使用缓存或持久化

如果你在多次查询中重复使用某些中间结果(例如对 B 表的过滤结果或计算结果),可以选择缓存或持久化某些 DataFrame。

df_B1_cached = df_B1.cache()
df_B2_cached = df_B2.cache()df_result = df_A.join(df_B1_cached, (df_A.id == df_B1_cached.id) & (df_A.col1 == df_B1_cached.col4), "left") \.join(df_B2_cached, (df_A.id == df_B2_cached.id) & (df_A.col2 == df_B2_cached.col4), "left")

缓存对于反复使用的子查询可以减少重新计算的开销。

7.避免笛卡尔积

笛卡尔积会导致非常高的计算开销和内存占用,因此在 JOIN 时需要确保条件足够明确,避免无条件的多表连接。你可以使用 EXPLAIN 来分析查询计划,检查是否出现了笛卡尔积。
查询计划中的 CartesianProduct 或 CROSS JOIN

EXPLAIN SELECT A.id, A.col1, A.col2, B.col3 FROM A JOIN B ON A.id = B.id

Spark SQL / Hive 中,查询计划可能会显示 CartesianProduct 或类似的描述,指明两张表间进行了笛卡尔积连接。

== Physical Plan ==
CartesianProduct(0)

PostgreSQL、MySQL 等关系型数据库,通常会标明连接类型。如果执行计划中显示了 CROSS JOIN,则明确表示笛卡尔积。

->  Seq Scan on table_a  (cost=0.00..10.00 rows=100 width=20)
->  Seq Scan on table_b  (cost=0.00..10.00 rows=100 width=20)
->  Hash Join  (cost=200.00..220.00 rows=1000 width=100)

如果这里显示了 CROSS JOIN,就意味着没有任何连接条件,导致笛卡尔积的生成。

通过查看执行计划(EXPLAIN)了解是否存在不必要的全表扫描。

总结

合并查询: 用 CASE WHEN 合并多个 JOIN。
简化 JOIN 条件: 提前通过 WHERE 子句过滤无效数据。
广播连接: 对小表使用 BROADCAST,减少 shuffle 开销。
分区和分桶: 对大表进行分区或分桶优化 JOIN 性能。
使用 DataFrame API: 在某些复杂查询中,DataFrame API 性能更优。
缓存数据: 重复使用的数据可以进行缓存或持久化。
避免笛卡尔积: 确保 JOIN 有明确的条件,避免全表扫描。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/889263.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

宽字节注入

尽管现在呼吁所有的程序都使用unicode编码,所有的网站都使用utf-8编码,来一个统一的国际规范。但仍然有很多,包括国内及国外(特别是非英语国家)的一些cms,仍然使用着自己国家的一套编码,比如gbk…

Web APIsPIs第1章

WebApi阶段学习什么? WebApi是浏览器提供的一组接口 使用 JavaScript 去操作页面文档 和 浏览器 什么是 API API: 应用程序接口(Application Programming Interface) 接口:本质上就是各种函数,无需关心内部如何实现…

android——录制屏幕

录制屏幕 1、界面 2、核心代码 import android.app.NotificationChannel import android.app.NotificationManager import android.app.PendingIntent import android.app.Service import android.content.Context import android.content.Intent import android.graphics.Bi…

【Excel学习记录】01-认识Excel

1.之前的优秀软件Lotus-1-2-3 默认公式以等号开头 兼容Lotus-1-2-3的公式写法,不用写等号 : 文件→选项→高级→勾选:“转换Lotus-1-2-3公式(U)” 备注:对于大范围手动输入公式可以使用该选项,否则请不要勾选&#x…

短视频矩阵抖音SEO源码OEM独立部署

短视频优化矩阵源码涉及对抖音平台上的视频内容进行筛选与排序,目的是增强其在搜索引擎中的可见度,以便更多用户能够浏览到这些视频。而抖音SEO优化系统则是通过构建一个分析框架,来解析抖音上的用户数据、视频信息及标签等元素,并…

MySQL——buffer poll

为什么要有buffer poll? 如果没有buffer poll,每次读取数据的时候都是从磁盘上读的,这样效率是很差的的。 所以有了提高效率的方式,就加上了一个缓存——buffer poll 所以,当我们读取数据的时候就有以下的方式 当读…

生产慎用之调试日志对空间矢量数据批量插入的性能影响-以MybatisPlus为例

目录 前言 一、一些缘由 1、性能分析 二、插入方式调整 1、批量插入的实现 2、MP的批量插入实现 3、日志的配置 三、默认处理方式 1、基础程序代码 2、执行情况 四、提升调试日志等级 1、在logback中进行设置 2、提升后的效果 五、总结 前言 在现代软件开发中,性能优…

元宇宙时代的社交平台:Facebook的愿景与实践

随着科技的不断进步,元宇宙(Metaverse)这一概念逐渐走进了人们的视野。作为全球最大的社交平台之一,Facebook(现Meta)在这场元宇宙革命中扮演着重要角色。Meta不仅在不断扩展其社交平台的边界,还…

C# 小案例(IT资产管理系统)

开发工具:visual studio 2022 语言:C# 数据库:Sql Server 2008 页面展示 一、登录 二、主窗体 三、用户管理 四、资产管理 五、关于 Java版地址:基于若依开发物品管理系统(springbootvue)_若依物品管理系统-CSDN博客 Python版…

分布式日志系统设计

一、分布式日志系统定义 分布式日志系统是一种用于收集、存储和分析大规模分布式系统日志的系统。它可以帮助开发人员和系统管理员实时监控和调试系统,提高系统可靠性和可用性,同时也可以用于日志分析和故障排查。 二、简单设计思路 日志收集&#xff…

敏捷开发04:Scrum 中的 Product Backlog(产品待办列表) 详细介绍

Product Backlog 产品待办列表 在计划开发产品功能时,都希望产品功能上线后,用户能够喜欢并经常使用。 因此在开发产品新功能时,就要衡量哪些产品需求是对用户最有价值,这是最应该思考的问题。 然后把这些有价值的需求集合放在一…

vmware vsphere5---部署vCSA(VMware vCenter Server)附带第二阶段安装报错解决方案

声明 因为这份文档我是边做边写的,遇到问题重新装了好几次所以IP会很乱 ESXI主机为192.168.20.10 VCSA为192.168.20.7,后台为192.168.20.7:5480 后期请自行对应,后面的192.168.20.57请对应192.168.20.7,或根据自己的来 第一阶段…

110.【C语言】编写命令行程序(1)

目录 1.前置知识 "命令"的含义 运行C语言程序 2.介绍 main函数的参数 实验1 执行结果 实验2 执行结果 修改代码 实验3 分析 方法:遍历数组argv[]中的所有参数 执行结果 修改代码 执行结果 1.前置知识 "命令"的含义 WINR输入cmd,在cmd窗口下…

Leecode刷题C语言之半有序排列

执行结果:通过 执行用时和内存消耗如下&#xff1a; 代码如下&#xff1a; int semiOrderedPermutation(int* nums, int numsSize) {int first 0, last 0;for (int i 0; i < numsSize; i) {if (nums[i] 1) {first i;}if (nums[i] numsSize) {last i;}}return firs…

RPC设计--从reactor设计 (IOthread)

主从reactor架构 一般的一个网络IO库都是主从reactor模式&#xff0c;即主线程中有一个MainReactor&#xff0c;其负责监听ListenFd&#xff0c;当接受到新的用户连接时&#xff0c;返回的clientfd并不会加入的MainReacotr&#xff0c;而是在子线程&#xff08;这里称为IO线程&…

Scala中求斐波那契数列的第n项

求斐波那契数列的第n项 问题&#xff1a;求 斐波那契数列的第n项 记&#xff1a; 0 1 1 2 3 5 8 13 21 34 55 ... 从第3项开始 f(n) f(n-1) f(n-2) 1.基本情况&#xff08;直接能求的&#xff09;&#xff1a;f(0) 0,f(1) 1 2.递归情况&#xff08;大事化小&#xff0c;自己…

【Golang】Go语言编程思想(六):Channel,第六节,并发编程模式

并发模式 下例重新对 channel 的用法进行回顾&#xff1a; package mainimport ("fmt""math/rand""time" )func msgGen(name string) chan string {c : make(chan string)go func(name string) { // 在这个 goroutine 当中向外发送数据i : 0fo…

重生之我在异世界学编程之C语言:深入结构体篇(上)

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文《1》 结构体的两种声明一、结构…

Scala递归中求汉罗塔游戏的步骤

记&#xff1a;f(n,"A","B","C")表示n个盘子从A柱子上移动到C柱子上&#xff0c;借用B柱子的过程 f(要移动的盘子的个数&#xff0c;起点&#xff0c;辅助柱子&#xff0c;终点) 1.基本情况(直接能求的)&#xff1a;f(1,"A","B&…

输入url到显示主页的详细过程

从浏览器地址输入url到显示主页的过程&#xff1f; 主要分为&#xff1a;DNS解析&#xff0c;TCP连接&#xff0c;发送HTTP请求&#xff0c;服务器处理请求&#xff0c;浏览器接收HTTP响应&#xff0c;断开连接 DNS解析&#xff1a; 浏览器发起一个DNS请求到DNS服务器&#…