使用SQL实现流处理的核心技术
在了解了Table\SQL API的使用方法以及作业运行机制之后,接下来分析SQL实现流处理的核心技术。
为什么要分析这个问题呢?
因为传统的关系代数以及SQL最开始是为了批处理设计的,在传统关系型数据库以及批处理中,数据都是有界的,因此SQL语句的执行过程比较好理解,但是在流处理中,数据是无界的,那么将SQL应用于流处理的理解成本以及实现成本相对批处理就高很多了。因此在本节中,我们会介绍SQL实现流处理的过程中面临的难题,然后通过一步一步的将这些难题解决之后,总结出SQL实现流处理的核心技术。
使用SQL实现流处理的思路
在流式SQL(使用SQL实现流处理作业)诞生之前,基于SQL的数据查询都是基于批处理的。那么如果我们想将SQL应用到流处理中,必然要站在巨人的肩膀(批处理的流程)上面进行,那么分析思路就很清晰了。如表8-1所示,我们先来比较批处理与流处理在处理数据时的异同点,如果是共通点,那么流处理就可以直接复用批处理的实现方案,只有差异点才是我们需要重点克服和关注的。
表8-1 流处理和批处理的异同点
处理模式 | 输入表 | 处理逻辑 | 结果表 |
---|---|---|---|
批处理 | 静态表:输入数据有界 | 批处理:执行处理时能够访问到完整的输入数据,然后计算,输出完整的结果数据 | 静态表:输出数据有界 |
流处理 | 动态表:输入数据无界,数据实时增加并且源源不断 | 流处理:执行时不能够访问到完整的输入数据,每次计算得到结果都是一个中间结果,因此计算的过程永远不会停止,会持续等待新的数据进入并计算 | 动态表:输出数据无界 |
通过表8-1对比流处理和批处理之间的差异后,我们发现流处理和批处理在数据处理的3个阶段要做的工作都是完全不同的,两者差异很大,所以要使用SQL实现流处理就要把下面3个问题都解决了。
- 问题1:如何将一个实时的、源源不断的输入数据流表示为SQL中的输入表。
- 问题2:将SQL处理逻辑翻译成什么样的底层处理技术才能够实时的处理输入数据流,然后产出输出数据流。
- 问题3:如何将一个实时的、源源不断的输出数据流表示为SQL中的输出表。
问题1和问题3都比较好解决,只是将数据流和表之间进行相互映射罢了,Flink针对这个问题提供了一种名为动态表(Dynamic Table)的技术,关于动态表将在下文介绍。
问题2就比较难解决了。不过问题2也有解决思路,在常见的高级关系型数据库中会提供一个称为物化视图(Materialized Views)的特性。物化视图和虚拟视图一样,都是定义在实体表上的一条SQL查询,不同之处在于物化视图会实际执行SQL查询并且缓存查询的结果,因此当我们访问物化视图时,并不需要基于原始表再进行计算,而只需要直接获取缓存的物化视图结果就可以了。
以批处理为例,天级别的物化视图是每天等数据源准备好之后,调度物化视图的SQL执行批处理,然后将结果缓存下来,缓存下来的结果就可以提供服务了。而物化视图的特性就为SQL实现流处理提供了一个很好的思路,流处理中的SQL查询实际上也可以看作一个物化视图,只不过在流处理中,数据源表的数据是源源不断的,那么整个物化视图结果的更新也必须是实时的,只有这样才能保证产出结果的及时性,因此这要求物化视图的更新时延非常低。
思路有了,具体要怎么实现呢?Flink采用了一种视图实时更新(Eager View Maintenance)的技术,这种技术可以在物化视图的数据源表发生更新时,就立即更新物化视图的结果。那么要如何理解这个视图实时更新技术呢?
我们知道在数据库中,一张表中的数据本质上是由INSERT、UPDATE和DELETE这3种命令作用的结果,如果将每一条命令的执行看作是一条数据,那么一张表的数据就可以使用一个包含INSERT、UPDATE和DELETE命令的数据流来维护,我们将这个数据流称为更新日志流(changelog stream)。有了更新日志流之后,就能实现物化视图的实时更新了,我们可以将数据源表的更新过程转化为更新日志流,那么在数据源表上的SQL查询(物化视图)就变为了对更新日志流的消费和加工,这样就能实现物化视图的实时更新。
在Flink中,这种视图实时更新技术被称作连续查询(Continuous Query)。
总结一下,为了使用SQL实现流处理,Flink提出了动态表以及连续查询两种技术方案,动态表技术用于实现输入、输出数据流和表之间的映射,连续查询技术用于实现物化视图的实时更新。
动态表以及连续查询
- 动态表:动态表(Dynamic Table)是Table\SQL API的核心概念。动态表中动态是相比于批处理中静态表来说的。
- 静态表:应用于批处理中,静态表可以理解为是不随着时间的推进而实时进行变化的。在批处理中,一般都是按照一小时或者一天的粒度新生成一个分区。
- 动态表:动态表是随时间实时进行变化的,如图8-4所示。
连续查询
在流处理中,对动态表的查询就是连续查询(Continuous Query),连续查询永远不会终止,连续查询的结果也是一个动态表。只要动态输入表有一条数据更新,连续查询就会通过视图实时更新技术计算并输出结果到动态输出表中。如图8-5所示,是一个SQL API的Flink作业的常见逻辑数据流,其中使用到了动态表技术以及连续查询技术。
图8-5 SQL API作业的动态表以及连续查询
这个Flink作业在执行时会包含以下3个步骤。
- 输入流映射为SQL API中的动态输入表:Flink作业会从数据源存储引擎读入输入流,然后将输入流映射(绑定)为SQL API中的动态输入表。注意,虽然在图8-5中将输入流和动态输入表分为了两个部分,但实际上两者之间是互相映射的关系。
- 执行连续查询:在动态输入表上按照SQL的查询逻辑执行连续查询,然后产出动态输出表。注意,连续查询执行的过程中通常是有状态的。
- SQL API中的动态输出表映射为输出流:将动态输出表映射为输出流,然后将输出流产出到数据汇存储引擎当中。
注意:虽然流处理和批处理采用的SQL查询技术方案不同,但是在Flink中,对于同一个SQL查询来说,使用流处理在输入表上执行连续查询产出的结果和使用批处理在输入表上执行查询产出的结果总是相同的。因此我们说Flink的Table\SQL API实现了流批一体。
案例
接下来,我们通过两个案例来说明动态表和连续查询的执行机制以及结果。
- 案例1:电商场景中统计每种商品的历史累计销售额。
- 案例2:电商场景中统计每种商品每1min的累计销售额。
案例1:统计每种商品的历史累计销售额
输入数据为商品销售订单,包含pId、income字段,分别代表商品ID、销售额,输出数据包含的字段为pId、all字段,分别代表商品ID和历史累计销售额。
代码实现
该案例通过SQL API实现起来很简单,最终实现如代码清单8-18所示,我们使用GROUP BY子句按照pId对商品进行分类,然后在每一种商品上面使用SUM聚合函数累加商品的销售额就可以得到每一种商品的累计销售额。
代码清单8-18 使用SQL API统计每种商品的历史累计销售额
// 创建数据源表
CREATE TABLE source_table (
pId BIGINT,
income BIGINT
) WITH (
...
);
// 创建数据汇表
CREATE TABLE sink_table (
pId BIGINT,
all BIGINT
) WITH (
...
);
// 执行查询
INSERT INTO sink_table
SELECT
pId
, SUM(income) as all
FROM source_table
GROUP BY pId;
输入流映射为SQL API中的动态输入表
如图8-6所示,首先我们将输入数据流映射为动态输入表,每当输入数据流中增加一条数据,动态输入表也会增加一行数据。
图8-6 将输入数据流映射为动态输入表
执行连续查询
如图8-7所示,在动态输入表的基础之上执行连续查询,动态输入表中的数据是一条一条到来的,因此连续查询也会去一条一条的处理输入数据。
图8-7 执行连续查询
连续查询技术的核心就是将表的更新映射为更新日志流(changelog stream),因此在图8-7的连续查询逻辑中,首先会将动态输入表source_table映射为更新日志流,由于source_table是商品销售订单表,因此source_table每输入一条新数据都代表一条INSERT消息。同时,由于代码清单8-18的SQL查询逻辑是按照pId(商品ID)进行分组的,所以这代表动态输出表的主键就是pId。接下来我们来看看连续查询是如何消费动态输入表中的数据并产出结果数据到动态输出表的。
- 第一行数据[商品1, 5]插入(INSERT)到source_table表时,连续查询会按照SQL查询逻辑消费这条INSERT消息,计算得到结果[商品1, 5],将结果保存在状态中。注意,由于动态输出表中没有pId为商品1的数据,所以连续查询会将结果[商品1, 5]插入(INSERT)到动态输出表中。
- 第二行数据[商品2, 6]插入(INSERT)到source_table表时,连续查询消费这条INSERT消息,计算得到结果[商品2, 6],将结果保存在状态中并插入(INSERT)到动态输出表中。
- 第三行数据[商品3, 7]插入(INSERT)到source_table表时,连续查询消费这条INSERT消息,计算得到结果[商品3, 7],将结果保存在状态中并插入(INSERT)到动态输出表中。
- 第四行数据[商品1, 8]插入(INSERT)到source_table表时,连续查询消费这条INSERT消息,和保存在状态中的[商品1, 5]的结果累加得到结果[商品1, 13],这时由于动态输出表中已经有了pId为商品1的数据,所以这时就不是将结果[商品1, 13]插入到动态输出表了,而是将结果更新(UPDATE)到动态输出表中。
在上述案例执行连续查询的过程中,动态输出表的更新日志流中不但有INSERT的消息,还有UPDATE的消息,这种连续查询就被Flink称作更新查询(Update Query)。那要怎么来理解更新查询的含义呢?
我们可以按照下面的方式来理解。动态输入表的数据是源源不断的,同一个商品ID的销售订单也是源源不断的,所以SQL查询执行时,每次产出到动态输出表中的商品累计销售额结果都是一个中间结果。当第一条数据到来,这时没有中间结果,所以会将结果插入(INSERT)到动态输出表中,而如果同一个商品ID的下一条商品销售订单数据到来的时候,就会计算得到新的商品累计销售额结果,这时就要用新结果把上一次产出的中间结果(旧结果)给更新(UPDATE),就会将结果更新(UPDATE)到动态输出表中。
统计每种商品每1min的累计销售额
输入数据为商品销售订单,包含的字段为pId、income和time字段,分别代表商品ID、销售额和销售时间戳(毫秒),输出数据包含的字段为pId、all和minutes字段,分别代表商品ID和1min的累计销售额和1min窗口的开始时间戳。
代码实现
统计每种商品每1min的累计销售额这是一个典型的1min大小的事件时间滚动窗口案例,使用SQL API的实现逻辑如代码清单8-19所示。我们使用GROUP BY子句按照pId对商品进行分类,同时,在GROUP BY子句中还包含了TUMBLE(row_time, INTERVAL ‘1’ MINUTES),这代表我们为每一种商品开启了1min事件时间滚动窗口。最后,在每一种商品的每1min的窗口上,我们使用SUM聚合函数来累加商品的销售额就可以得到商品每1min的累计销售额,其中TUMBLE_START(row_time, INTERVAL ‘1’ MINUTES)的返回值为1min窗口的开始时间。
代码清单8-19 使用SQL API统计每种商品每1min的累计销售额
// 创建数据源表
CREATE TABLE source_table (
pId BIGINT,
income BIGINT,
time BIGINT, // 单位为毫秒
// 用于定义数据的事件时间戳
row_time AS TO_TIMESTAMP_LTZ(time, 3),
// 用于指定Watermark分配方式,最大乱序时间为5s
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
...
);
// 创建数据汇表
CREATE TABLE sink_table (
pId BIGINT,
all BIGINT,
minutes STRING
) WITH (
...
);
// 执行查询
INSERT INTO sink_table
SELECT
pId
, sum(income) as all
, TUMBLE_START(row_time, INTERVAL '1' MINUTES) as minutes
FROM source_table
GROUP BY
pId
, TUMBLE(row_time, INTERVAL '1' MINUTES)
输入流映射为SQL API中的动态输入表
如图8-8所示,我们将输入数据流映射为动态输入表,每当输入数据流中增加一条数据,动态数据表也会增加一行数据。注意,无论是DataStream API、Table API还是SQL API,在使用事件时间窗口时,都要求数据事件时间戳的单位为毫秒,不过为了方便理解,笔者将图8-8中的时间格式化为了小时:分钟:秒(HH:mm:ss)。
图8-8 将输入数据流映射为动态输入表
执行连续查询
如图8-9所示,在动态输入表的基础之上执行连续查询,动态输入表中的数据是一条一条到来的,因此连续查询也会一条一条的处理输入数据。不过本节的案例是窗口查询,因此只有当SubTask本地的事件时钟到达窗口最大时间时,才会触发计算输出结果。这和上一节提到的更新查询中每来一条数据就处理一条数据并输出结果的机制是不同的。
图8-9 执行连续查询
在图8-9的连续查询逻辑中,同样会将动态输入表source_table映射为更新日志流,由于source_table是商品销售订单表,所以source_table每输入一条新数据都代表一条INSERT消息。同时,由于代码清单8-19的SQL查询逻辑是按照pId(商品ID)分组,而且在分组数据流上实现了1min的滚动窗口,因此这代表动态输出表的主键有两个,分别是pId和1分钟的窗口。接下来我们来看看连续查询是如何消费动态输入表并产出结果到动态输出表的。
上述案例的滚动窗口的步长为1min,那么我们就按照窗口大小对动态输入表的数据进行划分,事件时间戳在[09:01:00, 09:02:00)、[09:02:00, 09:03:00)、[09:06:00, 09:07:00)以及[09:09:00, 09:10:00)之间的数据分别有1条、2条、3条和1条。
- 当事件时间戳在[09:01:00, 09:02:00)之间的数据插入(INSERT)到动态输入表时,连续查询就按照SQL查询逻辑消费这些(INSERT)消息,当Watermark达到09:02:00时,窗口触发计算得到结果[商品1, 5, 09:01:00]。接下来,由于动态输出表中没有pId为商品1,且窗口为[09:01:00, 09:02:00)的数据,所以连续查询会插入(INSERT)结果到动态输出表中。
- 当事件时间戳在[09:02:00, 09:03:00)之间的数据插入(INSERT)到动态输入表时,连续查询消费这些(INSERT)消息,当Watermark达到09:03:00时,窗口计算得到结果[商品2, 13, 09:02:00],插入(INSERT)到动态输出表中。
- 当事件时间戳在[09:06:00, 09:07:00)之间的数据插入(INSERT)到动态输入表时,连续查询消费这些(INSERT)消息,当Watermark达到09:07:00时,窗口计算得到结果[商品3, 8, 09:06:00]、[商品1, 18, 09:06:00],插入(INSERT)到动态输出表中。
- 当事件时间戳在[09:09:00, 09:10:00)之间的数据插入(INSERT)到动态输入表时,连续查询消费这些(INSERT)消息,当Watermark达到09:10:00时,窗口计算得到结果[商品2, 9, 09:09:00],插入(INSERT)到动态输出表中。
在上述这个案例执行连续查询的过程中,动态输出表的更新日志流中只有INSERT消息,这种连续查询被Flink称作追加查询(Append Query)。有读者可能会疑惑为什么这个场景中的动态输出表不会发生更新呢?
在这个案例中,虽然动态输入表的数据是源源不断的,但是这个SQL查询的计算逻辑是事件时间滚动窗口。我们知道时间是不会倒流的,当一个窗口触发计算结束之后,之后触发的所有窗口的时间都只会比当前窗口大,所以当前窗口计算结果一旦产出,就不会再被更新了。举例来说,当时间为[09:09:00, 09:10:00)的窗口触发计算,输出结果之后,以后就再也不会有时间为[09:09:00, 09:10:00)的窗口触发计算了。因此这个SQL查询是一个追加查询。
动态表映射为数据流
在上文两个案例中,我们提到连续查询是通过更新日志流来不断地维护动态表的。如果SQL查询是一个更新查询,那么这个SQL查询写入的动态输出表有可能是一个只有一行数据,而这一行数据在不断进行更新的表。如果这个SQL查询是一个追加查询,那么这个SQL查询写入的动态输出表就只会插入(INSERT)数据,数据量不断增大,但是不会发生修改。
但是,如果想要将动态输出表的结果再写出到数据汇存储引擎,就会碰到一个难题,那就是我们如何将动态输出表的INSERT、UPDATE以及DELETE消息进行编码才能保证输出到数据汇存储引擎中的数据是正确的呢?
针对这个问题,Flink的Table\SQL API提供了以下3种编码方式来将动态表编码为数据流。
- 将动态表编码为Append-only流。
- 将动态表编码为Retract流。
- 将动态表编码为Upsert流。
接下来,我们逐一分析这3种编码方式。
将动态表编码为Append-only流
如果一个动态表只通过插入消息来维护,那么这个动态输出表就可以被转化为一个只有INSERT消息的数据流,只有INSERT消息的数据流被称为追加流(Append-only stream)。上文案例2的动态输出表就可以采用这种编码方式将动态输出表中的数据转换为数据流并输出。
将动态表编码为Retract流
Retract流的定义
Retract流包含两种类型的消息:新增消息(Add Message)和回撤消息(Retract Message)。在动态表被转化为Retract流时,动态表的INSERT操作会被编码为新增消息,DELETE操作会被编码为回撤消息,UPDATE操作会被编码为一条回撤消息以及一条新增消息。那么怎么来理解新增消息和回撤消息所代表的含义呢?
- 新增消息:新增消息代表将当前最新的结果发送到数据流中。
- 回撤消息:将发送到数据流中的旧的结果给撤销。
动态表编码为Retract流
接下来,我们来分析Flink是如何实现将动态表的INSERT、UPDATE和DELETE操作编码为新增消息和回撤消息的。
- INSERT操作被编码为新增消息:INSERT操作代表新增了一条数据,那么就理所当然应该被编码为新增消息。
- UPDATE操作被编码为一条回撤消息以及一条新增消息:UPDATE操作会将某个主键下的旧结果给更新为新结果。而要实现这个操作,一种简单的方法就是直接将旧结果给删除,然后写入当前最新的结果,因此UPDATE操作就可以被编码为先发送一条回撤消息将旧结果给撤销,然后发送一条新增消息将最新的结果发送下去。注意,回撤消息一定在新增消息之前发送,否则会导致错误的结果。
- DELETE操作被编码为回撤消息:DELETE操作代表删除了一条数据,那么就编码为回撤消息。
案例
如图8-10所示,我们以上文中案例1为例,来看看动态输出表编码为Retract流的过程。
图8-10 将动态输出表编码为Retract流
如图8-10所示,Retract流中的消息有+和-两种前缀。如果前缀为+,则代表这条数据为新增消息,如果前缀为-,则代表这条数据为回撤消息。这两种消息最终都会以数据流的形式写入到数据汇存储引擎中。
我们来分析一下图8-10中SQL查询的执行过程。
- 第一行数据[商品1, 5]插入(INSERT)到source_table表时,连续查询按照SQL查询逻辑消费这条(INSERT)消息,计算得到结果[商品1, 5],并插入(INSERT)结果到动态输出表中,这时就会将这条INSERT操作编码为新增消息,也就是+[商品1, 5]。
- 第二行数据[商品2, 6]的执行逻辑同上。
- 第三行数据[商品1, 7]插入(INSERT)到source_table表时,连续查询消费这条(INSERT)消息,和状态中保存的[商品1, 5]累加得到结果[商品1, 13],这时由于动态输出表中已经有了pId为商品1的数据,所以这时就不是将结果[商品1, 13]插入到动态输出表了,而是将结果更新(UPDATE)到动态输出表中,这时就会将这条UPDATE操作编码为一条回撤消息和一条新增消息,即-[商品1, 5]和+[商品1, 12]。
- 第四行和第五行数据的执行流程类似,不再赘述。
注意,如果下游还有作业去消费Retract流,要求能够正确处理新增和回撤两种消息,防止数据计算重复或者错误。
将动态表编码为Upsert流
Upsert流的定义
Upsert流包含两种类型的消息:插入或更新消息(Upsert Message)和删除消息(Delete Message)。在动态表被转化为Upsert流时,动态表的INSERT和UPDATE操作会被编码为插入或更新消息,DELETE操作会被编码为删除消息。注意,如果一个动态表要被转化为Upsert流,那么要求这个动态表要有主键。
怎么来理解插入或更新消息和删除消息所代表的含义呢?
- 插入或更新消息:插入或更新消息其实和数据库中的UPSERT子句的能力一致,它包含了插入(INSERT)和更新(UPDATE)两个功能,数据库中的UPSERT子句在执行时,如果当前主键下没有数据,那么就执行INSERT操作,如果当前主键下已经有一条数据,那么就执行UPDATE操作。而如果在流处理的场景中去理解插入或更新消息的话,其实就是将当前主键下的最新的结果发送到数据流中。
- 删除消息:将发送到数据流中的旧的结果给删除。
动态表编码为Upsert流
接下来,我们分析Flink是如何实现将动态表的INSERT、UPDATE和DELETE操作编码为插入或更新消息和删除消息的。
- INSERT操作被编码为插入或更新消息:INSERT操作代表新增了一条数据,那么自然会被编码为插入或更新消息。
- UPDATE操作被编码为插入或更新消息:UPDATE操作是将某个主键下的旧结果给更新为新结果的操作,显而易见这个操作就会被编码为插入或更新消息。这时就能体现出Upsert流和Retract流的不同之处了,Retract流将UPDATE操作编码为了一条回撤消息和一条新增消息,那么对于下游来说,收到回撤消息,就能将保存的这条数据删除,收到新增消息时,就能将这条新数据保存下来。而Upsert流只将UPDATE操作编码为一条插入或更新消息,因此对于下游来说,接收到这条插入或更新消息时,必须得知道主键才能去找到旧的数据并更新为新的数据,因此这就是一个动态表要被编码为Upsert流时必须包含主键的原因。
- DELETE操作被编码为删除消息:DELETE操作代表删除了一条数据,那么就编码为删除消息。
案例
如图8-11所示,我们依然以上文中案例1为例,看看动态输出表编码为Upsert流的过程。
图8-11 将动态输出表转化为Upsert流
如图8-11所示,Upsert流中的消息有和-两种前缀,如果前缀为,则代表这条数据为插入或更新消息,如果前缀为-,则代表这条数据为删除消息,这两种数据最终都会以数据流被写入到数据汇存储引擎中。
接下来我们来分析一下图8-11中SQL查询的执行过程。
- 第一行数据[商品1, 5]插入到source_table表时,连续查询按照SQL查询逻辑消费这条消息,计算得到结果[商品1, 5],并插入结果到动态输出表中,这时就会将这条INSERT操作编码为插入或更新消息,也就是*[商品1, 5]。
- 第二行数据[商品2, 6]的执行逻辑同上。
- 第三行数据[商品1, 7]插入到source_table表时,连续查询消费这条消息,和保存在状态中的[商品1, 5]的结果累加得到结果[商品1, 13],这时由于动态输出表(主键为pId)中已经有了pId为商品1的数据,所以这时就不是将结果[商品1, 13]插入到动态输出表了,而是将结果更新到动态输出表中,这时就会将这条UPDATE操作编码为插入或更新消息,即*[商品1, 12]。
- 第四行和第五行数据的执行流程类似,不再赘述。
注意,如果下游还有一个作业或者算子去消费Upsert流的话,消费算子需要知道这条数据流的唯一键,以便正确地根据唯一键去处理插入或更新消息以及删除消息。
值得一提的是,如果一个SQL查询是更新查询,那么动态输出表既可以编码为Retract流也可以编码为Upsert流,两者的区别在于Upsert流会将UPDATE操作编码为一条消息,因此编码为Upsert流的执行效率会更高。在上述案例中,编码为Retract流总共产生了7条消息,而编码为Upsert流总共只有5条消息。