在公司业务场景中,通常需要使用流计算引擎从多个数据源获取数据、进行 ETL 操作,并将清洗后的数据导入到数据分析系统或数据湖中。由于最后产生出来的表是一张宽表,我们通常也称这个过程为“数据打宽“。
数据打宽在流处理系统中对应的操作便是 Join。Join 可以将两个或更多的数据流按照某种关联条件合并到一起。这对于那些需要同时分析和理解多个数据源的实时数据的情景特别有用。以下是一些典型的多流 Join 场景:
-
实时广告效果监控:在实时广告系统中,广告展示和用户点击会被实时地记录和收集。我们可以使用流处理来实时处理这些数据,并使用Join 操作,按广告 ID 和用户 ID 合并广告展示数据和用户点击数据。这样,我们可以实时计算每个广告的点击率,以便实时监控广告效果。
-
服务器集群性能异常检查:集群监控系统通过采集服务器网卡设备监控指标与 TCP 协议层性能监控指标,通过 Join 展示,找到有异常流量的网卡上对应所有的 TCP 监控指标信息。
-
实时推荐系统:电商平台可能需要将用户行为数据(如点击、购买)和商品信息进行 Join,以便实时地为用户提供个性化的商品推荐。
-
金融交易风控:银行或金融机构可能需要将交易数据和用户风险评级数据进行 Join,以便实时地进行交易风险评估。
RisingWave 进行多流 join 的优势
RisingWave 作为一个专注于流计算的分布式流数据库,针对数据打宽这一场景进行了重点优化。以下是根据用户反馈总结出的 RisingWave 的独特优势:
-
稳定:传统流计算引擎通常使用本地磁盘进行内部状态管理。该种方式在进行多流 Join 时,很容易由于本地存储的限制,出现宕机等严重故障。RisingWave 将内部状态放在远端管理,能够保证多流 Join 场景中保持系统平稳运行。
-
运维简单:在使用传统流计算引擎时,同样由于本地存储的限制,运维工程师通常需要对如 RocksDB 等状态管理系统进行调参,才能调出较好的性能。而 RisingWave 的状态管理对用户透明,用户只需关心合理分配给 RisingWave 计算资源,而并不需要关心特定组件优化。
-
架构简洁:在处理杂 ETL 场景时,用户往往需要通过连接 Kafka 等消息中间件连接不同 streaming Join。而 RisingWave 使用物化视图表示流计算逻辑与结果,用户可以通过构建堆叠物化视图表示复杂逻辑,大幅简化架构。
-
可观测性强:RisingWave 中的物化视图可以被用户直接使用 SQL 进行查询,可观测性强,便于代码调试。
针对 Join 这一重要的流处理算子,RisingWave 做了不少技术上的探索与优化,下面将会为大家逐一介绍。
RisingWave 中的 join 技术
RisingWave 对于查询的处理可以分为两大类,一类称为是 Streaming Query,另外一类称为 Batch Query。其中 Batch Query 对 Join 的处理方式和传统数据库的原理是一样的,处理的输入都是有界限的 (bounded) 数据集。而 Streaming Query 主要的表达形式是CREATE MATERIALIZED VIEW
,Streaming Query 的 Join 需要处理流上无界限 (unbounded) 的数据流。流上 Join 的两个上游输入有任何数据变更都需要增量地计算出 Join 的变更结果输出到下游。Join 是一个重状态的算子,如何管理Join的状态也成了流数据库中非常重要的课题。
Symmetric Hash Join
业界通用的流处理 Join 使用的算法基本上都是 Symmetric Hash Join(需要有等值连接条件),它的思想很简单,主要是为 Join 的两边输入各自维护一个 Hash 表(Hash key 为 Join key)。每当一边有输入过来时,先插入自己的 Hash 表,再去查一下另外一边的 Hash 表并计算出 Join 的输出结果。不难看出这里的两边输入是对等 (Symmetric) 的关系。其中的 Hash表也称为 Join 的状态(State)。
Unbounded state
由于 Join 输入是 unbounded 的,可以推导出 Join 的状态也是 unbounded的。显然这会导致存储上的问题。RisingWave 通过存算分离的架构,可以把 Join 的状态存储到对象存储 (Object store) 当中。对象存储相比于传统的本地存储有诸多优点,例如容量扩展性高,可靠性高,成本效益更经济。理论上 Risingwave 的存储容量上限跟对象存储的容量上限是一样的。当然为了弥补对象存储访问延迟较高的问题,Risingwave 会利用内存和本地盘来缓存对象存储的文件,并通过 LRU 策略管理这些缓存。之前我们有文章专门介绍存储针对流处理状态的写入读取的优化,点这里阅读[1]。
Watermark & Window
即使 Join 的输入可能是 unbounded 的,但很多时候我们不希望 Join 的状态大小也是 Unbounded 的。通过水位线和窗口 Join 技术可以将 Join 状态控制在一个有限的大小之内。流上的数据一般是没有严格顺序的,但是通过定义 Watermark,我们可以得到一个相对有序的流 [3]。例如我们可以在 Source 流上的时间列设置一个 Watermark 限制为 -20s,代表当目前看见流数一行数据的时间列的时间戳为T时,意味着时间戳为 T-20s 的所有数据都已经到达。有了这个保证,我们可以在流上传递一个 Watermark 的消息,用来表示 Watermark 消息之后所有的数据的时间戳都是大于 T-20s 的。这样我们就可以在流上表达相对有序这个属性。
-- 定义一个具有Watermark的Source
CREATE SOURCE s1 (id int, value int, ts TIMESTAMP, WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (connector = 'datagen');
有了相对有序的属性我们就可以结合窗口(Window)来限制 Join 状态的空间大小。窗口在流处理上一般是通过 TUMBLE 或者 HOP Window 函数的方式为数据划分时间窗口,如果划分的时间字段带有 Watermark 信息,那么经过窗口函数后优化器也可以进一步推导出窗口时间列的 Watermark信息。Window Join 顾名思义,是在窗口中做 Join,Join 的连接条件要求左右输入的窗口时间列有等值关系。由于 Watermark 的存在,窗口不断地往前推进,已经做完 Join 的窗口中的状态便可以被清理。这样我们就可以有效的控制 Join 状态的大小。
CREATE SOURCE s2 (id int, value int, ts TIMESTAMP, WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) WITH (connector = 'datagen');-- 创建使用Window Join的物化视图
CREATE MATERIALIZED VIEW window_join AS
SELECT s1.id AS id1,s1.value AS value1,s2.id AS id2,s2.value AS value2
FROM TUMBLE(s1, ts, interval '1' MINUTE)
JOIN TUMBLE(s2, ts, interval '1' MINUTE)
ON s1.id = s2.id and s1.window_start = s2.window_start;
Interval Join
假如你正在处理的是用户的点击流数据,你可能想要连接用户的点击事件和他们的购买事件,但是这两个事件可能不会在严格的窗口期间内发生。在这种情况下,使用 Interval Join [4] 就会更加合适。Interval Join 允许两个事件在一定的时间间隔内连接,而不是在严格的窗口期间内。可能你会说,我们可以使用普通的 Symmetric Hash Join 来处理,去掉 Interval 的时间限制条件也可以。但我们在流处理 Join 上面的一个目标是控制 Join 状态的大小,而通过 Interval Join 我们可以利用输入流的 Watermark 信息用于清理过期(确定不会再被 Join 上)的状态。Interval Join 的语法如下,它需要在 Join 条件上加入左右两边流时间列上的范围约束条件。
-- 创建使用Interval Join的物化视图
CREATE MATERIALIZED VIEW interval_join AS
SELECT s1.id AS id1,s1.value AS value1,s2.id AS id2,s2.value AS value2
FROM s1 JOIN s2
ON s1.id = s2.id and s1.ts between s2.ts and s2.ts + INTERVAL '1' MINUTE;
Temporal Join
上面我们提到的 Join 对两边输入都是对等处理的,而对等处理需要付出比较高的代价,例如我们需要给两边的输入都维护对应的 Hash Table 作为状态。而传统数据库中的 Hash Join 只需要选择一边建立 Hash Table。为了提高性能,一个思考方向是打破对 Join 两边输入的对等关系。Risingwave提供 Temporal Join [5],它可以将一边流 Join 一个 Risingwave 内部表或者物化视图,流的一边可以不再维护状态,当有数据过来时,可以直接查询另外一边的表,同时表则充当了 Join 状态本身。可以看到在这种模式下Temporal Join 本身是不再需要维护任何状态的,它的效率会变得非常高。但天下没有免费的午餐,Temporal Join 中表这一侧的任何数据变更都不像前面 Symmetric Hash Join 那样可以影响到之前 Join 的输出结果上。同时为了保证系统的一致性,它要求流的一侧是 Append Only 的。Append Only 意味着这个数据流只能是 Insert,不能带有 Update,Delete。接下来我们看看 Temporal Join 的语法,它包含了特殊的FOR SYSTEM_TIME AS OF PROCETIME()
的语法,这是因为 Risingwave 中同样的 SQL 在 Batch Query和 Streaming Query 中得到结果是需要保持一致的。但 Temporal Join 的出现使得无法用普通的 SQL 来表达这样的结果,因此需要使用特殊的语法来表示。
-- 创建Table
CREATE table t1 (id int primary key, value int
);-- 创建使用Temporal Join的物化视图
CREATE MATERIALIZED VIEW interval_join AS
SELECT s1.id AS id1,s1.value AS value1,t1.id AS id2,t1.value AS value2
FROM s1 JOIN t1 FOR SYSTEM_TIME AS OF PROCTIME()
ON s1.id = t1.id;
Join Ordering
当提到 Join 的时候,不得不提及的一个问题是 Join Ordering。传统数据库中针对 Join Ordering 的优化文献浩如烟海,其中很重要的一个思想是利用CBO(Cost Based Optimizer)来枚举执行计划搜索空间, 利用表的统计信息,估算每个算子的需要处理的数据量,计算出执行计划的代价,最后采用估算代价最低的执行计划。但是在流数据库中,Join 的输入是实时不断有输入的流,而不是可以提前知道数据量和数据分布的静态数据集。有 Paper 提出了 Rate-based [6] 的 Join Ordering,主要思想是利用输入流的速率来估算经过 Join 后输出流的速率,然后找到最大化输出平均速率的Join Order。
有兴趣的读者可以细读一下论文,这里我给出一些结论。结论是直接精确计算输出平均速率的代价会比较高,因此需要一种启发式的算法来取得效果与优化速度的平衡。另外把速率比较快的流放在离 Join 输出最近,最远,中间都可能是最优的,取决于 Join Tree 的形状。但比较有意思的点是Bushy Tree 的形状比 Left Deep Tree 的形状似乎更适合流处理。仅仅形状上分析,可以看出 Bushy Tree 可以让数据并行地流过整个 Join,同时相比于 Left Deep Tre e它的一条数据从下往上 Join,Latency 更低。我们也做过一些实验验证了确实 Bushy Tree 可以更好地利用资源,提高吞吐和降低延迟。因此 Risingwave 目前使用的 Join Ordering 算法是尽量地将这棵树变成Bushy Tree 并使得它的树高最低 [7]。最后还想说的一点是不是所有的Join都能变成Bushy Tree,很多情况下用户写的 SQL 就是很多的 Left Join串成的 Left Deep Tree 形状的,这种时候是无法优化成 Bushy Tree 形状的,所以在限定了 Left Deep Tree 形状的情况下,更优做法是把速率快的流放得离输出更近。
下图为 TPCH Q9 的 Join Graph 及 Left Deep Tree 和 Bushy Tree 的比较:
更多优化
子查询
上面提及的很多 Join 都是 Inner Join 或者 Outer Join,实际上在 Join 实现上 RisingWave 还支持 SemiJoin 和 AntiJoin,只不过这两个类型的 Join 不能直接通过 Join 语法写出,而是需要以相关联子查询的形式表达出来。RisingWave 的子查询 Unnesting 技术是按照 Paper:Unnesting arbitrary queries [8] 来实现的。所有的子查询都会被转成 Apply 算子,并将 Apply算子不断往下推直到所有关联项都被改写成普通引用后就可以转成Join。因此在 RisingWave 的 Streaming Query 中子查询也被看成是 Join 的一种表达。
NestedLoop Join
上文提及到的 Join 都是带有等值连接条件的,但是对于不包含任何非等值条件的 Join,我们显然就无法使用 Symmetric Hash Join了。同时NestedLoop Join 目前在 RisingWave 的 Streaming Query 中是被禁止的,因为在流处理中它的效率非常的低下,可以想象一下,Join 流的任意一侧有变更都需要与另外一侧的所有数据做一遍比较才能得到增量的结果。
但有些特殊类型的非等值连接条件的 Join 实际上可以做到比较的高效,在Risingwave 中有一个特殊的算子被称为 Dynamic Filter [9],最初被提出是用于解决 TPCH Q11这种带有非关联子查询而产生的 NestedLoop Join 的场景。特别之处在于这个 Join 的一侧只有一行(由不带 Group by 的 Agg)不断在变的数据,Join 的关联条件是一个范围条件。在这种情况下左右两侧数据的变化都可以通过一个比较小的范围查询来实现高效的增量算法,有兴趣的同学可以去看看对应的 RFC。
Delta Join
通过前文我们可以了解到 Join 是一个重状态的算子,每个 Join 有需要维护自身的 Join 状态。那如果有多条 SQL 都使用了同一个表输入,并且 Join Key 都一样,我们可以复用它的状态,之前我们也有相关的文章在这里就不过多介绍,感谢兴趣的读者可以在这里阅读 [10]。
与Join相关的优化
-
Multi-Way Joins
前文提及的Join都是假定每个 Join 算子是 Binary 的,也就是说输入只有两个,但我们需要做N个输入 Join 的时候我们就会有 N-1 个 Join 算子,而Multi-Way 则是一个 Join 算子可以有多个输入。传统数据库中实际上 Binary Join 已经在绝大部分场景都能做到很好,只剩下一些很特别的场景例如:使用 BInary Join 的中间结果可能会出现放大。这些特殊场景可以留给 Multi-Way Joins 优化[11]。而 Multi-Way Joins 对流数据库的优势应该是可以降低 Join 的 Latency,原理是原来在好几个算子才能 Join 完的结果,现在可以在一个 Multi-Way Joins 算子中完成。但这个 Multi-Way Joins 没有对 Streaming 很友好的 Scale 方式,要么所有流都得汇聚到一个节点上完成计算,要么通过 Broadcast 方式 N-1个流广播到另外一个 Hash 的流上,要么是引入更为复杂的 HyperCube Shuffle。
-
快慢流
流上的数据是有快有慢的,对于 Join 来说两边输入的流如果一边速率非常的快,一个边速率很慢,我们是不是可以利用这种非对称性,在Join状态的数据结构上也适应这种非对称性?有 Paper [12] 研究在快流一侧不做Hash 表的索引,慢流一侧才做 Hash 表索引来利用这种非对称性。
总结
RisingWave 是一个云原生 SQL 流式数据库,并针对流处理 Join 做了大量状态管理、复用、以及性能优化。本文介绍了 RisingWave 的 Join 的使用场景,流处理 Join 的基本原理,以及 Join 状态的特点。同时介绍了如何使用 Watermark 来控制 Join 状态的大小。RisingWave 提供 Symmetric Hash Join、Interval Join、Temporal Join、Delta Join 等面向用户的 Join Features。RisingWave 有专门针对流处理的场景 Join Ordering 优化,一套完善的子查询 Unnesting 优化技术将子查询转换成 Join,和针对特殊NestedLoop Join 场景的 Dynamic Filter 优化。
Reference
-
https://www.risingwave.dev/docs/current/real-time-ad-performance-analysis
-
Xie J, Yang J. A survey of join processing in data streams[J]. Data Streams: Models and Algorithms, 2007: 209-236.
-
https://github.com/risingwavelabs/rfcs/pull/2
-
https://github.com/risingwavelabs/rfcs/pull/32
-
https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0049-temporal-join.md
-
Viglas S D, Naughton J F. Rate-based query optimization for streaming information sources[C]//Proceedings of the 2002 ACM SIGMOD international conference on Management of data. 2002: 37-48.
-
https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0023-minimize-height-join-ordering.md
-
Neumann T, Kemper A. Unnesting arbitrary queries[J]. Datenbanksysteme für Business, Technologie und Web (BTW 2015), 2015.
-
https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md
-
https://zhuanlan.zhihu.com/p/607054467
-
Freitag M, Bandle M, Schmidt T, et al. Adopting worst-case optimal joins in relational database systems[J]. Proceedings of the VLDB Endowment, 2020, 13(12): 1891-1904.
-
Kang J, Naughton J F, Viglas S D. Evaluating window joins over unbounded streams[C]//Proceedings 19th International Conference on Data Engineering (Cat. No. 03CH37405). IEEE, 2003: 341-352.
RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。
了解更多:
官网: risingwave.com
教程:risingwavetutorial.com
GitHub:risingwave.com/github
微信公众号:RisingWave中文开源社区
社区用户交流群:risingwave_assistant