Spark Join优化案例:Join Key 远大于 Payload

在一个案例中,大表 100GB、小表 10GB,它们全都远超广播变量阈值(默认 10MB)。因为小表的尺寸已经超过 8GB,在大于 8GB 的数据集上创建广播变量,Spark 会直接抛出异常,中断任务执行,所以 Spark 是没有办法应用 BHJ 机制的。那我们该怎么办呢?
先别急,我们来看看这个案例的业务需求。这个案例来源于计算广告业务中的流量预测,流量指的是系统中一段时间内不同类型用户的访问量。这里有三个关键词,第一个是“系统”,第二个是“一段时间”,第三个是“用户类型”。时间粒度好理解,就是以小时为单位去统计流量。用户类型指的是采用不同的维度来刻画用户,比如性别、年龄、教育程度、职业、地理位置。系统指的是流量来源,比如平台、站点、频道、媒体域名。在系统和用户的维度组合之下,流量被细分为数以百万计的不同“种类”。比如,来自 XX 平台 XX 站点的在校大学生的访问量,或是来自 XX 媒体 XX 频道 25-45 岁女性的访问量等等。
我们知道,流量预测本身是个时序问题,它和股价预测类似,都是基于历史、去预测未来。在我们的案例中,为了预测上百万种不同的流量,咱们得先为每种流量生成时序序列,然后再把这些时序序列喂给机器学习算法进行模型训练。统计流量的数据源是线上的访问日志,它记录了哪类用户在什么时间访问了哪些站点。要知道,我们要构建的,是以小时为单位的时序序列,但由于流量的切割粒度非常细致,因此有些种类的流量不是每个小时都有访问量的,如下图所示。
在这里插入图片描述

某种流量在过去24小时的记录情况我们可以看到,在过去的 24 小时中,某种流量仅在 20-24 点这 5 个时段有数据记录,其他时段无记录,也就是流量为零。在这种情况下,我们就需要用“零”去补齐缺失时段的序列值。那么我们该怎么补呢?因为业务需求是填补缺失值,所以在实现层面,我们不妨先构建出完整的全零序列,然后以系统、用户和时间这些维度组合为粒度,用统计流量去替换全零序列中相应位置的“零流量”。这个思路描述起来比较复杂,用图来理解会更直观、更轻松一些。
在这里插入图片描述

首先,我们生成一张全零流量表,如图中左侧的“负样本表”所示。这张表的主键是划分流量种类的各种维度,如性别、年龄、平台、站点、小时时段等等。表的 Payload 只有一列,也即访问量,在生成“负样本表”的时候,这一列全部置零。
然后,我们以同样的维度组合统计日志中的访问量,就可以得到图中右侧的“正样本表”。不难发现,两张表的 Schema 完全一致,要想获得完整的时序序列,我们只需要把外表以“左连接(Left Outer Join)”的形式和内表做关联就好了。具体的查询语句如下:

//左连接查询语句
select t1.gender, t1.age, t1.city, t1.platform, t1.site, t1.hour, coalesce(t2.access, t1.access) as access
from t1 left join t2 on
t1.gender = t2.gender and
t1.age = t2.age and
t1.city = t2.city and
t1.platform = t2.platform and
t1.site = t2.site and
t1.hour = t2.hour

使用左连接的方式,我们刚好可以用内表中的访问量替换外表中的零流量,两表关联的结果正是我们想要的时序序列。“正样本表”来自访问日志,只包含那些存在流量的时段,而“负样本表”是生成表,它包含了所有的时段。因此,在数据体量上,负样本表远大于正样本表,这是一个典型的“大表 Join 小表”场景。
尽管小表(10GB)与大表(100GB)相比,在尺寸上相差一个数量级,但两者的体量都不满足 BHJ 的先决条件。因此,Spark 只好退而求其次,选择 SMJ(Shuffle Sort Merge Join)的实现方式。我们知道,SMJ 机制会引入 Shuffle,将上百 GB 的数据在全网分发可不是一个明智的选择。那么,根据“能省则省”的开发原则,我们有没有可能“省去”这里的 Shuffle 呢?
要想省去 Shuffle,我们只有一个办法,就是把 SMJ 转化成 BHJ。你可能会说:“都说了好几遍了,小表的尺寸 10GB 远超广播阈值,我们还能怎么转化呢?”办法总比困难多,我们先来反思,关联这两张表的目的是什么?目的是以维度组合(Join Keys)为基准,用内表的访问量替换掉外表的零值。那么,这两张表有哪些特点呢?首先,两张表的 Schema 完全一致。其次,无论是在数量、还是尺寸上,两张表的 Join Keys 都远大于 Payload。
那么问题来了,要达到我们的目的,一定要用那么多、那么长的 Join Keys 做关联吗?答案是否定的。在上一讲,我们介绍过 Hash Join 的实现原理,在 Build 阶段,Hash Join 使用哈希算法生成哈希表。在 Probe 阶段,哈希表一方面可以提供 O(1) 的查找效率,另一方面,在查找过程中,Hash Keys 之间的对比远比 Join Keys 之间的对比要高效得多。受此启发,我们为什么不能计算 Join Keys 的哈希值,然后把生成的哈希值当作是新的 Join Key 呢?
在这里插入图片描述
我们完全可以基于现有的 Join Keys 去生成一个全新的数据列,它可以叫“Hash Key”。生成的方法分两步:把所有 Join Keys 拼接在一起,把性别、年龄、一直到小时拼接成一个字符串,如图中步骤 1、3 所示使用哈希算法(如 MD5 或 SHA256)对拼接后的字符串做哈希运算,得到哈希值即为“Hash Key”,如上图步骤 2、4 所示在两张表上,我们都进行这样的操作。
如此一来,在做左连接的时候,为了把主键一致的记录关联在一起,我们不必再使用数量众多、冗长的原始 Join Keys,用单一的生成列 Hash Key 就可以了。相应地,SQL 查询语句也变成了如下的样子。

//调整后的左连接查询语句
select t1.gender, t1.age, t1.city, t1.platform, t1.site, t1.hour, coalesce(t2.access, t1.access) as access
from t1 left join t2 on
t1.hash_key = t2. hash_key

添加了这一列之后,我们就可以把内表,也就是“正样本表”中所有的 Join Keys 清除掉,大幅缩减内表的存储空间,上图中的步骤 5 演示了这个过程。当内表缩减到足以放进广播变量的时候,我们就可以把 SMJ 转化为 BHJ,从而把 SMJ 中的 Shuffle 环节彻底省掉。这样一来,清除掉 Join Keys 的内表的存储大小就变成了 1.5GB。对于这样的存储量级,我们完全可以使用配置项或是强制广播的方式,来完成 Shuffle Join 到 Broadcast Join 的转化,具体的转化方法你可以参考广播变量那一讲(第 13 讲)。
案例 1 说到这里,其实已经基本解决了,不过这里还有一个小细节需要我们特别注意。案例 1 优化的关键在于,先用 Hash Key 取代 Join Keys,再清除内表冗余数据。Hash Key 实际上是 Join Keys 拼接之后的哈希值。既然存在哈希运算,我们就必须要考虑哈希冲突的问题。
哈希冲突我们都很熟悉,它指的就是不同的数据源经过哈希运算之后,得到的哈希值相同。在案例 1 当中,如果我们为了优化引入的哈希计算出现了哈希冲突,就会破坏原有的关联关系。比如,本来两个不相等的 Join Keys,因为哈希值恰巧相同而被关联到了一起。显然,这不是我们想要的结果。消除哈希冲突隐患的方法其实很多,比如“二次哈希”,也就是我们用两种哈希算法来生成 Hash Key 数据列。两条不同的数据记录在两种不同哈希算法运算下的结果完全相同,这种概率几乎为零。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/37407.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言 求 n 个数的阶乘之和

求n个数的阶乘之和&#xff08;即求1&#xff01;2&#xff01;3&#xff01;…n!&#xff09; 这个程序读取用户输入的正整数 n&#xff0c;计算并输出 1! 2! 3! ... n! 的值。 #include <stdio.h>// 计算阶乘的函数 long factorial(int num) {long result 1;for…

恢复 IntelliJ IDEA 中消失的菜单栏

要恢复 IntelliJ IDEA 中消失的菜单栏&#xff0c;可以按照以下简单步骤操作&#xff1a; 使用快捷键打开搜索&#xff1a;首先&#xff0c;双击 Shift 键打开全局搜索对话框。 搜索“Menu”&#xff1a;在搜索框中输入 menu&#xff0c;然后从搜索结果中选择与“Main Menu”相…

python-基础篇-选择-是什么

文章目录 定义一&#xff1a;Python 条件语句跟其他语言基本一致的&#xff0c;都是通过一条或多条语句的执行结果&#xff08; True 或者 False &#xff09;来决定执行的代码块。1、什么是条件语句2、if 语句的基本形式3、if 语句多个判断条件的形式4、if 语句多个条件同时判…

次序统计量

内容来源 概率论与数理统计教程&#xff08;第三版&#xff09; 茆诗松 高等教育出版社 数理统计学导论&#xff08;原书第7版&#xff09; 机械工业出版社 定义 设 X 1 , X 2 , ⋯ , X n X_1,X_2,\cdots,X_n X1​,X2​,⋯,Xn​ 是来自连续分布的随机样本 此分布具有 p d f…

【机器学习】Python reversed 函数

目录&#xff1a; reversed()函数初探应用于列表和元组实战演练&#xff1a;山海经故事文本处理 Python中的内置函数——reversed()。 这个函数能够帮助你高效地处理序列类型数据&#xff0c;比如列表、元组、字符串等&#xff0c;通过它你可以轻松地反转这些序列中的元素顺…

JSON 简述与应用

1. JSON 简述 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;常用于客户端与服务器之间的数据传递。它基于JavaScript对象表示法&#xff0c;但独立于语言&#xff0c;可以被多种编程语言解析和生成。 1.1 特点 轻量级&#…

JS对数据类型的检测方式

1. typeof()对于基本数据类型没问题&#xff0c;遇到引用数据类型就不管用 console.log( typeof 666 ); // number console.log( typeof [1,2,3] ); // object 2. instanceof()只能判断引用数据类型&#xff0c;不能判断基本数据类型 console.log( [] instanceof Array ) // tr…

Unity--协程--Coroutine

Unity–协程–Coroutine 1. 协程的基本概念 基本概念:不是线程,将代码按照划分的时间来执行,这个时间可以是具体的多少秒,也可以是物理帧的时间,也可以是一帧的绘制结束的时间。 协程的写法&#xff1a;通过返回IEnumerator的函数实现&#xff0c;使用yield return语句暂停执…

Golang | Leetcode Golang题解之第205题同构字符串

题目&#xff1a; 题解&#xff1a; func isIsomorphic(s, t string) bool {s2t : map[byte]byte{}t2s : map[byte]byte{}for i : range s {x, y : s[i], t[i]if s2t[x] > 0 && s2t[x] ! y || t2s[y] > 0 && t2s[y] ! x {return false}s2t[x] yt2s[y] …

python 查找轮廓

在Python中&#xff0c;查找图像的轮廓通常使用OpenCV库。以下是一个简单的示例代码&#xff0c;展示了如何使用OpenCV来查找并绘制图像的轮廓&#xff1a; pythonimport cv2 import numpy as np# 读取图像 image cv2.imread(your_image.jpg, 0) # 请将your_image.jpg替换为您…

设备树下的 platform 驱动编写

设备树下的 platform 驱动编写 设备树下的 platform 驱动简介 platform 驱动框架分为总线、设备和驱动&#xff0c;其中总线不需要我们这些驱动程序员去管理&#xff0c;这个是 Linux 内核提供的&#xff0c;我们在编写驱动的时候只要关注于设备和驱动的具体实现即可。在没有…

《昇思25天学习打卡营第6天 | 函数式自动微分》

《昇思25天学习打卡营第6天 | 函数式自动微分》 目录 《昇思25天学习打卡营第6天 | 函数式自动微分》函数式自动微分简单的单层线性变换模型函数与计算图微分函数与梯度计算Stop Gradient 函数式自动微分 神经网络的训练主要使用反向传播算法&#xff0c;模型预测值&#xff0…

建站小记:迁移域名DNS到CloudFlare

CloudFlare一直有赛博菩萨之称&#xff0c;据说用它做DNS解析服务又快又好又免费&#xff0c;还能防DDOS攻击&#xff0c;并且可以提供页面访问统计功能。 正好我博客网页打开略卡顿&#xff0c;所以决定将自己的DNS解析迁移到CloudFlare。 1.登录CF控制台&#xff0c;添加自己…

LeetCode-刷题记录-二分法合集(本篇blog会持续更新哦~)

一、二分查找概述 二分查找&#xff08;Binary Search&#xff09;是一种高效的查找算法&#xff0c;适用于有序数组或列表。&#xff08;但其实只要满足二段性&#xff0c;就可以使用二分法&#xff0c;本篇博客后面博主会持续更新一些题&#xff0c;来破除一下人们对“只有有…

(已解决)Adobe Flash Player已不再受支持

文章目录 前言解决方案 前言 一般来说&#xff0c;很少遇到官方网站使用Adobe Flash Player来进行录用名单公示了。但是&#xff0c;今天就偏偏遇到一次&#xff0c; 用谷歌浏览器打不开&#xff0c; 点了没有反应&#xff0c;用其他的浏览器&#xff0c;例如windows自带的那…

Golang | Leetcode Golang题解之第207题课程表

题目&#xff1a; 题解&#xff1a; func canFinish(numCourses int, prerequisites [][]int) bool {var (edges make([][]int, numCourses)indeg make([]int, numCourses)result []int)for _, info : range prerequisites {edges[info[1]] append(edges[info[1]], info[0]…

数据结构:期末考 第六次测试(总复习)

一、 单选题 &#xff08;共50题&#xff0c;100分&#xff09; 1、表长为n的顺序存储的线性表&#xff0c;当在任何位置上插入或删除一个元素的概率相等时&#xff0c;插入一个元素所需移动元素的平均个数为&#xff08; D &#xff09;.&#xff08;2.0&#xff09; A、 &am…

在node环境使用MySQL

什么是Sequelize? Sequelize是一个基于Promise的NodeJS ORM模块 什么是ORM? ORM(Object-Relational-Mapping)是对象关系映射 对象关系映射可以把JS中的类和对象&#xff0c;和数据库中的表和数据进行关系映射。映射之后我们就可以直接通过类和对象来操作数据表和数据了, 就…

join()方法——连接字符串、元组、列表和字典

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 join()方法用于连接字符串数组。将字符串、元组、列表中的元素以指定的字符&#xff08;分隔符&#xff09;连接生成一个新的字符串&#…

喜报 | 极限科技获得北京市“创新型”中小企业资格认证

2024年6月20日&#xff0c;北京市经济和信息化局正式发布《关于对2024年度4月份北京市创新型中小企业名单进行公告的通知》&#xff0c;极限数据&#xff08;北京&#xff09;科技有限公司凭借其出色的创新能力和卓越的企业实力&#xff0c;成功获得“北京市创新型中小企业”的…