Flink流批一体计算(20):DataStream API和Table API互转

目录

举个例子

连接器

下载连接器(connector)和格式(format)jar 包

依赖管理

 如何使用连接器


举个例子

StreamExecutionEnvironment集成了DataStream API,通过额外的函数扩展了TableEnvironment。

下面代码演示两种API如何互转

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.common.typeinfo import Typesenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# create a DataStream
ds = env.from_collection(["Alice", "Bob", "John"], Types.STRING())# interpret the insert-only DataStream as a Table
t = t_env.from_data_stream(ds)# register the Table object as a view and query it
t_env.create_temporary_view("InputTable", t)
res_table = t_env.sql_query("SELECT UPPER(f0) FROM InputTable")# interpret the insert-only Table as a DataStream again
res_ds = t_env.to_data_stream(res_table)# add a printing sink and execute in DataStream API
res_ds.print()env.execute()

TableEnvironment将采用StreamExecutionEnvironment所有的配置选项。

建议,在转换为Table API之前,设置DataStream API的所有配置选项,如下代码。

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.checkpointing_mode import CheckpointingMode# create Python DataStream API
env = StreamExecutionEnvironment.get_execution_environment()# set various configuration early
env.set_max_parallelism(256)env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name")
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)# then switch to Python Table API
t_env = StreamTableEnvironment.create(env)
# set configuration early
t_env.get_config().set_local_timezone("Europe/Berlin")# start defining your pipelines in both APIs...

连接器

下载连接器(connector)和格式(format)jar 包

由于Flink是一个基于 Java/Scala 的项目,连接器(connector)和格式(format)的实现是作为 jar 包存在的, 要在 PyFlink 作业中使用,首先需要将其指定为作业的依赖。

如果使用第三方JAR,可以在Python Table API中指定JAR,如下所示:

table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

or

table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

依赖管理

需要在Python API程序中使用依赖项。例如,Python用户自定义函数中使用第三方Python库。此外,在用机器学习模型预测等场景中,用户可能希望在Python自定义函数中加载机器学习模型。

当PyFlink作业在本地执行时,可以将第三方Python库安装到本地Python环境中,将机器学习模型下载到本地,等等。

然而,当用户想要将PyFlink任务提交到远程集群时,这种方法并不奏效。

除了Table API, 在Python DataStream API中则如下配置:

stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
stream_execution_environment.add_jars("file:///E:/my/jar/path/connector1.jar", "file:///E:/my/jar/path/connector2.jar")

# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the

# URLs are accessible on both the client and the cluster.

stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")

 如何使用连接器

在 PyFlink Table API 中,DDL 是定义 source 和 sink 比较推荐的方式,这可以通过 TableEnvironment 中的 execute_sql() 方法来完成,然后就可以在作业中使用这张表了。

--下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。

from pyflink.table import TableEnvironment, Environmentsettingsdef log_processing():env_settings = Environmentsettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# specify connector and format jarst_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")source_ddl = """CREATE TABLE source_table(a VARCHAR,b INT) WITH ('connector' = 'kafka','topic' = 'source_topic','properties.bootstrap.servers' = 'kafka:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""sink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'sink_topic','properties.bootstrap.servers' = 'kafka:9092','format' = 'json')"""t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait()if __name__ == '__main__':log_processing()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/63474.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

B081-Lucene+ElasticSearch

目录 认识全文检索概念lucene原理全文检索的特点常见的全文检索方案 Lucene创建索引导包分析图代码 搜索索引分析图代码 ElasticSearch认识ElasticSearchES与Kibana的安装及使用说明ES相关概念理解和简单增删改查ES查询DSL查询DSL过滤 分词器IK分词器安装测试分词器 文档映射(字…

Windows NUMA编程实践 – 处理器组、组亲和性、处理器亲和性及版本变化

Windows在设计之初没有考虑过对大数量的多CPU和NUMA架构的设备的支持,大部分关于CPU的设计按照64个为上限来设计。核心数越来越多的多核处理器的进入市场使得微软不得不做较大的改动来进行支持,因此Windows 的进程、线程和NUMA API在各个版本中行为不一样…

Vue:关于声明式导航中的 跳转、高亮、以及两个类名的定制

声明式导航-导航链接 文章目录 声明式导航-导航链接router-link的两大特点(能跳转、能高亮)声明式导航-两个类名定制两个高亮类名 实现导航高亮,实现方式其实,css,JavaScript , Vue ,都可以实现。其实关于路由导航&…

AndroidStudio推荐下载和配置

1、推荐下载链接 Download Android Studio & App Tools - Android Developers 2、gradle配置案例 // Top-level build file where you can add configuration options common to all sub-projects/modules.buildscript {repositories {maven { url https://maven.aliyun.…

软考知识相关

1、计算机组成结构 计算机组成结构是指计算机系统的各个组成部分以及它们之间的相互关系和功能。通常,计算机组成结构可以分为以下主要部分: 中央处理单元(CPU): CPU 是计算机的大脑,负责执行程序指令和处理…

【计算机组成 课程笔记】3.2 算数运算和逻辑运算的硬件实现

课程链接: 计算机组成_北京大学_中国大学MOOC(慕课) 3 - 2 - 302-门电路的基本原理(11-39--)_哔哩哔哩_bilibili 现代计算机的CPU和其他很多功能部件都是基于晶体管的集成电路,想要了解计算机组成的基本原理,还是需要有…

苹果macOS 14开发者预览版Beta 7发布 新增超过100款视频壁纸和屏保

8 月 31 日,苹果向 Mac 电脑用户推送了 macOS 14 开发者预览版 Beta 7 更新(内部版本号:23A5337a),本次更新距离上次发布隔了 8 天。 苹果发布 Beta 7 更新的同时,还发布了第 6 个公测版,正式版…

【UIPickerView-UIDatePicker-应用程序对象 Objective-C语言】

一、今天我们来学习三个东西 1.UIPickerView-UIDatePicker-应用程序对象 1.首先,来看数据选择控件 数据选择控件, 大家对这个数据选择控件,是怎么理解的, 1)数据选择控件,首先,是不是得有数据, 2)然后呢,你还得让用户能够选择, 3)最后,你还得是一个控件儿 那…

【博客705】chatgpt:编写日志rotate框架

chatgpt:编写日志rotate框架 场景 我们的网关服务等为了持久化日志以供排查问题,往往将日志输出到文件,此时如果文件太大,可能导致磁盘被写满,此时就需要对日志文件进行rotate,以保存最新的日志 实现 pac…

IP子网的划分

文章目录 一、子网掩码1. 产生背景2. 定义3. 分类 二、VLSM算法1. 得出下列参数2. 计算划分结果3. 举例子计算 三、常见子网划分对应关系四、练习IP编址题目需求解题1. 192.168.1.100/282. 172.16.0.58/263. 25.83.149.222/254. 100.100.243.18/205. 10.100.100.100/10 首先可以…

docker基本命令记录

Docker 是一个开源的容器技术,它允许开发人员将应用程序及其所有依赖项打包到一个容器中,然后轻松地在任何地方部署和运行。以下是 Docker 的一些基本操作: 基础操作: 启动 Docker:service docker start停止 Docker:service docker stop查看 Docker 信息:docker info容器操作…

代码随想录笔记--栈与队列篇

目录 1--用栈实现队列 2--用队列实现栈 3--有效的括号 4--删除字符串中的所有相邻重复项 5--逆波兰表达式求值 6--滑动窗口的最大值 7--前k个高频元素 1--用栈实现队列 利用两个栈&#xff0c;一个是输入栈&#xff0c;另一个是输出栈&#xff1b; #include <iostrea…

NodeJS的简介以及下载和安装

本章节会带大家下载并安装NodeJs 以及简单的入门&#xff0c;配有超详细的图片&#xff0c;一步步带大家进行下载与安装 NodeJs简介关于前端与后端Node是什么&#xff1f;为什么要学习NodeNodeJS的优点&#xff1a; NodeJS的下载与安装NodeJS的下载&#xff1a; NodeJS的快速入…

剑指 Offer 49. 丑数(C++实现)

剑指 Offer 49. 丑数https://leetcode.cn/problems/chou-shu-lcof/ 对每个丑数 分别乘2、乘3、乘5 即可得到后续丑数 其中只需要对计算出来的丑数结果进行去重即可 int nthUglyNumber(int n) {// base caseif (n < 1){return -1;}if (n 1){return 1;}vector<int> res…

记1次前端性能优化之CPU使用率

碰到这样的一个问题&#xff0c;用户反馈页面的图表一直加载不出来&#xff0c;页面还卡死 打开链接页面&#xff0c;打开控制台 Network 看到有个请求一直pending&#xff0c;结合用户描述&#xff0c;页面一直loading,似乎验证了我的怀疑&#xff1a;后端迟迟没有相应。 但是…

LINQ详解(查询表达式)

什么是LINQ&#xff1f; LINQ(语言集成查询)是将查询功能直接集成到C#中。数据查询表示简单的字符串&#xff0c;在编译时不会进行类型检查和IntelliSense(代码补全辅助工具)支持。 在开发中&#xff0c;通常需要对不同类型的数据源了解不同的查询语句&#xff0c;如SQL数据库…

Redis项目实战——商户查询缓存

目录 为什么要用Redis实现商户查询缓存&#xff1f;用Redis实现商户查询缓存的基本思路&#xff1f;使用Redis缓存的问题及解决方法&#xff1f;一、如何保持数据库数据和Redis缓存数据的一致性&#xff1f;1 内存淘汰机制2 超时剔除机制3 主动更新机制&#xff08;胜&#xff…

sql:SQL优化知识点记录(七)

&#xff08;1&#xff09;索引优化5 &#xff08;2&#xff09;索引优化6 &#xff08;3&#xff09;索引优化7 查询*&#xff0c; 百分号加右边&#xff0c;否则索引会失效 没建立索引之前都是全表扫描 没建立索引 建立索引&#xff1a; 建立索引 id是主键&#xff0c;他也…

全新UI站长在线工具箱系统源码带后台开源版

该系统的全开源版本可供下载&#xff0c;并且支持暗黑模式。 系统内置高达72种站长工具、开发工具、娱乐工具等功能。此系统支持本地调用API&#xff0c;同时还自带免费API接口&#xff0c; 是一个多功能性工具程序&#xff0c;支持后台管理、上传插件、添加增减删功能。 环…

HAProxy(一)

四层负载均衡与七层负载均衡区别 四层负载均衡和七层负载均衡是两种不同的负载均衡方式&#xff0c;主要区别在于负载均衡的层级及其所支持的协议不同。 四层负载均衡&#xff0c;也称为传输层负载均衡&#xff0c;工作在 OSI 模型的传输层&#xff08;第四层&#xff09;&am…