解析 flink sql 转化成flink job

文章目录

    • 背景
    • 流程
    • flink实例
    • 实现细节
      • 定义的规则
      • 定义的物理算子
      • 定义的flink exec node

背景

在很多计算引擎里,都会把sql 这种标准语言,转成计算引擎下底层实际的算子,因此理解此转换的流程对于理解整个过程非常重要

流程

在这里插入图片描述

flink实例

public class BatchExample {public static void main(String[] args) {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// 创建一个内置示例源表String sourceDDL = "CREATE TABLE users (\n" +"    id INT,\n" +"    name STRING,\n" +"    age INT\n" +") WITH (\n" +"    'connector' = 'filesystem',\n" +"    'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +"    'format' = 'csv'\n" +");";tableEnv.executeSql(sourceDDL);Table table = tableEnv.sqlQuery("select * from users limit 1 ");String explanation = tableEnv.explainSql("select * from users limit 1 ");System.out.println(explanation);table.execute().print();}
}

输出结果

== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])+- LogicalTableScan(table=[[default_catalog, default_database, users]])== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

实现细节

主要是三个地方,在优化那一步,就把原生的relnode 转化成了自定义的relnode,自定义的relnode 就可以带物理转化的内容了,比如上面的LogicalTableScan 转成BatchPhysicalTableSourceScan 这个relnode

定义的规则

class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {/** Rule must only match if TableScan targets a bounded [[ScanTableSource]] *///规则只匹配有界的ScanTableSourceoverride def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0).asInstanceOf[TableScan]val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])tableSourceTable match {case tst: TableSourceTable =>tst.tableSource match {case sts: ScanTableSource =>sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBoundedcase _ => false}case _ => false}}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)//在这里转成自定义的relnode new BatchPhysicalTableSourceScan(rel.getCluster,newTrait,scan.getHints,scan.getTable.asInstanceOf[TableSourceTable])}
}

定义的物理算子

也是一个relnode,实现类BatchPhysicalTableSourceScan

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {
//主要是这个方法,转成 flink exec算子override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}

定义的flink exec node

BatchExecTableSourceScan 类

 /// 主要是这个方法,看下下面的实现就比较熟悉了public Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment env,InputFormat<RowData, ?> inputFormat,InternalTypeInfo<RowData> outputTypeInfo,String operatorName) {// env.createInput will use ContinuousFileReaderOperator, but it do not support multiple// paths. If read partitioned source, after partition pruning, we need let InputFormat// to read multiple partitions which are multiple paths.// We can use InputFormatSourceFunction directly to support InputFormat.final InputFormatSourceFunction<RowData> function =new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);return env.addSource(function, operatorName, outputTypeInfo).getTransformation();}

这里的转换是多种方式,一种是现成的比如source 这种,还有的是函数这种,要通过代码生成的方法实现。flink代码生成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/32857.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视听分割相关论文阅读

1. End-to-End Referring Video Object Segmentation with Multimodal Transformers RVOS&#xff08;视频中的参考对象分割&#xff09;比RIS&#xff08;图像中的参考对象分割&#xff09;要困难得多&#xff0c;因为指代动作的文本表达通常无法从单个静态帧中正确推断出来。…

使用J-Link Commander查找STM32死机问题

接口:PA13,PA14&#xff0c;请勿连接复位引脚。 输入usb命令这里我已经连接过了STM32F407VET6了。 再输入connect命令这里我已经默认选择了SWD接口&#xff0c;4000K速率。 可以输入speed 4000命令选择4000K速率: 写一段崩溃代码进行测试: void CashCode(void){*((volatil…

区块链技术原理

1.起源&#xff1a; ➢ 中本聪(Satoshi Nakamoto), 2008 ➢ 比特币:一种点对点的电子现金系统 2.分布式账本技术原理 ➢ 将交易向全网所有节点进行广播 ➢ 众多记账节点对记账权达成共识&#xff0c;由共识确认的记账节点把记账区块发布给全网 ➢ 所有账本数据完整存储于区块…

机器学习基础:与Python关系和未来发展

目录 初识Python Python的由来 自由软件运动 编译方式的演进 Python语言的特点 语法简单&#xff0c;易于理解 语法结构清晰&#xff0c;快速上手 丰富的第三方库 机器学习 监督学习 无监督学习 半监督学习 欢迎回到我们的神经网络与深度学习Tensorflow实战学习&am…

MVVM架构详解:前端开发的理想选择

目录 前言1. MVVM架构概述1.1 MVVM架构的定义1.2 MVVM与MVC的区别 2. MVVM架构的核心组件2.1 模型&#xff08;Model&#xff09;2.2 视图&#xff08;View&#xff09;2.3 视图模型&#xff08;ViewModel&#xff09; 3. MVVM架构的优势3.1 分离关注点3.2 提高代码可测试性3.3…

AI大模型企业应用实战(14)-langchain的Embedding

1 安装依赖 ! pip install --upgrade langchain ! pip install --upgrade openai0.27.8 ! pip install -U langchain-openai ! pip show openai ! pip show langchain ! pip show langchain-openai 2 Embed_documents # 1. 导入所需的库 from langchain_openai import Open…

嵌入式中逻辑分析仪与示波器的基本原理

大家好,今天主要给大家分享一下,嵌入式中如何使用逻辑分析仪和示波器的方法,希望对大家有所帮助。 https://dreamsourcelab.cn/ 第一:什么是逻辑分析仪 是否遇到使用示波器分析数字电路的冏境:深度不够,时间太短,无法抓到想要的波形,没有协议内容解析? 逻辑分析仪…

Pnpm:包管理的新星,如何颠覆 Npm 和 Yarn

在探索现代 JavaScript 生态系统时&#xff0c;我们常常会遇到新兴技术的快速迭代和改进。其中&#xff0c;包管理工具的发展尤为重要&#xff0c;因为它们直接影响开发效率和项目性能。最近&#xff0c;pnpm 作为一种新的包管理工具引起了广泛关注。它不仅挑战了传统工具如 np…

vue3路由的使用

1、引用路由组件 npm install vue-router 2、创建路由 根据项目结构创建对应的组件&#xff08;home\news\about&#xff09; 在 src 目录下创建 router/index.ts //引入路由 import { createRouter,createWebHistory,createWebHashHistory } from vue-router import Home …

关于Pytorch转换为MindSpore的一点建议

一、事先准备 必须要对Mindspore有一些了解&#xff0c;因为这个框架确实有些和其它流程不一样的地方&#xff0c;比如算子计算、训练过程中的自动微分&#xff0c;所以这两个课程要好好过一遍&#xff0c;官网介绍文档最好也要过一遍 1、零基础Mindspore&#xff1a;https://…

linux服务器运行pycharm代码

一、pycharm代码上传服务器 1、进行配置 2、建立ssh连接&#xff08;选择文件传输协议SFTP&#xff09; 3、设置服务器名&#xff08;自定义&#xff09; 4、点击SSH配置右侧的"…"&#xff0c;进行SSH内容设置&#xff1a; 5、输入服务器信息 6、进行本地项目与远程…

AIGC系列之一-一文理解什么是Embedding嵌入技术

摘要&#xff1a;嵌入技术&#xff08;Embedding&#xff09;是一种将高维数据映射到低维空间的技术&#xff0c;在人工智能与图形学研究中被广泛应用。本文将介绍嵌入技术的基本概念、原理以及在 AIGC&#xff08;Artificial Intelligence and Graphics Computing&#xff09;…

基于STM32的智能农业灌溉系统

目录 引言环境准备智能农业灌溉系统基础代码实现&#xff1a;实现智能农业灌溉系统 4.1 数据采集模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景&#xff1a;智能农业管理与优化问题解决方案与优化收尾与总结 1. 引言 智能农业灌溉系统通过使用ST…

FPGA学习网站推荐

FPGA学习网站推荐 本文首发于公众号&#xff1a;FPGA开源工坊 引言 FPGA的学习主要分为以下两部分 语法领域内知识 做FPGA开发肯定要首先去学习相应的编程语言&#xff0c;FPGA开发目前在国内采用最多的就是使用Verilog做开发&#xff0c;其次还有一些遗留下来的项目会采用…

智慧校园综合门户有哪些特点?

智慧校园的门户系统&#xff0c;作为整个智慧校园架构的门户窗口&#xff0c;扮演着至关重要的角色。它如同一座桥梁&#xff0c;将校园内的各种信息资源、应用服务以及管理功能紧密相连&#xff0c;为师生、家长及管理人员提供了一个集中访问的便捷通道。智慧校园门户的设计理…

【Java】Java基础语法

一、注释详解 1.1 注释的语法&#xff1a; // 单行注释/*多行注释 *//**文档注释 */ 1.2 注释的特点&#xff1a; 注释不影响程序的执行&#xff0c;在Javac命令进行编译后会将注释去掉 1.3 注释的快捷键 二、字面量详解 2.1 字面量的概念&#xff1a; 计算机是用来处理…

DS:二叉树的链式存储及遍历

​ 欢迎来到Harper.Lee的学习世界&#xff01; 博主主页传送门&#xff1a;Harper.Lee的博客主页 想要一起进步的uu可以来后台找我哦&#xff01; ​ 一、引入 1.1 二叉树的存储方式 在之前接触到的满二叉树和完全二叉树使用的是数组的存储方式&#xff08;DS&#xff1a;树与…

thrift接口调用工具

写了一个thrift接口调用工具 导入thrift文件就可以直接调用相应接口 工具会根据thrift文件中接口的参数名&#xff0c;参数类型&#xff0c;返回值等等&#xff0c;自动生成接口参数&#xff0c;和结果json化显示。 https://github.com/HuaGouFdog/Fdog-Kit

实际项目开发:Spring集成Redis,并实现短信登录功能

redis新手&#xff0c;学了几种基本数据类型&#xff0c;却不知道怎么使用&#xff1f; 总是一边学一边忘&#xff1f; 学会了Redis的大多数使用命令&#xff0c;却不知道如何在项目中使用&#xff1f; 本文将从实际出发&#xff0c;为大家解决这些问题。 我是蚊子码农&#xf…