【大数据】Flink SQL 语法篇(五):Regular Join、Interval Join

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(五):Regular Join、Interval Join

  • 1.Regular Join
    • 1.1 Inner Join 案例
    • 1.2 Left Join 案例
    • 1.3 Full Join 案例
  • 2.Interval Join(时间区间 Join)
    • 2.1 Inner Interval Join
    • 2.2 Left Interval Join
    • 2.3 Full Interval Join

Flink 也支持了非常多的数据 Join 方式,主要包括以下三种:

  • 动态表(流)与动态表(流)的 Join
  • 动态表(流)与外部维表(比如 Redis)的 Join
  • 动态表字段的列转行(一种特殊的 Join)

细分 Flink SQL 支持的 Join:

  • Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join
  • Interval Join:流与流的 Join,两条流一段时间区间内的 Join
  • Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join
  • Lookup Join:流与外部维表的 Join
  • Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行
  • Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join

1.Regular Join

Regular Join 定义(支持 Batch / Streaming):Regular Join 其实就是和离线 Hive SQL 一样的 Regular Join,通过条件关联两条流数据输出

应用场景:Join 其实在我们的数仓建设过程中应用是非常广泛的。离线数仓可以说基本上是离不开 Join 的。那么实时数仓的建设也必然离不开 Join,比如日志关联扩充维度数据,构建宽表;日志通过 ID 关联计算 CTR。

Regular Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

  • Inner JoinInner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
  • Left JoinOuter Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R],没 Join 到输出 +[L, null]),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null],然后输出 +[L, R]
  • Right JoinOuter Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反。
  • Full JoinOuter Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R],没 Join 到输出 +[null, R];对左流来说:Join 到输出 +[L, R],没 Join 到输出 +[L, null])。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R],输出 +[L, R],右流数据到达为例:回撤 -[L, null],输出 +[L, R])。

1.1 Inner Join 案例

实际案例:案例为 曝光日志 关联 点击日志 筛选既有曝光又有点击的数据。

-- 曝光日志数据
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '100'
);-- 点击日志数据
CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '2','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);-- 流的 INNER JOIN,条件为 log_id
INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;

输出结果如下:

+I[5, d, 5, f]
+I[5, d, 5, 8]
+I[5, d, 5, 2]
+I[3, 4, 3, 0]
+I[3, 4, 3, 3]
...

1.2 Left Join 案例

CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '3','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '3','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;

输出结果如下:

+I[5, f3c, 5, c05]
+I[5, 6e2, 5, 1f6]
+I[5, 86b, 5, 1f6]
+I[5, f3c, 5, 1f6]
-D[3, 4ab, null, null]
-D[3, 6f2, null, null]
+I[3, 4ab, 3, 765]
+I[3, 6f2, 3, 765]
+I[2, 3c4, null, null]
+I[3, 4ab, 3, a8b]
+I[3, 6f2, 3, a8b]
+I[2, c03, null, null]
...

1.3 Full Join 案例

CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '2','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;

输出结果如下:

+I[null, null, 7, 6]
+I[6, 5, null, null]
-D[1, c, null, null]
+I[1, c, 1, 2]
+I[3, 1, null, null]
+I[null, null, 7, d]
+I[10, 0, null, null]
+I[null, null, 2, 6]
-D[null, null, 7, 6]
-D[null, null, 7, d]
...

关于 Regular Join 的注意事项:

  • 实时 Regular Join 可以不是 等值 Join等值 Join非等值 Join 区别在于,等值 Join 数据 Shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 Join 数据 Shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。
  • Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。
  • 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。

2.Interval Join(时间区间 Join)

Interval Join 定义(支持 Batch / Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。

应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生 回撤流,但是在实时数仓中一般写入的 Sink 都是类似于 Kafka 这样的消息队列,然后后面接 Clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以博主理解 Interval Join 就是用于消灭回撤流的。

Interval Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

  • Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
  • Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。
  • Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反
  • Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 +[null, R]

可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。

2.1 Inner Interval Join

实际案例:还是刚刚的案例,曝光日志 关联 点击日志 筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 4 4 小时之内的点击。

CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' HOUR AND click_log_table.row_time;

输出结果如下:

6> +I[2, a, 2, 6]
6> +I[2, 6, 2, 6]
2> +I[4, 1, 4, 5]
2> +I[10, 8, 10, d]
2> +I[10, 7, 10, d]
2> +I[10, d, 10, d]
2> +I[5, b, 5, d]
6> +I[1, a, 1, 7]

2.2 Left Interval Join

CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;

输出结果如下:

+I[6, e, 6, 7]
+I[11, d, null, null]
+I[7, b, null, null]
+I[8, 0, 8, 3]
+I[13, 6, null, null]

2.3 Full Interval Join

CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '5','fields.log_id.max' = '15'
);CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;

输出结果如下:

+I[6, 1, null, null]
+I[7, 3, 7, 8]
+I[null, null, 6, 6]
+I[null, null, 4, d]
+I[8, d, null, null]
+I[null, null, 3, b]

关于 Interval Join 的注意事项:

  • 实时 Interval Join 可以不是 等值 Join等值 Join非等值 Join 区别在于,等值 Join 数据 Shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 Join 数据 Shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/711635.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu系统下大数据服务器磁盘调优测试记录

一、背景 在kvm虚拟机ubuntu操作系统大数据平台测试的过程中,遭遇了磁盘I/O性能的瓶颈,因有cpu绑核操作,故有做隔核操作验证是否是绑核影响的磁盘I/O,后又对磁盘进行透传以及挂内存盘等操作; 二、磁盘介绍 2.1 磁盘…

『NLP学习笔记』图解 BERT、ELMo和GPT(NLP如何破解迁移学习)

图解 BERT、ELMo和GPT(NLP如何破解迁移学习) 文章目录 一. 前言二. 示例-句子分类三. 模型架构3.1. 模型输入3.2. 模型输出四. BERT VS卷积神经网络五. 词嵌入新时代5.1. 简要回顾词嵌入Word Embedding5.2. ELMo: 上下文语境很重要5.2.1. ELMo的秘密是什么?5.3. ULM-FiT:将迁…

蓝桥杯Python B组练习——斐波那契数列

一、题目 定义 斐波那契数列(Fibonacci sequence),又称黄金分割数列,因数学家莱昂纳多斐波那契(Leonardo Fibonacci)以兔子繁殖为例子而引入,故又称为“兔子数列”,指的是这样一个数…

Linux x86平台获取sys_call_table

文章目录 前言一、根据call *sys_call_table来获取二、使用dump_stack三、根据MSR_LSTAR寄存器四、使用sys_close参考资料 前言 Linux 3.10.0 – x86_64 最简单获取sys_call_table符号的方法: # cat /proc/kallsyms | grep sys_call_table ffffffff816beee0 R sy…

随想录算法训练营第四十七天|198.打家劫舍、213.打家劫舍II、337.打家劫舍III

198.打家劫舍 public class Solution {public int Rob(int[] nums) {if(nums.Length0){return 0;}if(nums.Length1){return nums[0];}int[] dpnew int[nums.Length1];dp[0]nums[0];dp[1]Math.Max(nums[0],nums[1]);for(int i2;i<nums.Length;i){dp[i]Math.Max(dp[i-2]nums[…

什么是 HTTPS 证书?作用是什么?

HTTPS 证书&#xff0c;即超文本传输安全协议证书&#xff08;Hypertext Transfer Protocol Secure&#xff09;&#xff0c;是网站安全的关键组成部分。它通过 SSL/TLS 加密协议&#xff0c;确保用户与网站之间的数据传输是加密和安全的。 什么是 HTTPS 证书&#xff1f; HT…

使用Docker -compose启动自定义jar包

步骤1&#xff1a;编写docker-compose.yml文件 首先我们需要编写一个docker-compose.yml文件来定义我们的服务传到我们的云服务器上 以下是一个示例&#xff1a; version: 3 services:app:build:context: .dockerfile: Dockerfileports:- 8080:8080volumes:- ./app.jar:/app…

可视化图表:水球图,展示百分比的神器。

Hi&#xff0c;我是贝格前端工场的老司机&#xff0c;本文分享可视化图表设计的水球图设计&#xff0c;欢迎老铁持续关注我们。 一、水球图及其作用 水球图是一种特殊的可视化图表&#xff0c;它主要用于展示百分比或比例的数据&#xff0c;并以水球的形式进行呈现。水球图的作…

2023面试题

目录 题目 1:JVM 整体结构是什么样的? 8 题目 3:Object 类有哪些方法? 11 题目 4:静态变量与实例变量区别? 11 题目 5:String 类的常用方法有哪些? 11 题目 6:数组有没有 length()方法?String 有没有 length() 12 题目 7:String、StringBuffer、StringBuilder 的区别…

【k8s 访问控制--认证与鉴权】

1、身份认证与权限 前面我们在操作k8s的所有请求都是通过https的方式进行请求&#xff0c;通过REST协议操作我们的k8s接口&#xff0c;所以在k8s中有一套认证和鉴权的资源。 Kubenetes中提供了良好的多租户认证管理机制&#xff0c;如RBAC、ServiceAccount还有各种策路等。通…

集合篇之ArrayList

一、源码如何分析&#xff1f; 1.成员变量 2.构造方法 3.关键方法 一些添加的方法。 二、debug看源码 我们给出下面代码&#xff1a; public void test01() {ArrayList<Integer> list new ArrayList<>();list.add(1);for (int i 2; i < 10; i) {list.add(i…

H5:段落标签与换行标签

目录 一.前言 二.正文 1.段落标签 2.换行标签 三.结语 一.前言 学习前端&#xff0c;从此起飞&#xff0c;愿你坚持&#xff0c;直至等顶。 二.正文 1.段落标签 <p></p> p为段落标签&#xff0c;由英文paragraph简写而来&#xff0c;用于将一段某一部分文本&am…

算法练习第九天|232.用栈实现队列、225. 用队列实现栈

熟悉栈和队列的方法&#xff1b;熟悉栈和队列的数据结构&#xff0c;不涉及算法 232.用栈实现队列 import java.util.Stack; class MyQueue {//负责进栈的Stack<Integer> stackIn;//负责出栈的Stack<Integer> stackOut;public MyQueue() {stackIn new Stack<&…

Rocketmq 入门介绍

从零手写实现 mq 详细介绍一下 rocketmq RocketMQ 是由阿里巴巴开发的分布式消息队列系统&#xff0c;它是一个低延迟、高可靠、高吞吐量的消息中间件。 RocketMQ 最初是作为阿里巴巴的内部项目进行开发的&#xff0c;后来成为了 Apache 软件基金会下的顶级项目&#xff0c;以…

精读《React 高阶组件》

本期精读文章是&#xff1a;React Higher Order Components in depth 1 引言 高阶组件&#xff08; higher-order component &#xff0c;HOC &#xff09;是 React 中复用组件逻辑的一种进阶技巧。它本身并不是 React 的 API&#xff0c;而是一种 React 组件的设计理念&…

【QT+QGIS跨平台编译】之五十三:【QGIS_CORE跨平台编译】—【qgssqlstatementparser.cpp生成】

文章目录 一、Bison二、生成来源三、构建过程一、Bison GNU Bison 是一个通用的解析器生成器,它可以将注释的无上下文语法转换为使用 LALR (1) 解析表的确定性 LR 或广义 LR (GLR) 解析器。Bison 还可以生成 IELR (1) 或规范 LR (1) 解析表。一旦您熟练使用 Bison,您可以使用…

transformers文本相似度

在自然语言处理(NLP)中,文本相似度是衡量两个文本之间语义或结构相似程度的一个重要概念。计算文本相似度的方法多种多样,适应不同的应用场景和需求。以下是一些常见的文本相似度计算方法: 1、余弦相似度: 通过将文本转换为向量表示(例如,使用词袋模型、TF-IDF 或 wor…

2024年个人护理赛道选品风向在哪?这份赛盈分销选品攻略必看!

2024年还会卷下去吗&#xff1f;看到一位行业大佬分享的内容深有感触&#xff1a;坚定做好产品&#xff0c;不做大卖&#xff0c;就不存在卷不卷。 有人出局&#xff0c;也会有人入局&#xff0c;并且深耕领域做大做强。 专注口腔护理的Bitvae入行不到两年&#xff0c;凭借一款…

C#学习(十四)——垃圾回收、析构与IDisposable

一、何为GC 数据是存储在内存中的&#xff0c;而内存又分为Stack栈内存和Heap堆内存 Stack栈内存Heap堆内存速度快、效率高结构复杂类型、大小有限制对象只能保存简单的数据引用数据类型基础数据类型、值类型- 举个例子 var c new Customer{id: 123,name: "Jack"…

Java中String类有哪些常用方法?

Java中的String类提供了许多有用的方法&#xff0c;用于处理字符串。以下是一些常用的方法及其简要描述&#xff1a; 1. **charAt(int index)**&#xff1a;返回指定位置的字符。 2. **length()**&#xff1a;返回字符串的长度。 3. **substring(int beginIndex, int endInd…