【Flink系列七】TableAPI和FlinkSQL初体验

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL

  • Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。
  •  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。

基本程序结构

import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenOptions;// 可以从流中创建表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件
DataStream<String> inputStream = env.readTextFile("xxx文件地址")//包装转换
DataStream<SensorReading> dataStream = xx转换;//创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//tableEnv
Table dataTable = tableEnv.fromDataSource(dataStream);Table resultTable = dataTable.select("id, temp")
.where("id=1");// 输出数据
tableEnv.toAppendStream(resultTable, Sensor.class);// register the Table dataTable as table "dataTable"
tableEnv.createTemporaryView("dataTable", dataTable);// 执行sql
Table resultTable1 = tableEnv.sqlQuery("select id,temp from dataTable where is = 1");// 输出数据
tableEnv.toAppendStream(resultTable1, Sensor.class);

TableEnvironment

TableAPI的核心概念是TableEnvironment,它主要负责

  • 在内部的 catalog 中注册 Table
  • 注册外部的 catalog
  • 加载可插拔模块
  • 执行 SQL 查询
  • 注册自定义函数 (scalar、table 或 aggregation)
  • DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )

Table 总是与特定的 TableEnvironment 绑定。 不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()//.inBatchMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);

或者也可以按程序示例中的方式,从现有的 StreamExecutionEnvironment 创建一个 StreamTableEnvironment 与 DataStream API 互操作。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

在 Catalog 中创建表

---先解释一下什么是Catalog:数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。cataLog详细解释

TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明,就会使用当前默认值

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");

创建表

在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:

// register the Table dataTable as table "dataTable"
tableEnv.createTemporaryView("dataTable", dataTable);// 执行sql
Table resultTable = tableEnv.sqlQuery("select id,temp from dataTable where is = 1");// 输出数据
tableEnv.toAppendStream(resultTable, Sensor.class);

注意: 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享。

Connector Tables

另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。

// Using table descriptors
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen").schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build()).option(DataGenOptions.ROWS_PER_SECOND, 100).build();tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);// Using SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")

扩展表标识符 

表总是通过三元标识符注册,包括 catalog 名、数据库名和表名。

用户可以指定一个 catalog 和数据库作为 “当前catalog” 和"当前数据库"。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。

标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。

// 默认
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");// 注册一个view 命名为 'exampleView' 在 catalog 中命名为 'custom_catalog'
// 默认在 'custom_database'的数据库中 
tableEnv.createTemporaryView("exampleView", table);//注册一个view 命名为 'exampleView' 在 catalog 中命名为 'custom_catalog'
// 指定数据库的名称为 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table);// 注册一个view 命名为  'example.View', 在 catalog 中命名为'custom_catalog'
// 默认在 'custom_database'的数据库中 
tableEnv.createTemporaryView("`example.View`", table);// 注册view名为 'exampleView', 指定 catalog命名为'other_catalog'
// 指定数据库的名称为  'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

查询表

tableApi

Table API 是基于 Table 类的,该类表示一个表(流或批处理),并提供使用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系操作的结果。 一些关系操作由多个方法调用组成,例如 table.groupBy(...).select(),其中 groupBy(...) 指定 table 的分组,而 select(...) 在 table 分组上的投影。

所有流处理和批处理表支持的 Table API 算子--> 文档 Table API 

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section// 注册表// 获取order表
Table orders = tableEnv.from("Orders");
// 从所有的数据中过滤 cCountry=France,然后聚合
Table revenue = orders.filter($("cCountry").isEqual("FRANCE")).groupBy($("cID"), $("cName")).select($("cID"), $("cName"), $("revenue").sum().as("revSum"));// emit or convert Table
// execute query

SQL

Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。

Flink对流处理和批处理表的SQL支持-->文档 SQL 

一个示例:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section// 注册 Orders table// 执行sql处理
Table revenue = tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");// 展示了如何指定一个更新查询,将查询的结果插入到已注册的表中。
// 执行sql获取到的数据 and 提交到 "RevenueFrance"表中
tableEnv.executeSql("INSERT INTO RevenueFrance " +"SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");

输出表

Table 通过写入 TableSink 输出。TableSink 是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。

批处理 Table 只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSinkRetractStreamTableSink 或者 UpsertStreamTableSink

演示如何输出 Table

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section// create an output Table
final Schema schema = Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()).column("c", DataTypes.BIGINT()).build();tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/path/to/file").format(FormatDescriptor.forFormat("csv").option("field-delimiter", "|").build()).build());// compute a result Table using Table API operators and/or SQL queries
Table result = ...//方法 Table.executeInsert(String tableName) 将 Table 发送至已注册的 TableSink。该方法通过名称在 catalog 中查找 TableSink 并确认Table schema 和 TableSink schema 一致。
result.executeInsert("CsvSinkTable");

翻译与执行查询 

不论输入数据源是流式的还是批式的,Table API 和 SQL 查询都会被转换成 DataStream 程序。 查询在内部表示为逻辑查询计划,并被翻译成两个阶段:

  1. 优化逻辑执行计划
  2. 翻译成 DataStream 程序

Table API 或者 SQL 查询在下列情况下会被翻译:

  1. 当 T ableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
  2. 当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
  3. 当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
  4. 当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
  5. 当 Table 被转换成 DataStream 时(参阅与 DataStream 集成)。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEn vironment.execute() 时被执行。

查询优化

Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如:

  1.  基于 Apache Calcite 的子查询解相关
  2. 投影剪裁
  3. 分区剪裁
  4. 过滤器下推
  5. 子计划消除重复数据以避免重复计算
  6. 特殊子查询重写,包括两部分:
    1. 将 IN 和 EXISTS 转换为 left semi-joins
    2. 将 NOT IN 和 NOT EXISTS 转换为 left anti-join
  7. 可选 join 重新排序
    1. 通过 table. optimizer.join-reorder-enabled 启用

注意: 当前仅在子查询重写的结合条件下支持 IN / EXISTS / NOT IN / NOT EXISTS。

优化器不仅基于计划,而且还基于可从数据源获得的丰富统计信息以及每个算子(例如 io,cpu,网络和内存)的细粒度成本来做出明智的决策。

解释表

Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。 这是通过 Table.explain() 方法或者 StatementSet.explain() 方法来完成的。Table.explain() 返回一个 Table 的计划。StatementSet.explain() 返回多 sink 计划的结果。它返回一个描述三种计划的字符串:

  1. 关系查询的抽象语法树(the Abstract Syntax Tree),即未优化的逻辑查询计划,
  2. 优化的逻辑查询计划,以及
  3. 物理执行计划。

可以用 TableEnvironment.explainSql() 方法和 TableEnvironment.executeSql() 方法支持执行一个 EXPLAIN 语句获取逻辑和优化查询计划,请参阅 EXPLAIN 页面.

以下代码展示了一个示例以及对给定 Table 使用 Table.explain() 方法的相应输出:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1.where($("word").like("F%")).unionAll(table2);System.out.println(table.explain());

上述例子的结果是:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:  +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])

参考资料:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/common/#expanding-table-identifiers

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《基于时空注意力卷积模型的超短期风电功率预测》

这个标题描述了一种用于超短期风电功率预测的模型&#xff0c;该模型基于时空注意力卷积模型。下面我会逐步解读这个标题的关键词和背景&#xff1a; 超短期风电功率预测&#xff1a;风电功率预测是指根据历史风速和其他相关数据&#xff0c;通过建立数学模型来预测未来特定时间…

Windows使用selenium操作浏览器爬虫

以前的大部分程序都是操作Chrome&#xff0c;很少有操作Edge&#xff0c;现在以Edge为例。 Selenium本身是无法直接控制浏览器的&#xff0c;不同的浏览器需要不同的驱动程序&#xff0c;Google Chrome需要安装ChromeDriver、Edge需要安装Microsoft Edge WebDriver&#xff0c…

DevOps搭建(九)-Jenkins实现基础CI、CD详细操作

1、创建可运行SpringBoot项目 1.1、创建一个新工程 在idea里创建一个项目,这里叫devops-test,如下图: String Boot版本要选择2.x的,依赖直选中Spring Web选项即可: 修改pom.xml文件,在build标签中增加如下内容,目的是简化jar包名称。 <finalName>devops-test&l…

Ubuntu虚拟机怎么设置静态IP

1 首先先ifconfig看一下使用的是哪个网络接口&#xff1a; 2 编辑 sudo vi /etc/netplan/00-installer-config.yamlnetwork:ethernets:ens33: # 根据您的网络接口进行修改&#xff0c;有的是eth0&#xff0c;有的是ens33&#xff0c;具体看第一步显示的是哪个网络接口addres…

Knife4j 接口文档如何设置 Authorization 鉴权参数?

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

针对基于nohup后台运行PyTorch多卡并行程序中断问题的一种新方法

针对基于nohup后台运行PyTorch多卡并行程序中断问题的一种新方法 文章目录 针对基于nohup后台运行PyTorch多卡并行程序中断问题的一种新方法Abstractscreen和tmux介绍tmux常用命令以及快捷键Byobu简单操作步骤集锦参考文献 Abstract PyTorch多卡并行运行程序is one of the mos…

网络互通--三层交换机配置

目录 一、三层交换机的原理 1、概念 2、PC A与不同网段的PC B第一次数据转发过程 3、一次路由&#xff0c;多次转发的概念 4、 三层交换机和路由器的比较 二、利用实验理解交换机 1、建立以下拓扑图​编辑 2、分别配置主机的IP地址&#xff0c;子网掩码、网关等信息 3、…

小白学爬虫:根据商品ID或商品链接获取淘宝商品详情数据接口方法

小白学爬虫的准备工作包括以下几个方面&#xff1a; 学习Python基础知识&#xff1a;首先需要掌握Python编程语言的基本语法和数据类型&#xff0c;了解Python的常用库和模块&#xff0c;例如requests库等。了解HTTP协议和HTML语言&#xff1a;了解HTTP协议的基本概念和原理&a…

【Hadoop_05】NN、2NN以及DataNode的工作机制

1、NameNode和SecondaryNameNode1.1 NN和2NN工作机制1.2 Fsimage和Edits解析1.3 CheckPoint时间设置 2、DataNode2.1 DataNode工作机制2.2 数据完整性2.3 掉线时限参数设置 1、NameNode和SecondaryNameNode 1.1 NN和2NN工作机制 思考&#xff1a;NameNode中的元数据是存储在哪…

CUDA 指定设备的方法,CUDA_VISIBLE_DEVICES 设置当前pytorch程序使用那些GPU设备

在进行pytorch 相关程序开发时&#xff0c;有时需要根据自己的规划使用系统中的多块NVidia GPU 设备&#xff0c;可以通过如下几种方法来指定GPU设备&#xff1a; 当服务器有多个GPU卡时&#xff0c;通过设置 CUDA_VISIBLE_DEVICES环境变量可以改变CUDA程序所能使用的GPU设备&…

ES-组合与聚合

ES组合查询 1 must 满足两个match才会被命中 GET /mergeindex/_search {"query": {"bool": {"must": [{"match": {"name": "liyong"}},{"match_phrase": {"desc": "liyong"}}]}}…

http 返回状态

一、状态1&#xff1a;信息 100 Continue&#xff1a;服务器仅接收到部分请求&#xff0c;但是一旦服务器并没有拒绝该请求&#xff0c;客户端应该继续发送其余的请求。 101 Switching Protocols &#xff1a;服务器转换协议&#xff1a;服务器将遵从客户的请求转换到另外一种…

消息队列kafka详解:Kafka架构介绍

一. 工作流程 Kafka中消息是以topic进行分类的&#xff0c;Producer生产消息&#xff0c;Consumer消费消息&#xff0c;都是面向topic的。 Topic是逻辑上的改变&#xff0c;Partition是物理上的概念&#xff0c;每个Partition对应着一个log文件&#xff0c;该log文件中存储的就…

C/C++ makefile 支持多目录、多文件批量化模版

最近因工作需要&#xff0c;要尝试徒手撸一份makefile文件&#xff0c;这份模版支持批量化&#xff0c;也不针对某一个C/CPP文件指定规则&#xff1a; # Makefile for building: GuiDemo # by MT 2023-12-12 v1.0 # http://blog.csdn.net/wangningyu CC gcc CFLA…

qt 5.15.2连接postgresql9.4数据库功能

qt 5.15.2连接postgresql9.4数据库功能 执行后显示效果&#xff1a; "QSQLITE" "QODBC" "QODBC3" "QPSQL" "QPSQL7" connected success to postgresql9.4 "admin" "1"注意事项: 连接postgresql9.4…

SpringBoot接入企微机器人

1、企业微信创建机器人&#xff08;如何创建不懂的请自行百度&#xff0c;很简单的&#xff09;&#xff0c;成功后能获取到一个Webhook地址&#xff1a;https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key693a91f6-7xxx-4bc4-97a0-0ec2sifa5aaa 2、创建一个SpringBoot项…

Leetcode 37 解数独

题意理解&#xff1a; 填充数独。每个九宫格内&#xff0c;9个数字各出现一个次&#xff0c;每行&#xff0c;每列上&#xff0c;9个数字各出现一次。数独部分空格内已填入了数字&#xff0c;空白格用 . 表示。 这道题要比N皇后问题更难&#xff1a; N皇后只放置N个皇后的位置&…

网络安全Web学习记录———CTF---Web---SQL注入(GET和POST传参)例题

小白初见&#xff0c;若有问题&#xff0c;希望各位大哥多多指正~ 我的第一道web类CTF题——一起来撸猫o(•ェ•)m-CSDN博客 最开始学习CTF里的web方向时&#xff0c;每次做了题遇到类似的老是忘记之前的解法&#xff0c;所以写点东西记录一下。听大哥的话&#xff0c;就从最…

企业微信旧版-新版网络连接错误,无法登录的解决方案

一.企业微微信无法登录故障 二.解决方案 1.网上的解决方案 **检查网络连接&#xff1a;**确保你的计算机正常连接到互联网。尝试打开其他网页&#xff0c;以确保网络连接正常。 **防火墙和安全软件&#xff1a;**某些防火墙或安全软件可能会阻止企业微信的正常连接。请确保你…

2023.12.13 关于 MySQL 复杂查询

目录 聚合查询 聚合函数 group by 子句 执行流程图 联合查询 笛卡尔积 内连接 外连接 左外连接 右外连接 自连接 子查询 单行子查询 多行子查询 EXISTS 关键字 合并查询 union on 和 union 的区别 聚合查询 聚合函数 函数说明COUNT([DISTINCT] expr)返回查询到…