53、Flink 测试工具测试用户自定义函数详解

1.测试用户自定义函数
a)单元测试无状态、无时间限制的 UDF

示例:无状态的 MapFunction

public class IncrementMapFunction implements MapFunction<Long, Long> {@Overridepublic Long map(Long record) throws Exception {return record + 1;}
}

通过传递合适地参数并验证输出进行测试。

public class IncrementMapFunctionTest {@Testpublic void testIncrement() throws Exception {// instantiate your functionIncrementMapFunction incrementer = new IncrementMapFunction();// call the methods that you have implementedassertEquals(3L, incrementer.map(2L));}
}

对于使用 org.apache.flink.util.Collector 的用户自定义函数(例如FlatMapFunction 或者 ProcessFunction),可以通过提供模拟对象而不是真正的 collector 来测试。

具有与 IncrementMapFunction 相同功能的 FlatMapFunction 可以按照以下方式进行测试。

public class IncrementFlatMapFunctionTest {@Testpublic void testIncrement() throws Exception {// instantiate your functionIncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();Collector<Integer> collector = mock(Collector.class);// call the methods that you have implementedincrementer.flatMap(2L, collector);//verify collector was called with the right outputMockito.verify(collector, times(1)).collect(3L);}
}
b)对有状态或及时 UDF 和自定义算子进行单元测试

概述

对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:

  • OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)
  • KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)
  • TwoInputStreamOperatorTestHarness (适用于两个 DataStreamConnectedStreams 算子)
  • KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)

DataStream API 测试依赖项

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>1.19.0</version><scope>test</scope>
</dependency>

该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。

Table API 测试依赖项

如果想在 IDE 中本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,还要添加以下依赖项。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>1.19.0</version><scope>test</scope>
</dependency>

这将自动引入查询计划器和运行时,分别用于计划和执行查询。

可以使用测试工具将记录和 watermark 推送到用户自定义函数或自定义算子中,控制处理时间,最后对算子的输出(包括旁路输出)进行校验。

public class StatefulFlatMapTest {private OneInputStreamOperatorTestHarness<Long, Long> testHarness;private StatefulFlatMap statefulFlatMapFunction;@Beforepublic void setupTestHarness() throws Exception {//instantiate user-defined functionstatefulFlatMapFunction = new StatefulFlatMapFunction();// wrap user defined function into a the corresponding operatortestHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));// optionally configured the execution environmenttestHarness.getExecutionConfig().setAutoWatermarkInterval(50);// open the test harness (will also call open() on RichFunctions)testHarness.open();}@Testpublic void testingStatefulFlatMapFunction() throws Exception {//push (timestamped) elements into the operator (and hence user defined function)testHarness.processElement(2L, 100L);//trigger event time timers by advancing the event time of the operator with a watermarktestHarness.processWatermark(100L);//trigger processing time timers by advancing the processing time of the operator directlytestHarness.setProcessingTime(100L);//retrieve list of emitted records for assertionsassertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));//retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)//assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))}
}

KeyedOneInputStreamOperatorTestHarnessKeyedTwoInputStreamOperatorTestHarness 可以通过为键的类另外提供一个包含 TypeInformationKeySelector 来实例化。

public class StatefulFlatMapFunctionTest {private OneInputStreamOperatorTestHarness<String, Long, Long> testHarness;private StatefulFlatMap statefulFlatMapFunction;@Beforepublic void setupTestHarness() throws Exception {//instantiate user-defined functionstatefulFlatMapFunction = new StatefulFlatMapFunction();// wrap user defined function into a the corresponding operatortestHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);// open the test harness (will also call open() on RichFunctions)testHarness.open();}//tests}

在 Flink 代码库里可以找到更多使用这些测试工具的示例,例如:

  • org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest 是测试算子和用户自定义函数(取决于处理时间和事件时间)的一个很好的例子。

注意 AbstractStreamOperatorTestHarness 及其派生类目前不属于公共 API,可以进行更改。

单元测试 Process Function

除了之前可以直接用于测试 ProcessFunction 的测试工具之外,Flink 还提供了一个名为 ProcessFunctionTestHarnesses 的测试工具工厂类,可以简化测试工具的实例化。示例如下:

public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {out.collect(value);}
}

通过传递合适的参数并验证输出,对使用 ProcessFunctionTestHarnesses 是很容易进行单元测试并验证输出。

public class PassThroughProcessFunctionTest {@Testpublic void testPassThrough() throws Exception {//instantiate user-defined functionPassThroughProcessFunction processFunction = new PassThroughProcessFunction();// wrap user defined function into a the corresponding operatorOneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses.forProcessFunction(processFunction);//push (timestamped) elements into the operator (and hence user defined function)harness.processElement(1, 10);//retrieve list of emitted records for assertionsassertEquals(harness.extractOutputValues(), Collections.singletonList(1));}
}

有关如何使用 ProcessFunctionTestHarnesses 来测试 ProcessFunction 不同风格的更多示例,, 例如 KeyedProcessFunctionKeyedCoProcessFunctionBroadcastProcessFunction等,请自行查看 ProcessFunctionTestHarnessesTest

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/38827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu20.04 安装 cudatookit 12.2 + cudnn 安装

最简约的部署Ubuntu20.04深度学习环境的教程 1. 安装Ubuntu20.04 系统 B站详细的安装教程 简约安装版 2. 安装Nvidia显卡驱动 我参考了各种资料&#xff0c;重装系统&#xff0c;完美解决开机显示器黑屏无法进入桌面的情况 黑屏问题主要是由linux内核更新导致&#xff0c;…

煤矿ar远程协作平台系统提升了操作的安全性和效率

工业AR远程专家协助系统为企业量身打造大型设施的虚拟布局方案。借助先进的AR增强现实技术&#xff0c;企业能够在虚拟环境中精准模拟并购买适配设备&#xff0c;确保设施的顺畅运行。同时&#xff0c;工业AR远程专家协助系统能提供的协作功能让团队成员能够实时共享虚拟布局&a…

昇思25天学习打卡营第8天|MindSpore-SSD目标检测

SSD目标检测介绍 SSD,全称Single Shot MultiBox Detector,是Wei Liu在ECCV 2016上提出的一种目标检测算法。使用Nvidia Titan X在VOC 2007测试集上,SSD对于输入尺寸300x300的网络,达到74.3%mAP(mean Average Precision)以及59FPS;对于512x512的网络,达到了76.9%mAP ,超…

Mouse Albumin ELISA Kit小鼠白蛋白ELISA试剂盒

白蛋白存在于所有哺乳动物和许多低等脊椎动物的血管内和血管外&#xff0c;是一种由肝脏合成的约67kDa的蛋白质。正常情况下&#xff0c;只有非常微量的白蛋白能逃过肾小球的重吸收&#xff0c;并被排泄到尿液中。ICL的Mouse Albumin ELISA Kit是一种高灵敏度的双抗体夹心法ELI…

基于“香港世界”的SLAM技术介绍

在视觉感知技术中&#xff0c;理解和描述复杂的三维室外场景至关重要&#xff0c;尤其是自动驾驶技术的发展要求对陌生环境具有更强的适应能力和鲁棒性。传统上&#xff0c;使用“曼哈顿世界”和“亚特兰大世界”模型来描述具有垂直和水平结构的城市场景。 当遇到像香港这样地形…

C++:类与面向对象static和this关键字其他关键字

类与面向对象 struct和class (1)struct是C中用户自定义类型&#xff0c;主要功能是对功能相关数据的封装 (2)struct不能直接封装函数&#xff0c;但可以通过封装函数指针来间接封装函数 (3)struct就是class的初级阶段&#xff0c;class在struct基础上做了很多扩展&#xff0c;便…

cartographer代码学习-扫描匹配(暴力搜索)

在学习栅格地图的时候&#xff0c;我们知道在栅格更新前会先进行扫描匹配获取当前机器人最有可能所在的位姿&#xff1a; // local map frame <- gravity-aligned frame// 扫描匹配, 进行点云与submap的匹配std::unique_ptr<transform::Rigid2d> pose_estimate_2d Sca…

[4]python+selenium - UI自动框架之封装基类BasePage页面

这部分内容是页面上的一些基本操作 from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, \StaleElementReferenceException from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support im…

某安全公司DDoS攻击防御2024年6月报告

引言&#xff1a; 在2024年6月&#xff0c;网络空间的安全挑战汹涌澎湃。分布式拒绝服务&#xff08;DDoS&#xff09;攻击频发&#xff0c;针对云服务、金融科技及在线教育平台的精密打击凸显出当前网络威胁环境的严峻性。 某安全公司作为网络安全防护的中坚力量&#xff0c…

mybatis之动态标签

有些时候&#xff0c;sql语句where条件中&#xff0c;需要一些安全判断&#xff0c;例如按性别检索&#xff0c;如果传入的参数是空的&#xff0c;此时查询出的结果很可能是空的&#xff0c;也许我们需要参数为空时&#xff0c;是查出全部的信息。这是我们可以使用动态sql&…

代码随想录算法训练营Day55|42.接雨水、84.柱状图中最大的矩形

接雨水 42. 接雨水 - 力扣&#xff08;LeetCode&#xff09; 暴力解法 对计算接到的雨水&#xff0c;有两种方式&#xff0c;一是按照行来计算。 另一种是按列计算 按列计算容易不乱。基本思路如下&#xff1a; 对每列i进行循环&#xff0c;在循环中&#xff0c;找到该列左…

HarmonyOS Next开发学习手册——视频播放 (Video)

Video组件用于播放视频文件并控制其播放状态&#xff0c;常用于为短视频和应用内部视频的列表页面。当视频完整出现时会自动播放&#xff0c;用户点击视频区域则会暂停播放&#xff0c;同时显示播放进度条&#xff0c;通过拖动播放进度条指定视频播放到具体位置。具体用法请参考…

【JVM-02】垃圾收集(回收)算法

【JVM-02】垃圾收集/回收算法 1. 分代收集算法2. 标记-清除算法3. 标记-复制算法4. 标记-整理算法 1. 分代收集算法 分代收集(回收)算法根据对象存活周期的不同将内存分为几块。一般将java堆分为新生代和老年代&#xff0c;这样我们就可以根据各个年代的特点选择合适的垃圾收集…

Kotlin中object关键字的作用

1、对象声明&#xff0c;通过这种方式创建一个单例对象。 object MySingleton{ fun function{ //方法代码块 } } 调用方式&#xff1a;MySingleton.function(),类似像Java的静态方法 2、在类内部声明伴生对象 class OutClass { companion object{ val value 1 fun method(…

【算法题】爬楼梯 (经典递归)

题 爬楼梯&#xff1a; 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 输入&#xff1a; n 2 输出&#xff1a; 2 解释&#xff1a; 有两种方法可以爬到楼顶。 1 阶 1 阶 2 阶 示例 2&#x…

寒武纪实现高维向量的softmax进阶优化和库函数对比

关于寒武纪编程可以参考本人之前的文章添加链接描述,添加链接描述,添加链接描述 实验证明,axis=0和axis=-1的时候,手写softmax速度可以和库函数媲美,甚至于更甚一筹。 src/softmax.mlu #include <bang.h> #include

Nik Collection by DxO:摄影师的创意利器与调色宝典

在数码摄影的世界里&#xff0c;后期处理是摄影师们展现创意、调整细节、提升作品质量的重要步骤。而Nik Collection by DxO作为一款由DxO公司开发的强大照片编辑插件套件&#xff0c;为摄影师们提供了一套全面的、功能丰富的工具集&#xff0c;让他们的创意得以充分发挥。 Ni…

遇到多语言跨境电商系统源码问题?这里有解决方案!

从手机到电脑&#xff0c;从线下到线上&#xff0c;如今&#xff0c;跨境电商正在打破地域界限&#xff0c;成为全球贸易的新引擎。在这个全球化的背景下&#xff0c;跨境电商平台的运营也面临着一系列的挑战&#xff0c;其中之一就是多语言问题。如果你遇到了多语言跨境电商系…

2065. 最大化一张图中的路径价值 Hard

给你一张 无向 图&#xff0c;图中有 n 个节点&#xff0c;节点编号从 0 到 n - 1 &#xff08;都包括&#xff09;。同时给你一个下标从 0 开始的整数数组 values &#xff0c;其中 values[i] 是第 i 个节点的 价值 。同时给你一个下标从 0 开始的二维整数数组 edges &#xf…

小抄 20240629

1 很多人当下焦虑的原因&#xff0c;是短期内无法实现别人长期才能做到的目标&#xff0c;总想几天就能追赶别人几年的努力&#xff0c;只看到了别人的结果&#xff0c;没有思考过别人的过程。 2 把时间线拉长看&#xff0c;人和人都只是阶段性在一起&#xff0c;只要人还在成…