详述FlinkSql Join操作

 

FlinkSql 的 Join

Flink 官网将其分为了 Joins 和 Window Joins两个大类,其中里面又分了很多 Join 方式

参考文档:

Joins | Apache Flink

Window JOIN | Apache Flink

Joins

官网介绍共有6种方式:

  1. Regular Join:流与流的 Join,包括 Inner Join、Outer Equal Join

  2. Interval Join:流与流的 Join,两条流一段时间区间内的 Join

  3. Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join

  4. Lookup Join:流与外部维表的 Join

  5. Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行

  6. Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join

Regular Join

写法上和传统数据库没有区别,关联条件支持等值和非等值Join,有Inner Join 和 Outer Join(Left Join、Right Join、FULL JOIN)

有人问我为什么要特别区分内外连接,后面会用到

内连接是通过匹配两个表之间的共同列,返回满足连接条件的行。只有在连接条件匹配的情况下,才会返回结果。

外连接是在内连接的基础上,还包括了不满足连接条件的行。

SELECT order_id, uid, price, user_name 
FROM order a
Left JOIN user b
ON a.uid = b.uid

顺便了解一下流是怎么 Join 的:

和离线不同,离线是一批数据一起运算的,完成后输出结果

FlinkSql是Dynamic Table的概念,数据在 State 里面,每来一条数据就会对左右两边的数据进行关联

Regular Join 的 State 默认是永久保存的,为了避免 State 无限膨胀,可以根据情况决定是否设置状态清理:table.exec.state.ttl(目前是根据更新时间来判断是否过期,而非访问时间)

再来看看几种 Join ,其中outer Join产生的回撤流是和传统离线方式有很大区别的:

首先不考虑数据源有回撤的情况,Regular Join在 Outer Join 时会产生回撤流,L-左表、R-右表

  •  Inner Join:两条流 Join 到才输出 +[L, R],关联不上不会输出

  •  Left Join:当左流数据到达之后就会直接输出

        可以 Join 到右流则输出 +[L,R],Join 不到右流输出 +[L,null]

        如果之后右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据

        则会发起回撤流,先输出 -[L,null],然后在输出一条 +[L,R]

  •  Right Join:有 Left Join 一样,只是逻辑相反

  • Full Join:和Left原理一样,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出,如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流

        对右流来说:Join 到输出 +[L,R],没 Join 到输出 +[null,R],左流数据到达后回撤 -[null,R],输出 +[L,R]

        对左流来说:Join 到输出 +[L,R],没 Join 到输出 +[L,null]),右流数据到达后回撤 -[L, null],输出 +[L,R]     

图解:

Regular Join 过程图

inner join 和 lef join 输出结果示例:

inner join
+I[5, d, 5, f]
+I[5, d, 5, 8]
+I[3, 4, 3, 0]
left join
+I[3, 4ab, null, null]
+I[5, f3c, 5, c05]
+I[5, 6e2, 5, c05]
-D[3, 4ab, null, null]
+I[3, 4ab, 3, 765]

关于 Regular Join 的注意事项:

  • 实时 Regular Join 可以不是 等值 join等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联

  •  Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出,如果是outer join会立即输出之后产生回撤流

  • 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。

Interval Join

Interval Join 只支持普通 Append 数据流,不支持含 Retract 的动态表

Interval Join 左右表仅在某个时间范围(给定上界和下界)内进行关联,这个时间区间支持event time 和 processing time两种语义,如果是 event time,会根据区间和Watermark自动清理状态

场景示例:用户下单产生订单信息,用户必须在下单后一个小时以内付款,输出付款的订单信息

SELECTo.orderId,o.productName,p.payType,o.orderTime,cast(payTime as timestamp) as payTime
FROM Orders o 
JOIN Payment p 
ON  o.orderId = p.orderId 
AND p.payTime BETWEEN orderTime AND orderTime + INTERVAL ‘1’ HOUR

Interval Join 几种方式,需要注意 Interval Join 不会产生回撤流:

  •  Inner Join:只有两条流 Join 到才输出,输出 +[L, R]

  • Left Join:和 Regular Join 不同,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,这时输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null](这时候其实已经延迟了),如果右流 State 中的数据过期了,就直接从 State 中删除

  • Right Join:同 Left Join,逻辑相反

  • Full Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R]

图解:

图片来自阿里云社区

inner join不用多说,看看 left join 输出结果示例:

+I[6, e, 6, 7]
+I[11, d, null, null]
+I[7, b, null, null]
+I[8, 0, 8, 3]
+I[13, 6, null, null]

关于 Interval Join 的注意事项:

  • 实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出

  •  outer join 不会产生回撤流,关联不上会在 State 过期时发送数据,会有延迟

Temporal Joins

这种关联方式同样是传统数据库没有的,但是会发现和数仓的拉链表Join有点类似

Temporal Join 支持和 Verisoned Table 进行关联,也支持 event time 和 processing time 两种语义,支持inner join 和 left join 两种方式

事件时间 ,在解决多版本问题时有奇效:

  1.  事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark

  2. 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中

--官网案例
CREATE TABLE orders (order_id    STRING,price       DECIMAL(32,2),currency    STRING,order_time  TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (/* ... */);-- 必须定义一个 versioned table
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka'/* ... */
);SELECT order_id,price,orders.currency,conversion_rate,order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00

Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据,然后根据左表Orders的事件时间关联到对应时间的 Versioned Table 的汇率

Processing Time,由于是处理时间,只维护了最新的状态数据,不需要关心历史版本的数据,直接根据LeftTable数据到达的时间关联最新的数据

另外还支持 Temporal Table Functionv Join,但是一般不怎么用(至少我基本不这样写)

SELECTo_amount, r_rate
FROMOrders,LATERAL TABLE (Rates(o_proctime))
WHEREr_currency = o_currency

Lookup Join

Lookup Join 通常用于关联外部系统数据(比如Mysql、Hbase等),但是目前只支持 processing time,只能以处理时间关联最新的数据(这个最新是有代价的)

实际用起来其实会发现功能上和 version table 的processing 类似

-- 官网案例,需要定义一个外部存储的表
CREATE TEMPORARY TABLE Customers (id INT,name STRING,country STRING,zip STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://mysqlhost:3306/customerdb','table-name' = 'customers'
);-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS oJOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;

待办:lookup支持cache,cache的异步查询原理,数据更新的延迟,参数调优等等

Array Expansion

常见的用法就是类似Spark 的 lateral view expload(arr)

SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tagArray) AS t (tag)

Table Function 

其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,并且支持自定义函数

Window Joins

见 FlinkSql 窗口函数

语法示例:

SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,COALESCE(L.window_start, R.window_start) as window_start,COALESCE(L.window_end, R.window_end) as window_end
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L
INNER JOIN (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
SELECT *
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L WHERE EXISTS (SELECT * FROM (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/672597.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用Softing edgeConnector模块将云轻松连接到Siemens PLC

一 工业边缘的连接解决方案 云服务提供商 (CSP) 引入了服务和功能,以简化基于云的工业物联网解决方案的实施。Azure Industrial IoT Platform或AWS IoT SiteWise支持标准协议和接口,例如OPC UA或MQTT。但是,如果您希望在典型的旧改项目中连接…

VM安装Centos7

目标: 一,安装Centos7 二,ssh可以连接 1 新建虚拟机 一直下一步 2 直到此处,选择稍后安装 一直下一步直到完成。 3 选中虚拟机,点击设置 选择CD/DVD,选取ISO映像文件。 4 等待安装 并且设置root密码 5…

Redis -- zset有序集合

聪明在于勤奋,天才在于积累。 目录 zset 有序集合 zset相关命令 zadd zcard zcount zrange zrevrange zrangebyscore zpopmax bzpopmax zpopmin bzpopmin zrank zscore zrem zRemRangeByRank zRemRangeByScore zincrby 集合间操作 zinte…

java日志框架总结(五、logback日志框架)

一、logback概述 Logback是由log4j创始人设计的又一个开源日志组件。 Logback当前分成三个模块: 1、logback-core, 2、logback- classic 3、logback-access。 1)logback-core是其它两个模块的基础模块。 2)logback-…

详解C++类和对象(中(类的6个默认成员函数))

文章目录 写在前面1. 类的6个默认成员函数2. 构造函数2.1 构造函数的引入2.1 构造函数的特性 3. 析构函数3.1 析构函数的引入3.2 析构函数的特性 4. 拷贝构造函数4.1 拷贝构造函数概念4.2 拷贝构造函数的特性4.3 拷贝构造函数典型调用场景 5. 赋值运算符重载5.1 运算符重载5.2 …

2024-2-6-复习作业

1> 要求&#xff1a; 源代码&#xff1a; #include <stdio.h> #include <stdlib.h> void output(int arr[],int len) {for(int i0;i<len;i){printf("%d ",arr[i]);}puts(""); } void bubble_sort(int arr[],int len) {for(int i1;i<…

C++类和对象(6)

目录 1. 再谈构造函数 1.1 构造函数体赋值 1.2 初始化列表 1.3 explicit关键字 2. static成员 2.1 概念 2.2 特性 【问题】 1. 再谈构造函数 1.1 构造函数体赋值 在创建对象时&#xff0c;编译器通过调用构造函数&#xff0c;给对象中各个成员变量一个合适的初始值。 c…

python的数据类型

&#x1f388;srting&#xff08;字符串&#xff09;&#xff1a; 操作符&#xff1a; &#xff1a;字符串连接 aabc befg print(ab) #输出 abcdefg * : 重复输出字符串 aabc print(a*3) #输出 abcabcabc [ : ]:截取字符串中的一部分&#xff0c;遵循左闭右开的原则&am…

迭代器和生成器

迭代器和生成器 一、迭代器① iter()② next()③ 自定义迭代器 二、生成器① 创建生成器1、斐波那契数列2、yield 创建 ② 使用send() 一、迭代器 迭代器是一个可以记住遍历的位置的对象&#xff0c;迭代器从第一个元素开始访问&#xff0c;直到所有元素访问结束 ① iter() …

Vue3快速上手(二)VSCode官方推荐插件安装及配置

一、VSCode官方插件安装&#xff0c;如下图2款插件 在用vite创建的程序里&#xff0c;提示提安装推荐的插件了&#xff0c;如下图&#xff1a; 二、配置 在设置-扩展里找到Volar插件&#xff0c;将Dot Value勾选上。这样在ref()修改变量时&#xff0c;会自动填充.value,无需…

电力负荷预测 | Matlab实现基于LSTM长短期记忆神经网络的电力负荷预测模型(结合时间序列)

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 电力负荷预测 | Matlab实现基于LSTM长短期记忆神经网络的电力负荷预测模型(结合时间序列) 所谓预测,就是指通过对事物进行分析及研究,并运用合理的方法探索事物的发展变化规律,对其未来发展做出预先估计和判断…

李宏毅LLM——大模型+大资料的神奇力量

文章目录 大模型的重要性顿悟时刻 大资料的重要性数据预处理不一样的做法&#xff1a;KNN LM 对应视频P12-P14 大模型的重要性 模型参数和数据集越大&#xff0c;文字接龙的错误率越低 顿悟时刻 当模型超过10B-20B时&#xff0c;会突然顿悟 启示&#xff1a;不能只看最终结…

vue3:24—组件通信方式

目录 1、props 2、自定义事件 &#xff08;emit&#xff09; 3、mitt&#xff08;任意组件的通讯&#xff09; 4、v-model【封装ui组件库用的多&#xff0c;平时用的少。和vue2有点不同】 5、$attrs 6、$refs和$parent 7、provide和inject 8、pinia&#xff08;即vue2中…

HTML 样式学习手记

HTML 样式学习手记 在探索网页设计的世界时&#xff0c;我发现HTML元素的样式调整真的是个很酷的环节。通过简单的属性设置&#xff0c;就能让文字换上五彩斑斓的颜色、变换各异的字体和大小。特别是那个style属性&#xff0c;感觉就像是一扇通往CSS魔法世界的大门。 代码小试…

c语言实现io多路复用(select),进程,线程并发服务器

io多路复用&#xff08;select&#xff09;代码 #include<myhead.h> #include <sys/select.h> #define PORT 8888 #define IP "192.168.250.100" int main(int argc, char const *argv[]) { //创建套接字int sfd socket(AF_INET, SOCK_STREAM, 0…

常见的 MIME(媒体)类型速查

一、简介 MIME(Multipurpose Internet Mail Extensions)多用途互联网邮件扩展类型&#xff0c;是设定某种扩展名的文件用一种应用程序来打开的方式类型&#xff0c;当该扩展名文件被访问的时候&#xff0c;浏览器会自动使用指定应用程序来打开。多用于指定一些客户端自定义的文…

P1808 单词分类

P1808 单词分类 题目描述 Oliver 为了学好英语决定苦背单词&#xff0c;但很快他发现要直接记住杂乱无章的单词非常困难&#xff0c;他决定对单词进行分类。 两个单词可以分为一类当且仅当组成这两个单词的各个字母的数量均相等。 例如 AABAC&#xff0c;它和 CBAAA 就可以…

时序预测 | MATLAB实现基于CNN-BiLSTM-AdaBoost卷积双向长短期记忆网络结合AdaBoost时间序列预测

时序预测 | MATLAB实现基于CNN-BiLSTM-AdaBoost卷积双向长短期记忆网络结合AdaBoost时间序列预测 目录 时序预测 | MATLAB实现基于CNN-BiLSTM-AdaBoost卷积双向长短期记忆网络结合AdaBoost时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.Matlab实现…

SpringBoot3整合Mybatis-Plus,自定义动态数据源starter

文章目录 前言正文一、项目总览二、核心代码展示2.1 自定义AbstractRoutingDataSource2.2 动态数据源DynamicDataSource2.3 动态数据源自动配置2.4 动态数据源上下文DynamicDataSourceContextHolder2.5 动态数据源修改注解定义2.6 修改切面DynamicDataSourceAspect2.7 动态数据…

多维时序 | MATLAB实现基于CNN-LSSVM卷积神经网络-最小二乘支持向量机多变量时间序列预测

多维时序 | MATLAB实现基于CNN-LSSVM卷积神经网络-最小二乘支持向量机多变量时间序列预测 目录 多维时序 | MATLAB实现基于CNN-LSSVM卷积神经网络-最小二乘支持向量机多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于CNN-LSSVM卷积神经…