【大数据】Flink SQL 语法篇(四):Group 聚合

Flink SQL 语法篇(四):Group 聚合

  • 1.基础概念
  • 2.窗口聚合和 Group 聚合
  • 3.SQL 语义
  • 4.Group 聚合支持 Grouping sets、Rollup、Cube

1.基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

2.窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
)-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
);-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

3.SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

4.Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id         ),( supplier_id,             rating ),( supplier_id                     ),(              product_id, rating ),(              product_id         ),(                          rating ),(                                 )
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/702819.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RCE (Remote ????? execution) --->CTF

看这个标题就知道今天的内容不简单!!!! 那么就来讲一下我们的RCE吧 目录 ​编辑 1. &? |? ||? &&? 2.PHP命令执行函数&& ||"" 1."" &…

6、进程、服务管理

一、进程管理 1.概述 进程是正在执行的程序或命令,每一个进程都独立运行,都有自己的地址空间,并占用一定的系统资源以后开发会遇见: 端口占用出现程序假死、卡死 2.查看系统运行进程 语法 ps 参数ps –a:显示当前终端下的所有…

智能SQL生成:后端技术与LLM的完美结合

文章目录 引言一、什么是大模型二、为什么选择LLM三、开发技术说明四、系统架构说明五、编码实战1. Maven2. 讯飞大模型配置类3. LLM相关的封装4. 编写LLM的service5. 编写controller6. 运行测试 六、总结 引言 本篇文章主要是关于实现一个类似Chat2DB的根据自然语言生成SQL的…

开源工具和框架

目录 开源工具和框架 一、 开源工具和框架 二、开源工具和框架在现代软件开发中的角色 1、基础设施建设: 2、开发效率提升: 3、代码质量保障: 4、技术创新: 三、广泛使用的开源项目分析 3.1、Linux 3.2、Git 3.3、Docke…

Unity接入讯飞的大数据模型

原:基于C#WPF编写的调用讯飞星火大模型工具_c#xf$xccx-CSDN博客 记录一下以防以后用到。 using Newtonsoft.Json; using System.Collections.Generic;public class JsonResponse {[JsonProperty("header")]public ResponseHeader Header { get; set; }[…

上一篇文章补充:已经存在的小文件合并

对于HDFS上已经存在的大量小文件问题,有多种策略可以进行处理和优化: 1. **合并小文件**: - **使用Spark作业合并**:通过编写Spark程序读取小文件并调用repartition()或coalesce()函数重新分区数据,然后将合并后的…

【Java程序设计】【C00313】基于Springboot的物业管理系统(有论文)

基于Springboot的物业管理系统(有论文) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的物业管理系统,本系统有管理员、物业、业主以及维修员四种角色权限; 管理员进入主页面,主要功能包…

opengl播放3d pose 原地舞蹈脚来回飘动

目录 opengl播放3d pose 原地舞蹈脚来回飘动 设置相机视角 opengl播放3d pose 原地舞蹈脚来回飘动 opengl播放3d pose 原地舞蹈时,脚来回飘动,正常状态是脚应该不动的。 经过反复分析实验验证,找到原因是,渲染计算3d坐标时,都要减去一个offset,这个offset是髋关节的坐…

【LeetCode-474】一和零(动态规划)

目录 LeetCode474.一和零 题目描述 思路1:动态规划 代码实现 题目链接 题目描述 给你一个二进制字符串数组 strs 和两个整数 m 和 n 。 请你找出并返回 strs 的最大子集的大小,该子集中 最多 有 m 个 0 和 n 个 1 。 如果 x 的所有元素也是 y 的…

mybatis总结传参三

十、(不推荐)多个参数-按位置传参 参数位置从 0 开始, 引用参数语法 #{ arg 位置 } , 第一个参数是 #{arg0}, 第二个是 #{arg1} 注意: mybatis-3.3 版本和之前的版本使用 #{0},#{1} 方式, 从 myba…

CCAA森林管理体系基础考试大纲

森林管理体系基础考试大纲(第1版) 1.总则 本大纲依据 CCAA《管理体系审核员注册准则》制定,适用于拟向CCAA申请注册森林管理体系审核员实习级别的人员。 2.考试要求 2.1考试科目 申请注册森林管理体系审核员实习级别的人员,需…

Ubuntu中Python3找不到_sqlite3模块

今天跑一个代码,出现了一个找不到sqlite3模块的错误,错误如下: from _sqlite3 import * ModuleNotFoundError: No module named _sqlite3 网上查资料说,因为python3没有自带sqlite3相关方面的支持,要自己先安装然后再重新编译Py…

stream流-> 判定 + 过滤 + 收集

List<HotArticleVo> hotArticleVos hotArticleVoList .stream() .filter(x -> x.getChannelId().equals(wmChannel.getId())).collect(Collectors.toList()); 使用Java 8中的Stream API对一个名为hotArticleVoList的列表进行过滤操作&#xff0c;筛选出符合指定条件…

SQL进阶(三):Join 小技巧:提升数据的处理速度

复杂数据结构处理&#xff1a;Join 小技巧&#xff1a;提升数据的处理速度 本文是在原本sql闯关的基础上总结得来&#xff0c;加入了自己的理解以及疑问解答&#xff08;by GPT4&#xff09; 原活动链接 用到的数据&#xff1a;链接 提取码&#xff1a;l03e 目录 1. 课前小问…

stable-diffusion-webui+sadTalker开启GFPGAN as Face enhancer

接上一篇&#xff1a;在autodl搭建stable-diffusion-webuisadTalker-CSDN博客 要开启sadTalker gfpgan as face enhancer&#xff0c; 需要将 1. stable-diffusion-webui/extensions/SadTalker/gfpgan/weights 目录下的文件拷贝到 :~/autodl-tmp/models/GFPGAN/目录下 2.将G…

Spring Boot Profiles简单介绍

Spring Boot application.properties和application.yml文件的配置 阅读本文之前&#xff0c;请先阅读上面的配置文件介绍。 Spring Boot Profiles是一个用于区分不同环境下配置的强大功能。以下是如何在Spring Boot应用程序中使用Profiles的详细步骤和代码示例。 1. 创…

【openGL教程08】基于C++的着色器(02)

LearnOpenGL - Shaders 一、说明 着色器是openGL渲染的重要内容&#xff0c;客户如果想自我实现渲染灵活性&#xff0c;可以用着色器进行编程&#xff0c;这种程序小脚本被传送到GPU的显卡内部&#xff0c;起到动态灵活的着色作用。 二、着色器简述 正如“Hello Triangle”一章…

鸿蒙开发-UI-图形-绘制几何图形

鸿蒙开发-UI-组件 鸿蒙开发-UI-组件2 鸿蒙开发-UI-组件3 鸿蒙开发-UI-气泡/菜单 鸿蒙开发-UI-页面路由 鸿蒙开发-UI-组件导航-Navigation 鸿蒙开发-UI-组件导航-Tabs 鸿蒙开发-UI-图形-图片 文章目录 前言 一、绘制组件 二、形状视口 三、自定义样式 四、使用场景 总结 前…

贪心算法学习

贪心算法&#xff08;Greedy Algorithm&#xff09;是一种在每一步选择中都采取在当前状态下最好或最优&#xff08;即最有利&#xff09;的选择&#xff0c;从而希望导致结果是全局最好或最优的算法。贪心算法在有最优子结构的问题中尤为有效。然而&#xff0c;要注意的是贪心…

python语言常见面试题:描述Python中的文件对象及其方法。

在Python中&#xff0c;文件对象是通过open()函数创建的&#xff0c;它用于读写文件。文件对象提供了一系列方法来操作文件&#xff0c;如读取、写入、关闭等。 文件对象的方法 open(filename, mode): 打开一个文件并返回一个文件对象。filename是文件名&#xff0c;mode是打开…