【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

  • 1.Lookup Join(维表 Join)
  • 2.Array Expansion(数组列转行)
  • 3.Table Function(自定义列转行)

1.Lookup Join(维表 Join)

Lookup Join 定义(支持 Batch / Streaming):Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

应用场景:小伙伴萌会问,我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是 流与流之间的 Join,而 Lookup Join 是流与 Redis,MySQL,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,MySQL,HBase 中,这就是 Lookup Join 的由来。

实际案例:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。

  • 曝光用户日志流(show_log)数据(数据存储在 Kafka 中)
log_id  timestamp            user_id
1       2021-11-01 00:01:03  a
2       2021-11-01 00:03:00  b
3       2021-11-01 00:05:00  c
4       2021-11-01 00:06:00  b
5       2021-11-01 00:07:00  c
  • 用户画像维表(user_profile)数据(数据存储在 Redis 中)
user_id(主键)   age     sex
a               12-18   男
b               18-24   女
c               18-24   男

注意:Redis 中的数据结构存储是按照 Key-Value 去存储的。其中 Key 为 user_id,Value 为 agesex 的 JSON。

具体 SQL:

CREATE TABLE show_log (log_id BIGINT,`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),user_id STRING,proctime AS PROCTIME()
)
WITH ('connector' = 'datagen','rows-per-second' = '10','fields.user_id.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE user_profile (user_id STRING,age STRING,sex STRING) WITH ('connector' = 'redis','hostname' = '127.0.0.1','port' = '6379','format' = 'json','lookup.cache.max-rows' = '500','lookup.cache.ttl' = '3600','lookup.max-retries' = '1'
);CREATE TABLE sink_table (log_id BIGINT,`timestamp` TIMESTAMP(3),user_id STRING,proctime TIMESTAMP(3),age STRING,sex STRING
) WITH ('connector' = 'print'
);-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT s.log_id as log_id, s.`timestamp` as `timestamp`, s.user_id as user_id, s.proctime as proctime, u.sex as sex, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id

输出数据如下:

log_id  timestamp            user_id  age     sex
1       2021-11-01 00:01:03  a        12-182       2021-11-01 00:03:00  b        18-243       2021-11-01 00:05:00  c        18-244       2021-11-01 00:06:00  b        18-245       2021-11-01 00:07:00  c        18-24

注意:实时的 Lookup 维表关联能使用 处理时间 去做关联。

  • 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 1 1 1 的数据在 08 : 00 08:00 08:00age12-18 变为了 18-24,那么当我们的任务在 08 : 01 08:01 08:01 failover 之后从 07 : 59 07:59 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
  • 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据。

再说说维表常见的性能问题及优化思路。

所有的维表性能问题都可以总结为:高 QPS 下访问维表存储引擎产生的任务背压,数据产出延迟问题

举个例子:

  • 在没有使用维表的情况下:一条数据从输入 Flink 任务到输出 Flink 任务的时延假如为 0.1 m s 0.1\ ms 0.1 ms,那么并行度为 1 1 1 的任务的吞吐可以达到 1 q u e r y / 0.1 m s = 10000 q p s 1\ query\ /\ 0.1\ ms = 10000\ qps 1 query / 0.1 ms=10000 qps
  • 在使用维表之后:每条数据访问维表的外部存储的时长为 2 m s 2\ ms 2 ms,那么一条数据从输入 Flink 任务到输出 Flink 任务的时延就会变成 2.1 m s 2.1\ ms 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1 q u e r y / 2.1 m s = 476 q p s 1\ query\ /\ 2.1\ ms = 476\ qps 1 query / 2.1 ms=476 qps。两者的吞吐量相差 21 21 21 倍。

这就是为什么维表 Join 的算子会产生背压,任务产出会延迟。

那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:

  • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过的 local cache 即可。这样就可以把访问外部存储 2.1 m s 2.1\ ms 2.1 ms 处理一个 Query 变为访问内存的 0.1 m s 0.1\ ms 0.1 ms 处理一个 Query 的时长。
  • 2️⃣ 异步访问外存:DataStream API 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 2.1 m s 2.1\ ms 2.1 ms 处理 10 10 10 个 Query。吞吐可变优化到 10 q u e r y / 2.1 m s = 4761 q p s 10\ query\ /\ 2.1\ ms = 4761\ qps 10 query / 2.1 ms=4761 qps
  • 3️⃣ 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 Redis 维表的 1 1 1 Query 占用 2.1 m s 2.1\ ms 2.1 ms 时长中,其中可能有 2 m s 2\ ms 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 m s 0.1\ ms 0.1 ms 是 Redis Server 处理请求的时长。那么我们就可以使用 Redis 提供的 pipeline 能力,在客户端(也就是 Flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 Redis Sever。这样就可以把 2.1 m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 7 m s = 2 m s + 50 ∗ 0.1 m s 7\ ms=2\ ms + 50 * 0.1\ ms 7 ms=2 ms+500.1 ms 处理 50 50 50 个 Query。吞吐可变为 50 q u e r y / 7 m s = 7143 q p s 50\ query\ /\ 7\ ms = 7143\ qps 50 query / 7 ms=7143 qps

博主认为上述优化效果中,最好用的是 1️⃣ + 3️⃣,2️⃣ 相比 3️⃣ 还是一条一条发请求,性能会差一些。

既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作

  • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:SQL 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udafuser defined aggregation function)中做访问 Redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 Redis 压力。在博主实现的 Redis Connector 中,内置了 local cache 的实现。
  • 2️⃣ 异步访问外存:目前博主实现的 Redis Connector 不支持异步访问,但是官方实现的 HBase Connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
  • 3️⃣ 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 Redis 的批量访问外存优化的功能。

2.Array Expansion(数组列转行)

应用场景(支持 Batch / Streaming):将表中 ARRAY 类型字段(列)拍平,转为多行。

实际案例:比如某些场景下,日志是合并、攒批上报的,就可以使用这种方式将一个 Array 转为多行。

CREATE TABLE show_log_table (log_id BIGINT,show_params ARRAY<STRING>
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (log_id BIGINT,show_param STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTlog_id,t.show_param as show_param
FROM show_log_table
-- array 炸开语法
CROSS JOIN UNNEST(show_params) AS t (show_param)

show_log_table 原始数据:

+I[7, [a, b, c]]
+I[5, [d, e, f]]

输出结果如下所示:

-- +I[7, [a, b, c]] 一行转为 3 行
+I[7, a]
+I[7, b]
+I[7, b]
-- +I[5, [d, e, f]] 一行转为 3 行
+I[5, d]
+I[5, e]
+I[5, f]

3.Table Function(自定义列转行)

应用场景(支持 Batch / Streaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑。

Table Function 使用分类:

  • Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 1 1 行转为 0 0 0 行,这行数据直接被丢弃。
  • Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值。
public class TableFunctionInnerJoin_Test {public static void main(String[] args) throws Exception {FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);String sql = "CREATE FUNCTION user_profile_table_func AS 'flink.examples.sql._07.query._06_joins._06_table_function"+ "._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction';\n"+ "\n"+ "CREATE TABLE source_table (\n"+ "    user_id BIGINT NOT NULL,\n"+ "    name STRING,\n"+ "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n"+ "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"+ ") WITH (\n"+ "  'connector' = 'datagen',\n"+ "  'rows-per-second' = '10',\n"+ "  'fields.name.length' = '1',\n"+ "  'fields.user_id.min' = '1',\n"+ "  'fields.user_id.max' = '10'\n"+ ");\n"+ "\n"+ "CREATE TABLE sink_table (\n"+ "    user_id BIGINT,\n"+ "    name STRING,\n"+ "    age INT,\n"+ "    row_time TIMESTAMP(3)\n"+ ") WITH (\n"+ "  'connector' = 'print'\n"+ ");\n"+ "\n"+ "INSERT INTO sink_table\n"+ "SELECT user_id,\n"+ "       name,\n"+ "       age,\n"+ "       row_time\n"+ "FROM source_table,\n"// Table Function Join 语法对应 LATERAL TABLE+ "LATERAL TABLE(user_profile_table_func(user_id)) t(age)";Arrays.stream(sql.split(";")).forEach(flinkEnv.streamTEnv()::executeSql);}public static class UserProfileTableFunction extends TableFunction<Integer> {public void eval(long userId) {// 自定义输出逻辑if (userId <= 5) {// 一行转 1 行collect(1);} else {// 一行转 3 行collect(1);collect(2);collect(3);}}}
}

执行结果如下:

-- userId <= 5,则只有 1 行结果
+I[3, 7, 1, 2021-05-01T18:23:42.560]
-- userId > 5,则有行 3 结果
+I[8, e, 1, 2021-05-01T18:23:42.560]
+I[8, e, 2, 2021-05-01T18:23:42.560]
+I[8, e, 3, 2021-05-01T18:23:42.560]
-- userId <= 5,则只有 1 行结果
+I[4, 9, 1, 2021-05-01T18:23:42.561]
-- userId > 5,则有行 3 结果
+I[8, c, 1, 2021-05-01T18:23:42.561]
+I[8, c, 2, 2021-05-01T18:23:42.561]
+I[8, c, 3, 2021-05-01T18:23:42.561]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/709986.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloudNacos注册中心服务分级存储模型

文章目录 服务分级存储模型概述配置集群同集群优先的负载均衡 权重配置总结 之前对 Nacos注册中心入门 已经做了演示. 这篇文章对 Nacos 的服务分级存储模型做理论与实践. 服务分级存储模型概述 一个服务可以有多个实例&#xff0c;例如我们的 user-server&#xff0c;可以有:…

C#使用iText7给PDF文档添加书签

上一篇文章将SqlSugar官网文档中每个链接对应的网页生成独立PDF文档再合并为单个PDF文档&#xff0c;但是没有书签&#xff0c;八百多页的内容查找和跳转都不方便&#xff0c;本文学习和使用iText7给PDF文档添加多级书签。   添加多级书签分为两大步骤&#xff1a;1&#xff…

老卫带你学---leetcode刷题(202. 快乐数)

202. 快乐数 问题 「快乐数」 定义为&#xff1a; 对于一个正整数&#xff0c;每一次将该数替换为它每个位置上的数字的平方和。 然后重复这个过程直到这个数变为 1&#xff0c;也可能是 无限循环 但始终变不到 1。 如果这个过程 结果为 1&#xff0c;那么这个数就是快乐数。…

VR全景HDR拍摄教程

什么是HDR&#xff1f; HDR可以用在哪里&#xff1f; 书面解释&#xff1a;HDR&#xff08;高动态范围 High Dynamic Range&#xff09;摄影&#xff0c;是摄影领域广泛使用的一种技术。 是不是有点懵&#xff1f; 我们来看一个实际的拍摄现场环境&#xff0c;你就懂了 我们…

使用 Gradle 版本目录进行依赖管理 - Android

/ 前言 / 在软件开发中&#xff0c;依赖管理是一个至关重要的方面。合理的依赖版本控制有助于确保项目的稳定性、安全性和可维护性。 Gradle版本目录&#xff08;Version Catalogs&#xff09;是 Gradle 构建工具的一个强大功能&#xff0c;它为项目提供了一种集中管理依赖…

定时任务框架

定时任务的框架有哪些 ● Timer&#xff0c;JDK自带的&#xff0c;比较简单&#xff0c;使用的时候&#xff0c;定义一个TimerTask&#xff0c;实现run方法&#xff0c;然后定义一个Timer类&#xff0c;调用timer.schedule(timerTask,1000,3000); ○ 缺点&#xff1a;单线程、…

附加Numpy数组

参考&#xff1a;Append Numpy Array 引言 在数据科学和机器学习领域&#xff0c;处理大规模数据集是一项重要且常见的任务。为了高效地处理数据&#xff0c;numpy是一个非常强大的Python库。本文将详细介绍numpy中的一个重要操作&#xff0c;即如何附加&#xff08;append&a…

LeetCode:2867. 统计树中的合法路径数目(筛质数+ DFS Java)

目录 2867. 统计树中的合法路径数目 题目描述&#xff1a; 实现代码与思路&#xff1a; 筛质数 DFS 原理思路&#xff1a; 2867. 统计树中的合法路径数目 题目描述&#xff1a; 给你一棵 n 个节点的无向树&#xff0c;节点编号为 1 到 n 。给你一个整数 n 和一个长度为 …

西软云XMS operate XXE漏洞

免责声明&#xff1a;文章来源互联网收集整理&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作者无关。该…

linux使用scp命令来在两台Linux设备之间传输文件

1、linux怎么将一个文件发送到另一个linux设备特定目录下 可以使用scp命令&#xff08;secure copy&#xff09;来将文件从一个Linux设备复制到另一个Linux设备的特定目录下。假设你要将本地文件localfile.txt发送到远程设备的/remote/directory目录下&#xff0c;你可以使用以…

Oracle修改用户密码之后连接特别慢的问题

一、问题现象 oracle数据库密码修改后连接速度特别慢&#xff0c;甚至出现超时的问题&#xff0c;查询表也特别慢 更改密码后&#xff0c;每次连接异常慢&#xff0c;就算用正确的密码连接&#xff0c;验证延时也非常大&#xff0c;导致应用程序连接反复出现超时现象&#xf…

Jquery操作DOM对象

文章目录 目录 文章目录 本章目标 一.DOM操作分类 二.JQuery中的DOM操作 内容操作 属性值操作 节点操作 节点属性操作 节点遍历 总结 本章目标 使用Jquery操作网页元素使用JQuery操作文本与属性值内容使用JQuery操作DOM节点使用Jquery遍历DOM节点使用JQuery操作CSS-DOM 一…

Groovy(第八节) Groovy 之类

目录 Song 类 Groovy 类就是 Java 类 类的关系 类初始化 核心的灵活性

WebServer -- 日志系统(下)

目录 &#x1f33c;整体思路 &#x1f382;基础API fputs 可变参数宏 __VA_ARGS__ fflush &#x1f6a9;流程图与日志类定义 流程图 日志类定义 &#x1f33c;功能实现 生成日志文件 && 判断写入方式 日志分级与分文件 &#x1f33c;整体思路 日志系统分两部…

常见概率分布介绍

介绍 概率分布是统计学中用于描述随机变量的概率特征的函数。以下是几种常用的概率分布&#xff1a; 均匀分布&#xff08;Uniform Distribution&#xff09;: 离散均匀分布: 每个结果发生的概率相等。连续均匀分布: 任意两个相同长度的区间内取值的概率相同。 二项分布&am…

无法调试MFC源码

VS无法调试MFC源码 起初 有时候就是这么无奈&#xff0c;MFC源码各种问题没有办法调试&#xff0c;可是又想看下代码如何调用&#xff0c;里面做了些什么&#xff0c;从哪儿调出&#xff0c;学习一下大神的思路什么的。整理一下有可能的原因。 检查生成代码设置 需要设置正…

[Java 基础] Java修饰符

Java修饰符详解 Java修饰符用于定义类、方法或者变量&#xff0c;修改其行为的关键字。Java语言主要提供了两类修饰符&#xff1a; 访问权限修饰符: default, public , protected, private非访问权限修饰符: final, abstract, static, synchronized&#xff0c; volatile等 …

04 Opencv图像操作

文章目录 读写像素修改像素值Vec3b与Vec3F灰度图像增强获取图像通道bitwise_not 算子对图像非操作 读写像素 读一个GRAY像素点的像素值&#xff08;CV_8UC1&#xff09; Scalar intensity img.at(y, x); 或者 Scalar intensity img.at(Point(x, y)); 读一个RGB像素点的像素值…

js【详解】数据类型原理(含变量赋值详解-浅拷贝)

JavaScript 中的数据按存储方式的不同&#xff0c;分为值类型和引用类型。 值类型&#xff08;共 6 种&#xff09;&#xff1a;赋值的时候传值 —— 数字、字符串、布尔值、null 、undefined&#xff0c;Symbol引用类型&#xff08;仅 1 种&#xff09;&#xff1a;赋值的时候…

虚拟机看不到共享文件夹

johnjohn-virtual-machine:/mnt/hgfs$ cat /etc/issue Ubuntu 20.04.6 LTS \n \l 看下是否挂载 johnjohn-virtual-machine:/mnt/hgfs$ vmware-hgfsclient FPGAs_AdaptiveSoCs_Unified_2023.2_1013_2256 xilinx 取消挂载 johnjohn-virtual-machine:/mnt/hgfs$ sudo umount /mn…