(二开)Flink 修改源码拓展 SQL 语法

1、Flink 扩展 calcite 中的语法解析
1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
a)类位置

flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

在这里插入图片描述

核心方法

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {writer.keyword("SHOW CATALOGS");}
b)类血缘

在这里插入图片描述

2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
a)文件位置

在这里插入图片描述

b)语法示例
/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
{
}
{<SHOW> <CATALOGS>{return new SqlShowCatalogs(getPos());}
}
3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
a)文件位置

在这里插入图片描述

b)config.fmpp 内容
data: {# 解析器文件路径parser: tdd(../data/Parser.tdd)
}# 扩展文件的目录
freemarkerLinks: {includes: includes/
}
c)Parser.tdd 部分内容
# 生成的解析器包路径
package: "org.apache.flink.sql.parser.impl",
# 解析器名称
class: "FlinkSqlParserImpl",
# 引入的依赖类
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
# 新的关键字
keywords: ["CATALOGS"]
# 新增的语法解析方法
statementParserMethods: ["SqlShowCatalogs()"]
# 包含的扩展语法文件
implementationFiles: ["parserImpls.ftl"]
4)编译模板文件和语法文件

在这里插入图片描述

5)配置扩展的解析器类
withParserFactory(FlinkSqlParserImpl.FACTORY)
2、自定义扩展 Flink 的 Parser 语法
1)定义 SqlNode 类
package org.apache.flink.sql.parser.dql;import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;import java.util.Collections;
import java.util.List;/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {public static final SqlSpecialOperator OPERATOR =new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);public SqlXShowCatalogs(SqlParserPos pos) {super(pos);}@Overridepublic SqlOperator getOperator() {return OPERATOR;}@Overridepublic List<SqlNode> getOperandList() {return Collections.emptyList();}@Overridepublic void unparse(SqlWriter writer, int leftPrec, int rightPrec) {writer.keyword("XSHOW CATALOGS");}
}
2)修改 includes 目录下的 parserImpls.ftl 文件
/**
* Parse a "XShow Catalogs" metadata query command.
*/
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{<XSHOW> <CATALOGS>{return new SqlXShowCatalogs(getPos());}
}
3)修改 Parser.tdd 文件,新增-声明拓展的部分
imports:"org.apache.flink.sql.parser.dql.SqlXShowCatalogs"keywords:"XSHOW"statementParserMethods:"SqlXShowCatalogs()"
4)重新编译
 mvn generate-resources
5)执行测试用例

可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class CustomFlinkSql {public static void main(String[] args) throws Exception {TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().build());// 拓展自定义语法 xshow catalogs 前// SQL parse failed. Non-query expression encountered in illegal contexttEnv.executeSql("xshow catalogs").print();// 拓展自定义语法 xshow catalogs 后// SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall}
}
6)查看生成的扩展解析器类

可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。

在这里插入图片描述

3、validate 概述

在向 Flink 中添加完自定义的解析规则后,报错信息如下:

SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
修改 validate 部分的代码
1)FlinkPlannerImpl#validate

作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。

在这里插入图片描述

sqlNode.isInstanceOf[SqlXShowCatalogs]
2)SqlToOperationConverter#convert

作用:将校验过的 SqlNode 转换为 Operator。

在这里插入图片描述

else if (validated instanceof SqlXShowCatalogs) {return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3)SqlToOperationConverter#convertXShowCatalogs
/** Convert SHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {return new XShowCatalogsOperation();
}
4)XShowCatalogsOperation
package org.apache.flink.table.operations;public class XShowCatalogsOperation implements ShowOperation {@Overridepublic String asSummaryString() {return "SHOW CATALOGS";}
}
4、执行测试用例
package org.apache.flink.table.examples.java.custom;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class CustomFlinkSql {public static void main(String[] args) throws Exception {TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().build());// FlinkSQL原本支持的语法tEnv.executeSql("show catalogs").print();// 自定义语法tEnv.executeSql("xshow catalogs").print();}
}

在这里插入图片描述

5、总结-FlinkSQL 的执行流程
1、对 SQL 进行校验final SqlNode validated = flinkPlanner.validate(sqlNode);2、预校验重写 Insert 语句3、调用 SqlNode.validate() 进行校验1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode3)如果是:SqlRichExplain4)其它:validator.validate(sqlNode)1.校验作用域和表达式:validateScopedExpression(topNode, scope)a)将 SqlNode 进行规范化重写b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询c)校验 validateQuery i)validateFeatureii)validateNamespaceiii)validateModalityiv)validateAccessv)validateSnapshotd)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导2.获取校验之后的节点类型2、将 SQLNode 转换为 Operationconverter.convertSqlQuery(validated)1)生成逻辑执行计划 RelNodeRelRoot relational = planner.rel(validated);1.对查询进行转换sqlToRelConverter.convertQuery(validatedSqlNode)2)创建 PlannerQueryOperationnew PlannerQueryOperation(relational.project());3、将 Operation 转换为 List<Transformation<?>>
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodesval optimizedRelNodes = optimize(relNodes)2)将 optimizedRelNodes 转换为 execGraphval execGraph = translateToExecNodeGraph(optimizedRelNodes)3)将 execGraph 转换为 transformations1.使用代码生成技术生成Function,后续可以反射调用val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119548.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】priority_queue仿函数

今天我们来学习C中另一个容器适配器&#xff1a;优先级队列——priority_queue&#xff1b;和C一个重要组件仿函数&#xff1a; 目录 一、priority_queue 1.1 priority_queue是什么 1.2 priority_queue的接口 1.2.1 priority_queue使用举例 二、仿函数 三、关于priority…

Docker GitLab-Runner安装

Docker GitLab-Runner安装 GitLab-Runner安装 问题合集GitLab 域名的配置修改Runner容器内注册失败&#xff0c;提示 dial tcp: lookup home.zsl0.com on 192.168.254.2:53: no such host GitLab-Runner 安装 拉去gitlab/gitlab-runner镜像 docker pull gitlab/gitlab-runne…

适用于 Windows 10 和 Windows 11 设备的笔记本电脑管理软件

便携式计算机管理软件使 IT 管理员能够简化企业中使用的便携式计算机的部署和管理&#xff0c;当今大多数员工使用Windows 笔记本电脑作为他们的主要工作机器&#xff0c;他们确实已成为几乎每个组织不可或缺的一部分。由于与台式机相比&#xff0c;笔记本电脑足够便携&#xf…

Git的远程仓库

Git的远程仓库 添加远程仓库从远程库克隆 添加远程仓库 你在本地创建了一个Git仓库后&#xff0c;又想在GitHub创建一个Git仓库&#xff0c;并且让这两个仓库进行远程同步&#xff0c;这样&#xff0c;GitHub上的仓库既可以作为备份&#xff0c;又可以让其他人通过该仓库来协作…

JoySSL:免费SSL证书的新选择

阿里云曾经提供了一年期的免费SSL证书&#xff0c;然而从下个月14号开始&#xff0c;阿里云不再提供免费的一年期SSL证书&#xff0c;而是改为68元/张/年&#xff0c;这对于很多网站建设公司需要大量基本证书来说&#xff0c;无疑是一种负担&#xff0c;又不知道该去哪里获取可…

jmeter如何测试websocket接口?

jmeter做接口测试&#xff0c;很多人都是做http协议的接口&#xff0c;就有很多人问websocket的接口怎么测试啊&#xff1f; 首先&#xff0c;我们要明白&#xff0c;websocket接口是什么接口。 然后&#xff0c;我们怎么用jmeter测试&#xff1f; jmeter要测试websocket接口…

fastjson对象序列化的问题

今天偶然遇到一个fastjson将字符串反序列化为一个对象的时候的问题&#xff0c;就是简单的通过com.alibaba.fastjson.JSON将对象转为字符串&#xff0c;然后再从字符串转换为原类型的对象。 涉及的代码也非常简单 package cn.edu.sgu.www.mhxysy.service.role.impl;import cn…

Linux文件I/O

下面的内容需要了解系统调用&#xff0c;可看下面的链接&#xff1a; 系统调用来龙去脉-CSDN博客 1.底层文件IO和标准IO 这里指的是操作系统提供的IO服务&#xff0c;不同于ANSI建立的标准IO。 底层IO和标准IO各自所使用的函数&#xff1a; 区别&#xff1a; 1.底层文件IO不…

Android 13 Framework 裁剪

裁剪应用 1. 修改 build/core/product.mk 添加PRODUCT_DEL_PACKAGES变量的声明 新增一行_product_single_value_vars PRODUCT_DEL_PACKAGES # The first API level this product shipped with _product_single_value_vars PRODUCT_SHIPPING_API_LEVEL _product_single_val…

本地存储 sessionStoragelocalStorage

随着互联网的快速发展&#xff0c;基于网页的应用越来越普遍&#xff0c;同时也变的越来越复杂&#xff0c;为了满足各种各样的需求&#xff0c;会经常性在本地存储大量的数据&#xff0c;HTML5规范提出了相关解决方案。 本地存储特性 数据存储在用户浏览器中 设置、读取方便、…

使用字节流读取文件中的数据的几种方式

public class FileReader02_ {public static void main(String[] args) {}Testpublic void m1() {String filePath "e:\\hello.txt";FileReader fileReader null;int date0;try {fileReader new FileReader(filePath);//循环读取 使用readwhile ((datefileReader.…

【opencv】【CPU】windows10下opencv4.8.0-cuda C++版本源码编译教程

【opencv】【CPU】windows10下opencv4.8.0-cuda C版本源码编译教程 提示:博主取舍了很多大佬的博文并亲测有效,分享笔记邀大家共同学习讨论 文章目录 【opencv】【CPU】windows10下opencv4.8.0-cuda C版本源码编译教程前言准备工具cmakeopencv4.8.0opencv_contrib CMake编译VS2…

uniapp: 本应用使用HBuilderX x.x.xx 或对应的cli版本编译,而手机端SDK版本是 x.x.xx。不匹配的版本可能造成应用异常。

文章目录 前言一、原因分析二、解决方案2.1、方案一&#xff1a;更新HbuilderX版本2.2、方案二&#xff1a;设置固定的版本2.3、方案三&#xff1a;忽略版本&#xff08;不推荐&#xff09; 三、总结四、感谢 前言 项目场景&#xff1a;示例&#xff1a;通过使用HbuilderX打包…

Apache JMeter 安装教程

下载&#xff1a; 注意事项&#xff1a;使用JMeter前需要配置JDK环境 下载地址 下载安装以后&#xff0c;打开安装的bin目录 D:\software\apache-jmeter-5.4.1\apache-jmeter-5.4.1\bin&#xff0c;找到jmeter.bat&#xff0c;双击打开 打开后的样子 语言设置&#xff1a; 1…

【高效开发工具系列】Postman

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

“人类高质量数据”如何训练计算机视觉模型?

人类的视觉系统可以复制吗&#xff1f; 答案是肯定的。 计算机视觉 (Computer Vision) 技术的不断普及&#xff0c;让机器识别和处理图像就像人的大脑一样&#xff0c;且速度更快、更准确。 机器像人类一样去“思考” 计算机视觉 (Computer Vision) 是近年来人工智能增长最快…

Python遍历删除列表元素的一个奇怪bug

假定有一个Python列表&#xff0c;比如[CFFEX.IF, CFFEX.TS,SHFE.FU]&#xff0c;现在需要将其中带‘CFFEX’前缀的所有元素都删除。在使用列表推导式一行代码搞定之前&#xff0c;用了一种最朴素的遍历删除方法&#xff0c;结果出现了意想不到的的问题。复盘了下&#xff0c;结…

Windows客户端下pycharm配置跳板机连接内网服务器

问题&#xff1a;实验室服务器仅限内网访问&#xff0c;无法在宿舍&#xff08;外网&#xff09;访问实验室的所有内部服务器&#xff0c;但同时实验室又提供了一个外网可以访问的跳板机&#xff0c;虽然可以先ssh到跳板机再从跳板机ssh到内网服务器&#xff0c;但这种方式不方…

【Kotlin精简】第6章 反射

1 反射简介 反射机制是在运行状态中&#xff0c;对于任意一个类&#xff0c;都能够知道这个类的所有属性和方法&#xff0c;对于任意一个对象&#xff0c;都能够调用它的任意一个方法和属性。 1.1 Kotlin反射 我们对比Kotlin和Java的反射类图。 1.1.1 Kotlin反射常用的数据结…

MySQL3:MySQL中一条更新SQL是如何执行的?

MySQL3&#xff1a;MySQL中一条更新SQL是如何执行的&#xff1f; MySQL中一条更新SQL是如何执行的&#xff1f;1.Buffer Pool缓冲池2.Redo logredo log作用Redo log文件位置redo log为什么是2个&#xff1f; 3.Undo log4.更新过程5.InnoDB官网架构InnoDB架构-内存结构①Buffer …