详解flink sql, calcite logical转flink logical

文章目录

    • 背景
    • 示例
    • FlinkLogicalCalcConverter
    • BatchPhysicalCalcRule
    • StreamPhysicalCalcRule
    • 其它算子
      • FlinkLogicalAggregate
      • FlinkLogicalCorrelate
      • FlinkLogicalDataStreamTableScan
      • FlinkLogicalDistribution
      • FlinkLogicalExpand
      • FlinkLogicalIntermediateTableScan
      • FlinkLogicalIntersect
      • FlinkLogicalJoin
      • FlinkLogicalLegacySink
      • FlinkLogicalLegacyTableSourceScan
      • FlinkLogicalMatch
      • FlinkLogicalMinus
      • FlinkLogicalOverAggregate
      • FlinkLogicalRank
      • FlinkLogicalSink
      • FlinkLogicalSnapshot
      • FlinkLogicalSort
      • FlinkLogicalUnion
      • FlinkLogicalValues

背景

本文主要介绍calcite 如何转成自定义的relnode

在这里插入图片描述

示例

FlinkLogicalCalcConverter

检查是不是calcite 的LogicalCalc 算子,是的话,重写带RelTrait 为FlinkConventions.LOGICA
的rel,类型FlinkLogicalCalc

private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {override def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[LogicalCalc]val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)FlinkLogicalCalc.create(newInput, calc.getProgram)}
}

BatchPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[FlinkLogicalCalc]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)}
}

StreamPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)}
}

其它算子

介绍下算子的匹配条件

FlinkLogicalAggregate

对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在准确的distinct调用并且支持聚合函数,则返回true

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleval supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {// we support AVGcase SqlKind.AVG => true// but none of the other AVG agg functionscase k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => falsecase _ => true}val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)!hasAccurateDistinctCall && supported}

FlinkLogicalAggregateStreamConverter

SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
非这几种,都支持转换

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleagg.getAggCallList.map(_.getAggregation.getKind).forall {case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => falsecase _ => true}}

FlinkLogicalCorrelate

对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查relnode 是不是LogicalCorrelate,重写relnode

默认的onMatch 函数

FlinkLogicalDataStreamTableScan

对应的SQL语义是,检查数据源是不是流式的
检查relnode 是不是LogicalCorrelate,重写relnode

  override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])dataStreamTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)}

FlinkLogicalDistribution

描述数据是不是打散的

  override def convert(rel: RelNode): RelNode = {val distribution = rel.asInstanceOf[LogicalDistribution]val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)}

FlinkLogicalExpand

支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val expand = rel.asInstanceOf[LogicalExpand]val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)}

FlinkLogicalIntermediateTableScan

FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])intermediateTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)}

FlinkLogicalIntersect

用于表示 SQL 中 INTERSECT 操作的逻辑运算符

override def convert(rel: RelNode): RelNode = {val intersect = rel.asInstanceOf[LogicalIntersect]val newInputs = intersect.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalIntersect.create(newInputs, intersect.all)}

FlinkLogicalJoin

用于表示 SQL 中 JOIN 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val join = rel.asInstanceOf[LogicalJoin]val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)}

FlinkLogicalLegacySink

写数据到传统的数据源

override def convert(rel: RelNode): RelNode = {val sink = rel.asInstanceOf[LogicalLegacySink]val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)FlinkLogicalLegacySink.create(newInput,sink.hints,sink.sink,sink.sinkName,sink.catalogTable,sink.staticPartitions)}

FlinkLogicalLegacyTableSourceScan

读传统的数据源

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)isTableSourceScan(scan)}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)}

FlinkLogicalMatch

MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。

override def convert(rel: RelNode): RelNode = {val logicalMatch = rel.asInstanceOf[LogicalMatch]val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)new FlinkLogicalMatch(rel.getCluster,traitSet,newInput,logicalMatch.getRowType,logicalMatch.getPattern,logicalMatch.isStrictStart,logicalMatch.isStrictEnd,logicalMatch.getPatternDefinitions,logicalMatch.getMeasures,logicalMatch.getAfter,logicalMatch.getSubsets,logicalMatch.isAllRows,logicalMatch.getPartitionKeys,logicalMatch.getOrderKeys,logicalMatch.getInterval)}

FlinkLogicalMinus

用于表示 SQL 中 minus 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val minus = rel.asInstanceOf[LogicalMinus]val newInputs = minus.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalMinus.create(newInputs, minus.all)}

FlinkLogicalOverAggregate

用于表示 SQL 中 窗口函数操作的逻辑运算符

FlinkLogicalRank

SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名

override def convert(rel: RelNode): RelNode = {val rank = rel.asInstanceOf[LogicalRank]val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)FlinkLogicalRank.create(newInput,rank.partitionKey,rank.orderKey,rank.rankType,rank.rankRange,rank.rankNumberType,rank.outputRankNumber)}

FlinkLogicalSink

表示SQL里的写

FlinkLogicalSnapshot

SQL 语句中的 AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照

def convert(rel: RelNode): RelNode = {val snapshot = rel.asInstanceOf[LogicalSnapshot]val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)snapshot.getPeriod match {case _: RexFieldAccess =>FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)case _: RexLiteral =>newInput}}

FlinkLogicalSort

表示SQL里的排序

FlinkLogicalUnion

表示SQL里的union 操作

 override def matches(call: RelOptRuleCall): Boolean = {val union: LogicalUnion = call.rel(0)union.all}override def convert(rel: RelNode): RelNode = {val union = rel.asInstanceOf[LogicalUnion]val newInputs = union.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalUnion.create(newInputs, union.all)}

FlinkLogicalValues

SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/38148.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

语音唤醒入门(基于ESP-skainet)

主要参考资料: ESP-SR 用户指南: https://docs.espressif.com/projects/esp-sr/zh_CN/latest/esp32s3/index.html 目录 ESP提供的模型直接初始化和使用模型AFE声学前端算法 使用模型 自定义模型 ESP提供的模型 乐鑫提供了经过训练的 WakeNet 和 MultiNet 模型&…

45.分解质因数

上海市计算机学会竞赛平台 | YACSYACS 是由上海市计算机学会于2019年发起的活动,旨在激发青少年对学习人工智能与算法设计的热情与兴趣,提升青少年科学素养,引导青少年投身创新发现和科研实践活动。https://www.iai.sh.cn/problem/711 题目描述 给定一个整数 𝑛n,请将它…

HDFS详细介绍以及HDFS集群环境部署【hadoop组件HDFS笔记】(图片均为学习时截取的)

HDFS详细介绍 HDFS是什么 HDFS是Hadoop三大组件(HDFS、MapReduce、YARN)之一 全称是:Hadoop Distributed File System(Hadoop分布式文件系统);是Hadoop技术栈内提供的分布式数据存储解决方案 可以在多台服务器上构建存储集群&…

云计算【第一阶段(21)】Linux引导过程与服务控制

目录 一、linux操作系统引导过程 1.1、开机自检 1.2、MBR引导 1.3、GRUB菜单 1.4、加载 Linux 内核 1.5、init进程初始化 1.6、简述总结 1.7、初始化进程centos 6和7的区别 二、排除启动类故障 2.1、修复MBR扇区故障 2.1.1、 实验 2.2、修复grub引导故障 2.2.1、实…

敏捷开发笔记(第9章节)--开放-封闭原则(OCP)

目录 1:PDF上传链接 9.1 开放-封闭原则(OCP) 9.2 描述 9.3 关键是抽象 9.3.1 shape应用程序 9.3.2 违反OCP 糟糕的设计 9.3.3 遵循OCP 9.3.4 是的,我说谎了 9.3.5 预测变化和“贴切的”结构 9.3.6 放置吊钩 1.只受一次…

团队任务管理跟踪软件有哪些?分享2024年值得关注的10款

本文将分享2024年值得关注的10款团队任务管理跟踪软件:Worktile、PingCode、Zoho Projects、Wrike、ProofHub、Connecteam、MeisterTask、Nifty、BIGContacts、Hive。 无论是小型初创企业还是庞大的跨国公司,高效的任务管理都能显著提升工作效率&#xf…

面试框架一些小结

springcloud的⼯作原理 springcloud由以下⼏个核⼼组件构成: Eureka:各个服务启动时,Eureka Client都会将服务注册到Eureka Server,并且Eureka Client还可以反过来从Eureka Server拉取注册表, 从⽽知道其他服务在哪⾥ …

新能源行业知识体系-------主目录-----持续更新

本文相当于目录方便快速检索内容,没有实际内容,只做索引 文章目录 一、电力市场概论二、蒙西电网需求侧响应三、蒙西电网市场结算V2.0 一、电力市场概论 是学习清华大学电力市场概论(2024年春)的学习笔记,详细了解电力市场是如何利用经济学知…

48 - 按日期分组销售产品(高频 SQL 50 题基础版)

48 - 按日期分组销售产品 -- group_concat 分组拼接selectsell_date,count(distinct product) num_sold,group_concat(distinct product order by product separator ,) products fromActivities group bysell_date;

grpc教程——proto文件转go

【1】编写一个proto文件 syntax "proto3"; package myproto;service NC{rpc SayStatus (NCRequest) returns (NCResponse){} }message NCRequest{ string name 1; } message NCResponse{string status 1; } 【2】转换:protoc --go_out. myservice.pro…

mmdetection2.28修改backbone不使用预训练参数、从头训练

背景 最近需要测试一下在backbone部分如果不使用预训练参数的话,模型需要多少轮才能收敛所使用的backbone是mmcls.ConvNeXtmmdetection版本为2.28.2,mmcls版本为0.25.0 修改流程 最简单的方法,直接去mmcls的model zoo里找到对应backbone的…

SpringBoot:使用Spring Batch实现批处理任务

引言 在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spr…

6-Pandas使用自定义函数

Pandas使用自定义函数 如果想要应用自定义的函数,或者把其他库中的函数应用到 Pandas 对象中,有以下三种方法: 1) 操作整个 DataFrame 的函数:pipe()2) 操作行或者列的函数:apply()3) 操作单一元素的函数&#xff1a…

Python 教程---面向对象编程

面向对象编程 4.1 类和对象的概念类(Class)对象(Object)示例创建和使用类 4.2 类成员实例成员类成员静态成员示例 4.3 面向对象三要素封装(Encapsulation)继承(Inheritance)多态&…

单晶层状氧化物制作方法技术资料 纳离子技术

网盘 https://pan.baidu.com/s/1hjHsXvTXG74-0fDo5TtXWQ?pwd10jk 单晶型高熵普鲁士蓝正极材料及其制备方法与应用.pdf 厘米级铬氧化物单晶及其制备方法和存储器件.pdf 多孔氧化物单晶材料及其制备方法和应用.pdf 大单晶层状氧化物正极材料及其制备方法和应用.pdf 富钠P2相层状…

docker k8s

1、docker是什么? 将环境和程序一起打包给到 服务器运行的工具软件。 2、基础镜像base image是什么? 操作系统:用户空间、内核空间 阉割操作系统,利用其的用户空间(因为应用程序运行在用户空间)&#xf…

量化交易之机器学习篇 - 实现K近邻模型的两种方式

# 导入相关模块import numpy as npfrom collections import Counter import matplotlib.pyplot as pltfrom sklearn import datasets from sklearn.utils import shuffledef load_data():iris datasets.load_iris()# 打乱数据后的数据和标签X, y shuffle(iris.data, iris.tar…

【机器学习】在【Pycharm】中的应用:【线性回归模型】进行【房价预测】

专栏:机器学习笔记 pycharm专业版免费激活教程见资源,私信我给你发 python相关库的安装:pandas,numpy,matplotlib,statsmodels 1. 引言 线性回归(Linear Regression)是一种常见的统计方法和机器学习算法&a…

弹性力学讲义

弹性力学讲义 1. 基本假设和一些概念2. 应力3. 二维应力状态与摩尔库伦屈服准则 1. 基本假设和一些概念 力学:变形体力学–固体力学和流体力学(连续介质力学) 刚体力学–理论力学(一般力学) 物理受理后:要…

Facebook的投流技巧有哪些?

相信大家都知道Facebook拥有着巨大的用户群体和高转化率,在国外社交推广中的影响不言而喻。但随着Facebook广告的竞争越来越激烈,在Facebook广告上获得高投资回报率也变得越来越困难。IPIDEA代理IP今天就教大家如何在Facebook上投放广告的技巧&#xff0…