【flink番外篇】18、通过数据管道将table source加入datastream示例

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • 一、DataStream 和 Table集成-数据管道
    • 1、maven依赖
    • 2、Adding Table API Pipelines to DataStream API 示例


本文介绍了将table api管道加入datastream。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

更多详细内容参考文章:

21、Flink 的table API与DataStream API 集成(完整版)

一、DataStream 和 Table集成-数据管道

1、maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-gateway</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency><!-- flink连接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>

2、Adding Table API Pipelines to DataStream API 示例

单个Flink作业可以由多个相邻运行的断开连接的管道组成。

Table API中定义的Source-to-sink管道可以作为一个整体附加到StreamExecutionEnvironment,并在调用DataStream API中的某个执行方法时提交。

源不一定是table source,也可以是以前转换为Table API的另一个DataStream管道。因此,可以将 table sinks用于DataStream API程序。

通过使用StreamTableEnvironment.createStatementSet()创建的专用StreamStatementSet实例可以使用该功能。通过使用语句集,planner 可以一起优化所有添加的语句,并在调用StreamStatement set.attachAsDataStream()时提供一个或多个添加到StreamExecutionEnvironment的端到端管道( end-to-end pipelines)。

下面的示例演示如何将表程序添加到一个作业中的DataStream API程序。


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author alanchan**/
public class TestTablePipelinesToDataStreamDemo {/*** @param args* @throws Exception */public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);StreamStatementSet statementSet = tenv.createStatementSet();// 建立数据源TableDescriptor sourceDescriptor =TableDescriptor.forConnector("datagen").option("number-of-rows", "3").schema(Schema.newBuilder().column("myCol", DataTypes.INT()).column("myOtherCol", DataTypes.BOOLEAN()).build()).build();// 建立sinkTableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();// add a pure Table API pipelineTable tableFromSource = tenv.from(sourceDescriptor);statementSet.add(tableFromSource.insertInto(sinkDescriptor));// use table sinks for the DataStream API pipelineDataStream<Integer> dataStream = env.fromElements(1, 2, 3);Table tableFromStream = tenv.fromDataStream(dataStream);statementSet.add(tableFromStream.insertInto(sinkDescriptor));// attach both pipelines to StreamExecutionEnvironment (the statement set will be cleared after calling this method)statementSet.attachAsDataStream();// define other DataStream API partsenv.fromElements(4, 5, 6).addSink(new DiscardingSink<>());// use DataStream API to submit the pipelinesenv.execute();//		1> +I[287849559, true]
//		+I[1]
//		+I[2]
//		+I[3]
//		3> +I[-1058230612, false]
//		2> +I[-995481497, false]}}

以上,本文介绍了将table api管道加入datastream。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629751.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全基础~web攻防特性1

文章目录 知识补充ASP安全Aspx安全分析与未授权访问php特性&web89~97靶场练习ctfshow 知识补充 使用thinkphp开发的框架&#xff0c;其首页访问指向public目录&#xff0c;指向其中的index.php文件 指向的index.php打开网页后是如下情况&#xff0c;代码如下 定义应用目…

计算机网络——第三层:网络层

1. IP数据报 1.1 IPV4数据报 1.1.1 IPv4数据报的结构 如图按照RFC 791规范显示了一个IPv4数据包头部的不同字段 IPv4头部通常包括以下部分&#xff1a; 1.1.1.1 版本&#xff08;Version&#xff09; 指明了IP协议的版本&#xff0c;IPv4表示为4。 1.1.1.2 头部长度&#x…

【机器学习入门】机器学习基础概念与原理

*&#xff08;本篇文章旨在帮助新手了解机器学习的基础概念和原理&#xff0c;不深入讨论算法及核心公式&#xff09; 目录 一、机器学习概念 1、什么是机器学习&#xff1f; 2、常见机器学习算法和模型 3、使用Python编程语言进行机器学习实践 4、机器学习的应用领域 二…

nxp s32k144芯片使用J-LINK程序刷写

1.nxp s32k144 (1)打开软件&#xff1a;J-Flash V6.30j (2)新建工程&#xff1a;file->new project (3)选择芯片型号和 target interface (4)可以保存芯片和接口配置 (5)打开程序&#xff1a;File->open data file &#xff08;6&#xff09;程序刷写&#xff1a;T…

探索 GitHub:高效使用技巧与实例分享

探索 GitHub&#xff1a;高效使用技巧与实例分享 前言: 欢迎来到本篇博客&#xff0c;今天我们将深入研究 GitHub 的一些高效使用技巧&#xff0c;以便更好地利用这一强大的代码托管平台。 1. GitHub 简介&#xff1a; GitHub 是全球最大的代码托管平台之一&#xff0c;它不…

Linux下安装Mysql【CentOS7 】

Linux下安装Mysql 一、Linux下安装Mysql-5.7.41【tar包下载安装】1.1.首先检查是否已经安装过mysql1.2.下载Linux版本的Mysql-5.71.3.解压缩1.4.安装执行 rpm 安装包需要先下载 openssl-devel 插件1.5.安装 Mysql5.7 执行 rpm 安装包1.6.Mysql相关操作命令1.7.查看Mysql-5.7 临…

Hadoop集群配置及测试

Hadoop集群配置及测试 NameNode与SecondaryNameNode最好不在同一服务器 ResourceManager较为消耗资源&#xff0c;因而和NameNode与SecondaryNameNode最好不在同一服务器。 配置文件 hadoop102hadoop103hadoop104HDFSNameNodeDataNodeDataNodeSecondaryNameNodeDataNodeYAR…

Pandas实战100例 | 案例 67: 布尔运算

案例 67: 布尔运算 知识点讲解 布尔运算是数据处理中的一个重要部分&#xff0c;尤其是在处理布尔&#xff08;逻辑&#xff09;数据时。Pandas 支持常见的布尔运算&#xff0c;如 AND、OR 和 XOR。 布尔运算: & (AND): 两列都为 True 时结果为 True。| (OR): 任一列为 …

快速上手的 AI 工具-文心一言

简介 最近正打得火热的AIGC概念&#xff0c;相信大家肯定也都多少接触到了&#xff0c;那么AIGC概念股到底是什么呢&#xff1f;我个人最近也看了一些平台如&#xff1a;文心一言、通义千问、讯飞星火、豆包等等&#xff01;各位朋友也千万不要错过啦&#xff0c;真是各有各的特…

VC++中使用OpenCV读取图像、读取本地视频、读取摄像头并实时显示

VC中使用OpenCV读取图像、读取本地视频、读取摄像头并实时显示 最近闲着跟着油管博主murtazahassan&#xff0c;学习了一下LEARN OPENCV C in 4 HOURS | Including 3x Projects | Computer Vision&#xff0c;对应的Github源代码地址为&#xff1a;Learn-OpenCV-cpp-in-4-Hour…

顺序表(C/C++)

本篇将讲解一些关于顺序表的内容&#xff0c;顺序表分为静态顺序表和动态顺序表&#xff0c;其中经常用到的为动态顺序表&#xff0c;所以本篇将以动态顺序表为重点给出一些关于动态顺序表的操作。 因为顺序表的实现逻辑较为简单&#xff0c;对于代码的讲解大多以注释给出。 1…

PLSQL去除一个字符串中的数字

PLSQL去除一个字符串中的数字 SQL Select regexp_replace(abc1234ABC678aaad590AAA, [0-9], ) As 去数字后From dual;效果

11.2 PCL从ROS获取激光雷达的点云数据及处理

这部分内容结合了前面的内容。其实很简单&#xff0c;分三步走就可以&#xff1a;首先是通过ROS打开激光雷达&#xff0c;查看PCL配置需要的信息。然后是用PCL通过ROS发布的topic获取激光雷达的数据。最后将ROS和PCL结合。 实现上面两步的前提是我们已经部署好了ROS环境及PCL环…

python下常用的爬虫模块

目录 一&#xff1a;requests 二&#xff1a;BeautifulSoup 三&#xff1a;Scrapy 四&#xff1a;Selenium 一&#xff1a;requests requests 是一个用于发送 HTTP 请求的 Python 库。它提供了简洁的 API 来发送各种类型的 HTTP 请求&#xff0c;如 GET、POST、PUT、DELETE…

AP8851L 宽电压降压恒压DC-DC 电源管理芯片

产品描述 AP8851L 一款宽电压范围降压型 DC-DC 电源管理芯片&#xff0c;内部集成使能 开关控制、基准电源、误差放大器、过 热保护、限流保护、短路保护等功能&#xff0c; 非常适合在宽输入电压范围具有优良 的负载和线性调整度。 AP8851L 芯片包含每周期的峰值 限流、…

云原生到底是什么意思

云原生到底是什么意思&#xff1f; 引言 随着云计算技术的迅速发展&#xff0c;云原生成为了一个备受关注的话题。云原生不仅仅是一种新的软件架构&#xff0c;更是一种变革性的开发方法论。本文将深入解析云原生的意义、特点以及为什么它在现代软件开发中变得如此重要。 云…

C++力扣题目40--组合总和II

力扣题目链接(opens new window) 给定一个数组 candidates 和一个目标数 target &#xff0c;找出 candidates 中所有可以使数字和为 target 的组合。 candidates 中的每个数字在每个组合中只能使用一次。 说明&#xff1a; 所有数字&#xff08;包括目标数&#xff09;都是…

数据仓库面试题

1 思维导图&数仓常见面试题 2 题目 1. 数据仓库是什么&#xff1f; 数据仓库是一个面向主题的&#xff08;订单、支付、退单等&#xff09;、集成的&#xff08;整合多个信息源的大量数据&#xff09;、非易失的&#xff08;一般不会进行删除和修改操作&#xff09;且随时…

低声下气捧于正,嘉行还“行”不“行”?

随着祝绪丹辞演于正新剧一事愈演愈烈&#xff0c;嘉行传媒又站上了舆论的风口浪尖。 事情的起因是于正的新剧开机&#xff0c;演员阵容和此前透露的有所差别&#xff0c;他声称明明已经与嘉行传媒艺人祝绪丹谈好合作&#xff0c;对方却为了另一部剧的女一号而放了他鸽子。 网…

【Java】十年老司机转开发语言,新小白从学习路线图开始

欢迎来到《小5讲堂》 大家好&#xff0c;我是全栈小5。 这是《Java》序列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识点的理解和掌握…