【源码解析】flink sql执行源码概述:flink sql执行过程中有哪些阶段,这些阶段的源码大概位置在哪里

文章目录

  • 一. sql执行流程源码分析
    • 1. Sql语句解析成语法树阶段(SQL - > SqlNode)
    • 2. SqlNode 验证(SqlNode – >Operation)
    • 3. 语义分析(Operation - > RelNode)
    • 4. 优化阶段(RelNode - > optimize - >Transformation )
    • 5. 生成ExecutionPlan并执行
  • 二. 源码分析小结
    • `sqlnode->relnode->优化->pipeline(StreamGraph)-> 执行并返回结果`

本文大致分析了flink sql执行过程中的各个阶段的源码逻辑,这样可以在flink sql执行过程中, 能够定位到任务执行的某个阶段的代码大概分布在哪里,为更针对性的分析此阶段的细节逻辑打下基础,比如create 的逻辑是怎么执行的,select的逻辑是怎么生成的,优化逻辑都做了哪些,而这些是接下来的文章要分析的。

一. sql执行流程源码分析

SQL语句经过Calcite解析生成抽象语法树SQLNode,基于生成的SQLNode并结合flink Catalog完成校验生成一颗Operation树,接下来blink planner将Opearation树转为RelNode树然后进行优化,最后进行执行。如下流程流转图:

在这里插入图片描述
 

flink使用的是一款开源SQL解析工具Apache Calcite ,Calcite使用Java CC对sql语句进行了解析,转换为java/scala语言能够执行的逻辑。

 
Sql 的执行过程一般可以分为下图中的四个阶段,Calcite 同样也是这样:解析,校验,优化,执行:
在这里插入图片描述

1. Sql语句解析成语法树阶段(SQL - > SqlNode)

对于flink中解析sql为SqlNode对象的流程为:

  • TableEnvironmentImpl是sql执行的入口类,TableEnvironmentImpl中提供了excuteSqlSqlQuery等方法用来执行DDL、DML等sql
  • sql执行时会先对sql进行解析,ParserImp是flink调用sql解析的实现类,ParserImpl#parse()方法中通过调用包装器对象CalciteParser#parse()方法创建并调用使用javacc生成的sql解析器
  • (FlinkSqlParserImpl)parseSqlStmtEof方法完成sql解析,并返回SqlNode对象。

具体calciteParser 的动作之后更新

在这里插入图片描述

parse方法:负责将 SQL 查询字符串解析为抽象语法树(AST)

org.apache.flink.table.planner.delegation.ParserImpl/*** When parsing statement, it first uses {@link ExtendedParser} to parse statements. If {@link* ExtendedParser} fails to parse statement, it uses the {@link CalciteParser} to parse statements.* 先使用ExtendedParser进行解析如果解析失败了使用CalciteParser进行解析* @param statement input statement.* @return parsed operations.*/@Overridepublic List<Operation> parse(String statement) {//两种解析实例CalciteParser parser = calciteParserSupplier.get();FlinkPlannerImpl planner = validatorSupplier.get();//ExtendedParser解析Optional<Operation> command = EXTENDED_PARSER.parse(statement);if (command.isPresent()) {return Collections.singletonList(command.get());}//CalciteParser解析//解析为sqlNodeSqlNodeList sqlNodeList = parser.parseSqlList(statement);List<SqlNode> parsed = sqlNodeList.getList();Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");return Collections.singletonList(//解析为SOperationSqlToOperationConverter.convert(planner, catalogManager, parsed.get(0)).orElseThrow(() -> new TableException("Unsupported query: " + statement)));}

将sql语句解析成sqlNode。
对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。
在这里插入图片描述

 

2. SqlNode 验证(SqlNode – >Operation)

经过上面的第一步,会生成一个 SqlNode 对象,它是一个未经验证的抽象语法树,下面就进入了一个语法检查阶段,语法检查前需要知道元数据信息,这个检查会包括表名、字段名、函数名、数据类型的检查。

  • sql解析完成后执行sql校验,flink sql中增加了SqlNode转换为Operation的过程,sql校验是这个过程中完成。
  • 在SqlToOperationConvertver#convert()方法中完成这个转换,之后通过FlinkPlannerImpl#validate()方法对表、函数、字段等完成校验并基于生成的validated SqlNode生成对应的Operation。

在这里插入图片描述

接着进入到SqlToOperationConverter.convert方法中

    /*** This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different* SqlNode will have it's implementation in the #convert(type) method whose 'type' argument is* subclass of {@code SqlNode}.* 转换DDL,DML(select比如?)sqlnode的主入口,不同的SqlNode有不同的convert实现。* */public static Optional<Operation> convert(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {//1.进行校验final SqlNode validated = flinkPlanner.validate(sqlNode);//2.转换为Operationreturn convertValidatedSqlNode(flinkPlanner, catalogManager, validated);}//将校验过的sqlnode转换为Operationprivate static Optional<Operation> convertValidatedSqlNode(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {SqlToOperationConverter converter =new SqlToOperationConverter(flinkPlanner, catalogManager);。。。else if (validated instanceof RichSqlInsert) {return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {return Optional.of(converter.convertSqlQuery(validated));

但其实converter.convertSqlQuery包含了createSqlToRelConverter逻辑,即创建了SqlToRelConverter实例,用于转换RelNode。这里源码暂不展示。

 

3. 语义分析(Operation - > RelNode)

接着将 SqlNode 转换成 RelNode/RexNode,也就是生成相应的逻辑计划(Logical Plan),也就是最初版本的逻辑计划(Logical Plan)。

其中Operation中包含了RelNode的converter

源码见:org.apache.flink.table.planner.delegation.PlannerBase

  /** Converts a relational tree of [[ModifyOperation]] into a Calcite relational expression.将ModifyOperation转换为Calcite relational expression即RelNode*/@VisibleForTestingprivate[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {val dataTypeFactory = catalogManager.getDataTypeFactorymodifyOperation match {case s: UnregisteredSinkModifyOperation[_] =>//relBuilder:val input = createRelBuilder.queryOperation(s.getChild).build()val sinkSchema = s.getSink.getTableSchema//校验relnode的 查询schema和sink schema是否一致,以及是否需要执行cast// validate query schema and sink schema, and apply cast if possibleval query = validateSchemaAndApplyImplicitCast(input,catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),null,dataTypeFactory,getTypeFactory)LogicalLegacySink.create(query,s.getSink,"UnregisteredSink",ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))
....

这里触发创建relnode的调用逻辑,这里在之后statementSet.execute()后执行。

//执行flink sql的调用
。。。
//解析sql -> sqlnode
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
//sqlnode->relnode
TableResult execute = statementSet.execute();tableEnvironment.executeInternal(operations);TableEnvironmentImpl.translatePlannerBase.translate->translateToRel			

 

4. 优化阶段(RelNode - > optimize - >Transformation )

即对逻辑计划优化,根据前面生成的逻辑计划按照相应的规则进行优化。

接着看tableEnvironment.executeInternal中的translate方法

  override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {beforeTranslation()。。。//转换为relnode并放到一个map中val relNodes = modifyOperations.map(translateToRel)//优化逻辑计划val optimizedRelNodes = optimize(relNodes)//生成execGraph:执行图val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)//生成transformations DAGval transformations = translateToPlan(execGraph)afterTranslation()transformations}

Calcite 的核心所在,优化器进行优化的地方,如过滤条件的下压(push down),在进行 join 操作前,先进行 filter 操作,这样的话就不需要在 join 时进行全量 join,减少参与 join 的数据量等。

 

5. 生成ExecutionPlan并执行

最终的执行计划转为Graph图,下面的流程与真正的java代码流程就一致了。

TableEnvironmentImpl.executeInternal中具体看executeInternal方法

TableEnvironmentImpl.executeInternal(
...
TableResultInternal result = executeInternal(transformations, sinkIdentifierNames);
...
)private TableResultInternal executeInternal(List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {。。。//Translates the given transformations to a Pipeline.//将transformations转换为pipelinePipeline pipeline =execEnv.createPipeline(transformations, tableConfig.getConfiguration(), defaultJobName);try {// 执行pipeline//pipeline 其实就是StreamGraphJobClient jobClient = execEnv.executeAsync(pipeline);。。。}。。。//执行后返回结果return TableResult。。。}

通过生成的Transformation对象调用 execEnv.createPipeline,生成pipelinepipeline其实就StreamGraph便可以调用execEnv.executeAsync执行任务了。

 

二. 源码分析小结

上述描述了flink sql在内部执行过程进行的一些操作:
在这里插入图片描述

这里我们从执行sql tEnv.executeSql(stmt)statementSet.addInsertSql(sql); parse、validate阶段)生成statementSet,然后执行statementSet.execute()optimize、Execute阶段) 触发任务执行。

 

parse、validate阶段

通过执行以下代码触发

StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);statementSet.addInsertSql(sql);
tEnv.executeSql(stmt)

通过调用 tEnv.executeSql(stmt)statementSet.addInsertSql(sql); 进行每个sql的解析,校验,具体:

  1. Sql语句解析成语法树阶段(SQL - > SqlNode)
  2. SqlNode 验证(SqlNode – >Operation),其中Operation中包含着RelNode的convert实例,为转换逻辑计划做提前准备

 
optimize、Execute阶段

接着执行如下代码

//sqlnode->relnode->优化->pipeline(StreamGraph)-> 执行并返回结果
TableResult execute = statementSet.execute();
//调用链:tableEnvironment.executeInternal(operations);TableEnvironmentImpl.translatePlannerBase.translate->translateToRel

经历如下几个过程:

sqlnode->relnode->优化->pipeline(StreamGraph)-> 执行并返回结果

 

这里我们主要看executeInternal的逻辑

    public TableResultInternal executeInternal(List<ModifyOperation> operations) {List<ModifyOperation> mapOperations = new ArrayList<>();for (ModifyOperation modify : operations) {//1.先执行CTAS sql语句, 并放到mapOperations中进行translate操作// execute CREATE TABLE first for CTAS statementsif (modify instanceof CreateTableASOperation) {CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;executeInternal(ctasOperation.getCreateTableOperation());mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));} else {//2. 将其他非CTAS sqlnode放到mapOperations,进行translate操作mapOperations.add(modify);}}//translate主要的逻辑是:将所有的sqlNodes转换为relNodes,为初始的逻辑计划,然后优化逻辑计划,//接着翻译 ExecNodeGraph 为 Transformation DAG.List<Transformation<?>> transformations = translate(mapOperations);List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations);//  transformations转换为pipeline,最终执行pipeline即StreamGraph,然后返回结果TableResultInternal result = executeInternal(transformations, sinkIdentifierNames);if (tableConfig.get(TABLE_DML_SYNC)) {try {result.await();} catch (InterruptedException | ExecutionException e) {result.getJobClient().ifPresent(JobClient::cancel);throw new TableException("Fail to wait execution finish.", e);}}return result;}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/219337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LangChain 25: SQL Agent通过自然语言查询数据库sqlite

LangChain系列文章 LangChain 实现给动物取名字&#xff0c;LangChain 2模块化prompt template并用streamlit生成网站 实现给动物取名字LangChain 3使用Agent访问Wikipedia和llm-math计算狗的平均年龄LangChain 4用向量数据库Faiss存储&#xff0c;读取YouTube的视频文本搜索I…

Java中的多态到底是什么?

Java中的多态到底是什么&#xff1f; 在Java中&#xff0c;多态是面向对象编程中的一个重要概念&#xff0c;它有助于提高代码的灵活性和可维护性。多态分为编译时多态和运行时多态。 编译时多态&#xff08;静态多态&#xff09;&#xff1a; 编译时多态是指在编译阶段确定方…

2023全国大学生数据分析大赛A题完整论文教学

大家好呀&#xff0c;从发布赛题一直到现在&#xff0c;总算完成了全国大学生数据分析大赛A题某电商平台用户行为分析与挖掘完整的成品论文。 本论文可以保证原创&#xff0c;保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半成品论文。 实…

透析回溯的模板

关卡名 认识回溯思想 我会了✔️ 内容 1.复习递归和N叉树&#xff0c;理解相关代码是如何实现的 ✔️ 2.理解回溯到底怎么回事 ✔️ 3.掌握如何使用回溯来解决二叉树的路径问题 ✔️ 回溯可以视为递归的拓展&#xff0c;很多思想和解法都与递归密切相关&#xff0c;在很多…

Windows 网络监控的内容和方式

Microsoft Windows是使用最广泛的操作系统之一&#xff0c;受到全球用户的青睐&#xff0c;Windows 设备与许多进程、服务和事件相关联&#xff0c;这些进程、服务和事件通常需要从单个控制台进行跟踪&#xff0c;这就是 Windows 网络监控工具派上用场的地方。Windows 网络监控…

项目总结-自主HTTP实现

终于是写完了&#xff0c;花费了2周时间&#xff0c;一点一点看&#xff0c;还没有扩展&#xff0c;但是基本功能是已经实现了。利用的是Tcp为网络链接&#xff0c;在其上面又写了http的壳。没有使用epoll&#xff0c;多路转接难度比较高&#xff0c;以后有机会再写&#xff0c…

一张图片组合一组动作就可以生成毫无违和感的视频!

你敢信&#xff0c;1张人物图片 1张动作动画&#xff0c;就可以生成一段视频。网友直呼&#xff1a;“主播/视频UP主可能快要下岗了&#xff01;” &#xff08;模型视频来源于网络&#xff09; 本周&#xff0c;字节跳动联合新加坡国立大学发布了一款开源项目 MagicAnimate&…

(第63天)19C NONCDB 转 PDB

目前很多 19C 数据库依然是创建为 NONCDB 架构,但是未来 CDB 架构的使用是无法避免的,在 21C 版本开始 Oracle 官方将不再支持 NONCDB 架构。 环境信息 本文主要介绍以下如何在 19C 同版本下将 NONCDB 转为 CDB/PDB 架构(DBMS_PDB.DESCRIBE 方式),以下为测试环境信息: …

什么是XSS攻击?如何防止它?

跨站脚本攻击&#xff08;XSS&#xff09;&#xff0c;英文全称为 Cross-Site Scripting&#xff0c;是一种常见的 Web 安全漏洞。XSS 攻击的目标是在用户浏览器中执行恶意脚本&#xff0c;从而获取用户敏感信息、劫持用户会话或者进行其他恶意操作。 XSS 攻击通常发生在由用户…

探索C++中的常见排序算法

探索C中的常见排序算法 目录 冒泡排序 (Bubble Sort)选择排序 (Selection Sort)插入排序 (Insertion Sort) 冒泡排序 (Bubble Sort) 实现思路&#xff1a; 冒泡排序是一种简单直观的排序算法&#xff0c;它通过不断交换相邻元素的位置来达到排序的目的。算法的基本思想是重…

k8s中EmptyDir、HostPath、NFS三种基本存储方式介绍

目录 一.数据存储介绍 二.EmptyDir 1.简介 2.案例演示 三.HostPath 1.简介 2.案例演示 &#xff08;1&#xff09;介绍一下type类型 &#xff08;2&#xff09;简单演示 &#xff08;3&#xff09;数据同步功能 四.NFS 1.简介 2.案例演示 &#xff08;1&#xff…

Linux Ubuntu 手动搭建webDav

1、安装 因为需要跟 zotero 进行交互&#xff0c;因此需要在服务器搭建一个webDav 以下是搭建步骤&#xff1a; sudo apt-get update sudo apt-get install apache2 Ubuntu 安装apache2来实现 不同于Centos 安装好了之后&#xff0c;运行 a2enmod dav_fs a2enmod dav 激…

【视频笔记】古人智慧与修行

古人的智慧 相由心生、老子悟道、佛祖成佛 多一些思考&#xff0c;多一些精神修炼。 除非我们今天能够产生与人类科技发展相并行的精神变革&#xff0c;否则永远可能也无法跳脱出历史的轮回。 视频来源 曾仕强教授周易的智慧 太极两仪四象八卦 一生二&#xff0c;二生三&…

大数据机器学习深度解读决策树算法:技术全解与案例实战

大数据机器学习深度解读决策树算法&#xff1a;技术全解与案例实战 本文深入探讨了机器学习中的决策树算法&#xff0c;从基础概念到高级研究进展&#xff0c;再到实战案例应用&#xff0c;全面解析了决策树的理论及其在现实世界问题中的实际效能。通过技术细节和案例实践&…

【C++】POCO学习总结(十四):引用计数、共享指针、缓冲区管理

【C】郭老二博文之&#xff1a;C目录 1、Poco::AutoPtr 智能指针 1.1 说明 Poco::AutoPtr是一个含有引用计数的“智能”指针模版。 Poco::AutoPtr用于支持引用计数的类实例化。支持引用计数的类需要有以下要求&#xff1a; 维护一个引用计数(在创建时初始化为1)实现void du…

(企业 / 公司项目)SpringBoot3整合校验框架validation

在Spring Boot项目中使用校验框架validation可以让我们更方便地实现数据校验和错误提示。下面是Spring Boot集成校验框架validation的步骤。 添加依赖 在项目的pom.xml文件中添加validation依赖&#xff1a; <dependency><groupId>org.springframework.boot</…

现代雷达车载应用——第2章 汽车雷达系统原理 2.5节 检测基础

经典著作&#xff0c;值得一读&#xff0c;英文原版下载链接【免费】ModernRadarforAutomotiveApplications资源-CSDN文库。 2.5 检测基础 对于要测试目标是否存在的雷达测量&#xff0c;可以假定下列两个假设之一为真&#xff1a; •H0:—测量结果仅为噪声。 •H1:—测量是噪…

eNSP小实验(vlan和单臂路由)

一.vlan的划分 实验目的&#xff1a; ①pc1 只可以和pc2通信&#xff0c;不可以和pc3 pc4通信 ②pc1和pc2只能到Server1&#xff0c;pc3和pc4到Server2 1.拓扑图 2.配置 PC1-4 同理配置 SW1 <Huawei> <Huawei>u t m //关闭注释 Info: …

java项目将依赖打进jar、并生成可执行的jar

生成可执行的jar包 最近在做JAVA 的SDK 工具&#xff0c;由于SDK 依赖了其他的一些开源工具包&#xff0c;打包时少了依赖工具包&#xff0c;这样其他项目想要用SDK 就需要自己额外增加响应依赖&#xff0c;所以想要把依赖打进SDK。示例中依赖了fastjson处理json数据。 ​ 其…

网络编程案例

InetAddress 类 相关方法: getLocalHost&#xff1a;获取本机InetAddress对象。 getByName&#xff1a;根据指定主机名/域名获取ip地址对象。 getHostName&#xff1a;获取InetAddress对象的主机名。 getHostAddress&#xff1a;获取InetAddress对象的地址。 简单使用&am…