21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
  • 一、Table API 与 DataStream API集成
    • 1、概述
    • 2、 DataStream 和 Table 相互转换示例
      • 1)、示例1 - toDataStream
      • 2)、示例2 - toChangelogStream
      • 3)、示例3 - 通过仅切换标志来处理批处理和流数据
    • 3、集成说明
      • 1)、maven依赖
      • 2)、import
      • 3)、Configuration
      • 4)、执行行为
        • 1、DataStream API
        • 2、Table API


本文是Flink table api 与 datastream api的集成的第一篇,主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明(即maven依赖、import、配置以及执行行为),并以具体的示例进行说明。
本文依赖flink、kafka集群能正常使用。
本文分为3个部分,即集成概述、三个入门示例、集成说明。
本文的示例是在Flink 1.17版本中运行。

一、Table API 与 DataStream API集成

1、概述

在定义数据处理管道时,Table API和DataStream API同样重要。

DataStream API在一个相对低级的命令式编程API中提供流处理的原语(即时间、状态和数据流管理)。Table API抽象了许多内部构件,并提供了结构化和声明性API。

这两个API都可以处理有界和无界流。

在处理历史数据时,需要管理有界流。无界流发生在实时处理场景中,这些场景可能先使用历史数据进行初始化。

为了有效执行,这两个API都以优化的批处理执行模式提供处理有界流。然而,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。

一个API中的管道可以端到端定义,而不依赖于另一个API。然而,出于各种原因,混合这两种API可能是有用的:

  • 在DataStream API中实现主管道(main pipeline)之前,使用表生态系统(table ecosystem)轻松访问目录(catalogs )或连接到外部系统。
  • 在DataStream API中实现主管道之前,访问一些SQL函数以进行无状态数据规范化和清理。
  • 如果table API中不存在更低级的操作(例如自定义计时器处理),则不时切换到DataStream API。

Flink提供了特殊的桥接功能,以使与DataStream API的集成尽可能顺利。

在DataStream 和Table API之间切换会增加一些转换开销。例如,部分处理二进制数据的表运行时(即RowData)的内部数据结构需要转换为更用户友好的数据结构(即Row)。通常,这个开销可以忽略。

  • maven依赖
    本篇文章,如果没有特殊说明,将使用如下maven依赖
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-gateway</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency><!-- flink连接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>

2、 DataStream 和 Table 相互转换示例

Flink提供了专门的StreamTableEnvironment,用于与DataStream API集成。这些环境使用其他方法扩展常规TableEnvironment,并将DataStream API中使用的StreamExecutionEnvironments作为参数。

1)、示例1 - toDataStream

下面的代码展示了如何在两个API之间来回切换的示例。表的列名和类型自动从DataStream的TypeInformation派生。由于DataStream API本机不支持变更日志处理,因此代码假设在流到表和表到流转换期间仅附加/仅插入语义。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author alanchan**/
public class ConvertingDataStreamAndTableDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 1、创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、创建输入流DataStream<String> dataStream = env.fromElements("alan", "alanchan", "alanchanchn");// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream);// 4、创建视图,该步骤不是必须,将姓名转为大写tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT UPPER(f0) FROM InputTable");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toDataStream(resultTable);resultStream.print();env.execute();}}
  • 示例输出
12> +I[ALAN]
14> +I[ALANCHANCHN]
13> +I[ALANCHAN]

fromDataStream和toDataStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它还包括使用事件时间和水印。

根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表转换为数据流时产生仅插入的更改,而且还产生收回和其他类型的更新。在表到流转换期间,这可能会导致类似于以下内容的异常

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

在这种情况下,需要再次修改查询或切换到ChangelogStream。

2)、示例2 - toChangelogStream

下面的示例显示如何转换更新表。
每个结果行表示更改日志中的一个条目,该条目具有更改标志,可以通过对其调用row.getKind()来查询。在本例中,Alice的第二个分数在更改之前(-U)创建更新,在更改之后(+U)创建更新。

本示例仅仅以一个方法来展示,避免没有必要的代码,运行框架参考上述示例。

	public static void test2() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、创建输入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、创建视图,该步骤不是必须tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
  • 运行结果
2> +I[alan, 18]
16> +I[alanchan, 19]
16> +I[alanchanchn, 20]
2> -U[alan, 18]
2> +U[alan, 38]

fromChangelogStream和toChangelogStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它包括使用事件时间和水印。它讨论了如何为输入和输出流声明主键和变更日志模式。

上面的示例显示了如何通过为每个传入记录连续发出逐行更新来增量计算最终结果。然而,在输入流有限(即有界)的情况下,通过利用批处理原理可以更有效地计算结果。

在批处理中,可以在连续的阶段中执行运算符,这些阶段在发出结果之前使用整个输入表。例如,连接操作符可以在执行实际连接之前对两个有界输入进行排序(即排序合并连接算法),或者在使用另一个输入之前从一个输入构建哈希表(即哈希连接算法的构建/探测阶段)。

DataStream API和Table API都提供专门的批处理运行时模式。

3)、示例3 - 通过仅切换标志来处理批处理和流数据

下面的示例说明了统一管道能够通过仅切换标志来处理批处理和流数据。

public static void test3() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、创建输入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、创建视图,该步骤不是必须tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
  • 运行结果

注意比较和示例2的输出区别

+I[alanchan, 19]
+I[alan, 38]
+I[alanchanchn, 20]

一旦将changelog 应用于外部系统(例如键值存储),可以看到两种模式都能够产生完全相同的输出表。通过在发出结果之前使用所有输入数据,批处理模式的更改日志仅由仅插入的更改组成。有关更多细节,请参阅下面的专用批处理模式部分。

3、集成说明

将Table API与DataStream API相结合的项目需要添加以下桥接模块之一。
它们包括对 flink-table-api-java或flink-table-api-scala的可传递依赖性,以及相应的特定于语言的DataStream api模块。

1)、maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency>

2)、import

使用DataStream API和Table API的Java或Scala版本声明公共管道需要以下导入。

// imports for Java DataStream API
import org.apache.flink.streaming.api.*;
import org.apache.flink.streaming.api.environment.*;// imports for Table API with bridging to Java DataStream API
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;

3)、Configuration

TableEnvironment将采用传递的StreamExecutionEnvironment.中的所有配置选项。然而,不能保证对StreamExecutionEnvironment配置的进一步更改在实例化后传播到StreamTableEnvironment。在规划期间,将选项从Table API传播到DataStream API。

我们建议在切换到Table API之前尽早在DataStream API中设置所有配置选项。

import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;// create Java DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// set various configuration earlyenv.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// then switch to Java Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));// start defining your pipelines in both APIs...

4)、执行行为

这两个API都提供了执行管道的方法。换句话说:如果被请求,它们将编译一个作业图( job graph),该作业图将提交到集群并触发以执行。结果将流式传输到声明的sinks。

通常,这两个API都在方法名称中使用术语“执行”来标记这种行为。然而,Table API和DataStream API之间的执行行为略有不同。

1、DataStream API

DataStream API的StreamExecutionEnvironment使用生成器模式(builder pattern)来构造复杂的管道。管道可能会拆分为多个分支,这些分支可能以sink结尾,也可能不以sink结尾。环境缓冲(environment buffers)所有这些定义的分支,直到提交作业。

StreamExecutionEnvironment.execute()提交整个构建的管道,然后清除构建器。换句话说:不再声明sources 和sinks ,并且可以向生成器中添加新的管道。因此,每个DataStream程序通常以对StreamExecutionEnvironment.execute()的调用结束。或者,DataStream.executeAndCollect()隐式定义了一个sink,用于将结果流式传输到本地客户端。

2、Table API

在Table API中,分支管道仅在StatementSet中受支持,其中每个分支必须声明一个最终sink。TableEnvironment和StreamTableEnvironment都不提供专用的通用execute()方法。相反,它们提供了提交单个source-to-sink管道或语句集的方法:

	final static String sinkSQL = "CREATE TABLE OutputTable (\n" +" userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" +" t_insert_time TIMESTAMP(3)\r\n" +") WITH (\n" +"  'connector' = 'print'\n" +")";final static String sinkSQL2 = "CREATE TABLE OutputTable2 (\n" +" userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" +" t_insert_time TIMESTAMP(3)\r\n" +") WITH (\n" +"  'connector' = 'print'\n" +")";final static String sourceSQL = "CREATE TABLE InputTable (\r\n" + " userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" + " t_insert_time AS localtimestamp,\r\n" + " WATERMARK FOR t_insert_time AS t_insert_time\r\n" + ") WITH (\r\n" + " 'connector' = 'datagen',\r\n" + " 'rows-per-second'='10',\r\n" + " 'fields.userId.kind'='sequence',\r\n" + " 'fields.userId.start'='1',\r\n" + " 'fields.userId.end'='20',\r\n" + " 'fields.balance.kind'='random',\r\n" + " 'fields.balance.min'='1',\r\n" + " 'fields.balance.max'='100',\r\n" + " 'fields.age.min'='1',\r\n" + " 'fields.age.max'='100',\r\n" + " 'fields.userName.length'='6'\r\n" + ");";public static void test4() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//sinkSQL//sourceSQL// 建表tenv.executeSql(sourceSQL);//tenv.executeSql(sinkSQL);tenv.executeSql(sinkSQL2);//插入表数据,方式一tenv.from("InputTable").insertInto("OutputTable").execute();tenv.executeSql("select * from OutputTable");tenv.from("InputTable").execute().print();//插入表数据,方式二tenv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");tenv.executeSql("select * from OutputTable");//插入表数据,方式三tenv.createStatementSet().addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable").addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable").execute();// 输出tenv.from("InputTable").execute().print();tenv.executeSql("SELECT * FROM InputTable").print();env.execute();}
  • 输出结果
3> +I[3, 99, 36.20987556045243, d23888, 2023-11-13T14:49:58.812]
15> +I[15, 39, 68.30743253178122, 43bec8, 2023-11-13T14:49:58.812]
2> +I[2, 62, 47.280395949976885, 7bae4e, 2023-11-13T14:49:58.812]
16> +I[16, 52, 42.10205629532836, 6baf0e, 2023-11-13T14:49:58.812]
10> +I[10, 25, 58.008035887440094, d43dea, 2023-11-13T14:49:58.812]
13> +I[13, 36, 70.9215559827798, 01bb28, 2023-11-13T14:49:58.812]
12> +I[12, 38, 30.31004698340413, 322ba8, 2023-11-13T14:49:58.812]
6> +I[6, 17, 32.28909358733212, 13bf88, 2023-11-13T14:49:58.812]
9> +I[9, 49, 44.52802246768357, e8280c, 2023-11-13T14:49:58.812]
8> +I[8, 80, 18.03487847824154, 803b2a, 2023-11-13T14:49:58.812]
5> +I[5, 61, 54.43695775227862, 063f08, 2023-11-13T14:49:58.812]
7> +I[7, 64, 33.886576642098404, 443dea, 2023-11-13T14:49:58.812]
14> +I[14, 92, 63.71527772015468, 123848, 2023-11-13T14:49:58.812]
11> +I[11, 22, 30.745102844313315, e62848, 2023-11-13T14:49:58.812]
4> +I[4, 78, 88.60724929598506, 55bca8, 2023-11-13T14:49:58.812]
1> +I[1, 82, 62.50149215989057, 0bba0c, 2023-11-13T14:49:58.812]
3> +I[19, 67, 14.244993215937432, e6c911, 2023-11-13T14:49:59.806]
1> +I[17, 67, 91.05078612782468, 560b6c, 2023-11-13T14:49:59.807]
4> +I[20, 95, 82.12047947156385, 1ac5b2, 2023-11-13T14:49:59.807]
2> +I[18, 81, 25.384055001988084, fe98d1, 2023-11-13T14:49:59.806]
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| op |      userId |         age |                        balance |                       userName |           t_insert_time |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| +I |           1 |          91 |             22.629318048042723 |                         923e08 | 2023-11-13 14:49:59.800 |
| +I |           2 |          67 |              75.26915785038814 |                         342baa | 2023-11-13 14:49:59.803 |
| +I |           3 |          68 |              74.06076023217011 |                         1dbbce | 2023-11-13 14:49:59.803 |
| +I |           4 |          26 |              79.47471729272772 |                         083e2e | 2023-11-13 14:49:59.802 |
| +I |           5 |          97 |              82.56249330491859 |                         4a3c6e | 2023-11-13 14:49:59.804 |
| +I |           6 |          32 |              81.74903214944425 |                         fdac4e | 2023-11-13 14:49:59.800 |
| +I |           7 |          67 |              94.80154136831771 |                         f7acea | 2023-11-13 14:49:59.800 |
| +I |           8 |          53 |              50.85073238739004 |                         cfbd0c | 2023-11-13 14:49:59.800 |
| +I |           9 |          69 |              93.64054547476522 |                         7fa9ec | 2023-11-13 14:49:59.801 |
| +I |          10 |          66 |              61.92366658766452 |                         05b86a | 2023-11-13 14:49:59.803 |
| +I |          11 |          81 |              95.61717698776191 |                         efa8ce | 2023-11-13 14:49:59.797 |
| +I |          12 |           8 |             63.573174957723076 |                         0fbfec | 2023-11-13 14:49:59.802 |
| +I |          13 |          85 |             52.938510850778734 |                         43bfa8 | 2023-11-13 14:49:59.803 |
| +I |          14 |          26 |              5.130287258770441 |                         083c6c | 2023-11-13 14:49:59.797 |
| +I |          15 |          35 |               73.3318749510538 |                         0e3b4c | 2023-11-13 14:49:59.802 |
| +I |          16 |          84 |              16.24326410122912 |                         ac2d6e | 2023-11-13 14:49:59.802 |
| +I |          18 |          41 |              32.38455189801736 |                         b07afb | 2023-11-13 14:50:00.804 |
| +I |          19 |          24 |               77.6947569111452 |                         7f72ac | 2023-11-13 14:50:00.803 |
| +I |          20 |          92 |              82.53929937026987 |                         051fb9 | 2023-11-13 14:50:00.802 |
| +I |          17 |          93 |             12.784194121509948 |                         bce5d9 | 2023-11-13 14:50:00.801 |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
20 rows in set

为了组合这两种执行行为,对StreamTableEnvironment.toDataStream或StreamTableEnviron.toChangelogStream的每次调用都将具体化(materialize )(即编译)Table API子管道(sub-pipeline),并将其插入DataStream API管道生成器(builder)中。这意味着之后必须调用StreamExecutionEnvironment.execute()或DataStream.executeAndCollect。Table API中的执行不会触发这些“外部部件(external parts)”。

// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();// (2)// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print();// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute();

上述示例中有具体应用。

以上,本文是Flink table api 与 datastream api的集成的第一篇,主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明(即maven依赖、import、配置以及执行行为),并以具体的示例进行说明。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/142551.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

arm2 day6

串口实现单个字符的收发 main.c uart4.c uart4.h

java轮播图接口实现

一. 内容简介 实现java后端用户管理接口&#xff0c;数据库使用msyql。 二. 软件环境 2.1 java 1.8 2.2 mysql Ver 8.0.13 for Win64 on x86_64 (MySQL Community Server - GPL) 2.3 IDEA ULTIMATE 2019.3 2.4d代码地址 https://gitee.com/JJW_1601897441/competitionAs…

【左程云算法全讲10】打表技巧和矩阵处理技巧

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于左程云算法课程进行的&#xff0c;每个知识点的修正和深入主要参考…

冲击900亿美元估值!邀约路演、秘密交表的Shein上市有望

双十一的狂欢刚刚结束&#xff0c;Shein即将赴美上市的消息又在电商圈里投下一枚重磅炸弹。 继被媒体曝光其寻求900亿美金估值后&#xff0c;最新的消息称其已邀请投资人参与路演&#xff0c;且已秘密完成交表。这个神秘的中国独角兽&#xff0c;离敲钟登陆美股的日子越来越近…

SoftwareTest6 - 用 Selenium 怎么点点点

用 Selenium 来点点点 一 . 什么是自动化 ?1.1 自动化测试的分类接口自动化测试UI 自动化测试 (界面测试) 1.2 实现自动化测试的工具 : selenium环境部署驱动 二 . selenium 的使用2.1 一个简单的示例 : 让谷歌浏览器在百度首页搜索蔡徐坤准备工作编写代码 2.2 打开谷歌浏览器…

【vue】AntDV组件库中a-upload实现文件上传:

文章目录 一、文档&#xff1a;二、使用(以Jeecg为例)&#xff1a;【1】template&#xff1a;【2】script&#xff1a; 三、效果图&#xff1a; 一、文档&#xff1a; Upload 上传–Ant Design Vue 二、使用(以Jeecg为例)&#xff1a; 【1】template&#xff1a; <a-uploa…

day08_子网划分与子网掩码

什么是子网划分? 1、概念&#xff1a;借主机位给网络位使用,以此来达到把一个大网段划分为n个儿子网段的目的&#xff0c;2. 为何要进行子网划分&#xff1f;3、子网掩码&#xff1a;就是对ip地址打记号4、 网络地址的计算机方式&#xff1a;ip地址与子网掩码都转换成二进制&a…

Kafka消息队列为什么会丢消息,要怎么解决?

Kafka消息队列为什么会丢消息&#xff0c;怎么解决Kafka消息队列丢消息问题 Kafka 是一个分布式的高可用、高性能消息队列&#xff0c;它可以用于大规模的数据处理和流式计算场景。在 Kafka 中丢失消息会导致数据的不连续性、计算结果的准确性下降等问题&#xff0c;从而影响到…

【C++ 学习 ㊲】- 五种特殊类的设计

目录 一、设计一个禁止拷贝的类 二、设计一个只能在堆区上创建对象的类 三、设计一个只能在栈区和静态区上创建对象的类 四、设计一个不能继承的类 五、设计一个只能创建一个对象的类&#xff08;单例模式&#xff09; 一、设计一个禁止拷贝的类 拷贝只会发生在两个场景中…

IronSource 聚合广告平台接入踩坑日记——游戏声音消失

1.IOS 播放完广告之后&#xff0c;关闭广告&#xff0c;发现整个游戏的声音消失 解决方案&#xff1a; 1.取消勾选 Mute Other audio sources&#xff0c; 对于这个选项&#xff0c;官方的说法是&#xff1a; 运行 Unity 应用程序时&#xff0c;停止或允许在后台播放来自其…

基于51单片机PCF8591数字电压表数码管显示设计( proteus仿真+程序+设计报告+讲解视频)

PCF8591数字电压表数码管显示 1.主要功能&#xff1a;讲解视频&#xff1a;2.仿真3. 程序代码4. 设计报告5. 设计资料内容清单&&下载链接资料下载链接&#xff08;可点击&#xff09;&#xff1a; 基于51单片机PCF8591数字电压表数码管设计( proteus仿真程序设计报告讲…

Vue3-TypeScript-Threejs:导入外部的glb格式3D模型

一、直接上代码&#xff0c;在vue3-typescript-threejs 项目 导入外部的glb格式3D模型 极简代码&#xff0c;快速理解 <template><div ref"container"></div></template><script lang"ts" setup>import { onMounted, ref …

IEEE--DSConv: Efficient Convolution Operator 论文翻译

论文地址:https://arxiv.org/pdf/1901.01928v1.pdf 目录 摘要 1 介绍 2 相关工作 3 DSConv层 4 量化过程 5 分布偏移 6 优化推断 7 训练 8 结果 8.1 ImageNet 8.2 内存和计算负载 8.3 转移性 9 结论 摘要 我们引入了一种卷积层的变体&#xff0c;称为DSConv&…

cmd打开idea

当我们用idea打开一个项目的时候&#xff0c;有时候这个项目目录是有的&#xff0c;但是用idea的open却找不到&#xff0c;有时候我要重新关闭窗口&#xff0c;再open好多次才有 于是我现在使用命令打开&#xff0c;先把idea安装路径的bin目录放在path里面 然后cd到项目路径&…

Linux操作系统使用及C高级编程-D3Linux shell命令(权限、输入输出)

Shell 是一种应用程序&#xff0c;用以完成用户与内核之间的交互 一个功能强大的编程语言&#xff08;C语言&#xff09; 一个解释执行的脚本语言&#xff0c;不需要编译&#xff0c;写完直接执行 目前Linux 乌班图的Shell默认是bash 查看当前提供的Shell&#xff1a;cat /…

Tomcat隐藏版本号和关闭默认管理页面

一. 隐藏Tomcat异常页面中的版本信息&#xff0c;Tomcat服务器版本号泄露 Tomcat/8.5.xx相关版本号等信息&#xff0c;是不安全的。这会被黑客获取到&#xff0c;利用该版本的其他漏洞对服务器进行异常操作&#xff0c;所以需要隐藏掉。 进入tomcat安装目录 apache-tomcat-8.…

怎么在uni-app中使用Vuex 深度解刨

本文深入研究Vuex,一个Vue.js状态管理库。我们将介绍创建它是为了解决的问题、其背后的核心概念、如何设置它,当然,还将在每一步中使用代码示例。 Vuex是一个由Vue团队构建的状态管理库,用于管理Vue.js应用程序中的数据。它提供了一种集中管理跨应用程序使用的数据的方式,…

设计模式-备忘录模式(Memento)

设计模式-备忘录模式&#xff08;Memento&#xff09; 一、备忘录模式概述1.1 什么是备忘录模式1.2 简单实现备忘录模式1.3 使用备忘录模式的注意事项 二、备忘录模式的用途三、备忘录模式实现方式3.1 基于数组的备忘录实现方式3.2 基于集合的备忘录实现方式3.3 基于HashMap的备…

如何修改文件的修改时间?

如何修改文件的修改时间&#xff1f;随着当代社会科技的不断进步&#xff0c;我们对信息和数据的依赖程度与日俱增。在这个信息化时代&#xff0c;文件处理已经成为数字化办公中不可或缺的一部分。文件处理的范围非常广&#xff0c;其中有一个比较冷门的操作技巧&#xff0c;那…

《QT从基础到进阶·二十三》弹窗提示框QMessageBox和QCloseEvent事件

1、正常信息提示 QMessageBox::information(NULL, "Title", "Content", QMessageBox::Yes | QMessageBox::No, QMessageBox::Yes);消息框按钮判断&#xff1a; if(QMessageBox::Ok QMessageBox::warning(this,"温馨提示","是否保存设置?…