flink的分组聚合、over聚合、窗口聚合对比

【背景】

flink有几种聚合,使用上是有一些不同,需要加以区分:

分组聚合:group agg

over聚合:over agg

窗口聚合:window agg

省流版:

触发计算时机

结果流类型

状态大小

分组聚合group agg

每当有新行就输出更新的结果

update流

保持中间结果,所以状态可能无限膨胀

over agg

每当有新行就输出更新的结果,类似一个滑动窗口

append流

保持中间结果,所以状态可能无限膨胀

window agg

窗口结束产生一个总的聚合结果

append流

不生成中间结果,自动清除状态

下面是详细对比和具体的例子(主要讨论的是流处理下的情况)。

over聚合:over agg

OVER 聚合通过排序后的范围数据为每行输入计算出聚合值。和 GROUP BY 聚合不同, OVER 聚合不会把结果通过分组减少到一行,它会为每行输入增加一个聚合值,结果是一个append流


 OVER 窗口的语法。

SELECT
  agg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_col
    range_definition),...
FROM ...

over聚合很少用到,所以本地自己做了一个测试:

测试sql如下:
create table test_window_tab(region String,qa_id String,count_qa_id Bigint) COMMENT ''with('properties.bootstrap.servers' ='','json.fail-on-missing-field' = 'false','connector' = 'kafka','format' = 'json','topic' = 'test_window_tab');create table dwm_qa_score(,qa_id String   ,agent_id String,region String,saas_id String,version_timestamp bigint, ts as to_timestamp(from_unixtime(`version_timestamp`, 'yyyy-MM-dd HH:mm:ss')),`event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND) COMMENT ''with('properties.bootstrap.servers' ='','json.fail-on-missing-field' = 'false','connector' = 'kafka','format' = 'json','scan.startup.mode' = 'earliest-offset','topic' = 'dwm_qa_score');insert into test_window_tab(region,qa_id,count_qa_id)select region,qa_id,count(1)  over w as count_qa_idfrom dwm_qa_scorewindow w as(partition by region,qa_idorder by tsrows between 2 preceding and current row)

dwm_qa_score这个topic现有数据

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "123", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH", "version_timestamp": 1709807228

}

数据选择offset=ealiest-offset运行程序得到结果如下

{"region":"TH","qa_id":"123","count_qa_id":1}

{"region":"TH","qa_id":"123","count_qa_id":2}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"123","count_qa_id":3}

{"region":"TH","qa_id":"1234","count_qa_id":1}

这里注意:

  1. 数据都会返回一个聚合
  2. 由于我们rows between 2 preceding and current row所以count_qa_id最多3

如果此时dwm_qa_score这个topic插入新数据

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH"

}

或者

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH","version_timestamp": null

}

或者

{ "qa_id": "1234", "agent_id": "497235295815123",

"region": "TH","version_timestamp": 0

}

发现flink作业输出record多了一条

但是在目标kafka:test_window_tab中没有新增结果

原因是我们插入的新数据中没有version_timestamp这一列为空或为0

如果往dwm_qa_score这个topic插入新数据:

{

"qa_id": "1234",

"region": "TH",

"version_timestamp": 1710145110

}

则可以看到对应目标kafka:test_window_tab中会新增结果数据

{"region":"TH","qa_id":"1234","count_qa_id":2}

如果等一分钟后,再次往dwm_qa_score这个topic插入新数据:

{

"qa_id": "1234",

"region": "TH",

"version_timestamp": 1710145110

}

则在目标kafka:test_window_tab中没有新增结果原因应该数据过期丢弃watermark)

你可以在一个 SELECT 子句中定义多个 OVER 窗口聚合。然而,对于流式查询,由于目前的限制,所有聚合的 OVER 窗口必须是相同的

ORDER BY

OVER 窗口需要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持 OVER 窗口定义在升序(asc) 时间属性 上。其他的排序不支持。

PARTITION BY

OVER 窗口可以定义在一个分区表上。PARTITION BY 子句代表着每行数据只在其所属的数据分区进行聚合。

范围(RANGE)定义

范围(RANGE)定义指定了聚合中包含了多少行数据。范围通过 BETWEEN 子句定义上下边界,其内的所有行都会聚合。Flink 只支持 CURRENT ROW 作为上边界。

有两种方法可以定义范围:ROWS 间隔 和 RANGE 间隔

RANGE 间隔

RANGE 间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。下面的 RANG 间隔定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

ROW 间隔

ROWS 间隔基于计数。它定义了聚合操作包含的精确行数。下面的 ROWS 间隔定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

常见错误

OVER windows' ordering in stream mode must be defined on a time attribute.

 这个报错,是建表的时候需要指定时间语义的字段,WATERMARK 是必须的,而且WATERMARK所用字段必须是order by的时间字段例如下面 order by load_date那么WATERMARK就要load_date生成,即WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE

object SqlOverRows02 {def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)    tEnv.executeSql("""
        |create table projects(
        |id int,
        |name string,
        |score double,
        |load_date timestamp(3),
        |WATERMARK FOR load_date AS load_date - INTERVAL '1' MINUTE
        |)with(
        |'connector' = 'kafka',
        |'topic' = 'test-topic',
        |'properties.bootstrap.servers' = 'server120:9092',
        |'properties.group.id' = 'testGroup',
        |'scan.startup.mode' = 'latest-offset',
        |'format' = 'csv'
        |)
        |""".stripMargin)
    tEnv.executeSql("""
        |select
        | name,
        | max(score)
        |   over(partition by name
        |     order by load_date
        |     RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )max_score,
        | min(score)
        |   over(partition by name
        |     order by load_date
        |     RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )min_score,
        | current_time
        | from
        | projects
        |""".stripMargin).print()}
}

分组聚合:group agg

Apache Flink 支持标准的 GROUP BY 子句来聚合数据。

SELECT COUNT(*) FROM Orders GROUP BY order_id

特点:

1、聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”、“MAX”和 “MIN”。

2、对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止,会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入 Orders 表时,Flink 都会实时计算并输出更新后的结果。

 3、对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX 的状态是重量级的,COUNT 是轻量级的,因为COUNT只需要保存计数值。

因此,可以设置table-exec-state-ttl,但是可能会影响查询结果的正确性,因为状态超时会被丢弃。

注意:

Flink 对于分组聚合提供了一系列性能优化的方法。更多参见:性能优化,包括MiniBatch 聚合、Local-Global 聚合、拆分 distinct 聚合、在 distinct 聚合上使用 FILTER 修饰符 、MiniBatch Regular Joins

窗口聚合:window agg

窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列(必须包含,否则就变成分组聚合等了)。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。

SELECT ...
FROM <windowed_table> -- relation applied windowing TVF
GROUP BY window_start, window_end, ...

窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态(watermark超过窗口end+allowlateness,就会销毁窗口)。

具体例子:

SELECT window_start, window_end, SUM(price) AS

total_price

FROM TABLE(

    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))

GROUP BY window_start, window_end;

+------------------+------------------+-------------+

|     window_start |       window_end | total_price |

+------------------+------------------+-------------+

| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |

| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |

+------------------+------------------+-------------+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/738204.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用OpenCV实现两张图像融合在一起

简单介绍 图像融合技术是一种结合多个不同来源或不同传感器捕获的同一场景的图像数据&#xff0c;以生成一幅更全面、更高质量的单一图像的过程。这种技术广泛应用于遥感、医学影像分析、计算机视觉等多个领域。常见的图像融合技术包括基于像素级、特征级和决策级的融合方法&a…

基与HTML5的塔防游戏设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 项目背景与相关技术 3 1.1 背景与发展简介 3 1.2 HTML5技术及其优势 4 1.3 JavaScript开发的优势与劣势 4 1.4 CSS样式表在开发中的用处 5 1.5 本章小结 6 2 系统分析 7 2.1 需求分析 7 2.2 问题分析 7 2.3 流程设计 7 2.3 功能分析 8 2.…

Linux---多线程(上)

一、线程概念 线程是比进程更加轻量化的一种执行流 / 线程是在进程内部执行的一种执行流线程是CPU调度的基本单位&#xff0c;进程是承担系统资源的基本实体 在说线程之前我们来回顾一下进程的创建过程&#xff0c;如下图 那么以进程为参考&#xff0c;我们该如何去设计创建一个…

paddle的版面分析的环境搭建及使用

一、什么是版面分析 版面分析技术&#xff0c;主要是对图片形式的文档进行版面分析&#xff0c;将文档划分为文字、标题、表格、图片以及列表5类区域&#xff0c;如下图所示&#xff1a; 二、应用场景 2.1 合同比对 2.2 文本类型划分 2.3 通用文档的还原 版面分析技术可将以…

论文阅读FCN-Transformer Feature Fusion for PolypSegmentation

本文提出了一种名为Fully Convolutional Branch-TransFormer (FCBFormer)的图像分割框架。该架构旨在结合Transformer和全卷积网络&#xff08;FCN&#xff09;的优势&#xff0c;以提高结肠镜图像中息肉的检测和分类准确性。 1&#xff0c;框架结构&#xff1a; 模型采用双分…

【Python】牛客网—软件开发-Python专项练习

专栏文章索引&#xff1a;Python 1.&#xff08;单选&#xff09;下面哪个是Python中不可变的数据结构&#xff1f; A.set B.list C.tuple D.dict 可变数据类型&#xff1a;列表list[ ]、字典dict{ }、集合set{ }(能查询&#xff0c;也可更改)数据发生改…

Golang 开发实战day03 - Arrays Slices

Golang 教程03 - Arrays&#xff0c;Slices Go语言中的数组和切片都是用于存储数据的类型&#xff0c;但它们之间存在一些重要的区别。了解这些区别对于有效地使用它们至关重要。 1. Arrays 数组 1.1 定义 数组是一种固定大小的数据结构&#xff0c;用于存储相同类型的值。…

广西省行政村边界shp数据/广西省乡镇边界/广西省土地利用分类数据/径流分布

广西壮族自治区&#xff0c;地处中国南部&#xff0c;北回归线横贯中部。南北以贺州——东兰一线为界&#xff0c;此界以北属中亚热带季风&#xff0c;以南属南亚热带季风。 数据范围&#xff1a;全国行政区划-行政村界 数据类型&#xff1a;面状数据&#xff0c;全国各省市县…

1月笔记本电脑行业分析:多品牌下滑但ThinkPad逆势增长!

2024年1月&#xff0c;笔记本行业市场格局出现较大的变化。长期在京东平台保持头部联想和惠普&#xff0c;被ThinkPad挤下&#xff08;虽然是联想旗下品牌&#xff09;&#xff0c;排名掉至第二和第三。ThinkPad以超2.7亿的月销售额成绩拿下第一&#xff0c;市占比16%。 与去年…

java SSM农产品订购网站系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM农产品订购网站系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采…

算法-贪心-112. 雷达设备

题目 假设海岸是一条无限长的直线&#xff0c;陆地位于海岸的一侧&#xff0c;海洋位于另外一侧。 每个小岛都位于海洋一侧的某个点上。 雷达装置均位于海岸线上&#xff0c;且雷达的监测范围为 d&#xff0c;当小岛与某雷达的距离不超过 d 时&#xff0c;该小岛可以被雷达覆…

大语言模型:Large Language Models Are Human-Level Prompt Engineers概述

研究内容 如何通过prompt&#xff0c;在不进行微调大语言模型的前提下&#xff0c;增加大语言模型的表现 研究动机 prompt非常有用&#xff0c;但是人工设置的非常不自然&#xff1b;因此提出了要自动使用大语言模型自己选择prompt&#xff1b;取得了很好的效果。 作者主要…

python实现生成树

生成树 生成树&#xff08;Spanning Tree&#xff09;是一个连通图的生成树是图的极小连通子图&#xff0c;它包含图中的所有顶点&#xff0c;并且只含尽可能少的边。这意味着对于生成树来说&#xff0c;若砍去它的一条边&#xff0c;则会使生成树变成非连通图&#xff1b;若给…

Git LFS【部署 01】Linux环境安装git-lfs及测试

Linux系统安装git-lfs及测试 1.下载2.安装3.测试4.总结 Git LFS&#xff08;Large File Storage&#xff09;是一个用于Git版本控制系统的扩展&#xff0c;它专门用来管理大型文件&#xff0c;如图像、音频和视频文件。 1.下载 安装包下载页面&#xff1a;https://github.com/…

web3D三维引擎(Direct3D、OpenGL、UE、U3D、threejs)基础扫盲

Hi&#xff0c;我是贝格前端工场的老司机&#xff0c;本文介绍文web3D的几个引擎&#xff0c;做个基础扫盲&#xff0c;如果还不能解决问题&#xff0c;可以私信我&#xff0c;搞私人订制呦。 三维引擎是指用于创建和渲染三维图形的软件框架。它们通常提供了图形处理、物理模拟…

AIGC: 2 语音转换新纪元-Whisper技术在全球客服领域的创新运用

背景 现实世界&#xff0c;人跟人的沟通相当一部分是语音沟通&#xff0c;比如打电话&#xff0c;聊天中发送语音消息。 而在程序的世界&#xff0c;大部分以处理字符串为主。 所以&#xff0c;把语音转换成文字就成为了编程世界非常普遍的需求。 Whisper 是由 OpenAI 开发…

PostgreSQL索引篇 | GIN索引 (倒排索引)

GIN索引 倒排索引 PostgreSQL版本为8.4.1 &#xff08;本文为《PostgreSQL数据库内核分析》一书的总结笔记&#xff0c;需要电子版的可私信我&#xff09; 索引篇&#xff1a; PostgreSQL索引篇 | BTreePostgreSQL索引篇 | GiST索引PostgreSQL索引篇 | Hash索引PostgreSQL索引…

汽车软件市场迅猛扩张,Perforce Helix Core与Helix IPLM助力汽车软件开发的版本控制及IP生命周期管理

汽车软件世界正处于持续变革和转型之中。从自动驾驶汽车到电动汽车和先进的驾驶辅助系统&#xff0c;汽车软件的集成度和复杂性不断提升。 据美国电气与电子工程师协会的研究&#xff0c;如今大多数汽车都集成了超过1亿行代码&#xff0c;而仅仅十年前&#xff0c;这种水平的汽…

软件杯 垃圾邮件(短信)分类算法实现 机器学习 深度学习

文章目录 0 前言2 垃圾短信/邮件 分类算法 原理2.1 常用的分类器 - 贝叶斯分类器 3 数据集介绍4 数据预处理5 特征提取6 训练分类器7 综合测试结果8 其他模型方法9 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 垃圾邮件(短信)分类算…

ubuntu 18.04安装教程(详细有效)

文章目录 一、下载ubuntu 18.04镜像二、安装ubuntu1. 点击下载好的Vmware Workstation&#xff0c;点击新建虚拟机&#xff0c;选择 “自定义(高级)”&#xff0c;之后下一步。2. 默认配置&#xff0c;不需要更改&#xff0c;点击下一步。3. 选择 “安装程序光盘映像文件(iso)(…