Flink系列之:分组聚合
- 一、DISTINCT 聚合
- 二、GROUPING SETS
- 三、ROLLUP
- 四、CUBE
- 五、HAVING
- 适用于流、批
像大多数数据系统一样,Apache Flink支持聚合函数;包括内置的和用户定义的。用户自定义函数在使用前必须在目录中注册。
聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”(平均)、“MAX”(最大)和 “MIN”(最小)。
SELECT COUNT(*) FROM Orders
对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止。而且它们会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入 Orders 表时,Flink 都会实时计算并输出更新后的结果。
Apache Flink 支持标准的 GROUP BY 子句来聚合数据。
SELECT COUNT(*)
FROM Orders
GROUP BY order_id
对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX 的状态是重量级的,COUNT 是轻量级的。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。
Flink 对于分组聚合提供了一系列性能优化的方法。
一、DISTINCT 聚合
DISTINCT 聚合在聚合函数前去掉重复的数据。下面的示例计算 Orders 表中不同 order_ids 的数量,而不是总行数。
SELECT COUNT(DISTINCT order_id) FROM Orders
对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小大多数情况下取决于去重行的数量和分组持续的时间,持续时间较短的 group 窗口不会产生状态过大的问题。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。
二、GROUPING SETS
Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。
SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())
这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商的评分总数。
- 首先,我们定义了一个包含供应商ID、产品ID和评分的VALUES子句,表示我们的原始数据。每个元组代表了一个产品的供应商、产品和评分。
- 然后,我们使用AS关键字给VALUES子句指定别名为Products,并指定了三个列名:supplier_id、product_id和rating。
- 接下来,我们使用GROUP BY子句和GROUPING SETS关键字来分组数据。GROUP BY子句定义了我们想要按照哪些列进行分组。在这个查询中,我们定义了三个分组集合:(supplier_id, rating)、(supplier_id)和()。它们分别表示按照supplier_id和rating分组、只按照supplier_id分组以及不进行任何分组。
- 最后,我们使用COUNT(*)函数来计算每个分组的产品评分总数,并将结果作为"total"列返回。
- 这个查询的结果将为每个供应商和评分组合提供评分总数,以及每个供应商的总评分数和所有供应商的总评分
结果:
+-------------+--------+-------+
| supplier_id | rating | total |
+-------------+--------+-------+
| supplier1 | 4 | 1 |
| supplier1 | (NULL) | 2 |
| (NULL) | (NULL) | 4 |
| supplier1 | 3 | 1 |
| supplier2 | 3 | 1 |
| supplier2 | (NULL) | 2 |
| supplier2 | 4 | 1 |
+-------------+--------+-------+
GROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。
对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。例如,上例中的 GROUPING SETS ((supplier_id), ()) 里的 () 就是空子列表,与其对应的结果数据中的 supplier_id 列使用 NULL 填充。
对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于 Grouping Sets 的数量以及聚合函数的类型。可以提供一个合适的状态 time-to-live (TTL)配置来防止状态过大.注意:这可能会影响查询结果的正确性。
三、ROLLUP
ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。
例如:下面这个查询和上个例子是等效的。
SELECT supplier_id, rating, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)
这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商和评分组合的评分总数。
- 首先,我们定义了一个包含供应商ID、产品ID和评分的VALUES子句,表示我们的原始数据。每个元组代表了一个产品的供应商、产品和评分。
- 然后,我们使用AS关键字给VALUES子句指定别名为Products,并指定了三个列名:supplier_id、product_id和rating。
- 接下来,我们使用GROUP BY子句和ROLLUP关键字来进行分组。ROLLUP允许我们构造多个层次的大汇总。在这个查询中,我们使用ROLLUP(supplier_id, rating)来创建了两个层次的分组:一个按供应商ID和评分进行分组的层次,以及一个只按供应商ID进行分组的层次。
- 最后,我们使用COUNT()函数来计算每个分组的产品评分总数,并返回结果中的"supplier_id"、"rating"和"COUNT()"三列。
- 这个查询的结果将为每个供应商和评分组合提供评分总数,以及每个供应商在不同评分水平上的总评分数。同时,结果还包括以评分水平为基础的总评分数和所有供应商的总评分数。
四、CUBE
CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。
例如:下面两个查询是等效的。
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id ),( supplier_id, rating ),( supplier_id ),( product_id, rating ),( product_id ),( rating ),( )
)
这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商、评分和产品ID组合的评分总数。
- 首先,我们定义了一个包含供应商ID、产品ID和评分的VALUES子句,表示我们的原始数据。每个元组代表了一个产品的供应商、产品和评分。
- 然后,我们使用AS关键字给VALUES子句指定别名为Products,并指定了三个列名:supplier_id、product_id和rating。
- 接下来,我们使用GROUP BY子句和CUBE关键字来进行分组。CUBE允许我们构造所有可能的组合。在这个查询中,我们使用CUBE(supplier_id, rating, product_id)来创建了所有可能的组合:按供应商ID、评分和产品ID进行分组的组合、只按供应商ID和评分进行分组的组合、只按供应商ID和产品ID进行分组的组合、只按评分和产品ID进行分组的组合,以及只按供应商ID进行分组的组合,只按评分进行分组的组合,只按产品ID进行分组的组合,以及不进行任何分组的组合。
- 最后,我们使用COUNT()函数来计算每个分组的产品评分总数,并返回结果中的"supplier_id"、“rating”、"product_id"和"COUNT()"四列。
- 这个查询的结果将为每个供应商、评分和产品ID组合提供评分总数,以及不同组合下的总评分数。同时,结果还包括每个供应商、每个评分和每个产品ID的总评分数,以及所有供应商、所有评分和所有产品ID的总评分数。
五、HAVING
HAVING 会删除 group 后不符合条件的行。 HAVING 和 WHERE 的不同点:WHERE 在 GROUP BY 之前过滤单独的数据行。HAVING 过滤 GROUP BY 生成的数据行。 HAVING 条件中的每一列引用必须是明确的 grouping 列,除非它出现在聚合函数中。
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
即使没有 GROUP BY 子句,HAVING 的存在也会使查询变成一个分组查询。这与查询包含聚合函数但没有 GROUP BY 子句时的情况相同。查询认为所有被选中的行形成一个单一的组,并且 SELECT 列表和 HAVING 子句只能从聚合函数中引用列。如果 HAVING 条件为真,这样的查询将发出一条记录,如果不为真,则发出零条记录。