【大数据】Flink SQL 语法篇(四):Group 聚合、Over 聚合

Flink SQL 语法篇(四):Group 聚合、Over 聚合

  • 1.Group 聚合
    • 1.1 基础概念
    • 1.2 窗口聚合和 Group 聚合
    • 1.3 SQL 语义
    • 1.4 Group 聚合支持 Grouping sets、Rollup、Cube
  • 2.Over 聚合
    • 2.1 时间区间聚合
    • 2.2 行数聚合

1.Group 聚合

1.1 基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

1.2 窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
)-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
);-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

1.3 SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

1.4 Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id         ),( supplier_id,             rating ),( supplier_id                     ),(              product_id, rating ),(              product_id         ),(                          rating ),(                                 )
)

2.Over 聚合

Over 聚合定义(支持 Batch / Streaming):可以理解为是一种特殊的滑动窗口聚合函数。

那这里我们拿 Over 聚合窗口聚合 做一个对比,其之间的最大不同之处在于:

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到。
  • Over 聚合:能够保留原始字段。

注意:其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?

  • 应用场景:计算最近一段滑动窗口的聚合结果数据。
  • 实际案例:查询每个产品最近一小时订单的金额总和。
SELECT order_id, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM Orders
  • Over 聚合的语法总结如下:
SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),...
FROM ...
  • ORDER BY:必须是时间戳列(事件时间、处理时间)。
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合。
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。如下案例所示。

2.1 时间区间聚合

按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 1 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '10','fields.product.min' = '1','fields.product.max' = '2'
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 1 小时的数据RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table

2.2 行数聚合

按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 5 5 行数据的 amount 之和。

CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '2','fields.product.min' = '1','fields.product.max' = '2'
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 5 行数据ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table

预跑结果如下:

+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]

当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:

SELECT order_id, order_time, amount,SUM(amount) OVER w AS sum_amount,AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/703393.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DevOps 周期的 6 个 C

中型到大型软件开发项目涉及许多人员、多个团队、资源、工具和开发阶段。它们都需要以某种方式进行管理和简化,不仅可以获得所需的产品,而且还要确保将来在不断变化的环境下易于管理和维护。组织通常遵循许多项目管理模型和技术。DevOps 是其中之一&…

家用超声波清洗机哪个好?四款高评分超声波清洗机分享

超声波清洗机可以说是眼镜党家中必备的一款超声波清洗机,毕竟它能高效的帮我们解决清洗眼镜的烦恼,也可以帮我们清洗家中其他的一些物品。很多朋友因为各种原因没有时间清洗眼镜以及家中的小物件物品,长时间下来一次物品或者是眼镜上就会堆积…

卷积神经网络 CNN

目录 卷积网络与传统网络的区别 参数共享 卷积神经网络整体架构 卷积操作的作用 卷积核的定义 卷积特征值计算方法 卷积层涉及的参数 边缘填充 ​编辑 卷积结果计算 池化层 整体网格架构 VGG网络架构 残差网络Resnet 卷积网络与传统网络的区别 卷积神经网络&#x…

50.仿简道云公式函数实战-文本函数-ISEMPTY

1. ISEMPTY函数 判断值是否为空字符串、空对象或者空数组。 支持使用 ISEMPTY 函数的字段有:单行文本、多行文本、数字、日期时间、单选按钮组、复选框组、下拉框、下拉复选框、地址、定位、成员字段、部门字段、微信昵称、微信 OpenID、扩展字段。 2. 函数用法 …

API保障——电子商务安全性与稳定性设计

在这次深入探讨中,我们将深入了解API设计,从基础知识开始,逐步进阶到定义出色API的最佳实践。 作为开发者,你可能对许多这些概念很熟悉,但我将提供详细的解释,以加深你的理解。 ​ API设计:电…

Android Jni的介绍和简单Demo实现

Android Jni的介绍和简单Demo实现 文章目录 Android Jni的介绍和简单Demo实现一、JNI的简单介绍JNINDKJni的开发背景:**JNI在 Android 开发里的主要应用场景:** 二、JNI的简单Demo1、Demo主要界面和效果展示2、CMake编译加载文件add_library 指令的加载库…

力扣--双指针167.二数之和Ⅱ

这题一个穷举方法是比较好想到的&#xff1a; class Solution { public:vector<int> twoSum(vector<int>& numbers, int target) {int i,j;int nnumbers.size();vector<int>result(2,0);for(i0;i<n-1;i){for(ji1;j<n;j){if(numbers[i]numbers[j…

分库分表如何管理不同实例中几万张分片表?

大家好&#xff0c;我是小富&#xff5e; ShardingSphere实现分库分表&#xff0c;如何管理分布在不同数据库实例中的成千上万张分片表&#xff1f; 上边的问题是之前有个小伙伴看了我的分库分表的文章&#xff0c;私下咨询我的&#xff0c;看到他的提问我第一感觉就是这老铁…

《高质量的C/C++编程规范》学习

目录 一、编程规范基础知识 1、头文件 2、程序的板式风格 3、命名规则 二、表达式和基本语句 1、运算符的优先级 2、复合表达式 3、if语句 4、循环语句的效率 5、for循环语句 6、switch语句 三、常量 1、#define和const比较 2、常量定义规则 四、函数设计 1、参…

【k8s资源调度-Deployment】

1、标签和选择器 1.1 标签Label 配置文件&#xff1a;在各类资源的sepc.metadata.label 中进行配置通过kubectl 命令行创建修改标签&#xff0c;语法如下 创建临时label&#xff1a;kubectl label po <资源名称> apphello -n <命令空间&#xff08;可不加&#xff0…

2024生物科学、医学技术与化学国际会议(ICBSMTC2024)

2024生物科学、医学技术与化学国际会议(ICBSMTC2024) 会议简介 ICBSMTC2024是一个聚焦于生物科学、医学技术与化学领域的学术交流活动&#xff0c;会议将在中国桂林举行&#xff0c;会议旨在促进相关领域的学术交流与发展。会议将汇集来自世界各地的顶级学者和专家&#xf…

人工智能_CPU微调ChatGLM大模型_使用P-Tuning v2进行大模型微调_007_微调_002---人工智能工作笔记0102

这里我们先试着训练一下,我们用官方提供的训练数据进行训练. 也没有说使用CPU可以进行微调,但是我们先执行一下试试: https://www.heywhale.com/mw/project/6436d82948f7da1fee2be59e 可以看到说INT4量化级别最低需要7GB显存可以启动微调,但是 并没有说CPU可以进行微调.我们…

【b站咸虾米】chapter5_uniapp-API_新课uniapp零基础入门到项目打包(微信小程序/H5/vue/安卓apk)全掌握

课程地址&#xff1a;【新课uniapp零基础入门到项目打包&#xff08;微信小程序/H5/vue/安卓apk&#xff09;全掌握】 https://www.bilibili.com/video/BV1mT411K7nW/?p12&share_sourcecopy_web&vd_sourceb1cb921b73fe3808550eaf2224d1c155 目录 5 API 5.1 页面和路…

Linux 文件操作

目录 C语言下的文件操作 Linux下的文件操作 文件描述符的前因后果 文件描述符的概念 文件描述符的分配规则 理解C语言的FILE结构体 Linux重定向 文件缓冲区 文件系统 文件系统的概念 ext2文件系统 对ext2的补充 虚拟文件系统的概念 软硬链接 C语言下的文件操作 …

备战蓝桥杯---基础算法刷题2

题目有一点水&#xff0c;不过还是有几个好题的&#xff0c;我在这分享一下&#xff1a; 很容易想到先往最高处跳再往最低处跳&#xff0c;依次类推&#xff0c;那怎么保证其正确性呢&#xff1f; 证法1. 在此&#xff0c;我们从0开始&#xff0c;假设可以跳到a,b,c(a<b<…

UE学习笔记-- bUseUnity 加速编译 及 踩坑记录

前言 在写 UE 项目时&#xff0c;使用 VS 2022 进行编译&#xff0c;发现编译不过。 原因是少了头文件&#xff0c;导致某些了类型缺失。 问题发现 但是很奇怪的是&#xff0c;以前编译没问题&#xff0c;在原本应该出问题的 cpp 文件里面加了一行注释之后&#xff0c;编译就…

嵌入式学习day25 Linux

进程基本概念: 1.进程: 程序&#xff1a;存放在外存中的一段数据组成的文件 进程&#xff1a;是一个程序动态执行的过程,包括进程的创建、进程的调度、进程的消亡 2.进程相关命令: 1.top 动态查看当前系统中的所有进程信息&#xff08;根据CPU占用率排序&a…

查看仓库版本记录

打开命令行窗口 输入git log即可。 若发现分支不对&#xff0c;方法如下 查看项目目录&#xff0c;命令行输入dir可以查看 多个moudel&#xff0c;进入到需要查版本记录的moudel下 命令行输入cd .\文件名如wowo-win-server\ 切换到wowo-win-server文件夹下后&#xff0c;再输入…

C语言内存管理-C进程内存布局

C进程内存布局 任何一个程序&#xff0c;正常运行都需要内存资源&#xff0c;用来存放诸如变量、常量、函数代码等等。这些不同的内容&#xff0c;所存储的内存区域是不同的&#xff0c;且不同的区域有不同的特性。因此我们需要研究C语言进程的内存布局&#xff0c;逐个了解不…

sonar-java 手写一个规则-单元测试分析

前言 最近做项目&#xff0c;定制sonar规则&#xff0c;提高Java代码质量&#xff0c;在编写的sonar规则&#xff0c;做验证时&#xff0c;使用单元测试有一些简单的心得感悟&#xff0c;分享出来。 自定义规则模式 sonar的自定义规则很简单&#xff0c;一般而言有2种模式可…