【flink番外篇】16、DataStream 和 Table 相互转换示例

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、DataStream 和 Table 相互转换示例
    • 1、maven依赖
    • 2、 DataStream 和 Table 相互转换示例
      • 1)、示例1 - toDataStream
      • 2)、示例2 - toChangelogStream
      • 3)、示例3 - 通过仅切换标志来处理批处理和流数据


本文简单的介绍了DataStream 和 Table 的相互转换及示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

更多详细内容参考文章:

21、Flink 的table API与DataStream API 集成(完整版)

一、DataStream 和 Table 相互转换示例

1、maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-gateway</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency><!-- flink连接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>

2、 DataStream 和 Table 相互转换示例

Flink提供了专门的StreamTableEnvironment,用于与DataStream API集成。这些环境使用其他方法扩展常规TableEnvironment,并将DataStream API中使用的StreamExecutionEnvironments作为参数。

1)、示例1 - toDataStream

下面的代码展示了如何在两个API之间来回切换的示例。

表的列名和类型自动从DataStream的TypeInformation派生。

由于DataStream API本机不支持变更日志处理,因此代码假设在流到表和表到流转换期间仅附加/仅插入语义。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author alanchan**/
public class ConvertingDataStreamAndTableDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 1、创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、创建输入流DataStream<String> dataStream = env.fromElements("alan", "alanchan", "alanchanchn");// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream);// 4、创建视图,该步骤不是必须,将姓名转为大写tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT UPPER(f0) FROM InputTable");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toDataStream(resultTable);resultStream.print();env.execute();}}
  • 示例输出
12> +I[ALAN]
14> +I[ALANCHANCHN]
13> +I[ALANCHAN]

fromDataStream和toDataStream的完整语义可以在下面的部分中找到。它还包括使用事件时间和水印。

根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表转换为数据流时产生仅插入的更改,而且还产生收回和其他类型的更新。在表到流转换期间,这可能会导致类似于以下内容的异常

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

在这种情况下,需要再次修改查询或切换到ChangelogStream。

2)、示例2 - toChangelogStream

下面的示例显示如何转换更新表。

每个结果行表示更改日志中的一个条目,该条目具有更改标志,可以通过对其调用row.getKind()来查询。

在本例中,alan的第二个分数在更改之前(-U)创建更新,在更改之后(+U)创建更新。

本示例仅仅以一个方法来展示,避免没有必要的代码,运行框架参考上述示例。

	public static void test2() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、创建输入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、创建视图,该步骤不是必须tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
  • 运行结果
2> +I[alan, 18]
16> +I[alanchan, 19]
16> +I[alanchanchn, 20]
2> -U[alan, 18]
2> +U[alan, 38]

fromChangelogStream和toChangelogStream的完整语义可以在下面的部分中找到。它包括使用事件时间和水印。它讨论了如何为输入和输出流声明主键和变更日志模式。

上面的示例显示了如何通过为每个传入记录连续发出逐行更新来增量计算最终结果。然而,在输入流有限(即有界)的情况下,通过利用批处理原理可以更有效地计算结果。

在批处理中,可以在连续的阶段中执行运算符,这些阶段在发出结果之前使用整个输入表。

例如,连接操作符可以在执行实际连接之前对两个有界输入进行排序(即排序合并连接算法),或者在使用另一个输入之前从一个输入构建哈希表(即哈希连接算法的构建/探测阶段)。

DataStream API和Table API都提供专门的批处理运行时模式。

3)、示例3 - 通过仅切换标志来处理批处理和流数据

下面的示例说明了统一管道能够通过仅切换标志来处理批处理和流数据。

public static void test3() throws Exception {// 1、创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、创建输入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、创建视图,该步骤不是必须tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
  • 运行结果

注意比较和示例2的输出区别

+I[alanchan, 19]
+I[alan, 38]
+I[alanchanchn, 20]

一旦将changelog 应用于外部系统(例如键值存储),可以看到两种模式都能够产生完全相同的输出表。通过在发出结果之前使用所有输入数据,批处理模式的更改日志仅由仅插入的更改组成。

以上,本文简单的介绍了DataStream 和 Table 的相互转换及示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/621612.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

win10使用debug,汇编初学

DOSBox挂载Debug.exe 双击 DOSBox Options.bat 打开配置 或者执行cmd DOSBox.exe -editconf notepad.exe -editconf %SystemRoot%\system32\notepad.exe -editconf %WINDIR%\notepad.exe最后一行增加 mount [盘符] [挂载的工作目录&#xff08;debug.exe文件夹位置&#xff…

Pandas实战100例 | 案例 17: 处理重复数据 - 删除重复行

案例 17: 处理重复数据 - 删除重复行 知识点讲解 在数据分析过程中&#xff0c;处理重复的记录是一个常见的任务。Pandas 提供了方便的方法来删除重复行&#xff0c;保证数据的准确性和可靠性。 删除所有列重复的行: 使用 drop_duplicates() 方法可以删除 DataFrame 中所有列…

2023年终总结,一路向阳待花期

回望2023&#xff0c;可谓“苦尽甘来终有时&#xff0c;一路向阳待花期”。这一年&#xff0c;经历很多&#xff0c;收获亦很多。 回望2023 2023年最重要的三件事&#xff0c;想聊聊买房、工作、自我提升。 买房&#xff1a; 众所众知&#xff0c;2023楼市整体的情况不甚乐…

计算机三级(网络技术)——综合题(Sniffer抓包分析)

考点内容&#xff1a; DNS域名解析TCP三次握手FTP(文件传输协议)ICMP(Internet控制报文协议)&#xff1a;ping、tracertHTTP(超文本传输协议)&#xff1a;get、post命令 例题一 下图是校园网某台主机在命令行模式下执行某个命令时用sniffer捕获的数据包。 抓包分析 5~8行为…

使用JGit拉取代码提示未授权not authorized

原因&#xff1a;2021年8月13日后不支持密码登录&#xff0c;需要使用token验证 调用时候需要先去git仓库创建个人令牌 需要在安全中心创建个人token&#xff0c;使用token名称作为账号&#xff0c;使用token作为密码。 另&#xff1a; Github克隆仓库的三种方式对比&#xff…

x-cmd pkg | qrencode - 二维码生成工具

目录 简介首次用户功能特点竞品和相关作品进一步阅读 简介 qrencode 是一个用于生成二维码的命令行工具。它可以将文本、URL、电话号码等信息转换为二维码图像。生成的二维码图像可以保存为图片文件&#xff0c;方便在电子文档、网页、移动应用等各种场景中使用。 它支持的二维…

Redis集群(主从复制)

主从复制&#xff1a;是指将一台 Redis 服务器的数据&#xff0c;复制到其他的 Redis 服务器。 前者称为主节点(master)&#xff0c;后者称为从节点(slave),数据的复制是单向的&#xff0c;只能 由主节点到从节点。 可以实现数据备份。即使当其中一台机器宕机其他机器还可以正…

arcgis javascript api4.x加载天地图web墨卡托(wkid:3857)坐标系

效果&#xff1a; 示例代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv&quo…

一、MySQL 卸载

目录 1、软件的卸载准备 2、软件的卸载 方式一&#xff1a;通过控制面板卸载 方式二&#xff1a;通过mysql8的安装向导卸载 1、双击mysql8的安装向导 2、取消更新 3、选择要卸载的mysql服务器软件的具体版本 4、确认删除数据目录 5、执行删除 6、完成删除 3、清理残…

ssm基于web办事大厅政务预约系统+vue论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本办事大厅政务预约系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据…

【OpenMP】 2.3 并行化循环

目录 1、for循环 2、规约 3、for循环中的调度&#xff08;schedule API&#xff09; 3.1 静态调度(static) 3.2 动态调度(dynamic) 调度的选择 1、for循环 前面的示例中&#xff0c;通过创建一组线程并通过线程ID与线程数来人为的定义每个线程需要处理的数据&#xff0c;…

【GitHub项目推荐--6 个吊炸天的后台模板】【转载】

很多程序员都有过接私活的经历&#xff0c;帮别人开发一个网站&#xff1f;写个软件&#xff1f;不少网站都要有一个后台管理系统&#xff0c;而后台管理系统大多数情况下仅仅是管理员在使用&#xff0c;所以不像前台那样需要去定制设计优美的 UI。 一套既美观又方便的后台框架…

国家注册信息安全专业人员十五类CISP证书

国家注册信息安全专业人员&#xff08;Certified Information Security Professiona&#xff0c;简称CISP&#xff09;&#xff0c;是面向党政机关、关键信息基础设施运营单位、各类企事业单位和社会组织以及网络与信息安全企业、测评和咨询服务机构等工作的信息安全人员颁发的…

第 4 课 创建工作空间与功能包

文章目录 第 4 课 创建工作空间与功能包1.工作环境的创建2.ROS功能包的创建 第 4 课 创建工作空间与功能包 消息和服务的创建、发布器和订阅器的编写、服务端和客户端的编写都是基于Ros功能包进行操作的&#xff0c;因此在进行上述操作前&#xff0c;需要先创建工作空间及功能包…

注释的魔力:HTML、JS/jQuery和CSS中的单行与多行注释

HTML注释&#xff1a; 在HTML中&#xff0c;我们使用<!--和-->来创建单行注释。例如&#xff1a; <!-- 这是单行注释 -->而多行注释也类似例如&#xff1a; <!DOCTYPE html> <html><!--这是多行注释这是多行注释这是多行注释--> </html>…

【信息论安全】:信源编码定理

一. 介绍 在点对点的通信中&#xff0c;信源编码定理&#xff08;source coding theorem&#xff09;满足可达性和可逆性。当信道是无噪声时&#xff0c;那么YX&#xff0c;这时就不需要信道编码。但是&#xff0c;信源编码依旧是有效的&#xff0c;可以提高数据传输效率&…

iOS swift UISlider改变进度条的高度和圆形滑块的大小

文章目录 1.改变进度条的高度&#xff08;亲测有效&#xff09;2.改变圆形滑块的大小&#xff08;亲测有效&#xff09; 1.改变进度条的高度&#xff08;亲测有效&#xff09; import UIKitclass CustomSlider: UISlider {// 设置轨道高度var trackHeight: CGFloat 10// 重写…

Navicat 16 for MySQL:打造高效数据库开发管理工具

随着数据的快速增长和复杂性的提升&#xff0c;数据库成为了现代应用开发中不可或缺的一部分。而在MySQL数据库领域&#xff0c;Navicat 16 for MySQL作为一款强大的数据库开发管理工具&#xff0c;正受到越来越多开发者的青睐。 Navicat 16 for MySQL拥有丰富的功能和直观的界…

Jenkins-Maven Git

整合Maven 安装GIT #更新yum sudo yum update #安装git yum install git 安装Maven插件,在插件管理中心&#xff1a; 配置仓库 配置密码认证 我们可以在这个目录下看到Jenkins 帮我们拉取了代码 /env/liyong/data/docker/jenkins_mount/workspace/maven-job 配置maven打包…

[数据结构与算法]数据结构基础、排序算法详解、算法思想详解、领域算法详解------

# 数据结构基础 学习思路 避免孤立的学习知识点&#xff0c;要关联学习。比如实际应用当中&#xff0c;我们经常使用的是查找和排序操作&#xff0c;这在我们的各种管理系统、数据库系统、操作系统等当中&#xff0c;十分常用&#xff0c;我们通过这个线索将知识点串联起来&am…