【大数据】Flink SQL 语法篇(八):集合、Order By、Limit、TopN

Flink SQL 语法篇(八):集合、Order By、Limit、TopN

  • 1.集合操作
  • 2.Order By、Limit 子句
    • 2.1 Order By 子句
    • 2.2 Limit 子句
  • 3.TopN 子句

1.集合操作

集合操作支持 Batch / Streaming 任务。

在这里插入图片描述

  • UNION:将集合合并并且去重。
  • UNION ALL:将集合合并,不做去重。
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
+---+
|  s|
+---+
|  c|
|  a|
|  b|
|  d|
|  e|
+---+Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
+---+
|  c|
+---+
|  c|
|  a|
|  b|
|  b|
|  c|
|  d|
|  e|
|  a|
|  b|
|  b|
+---+
  • Intersect:交集并且去重。
  • Intersect ALL:交集不做去重。
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
+---+
|  s|
+---+
|  a|
|  b|
+---+Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
+---+
|  s|
+---+
|  a|
|  b|
|  b|
+---+
  • Except:差集并且去重。
  • Except ALL:差集不做去重。
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
+---+
| s |
+---+
| c |
+---+Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
+---+
| s |
+---+
| c |
| c |
+---+

上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据给撤回。这也是一个回撤流。

  • In 子查询:这个大家比较熟悉了,但是注意,In 子查询的结果集只能有一列。
SELECT user, amount
FROM Orders
WHERE product IN (SELECT product FROM NewProducts
)

上述 SQL 的 In 子句其实就和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,大家注意设置 State 的 TTL。

2.Order By、Limit 子句

2.1 Order By 子句

支持 Batch / Streaming,但在实时任务中一般用的非常少。

实时任务中,Order By 子句中 必须要有时间属性字段,并且时间属性必须为 升序 时间属性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 或者 WATERMARK FOR rowtime_column AS rowtime_column

举例:

CREATE TABLE source_table_1 (user_id BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.user_id.min' = '1','fields.user_id.max' = '10'
);CREATE TABLE sink_table (user_id BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Order By row_time, user_id desc

2.2 Limit 子句

支持 Batch / Streaming,但实时场景一般不使用,但是此处依然举一个例子。

CREATE TABLE source_table_1 (user_id BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.user_id.min' = '1','fields.user_id.max' = '10'
);CREATE TABLE sink_table (user_id BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Limit 3

结果如下,只有 3 条输出:

+I[5]
+I[9]
+I[4]

3.TopN 子句

TopN 定义(支持 Batch / Streaming):TopN 其实就是对应到离线数仓中的 row_number(),可以使用 row_number() 对某一个分组的数据进行排序。

应用场景:根据 某个排序 条件,计算 某个分组 下的排行榜数据。

SQL 语法标准:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions]
  • ROW_NUMBER():标识 TopN 排序子句。
  • PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 TopN,比如下述案例中的 partition by key,就是根据需求中的搜索关键词(key)做为分区。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序。
  • WHERE rownum <= N:这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个 TopN 的查询,其中 N 代表 TopN 的条目数。
  • [AND conditions]:其他的限制条件也可以加上。

实际案例:取某个搜索关键词下的搜索热度前 10 名的词条数据。

输入数据为搜索词条数据的搜索热度数据,当搜索热度发生变化时,会将变化后的数据写入到数据源的 Kafka 中:

-- 数据源 schema-- 字段名         备注
-- key          搜索关键词
-- name         搜索热度名称
-- search_cnt    热搜消费热度(比如 3000)
-- timestamp       消费词条时间戳CREATE TABLE source_table (name BIGINT NOT NULL,search_cnt BIGINT NOT NULL,key BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH (...
);-- 数据汇 schema-- key          搜索关键词
-- name         搜索热度名称
-- search_cnt    热搜消费热度(比如 3000)
-- timestamp       消费词条时间戳CREATE TABLE sink_table (key BIGINT,name BIGINT,search_cnt BIGINT,`timestamp` TIMESTAMP(3)
) WITH (...
);-- DML 逻辑
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (SELECT key, name, search_cnt, row_time, -- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 100 名ROW_NUMBER() OVER (PARTITION BY keyORDER BY search_cnt desc) AS rownumFROM source_table)
WHERE rownum <= 100

输出结果:

-D[关键词1, 词条1, 4944]
+I[关键词1, 词条1, 8670]
+I[关键词1, 词条2, 1735]
-D[关键词1, 词条3, 6641]
+I[关键词1, 词条3, 6928]
-D[关键词1, 词条4, 6312]
+I[关键词1, 词条4, 7287]

可以看到输出数据是有回撤数据的,为什么会出现回撤,我们来看看 SQL 语义。

上面的 SQL 会翻译成以下三个算子:

  • 数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进行 Hash 分发到下游排序算子,相同的 Key 数据将会发送到一个并发中。
  • 排序算子:为每个 Key 维护了一个 TopN 的榜单数据,接受到上游的一条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加入 TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据比原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据之前下发的排名数据撤回(即回撤数据),然后下发新的排名数据。
  • 数据汇:接收到上游的数据之后,然后输出到外部存储引擎中。

上面三个算子也是会 24 小时一直运行的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/708300.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DataGrip 2023:让数据库开发变得更简单、更高效 mac/win版

JetBrains DataGrip 2023是一款功能强大的数据库IDE&#xff0c;专为数据库开发和管理而设计。通过DataGrip&#xff0c;您可以连接到各种关系型数据库管理系统(RDBMS)&#xff0c;并使用其提供的一组工具来查询、管理、编辑和开发数据库。 DataGrip 2023 软件获取 DataGrip …

[unity]lua热更新——个人复习笔记【侵删/有不足之处欢迎斧正】

一、AssetBundle AB包是特定于平台的资产压缩包&#xff0c;类似于压缩文件 相对于RESOURCES下的资源&#xff0c;AB包更加灵活轻量化&#xff0c;用于减小包体大小和热更新 可以在unity2019环境中直接下载Asset Bundle Browser 可以在其中设置关联 AB包生成的文件 AB包文件…

【Linux】云服务器的Redis被黑

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Linux ⛺️稳中求进&#xff0c;晒太阳 攻击发现&#xff1a; 这个异常情况是在腾讯云被入侵后&#xff0c;短信提醒发现的。并没有系统的学习过关于服务器安防相关的知识&#xff0c;遇到…

国产动漫|基于Springboot的国产动漫网站设计与实现(源码+数据库+文档)

国产动漫网站目录 目录 基于Springboot的国产动漫网站设计与实现 一、前言 二、系统功能设计 三、系统功能设计 1、用户信息管理 2、国漫先驱管理 3、国漫之最管理 4、公告信息管理 四、数据库设计 1、实体ER图 五、核心代码 六、论文参考 七、最新计算机毕设选题…

C语言中如何进行内存管理

主页&#xff1a;17_Kevin-CSDN博客 收录专栏&#xff1a;《C语言》 C语言是一种强大而灵活的编程语言&#xff0c;但与其他高级语言不同&#xff0c;它要求程序员自己负责内存的管理。正确的内存管理对于程序的性能和稳定性至关重要。 一、引言 C 语言是一门广泛使用的编程语…

VPX基于全国产飞腾FT-2000+/64核+复旦微FPGA的计算刀片

6U VPX计算板 产品简介 产品特点 飞腾计算平台&#xff0c;国产化率100% VPX-MPU6902是一款基于飞腾FT-2000/64核的计算刀片&#xff0c;主频2.2GHz&#xff0c;负责业务数据流的管控和调度。搭配自带独立显示芯片的飞腾X100芯片&#xff0c;可用于于各类终端及服务器类应用场…

spring boot整合cache使用memcached

之前讲了 spring boot 整合 cache 做 simple redis Ehcache 三种工具的缓存 上文 windows系统下载安装 memcached 我们装了memcached 但spring boot没有将它的整合纳入进来 那么 我们就要自己来处理客户端 java历史上 有过三种客户端 那么 我们用肯定是用最好的 Xmemcached …

vue2 + axios + mock.js封装过程,包含mock.js获取数据时报404状态的解决记录,带图文,超详细!!!

vue axios mock.js 以下是封装的过程&#xff0c;记录一下 1、首先先了解什么是mock.js的用途及特点 官网地址&#xff1a;Mock.js (mockjs.com) 作用&#xff1a;生成随机数据&#xff0c;拦截 Ajax 请求 优势&#xff1a; 2、了解axios的原理及使用 官网地址&#xff1a…

大模型(LLM)的token学习记录-I

文章目录 基本概念什么是token?如何理解token的长度&#xff1f;使用openai tokenizer 观察token的相关信息open ai的模型 token的特点token如何映射到数值&#xff1f;token级操作&#xff1a;精确地操作文本token 设计的局限性 tokenizationtoken 数量对LLM 的影响训练模型参…

研发日记,MatlabSimulink开箱报告(九)——Simulink Test模块

文章目录 前言 Simulink Test模块 静态测试 动态测试 逻辑测试 前言 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;四&#xff09;——S-Fuction模块》 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;五&#xff09;——S-F…

练习 2 Web [ACTF2020 新生赛]BackupFile 1

[ACTF2020 新生赛]BackupFile 1 Web常规题目 首先尝试查找常见的前端页面index.php之类的&#xff0c;没找到 题目有个“BackupFile”——备份文件 尝试用工具遍历查找相关的文件 御剑没扫出来&#xff0c;搜索搭建好dirsearch后&#xff0c;扫出来的index.php.bak 扫描工…

迟到的VNCTF2024逆向题WP

这次比赛因为有事外出&#xff0c;只做了前两题&#xff0c;最近有空才把另外3题也做出来&#xff0c;总体来说比以往的VNCTF逆向题目要难一些。当然也有可能是我水平退步了&#xff0c;就算有时间参加比赛&#xff0c;应该也做不完这5题。VN的小伙伴越来越厉害了&#xff0c;出…

猜猜心里数字(个人学习笔记黑马学习)

1.定义一个变量&#xff0c;数字类型&#xff0c;内容随意 2.基于input语句输入猜想的数字&#xff0c;通过if和多次elif的组合&#xff0c;判断猜想数字是否和心里数字一致 num5if int(input("请输入第一次猜想的数字&#xff1a;"))5:print("猜对了&#xff0…

ROS 2基础概念#1:计算图(Compute Graph)| ROS 2学习笔记

在ROS中&#xff0c;计算图&#xff08;ROS Compute Graph&#xff09;是一个核心概念&#xff0c;它描述了ROS节点之间的数据流动和通信方式。它不仅仅是一个通信网络&#xff0c;它也反映了ROS设计哲学的核心——灵活性、模块化和可重用性。通过细致探讨计算图的高级特性和实…

Java中使用Jsoup实现网页内容爬取与Html内容解析并使用EasyExcel实现导出为Excel文件

场景 Pythont通过request以及BeautifulSoup爬取几千条情话&#xff1a; Pythont通过request以及BeautifulSoup爬取几千条情话_爬取情话-CSDN博客 Node-RED中使用html节点爬取HTML网页资料之爬取Node-RED的最新版本&#xff1a; Node-RED中使用html节点爬取HTML网页资料之爬…

远程服务器Ubuntu 18.04安装VNC远程桌面

一、安装vnc 1.安装图形化界面工具 # 安装过程中会弹窗让选择配置&#xff0c;选lightdm sudo apt install ubuntu-desktop sudo apt-get install gnome-panel gnome-settings-daemon metacity nautilus gnome-terminal 2.安装vnc sudo apt-get install x11vnc3.安装LightD…

ifcplusplus 示例 函数中英文 对照分析

有需求&#xff0c;需要分析 ifc c渲染&#xff0c;分析完&#xff0c;有 230个函数&#xff0c;才能完成一个加载&#xff0c;3d加载真的是大工程&#xff01; 函数中英文对照表&#xff0c;方便 日后开发&#xff0c;整理思路顺畅&#xff01;&#xff01;&#xff01;&#…

线性规划基础

利用一个简单的实例来介绍什么事线性规划&#xff0c;假设如果有一家巧克力工厂需要生产两种不同类型的巧克力&#xff0c;分别是类型A和类型B&#xff0c;两种巧克力用到的原材料是一样的&#xff0c;都是使用牛奶和可可两种材料&#xff0c;主要的区别是在与这两种原料的配料…

C语言实现21点游戏【单人模式,双人模式,单-多电脑模式】,21点又名黑杰克(英文:Blackjack)

项目背景&#xff1a; 21点又名黑杰克&#xff08;英文&#xff1a;Blackjack&#xff09; &#xff0c;起源于法国&#xff0c;已流传到世界各地。21点&#xff0c;是一种使用扑克牌玩的赌博游戏。亦是唯一一种在赌场中可以在概率中战胜庄家的一种赌博游戏。 现在在世界各地…

k8s初始化报错 [ERROR CRI]: container runtime is not running: ......

一、环境参数 linux系统为centos7kubernetes版本为v1.28.2containerd版本为1.6.28 二、报错内容 执行初始化命令kubeadm init命令时报错&#xff0c;内容如下 error execution phase preflight: [preflight] Some fatal errors occurred:[ERROR CRI]: container runtime is…