Flink系列之:窗口Top-N

Flink系列之:窗口Top-N

  • 一、窗口Top-N
  • 二、示例:在窗口聚合后进行窗口 Top-N
  • 三、在窗口表值函数后进行窗口 Top-N
  • 四、限制

一、窗口Top-N

  • 适用于流、批一体
  • 窗口 Top-N 是特殊的 Top-N,它返回每个分区键的每个窗口的N个最小或最大值。
  • 与普通Top-N不同,窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。窗口 Top-N 会在窗口结束后清除不需要的中间状态。 因此,窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好。通常,窗口 Top-N 直接用于 窗口表值函数上。 另外,窗口 Top-N 可以用于基于 窗口表值函数 的操作之上,比如 窗口聚合,窗口 Top-N 和 窗口关联。
  • 窗口 Top-N 的语法和普通的 Top-N 相同。除此之外,窗口 Top-N 需要 PARTITION BY 子句包含 窗口表值函数 或 窗口聚合 产生的 window_start 和 window_end。 否则优化器无法翻译。

下面展示了窗口 Top-N 的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]

二、示例:在窗口聚合后进行窗口 Top-N

下面的示例展示了在10分钟的滚动窗口上计算销售额位列前三的供应商。

-- tables must have time attribute, e.g. `bidtime` in this table
Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
| supplier_id |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+-------------+
|          bidtime | price | item | supplier_id |
+------------------+-------+------+-------------+
| 2020-04-15 08:05 |  4.00 |    A |   supplier1 |
| 2020-04-15 08:06 |  4.00 |    C |   supplier2 |
| 2020-04-15 08:07 |  2.00 |    G |   supplier1 |
| 2020-04-15 08:08 |  2.00 |    B |   supplier3 |
| 2020-04-15 08:09 |  5.00 |    D |   supplier4 |
| 2020-04-15 08:11 |  2.00 |    B |   supplier3 |
| 2020-04-15 08:13 |  1.00 |    E |   supplier1 |
| 2020-04-15 08:15 |  3.00 |    H |   supplier2 |
| 2020-04-15 08:17 |  6.00 |    F |   supplier5 |
+------------------+-------+------+-------------+Flink SQL> SELECT *FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownumFROM (SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cntFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, supplier_id)) WHERE rownum <= 3;
+------------------+------------------+-------------+-------+-----+--------+
|     window_start |       window_end | supplier_id | price | cnt | rownum |
+------------------+------------------+-------------+-------+-----+--------+
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 |   2 |      1 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier4 |  5.00 |   1 |      2 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  4.00 |   1 |      3 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier5 |  6.00 |   1 |      1 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  3.00 |   1 |      2 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier3 |  2.00 |   1 |      3 |
+------------------+------------------+-------------+-------+-----+--------+

注意: 为了更好地理解窗口行为,这里把 timestamp 值后面的0去掉了。例如:在 Flink SQL Client 中,如果类型是 TIMESTAMP(3) ,2020-04-15 08:05 应该显示成 2020-04-15 08:05:00.000 。

这条Flink SQL查询的目标是在表Bid中根据时间窗口对数据进行分组,并找出每个窗口内价格最高的三个供应商。

  • 首先,在FROM子句中,使用TUMBLE函数对Bid表进行分区,每个分区的时间窗口大小为10分钟,并指定bidtime作为分区依据。然后,将其结果作为内部查询的输入表。
  • 在内部查询中,使用GROUP BY子句将数据按窗口的开始时间(window_start)、结束时间(window_end)和供应商ID(supplier_id)进行分组。并计算每个分组的价格总和(SUM(price))和行数(COUNT(*))。同时,使用ROW_NUMBER()函数在每个窗口分组内按价格降序排列,并为每行分配一个行号(rownum)。
  • 最后,在外部查询中,筛选出行号(rownum)小于等于3的记录,并返回窗口的开始时间、结束时间、供应商ID、价格总和、行数和行号。
  • 最终的查询结果将包括每个窗口内价格最高的三个供应商的信息。

三、在窗口表值函数后进行窗口 Top-N

下面的示例展示了在10分钟的滚动窗口上计算价格位列前三的数据。

Flink SQL> SELECT *FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownumFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))) WHERE rownum <= 3;
+------------------+-------+------+-------------+------------------+------------------+--------+
|          bidtime | price | item | supplier_id |     window_start |       window_end | rownum |
+------------------+-------+------+-------------+------------------+------------------+--------+
| 2020-04-15 08:05 |  4.00 |    A |   supplier1 | 2020-04-15 08:00 | 2020-04-15 08:10 |      2 |
| 2020-04-15 08:06 |  4.00 |    C |   supplier2 | 2020-04-15 08:00 | 2020-04-15 08:10 |      3 |
| 2020-04-15 08:09 |  5.00 |    D |   supplier4 | 2020-04-15 08:00 | 2020-04-15 08:10 |      1 |
| 2020-04-15 08:11 |  2.00 |    B |   supplier3 | 2020-04-15 08:10 | 2020-04-15 08:20 |      3 |
| 2020-04-15 08:15 |  3.00 |    H |   supplier2 | 2020-04-15 08:10 | 2020-04-15 08:20 |      2 |
| 2020-04-15 08:17 |  6.00 |    F |   supplier5 | 2020-04-15 08:10 | 2020-04-15 08:20 |      1 |
+------------------+-------+------+-------------+------------------+------------------+--------+

注意: 为了更好地理解窗口行为,这里把 timestamp 值后面的0去掉了。例如:在 Flink SQL Client 中,如果类型是 TIMESTAMP(3) ,2020-04-15 08:05 应该显示成 2020-04-15 08:05:00.000 。

这个Flink SQL语句的目标是从表Bid中选择特定的列,并为每个时间窗口内的数据分配一个行号(rownum)。行号的分配是基于每个窗口内数据的价格进行降序排列。

  • 首先,在内部查询中,使用TUMBLE函数将Bid表按照bidtime进行分区,每个分区代表一个时间窗口,窗口大小为10分钟。然后将分区结果作为输入表。
  • 接下来,在内部查询中,使用ROW_NUMBER()函数为每个窗口分组内的数据分配行号。PARTITION BY子句指定按窗口开始时间(window_start)和结束时间(window_end)进行分组,ORDER BY子句指定按价格(price)降序排列。这样,每个时间窗口内的数据就会被分配一个行号。
  • 最后,在外部查询中,筛选出行号(rownum)小于等于3的记录,这意味着只保留每个时间窗口内前三个价格最高的数据。
  • 最终的查询结果将包括原始表中的所有列,以及每个时间窗口的开始时间、结束时间和行号。这样就可以查看每个窗口内价格最高的前三个数据,同时保留其他列的信息。

四、限制

  • 目前,Flink只支持在滚动,滑动和累计 窗口表值函数后进行窗口 Top-N。基于会话窗口的Top-N将在将来版本中支持。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/228275.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

时序分解 | Matlab实现DBO-VMD基于蜣螂优化算法优化VMD变分模态分解时间序列信号分解

时序分解 | Matlab实现DBO-VMD基于蜣螂优化算法优化VMD变分模态分解时间序列信号分解 目录 时序分解 | Matlab实现DBO-VMD基于蜣螂优化算法优化VMD变分模态分解时间序列信号分解效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.利用蜣螂优化算法优化VMD中的参数k、a&…

12、Kafka中位移提交那些事儿

Kafka中位移提交那些事儿 1、自动提交位移2、手动提交位移2.1、同步提交位移2.2、异步提交位移2.3、更精细化的位移管理 Consumer 端有个位移的概念&#xff0c;它和消息在分区中的位移不是一回事儿&#xff0c;虽然它们的英文都是 Offset。今天我们要聊的位移是 Consumer 的消…

千亿露酒市场的未来之“露”

执笔 | 尼 奥 编辑 | 扬 灵 12月15日&#xff0c;以“以美为酿&#xff0c;品致未来”为主题的中国露酒产业发展大会暨露酒价值论坛在“中国酒都”宜宾举办。 近年来&#xff0c;露酒产业发展异军突起&#xff0c;市场销售规模超越黄酒、葡萄酒品类&#xff0c;成为中国酒…

正则表达式IP地址

正则表达式基础语法 正则表达式-字符类 [abc]&#xff1a;代表a或者b&#xff0c;或者c字符中的一个。 [^abc]&#xff1a;代表除a,b,c以外的任何字符。 [a-z]&#xff1a;代表a-z的所有小写字符中的一个。 [A-Z]&#xff1a;代表A-Z的所有大写字符中的一个。 [0-9]&#xff…

人工智能文本分类

在本文中&#xff0c;我们全面探讨了文本分类技术的发展历程、基本原理、关键技术、深度学习的应用&#xff0c;以及从RNN到Transformer的技术演进。文章详细介绍了各种模型的原理和实战应用&#xff0c;旨在提供对文本分类技术深入理解的全面视角。 一、引言 文本分类作为人工…

期末总复习(重点!!!)

一、第6章异常处理 1、什么是异常、什么是异常处理异常是指程序在运行过程中发生的错误事件&#xff0c;影响程序的正常执行。异常并不是一定会发生&#xff0c;默认情况下&#xff0c;程序运行中遇到异常时将会终止&#xff0c;并在控制台打印出异常出现的堆栈信息。异常处理…

在线客服系统定价因素解析:影响价格的关键因素

跨境电子商务公司必不可少的工具就是在线客服系统。企业选择在线客服系统的时候免不了要对不同产品的功能性、价格、服务等因素进行考量。今天这篇文章&#xff0c;我们就来探讨一下在线客服系统的定价因素有哪些&#xff1f;探究市面上的在线客服系统价格各异的影响因素。为大…

Lambda 的表达式作用域(Lambda Scopes)

文章目录 讲一下 Lambda 的表达式作用域&#xff08;Lambda Scopes&#xff09;。访问局部变量访问字段和静态变量访问默认接口方法 讲一下 Lambda 的表达式作用域&#xff08;Lambda Scopes&#xff09;。 访问局部变量 我们可以直接在 lambda 表达式中访问外部的局部变量&a…

c# bitmap压缩导致png不透明的问题解决

新建.net 6控制台项目 安装System.Drawing.Common包 代码如下 using System.Drawing; using System.Drawing.Imaging;namespace PngCompress02 {internal class Program{static void Main(string[] args){CompressPngImage("E:\Desktop\6.png", "E:\Desktop\6…

C++相关闲碎记录(14)

1、数值算法 &#xff08;1&#xff09;运算后产生结果accumulate() #include "algostuff.hpp"using namespace std;int main() {vector<int> coll;INSERT_ELEMENTS(coll, 1, 9);PRINT_ELEMENTS(coll);cout << "sum: " << accumulate(…

DPDK系列之三十九控制管理

一、基础介绍 通过前面的分析&#xff0c;对DPDK中对报文处理的过程有了一个初步的认知。从一个更高层次来看&#xff0c;传统的网络通信一般会通过上层应用、操作系统、网卡驱动和硬件四层。再往下&#xff0c;基本就不属于于计算机控制的系统了。 早期的应用&#xff0c;基本…

Python - coverage

coverage overage 是一个用于测量Python程序代码覆盖率的工具。它监视您的程序&#xff0c;注意代码的哪些部分已经执行&#xff0c;然后分析源代码&#xff0c;以确定哪些代码本可以执行&#xff0c;但没有执行。 覆盖率测量通常用于衡量测试的有效性。它可以显示代码的哪些…

整理了上百个开源中文大语言模型,涵盖模型、应用、数据集、微调、部署、评测

自ChatGPT为代表的大语言模型&#xff08;Large Language Model, LLM&#xff09;出现以后&#xff0c;由于其惊人的类通用人工智能&#xff08;AGI&#xff09;的能力&#xff0c;掀起了新一轮自然语言处理领域的研究和应用的浪潮。 尤其是以ChatGLM、LLaMA等平民玩家都能跑起…

抖音品牌力不足,如何开通抖音旗舰店?强开旗舰店全攻略来了!

随着直播的兴起&#xff0c;抖音电商在近年来的发展速度可谓是相当迅猛。越来越多的商家开始将重心投入到抖音电商。从开店、搭建直播间&#xff0c;起号&#xff0c;再到日常运营... 然而我们在第一步开店的时候&#xff0c;就遇到了不少麻烦。 1、选择开通抖音旗舰店&#x…

初识Flask

摆上中文版官方文档网站&#xff1a;https://flask.github.net.cn/quickstart.html 开启实验之路~~~~~~~~~~~~~ from flask import Flaskapp Flask(__name__) # 使用修饰器告诉flask触发函数的URL&#xff0c;绑定URL&#xff0c;后面的函数用于返回用户在浏览器上看到的内容…

Spring Cloud + Vue前后端分离-第5章 单表管理功能前后端开发

Spring Cloud Vue前后端分离-第5章 单表管理功能前后端开发 完成单表的增删改查 控台单表增删改查的前后端开发&#xff0c;重点学习前后端数据交互&#xff0c;vue ajax库axios的使用等 通用组件开发:分页、确认框、提示框、等待框等 常用的公共组件:确认框、提示框、等待…

系列九、事务

一、事务 1.1、概述 事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系统提交或者撤销操作请求&#xff0c;即&#xff1a;这些操作要么同时成功&#xff0c;要么同时失败。 例如: 张三给李四转账1000块钱&…

使用邮件群发平台,轻松实现高效沟通的4大优势!

新媒体带动着众多线上平台的发展&#xff0c;使得流量为企业带来了可观的营收。但是&#xff0c;随着短视频市场的饱和&#xff0c;想要再次获得初始时的流量就变得越发困难。在这个时候&#xff0c;企业不妨将眼光往邮件群发这个传统的营销方式上倾斜&#xff0c;特别是出海、…

多进程间通信学习之共享内存

共享内存&#xff1a;1、在内核中创建共享内存&#xff1b;2、进程1和进程2都能够访问到&#xff0c;通过这段内存空间进行数据传递&#xff1b;3、共享内存是所有进程间通信方式中&#xff0c;效率最高&#xff0c;不需要在内核中往返进行拷贝&#xff1b;4、共享内存的内存空…

prometheus简介

什么是Prometheus Prometheus 是一个开源的服务监控系统和时序数据库&#xff0c;其提供了通用的数据模型和快捷数据采集、存储和查询接口。Prometheus的特点 多维数据模型&#xff1a;由度量名称和键值对标识的时间序列数据 时序数据&#xff0c;是在一段时间内通过重复测量&…