【大数据】Flink SQL 语法篇(九):Window TopN、Deduplication

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(九):Window TopN、Deduplication

  • 1.Window TopN
  • 2.Deduplication
    • 2.1 案例 1(事件时间)
    • 2.2 案例 2(处理时间)

1.Window TopN

Window TopN 定义(支持 Streaming):Window TopN 是一种特殊的 TopN,它的返回结果是每一个窗口内的 N 个最小值或者最大值。

应用场景:小伙伴萌会问了,我有了 TopN 为啥还需要 Window TopN 呢?还记得上一篇博客介绍 TopN 说道的 TopN 时会出现中间结果,从而出现回撤数据的嘛?Window TopN 不会出现回撤数据,因为 Window TopN 实现是在窗口结束时输出最终结果,不会产生中间结果。而且注意,因为是窗口上面的操作,Window TopN 在窗口结束时,会自动把 State 给清除。

SQL 语法标准:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- windowing TVF
WHERE rownum <= N [AND conditions]

实际案例:取当前这一分钟的搜索关键词下的搜索热度前 10 名的词条数据。

-- 输入表字段:
-- 字段名         备注
-- key              搜索关键词
-- name             搜索热度名称
-- search_cnt       热搜消费热度(比如 3000)
-- timestamp        消费词条时间戳CREATE TABLE source_table (name BIGINT NOT NULL,search_cnt BIGINT NOT NULL,key BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH (...
);-- 输出表字段:
-- 字段名         备注
-- key              搜索关键词
-- name             搜索热度名称
-- search_cnt       热搜消费热度(比如 3000)
-- window_start     窗口开始时间戳
-- window_end       窗口结束时间戳CREATE TABLE sink_table (key BIGINT,name BIGINT,search_cnt BIGINT,window_start TIMESTAMP(3),window_end TIMESTAMP(3)
) WITH (...
);-- 处理 sql:INSERT INTO sink_table
SELECT key, name, search_cnt, window_start, window_end
FROM (SELECT key, name, search_cnt, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, keyORDER BY search_cnt desc) AS rownumFROM (SELECT window_start, window_end, key, name, max(search_cnt) as search_cnt-- window tvf 写法FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES))GROUP BY window_start, window_end, key, name)
)
WHERE rownum <= 100

输出结果:

+I[关键词1, 词条1, 8670, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条2, 6928, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条3, 1735, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条4, 7287, 2021-1-28T22:34, 2021-1-28T22:35]
...

SQL 语义:

  • 数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,将数据按照窗口聚合的 Key 通过 Hash 分发策略发送到下游窗口聚合算子。
  • 窗口聚合算子:进行窗口聚合计算,随着时间的推进,将窗口聚合结果计算完成发往下游窗口排序算子。
  • 窗口排序算子:这个算子其实也是一个窗口算子,只不过这个窗口算子为每个 Key 维护了一个 TopN 的榜单数据,接受到上游发送的窗口结果数据进行排序,随着时间的推进,窗口的结束,将排序的结果输出到下游数据汇算子。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

2.Deduplication

Deduplication 定义(支持 Batch / Streaming):Deduplication 其实就是去重,也即上文介绍到的 TopN 中 row_number = 1 的场景,但是这里有一点不一样在于其 排序字段 一定是 时间属性列,不能是其他非时间属性的普通列。在 row_number = 1 时,如果排序字段是普通列 Planner 会翻译成 TopN 算子,如果是时间属性列 Planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。

应用场景:比如上游数据发重了,或者计算 DAU 明细数据等场景,都可以使用 Deduplication 语法去做去重。

SQL 语法标准:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name)
WHERE rownum = 1
  • ROW_NUMBER():标识当前数据的排序值。
  • PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序。
  • ORDER BY time_attr [asc|desc]:标识排序规则,必须为时间戳列,当前 Flink SQL 支持处理时间、事件时间,ASC 代表保留第一行,DESC 代表保留最后一行。
  • WHERE rownum = 1:这个子句是一定需要的,而且必须为 rownum = 1

2.1 案例 1(事件时间)

某一游戏用户等级的场景,每一个用户都有一个用户等级,需要求出当前用户等级在 星星⭐,月亮🌙,太阳🌞 的用户数分别有多少。

-- 数据源:当每一个用户的等级初始化及后续变化的时候的数据,即用户等级变化明细数据。
CREATE TABLE source_table (user_id BIGINT COMMENT '用户 id',level STRING COMMENT '用户等级',row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件时间戳',WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.level.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '1000000'
);-- 数据汇:输出即每一个等级的用户数
CREATE TABLE sink_table (level STRING COMMENT '等级',uv BIGINT COMMENT '当前等级用户数',row_time timestamp(3) COMMENT '时间戳'
) WITH ('connector' = 'print'
);-- 处理逻辑:
INSERT INTO sink_table
select level, count(1) as uv, max(row_time) as row_time
from (SELECTuser_id,level,row_time,row_number() over(partition by user_id order by row_time) as rnFROM source_table
)
where rn = 1
group by level

输出结果:

+I[等级 1, 6928, 2021-1-28T22:34]
-I[等级 1, 6928, 2021-1-28T22:34]
+I[等级 1, 8670, 2021-1-28T22:34]
-I[等级 1, 8670, 2021-1-28T22:34]
+I[等级 1, 77287, 2021-1-28T22:34]
...

可以看到其有回撤数据。

其对应的 SQL 语义如下:

  • 数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 Key 通过 Hash 分发策略发送到下游去重算子。
  • Deduplication 去重算子:接受到上游数据之后,根据 order by 中的条件判断当前的这条数据和之前数据时间戳大小,以上面案例来说,如果当前数据时间戳大于之前数据时间戳,则撤回之前向下游发的中间结果,然后将最新的结果发向下游(发送策略也为 Hash,具体的 Hash 策略为按照 group by 中 Key 进行发送),如果当前数据时间戳小于之前数据时间戳,则不做操作。此算子产出的结果就是每一个用户的对应的最新等级信息。
  • Group by 聚合算子:接受到上游数据之后,根据 Group by 聚合粒度对数据进行聚合计算结果(每一个等级的用户数),发往下游数据汇算子。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

2.2 案例 2(处理时间)

最原始的日志是明细数据,需要我们根据用户 id 筛选出这个用户当天的第一条数据,发往下游,下游可以据此计算分各种维度的 DAU。

-- 数据源:原始日志明细数据
CREATE TABLE source_table (user_id BIGINT COMMENT '用户 id',name STRING COMMENT '用户姓名',server_timestamp BIGINT COMMENT '用户访问时间戳',proctime AS PROCTIME()
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.name.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '10','fields.server_timestamp.min' = '1','fields.server_timestamp.max' = '100000'
);-- 数据汇:根据 user_id 去重的第一条数据
CREATE TABLE sink_table (user_id BIGINT,name STRING,server_timestamp BIGINT
) WITH ('connector' = 'print'
);-- 处理逻辑:
INSERT INTO sink_table
select user_id,name,server_timestamp
from (SELECTuser_id,name,server_timestamp,row_number() over(partition by user_id order by proctime) as rnFROM source_table
)
where rn = 1

输出结果:

+I[1, 用户 1, 2021-1-28T22:34]
+I[2, 用户 2, 2021-1-28T22:34]
+I[3, 用户 3, 2021-1-28T22:34]
...

可以看到这个处理逻辑是没有回撤数据的。其对应的 SQL 语义如下:

  • 数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 Key 通过 Hash 分发策略发送到下游去重算子。
  • Deduplication 去重算子:处理时间语义下,如果是当前 Key 的第一条数据,则直接发往下游,如果判断(根据 State 中是否存储过该 Key)不是第一条,则直接丢弃。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

⭐ 在 Deduplication 关于是否会出现回撤流,博主总结如下:

  • Order by 事件时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还大的数据。
  • Order by 事件时间 ASC:会出现回撤流,因为当前 Key 下 可能会有 比当前事件时间还小的数据。
  • Order by 处理时间 DESC:会出现回撤流,因为当前 Key 下 可能会有 比当前处理时间还大的数据。
  • Order by 处理时间 ASC:不会出现回撤流,因为当前 Key 下 不可能会有 比当前处理时间还小的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/716308.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

COM - get VARIANT value - .vt = (VT_BSTR | VT_ARRAY)

文章目录 COM - get VARIANT value - .vt (VT_BSTR | VT_ARRAY)概述笔记END COM - get VARIANT value - .vt (VT_BSTR | VT_ARRAY) 概述 取到一个VARIANT值, .vt 0x2008, 查了一下, 0x2008 (VT_BSTR | VT_ARRAY) 查了资料, 这个vt 0x2008是BSTR的数组. 看看咋取值? 网上…

3.2 log |416. 分割等和子集,1049.最后一块石头的重量II,494.目标和

416. 分割等和子集 class Solution { public:bool canPartition(vector<int>& nums) {vector<int> dp(10001,0);int sumaccumulate(nums.begin(),nums.end(),0);if(sum%2) return false;int targetsum/2;for(int i0;i<nums.size();i){for(int jtarget;j>…

项目管理:高效推动项目成功的关键

项目管理&#xff1a;高效推动项目成功的关键 在当今竞争激烈的商业环境中&#xff0c;项目管理已成为企业实现目标和取得成功的关键因素。有效的项目管理不仅能够确保项目按时完成&#xff0c;还能在预算范围内达到预期的质量标准。本文将探讨项目管理的重要性、关键环节以及…

Maven安装并配置本地仓库

一、安装Maven 1.下载链接 Maven官网下载链接 Binary是可执行版本&#xff0c;已经编译好可以直接使用。 Source是源代码版本&#xff0c;需要自己编译成可执行软件才可使用。 tar.gz和zip两种压缩格式,其实这两个压缩文件里面包含的内容是同样的,只是压缩格式不同 tar.gz格…

Stable Video文本生成视频公测地址——Scaling Latent Video Diffusion Models to Large Datasets

近期&#xff0c;Stability AI发布了首个开放视频模型——"Stable Video"&#xff0c;该创新工具能够将文本和图像输入转化为生动的场景&#xff0c;将概念转换成动态影像&#xff0c;生成出电影级别的作品&#xff0c;旨在满足广泛的视频应用需求&#xff0c;包括媒…

STM32 DMA入门指导

什么是DMA DMA&#xff0c;全称直接存储器访问&#xff08;Direct Memory Access&#xff09;&#xff0c;是一种允许硬件子系统直接读写系统内存的技术&#xff0c;无需中央处理单元&#xff08;CPU&#xff09;的介入。下面是DMA的工作原理概述&#xff1a; 数据传输触发&am…

解决Java并发问题的常见思路

写在文章开头 近期对一些比较老的项目进行代码走查&#xff0c;碰到一些极端的并发编程恶习&#xff0c;所以笔者就基于此文演示这类问题以及面对并发编程时我们应该需要了解一些常见套路。 Hi&#xff0c;我是sharkChili&#xff0c;是个不断在硬核技术上作死的java coder&am…

基于 Amazon EKS 的 Stable Diffusion ComfyUI 部署方案

01 背景介绍 Stable Diffusion 作为当下最流行的开源 AI 图像生成模型在游戏行业有着广泛的应用实践&#xff0c;无论是 ToC 面向玩家的游戏社区场景&#xff0c;还是 ToB 面向游戏工作室的美术制作场景&#xff0c;都可以发挥很大的价值&#xff0c;如何更好地使用 Stable Dif…

scanf和cin的利弊

scanf和cin的利弊&#xff1a; scanf: 利&#xff1a;耗时短&#xff0c;写法方便输入固定格式&#xff0c;比如scanf(“%*d%d”,&a)&#xff0c;可以直接忽略第一个输入&#xff0c;不用创建新对象&#xff0c;再比如scanf(“%1d”,&x[i])&#xff0c;输入3214&#x…

卡牌——二分

卡牌 题目分析 想一下前面题的特点&#xff0c;是不是都出现了“最大边长”&#xff0c;“最小的数”这种字眼&#xff0c;那么这里出现了“最多能凑出多少套牌”&#xff0c;我们可以考虑用二分。接下来我们要看一下他是否符合二段性&#xff0c;二分的关键在于二段性。 第…

续Java的执行语句、方法--学习JavaEE的day07

day07 一、特殊的流程控制语句 break(day06) continue 1.理解&#xff1a; 作用于循环中&#xff0c;表示跳过循环体剩余的部分&#xff0c;进入到下一次循环 做实验&#xff1a; while(true){ System.out.println(“111”); System.out.println(“222”); if(true){ conti…

编译链接实战(25)gcc ASAN、MSAN检测内存越界、泄露、使用未初始化内存等内存相关错误

文章目录 1 ASAN1.1 介绍1.2 原理编译时插桩模块运行时库2 检测示例2.1 内存越界2.2 内存泄露内存泄露检测原理作用域外访问2.3 使用已经释放的内存2.4 将漏洞信息输出文件3 MSAN1 ASAN 1.1 介绍 -fsanitize=address是一个编译器选项,用于启用AddressSanitizer(地址

基于SpringBoot的教师考勤管理系统(赠源码)

作者主页&#xff1a;易学蔚来-技术互助文末获取源码 简介&#xff1a;Java领域优质创作者 Java项目、简历模板、学习资料、面试题库 教师考勤管理系统是基于JavaVueSpringBootMySQL实现的&#xff0c;包含了管理员、学生、教师三类用户。该系统实现了班级管理、课程安排、考勤…

基于springboot的足球俱乐部管理系统的设计与实现

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一 、设计说明 1.1 课题…

2024.3.3每日一题

LeetCode 用队列实现栈 题目链接&#xff1a;225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09; 题目描述 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0c;并支持普通栈的全部四种操作&#xff08;push、top、pop 和 empty&…

如何取消ChatGPT 4.0的自动续费和会员订阅(chatgpt4.0自動續費嗎)

如何取消ChatGPT 4.0的自动续费和会员订阅 ChatGPT 4.0自动续费是否存在 是的&#xff0c;ChatGPT 4.0 Plus会员服务存在自动续费功能。 ChatGPT 4.0 Plus会员服务自动续费 ChatGPT Plus会员服务的自动续费机制用户在购买ChatGPT 4.0 Plus会员服务后&#xff0c;系统会自动…

npm ERR! code ERESOLVE

1、问题概述&#xff1f; 执行npm install命令的时候报错如下&#xff1a; tangxiaochuntangxiaochundeMacBook-Pro stf % npm install npm ERR! code ERESOLVE npm ERR! ERESOLVE unable to resolve dependency tree npm ERR! npm ERR! While resol…

LeetCode102.二叉树的层序遍历

题目 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[[3],[9,20],[15,7]]输入&#xff1a;root [1] 输出&am…

SpringCloud-MQ消息队列

一、消息队列介绍 MQ (MessageQueue) &#xff0c;中文是消息队列&#xff0c;字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。消息队列是一种基于生产者-消费者模型的通信方式&#xff0c;通过在消息队列中存放和传递消息&#xff0c;实现了不同组件、服务或系统…

2024全新手机软件下载应用排行、平台和最新发布网站,采用响应式织梦模板

这是一款简洁蓝色的手机软件下载应用排行、平台和最新发布网站&#xff0c;采用响应式织梦模板。 主要包括主页、APP列表页、APP详情介绍页、新闻资讯列表、新闻详情页、关于我们等模块页面。 地 址 &#xff1a; runruncode.com/php/19703.html 软件程序演示图&#xff1a;…