Flink流批一体计算(17):PyFlink DataStream API之StreamExecutionEnvironment

目录

StreamExecutionEnvironment

Watermark

watermark策略简介

使用 Watermark 策略

内置水印生成器

处理空闲数据源

算子处理 Watermark 的方式

创建DataStream的方式

通过list对象创建

​​​​​​使用DataStream connectors创建

使用Table & SQL connectors创建


StreamExecutionEnvironment

编写一个 Flink Python DataStream API 程序,首先需要声明一个执行环境StreamExecutionEnvironment,这是流式程序执行的上下文。

你将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

创建了 StreamExecutionEnvironment 之后,你可以使用它来声明数据源。数据源从外部系统(如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到 Flink 作业里。

为了简单起见,本教程读取文件作为数据源。

ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source"
)

Watermark

大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

为了解决乱序数据,flink引入watermark。引入watermark机制则会等待晚到的数据一段时间,等待时间到则触发计算,如果数据延迟很大,通常也会被丢弃或者另外处理。

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。

watermark策略简介

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssigner WatermarkGenerator WatermarkStrategyWatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。

使用 Watermark 策略

WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在数据源上使用,第二种是直接在非数据源的操作之后使用。

第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口。

仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy

内置水印生成器

水印策略定义了如何在流源中生成水印。WatermarkStrategy是生成水印的WatermarkGenerator和分配记录内部时间戳的TimestampAssigner的生成器/工厂。

BoundedOutOfOrdernessDuration),为创建WatermarkStrategy常见的内置策略。

for_bound_out_of_ordernness(max_out_of_ordernesspyflink.common.time.Duration)为记录无序的情况创建水印策略,但可以设置事件无序程度的上限。

无序绑定B意味着一旦遇到时间戳为T的事件,就不会再出现早于(T-B)的事件。

for_bound_out_of_ordernness(5)

for_mononous_timestamps()为时间戳单调递增的情况创建水印策略。

水印是定期生成的,并严格遵循数据中的最新时间戳。该策略引入的延迟主要是生成水印的周期间隔。

WatermarkStrategy.for_monotonous_timestamps()

with_timestamp_assigner(timestamp_assigner:pyflink.common.watermark_strategy.TimestampAssigner)

创建一个新的WatermarkStrategy,该策略通过实现TimestampAssigner接口使用给定的TimestampAssigner。

参数: timestamp_assigner 给定的TimestampAssigner。

Return: 包装TimestampAssigner的WaterMarkStrategy。

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    with_timestamp_assigner(MyTimestampAssigner())

处理空闲数据源

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口withIdleness(Duration.ofMinutes(1))

with_idleness(idle_timeout:pyfrink.common.time.Duration)

创建一个新的丰富的WatermarkStrategy,它也在创建的WatermarkGenerator中执行空闲检测。

参数:idle_timeout–空闲超时。

Return:配置了空闲检测的新水印策略。

算子处理 Watermark 的方式

一般情况下,在将 watermark 转发到下游之前,需要算子对其进行触发的事件完全进行处理。例如,WindowOperator 将首先计算该 watermark 触发的所有窗口数据,当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后,其才会被发送到下游。换句话说,由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出。

相同的规则也适用于 TwoInputstreamOperator。但是,在这种情况下,算子当前的 watermark 会取其两个输入的最小值。

创建DataStream的方式

通过list对象创建

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))

​​​​​​使用DataStream connectors创建

使用add_source函数,此函数仅支持FlinkKafkaConsumer,仅在streaming执行模式下使用

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumerenv = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer = FlinkKafkaConsumer(topics='test_source_topic',deserialization_schema=deserialization_schema,properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds = env.add_source(kafka_consumer)

使用from_source函数,此函数仅支持NumberSequenceSource和FileSource自定义数据源,仅在streaming执行模式下使用

from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSourceenv = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())

​​​​​​​使用Table & SQL connectors创建

首先用Table & SQL connectors创建表,再转换为DataStream.

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql("""CREATE TABLE my_source (a INT,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")ds = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(), Types.STRING()]))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Autofac在WebApi,Winform中应用

安装注意事项 使用AOP的时候需要安装Autofac.Extras.DynamicProxy,如果发现VS老是提示报错,需要把VS重启下才可以识别。 WebApi 注意事项:WebApi中多一个ApiController中构造注入功能。 注入和AOP拦截 var siteNameList ClassHelper.GetConstants(typeof(SiteName));//创建…

[MySQL] MySQL基础操作汇总

文章目录 前言1.数据库概述1.1 数据库相关概念1.2登录MySQL:1.3 MySQL常用命令1.4表:1.5SQL语句分类: 2.CRUD操作2.1 DQL1.基础查询基础查询(简单查询)条件查询:排序查询:分组查询:分…

《golang设计模式》第二部分·结构型模式-03-组合模式(Composite)

文章目录 1. 概述1.1 角色1.2 类图 2. 代码示例2.1 设计2.2 代码2.3 类图 1. 概述 将叶子节点和复合节点组合起来,定义一个抽象接口遍历他们 1.1 角色 Component(抽象构件):为叶子构件和复合构件声明接口,定义了结构…

Python面试题100例【26~30题】

二十六、请介绍下Django框架的生命周期 Django是一个高级的Python Web框架,它遵循MVC设计模式(在Django中通常称为MTV,即模型(Model)、模板(Template)和视图(Views)),并且鼓励快速开发和干净、…

使用element-plus组件,默认显示英文 转换为中文

最近在边写项目边学习vue3 所以这几天没有更新 找机会把vue3的知识也统计一下吧 先说今天遇到的问题 最近做项目的时候使用element-plus分页组件时发现&#xff0c;显示的不是中文的了&#xff0c;是英文的 解决方法 在app.vue里面配置 <template><el-config-provi…

ROS2 CLI工具

目录 节点topic参数launch录包 节点 查看所有node&#xff1a;ros2 node list查看某个node的信息&#xff1a;ros2 node info node_name topic 查看topic输出&#xff1a; ros2 topic echo <topic_name> 查看topic频率&#xff1a;ros2 topic hz <topic_name> …

美创科技“签”手柠檬文才学堂,共推高校数据安全建设

近日&#xff0c;由柠檬文才学堂联合中国教育在线、东北财经大学网络教育学院共同主办的“三教统筹下高校继续教育数字化转型研讨”顺利召开。 国内高等院校&#xff08;高职院校&#xff09;继续教育分管领导&#xff0c;继续教育学院领导及继续教育信息化、教学教务管理、课程…

今天的小结

1、冒泡排序 冒泡排序&#xff08;Bubble Sort&#xff09;是一种简单的排序算法&#xff0c;它重复地遍历待排序的元素列表&#xff0c;比较相邻的元素并交换它们的位置&#xff0c;直到整个列表排序完成。冒泡排序的基本思想是通过不断交换相邻元素&#xff0c;将最大&#…

Qt——QLabel控件常见的属性、方法和信号

QLabel 控件常见的属性、方法和信号 一、QLabel 控件常见属性 二、QLabel 控件常见方法 三、QLabel 控件常见信号 方法&#xff1a; setxxx设置属性、去掉set就是获取属性 所有控件中的属性和方法都是通用的 QLabel 控件 描述&#xff1a;QLabel 是一个用于显示文本或图像…

C++之ifstream成员函数get、tellg、eof实例(一百八十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

Java使用 Gson 将对象转换为 JSON 字符串

在开发过程中&#xff0c;经常需要将 Java 对象转换为 JSON 字符串&#xff0c;以实现数据的序列化和传输。Gson 是一个流行的 Java 库&#xff0c;它提供了方便的方法来将对象转换为 JSON 字符串。本文将介绍使用 Gson 库的步骤&#xff0c;将一个对象转换为 JSON 字符串&…

前端实习day35

今天是下早班的一天&#xff0c;下完班直接赶车回广州了&#xff0c;吐槽一下深圳站管理得真得差&#xff0c;候车厅小&#xff0c;人巨多&#xff0c;而且进站口的标识也很少&#xff0c;绕了好久才找到&#xff01;下次再也不去了。 今天是改bug的一天&#xff0c;但是有半天…

范式 事务 多表查询

范式 概念&#xff1a;设计数据库时&#xff0c;需要遵循的一些规范。要遵循后边的范式要求&#xff0c;必须遵循前边的所有范式要求 第一范式&#xff1a; 数据库表的每一列都是不可分割的基本数据项 这样子就不满足第一范式 这样子就满足第一范式 存在问题&#xff1a; 数…

无涯教程-PHP - 返回类型声明

在PHP 7中&#xff0c;引入了一个新函数返回类型声明&#xff0c;返回类型声明指定函数应返回的值的类型&#xff0c;可以声明返回类型的以下类型。 intfloatbooleanstringinterfacesarraycallable 有效返回类型 <?phpdeclare(strict_types1);function returnIntValue(i…

Unity打包Windows程序,概率性出现无法全屏或分辨率不匹配

排除代码和Resolution and Presentation面板设置问题 如果程序还是不能按照预期的分辨率运行&#xff0c;应该是系统注册表记录了对应的设置。 解决方案&#xff1a; 打开注册表&#xff0c;使用快捷键“Win” "R"组合快捷键。在打开后面键入命令&#xff1a;Rege…

一键快速还原修复人脸,CodeFormer 助力人脸图像修复

今天在查资料的时候无意间看到了一个很有意思的工具&#xff0c;就是CodeFormer &#xff0c;作者给出来的说明是用于人脸修复任务的&#xff0c;觉得很有意思就拿来实践了一下&#xff0c;这里记录分享一下。 首先对人脸修复任务进行简单的回顾总结&#xff1a; 人脸修复是指…

关于gRPC微服务利弊之谈

gRPC微服务架构包括以下几个主要组件&#xff1a; 服务定义&#xff1a;定义服务的接口和消息格式&#xff0c;使用Protocol Buffers或其他的消息格式进行描述。服务实现&#xff1a;实现定义的服务接口和消息处理逻辑。服务器端实现&#xff1a;在服务器端&#xff0c;需要实…

分发饼干【贪心算法】

分发饼干 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&#xff0c;都有一个…

MATLAB中符号变量的使用方法解析

简介 MATLAB中常常使用符号变量&#xff0c;这里定义符号变量的函数是syms 使用方法如下 syms x y z 其中&#xff0c;x、y、z 是符号变量&#xff0c;可以是任意字母、数字或下划线组合而成的字符串。 举例1&#xff1a; 代码 以下是一个简单的例子&#xff0c;演示如何…

爬虫入门01

1. 请求头中最常见的一些重要内容 User-Agent : 请求载体的身份标识(⽤啥发送的请求)Referer: 防盗链(这次请求是从哪个⻚⾯来的? 反爬会⽤到)cookie: 本地字符串数据信息(⽤户登录信息, 反爬的token) 2. 响应头中一些重要内容 cookie: 本地字符串数据信息(⽤户登录信息, 反…