46、Flink 的 异步 I/O 算子详解

异步 I/O
1.需求

在与外部系统交互(用数据库中的数据扩充流数据)时,需要考虑与外部系统的通信延迟对整个流处理应用的影响。

同步交互:使用 MapFunction访问外部数据库的数据, MapFunction 向数据库发送一个请求然后一直等待,直到收到响应;大多数情况下,等待占据了函数运行的大部分时间。

异步交互:一个并行函数实例可以并发地处理多个请求和接收多个响应,使函数在等待的时间可以发送其他请求和接收其他响应,使等待的时间可以被多个请求摊分;大多数情况下,异步交互可以大幅度提高流处理的吞吐量。

在这里插入图片描述

注意: 提高 MapFunction 的并行度(parallelism)在有些情况下也可以提升吞吐量,但是这样做通常会导致非常高的资源消耗:更多的并行 MapFunction 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络连接、 更多的与数据库的网络连接、更多的缓冲和更多程序内部协调的开销。

2.前提
  • 需要支持异步请求的数据库客户端
  • 如果没有支持异步请求的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端,比正规的异步客户端效率低。
3.异步 I/O API
a)代码模版

Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端,API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。

在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:

  • 实现分发请求的 AsyncFunction
  • 获取数据库交互的结果并发送给 ResultFuture回调 函数
  • 将异步 I/O 操作应用于 DataStream 作为 DataStream 的一次转换操作,启用或者不启用重试。
// 使用 Java 8 的 Future 接口(与 Flink 的 Future 相同)实现了异步请求和回调。/*** 实现 'AsyncFunction' 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;@Overridepublic void open(OpenContext openContext) throws Exception {client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 发送异步请求,接收 future 结果final Future<String> result = client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});}
}// 创建初始 DataStream
DataStream<String> stream = ...;// 应用异步 I/O 转换操作,不启用重试
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);// 应用异步 I/O 转换操作并启用重试
// 通过工具类创建一个异步重试策略, 或用户实现自定义的策略
AsyncRetryStrategy asyncRetryStrategy =new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE).ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE).build();// 应用异步 I/O 转换操作并启用重试
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);

注意: 第一次调用 ResultFuture.completeResultFuture 就完成了,后续的 complete 调用都将被忽略。

下面两个参数控制异步操作:

  • Timeout: 超时参数定义了异步操作执行多久未完成、最终认定为失败的时长,如果启用重试,则可能包括多个重试请求,可以防止一直等待得不到响应的请求。
  • Capacity: 容量参数定义了可以同时进行的异步请求数;即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈,限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压
  • AsyncRetryStrategy: 重试策略参数定义了什么条件会触发延迟重试以及延迟的策略,例如,固定延迟、指数后退延迟、自定义实现等。
b)超时处理

当异步 I/O 请求超时的时候,默认会抛出异常并重启作业。

如果想处理超时,可以重写 AsyncFunction#timeout 方法;重写 AsyncFunction#timeout 时需要调用 ResultFuture.complete() 或者 ResultFuture.completeExceptionally() 以通知 Flink 这条记录的处理已经完成;如果超时发生时不想发出任何记录,可以调用 ResultFuture.complete(Collections.emptyList())

c)结果的顺序

AsyncFunction 发出的并发请求经常以不确定的顺序完成,这取决于请求得到响应的顺序;Flink 提供两种模式控制结果记录以何种顺序发出。

  • 无序模式: 异步请求一结束就立刻发出结果记录。 流中记录的顺序在经过异步 I/O 算子之后发生了改变。 当使用 处理时间 作为基本时间特征时,这个模式具有最低的延迟和最少的开销。 此模式使用 AsyncDataStream.unorderedWait(...) 方法。
  • 有序模式: 保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序(记录输入算子的顺序)相同;算子将缓冲一个结果记录直到这条记录前面的所有记录都发出(或超时),因为记录或者结果要在 checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来额外的延迟和 checkpoint 开销。此模式使用 AsyncDataStream.orderedWait(...) 方法。
d)事件时间

当流处理应用使用事件时间时,异步 I/O 算子会正确处理 watermark。

  • 无序模式: Watermark 既不超前于记录也不落后于记录,即 watermark 建立了顺序的边界。 只有连续两个 watermark 之间的记录是无序发出的。 在一个 watermark 后面生成的记录只会在这个 watermark 发出以后才发出。 在一个 watermark 之前的所有输入的结果记录全部发出以后,才会发出这个 watermark。

    在 watermark 的情况下,无序模式 会引入一些与有序模式 相同的延迟和管理开销。开销大小取决于 watermark 的频率。

  • 有序模式: 连续两个 watermark 之间的记录顺序也被保留了。开销与使用处理时间 相比,没有显著的差别。

注意:摄入时间是一种特殊的事件时间,它基于数据源的处理时间自动生成 watermark。

e)容错保证

异步 I/O 算子提供了完全的精确一次容错保证,它将异步请求的记录保存在 checkpoint 中,在故障恢复时重新触发请求。

f)重试支持

重试支持为异步 I/O 操作引入了一个内置重试机制,它对用户的异步函数实现逻辑是透明的。

  • AsyncRetryStrategy: 异步重试策略包含了触发重试条件 AsyncRetryPredicate 定义,以及根据当前已尝试次数判断是否继续重试、下次重试间隔时长的接口方法。 在满足触发重试条件后,有可能因为当前重试次数超过预设的上限放弃重试,或是在任务结束时被强制终止重试(此时系统以最后一次执行的结果或异常作为最终状态)。
  • AsyncRetryPredicate: 触发重试条件可以选择基于返回结果、 执行异常来定义条件,两种条件是或的关系,满足其一即会触发。
g)实现提示

在实现使用 Executor(或者 Scala 中的 ExecutionContext)和回调的 Futures 时,建议使用 DirectExecutor,因为通常回调的工作量很小,DirectExecutor 避免了额外的线程切换开销;回调通常只是把结果发送给 ResultFuture,也就是把它添加进输出缓冲。从这里开始,包括发送记录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。

DirectExecutor 可以通过 org.apache.flink.util.concurrent.Executors.directExecutor()com.google.common.util.concurrent.MoreExecutors.directExecutor() 获得。

h)警告

Flink 不以多线程方式调用 AsyncFunction

AsyncFunction 不是以多线程方式调用的;只有一个 AsyncFunction 实例,它被流中相应分区内的每个记录顺序地调用。除非 asyncInvoke(...) 方法快速返回并且依赖于(客户端的)回调,否则无法实现正确的异步 I/O。

例如,以下情况导致阻塞的 asyncInvoke(...) 函数,从而使异步行为无效

  • 使用同步数据库客户端,它的查询方法调用在返回结果前一直被阻塞。
  • asyncInvoke(...) 方法内阻塞等待异步客户端返回的 future 类型对象。

默认情况下,AsyncFunction 的算子(异步等待算子)可以在作业图的任意处使用,但它不能与SourceFunction/SourceStreamTask组成算子链

启用重试后可能需要更大的缓冲队列容量

新的重试功能可能会导致更大的队列容量要求,最大数量可以近似地评估如下。

inputRate * retryRate * avgRetryDuration

例如,对于一个输入率=100条记录/秒的任务,其中1%的元素将平均触发1次重试,平均重试时间为60秒,额外的队列容量要求为:

100条记录/秒 * 1% * 60s = 60

即在无序输出模式下,给工作队列增加 60 个容量可能不会影响吞吐量; 而在有序模式下,头部元素是关键点,它未完成的时间越长,算子提供的处理延迟就越长;在相同的超时约束下,如果头元素事实上获得了更多的重试,那重试功能可能会增加头部元素的处理时间即未完成时间,也就是说在有序模式下,增大队列容量并不是总能提升吞吐。

当队列容量增长时( 可以缓解背压),OOM 的风险会随之增加;对于 ListState 存储来说,理论的上限是 Integer.MAX_VALUE, 虽然队列容量的限制是一样的,但在生产中不能把队列容量增加到太大,此时增加任务的并行性也许更可行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/21523.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高级 Go 程序设计:使用 net/http/httputil 包构建高效网络服务

高级 Go 程序设计&#xff1a;使用 net/http/httputil 包构建高效网络服务 介绍ReverseProxy 的使用基本概念实现步骤高级配置实际案例 DumpRequest 的使用功能说明代码示例应用场景NewSingleHostReverseProxy 的特性功能概述 详细教程 注意事项使用 NewChunkedWriter 实现高效…

C语言 带头双向循环链表的基本操作

带头双向循环链表的基本操作 结构体定义初始化创建新节点头插头删尾插尾删查找在指定位置之后插入删除指定位置的值打印 结构体定义 typedef int DataType; typedef struct LinkNode {DataType data;struct LinkNode* prev;struct LinkNode* next; }LNode;初始化 有两种初始化…

远控免杀篇

0x00&#xff1a;前言 随着近两年hvv和红蓝对抗以及国家对于网络安全的重视&#xff0c;国内防护水平都蹭蹭上了一个台阶&#xff0c;不管是内部人员的技术水平提高还是防护设备的层层部署&#xff0c;均给了红队人员想要进一步行动设置了障碍。 通过weblogic的cve-2019-2725获…

MLPerf storage基准测试

MLPerf 基准测试 什么是 MLPerf&#xff1f;MLPerf™ 基准测试由来自学术界、研究实验室和行业的 AI 领导者联盟 MLCommons 开发&#xff0c;旨在对硬件、软件和服务的训练和推理性能进行无偏评估。它们都在规定的条件下进行。为了保持在行业趋势的前沿&#xff0c;MLPerf 不断…

C基础-标准库上

下:http://t.csdnimg.cn/LXa0J C 标准库是一组 C 内置函数、常量和头文件&#xff0c;比如 <stdio.h>、<stdlib.h>、<math.h>&#xff0c;等等。 目录 一. assert.h 二. ctype.h 三. errno.h 四. float.h 五.limits.h 六. locale.h 一. assert.h 源码…

166.二叉树:相同的树(力扣)

代码解决 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nullptr), right(nullptr) {}* Tre…

【机器学习】RLHF:在线方法与离线算法在大模型语言模型校准中的博弈

RLHF&#xff1a;在线方法与离线算法在大型语言模型校准中的博弈 一、引言二、RLHF概述三、在线方法与离线算法的对比四、实验验证与代码实例 一、引言 在人工智能领域&#xff0c;大型语言模型&#xff08;LLM&#xff09;的校准已成为一个备受关注的热点。基于人类反馈的强化…

实现spring配置bean类机制

大家好&#xff0c;这里是教授.F 流程说明&#xff1a; 我们自己实现spring配置bean类的机制&#xff0c;要先了解原本是怎么实现的。 原本的机制就是有一个bean配置文件&#xff0c;还有一个ApplicationContext spring文件。bean类写着要扫描的文件信息&#xff0c;spring文…

【Python学习】数据结构+正则表达式

个人主页&#xff1a;Yang-ai-cao 系列专栏&#xff1a;Python学习 博学而日参省乎己&#xff0c;知明而行无过矣 目录 个人主页&#xff1a;Yang-ai-cao 系列专栏&#xff1a;Python学习 博学而日参省乎己&#xff0c;知明而行无过矣 数据结构 一、列表&#xff08;Lis…

风电机组与风力发电机:它们是同一个东西吗?

风电机组与风力发电机&#xff1a;它们是同一个东西吗&#xff1f; 风电机组和风力发电机是两个常见的术语&#xff0c;尤其是在可再生能源领域。虽然这两个术语有时会被混淆使用&#xff0c;但它们实际上指的是不同的设备和系统。为了弄清楚它们之间的关系&#xff0c;本文将…

最近很火的 ChatTTS项目,有大佬做的 ChatTTS-UI 来啦

地址&#xff1a;https://github.com/jianchang512/ChatTTS-ui 》》》更多开源项目 ChatTTS webUI & API 一个简单的本地网页界面&#xff0c;直接在网页使用 ChatTTS 将文字合成为语音&#xff0c;支持中英文、数字混杂&#xff0c;并提供API接口。 Releases中可下载Wi…

光伏并网逆变器UL 1741:2021标准解析

光伏并网逆变器UL 1741:2021标准解析 不同国家的安规认证可以说是光伏逆变器走向国际市场的一张通行证&#xff0c;由于全球各国家的电网制式及并网政策的不同差异&#xff0c;这对逆变器测试顺利的通过安规测试认证 还是有一定的技术难度&#xff0c;也是中国光伏制造企业迫切…

最全解析:只有了解低代码平台的今生前世,才能明白低代码是什么

低代码&#xff08;Low-Code&#xff09;是一种软件开发方法&#xff0c;它使得开发人员能够通过图形界面、拖放组件和模型驱动的逻辑&#xff0c;快速地构建和部署应用程序&#xff0c;而无需编写大量的代码。尽管低代码这个术语是在2014年才提出&#xff0c;从诞生之初距今不…

【数据结构】复杂度的重要性—–决定程序运行的效率

【数据结构】复杂度的重要性—–决定程序运行的效率 前言 在我们写算法的时候&#xff0c;常常会需要考虑一个问题&#xff1a;这个算法好不好&#xff1f;而这个“好”实际上就取决于是算法的复杂度。 算法复杂度&#xff08;Algorithmic Complexity&#xff09;是指算法在编…

如何在电脑上安装两个flutter版本

Flutter Version Manager (FVM): Flutter的版本管理终极指南 Mac的flutter多版本管理之fvm的安装及使用 一、安装fvm brew tap leoafarias/fvm brew install fvm这个错误可能是由于网络问题或者 Git 配置问题导致的&#xff0c;我开启了梯子就可以了 brew install watchman…

Low Memory Killer in Android

目录 低内存管理&#xff08;Linux vs Android&#xff09; Linux内存回收 shrink_slab原理 shrink_zone原理 oom killer oom killer设计原则 OOM killer具体实现 android的lmk(Low Memory Killer) Android系统特点 oom killer在android中的不足 ​​​​​​​LMK概…

基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用

背景 随着互联网服务的广泛普及与技术应用的深入发展&#xff0c;日志数据作为记录系统活动、用户行为和业务操作的宝贵资源&#xff0c;其价值愈发凸显。然而&#xff0c;当前海量日志数据的产生速度已经远远超出了传统数据分析工具的处理能力&#xff0c;这不仅要求我们具备…

【机器学习】机器学习与AI大数据的融合:开启智能新时代

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 机器学习与AI大数据的融合 &#x1f4d2;1. 引言&#x1f4d5;2. 机器学习与大数据&#x1f3a9;机器学习与大数据的特征&#x1f388;大数据如…

驱动开发之platform总线

1.前言 在前面的实验以及提出的各种问题中&#xff0c;为了提高移植性&#xff0c;降低模块耦合度&#xff0c;提让模块高内聚&#xff0c;分离device与driver是一个必然的趋势了。为了解决这个问题&#xff0c;心心念念的platform总线出来。 linux从2.6起就加入了一套新的驱…

python中利用cartopy库绘制SST图像

1. Cartopy简介 Cartopy 是一个开源的 Python 库&#xff0c;用于绘制地图和地理数据分析。它结合了 matplotlib 的绘图功能和 shapely、pyproj 等库的地理空间数据处理能力&#xff0c;为用户提供了在地图上可视化数据的强大工具。 以下是 Cartopy 的一些主要特点和功能&#…