flink中使用异步函数的几个注意事项

背景

在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈,这种情况下我们一般会使用异步函数提高性能,本文就来记录下使用异步函数的几个注意事项

异步函数的使用

首先看一下官方的例子:

/*** 实现 'AsyncFunction' 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 发送异步请求,接收 future 结果final Future<String> result = client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});}
}// 创建初始 DataStream
DataStream<String> stream = ...;// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

注意事项如下:
1.在asyncinvoke方法中不能有阻塞的操作,比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑,而不会使用Future.get方法进行阻塞操作
2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间,也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越
3.异步函数可以和检查点机制进行集成,也就是那些正在等待响应结果的记录会被写入检查点中,当故障恢复后,可以重新发送请求
4.如果服务端没有提供异步的客户端,我们可以用多线程进行模拟,只要多线程返回future对象即可
5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/115044.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023高频前端面试题(含答案)

一、简单页面1、CSS选择器样式优先级2、CSS实现三列布局&#xff08;左右固定宽度&#xff0c;中间自适应&#xff09; &#xff08;1&#xff09;CSS浮动 第一个float:left&#xff0c;第二个float:right&#xff0c;第三个设置margin-left和margin-right &#xff08;2&#…

使用反射获取API数据并强制转为实例

例如获取API数据&#xff0c;以下只教思想&#xff0c;我下面也是代码也是大概写了写&#xff1a; 1.新建两个实例&#xff0c;一个是API提供方提供的API返回data&#xff0c;一个是本地用于接收的data数据&#xff0c;以下两个类虽然后者多几个字段&#xff0c;但是除多出那几…

实现图像处理和分析的关键技术

在计算机视觉中&#xff0c;我们可以利用摄像头捕捉到的图像来进行各种分析和处理。以下是一些常见的计算机视觉任务&#xff1a; 对象检测&#xff1a;识别图像中的特定对象并标注其位置。人脸识别&#xff1a;识别和验证人脸身份。姿态估计&#xff1a;估计人体的姿态和动作…

无纸化办公小程序数据交互、wxs的使用

前言 很多同志们再写小程序的过程中&#xff0c;不知道该怎么发起HTTP请求到后端&#xff0c;在Web环境中发起HTTPS请求是很常见的&#xff0c;但是微信小程序是腾讯内部的产品&#xff0c;不能直接打开一个外部的链接。例如&#xff0c;在微信小程序中不能直接打开www.taobao…

ASRPRO语音识别模块

ASRPRO语音识别模块 SOFT IIC 与PCA9685模块通信 pca9685 iic通信 地址位 ADDR<<1|0 左移一位 #define I2C_WRITE 0 #define I2C_READ 1 否则通信地址错误 asrpro 通过UART与电脑连接&#xff0c;可以进行简单的交互 将STM32作为接口扩展&#xff0c;通过SPI或I…

Seata学习

Seata Seata 是一款开源的分布式事务解决方案&#xff0c;致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 官网地址&#xff1a;https://seata.io/zh-cn/index.html 为什么会产生分布式事务&#xff1f; 示例&#xff1a;用户下单后需要创建订单&#xff0c;同时…

DDR电源硬件设计要点

一、DDR电源简介 1. 电源 DDR的电源可以分为三类: a、主电源VDD和VDDQ,主电源的要求是VDDQ=VDD,VDDQ是给IO buffer供电的电源,VDD是给但是一般的使用中都是把VDDQ和VDD合成一个电源使用。 有的芯片还有VDDL,是给DLL供电的,也和VDD使用同一电源即可。电源设计时,需要考…

最短路相关笔记

Floyd Floyd 算法&#xff0c;是一种在图中求任意两点间最短路径的算法。 Floyd 算法适用于求解无负边权回路的图。 时间复杂度为 O ( n 3 ) O(n^3) O(n3)&#xff0c;空间复杂度 O ( n 2 ) O(n^2) O(n2)。 对于两点 ( i , j ) (i,j) (i,j) 之间的最短路径&#xff0c;有…

如何让ChatGPT生成图片?

目录 一、那么如何解决让ChatGPT具有画图能力的问题呢&#xff1f; 二、那ChatGPT为什么能生成图片呢&#xff1f; 我们都知道ChatGPT只是个纯文本的AI模型&#xff0c;不具备画图能力。它可以生成文本&#xff0c;但如果让他生成图片就会显示如下的声明&#xff1a; 但通过本…

图像信号处理板设计原理图:2-基于6U VPX的双TMS320C6678+Xilinx FPGA K7 XC7K420T的图像信号处理板

综合图像处理硬件平台包括图像信号处理板2块&#xff0c;视频处理板1块&#xff0c;主控板1块&#xff0c;电源板1块&#xff0c;VPX背板1块。 一、板卡概述 图像信号处理板包括2片TI 多核DSP处理器-TMS320C6678&#xff0c;1片Xilinx FPGA XC7K420T-1FFG1156&#xff0c;1片X…

strcpy函数

文章目录 strcpy函数描述函数使用总结目标空间为什么必须可变&#xff1f;模拟实现 strcpy函数描述 重点&#xff1a;including the terminating null character (and stopping at that point).意为拷贝的值包括停止字符 传参时第一个参数为要拷贝参数&#xff0c;第二个参数为…

游戏盾如何有效防护DDoS

从进入计算机时代以来&#xff0c;DDoS攻击一直是网络世界中的一大威胁&#xff0c;让无数服务陷入瘫痪。这种攻击的原理非常简单&#xff1a;攻击者使用大量的僵尸主机或蠕虫病毒&#xff0c;向目标服务器发送海量请求&#xff0c;迅速耗尽服务器的资源&#xff0c;使其无法继…

某全球领先的芯片供应商:优化数据跨网交换流程,提高安全管控能力

1、客户介绍 某全球领先的芯片供应商&#xff0c;成立于2005年&#xff0c;总部设于北京&#xff0c;在国内上海、深圳、合肥等地及国外多个国家和地区均设有分支机构和办事处&#xff0c;致力于为客户提供更优质、便捷的服务。 2、建设背景 该公司基于网络安全管理的需求&am…

Datawhale学习笔记AI +新能源:电动汽车充电站充电量预测2

在飞浆平台上成功运行出pandas-profiling啦~ 首先一键安装 pip install ydata_profiling然后演示&#xff0c;可以生成一个网页对数据有一个比较好的理解 import numpy as np import pandas as pd from ydata_profiling import ProfileReporttrain_power pd.read_csv(/home/…

【论文阅读】以及部署BEVFusion: A Simple and Robust LiDAR-Camera Fusion Framework

BEVFusion: A Simple and Robust LiDAR-Camera Fusion Framework BEVFusion&#xff1a;一个简单而强大的LiDAR-相机融合框架 NeurIPS 2022 多模态传感器融合意味着信息互补、稳定&#xff0c;是自动驾驶感知的重要一环&#xff0c;本文注重工业落地&#xff0c;实际应用 融…

Flutter的Don‘t use ‘BuildContext‘s across async gaps警告解决方法

文章目录 问题有问题的源码 问题原因问题分析Context的含义BuildContext的作用特殊情况 解决方法 问题 Flutter开发中遇到Don’t use BuildContext’s across async gaps警告 有问题的源码 if (await databaseHelper.isDataExist(task.title)) {showDialog(context: context,…

想要精通算法和SQL的成长之路 - 找到最终的安全状态

想要精通算法和SQL的成长之路 - 找到最终的安全状态 前言一. 找到最终的安全状态1.1 初始化邻接图1.2 构建反向邻接图1.3 BFS遍历1.4 完整代码 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 找到最终的安全状态 原题链接 我们从题目中可以看出来&#xff1a; 出度为0的…

面向对象(基础)特征一:封装性(encapsulation)

文章目录 一、介绍&#xff08;1&#xff09;封装性&#xff08;2&#xff09;权限修饰符 二、案例&#xff08;1&#xff09;案例1 三、练习&#xff08;1&#xff09;练习1&#xff08;2&#xff09;练习2&#xff08;3&#xff09;练习3&#xff08;4&#xff09;练习4 面向…

html5 web前端 localStorage的存储,读取,删除

html5 web前端 localStorage的存储&#xff0c;读取&#xff0c;删除 我们通过以下方式将数据储存到localStorage中 window.localStorage.setItem(key,value) 但有时value为一个对象Object,以上面的方式写入,会出现读取的返回值为 {object Object}的情况,但这并不是我们想要的…

我做不到受每个人喜欢

我做不到受每个人喜欢 我想描述一下昨天发生争吵后我个人的观点&#xff0c;希望能够重新呈现出一种积极的态度。 首先&#xff0c;让我简要梳理一下事件的经过&#xff0c;当天我像往常一样去另一个宿舍找人聊天&#xff0c;可能因为说话声音有点大&#xff0c;坐在我后面的那…