【大数据】Flink SQL 语法篇(二):WITH、SELECT WHERE、SELECT DISTINCT

Flink SQL 语法篇(二)

  • 1.WITH 子句
  • 2.SELECT & WHERE 子句
  • 3.SELECT DISTINCT 子句

1.WITH 子句

应用场景(支持 Batch / Streaming):With 语句和离线 Hive SQL With 语句一样的,语法糖 +1,使用它可以让你的代码逻辑更加清晰。

-- 语法糖 +1
WITH orders_with_total AS (SELECT order_id, price + tax AS totalFROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

2.SELECT & WHERE 子句

应用场景(支持 Batch / Streaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,常用作 ETL,过滤,字段清洗标准化。

INSERT INTO target_table
SELECT * FROM OrdersINSERT INTO target_table
SELECT order_id, price + tax FROM OrdersINSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3

其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。

以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了。

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3

这个 SQL 对应的实时任务,假设 Orders 为 Kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

可以看到这个实时任务的所有算子是以一种 Pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

在这里插入图片描述

⭐ 关于看如何看一段 Flink SQL 最终的执行计划:最好的方法就如上图,看 Flink Web UI 的算子图,算子图上详细的标记清楚了每一个算子做的事情。

以上图来说,我们可以看到主要有三个算子:

  • Source 算子。Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
    • 其中 Source 表名称为 table=[[default_catalog, default_database, Orders]
    • 字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]
    • Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
  • 过滤算子。Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id])
    • 其中过滤条件为 where=[(order_id > 3)]
    • 结果字段为 select=[order_id, name, row_time]
  • Sink 算子。Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time])
    • 其中最终产出的表名称为 table=[default_catalog.default_database.target_table]
    • 表字段为 fields=[order_id, name, row_time]

可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 Debug、调优前最基础的一个技巧。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子,然后 数据源算子 就运行结束了,释放资源了。
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子,然后 过滤和字段标准化算子 就运行结束了,释放资源了
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了

可以看到离线任务的算子是分阶段(Stage)进行运行的,每一个 Stage 运行结束之后,然后下一个 Stage 开始运行,全部的 Stage 运行完成之后,这个离线任务就跑结束了。

注意:很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释:

  • 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是 在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。
  • 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是 在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。

3.SELECT DISTINCT 子句

应用场景(支持 Batch / Streaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,用作根据 key 进行数据去重。

INSERT into target_table
SELECT DISTINCT id 
FROM Orders

也是拿离线和实时做对比。

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
  • 去重算子DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

在这里插入图片描述
对于实时任务,计算时的状态可能会无限增长。状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子,然后 数据源算子 就运行结束了,释放资源了。
  • 去重算子DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子,然后 去重算子 就运行结束了,释放资源了。
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/655459.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

inode生命周期

1.添加inode到inode cache链表 当inode的引用计数器i_count为0后,会调用iput_final去释放 static void iput_final(struct inode *inode) {struct super_block *sb inode->i_sb;const struct super_operations *op inode->i_sb->s_op;unsigned long sta…

PaddleNLP的简单使用

1 介绍 PaddleNLP是一个基于PaddlePaddle深度学习平台的自然语言处理(NLP)工具库。 它提供了一系列用于文本处理、文本分类、情感分析、文本生成等任务的预训练模型、模型组件和工具函数。 PaddleNLP有统一的应用范式:通过 paddlenlp.Task…

数据结构-数组(详细讲解)

文章目录 数组数组的概述数组的图示一维数组二维数组 数组的定义一维数组的定义二维数组的定义 数组的取值赋值一维数组二维数组 数组的操作一维数组的操作索引实现指针实现 二位数组的操作矩阵转三元组矩阵的乘法 数组 数组的概述 概述:数组是一种线性数据结构&a…

C# 使用WMI监听进程的启动和关闭

写在前面 Windows Management Instrumentation(WMI)是用于管理基于 Windows 操作系统的数据和操作的基础结构。具体的API可以查看 WMI编程手册。 WMIC 是WMI的命令行管理工具,使用 WMIC,不但可以管理本地计算机,还可…

粒子群算法求解港口泊位调度问题(MATLAB代码)

粒子群算法(Particle Swarm Optimization,PSO)是一种基于群体智能的优化算法,它通过模拟鸟群或鱼群的行为来寻找最优解。在泊位调度问题中,目标是最小化所有船只在港时间的总和,而PSO算法可以帮助我们找到一…

Java把列表数据导出为PDF文件,同时加上PDF水印

一、实现效果 二、遇到的问题 实现导出PDF主体代码参考:Java纯代码实现导出PDF功能,下图是原作者实现的效果 导出报错Font STSong-Light with UniGB-UCS2-H is not recognized.。参考:itext 生成 PDF(五) 使用外部字体 网上都是说jar包的版本…

FastBee开源物联网平台2.0开源版发布啦!!!

一、项目介绍 物美智能(wumei-smart)更名为蜂信物联(FastBee)。 FastBee开源物联网平台,简单易用,更适合中小企业和个人学习使用。适用于智能家居、智慧办公、智慧社区、农业监测、水利监测、工业控制等。 系统后端采用Spring boot;前端采用…

成功解决AttributeError: ‘str‘ object has no attribute ‘decode‘

成功解决AttributeError: ‘str’ object has no attribute ‘decode’. 🌵文章目录🌵 🌳引言🌳🌳报错分析及解决方案🌳🌳参考文章🌳🌳结尾🌳 🌳引…

Git安装,Git镜像,Git已安装但无法使用解决经验

git下载地址&#xff1a; Git - 下载 (git-scm.com) <-git官方资源 Git for Windows (github.com) <-github资源 CNPM Binaries Mirror (npmmirror.com) <-阿里镜像&#xff08;推荐&#xff0c;镜…

算法沉淀——前缀和(leetcode真题剖析)

算法沉淀——前缀和 01.一维前缀和02.二维前缀和03.寻找数组的中心下标04.除自身以外数组的乘积05.和为 K 的子数组06.和可被 K 整除的子数组07.连续数组08.矩阵区域和 前缀和算法是一种用于高效计算数组或序列中某个范围内元素之和的技巧。它通过预先计算数组的前缀和&#xf…

Redhat 8.4 一键安装 Oracle 11GR2 单机版

Oracle 一键安装脚本&#xff0c;演示 Redhat 8.4 一键安装 Oracle 11GR2 单机版过程&#xff08;全程无需人工干预&#xff09;&#xff1a;&#xff08;脚本包括 ORALCE PSU/OJVM 等补丁自动安装&#xff09; ⭐️ 脚本下载地址&#xff1a;Shell脚本安装Oracle数据库 脚本…

宝塔控制面板配置SSL证书实现网站HTTPS

宝塔安装SSL证书提前申请好SSL证书&#xff0c;如果还没有&#xff0c;先去Gworg里面申请&#xff0c;一般几分钟就可以下来&#xff0c;申请地址&#xff1a;首页-Gworg官方店-淘宝网 一、登录邮箱下载&#xff1a;Gworg证书文件目录 &#xff0c;都会有以下五个文件夹。宝塔…

ROS学习笔记11——ROS中的重名问题

一、ros功能包重名——ros工作空间覆盖 功能包重名时&#xff0c;会按照 ROS_PACKAGE_PATH 查找&#xff0c;在前的会优先执行。ROS 会解析 .bashrc 文件&#xff0c;并生成 ROS_PACKAGE_PATH ROS包路径&#xff0c;即调用功能包的顺序&#xff0c;该变量中按照 .bashrc 中配置…

Blender教程(基础)-内插面、分离、环切、倒角-08

一、内插面 菜单位置如下图位置。 单击需要处理的面&#xff0c;出现一个黄色的圈。 1、菜单选中内插 鼠标悬停在黄色圈内单击左键可以来回实现内插&#xff0c;但是发现并不好操作。 2、快捷键内插 在选中需要操作的面之后&#xff0c;鼠标移动到外面&#xff0c;键盘在英…

【linux】复制cp和硬连接、软连接的区别? innode 关系?

1.命令&#xff1a; cp -r [源文件或目录] [目的目录] #复制 ln -s [被链接的文件] [链接的目录/名称] #软连接 ln [被链接的文件] [链接的目录/名称] #硬连接 注&#xff1a;cp -r 会把所有source当作普通文件&#xff08;regular文件&#xff09;&#x…

【CMU-自主导航与规划】M-TARE planner 配置与运行

M-TARE docker M-TARE 源码 一、依赖 Docker, Docker Compose, NVIDIA Container Toolkit, Nvidia GPU Driver&#xff08;需要至少2个&#xff0c;带Nvidia GPU&#xff09; 1.1 Docker docker -v #查询版本1.2 Docker Compose docker compose version1.3 …

Python 数据分析实战——社交游戏的用户流失?酒卷隆治_案例2

# 什么样的顾客会选择离开 # 数据集 DAU : 每天至少来访问一次的用户数据 数据内容 数据类型 字段名 访问时间 string&#xff08;字符串&#xff09; log_data 应用名称 string&#xff08;字符串&#xff09; app_name 用户 ID int&#xff08;数值&#xff09; user_id…

时隔3年 | 微软 | Windows Server 2025 重磅发布

最新功能 以下是微软产品团队正在努力的方向&#xff1a; Windows Server 2025 为所有人提供的热补丁下一代 AD 活动目录和 SMB数据与存储Hyper-V 和人工智能还有更多… Ignite 发布视频 Windows Server 2025 Ignite Video 介绍 Windows Server 2022 正式发布日期是2021年…

Linux部署lomp环境,安装typecho、WordPress博客

部署lomp环境&#xff0c;安装typecho、WordPress博客 一、环境要求1.1.版本信息1.2.准备阿里云服务器【新用户免费使用三个月】1.3.准备远程工具【FinalShell】 二、Linux下安装openresty三、Linux下安装Mysql四、安装Apache【此步骤可省略】4.1.安装Apache服务及其扩展包4.2.…

EasyExcel通用导入 | 简单封装

0. 前言&#xff1a;1. 基本思路&#xff1a;2. 调用代码&#xff1a; 0. 前言&#xff1a; 之前做了好几个导入&#xff0c;用EasyExcel每次都要定义监听器去处理&#xff0c;就想能不能做个通用的方式&#xff0c;如下 1. 基本思路&#xff1a; 导入无非主要就是参数校验和数…