解析 flink sql 转化成flink job

文章目录

    • 背景
    • 流程
    • flink实例
    • 实现细节
      • 定义的规则
      • 定义的物理算子
      • 定义的flink exec node

背景

在很多计算引擎里,都会把sql 这种标准语言,转成计算引擎下底层实际的算子,因此理解此转换的流程对于理解整个过程非常重要

流程

在这里插入图片描述

flink实例

public class BatchExample {public static void main(String[] args) {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// 创建一个内置示例源表String sourceDDL = "CREATE TABLE users (\n" +"    id INT,\n" +"    name STRING,\n" +"    age INT\n" +") WITH (\n" +"    'connector' = 'filesystem',\n" +"    'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +"    'format' = 'csv'\n" +");";tableEnv.executeSql(sourceDDL);Table table = tableEnv.sqlQuery("select * from users limit 1 ");String explanation = tableEnv.explainSql("select * from users limit 1 ");System.out.println(explanation);table.execute().print();}
}

输出结果

== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])+- LogicalTableScan(table=[[default_catalog, default_database, users]])== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

实现细节

主要是三个地方,在优化那一步,就把原生的relnode 转化成了自定义的relnode,自定义的relnode 就可以带物理转化的内容了,比如上面的LogicalTableScan 转成BatchPhysicalTableSourceScan 这个relnode

定义的规则

class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {/** Rule must only match if TableScan targets a bounded [[ScanTableSource]] *///规则只匹配有界的ScanTableSourceoverride def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0).asInstanceOf[TableScan]val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])tableSourceTable match {case tst: TableSourceTable =>tst.tableSource match {case sts: ScanTableSource =>sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBoundedcase _ => false}case _ => false}}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)//在这里转成自定义的relnode new BatchPhysicalTableSourceScan(rel.getCluster,newTrait,scan.getHints,scan.getTable.asInstanceOf[TableSourceTable])}
}

定义的物理算子

也是一个relnode,实现类BatchPhysicalTableSourceScan

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {
//主要是这个方法,转成 flink exec算子override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}

定义的flink exec node

BatchExecTableSourceScan 类

 /// 主要是这个方法,看下下面的实现就比较熟悉了public Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment env,InputFormat<RowData, ?> inputFormat,InternalTypeInfo<RowData> outputTypeInfo,String operatorName) {// env.createInput will use ContinuousFileReaderOperator, but it do not support multiple// paths. If read partitioned source, after partition pruning, we need let InputFormat// to read multiple partitions which are multiple paths.// We can use InputFormatSourceFunction directly to support InputFormat.final InputFormatSourceFunction<RowData> function =new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);return env.addSource(function, operatorName, outputTypeInfo).getTransformation();}

这里的转换是多种方式,一种是现成的比如source 这种,还有的是函数这种,要通过代码生成的方法实现。flink代码生成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/32857.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视听分割相关论文阅读

1. End-to-End Referring Video Object Segmentation with Multimodal Transformers RVOS&#xff08;视频中的参考对象分割&#xff09;比RIS&#xff08;图像中的参考对象分割&#xff09;要困难得多&#xff0c;因为指代动作的文本表达通常无法从单个静态帧中正确推断出来。…

商业秘密侵权

一、商业秘密侵权行为 &#xff08;一&#xff09;员工违规获取并使用企业后台用户行为数据构成商业秘密侵权 &#xff08;二&#xff09;离职员工将新单位“冒名顶替”为原单位构成对原单位商业秘密的侵犯 二、商业秘密侵权主体 &#xff08;一&#xff09;主体范围界定&a…

使用J-Link Commander查找STM32死机问题

接口:PA13,PA14&#xff0c;请勿连接复位引脚。 输入usb命令这里我已经连接过了STM32F407VET6了。 再输入connect命令这里我已经默认选择了SWD接口&#xff0c;4000K速率。 可以输入speed 4000命令选择4000K速率: 写一段崩溃代码进行测试: void CashCode(void){*((volatil…

区块链技术原理

1.起源&#xff1a; ➢ 中本聪(Satoshi Nakamoto), 2008 ➢ 比特币:一种点对点的电子现金系统 2.分布式账本技术原理 ➢ 将交易向全网所有节点进行广播 ➢ 众多记账节点对记账权达成共识&#xff0c;由共识确认的记账节点把记账区块发布给全网 ➢ 所有账本数据完整存储于区块…

机器学习基础:与Python关系和未来发展

目录 初识Python Python的由来 自由软件运动 编译方式的演进 Python语言的特点 语法简单&#xff0c;易于理解 语法结构清晰&#xff0c;快速上手 丰富的第三方库 机器学习 监督学习 无监督学习 半监督学习 欢迎回到我们的神经网络与深度学习Tensorflow实战学习&am…

Vue3的Teleport:Teleport是Vue3的一个新功能,它允许我们将子组件渲染到父组件以外的地方,这在处理模态框、弹出窗口等情况时非常有用

I. Teleport 的概述 Teleport 的定义: 在 Vue 3.0 中,Teleport 是一个新的内置组件,它允许我们将任何部分的渲染内容 Teleport(传送)到 Vue 应用范围之外的地方。 换句话说,你可以控制片段,让它们在 DOM 中的任何位置渲染,而不仅仅是在当前组件内部。 Teleport 的效…

【Android面试八股文】你能说一说在平常开发过程中你是如何解决事件冲突问题的吗?

文章目录 一、内部拦截法(Inner Intercept)1.1 工作原理:1.2 实现步骤:1.3 适用场景:1.4 内部拦截法示例1.4.1. 自定义 `RecyclerView` 以处理内部拦截1.4.2. 在布局中使用 `InterceptableRecyclerView`1.5 为什么`requestDisallowInterceptTouchEvent(boolean disallowIn…

Inno Setup打包技巧(开机启动,卸载按键,再次安装无法选其他目录)

修改.iss配置文件即可 1.设置开机启动 在对应模块中加入代码&#xff0c;在安装时候默认勾选开机启动了 [Tasks] Name: "startupicon"; Description: "开机启动"; GroupDescription: "{cm:AdditionalIcons}"; [Icons] Name: "{commonst…

毫米波移动通信系统中的波束赋形

在毫米波移动通信系统中&#xff0c;系统的频点较高&#xff0c;因此毫米波系统的射频器件易于小型化&#xff0c;然而同时也带来绕射能力差、穿透损耗大、路径损耗大[4][5]等缺点&#xff0c;这将大大降低了毫米波通信系统的接收功率&#xff0c;其中阻挡效应被认为是制约毫米…

MVVM架构详解:前端开发的理想选择

目录 前言1. MVVM架构概述1.1 MVVM架构的定义1.2 MVVM与MVC的区别 2. MVVM架构的核心组件2.1 模型&#xff08;Model&#xff09;2.2 视图&#xff08;View&#xff09;2.3 视图模型&#xff08;ViewModel&#xff09; 3. MVVM架构的优势3.1 分离关注点3.2 提高代码可测试性3.3…

代码随想录训练营Day38

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、斐波那契数列二、爬楼梯三、使用最小花费爬楼梯 前言 今天是跟着代码随想录刷题的第38天&#xff0c;主要学习了斐波那契数列、爬楼梯和使用最小花费爬楼梯…

AI大模型企业应用实战(14)-langchain的Embedding

1 安装依赖 ! pip install --upgrade langchain ! pip install --upgrade openai0.27.8 ! pip install -U langchain-openai ! pip show openai ! pip show langchain ! pip show langchain-openai 2 Embed_documents # 1. 导入所需的库 from langchain_openai import Open…

suuk-s.php.jpg-python 库劫持

做virtualBox的端口映射吧 suukmedim文件白名单绕过、反弹shell、$paht环境变量更改、python 库劫持提权、Reptile提权、sandfly-processdecloak使用 服务扫描 ┌──(kali㉿kali)-[~] └─$ sudo nmap -sV -A -T 4 -p 22,80 192.168.18.238GetSHell 访问80http://192.168.…

安全基础考核总结

黑客攻击路径 1、掌握网络安全CIA原则的基本定义 2、掌握常见的网络安全术语 3、掌握黑客攻击路径 黑客攻击路径-CSDN博客4、了解黑客攻击路径中配套的攻击手段和工具 暂未总结好&#xff0c;也未找到合适的文章&#xff0c;后续补充吧5、了解常见的网络安全风险 我认为就是…

嵌入式中逻辑分析仪与示波器的基本原理

大家好,今天主要给大家分享一下,嵌入式中如何使用逻辑分析仪和示波器的方法,希望对大家有所帮助。 https://dreamsourcelab.cn/ 第一:什么是逻辑分析仪 是否遇到使用示波器分析数字电路的冏境:深度不够,时间太短,无法抓到想要的波形,没有协议内容解析? 逻辑分析仪…

使用构建缓存优化 Docker 镜像构建

使用构建缓存优化 Docker 镜像构建 目录 实践构建应用程序额外资源后续步骤 假设一个简单的nodejs程序的 Dockerfile如下&#xff1a; FROM node:20-alpine WORKDIR /app COPY . . RUN yarn install --production CMD ["node", "./src/index.js"]当你运…

Pnpm:包管理的新星,如何颠覆 Npm 和 Yarn

在探索现代 JavaScript 生态系统时&#xff0c;我们常常会遇到新兴技术的快速迭代和改进。其中&#xff0c;包管理工具的发展尤为重要&#xff0c;因为它们直接影响开发效率和项目性能。最近&#xff0c;pnpm 作为一种新的包管理工具引起了广泛关注。它不仅挑战了传统工具如 np…

vue3路由的使用

1、引用路由组件 npm install vue-router 2、创建路由 根据项目结构创建对应的组件&#xff08;home\news\about&#xff09; 在 src 目录下创建 router/index.ts //引入路由 import { createRouter,createWebHistory,createWebHashHistory } from vue-router import Home …

總結光學(完)

參考: 陈曦<<光学讲义>>http://ithatron.phys.tsinghua.edu.cn/downloads/optics.pdf 1 波动光学 最简单的一种波是平面波。........... 一个波的波前是指相位相同的点构成的面。波的传播方向垂直于波面。 我们在此将讨论的光波特指波长远大于原子尺度又远小于…

关于Pytorch转换为MindSpore的一点建议

一、事先准备 必须要对Mindspore有一些了解&#xff0c;因为这个框架确实有些和其它流程不一样的地方&#xff0c;比如算子计算、训练过程中的自动微分&#xff0c;所以这两个课程要好好过一遍&#xff0c;官网介绍文档最好也要过一遍 1、零基础Mindspore&#xff1a;https://…