【大数据】Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

  • 1.EXPLAIN 子句
  • 2.USE 子句
  • 3.SHOW 子句
  • 4.LOAD、UNLOAD 子句
  • 5.SET、RESET 子句
  • 6.SQL Hints

1.EXPLAIN 子句

EXPLAIN 子句其实就是用于查看当前这个 SQL 查询的逻辑计划以及优化的执行计划。

SQL 语法标准:

EXPLAIN PLAN FOR <query_statement_or_insert_statement>

实际案例:

public class Explain_Test {public static void main(String[] args) throws Exception {FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);flinkEnv.env().setParallelism(1);String sql = "CREATE TABLE source_table (\n"+ "    user_id BIGINT COMMENT '用户 id',\n"+ "    name STRING COMMENT '用户姓名',\n"+ "    server_timestamp BIGINT COMMENT '用户访问时间戳',\n"+ "    proctime AS PROCTIME()\n"+ ") WITH (\n"+ "  'connector' = 'datagen',\n"+ "  'rows-per-second' = '1',\n"+ "  'fields.name.length' = '1',\n"+ "  'fields.user_id.min' = '1',\n"+ "  'fields.user_id.max' = '10',\n"+ "  'fields.server_timestamp.min' = '1',\n"+ "  'fields.server_timestamp.max' = '100000'\n"+ ");\n"+ "\n"+ "CREATE TABLE sink_table (\n"+ "    user_id BIGINT,\n"+ "    name STRING,\n"+ "    server_timestamp BIGINT\n"+ ") WITH (\n"+ "  'connector' = 'print'\n"+ ");\n"+ "\n"+ "EXPLAIN PLAN FOR\n"+ "INSERT INTO sink_table\n"+ "select user_id,\n"+ "       name,\n"+ "       server_timestamp\n"+ "from (\n"+ "      SELECT\n"+ "          user_id,\n"+ "          name,\n"+ "          server_timestamp,\n"+ "          row_number() over(partition by user_id order by proctime) as rn\n"+ "      FROM source_table\n"+ ")\n"+ "where rn = 1";/*** 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}*      -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}*/for (String innerSql : sql.split(";")) {TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);tableResult.print();}}
}

上述代码执行结果如下:

1. 抽象语法树
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])+- LogicalFilter(condition=[=($3, 1)])+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])+- LogicalTableScan(table=[[default_catalog, default_database, source_table]])2. 优化后的物理计划
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])+- Exchange(distribution=[hash[user_id]])+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])3. 优化后的执行计划
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])+- Exchange(distribution=[hash[user_id]])+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

2.USE 子句

如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,USE 子句通常被用于切换库,那么在 Flink SQL 体系中,它的作用也是和 MySQL 中 USE 子句的功能基本一致,用于切换 Catalog,DataBase,使用 Module。

  • 切换 Catalog
USE CATALOG catalog_name
  • 使用 Module
USE MODULES module_name1[, module_name2, ...]
  • 切换 Database
USE db名称

实际案例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// |    catalog name |
// +-----------------+
// | default_catalog |
// | cat1            |
// +-----------------+// change default catalog
tEnv.executeSql("USE CATALOG cat1");tEnv.executeSql("SHOW DATABASES").print();
// databases are empty
// +---------------+
// | database name |
// +---------------+
// +---------------+// create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();
// +---------------+
// | database name |
// +---------------+
// |        db1    |
// +---------------+// change default database
tEnv.executeSql("USE db1");// change module resolution order and enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        hive |  true |
// |        core | false |
// +-------------+-------+

3.SHOW 子句

如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,SHOW 子句常常用于查询库、表、函数等,在 Flink SQL 体系中也类似。Flink SQL 支持 SHOW 以下内容。

SQL 语法标准:

  • SHOW CATALOGS:展示所有 Catalog
  • SHOW CURRENT CATALOG:展示当前的 Catalog
  • SHOW DATABASES:展示当前 Catalog 下所有 Database
  • SHOW CURRENT DATABASE:展示当前的 Database
  • SHOW TABLES:展示当前 Database 下所有表
  • SHOW VIEWS:展示所有视图
  • SHOW FUNCTIONS:展示所有的函数
  • SHOW MODULES:展示所有的 Module(Module 是用于 UDF 扩展)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// |    catalog name |
// +-----------------+
// | default_catalog |
// +-----------------+// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();
// +----------------------+
// | current catalog name |
// +----------------------+
// |      default_catalog |
// +----------------------+// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
// |    database name |
// +------------------+
// | default_database |
// +------------------+// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();
// +-----------------------+
// | current database name |
// +-----------------------+
// |      default_database |
// +-----------------------+// create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
// show tables
tEnv.executeSql("SHOW TABLES").print();
// +------------+
// | table name |
// +------------+
// |   my_table |
// +------------+// create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");
// show views
tEnv.executeSql("SHOW VIEWS").print();
// +-----------+
// | view name |
// +-----------+
// |   my_view |
// +-----------+// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// |           mod |
// |        sha256 |
// |           ... |
// +---------------+// create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");
// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// |            f1 |
// |           ... |
// +---------------+// show modules
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// +-------------+// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name |  used |
// +-------------+-------+
// |        core |  true |
// |        hive | false |
// +-------------+-------+

4.LOAD、UNLOAD 子句

我们可以使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。

SQL 语法标准:

-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]-- 卸载
UNLOAD MODULE module_name
  • LOAD 案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 加载 Flink SQL 体系内置的 Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// |        core |
// |        hive |
// +-------------+
  • UNLOAD 案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 卸载唯一的一个 CoreModule
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();
// 结果啥 Moudle 都没有了

5.SET、RESET 子句

SET 子句可以用于修改一些 Flink SQL 的环境配置,RESET 子句是可以将所有的环境配置恢复成默认配置,但只能在 SQL CLI 中进行使用,主要是为了让用户更纯粹的使用 SQL 而不必使用其他方式或者切换系统环境。

SET (key = value)?RESET (key)?

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

Flink SQL> SET table.planner = blink;
[INFO] Session property has been set.Flink SQL> SET;
table.planner=blink;Flink SQL> RESET table.planner;
[INFO] Session property has been reset.Flink SQL> RESET;
[INFO] All session properties have been set to their default values.

6.SQL Hints

Hints(提示)是一种机制,用来告诉优化器按照我们的告诉它的方式生成执行计划。

比如有一个 Kafka 数据源表 kafka_table1,用户想直接从 latest-offset Select 一些数据出来预览,其元数据已经存储在 Hive MetaStore 中,但是 Hive MetaStore 中存储的配置中的 scan.startup.modeearliest-offset,通过 SQL Hints,用户可以在 DML 语句中将 scan.startup.mode 改为 latest-offset 查询,因此可以看出 SQL Hints 常用语这种比较临时的参数修改,比如 Ad-hoc 这种临时查询中,方便用户使用自定义的新的表参数而不是 Catalog 中已有的表参数。

以下 DML SQL 中的 /*+ OPTIONS(key=val [, key=val]*) */ 就是 SQL Hints。

SELECT *
FROM table_path /*+ OPTIONS(key=val [, key=val]*) */

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);-- 1. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;-- 2. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select * fromkafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1joinkafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2on t1.id = t2.id;-- 3. 使用 'sink.partitioner'='round-robin' 覆盖原来的 Sink 表的 sink.partitioner
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/710449.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day05_用户管理minIO角色分配(页面制作,查询用户,添加用户,修改用户,删除用户,用户头像,查询所有角色,保存角色数据)

文章目录 1 用户管理1.1 页面制作1.2 查询用户1.2.1 需求说明1.2.2 后端接口需求分析SysUserSysUserDtoSysUserControllerSysUserServiceSysUserMapperSysUserMapper.xml 1.2.3 前端对接实现思路sysUser.jssysRole.vue 1.3 添加用户1.3.1 需求说明1.3.2 页面制作1.3.3 后端接口…

ChatGPT-4 AI 绘图魔力释放

最近刚开通了 ChatGPT4&#xff0c;正好要设计一个网站图标&#xff0c;想测试一下它AI绘图的能力&#xff0c;让它根据文字描述生成一个想象中的图标 &#xff08;PS&#xff1a;如果想体验 GPT4 文生图&#xff0c;可以看这个教程 如何升级 ChatGPT 4.0&#xff09; 第1次交…

【三维重建】【SLAM】SplaTAM:基于3D高斯的密集RGB-D SLAM

题目&#xff1a;SplaTAM: Splat, Track & Map 3D Gaussians for Dense RGB-D SLAM 地址&#xff1a;spla-tam.github.io 机构&#xff1a;CMU&#xff08;卡内基梅隆大学&#xff09;、MIT&#xff08;美国麻省理工&#xff09; 总结&#xff1a;SplaTAM&#xff0c;一个新…

ywtool network命令

一.network功能介绍 network功能就是通过脚本的方式配置IP信息&#xff0c;分为4项: (1) 配置单网卡(2)配置br网桥(单网卡)(3)配置bond(两张网卡)(4)配置ovs网桥(单网卡) 日志文件:/var/log/ywtools/ywtools-network.log/usr/local/ywtools/config/config.ini中network参数:…

从预训练到通用智能(AGI)的观察和思考

1.预训练词向量 预训练词向量&#xff08;Pre-trained Word Embeddings&#xff09;是指通过无监督学习方法预先训练好的词与向量之间的映射关系。这些向量通常具有高维稠密特征&#xff0c;能够捕捉词语间的语义和语法相似性。最著名的预训练词向量包括Google的Word2Vec&#…

Go语言实战五-go语言的类型与iton

前言 go语言是静态类型语言&#xff0c;也就是在编译时编译器需要知道程序中值的类型&#xff0c;这样有利于减少bug和提高性能&#xff0c;具体就是内存的分配量和其中的内容 用户定义类型 1.用户定义新的类型&#xff0c;需要使用type 和struct关键字 type user struct {…

项目实现json字段

有些很复杂的信息&#xff0c;我们一般会用扩展字段传一个json串&#xff0c;字段一般用text类型存在数据库。mysql5.7以后支持json类型的字段&#xff0c;还可以进行sql查询与修改json内的某个字段的能力。 1.json字段定义 ip_info json DEFAULT NULL COMMENT ip信息, 2.按…

应用存储与持久化数据卷

1、PV 引入场景&#xff1a; ① Deployment 管理的 pod&#xff0c;在做镜像升级的过程中&#xff0c;会产生新的 pod并且删除旧的 pod &#xff0c;新旧 pod 之间如何复用数据&#xff1f; ② 宿主机宕机的时候&#xff0c;如何实现带卷迁移&#xff1f; ③ 多个 pod 之间&…

Redis 之三:发布订阅(pub/sub)

概念介绍 Redis 发布订阅 (pub/sub) 是一种消息通信模式&#xff0c;它允许客户端之间进行异步的消息传递 Redis 客户端可以订阅任意数量的频道。 模型中的角色 在该模型中&#xff0c;有三种角色&#xff1a; 发布者&#xff08;Publisher&#xff09;&#xff1a;负责发送信…

[分类指标]准确率、精确率、召回率、F1值、ROC和AUC、MCC马修相关系数

准确率、精确率、召回率、F1值 定义&#xff1a; 1、准确率&#xff08;Accuracy&#xff09; 准确率是指分类正确的样本占总样本个数的比例。准确率是针对所有样本的统计量。它被定义为&#xff1a; 准确率能够清晰的判断我们模型的表现&#xff0c;但有一个严重的缺陷&…

《Docker极简教程》--Docker在生产环境的应用--Docker在生产环境的监控

一、Docker监控的基本概念 1.1 容器监控 vs 主机监控 在 Docker 环境中&#xff0c;监控是确保系统稳定性和性能的关键活动之一。在监控 Docker 环境时&#xff0c;我们通常会关注容器监控和主机监控两个方面。 容器监控&#xff1a; 容器监控是指监视 Docker 容器本身的运行…

在Windows系统中启动Redis服务

前言 Redis是一个开源、高性能的键值对数据库&#xff0c;常用于缓存、消息队列等场景。本文将详细指导您如何在Windows系统上启动Redis服务。 第一步&#xff1a;确认Redis安装 确保您已经在Windows系统上成功安装了Redis。官方提供了预编译好的Windows版本&#xff0c;您可…

低代码中的可视化表单:效率与灵活兼备的设计工具

近年来&#xff0c;随着数字化转型的加速推进&#xff0c;企业对于高效率、灵活性和可定制性的软件开发需求不断增长。传统的软件开发过程通常需要耗费大量的时间和资源&#xff0c;而低代码开发平台的出现为企业提供了一种更加快速和灵活的解决方案。在低代码开发平台中&#…

Linux centos 变更MySQL数据存储路径

Linux centos 变更MySQL数据存储路径 登录mysql&#xff0c;查看数据存储路径创建新目录准备迁移数据检查是否配置成功 登录mysql&#xff0c;查看数据存储路径 mysql -u root -pshow global variables like "%datadir%";创建新目录 查看磁盘空间 df -h选取最大磁…

Android 11.0 framework系统修改安兔兔等显示的屏幕尺寸大小功能实现

1.前言 在11.0的系统rom定制化开发中,在使用第三方app检测系统的一些信息中,比如安兔兔 设备信息等检测app中,有时候显示的屏幕尺寸大小和 产品规格书等信息不同,稍微有些差异,所以就需要看下系统framework层中,相关的设备信息是怎么读出来的,然后做些调整 接下来就来分…

不可多得的干货,网易的朋友给我这份339页的Android面经

这里先放上目录 一 性能优化 1.如何对 Android 应用进行性能分析 android 性能主要之响应速度 和UI刷新速度。 首先从函数的耗时来说&#xff0c;有一个工具TraceView 这是androidsdk自带的工作&#xff0c;用于测量函数耗时的。 UI布局的分析&#xff0c;可以有2块&#x…

本届挑战赛亚军方案:基于大模型和多AGENT协同的运维

“轻舟已过万重山团队”荣获本届挑战赛亚军&#xff0c;该团队来自华为集团IT-UniAI 产品和openEuler系统智能团队。 方案介绍 自ChatGPT问世以来&#xff0c;AI迎来了奇点iPhone时刻&#xff0c;这一年来大模型深入影响企业办公&#xff0c;金融&#xff0c;广告&#xff0c;…

打造去中心化透明储蓄罐:Solidity智能合约的又一实践

一、案例背景 传统的储蓄罐通常是由个人或家庭使用&#xff0c;用于存放硬币或小额纸币。然而&#xff0c;这样的储蓄罐缺乏透明性&#xff0c;用户无法实时了解储蓄情况&#xff0c;也无法确保资金的安全性。 通过Solidity智能合约&#xff0c;我们可以构建一个去中心化…

转前端了!!

大家好&#xff0c;我是冰河~~ 没错&#xff0c;为了更好的设计和开发分布式IM即时通讯系统&#xff0c;也为了让大家能够直观的体验到分布式IM即时通讯系统的功能&#xff0c;冰河开始转战前端了。也就是说&#xff0c;整个项目从需求立项到产品设计&#xff0c;从架构设计到…

leetcode 热题 100_字母异位词分组

题解一&#xff1a; 排序&#xff1a;对两个字母异位词&#xff0c;二者排序后的字符串完全一样&#xff0c;因此可以对所给字符串进行排序&#xff0c;以排序后的字符串作为HashMap哈希表的键值&#xff0c;将排序前的字符串作为值进行存储分组&#xff0c;最后返回。 import…