Flink的Table API 与SQL介绍及调用

1 概述

   DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层。对于新版本的Blink在DateStream基础上又包了一层实现了批流统一,上层执行环境都是基于流处理,做批流统一的查询。Table API是流处理和批处理通用的关系型API,与常规SQL语言中将查询指定为字符串不同,Table API查询是以Java或Scala中的语言嵌入样式来定义。 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询。

  从Flink 1.9开始,Flink为Table 和SQL API程序提供了两种不同的planner :Blink planner 和the old planner。Blink planner 和the old planner的区别如下:

  (1)批流统一:Blink将批处理作业,视为流式处理的特殊情况。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理。

  (2)因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替

  (3)Blink planner只支持全新的目录,不支持已弃用的ExternalCatalog

  (4)old planner和Blink planner的FilterableTableSource实现不兼容。旧的planner会把PlannerExpressions下推到filterableTableSource中,而blink planner则会把Expressions下推

  (5)基于字符串的键值配置选项仅适用于Blink planner

  (6)PlannerConfig在两个planner中的实现不同

  (7) Blink planner会将多个sink优化在一个DAG中(仅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而旧planner的优化总是将每一个sink放在一个新的DAG中,其中所有DAG彼此独立

  (8)旧的planner不支持目录统计,而Blink planner支持

  需要添加的依赖

  根据目标编程语言的不同,您需要将Java或ScalaAPI添加到项目中,以便使用TableAPI&SQL来定义管道:

<!-- Either... -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>
<!-- or... -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>

  您想在IDE中本地运行TableAPI&SQL程序,则必须添加以下一组模块,具体取决于使用的planner:

<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>

   在内部,表生态系统的一部分是在Scala中实现的。因此确保为批处理和流应用程序添加以下依赖项:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.10.0</version><scope>provided</scope>
</dependency>

2 Table API 和 SQL 的程序结构

  所有用于批处理和流处理的Table API和SQL程序都遵循相同的结构,与流式处理的程序结构类似

//1 创建表的执行环境
// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section//2 创建表,读取数据
tableEnv.connect(...).createTemporaryTable("table1")//3 注册表,用于输出计算结果
tableEnv.connect(...).createTemporaryTable("outputTable")//4.1 通过Table API查询得到结果表
val tapiResult = tableEnv.from("table1").select(...)//4.2 通过SQL查询得到结果表
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")//5 将结果表写入到输出表
tapiResult.insertInto("outputTable")// execute
tableEnv.execute("scala_job")

3 创建TableEnvironment

  创建表的执行环境,需要将 flink 流处理的执行环境传入

val tableEnv = StreamTableEnvironment.create(env)

  TableEnvironment 是 flink 中集成 Table API 和 SQL 的核心概念,所有对表的操作都基于 TableEnvironment :注册 Catalog(可以认为是对表的管理的结构);在 Catalog 中注册表;执行 SQL 查询;注册用户自定义函数(UDF);转换DataStream或DataSet变成Table;保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

  在老版本Table总是绑定到特定的TableEnvironment。不可能在同一个查询中组合不同TableEnvironment的表,例如加入或合并它们。

  TableEnvironment创建通过调用静态的BatchTableEnvironment.create()或StreamTableEnvironment.create()方法的StreamExecutionEnvironment或者ExecutionEnvironment还有一个可选的TableConfig。这个TableConfig可用于配置TableEnvironment或自定义查询优化和转换过程

  如果两个planner JAR都位于类路径(默认行为)上,则应显式设置在当前程序中使用的planner。

// **********************
// FLINK STREAMING QUERY 配置老版本 planner 的流式查询
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironmentval fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)// ******************
// FLINK BATCH QUERY  配置老版本 planner 的批式查询
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironmentval fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)// **********************
// BLINK STREAMING QUERY  配置 blink planner 的流式查询
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironmentval bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)// ******************
// BLINK BATCH QUERY  配置 blink planner 的批式查询
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

  注意:老版本可以在代码里面定义不同的环境,老版本处理过程中可以有批处理和流处理环境。一张批处理环境,一张流处理环境是不能做join查询的,所有针对表的操作,必须是基于同一个执行环境的。

  如果只有一个Planner/lib目录,可以使用useAnyPlanner (use_any_planner(用于python)创建特定的EnvironmentSettings.

4 在Catalog中创建表

  一个TableEnvironment维护使用标识符创建的表的Catalog的map,也就是TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成:Catalog名、数据库(database)名和对象名

  表可以是常规的,也可以是虚拟的视图,View。常规表(Table):一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。视图(View):可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果集

  表可以是临时的,并且与单个Flink会话的生命周期相关联,或者是永久的,并且跨多个Flink会话和集群可见。永久表:需要Catalog(如Hive Metastore)来维护有关表的元数据。一旦创建了永久表,它对连接到目录的任何Flink会话都是可见的,并且将继续存在,直到该表显式删除为止。临时表:总是存储在内存中,并且仅在Flink会话期间才存在。这些表在其他会话中不可见。它们不绑定到任何目录或数据库,但可以在其中的名称空间中创建。如果删除临时表的相应数据库,则不会删除它们。

4.1 创建表

4.1.1 Virtual Tables

  Table API对应于虚拟的表,它封装了一个逻辑查询计划。它可以在Catalog中创建,如下所示:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// table is the result of a simple projection query 
val projTable: Table = tableEnv.from("X").select(...)// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)

  注意:Table对象类似于关系数据库系统的VIEW,即定义Table未优化,当另一个查询引用已注册的Table查询时,将内联。如果多个查询引用同一个已注册的Table,它将为每个引用查询内联并多次执行,即注册的结果Table不被分享。

4.1.2 Connector Tables

  可以创建一个TABLE从关系数据库中所知道的连接器申报。连接器描述存储表数据的外部系统,可以在这里声明诸如Kafka之类的存储系统或常规文件系统。其实就是TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表

tableEnvironment.connect(...)          // 定义表的数据来源,与外部系统建立连接.withFormat(...)       // 定义数据格式化方法.withSchema(...)       // 定义表结构.inAppendMode().createTemporaryTable("MyTable")      // 创建临时表

  创建 Table 从文件中读取如下:

tableEnv.connect(new FileSystem().path(“YOUR_FILE_PATH”))    // 定义到文件系统的连接.withFormat(new Csv())    // 定义以csv格式进行数据格式化.withSchema(new Schema()    // 定义表结构.field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE()))    .createTemporaryTable("sensorTable")    // 创建临时表

4.2 表的标识符identifier

  Flink中表由一个identifier指定,identifier由Catalog名、数据库(database)名和对象名(表名)组成。用户可以将其中的一Catalog和一个database设置为“当前目录”和“当前数据库”。使用它们,上面提到的3部分标识符中的前两部分可以是可选的,如果不提供它们,则将引用当前目录和当前数据库。用户可以通过表API或SQL切换当前目录和当前数据库。

  标识符遵循SQL要求,这意味着可以使用回勾字符(`)。此外,必须转义所有SQL保留关键字。

Scala
// get a TableEnvironment
val tEnv: TableEnvironment = ...;
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")val table: Table = ...;// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("exampleView", table)// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table)// register the view named 'View' in the catalog named 'custom_catalog' in the
// database named 'custom_database'. 'View' is a reserved keyword and must be escaped.  
tableEnv.createTemporaryView("`View`", table)// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("`example.View`", table)// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)

5 表的查询

5.1 Table API

  Table API 是集成在 Scala 和 Java 语言内的查询 API。

  Table API 基于代表“表”的 Table 类,并提供一整套操作处理的方法 API;这些方法会返回一个新的 Table 对象,表示对输入表应用转换操作的结果,有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

  Table API 文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders.filter('cCountry === "FRANCE").groupBy('cID, 'cName).select('cID, 'cName, 'revenue.sum AS 'revSum)// emit or convert Table
// execute query

注意:Scala Table API使用Scala符号,前面加了一个单引号’,这是Table API中定义的Expression类型的写法,可以很方便地表示一个表中的字段。字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式。Table API使用Scala实现。确保导入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._为了使用Scala隐式转换。

5.2 SQL

  Flink 的 SQL 集成,基于实现 了SQL 标准的 Apache Calcite,在 Flink 中,用常规字符串来定义 SQL 查询语句,SQL 查询的结果,也是一个新的 Table。官方Flink对流表和批处理表的SQL支持地址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/index.html

  指定查询并将结果作为Table如下:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)// emit or convert Table
// execute query

  指定将其结果插入到已注册表中的更新查询如下:

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// register "Orders" table
// register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""|INSERT INTO RevenueFrance|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)// execute query

  Table API和SQL查询可以很容易地混合在一起使用,因为两者都返回Table对象:Table API查询可以基于SQL查询返回的Table对象;可以根据Table API查询的结果定义SQL查询注册结果表在TableEnvironment并在FROM子句的SQL查询。

6 输出表

6.1 表的输出

  表的输出,是通过将数据写入 TableSink 来实现的,TableSink 是一个通用接口,可以支持不同的文件格式(如CSV、Apache Parquet、Apache Avro)、存储数据库(如如JDBC、Apache HBASE、Apache Cassandra、Elasticsearch)和消息队列(如如Apache Kafka、RabbitMQ)。

  一个batch Table只能写入BatchTableSink,而Streaming Table需要一个AppendStreamTableSink或RetractStreamTableSink或UpsertStreamTableSink。

  输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中

// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section// create an output Table
val schema = new Schema().field("a", DataTypes.INT()).field("b", DataTypes.STRING()).field("c", DataTypes.LONG())tableEnv.connect(new FileSystem("/path/to/file")).withFormat(new Csv().fieldDelimiter('|').deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable")// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")// execute the program

6.2 更新模式

  对于流式查询,需要声明如何在表和外部连接器之间执行转换,与外部系统交换的消息类型,由更新模式(Update Mode)指定。有追加(Append),撤回(Retract),更新插入(Upsert)三种模式

  追加(Append)模式:表只做插入操作,和外部连接器只交换插入(Insert)消息。以前发出的结果永远不会更新,如果更新或删除操作使用追加模式会失败报错。

  撤回(Retract)模式:表和外部连接器交换添加(Add)和撤回(Retract)消息,插入操作(Insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Update)编码为上一条的 Retract 和下一条的 Add 消息。返回值是boolean类型。它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回。

  更新插入(Upsert)模式:更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息

  撤回(Retract)和更新插入(Upsert)的区别:

  输出到文件如下:Retract不能定义key,这一点跟upsert模式完全不同;Update操作需要一个唯一的key,通过这个key可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一key的属性,是用单个消息编码的,所以效率会更高。

6.3 输出到文件

tableEnv.connect(new FileSystem().path("/path/to/file") // 定义到文件系统的连接.withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.Double())) .createTemporaryTable("outputTable")     // 创建临时表resultTable.insertInto("outputTable")    // 输出表

6.4 输出到Kafka

  可以创建 Table 来描述 kafka 中的数据,作为输入或输出的 TableSink

tableEnv.connect(new Kafka().version("0.11").topic("sinkTest").property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")
).withFormat( new Csv() ).withSchema( new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("kafkaOutputTable")resultTable.insertInto("kafkaOutputTable")

6.5 输出到ElasticSearch

  ElasticSearch的connector可以在upsert(update+insert,更新插入)模式下操作,这样就可以使用Query定义的键(key)与外部系统交换UPSERT/DELETE消息。对于“仅追加”(append-only)的查询,connector还可以在append 模式下操作,这样就可以与外部系统只交换insert消息。es目前支持的数据格式,只有Json,而flink本身并没有对应的支持,所以还需要引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.10.0</version>
</dependency>
tableEnv.connect(new Elasticsearch().version("6").host("localhost", 9200, "http").index("test").documentType("temp")
).inUpsertMode()           // 指定是 Upsert 模式.withFormat(new Json()).withSchema( new Schema().field("id", DataTypes.STRING()).field("count", DataTypes.BIGINT())).createTemporaryTable("esOutputTable")aggResultTable.insertInto("esOutputTable")

6.6 输出到MySql

  Flink专门为Table API的jdbc连接提供了flink-jdbc连接器,我们需要先引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version>
</dependency>

  jdbc连接的代码实现比较特殊,因为没有对应的java/scala类实现ConnectorDescriptor,所以不能直接tableEnv.connect()。不过Flink SQL留下了执行DDL的接口:tableEnv.sqlUpdate()

val sinkDDL: String ="""|create table jdbcOutputTable (|  id varchar(20) not null,|  cnt bigint not null|) with (|  'connector.type' = 'jdbc',|  'connector.url' = 'jdbc:mysql://localhost:3306/test',|  'connector.table' = 'sensor_count',|  'connector.driver' = 'com.mysql.jdbc.Driver',|  'connector.username' = 'root',|  'connector.password' = '123456'|)""".stripMargintableEnv.sqlUpdate(sinkDDL)
aggResultSqlTable.insertInto("jdbcOutputTable")

7 Query的解释和执行

  Table API提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。

  explain方法会返回一个字符串,描述三个计划:①未优化的逻辑查询计划②优化后的逻辑查询计划③实际执行计划

  查看执行计划如下

val explaination: String = tableEnv.explain(resultTable)
println(explaination)

  Query的解释和执行过程,老planner和blink planner是不一样的。整体来讲,Query都会表示成一个逻辑查询计划,然后分两步解释:①优化查询计划②解释成 DataStream 或者 DataSet程序

  而Blink版本是批流统一的,所以所有的Query,只会被解释成DataStream程序;另外在批处理环境TableEnvironment下,Blink版本要到tableEnv.execute()执行调用才开始解释。

8 Table与DataStream,DataSet的集成

  两种planners都可以与DataStream API集成,只有老的planner才能与DataSet API集成。Scala Table API提供了DataSet、DataStream和Table的隐式转换,通过导入org.apache.flink.table.api.scala._

8.1 从DataStream或DataSet创建视图

  DataStream或DataSet可以在TableEnvironment作为一个视图,只能注册为临时视图

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)

8.2 将DataStream或DataSet转换成表

  DataStream或DataSet可以直接转换为Table,而不是注册在TableEnvironment。

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

  可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再定义schema。

  代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。

  这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)

case class WC(id: String, timestamp: Long, count: Double)val inputStream: DataStream[String] = env.readTextFile("/path/to/file")
val dataStream: DataStream[WC] = inputStream.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)})val wcTable: Table = tableEnv.fromDataStream(dataStream)

  数据类型与 Table schema的对应:上面DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。

  基于名称的对应:

val wcTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature)

  基于位置的对应:

val wcTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)

  Flink的DataStream和 DataSet API支持多种类型。组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。

  元组类型和原子类型,一般用位置对应会好一些;用名称对应的话:元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。

8.3 将表转换成DataStream

  表可以转换为DataStream或DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API或SQL查询的结果上运行了。

  将表转换为DataStream或DataSet时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。

  表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式。Table API中表到DataStream有两种模式:①追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景②撤回模式(Retract Mode):用于任何场景。有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作。得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据, Delete)。

// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section// Table with two fields (String name, Integer age)
val table: Table = ...// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table)// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

  注意:一般没有经过groupby之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473543.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python编程中一些异常处理的小技巧

编程中经常会需要使用到异常处理的情况&#xff0c;在阅读了一些资料后&#xff0c;整理了关于异常处理的一些小技巧记录如下。 1 如何自定义异常 1.1 定义异常类 在实际编程中&#xff0c;有时会发现Python提供的内建异常的不够用&#xff0c;我们需要在特殊业务场景下的异常…

Flink的Table API 与SQL的流处理

1 流处理与SQL的区别 Table API和SQL&#xff0c;本质上还是基于关系型表的操作方式&#xff1b;而关系型表、SQL本身&#xff0c;一般是有界的&#xff0c;更适合批处理的场景。所以在流处理的过程中&#xff0c;有一些特殊概念。 SQL流处理处理对象字段元组的有界集合字段元…

LeetCode 833. 字符串中的查找与替换(排序,replace)

文章目录1. 题目2. 解题1. 题目 某个字符串 S 需要执行一些替换操作&#xff0c;用新的字母组替换原有的字母组&#xff08;不一定大小相同&#xff09;。 每个替换操作具有 3 个参数&#xff1a;起始索引 i&#xff0c;源字 x 和目标字 y。 规则是&#xff1a;如果 x 从原始…

Json.NET

我前面的一篇博客 Metro应用Json数据处理 介绍了如何使用 DataContractJsonSerializer 类将对象的实例序列化为JSON字符串以及将JSON字符串反序列化为对象的实例的处理方式。而此种处理方式的一个很大的缺点就是要求JSON字符串格式是约定好的&#xff0c;而在很多情况下我们无法…

MySQL如何跨机器迁移数据?

经常会遇到如此需求&#xff0c;需把A主机上的MySQL数据库所有迁移到B主机上&#xff0c;或者部分数据库&#xff0c;所以接下来将介绍迁移所有数据库和迁移单个数据库时的数据迁移步骤。 1 实验环境 A主机&#xff08;源主机&#xff09;&#xff1a; IP地址&#xff1a;19…

ClickHouse的特性及读写

1 ClickHouse特性 OLAP数据库一般有2个要求&#xff1a;①容量要比关系型数据库大&#xff0c;②在线查询的速度要快。ClickHouse这两点都满足并且还支持标准的sql&#xff0c;支持比较复杂的语句&#xff0c;支持分布式。ClickHouse的几个显著特点如下&#xff1a; &#xff0…

天池 在线编程 最大得分(DP)

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744275 2. 解题 class Solution { public:/*** param matrix: the matrix* return: the maximum score you can get*/int maximumScore(vector<vector<i…

imagick用法!

https://coderwall.com/p/9hj97w sudo apt-get install imagemagick sudo apt-get install php5-imagick sudo service apache2 restart 使用imagick类&#xff1a; http://www.wodezhan.cn/?p15转载于:https://www.cnblogs.com/vincedotnet/p/3592957.html

天池 在线编程 LR String

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744276 2. 解题 class Solution { public:/*** param s: a string* param t: a string* param n: max times to swap a l and a r.* return: return if s can …

Python中如何在一行里获取多个异常

我知道这样&#xff1a; try:# 可能错的地方 except:# 如果错了执行这里也知道这样&#xff1a; try:# 可能错的地方 except IDontLikeYourFaceException:# 给爷笑一个 except YouAreTooShortException:# 踩高跷但是我想在两个不同的异常里做同样的事&#xff0c;我能想到的办法…

DolphinScheduler对比Airflow

DolphinSchedulerAirFlow稳定性单点故障去中心化的多Master和多Worke是&#xff08;单一调度程序&#xff09;HA额外要求不需要(本身就支持HA)Celery / Dask / Mesos Load Balancer DB过载处理任务队列机制&#xff0c;单个机器上可调度的任务数量可以灵活配置&#xff0c;当…

Python中字符串格式化:%和format

Python2.6推出了[str.format()]方法&#xff0c;和原有的%格式化方式有小小的区别。那个方法更好&#xff1f; 下面的方法有同样的输出&#xff0c;它们的区别是什么&#xff1f; #!/usr/bin/pythonsub1 "python string!"sub2 "an arg"a "i am a …

jsAutomation 服务器不能创建对象(转)

var ExApp new ActiveXObject("Excel.Application") “automation服务器不能创建对象”的问题的解决方案大全本人工作中的应用系统都是jsp的&#xff0c;大量javascript程序&#xff0c;一旦出“automation服务器不能创建对象”问题&#xff0c;大量报表及查询无法保…

天池 在线编程 音乐组合

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164423301311799378/184808348725744274 2. 解题 对60求余后&#xff0c;0, 30的为 Cn2C_n^2Cn2​&#xff0c;其余的相加等于60的&#xff0c;种类相乘 class Solution { public:/*** param …

java之NIO(Channel,Buffer,Selector)

java之NIO 1 什么是NIO Java NIO (New IO&#xff0c;Non-Blocking IO)是从Java 1.4版本开始引入的一套新的IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO的三大核心部分&#xff1a;通道(Channel)&#xff0c;缓冲区(Buffer), 选择器(Selector)&#xff0c;数据总是从…

Python中对象名称前单下划线和双下划线有啥区别

单下划线 在一个类中的方法或属性用单下划线开头就是告诉别的程序这个属性或方法是私有的。然而对于这个名字来说并没有什么特别的。 引自PEP-8&#xff1a; 单下划线&#xff1a;"内部使用"的弱指示器。比如&#xff0c;from M import * 将不会引进用但下划线开头的…

JQUERY解析XML IE8的兼容问题

var str"xml字符串"; alert($(str).find("Row").attr("Id")); 在IE8下&#xff0c;这段脚本无法运行&#xff0c;&#xff0c;而在IE9以上的版是正常的 IE8浏览器只能强制把字符串转成XML ajaxfxml new ActiveXObject("Microsoft.XMLDOM&q…

LeetCode 1652. 拆炸弹(前缀和)

文章目录1. 题目2. 解题1. 题目 你有一个炸弹需要拆除&#xff0c;时间紧迫&#xff01;你的情报员会给你一个长度为 n 的 循环 数组 code 以及一个密钥 k 。 为了获得正确的密码&#xff0c;你需要替换掉每一个数字。所有数字会 同时 被替换。 如果 k > 0 &#xff0c;将…

Hadoop DistCp工具简介及其参数

1 概述 DistCp&#xff08;分布式拷贝&#xff09;是用于大规模集群内部和集群之间拷贝的工具。 它使用Map/Reduce实现文件分发&#xff0c;错误处理和恢复&#xff0c;以及报告生成。 它把文件和目录的列表作为map任务的输入&#xff0c;每个任务会完成源列表中部分文件的拷贝…

MYSQL从入门到精通

SQL是数据库的查询语言&#xff0c;语法结构简单&#xff0c;相信本文会让你从入门到熟练。 掌握SQL后&#xff0c;不论你是产品经理、运营人员或者数据分析师&#xff0c;都会让你分析的能力边界无限拓展。别犹豫了&#xff0c;赶快上车吧&#xff01; SQL最小化的查询结构如下…