PyFlink 教程(三):PyFlink DataStream API - state timer

简介: 介绍如何在 Python DataStream API 中使用 state & timer 功能。

一、背景

Flink 1.13 已于近期正式发布,超过 200 名贡献者参与了 Flink 1.13 的开发,提交了超过 1000 个 commits,完成了若干重要功能。其中,PyFlink 模块在该版本中也新增了若干重要功能,比如支持了 state、自定义 window、row-based operation 等。随着这些功能的引入,PyFlink 功能已经日趋完善,用户可以使用 Python 语言完成绝大多数类型Flink作业的开发。接下来,我们详细介绍如何在 Python DataStream API 中使用 state & timer 功能。

二、state 功能介绍

作为流计算引擎,state 是 Flink 中最核心的功能之一。

  • 在 1.12 中,Python DataStream API 尚不支持 state,用户使用 Python DataStream API 只能实现一些简单的、不需要使用 state 的应用;
  • 而在 1.13 中,Python DataStream API 支持了此项重要功能。

state 使用示例

如下是一个简单的示例,说明如何在 Python DataStream API 作业中使用 state:

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptorclass MyMapFunction(MapFunction):def open(self, runtime_context: RuntimeContext):state_desc = ValueStateDescriptor('cnt', Types.LONG())# 定义value stateself.cnt_state = runtime_context.get_state(state_desc)def map(self, value):cnt = self.cnt_state.value()if cnt is None:cnt = 0new_cnt = cnt + 1self.cnt_state.update(new_cnt)return value[0], new_cntdef state_access_demo():# 1. 创建 StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()# 2. 创建数据源seq_num_source = NumberSequenceSource(1, 100)ds = env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())# 3. 定义执行逻辑ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \.key_by(lambda a: a[0]) \.map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))# 4. 将打印结果数据ds.print()# 5. 执行作业env.execute()if __name__ == '__main__':state_access_demo()

在上面的例子中,我们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为 “cnt_state” 的 ValueState,用于记录每一个 key 出现的次数。

说明:

  • 除了 ValueState 之外,Python DataStream API 还支持 ListState、MapState、ReducingState,以及 AggregatingState;
  • 定义 state 的 StateDescriptor 时,需要声明 state 中所存储的数据的类型(TypeInformation)。另外需要注意的是,当前 TypeInformation 字段并未被使用,默认使用 pickle 进行序列化,因此建议将 TypeInformation 字段定义为 Types.PICKLED_BYTE_ARRAY() 类型,与实际所使用的序列化器相匹配。这样的话,当后续版本支持使用 TypeInformation 之后,可以保持后向兼容性;
  • state 除了可以在 KeyedStream 的 map 操作中使用,还可以在其它操作中使用;除此之外,还可以在连接流中使用 state,比如:
ds1 = ...  # type DataStream
ds2 = ...  # type DataStream
ds1.connect(ds2) \.key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \.map(MyCoMapFunction())  # 可以在MyCoMapFunction中使用state

可以使用 state 的 API 列表如下:

 操作自定义函数
KeyedStreammapMapFunction
flat_mapFlatMapFunction 
reduceReduceFunction 
filterFilterFunction 
processKeyedProcessFunction 
ConnectedStreamsmapCoMapFunction
flat_mapCoFlatMapFunction 
processKeyedCoProcessFunction 
WindowedStreamapplyWindowFunction
 processProcessWindowFunction

state 工作原理

 

上图是 PyFlink 中,state 工作原理的架构图。从图中我们可以看出,Python 自定义函数运行在 Python worker 进程中,而 state backend 运行在 JVM 进程中(由 Java 算子来管理)。当 Python 自定义函数需要访问 state 时,会通过远程调用的方式,访问 state backend。

我们知道,远程调用的开销是非常大的,为了提升 state 读写的性能,PyFlink 针对 state 读写做了以下几个方面的优化工作:

  • Lazy Read:

    对于包含多个 entry 的 state,比如 MapState,当遍历 state 时,state 数据并不会一次性全部读取到 Python worker 中,只有当真正需要访问时,才从 state backend 读取。

  • Async Write:

    当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样做可以避免每次 state 更新操作都访问远端的 state backend;同时,针对同一个 key 的多次更新操作,可以合并执行,尽量避免无效的 state 更新。

  • LRU cache:

    在 Python worker 进程中维护了 state 读写的 cache。当读取某个 key 时,会先查看其是否已经被加载到读 cache 中;当更新某个 key 时,会先将其存放到写 cache 中。针对频繁读写的 key,LRU cache 可以避免每次读写操作,都访问远端的 state backend,对于有热点 key 的场景,可以极大提升 state 读写性能。

  • Flush on Checkpoint:

    为了保证 checkpoint 语义的正确性,当 Java 算子需要执行 checkpoint时,会将 Python worker中的写 cache 都 flush 回 state backend。

其中 LRU cache 可以细分为二级,如下图所示:

 

说明:

  • 二级 cache 为 global cache,二级 cache 中的读 cache 中存储着当前 Python worker 进程中所有缓存的原始 state 数据(未反序列化);二级 cache 中的写 cache 中存储着当前 Python worker 进程中所有创建的 state 对象。
  • 一级 cache 位于每一个 state 对象内,在 state 对象中缓存着该 state 对象已经从远端的 state backend 读取的 state 数据以及待更新回远端的 state backend 的 state 数据。

工作流程:

  • 当在 Python UDF 中,创建一个 state 对象时,首先会查看当前 key 所对应的 state 对象是否已经存在(在二级 cache 中的 “Global Write Cache” 中查找),如果存在,则返回对应的 state 对象;如果不存在,则创建新的 state 对象,并存入 “Global Write Cache”;
  • state 读取:当在 Python UDF 中,读取 state 对象时,如果待读取的 state 数据已经存在(一级 cache),比如对于 MapState,待读取的 map key/map value 已经存在,则直接返回对应的 map key/map value;否则,访问二级 cache,如果二级 cache 中也不存在待读取的 state 数据,则从远端的 state backend 读取;
  • state 写入:当在 Python UDF 中,更新 state 对象时,先写到 state 对象内部的写 cache 中(一级 cache);当 state 对象中待写回 state backend 的 state 数据的大小超过指定阈值或者当遇到 checkpoint 时,将待写回的 state 数据写回远端的 state backend。

state 性能调优

通过前一节的介绍,我们知道 PyFlink 使用了多种优化手段,用于提升 state 读写的性能,这些优化行为可以通过以下参数配置:

配置说明
python.state.cache-sizePython worker 中读 cache 以及写 cache 的大小。(二级 cache)需要注意的是:读 cache、写 cache是独立的,当前不支持分别配置读 cache 以及写 cache 的大小。
python.map-state.iterate-response-batch-size当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。
python.map-state.read-cache-size一个 MapState 的读 cache 中最大允许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会通过 LRU 策略从读 cache 中删除最近最少访问过的 entry。
python.map-state.write-cache-size一个 MapState 的写 cache 中最大允许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下所有待更新 state 数据写回远端的 state backend。

需要注意的是,state 读写的性能不仅取决于以上参数,还受其它因素的影响,比如:

  • 输入数据中 key 的分布:

    输入数据的 key 越分散,读 cache 命中的概率越低,则性能越差。

  • Python UDF 中 state 读写次数:

    state 读写可能涉及到读写远端的 state backend,应该尽量优化 Python UDF 的实现,减少不必要的 state 读写。

  • checkpoint interval:

    为了保证 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将所有缓存的待更新 state 数据,写回 state backend。如果配置的 checkpoint interval 过小,则可能并不能有效减少 Python worker 写回 state backend 的数据量。

  • bundle size / bundle time:

    当前 Python 算子会将输入数据划分成多个批次,发送给 Python worker 执行。当一个批次的数据处理完之后,会强制将 Python worker 进程中的待更新 state 写回 state backend。与 checkpoint interval 类似,该行为也可能会影响 state 写性能。批次的大小可以通过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数控制。

三、timer 功能介绍

timer 使用示例

除了 state 之外,用户还可以在 Python DataStream API 中使用定时器 timer。

import datetimefrom pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironmentclass CountWithTimeoutFunction(KeyedProcessFunction):def __init__(self):self.state = Nonedef open(self, runtime_context: RuntimeContext):self.state = runtime_context.get_state(ValueStateDescriptor("my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):# retrieve the current countcurrent = self.state.value()if current is None:current = Row(value.f1, 0, 0)# update the state's countcurrent[1] += 1# set the state's timestamp to the record's assigned event time timestampcurrent[2] = ctx.timestamp()# write the state backself.state.update(current)# schedule the next timer 60 seconds from the current event timectx.timer_service().register_event_time_timer(current[2] + 60000)def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):# get the state for the key that scheduled the timerresult = self.state.value()# check if this is an outdated timer or the latest timerif timestamp == result[2] + 60000:# emit the state on timeoutyield result[0], result[1]class MyTimestampAssigner(TimestampAssigner):def __init__(self):self.epoch = datetime.datetime.utcfromtimestamp(0)def extract_timestamp(self, value, record_timestamp) -> int:return int((value[0] - self.epoch).total_seconds() * 1000)if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql("""CREATE TABLE my_source (a TIMESTAMP(3),b VARCHAR,c VARCHAR) WITH ('connector' = 'datagen','rows-per-second' = '10')""")stream = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))watermarked_stream = stream.assign_timestamps_and_watermarks(WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(MyTimestampAssigner()))# apply the process function onto a keyed streamwatermarked_stream.key_by(lambda value: value[1])\.process(CountWithTimeoutFunction()) \.print()env.execute()

在上述示例中,我们定义了一个 KeyedProcessFunction,该 KeyedProcessFunction 记录每一个 key 出现的次数,当一个 key 超过 60 秒没有更新时,会将该 key 以及其出现次数,发送到下游节点。

除了 event time timer 之外,用户还可以使用 processing time timer。

timer 工作原理

timer 的工作流程是这样的:

  • 与 state 访问使用单独的通信信道不同,当用户注册 timer 之后,注册消息通过数据通道发送到 Java 算子;
  • Java 算子收到 timer 注册消息之后,首先检查待注册 timer 的触发时间,如果已经超过当前时间,则直接触发;否则的话,将 timer 注册到 Java 算子的 timer service 中;
  • 当 timer 触发之后,触发消息通过数据通道发送到 Python worker,Python worker 回调用户 Python UDF 中的的 on_timer 方法。

需要注意的是:由于 timer 注册消息以及触发消息通过数据通道异步地在 Java 算子以及 Python worker 之间传输,这会造成在某些场景下,timer 的触发可能没有那么及时。比如当用户注册了一个 processing time timer,当 timer 触发之后,触发消息通过数据通道传输到 Python UDF 时,可能已经是几秒中之后了。

四、总结

在这篇文章中,我们主要介绍了如何在 Python DataStream API 作业中使用 state & timer,state & timer 的工作原理以及如何进行性能调优。接下来,我们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景以及最佳实践等。

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

长跑 11 年,腾讯开源的变与不变

作者 | 贾凯强出品 | CSDN云计算(ID:CSDNcloud)在中国,开源产业的发展就像是一个美丽的童话故事。90年代,开源如一无所有的灰姑娘,仰望着海外梦幻般的舞会,自己却很难融入其中;而世纪…

.net 批量更新_Revit二次开发——读取CAD文字实现更新模型的思路

更新模型与内地BIM项目中 设计院终版图纸一波流翻模的模式不同香港BIM项目的模式是:设计出图—BIM出碰撞报告—设计再改图—BIM再碰撞报告......反反复复....模型频繁更新 是BIM项目服务过程中不可避免的应对方法:1.晚上加班2.周末加班本文中 模型更新的…

php使用七牛直播,七牛上传文件,PHP版本

自从知道七牛以来,就一直在用七牛做图片外链,但是每次需要到七牛官网登录,然后再上传图片。感觉很麻烦,最近想做一个自己的上传到七牛的平台,开始的想法是用C#写一个windows客户端,在用swift写一个mac客户端…

汽车之家:基于 Flink + Iceberg 的湖仓一体架构实践

简介: 由汽车之家实时计算平台负责人邸星星在 4 月 17 日上海站 Meetup 分享的,基于 Flink Iceberg 的湖仓一体架构实践。 内容简要: 一、数据仓库架构升级的背景 二、基于 Iceberg 的湖仓一体架构实践 三、总结与收益 四、后续规划 一、数据…

基于 Scheduled SQL 对 VPC FlowLog 实现细粒度时间窗口分析

简介: 针对VPC FlowLog的五元组和捕获窗口信息,在分析时使用不同时间窗口精度,可能得到不一样的流量特征,本文介绍一种方法将原始采集日志的时间窗口做拆分,之后重新聚合为新的日志做分析,达到更细粒度的分…

实力登场!移动云技术内核2.0 四大全新升级!

“中国数字经济占GDP比重持续增长,5G网络建设已进入规模化部署阶段。随着5G网络的发展,企业的数字化改造需求越来越旺盛。企业日益增长的数字化改造需求对云基础设施提出了新的挑战:需要支持多种类型网络接入、支持公有云、混合云、专属云等多…

obsidian使用分享

ob对比其他软件 上文提到obsidian,这里对obsidian做一个简要的总结 优点:对比notion,语雀这些软件,内容存储在应用商的服务器上。它是存在本地的。 对比思源笔记。说一下思源笔记的不足。思源是块来控制的,回车就是一…

苹果xr如何截屏_苹果手机自带的三种截屏技巧,你知道几个?现在知道还不迟...

今年苹果手机发布的新机自发布以来就受到了热烈的追捧,销量一直都处于只增不减的趋势。苹果手机为何如此之火?除了本身自带的IOS系统之外,手机自带很多小技巧,你知道不?今天就来为大家介绍苹果手机中的三种截屏小技巧&…

Scheduled SQL: SLS 大规模日志上的全局分析与调度

简介: 本文总结了大规模日志全局分析的需求,讨论SLS上现有的典型分析方案,并延伸到 SLS 原生数据处理方案,介绍 Schedueld SQL 功能与最佳实践。 大规模日志全局分析的需求 数据大规模与时效性 基于时间的数据(日志…

matlab制作以太网数据接收上位机_3D激光扫描仪设计及数据处理

本文内容转载自《电子技术应用》2019年第10期,版权归《电子技术应用》编辑部所有。段清明,王凡,徐琳琳,全文俊吉林大学仪器科学与电气工程学院摘要:利用2D激光雷达配合云台装置,设计了一种3D激光扫描仪作为…

跨平台(windows+linux)的线程辅助程序,跨平台(Windows+Linux)的Socket通讯程序(二)—结构...

上一篇"跨平台(WindowsLinux)的Socket通讯程序"给出了Socket通讯底层的一些函数的包装方法/类,同时屏蔽了操作系统(Windows/Linux)的不同。上一篇只是对通讯底层方法的封装,并没用涉及应用,这一篇将基于上一篇,并结合&q…

数据的“敏捷制造”,DataWorks一站式数据开发治理范式演进

简介: 企业大数据技术发展至今,历经了两次蜕变。第一次蜕变从最初的“小作坊”解决大数据问题,到后来企业用各类大数据技术搭建起属于自己的“大平台”,通过平台化的能力完成数据生产力的升级。 第二次蜕变让大数据从“大平台”向…

全新的 Fragment 通信方式

作者 | tech-bus.丹卿来源 | 程序员巴士前言就在前段时间,Google 推出了 Fragment Result API 和 Activity Results API,用来取代之前的 Activity 和 Fragment 之间通信方式的不足,大家可以前往看看都有哪些更新:https://medium.c…

数据传输完整性_电缆监测数据传输系统分析与设计

电缆线路是重要的输电方式,对电缆线路进行监测是保证电缆线路正常工作的重要的条件,研究人员利用嵌入式系统设计了电缆监测数据传输系统。该系统以CAN通信和嵌入式以太网络技术为核心,实现了对电缆及其沟道的实时监测、状态显示及预报警功能&…

大型企业多账号管理“安全心法”

简介: 云上多账号环境下的网络统一管理,是大型分支型企业网络安全防护的必经之路。无论是外企入华、国内企业出海,还是本土集团型企业规模化成长,云上统一网络安全管控与整体安全态势感知,都可以拉齐企业账号间安全水位…

苹果将于 2025 年推出的 Apple Car 长什么样?

整理 | 孙胜出品 | CSDN(ID:CSDNnews)据国外媒体报道,苹果公司预计将于2025年推出一款全新的自动驾驶汽车,旨在实现真正意义上的无人驾驶。报道称,基于自动驾驶的理念,苹果理想的汽车没有方向盘…

阿里云中间件首席架构师李小平:云原生实践助力企业高效创新

简介: 通过云原生技术,真正为企业带来更多的业务价值,助力企业整体的业务创新。 作者:李小平 前天我参加了信通院的云原生产业大会,在会场上非常感慨,参加会议的企业非常多,并且来自于各行各业…

cv曲线面积的意义_几何直觉的魅力:sinx曲线下的面积原理是如此的美妙

用“曲线下的面积”来描述积分,就像用一串单词来描述一本书。正弦函数的积分是其曲线下的面积。几何直觉就是:“正弦的积分是沿圆周路径的水平距离。”这句话第一次听说感觉比较抽象,当你理解了就会觉得它非常的美妙一般的思维模式求正弦函数的积分就是&…

OpenInfra 十一年:OpenStack 部署规模超 2500 万计算核心

后疫情时代下,产生海量在线需求,越来越多金融、政府、教育、通信和医疗保健等上云业务需依赖现代云基础设施来正常运行。其中开源提供了一种更具成本效益的开发方式,据最新《2021 年度 Octoverse 报告》显示,2021 年 GitHub 开发者…

集群镜像:实现高效的分布式应用交付

简介: Docker 解决了单个容器的镜像化问题,而 sealer 通过把整个集群打包,实现了分布式软件的 Build Share Run。 作者 | fanux.中弈 什么是集群镜像 顾名思义,和操作系统 .iso 镜像或 Docker 镜像类似,集群镜像是用一…