Flink SQL 如何实现数据流的 Join?

无论在 OLAP 还是 OLTP 领域,Join 都是业务常会涉及到且优化规则比较复杂的 SQL 语句。对于离线计算而言,经过数据库领域多年的积累,Join 语义以及实现已经十分成熟,然而对于近年来刚兴起的 Streaming SQL 来说 Join 却处于刚起步的状态。

其中最为关键的问题在于 Join 的实现依赖于缓存整个数据集,而 Streaming SQL Join 的对象却是无限的数据流,内存压力和计算效率在长期运行来说都是不可避免的问题。下文将结合 SQL 的发展解析 Flink SQL 是如何解决这些问题并实现两个数据流的 Join。

离线 Batch SQL Join 的实现

传统的离线 Batch SQL (面向有界数据集的 SQL)有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

  • Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 虽然时间效率以及空间效率都是最低的,但胜在比较灵活适用范围广,因此其变体 BNL 常被传统数据库用作为 Join 的默认基础选项。
  • Sort-Merge Join 顾名思义,分为两个 Sort 和 Merge 阶段。首先将两个数据集进行分别排序,然后对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。值得注意的是,Sort-Merge 只适用于 Equi-Join(Join 条件均使用等于作为比较算子)。Sort-Merge Join 要求对两个数据集进行排序,成本很高,通常作为输入本就是有序数据集的情况下的优化方案。
  • Hash Join 同样分为两个阶段,首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。第一阶段和第一个数据集分别称为 build 阶段和 build table,第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。Hash Join 效率较高但对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案。和 Sort-Merge Join 类似,Hash Join 也只适用于 Equi-Join。

实时 Streaming SQL Join

相对于离线的 Join,实时 Streaming SQL(面向无界数据集的 SQL)无法缓存所有数据,因此 Sort-Merge Join 要求的对数据集进行排序基本是无法做到的,而 Nested-loop Join 和 Hash Join 经过一定的改良则可以满足实时 SQL 的要求。
我们通过例子来看基本的 Nested Join 在实时 Streaming SQL 的基础实现(案例及图来自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。

图1. Join-in-continuous-query-1

Table A 有 1、42 两个元素,Table B 有 42 一个元素,所以此时的 Join 结果会输出 42。

图2. Join-in-continuous-query-2

接着 Table B 依次接受到三个新的元素,分别是 7、3、1。因为 1 匹配到 Table A 的元素,因此结果表再输出一个元素 1。

图3. Join-in-continuous-query-3

随后 Table A 出现新的输入 2、3、6,3 匹配到 Table B 的元素,因此再输出 3 到结果表。

可以看到在 Nested-Loop Join 中我们需要保存两个输入表的内容,而随着时间的增长 Table A 和 Table B 需要保存的历史数据无止境地增长,导致很不合理的内存磁盘资源占用,而且单个元素的匹配效率也会越来越低。类似的问题也存在于 Hash Join 中。

那么有没有可能设置一个缓存剔除策略,将不必要的历史数据及时清理呢?答案是肯定的,关键在于缓存剔除策略如何实现,这也是 Flink SQL 提供的三种 Join 的主要区别。

Flink SQL 的 Join

  • Regular Join

Regular Join 是最为基础的没有缓存剔除策略的 Join。Regular Join 中两个表的输入和更新都会对全局可见,影响之后所有的 Join 结果。举例,在一个如下的 Join 查询里,Orders 表的新纪录会和 Product 表所有历史纪录以及未来的纪录进行匹配。

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

因为历史数据不会被清理,所以 Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。

  • Time-Windowed Join

Time-Windowed Join 利用窗口给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉。值得注意的是,这里涉及到的一个问题是时间的语义,时间可以指计算发生的系统时间(即 Processing Time),也可以指从数据本身的时间字段提取的 Event Time。如果是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;如果是 Event Time,Flink 分配 Event Time 窗口并依据 Watermark 来清理数据。

以更常用的 Event Time Windowed Join 为例,一个将 Orders 订单表和 Shipments 运输单表依据订单时间和运输时间 Join 的查询如下:

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId ANDs.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR

这个查询会为 Orders 表设置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的时间下界(图4)。

图4. Time-Windowed Join 的时间下界 - Orders 表

并为 Shipmenets 表设置了 s.shiptime >= o.ordertime 的时间下界(图5)。

图5. Time-Windowed Join 的时间下界 - Shipment 表

因此两个输入表都只需要缓存在时间下界以上的数据,将空间占用维持在合理的范围。

不过虽然底层实现上没有问题,但如何通过 SQL 语法定义时间仍是难点。尽管在实时计算领域 Event Time、Processing Time、Watermark 这些概念已经成为业界共识,但在 SQL 领域对时间数据类型的支持仍比较弱[4]。因此,定义 Watermark 和时间语义都需要通过编程 API 的方式完成,比如从 DataStream 转换至 Table ,不能单纯靠 SQL 完成。这方面的支持 Flink 社区计划通过拓展 SQL 方言来完成,感兴趣的读者可以通过 FLIP-66[7] 来追踪进度。

  • Temporal Table Join

虽然 Timed-Windowed Join 解决了资源问题,但也限制了使用场景: Join 两个输入流都必须有时间下界,超过之后则不可访问。这对于很多 Join 维表的业务来说是不适用的,因为很多情况下维表并没有时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来满足用户需求。

Temporal Table Join 类似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,所以又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(通常就是 Event Time 时间字段),以反映记录在不同时间的内容。

比如典型的一个例子是对商业订单金额进行汇率转换。假设有一个 Orders 流记录订单金额,需要和 RatesHistory 汇率流进行 Join。RatesHistory 代表不同货币转为日元的汇率,每当汇率有变化时就会有一条更新记录。两个表在某一时间节点内容如下:

图6. Temporal Table Join Example]

我们将 RatesHistory 注册为一个名为 Rates 的 Temporal Table,设定主键为 currency,版本字段为 time。

图7. Temporal Table Registration]

此后给 Rates 指定时间版本,Rates 则会基于 RatesHistory 来计算符合时间版本的汇率转换内容。

图8. Temporal Table Content]

在 Rates 的帮助下,我们可以将业务逻辑用以下的查询来表达:

SELECT o.amount * r.rate
FROMOrders o,LATERAL Table(Rates(o.time)) r
WHEREo.currency = r.currency

值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中两个表是平等的,任意一个表的新记录都可以与另一表的历史记录进行匹配,在 Temporal Table Join 中,Temoparal Table 的更新对另一表在该时间节点以前的记录是不可见的。这意味着我们只需要保存 Build Side 的记录直到 Watermark 超过记录的版本字段。因为 Probe Side 的输入理论上不会再有早于 Watermark 的记录,这些版本的数据可以安全地被清理掉。

总结

实时领域 Streaming SQL 中的 Join 与离线 Batch SQL 中的 Join 最大不同点在于无法缓存完整数据集,而是要给缓存设定基于时间的清理条件以限制 Join 涉及的数据范围。根据清理策略的不同,Flink SQL 分别提供了 Regular Join、Time-Windowed Join 和 Temporal Table Join 来应对不同业务场景。

另外,尽管在实时计算领域 Join 可以灵活地用底层编程 API 来实现,但在 Streaming SQL 中 Join 的发展仍处于比较初级的阶段,其中关键点在于如何将时间属性合适地融入 SQL 中,这点 ISO SQL 委员会制定的 SQL 标准并没有给出完整的答案。或者从另外一个角度来讲,作为 Streaming SQL 最早的开拓者之一,Flink 社区很适合探索出一套合理的 SQL 语法反过来贡献给 ISO。


原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小程序开发(9)-之地图组件map、导航

map用起来真的一言难尽呀,按照官方文档说是可以支持传入一个setting,来配置所以的属性的。。,但是我试了没效果,经纬度更新了,调试代码看到的wxml经纬度也变化了,但是map上的经纬度却没变化,后来…

惊魂48小时,阿里工程师如何紧急定位线上内存泄露?

阿里妹导读:云计算场景下的大规模分布式系统中,网络异常、磁盘IO异常、时钟跳变、操作系统异常乃至软件本身可能存在bugs等,均给分布式系统正确运行带来了挑战。持续的监控报警完善是打造稳定高可用分布式系统过程中非常重要的工作&#xff0…

我的程序跑了 60 多小时,就是为了让你看一眼 JDK 的 BUG 导致的内存泄漏

来源 | why技术荒腔走板大家好,我是 why,老规矩,先来一个简短的荒腔走板,给冰冷的技术文注入一丝色彩。上面图片中这个正在奔跑的少年,是正在参加校运会的我,那一年我 18 岁,高三。参加的项目是…

Flutter+Serverless端到端研发架构实践

Serverless(无服务架构)被誉为下一代云计算,自概念推出以来,因为能带来研发交付速度提升与成本的降低在业内异常火爆。闲鱼客户端基于Flutter进行架构演进与创新,通过Flutter统一Android和iOS双端提升研发效能之后&…

Go 开发关键技术指南 | 为什么你要选择 GO?(内含超全知识大图)

导读:从问题本身出发,不局限于 Go 语言,探讨服务器中常常遇到的问题,最后回到 Go 如何解决这些问题,为大家提供 Go 开发的关键技术指南。我们将以系列文章的形式推出《Go 开发的关键技术指南》,共有 4 篇文…

小程序开发(10)-之热力图解决方案、手绘图

原本是用别人用canvas画的热力图的https://github.com/rover95/wxapp-heatmap,但是问题有点多,热力图的颜色,卡顿、叠加、渲染失败等,所以就弃用了,也找了好久,好像大家都没有更好的提议,自己也…

windows下mysql8.x配置远程连接

文章目录1. 现象2. 登录mysql3.先查看下当前的用户,具有什么权限4.创建新的用户之后再查权限5. 赋予权限6.刷新权限,然后就可远程访问了1. 现象 Host ‘192.168.0.103’ is not allowed to connect to this MySQL server mysql8.x配置远程连接 2. 登…

Spark整合Ray思路漫谈

什么是Ray 之前花了大概两到三天把Ray相关的论文,官网文档看了一遍,同时特意去找了一些中文资料看Ray当前在国内的发展情况(以及目前国内大部分人对Ray的认知程度)。 先来简单介绍下我对Ray的认知。 首先基因很重要&#xff0c…

建设数据中台之前,建议先看这份企业数据能力测评 | 大咖说中台

作者 | 耿立超来源 | 《大数据平台架构与原型实现:数据中台建设实战》“我的企业目前在数据应用上处于什么水平?接下来应该朝哪个方向努力?”本文试图帮助企业决策者和IT负责人解答这一问题。今天,数据之于企业的重要性已经勿须多…

如何让 python 处理速度翻倍?内含代码

阿里妹导读:作为在日常开发生产中非常实用的语言,有必要掌握一些python用法,比如爬虫、网络请求等场景,很是实用。但python是单线程的,如何提高python的处理速度,是一个很重要的问题,这个问题的…

Zipkin 存储追踪数据至 MySQL

下载zipkin-mysql数据库脚本 https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/mysql-v1/src/main/resources 创建数据库名称为zipkin,字符集编码:utf8mb4 初始化脚本 -- -- Copyright 2015-2019 The OpenZipkin Authors -- -- Licen…

Spring Cloud Alibaba 新一代微服务解决方案

本篇是「跟我学 Spring Cloud Alibaba」系列的第一篇, 每期文章会在公众号「架构进化论」进行首发更新,欢迎关注。 1、Spring Cloud Alibaba 是什么 Spring Cloud Alibaba 是阿里巴巴提供的微服务开发一站式解决方案,是阿里巴巴开源中间件…

它估值25亿!被马云领投,是华为“老战友”,网友:也许股价能超茅台!

最近一条新闻被炒的沸沸扬扬:十年以来中国最大IPO,中芯国际将融资532亿元!何为IPO?翻译即为一家公司第一次向全社会公开售出它的股份。买的人越多,代表着社会对其信心越大。为什么2020年,能爆发这样1场最大…

RabbitMQ 最新版安装 (Linux环境)

文章目录一、Erlang1. Erlang下载2. Erlang 上传并解压3. 验证rabbitmq依赖是否安装4. 安装rabbitmq依赖5. Erlang 编译、安装6. Erlang 配置环境变量7. Erlang 验证二、RabbitMQ2.1. RabbitMQ 下载2.2. RabbitMQ 上传并解压2.3. RabbitMQ 配置2.4. 配置环境变2.5. 启动 Rabbit…

双11 背后的全链路可观测性:阿里巴巴鹰眼在“云原生时代”的全面升级

导读:作为一支深耕多年链路追踪技术 (Tracing) 与性能管理服务 (APM) 的团队,阿里巴巴中间件鹰眼团队的工程师们见证了阿里巴巴基础架构的多次升级,每一次的架构升级都会对系统可观测性能力 (Observability) 带来巨大挑战,而这次的…

一切转型始于数据和模型 | 2020 MATLAB EXPO 中国线上用户大会:即将上线

2020 MATLAB EXPO 中国线上用户大会一切转型始于数据和模型2020 年 7 月 21-24 日 | 线上直播MATLAB 和 Simulink,作为业界普遍使用的科学计算与模型仿真软件,已被全球的工程师和科学家们广泛应用于加快汽车、航空、电子、金融服务、生物医药以及其他行业…

Dubbo 如何成为连接异构微服务体系的最佳服务开发框架

从编程开发的角度来说,Apache Dubbo (以下简称 Dubbo )首先是一款 RPC 服务框架,它最大的优势在于提供了面向接口代理的服务编程模型,对开发者屏蔽了底层的远程通信细节。同时 Dubbo 也是一款服务治理框架,…

Zipkin 基于MQ存 储链路信息至 MySQL

RabbitMQ 最新版安装 (Linux环境) https://gblfy.blog.csdn.net/article/details/120498390 启动rabbitmq 队列是空的 数据库表是无数据的 启动nacos 应用集成rabbitMQ 父工程导入依赖 <!-- 消息队列通用依赖 --><dependency><groupId>org.springframewo…

标签编辑新工具:如何使用控制台标签编辑器(Tag editor)

创建阿里云资源时&#xff0c;您可以给资源绑定标签。已经创建的资源&#xff0c;也可以在资源列表页面或者通过API&#xff0c;批量的添加、更改和删除标签。当遇到如下更为复杂问题和场景&#xff0c;该如何快速解决标签问题呢&#xff1f; 资源跨度大&#xff0c;需要跨资源…

炸裂!这些大厂跪求的人才太牛了!

今年所有的互联网公司都在ALL in AI&#xff0c;百度、腾讯、阿里巴巴、京东等互联网巨头都在四处挖掘AI人才。AI的岗位需求很多&#xff0c;几乎每天都有数百个JD放出。而亿欧智库发布的《2020全球人工智能人才培养研究报告》提到&#xff0c;至今为止AI的人才储备仍跟不上需求…