【大数据】Flink SQL 语法篇(六):Temporal Join

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(六):Temporal Join

  • 1.Versioned Table 的两种定义方式
    • 1.1 PRIMARY KEY 定义方式
    • 1.2 Deduplicate 定义方式
  • 2.应用案例
    • 2.1 案例一(事件时间)
    • 2.2 案例二(处理时间)

Temporal Join 定义(支持 Batch / Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 Join 这个 拉链快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table,使用一个明细表去 Join 这个 Versioned Table 的 Join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 Primary Key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 Join。

应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在 12 : 00 12:00 12:00 之前(事件时间),人民币和美元汇率是 7 : 1 7:1 7:1,在 12 : 00 12:00 12:00 之后变为 6 : 1 6:1 6:1,那么在 12 : 00 12:00 12:00 之前数据就要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后就要按照 6 : 1 6:1 6:1 计算。在事件时间语义的任务中,事件时间 12 : 00 12:00 12:00 之前的数据,要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后的数据,要按照 6 : 1 6:1 6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做 Versioned Table

1.Versioned Table 的两种定义方式

Verisoned Table:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率场景的案例来看一下一个 Versioned Table 的两种定义方式。

1.1 PRIMARY KEY 定义方式

-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time,-- PRIMARY KEY 定义方式PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);

1.2 Deduplicate 定义方式

-- 定义一个 append-only 的数据源表
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);-- 将数据源表按照 Deduplicate 方式定义为 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time   -- 1. 定义 `update_time` 为时间字段FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY currency  -- 2. 定义 `currency` 为主键ORDER BY update_time DESC              -- 3. ORDER BY 中必须是时间戳列) AS rownum FROM currency_rates)
WHERE rownum = 1; 

2.应用案例

Temporal Join 支持的时间语义:事件时间、处理时间。

2.1 案例一(事件时间)

-- 1. 定义一个输入订单表
CREATE TABLE orders (order_id    STRING,price       DECIMAL(32,2),currency    STRING,order_time  TIMESTAMP(3),WATERMARK FOR order_time AS order_time
) WITH (/* ... */);-- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time,PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);SELECT order_id,price,currency,conversion_rate,order_time,
FROM orders
-- 3. Temporal Join 逻辑
-- SQL 语法为:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:

order_id  price  货币       汇率             order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
  • 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。
  • 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。

2.2 案例二(处理时间)

10:15> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        114
Yen           110:30> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1-- 10:42 时,Euro 的汇率从 114 变为 116
10:52> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        116     <====114 变为 116
Yen           1-- 从 Orders 表查询数据
SELECT * FROM Orders;amount currency
====== =========2 Euro             <== 在处理时间 10:15 到达的一条数据1 US Dollar        <== 在处理时间 10:30 到达的一条数据2 Euro             <== 在处理时间 10:52 到达的一条数据-- 执行关联查询
SELECTo.amount, o.currency, r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency-- 结果如下:
amount currency     rate   amount*rate
====== ========= ======= ============2 Euro          114          228    <== 在处理时间 10:15 到达的一条数据1 US Dollar     102          102    <== 在处理时间 10:30 到达的一条数据2 Euro          116          232    <== 在处理时间 10:52 到达的一条数据

可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为 LatestRates 维护了最新的状态数据,不需要关心历史版本的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/711801.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器视觉——硬件选型

1、相机选型 在选择机器视觉相机时&#xff0c;通常需要考虑以下几个方面&#xff1a; 1、分辨率&#xff1a;相机的分辨率决定了其拍摄图像的清晰度和细节程度。根据具体的应用需求&#xff0c;可以选择适当的分辨率范围。 2、帧率&#xff1a;帧率表示相机每秒钟能够拍摄的…

2023年营养保健品线上电商市场行业分析(2024年营养保健行业未来趋势分析)

近年来&#xff0c;受人口老龄化、养生年轻化等因素驱动&#xff0c;保健品行业增长强劲&#xff0c;加之越来越多的年轻人也加入养生大军&#xff0c;成为保健品市场上的一股新力量&#xff0c;进一步带动市场扩容。 鲸参谋数据显示&#xff0c;2023年度&#xff0c;京东平台…

[pdf]《软件方法》2024版部分公开-共196页

DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 潘加宇《软件方法》2024版部分公开pdf文件&#xff0c;共196页&#xff0c;已上传CSDN资源。 也可到以下地址下载&#xff1a; http://www.umlchina.com/url/softmeth2024.html 如果…

Ubuntu20.04 ssh终端登录后未自动执行.bashrc

sudo vim ~/.profile输入以下内容 if [ -n "$BASH_VERSION" ]; then if [ -f "$HOME/.bashrc" ]; then . "$HOME/.bashrc" fi fi 执行 source ~/.profile重新测试 其他答案 如果你的~/.bashrc文件在Ubuntu中没有自动生效&#xff0c;…

3. 文档概述(Documentation Overview)

3. 文档概述&#xff08;Documentation Overview&#xff09; 本章节简要介绍一下Spring Boot参考文档。它包含本文档其它部分的链接。 本文档的最新版本可在 docs.spring.io/spring-boot/docs/current/reference/ 上获取。 3.1 第一步&#xff08;First Steps&#xff09; …

解析电源模块测试条件与测试步骤 快速完成测试

高温高湿储存测试是电源模块环境适应性测试内容之一&#xff0c;在实际使用过程中由于应用场景不同电源所处的环境也是多样的&#xff0c;因此需要测试电源对各种环境的适应能力&#xff0c;提高电源的性能和可靠性。 电源高温高湿存储测试的目的是为了测量环境对电源结构、元件…

C语言第三十三弹---动态内存管理(上)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 动态内存管理 1、为什么要有动态内存分配 2、malloc和free 2.1、malloc 2.2、free 3、calloc和realloc 3.1、calloc 3.2、realloc 4、常见的动态内存的错…

气象数据收集

1、国家气象科学数据中心 预报数据:需要定制,收费10万+ 观测数据:国家气象信息中心-中国气象数据网 (cma.cn)https://data.cma.cn/data/cdcdetail/dataCode/A.0012.0001.html 地面基本气象观测数据 滞后2天 滞后一天 路面数据同化系统,实时 国家气象信息中心-中国气象数…

11.以太网交换机工作原理

目录 一、以太网协议二、以太网交换机原理三、交换机常见问题思考四、同网段数据通信全过程五、跨网段数据通信全过程六、关键知识七、调试命令 前言&#xff1a;在网络中传输数据时需要遵循一些标准&#xff0c;以太网协议定义了数据帧在以太网上的传输标准&#xff0c;了解以…

android移动应用开发基础答案,安卓工程师面试题

一线企业的app都是多线程和多进程的&#xff0c;而Android进程间通信机制就是Binder&#xff0c;原生的线程间通信则是Handler&#xff0c;Binder和Handler是了解安卓运行机制必须要掌握的一个知识点&#xff0c;更是一线企业面试必问的知识点&#xff01; 以下几道就是大厂关于…

【QT+QGIS跨平台编译】之五十五:【QGIS_CORE跨平台编译】—【qgsmeshcalcparser.cpp生成】

文章目录 一、Bison二、生成来源三、构建过程一、Bison GNU Bison 是一个通用的解析器生成器,它可以将注释的无上下文语法转换为使用 LALR (1) 解析表的确定性 LR 或广义 LR (GLR) 解析器。Bison 还可以生成 IELR (1) 或规范 LR (1) 解析表。一旦您熟练使用 Bison,您可以使用…

Unity中URP实现水体(整理优化)

文章目录 前言一、优化水的深度1、我们把 水流动的方向 和 水深浅过渡值&#xff0c;整合到一个四维变量中2、修改 水体流动方向3、在片元着色器中&#xff0c;修改使用过渡变量 二、优化泡沫三、优化水下的扭曲1、修复原本扰动UV的计算 四、优化水面高光1、把高光强度、光滑度…

红队基础设施建设

文章目录 一、ATT&CK二、T1583 获取基础架构2.1 匿名网络2.2 专用设备2.3 渗透测试虚拟机 三、T1588.002 C23.1 开源/商用 C23.1.1 C2 调研SliverSliver 对比 CS 3.1.2 CS Beacon流量分析流量规避免杀上线 3.1.3 C2 魔改3.1.4 C2 隐匿3.1.5 C2 准入应用场景安装配置说明工具…

UC++对象方法IsValid()、IsValidLowLevel()、IsValidLowLevelFast()的区别

在 Unreal Engine 中&#xff0c;IsValid(), IsValidLowLevel(), 和 IsValidLowLevelFast() 是用于检查 UObject&#xff08;Unreal Object&#xff09;有效性的三个不同的方法。它们之间的区别主要在于检查的级别和效率。 IsValid()&#xff1a; 检查级别&#xff1a; IsVal…

深度学习 精选笔记(2)自动求导与概率

学习参考&#xff1a; 动手学深度学习2.0Deep-Learning-with-TensorFlow-bookpytorchlightning ①如有冒犯、请联系侵删。 ②已写完的笔记文章会不定时一直修订修改(删、改、增)&#xff0c;以达到集多方教程的精华于一文的目的。 ③非常推荐上面&#xff08;学习参考&#x…

Linux系统——LAMP架构

目录 一、LAMP架构组成 1.LAMP定义 2.各组件的主要作用 3.CGI和FastCGI 3.1CGI 3.3CGI和FastCGI比较 4.PHP 4.1PHP简介 4.2PHP的Opcode语言 4.3PHP设置 二、LAMP架构实现 1.编译安装Apache httpd服务 2.编译安装Mysql 3.编译安装PHP 4.安装论坛 5.搭建博客 W…

Linux编程 2.4 文件和目录-Linux文件系统结构

1、文件操作基本元素 文件操作相关的最基本元素是&#xff1a;目录结构、索引节点和文件的数据本身。 目录结构&#xff08;目录项&#xff09;索引节点&#xff08;i节点&#xff09;文件的数据 2、文件系统的三个区域 属性&#xff1a; 超级块&#xff1a;存放文件系统本身…

vs code快捷键

ShiftCtrlO vs code 提供很强大的功能&#xff0c;就是可以快速查文件中的符号列表和函数列表&#xff0c;我们首先打开一个源码文件&#xff0c;比tcp.c&#xff0c;然后我们通过快捷键“ShiftCtrlO”即可打开对应源码文件的符号列表和函数列表&#xff0c;通过查看这些列表&a…

【学习心得】Python调用JS的三种常用方法

在做JS逆向的时候&#xff0c;一种情况是直接用Python代码复现JS代码的功能&#xff0c;达成目的。但很多时候这种方法有明显的缺点&#xff0c;那就是一旦JS代码逻辑发生了更改&#xff0c;你就得重写Python的代码逻辑非常不便。于是第二种情况就出现了&#xff0c;我直接得到…

python自动化管理和zabbix监控网络设备(防火墙和python自动化配置部分)

目录 前言 一、ssh配置 1.FW1 2.core-sw1 3.core-sw2 二、python自动化配置防火墙 三、验证DNAT 四、验证DNAT 前言 视频演示请访问b站主页 白帽小丑的个人空间-白帽小丑个人主页-哔哩哔哩视频 一、ssh配置 给需要自动化管理的设备配置ssh服务端用户名和密码 1.FW1 …