FlinkSQL窗口实例分析

Windowing TVFs

Windowing table-valued functions (Windowing TVFs),即窗口表值函数
注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区,即存在:group by window_start,window_end

  • TUMBLE函数采用三个必需参数,一个可选参数:

    TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

    data:是一个表参数,可以是与时间属性列的任何关系。
    timecol:是一个列描述符,指示数据的哪些时间属性列应映射到滚动窗口。
    size:是指定翻滚窗口宽度的持续时间。
    offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

  • HOP采用 4 个必需参数和 1 个可选参数:

    HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

    data:是一个表参数,可以是与时间属性列的任何关系。
    timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口。
    slide:是指定连续跳跃窗口开始之间的持续时间的持续时间
    size:是指定跳跃窗口宽度的持续时间。
    offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

  • CUMULATE采用 4 个必需参数和 1 个可选参数:

    CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

    data:是一个表参数,可以是与时间属性列的任何关系。
    timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
    step:是指定连续累积窗口末尾之间增加的窗口大小的持续时间。
    size:是指定累积窗口最大宽度的持续时间。size必须是 的整数倍step。
    offset: 是一个可选参数,用于指定窗口开始移动的偏移量。

滚动窗口

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

滑动窗口

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(HOP(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '60' SECOND,INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

累计窗口

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '1' HOUR,INTERVAL '24' HOURS)) --从零点开始累计
TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '60' SECOND,INTERVAL '10' MINUTES))
TABLE(CUMULATE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '1' MINUTE,INTERVAL '1' HOURS))
group by window_start,window_end,window_time,group_name

窗口聚合-多维分析

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区--实例1:整体聚合
select window_start,window_end,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end--实例2:根据字段聚合,n个维度
select window_start,window_end,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,group_name--实例3:多维分析GROUPING SETS
select window_start,window_end,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,GROUPING SETS((group_name)) --等同于 实例2
group by window_start,window_end,GROUPING SETS((group_name), ()) --等同于 实例1 union all 实例2--实例4:多维分析GROUPING SETS,多个字段
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,GROUPING SETS((group_name,batch_number),(group_name),(batch_number),())--实例5:多维分析CUBE 2^n个维度
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,CUBE(group_name) --等同于group by window_start,window_end,GROUPING SETS((group_name), ())
group by window_start,window_end,CUBE(group_name,batch_number) --等同于实例4--实例6:多维分析ROLLUP  n+1个维度
select window_start,window_end,group_name,batch_number,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,ROLLUP(group_name) --等同于 实例1 union all 实例2
group by window_start,window_end,ROLLUP(group_name,batch_number) --等同于GROUPING SETS((group_name,batch_number),(group_name),())

窗口topN

Window Top-N 语句的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]
CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);create view tmp as
selectCOALESCE(cur['group_name'], src['group_name']) group_name,COALESCE(cur['batch_number'], src['batch_number']) batch_number,event_time
from kafka_table
where UPPER(opt) <> 'DELETE';
--注意:窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区--方式1:窗口 Top-N 紧随窗口聚合之后
create view tmp_window as
select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '24' HOURS))
group by window_start,window_end,window_time,group_name;--计算每个翻滚 24小时窗口内pv最高的前 3 名机构(即每天PV最高的前三名)
select * from(select * ,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY cnt DESC) as rnfrom tmp_window) t
where rn <=3--计算每个机构pv最高的前 3天
select * from(select * ,ROW_NUMBER() OVER (PARTITION BY group_name ORDER BY cnt DESC) as rnfrom tmp_window) t
where rn <=3--方式2:窗口 Top-N 紧随窗口 TVF 之后
select *
from(selectwindow_start,window_end,window_time,group_name,ts,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY ts DESC) AS rnfrom TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '24' HOURS)))
where rn <=3

窗口去重

Flink使用去重的方式,就像Window Top-N查询ROW_NUMBER()的方式一样。理论上,
窗口重复数据删除是窗口 Top-N 的一种特殊情况,其中 N 为 1,并且按处理时间或事件时间排序
Window Deduplication 语句的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]
CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't0','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset'format' = 'json'
);select *
from(selectwindow_start,window_end,group_name,event_time,ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY event_time DESC) AS rnfrom TABLE(TUMBLE(TABLE kafka_table, DESCRIPTOR(event_time), INTERVAL '24' HOURS)))
where rn =1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/581745.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

适合当代年轻人做的副业分享,可以长期发展

年轻人精力旺盛&#xff0c;学习能力强&#xff0c;有创新&#xff0c;且处于不断上升阶段&#xff0c;所以副业要选择能成长&#xff0c;长期可靠的&#xff0c;既可以赚钱&#xff0c;又可以提升自己&#xff0c;一举两得&#xff01; 而在这其中&#xff0c;有一些副业活动不…

编程笔记 GOLANG基础 001 为什么要学习Go语言

编程笔记 GOLANG基础 001 为什么要学Go语言 一、推荐学习的计算机程序设计语言&#xff08;一&#xff09;、前端设计与编程&#xff1a;htmlcssjavascripttypescript&#xff08;二&#xff09;、C/C语言&#xff08;三&#xff09;、Go语言&#xff08;四&#xff09;、Pytho…

python3处理docx并flask显示

前言&#xff1a; 最近有需求处理docx文件&#xff0c;并讲内容显示到页面&#xff0c;对world进行在线的阅读&#xff0c;这样我这里就使用flaskDocument对docx文件进行处理并显示&#xff0c;下面直接上代码&#xff1a; Document处理&#xff1a; 首先下载Document的库文…

kubeadm 快速搭建

二进制搭建适合大集群&#xff0c;50台以上的主机 kubeadm更适合中下企业的业务集群。 master docker kubelet bubeadm kubectl flannel node1 docker kubelet bubeadm kubectl flannel node2 docker kubelet bubeadm kubectl flannel harbor节点&#xff1a;docker docker…

排列组合算法(升级版)

前言 在上一期博客中我们分享了一般的排列组合算法&#xff08;没看的话点这里哦~&#xff09;&#xff0c;但是缺点很明显&#xff0c;没法进行取模运算&#xff0c;而且计算的范围十分有限&#xff0c;而今天分享的排列组合升级版算法能够轻松解决这些问题&#xff0c;话不多…

【汇编笔记】初识汇编-内存读写

汇编语言的由来&#xff1a; CPU是计算机的核心&#xff0c;由于计算机只认识二进制&#xff0c;所以CPU执行的指令是二进制。 我们要想让CPU工作&#xff0c;就得给他提供它认识的指令&#xff0c;这一系列的指令的集合&#xff0c;称之为指令集。 指令集&#xff1a; 不同的体…

2023/12/3 今日得先看的重磅AI新闻

&#x1f4f1; 传 iPhone 设计主管加盟苹果前首席设计师公司&#xff0c;与 OpenAI 合作开发 AI 设备 &#x1f697; 雷军宣布&#xff1a;小米澎湃 OS 启动新标识&#xff0c;「人车家全生态」正式闭环 &#x1f527; OpenAI 竞争对手 Anthropic 预计明年年化营收将达到 8.5…

教育行业:真正有头脑的人,都在用这个巡课技术!

随着教育技术的迅猛发展&#xff0c;学校管理面临着日益复杂的挑战。在线巡课系统作为一种强大的工具&#xff0c;为学校管理者提供了更高效、精准的管理手段。 客户案例 中学巡课项目 河南某中学引入了泛地缘科技推出的在线巡课系统&#xff0c;实现了对教学过程的全面监管。…

[大厂实践] DoorDash基于eBPF的监控实践

eBPF是监控云原生应用的强大工具&#xff0c;本文介绍了DoorDash构建基于eBPF的监控系统的实践。原文: BPFAgent: eBPF for Monitoring at DoorDash 随着DoorDash在过去几年中经历了快速增长&#xff0c;我们开始看到传统监控方法的局限性。度量、日志和跟踪提供了服务生态系统…

Kernel:编译:剪裁

Linux内核的利用&#xff0c;有很大一部分是要做剪裁功能&#xff0c;来缩减内核所占的空间&#xff0c;以适应嵌入式环境的种种场景。或者以适应不同的架构。 其中一个主要的剪裁实现是通过编译配置&#xff0c;去除内核里不一样的功能&#xff1b;如RHEL的最终的编译配置文件…

数据结构第0章 初识

名人说&#xff1a;莫听穿林打叶声&#xff0c;何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 本篇笔记整理&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 0、思维导图1、数据结构1&#xff09;数据结构是什么&am…

Flink1.17实战教程(第六篇:容错机制)

系列文章目录 Flink1.17实战教程&#xff08;第一篇&#xff1a;概念、部署、架构&#xff09; Flink1.17实战教程&#xff08;第二篇&#xff1a;DataStream API&#xff09; Flink1.17实战教程&#xff08;第三篇&#xff1a;时间和窗口&#xff09; Flink1.17实战教程&…

【PostgreSQL】从零开始:(三十一)数据类型-复合类型

复合类型 复合类型是一种由其他类型组成的类型。它可以是数组、结构体、联合体或指向这些类型的指针。复合类型允许将多个值组合成单个实体&#xff0c;以便更方便地处理和使用。复合类型在C语言中非常常见&#xff0c;用于表示复杂的数据结构和组织数据的方式。 数组是一种由…

python的二分查找库bisect,可用于简化繁琐的if条件分支

if条件分支的函数 之前实现了一个函数功能&#xff0c;大意是根据不同的时间天数&#xff0c;返回不同的值。 def analyse_value(days_num:int):if days_num 1:value RD1delif days_num > 1 and days_num < 7:value RD7delif days_num > 7 and days_num < 14:…

C++智能指针的简单实现,原理及应用

1. 为什么C引入了智能指针&#xff1f; 在C中&#xff0c;引入智能指针主要是为了解决原始指针在使用过程中可能出现的内存泄漏问题。内存泄漏是程序在申请内存后&#xff0c;无法释放已分配的内存&#xff0c;导致内存被无效占用&#xff0c;严重时可能导致系统运行缓慢甚至崩…

Redis6.0 Client-Side缓存是什么

前言 Redis在其6.0版本中加入了Client-side caching的支持&#xff0c;开启该功能后&#xff0c;Redis可以将指定的key-value缓存在客户端侧&#xff0c;这样当客户端发起请求时&#xff0c;如果客户端侧存在缓存&#xff0c;则无需请求Redis Server端。 Why Client-side Cac…

【每日一题】【12.24】 - 【12.28】

&#x1f525;博客主页&#xff1a; A_SHOWY&#x1f3a5;系列专栏&#xff1a;力扣刷题总结录 数据结构 云计算 数字图像处理 力扣每日一题_ 本周总结&#xff1a;本周的每日一题比较针对于数学问题的一个应用&#xff0c;如二元一次方程组的求解或者数组求和&#xff0c;同…

Baumer工业相机堡盟工业相机如何通过NEOAPI SDK使用UserSet功能保存和载入相机的各类参数(C++)

Baumer工业相机堡盟工业相机如何通过NEOAPI SDK使用UserSet功能保存和载入相机的各类参数&#xff08;C&#xff09; Baumer工业相机Baumer工业相机NEOAPISDK中UserSet的技术背景代码案例分享第一步&#xff1a;保存相机当前参数设置UserSet_Save第二步&#xff1a;载入已经保存…

C++第2关:文件读取和写入

任务描述 题目描述:从文件a.txt中读取三个整数&#xff0c;然后把这三个整数保存到b.txt中&#xff0c;两整数之间一个空格。 相关知识&#xff08;略&#xff09; 编程要求 根据提示&#xff0c;在右侧编辑器Begin-End处补充代码&#xff0c;完成本关要求。 格式如下: 10…

IDEA、VSCode等快速连接Github(Mac版)

问题描述 在本地书写✍️完代码后, 想要git push到Github上面, 出现延迟错误; 导致经常push不上去, 如下图所示; 解决方案 进入电脑终端; 输入下列命令; sudo vim /etc/hosts输入密码; 按下 I 键, 进行编辑操作; 将下列语句复制到空白区, 然后按下esc按键, 然后输入:wq即可…