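Testing map transformation functions and process functions in Flink

Background

In Flink, we need to unit test the map transformation functions and process functions we write. The tests check whether a function's output matches expectations and whether the state inside the function is updated correctly. This article records a few key points from that testing process.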

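Testing functions in Flink

First, choose a test harness according to the type of stream under test:

  1. OneInputStreamOperatorTestHarness: for operators on DataStreams
  2. KeyedOneInputStreamOperatorTestHarness: for operators on KeyedStreams (streams after keyBy)
  3. TwoInputStreamOperatorTestHarness: for operators on ConnectedStreams of two DataStreams
  4. KeyedTwoInputStreamOperatorTestHarness: for operators on ConnectedStreams of two KeyedStreams

Next, choose the operator according to whether you are testing a map function or a process function. For a map function, wrap it in the StreamFlatMap operator (it handles both plain FlatMap functions and stateful RichFlatMap functions). For a process function, build the harness with the ProcessFunctionTestHarnesses.forXX factory methods.

Test code for the map function: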

@Test
public void testStateFlatMap() throws Exception {
    StatefulFlatMap statefulFlatMap = new StatefulFlatMap();

    // OneInputStreamOperatorTestHarness takes the input and output types as type parameters
    OneInputStreamOperatorTestHarness<String, String> testHarness =
            // KeyedOneInputStreamOperatorTestHarness takes three arguments:
            // Flink operator object, key selector and key type
            new KeyedOneInputStreamOperatorTestHarness<String, String, String>(
                    new StreamFlatMap<>(statefulFlatMap), x -> "1", Types.STRING);
    testHarness.open();

    // test first record
    testHarness.processElement("world", 10);
    ValueState<String> previousInput =
            statefulFlatMap.getRuntimeContext().getState(
                    new ValueStateDescriptor<>("previousInput", Types.STRING));
    String stateValue = previousInput.value();
    Assert.assertEquals(
            Lists.newArrayList(new StreamRecord<>("hello world", 10)),
            testHarness.extractOutputStreamRecords());
    Assert.assertEquals("world", stateValue);

    // test second record
    testHarness.processElement("parallel", 20);
    Assert.assertEquals(
            Lists.newArrayList(
                    new StreamRecord<>("hello world", 10),
                    new StreamRecord<>("hello parallel world", 20)),
            testHarness.extractOutputStreamRecords());
    Assert.assertEquals("parallel", previousInput.value());
}

public class StatefulFlatMap extends RichFlatMapFunction<String, String> {
    ValueState<String> previousInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        previousInput = getRuntimeContext().getState(
                new ValueStateDescriptor<String>("previousInput", Types.STRING));
    }

    @Override
    public void flatMap(String in, Collector<String> collector) throws Exception {
        String out = "hello " + in;
        if (previousInput.value() != null) {
            out = out + " " + previousInput.value();
        }
        previousInput.update(in);
        collector.collect(out);
    }
}
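
To make the expected values in the assertions above easier to follow, the flatMap logic can be traced without Flink at all. The sketch below is only an illustration: the class name StatefulFlatMapTrace and its methods are hypothetical, and a plain field stands in for the ValueState of a single key. It does not use or replace the test harness.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java trace of the StatefulFlatMap logic; no Flink classes are involved.
final class StatefulFlatMapTrace {
    // Stand-in for ValueState<String> "previousInput" (assumption: one key only).
    private String previousInput;

    // Same steps as StatefulFlatMap.flatMap, returning the record instead of collecting it.
    String flatMap(String in) {
        String out = "hello " + in;
        if (previousInput != null) {
            out = out + " " + previousInput;
        }
        previousInput = in;
        return out;
    }

    // Current value of the simulated state.
    String state() {
        return previousInput;
    }

    // Feeds the inputs through one function instance and returns all outputs in order.
    static List<String> run(List<String> inputs) {
        StatefulFlatMapTrace fn = new StatefulFlatMapTrace();
        List<String> outputs = new ArrayList<>();
        for (String in : inputs) {
            outputs.add(fn.flatMap(in));
        }
        return outputs;
    }

    public static void main(String[] args) {
        // prints [hello world, hello parallel world]
        System.out.println(run(Arrays.asList("world", "parallel")));
    }
}
```

The second output contains "world" because the state still holds the first input when "parallel" arrives, which is why the test asserts "hello parallel world" and then checks that the state has moved on to "parallel".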

Test code for the process function:

@Test
public void testProcessElement() throws Exception {
    MyProcessFunction myProcessFunction = new MyProcessFunction();
    OneInputStreamOperatorTestHarness<String, String> testHarness =
            ProcessFunctionTestHarnesses.forKeyedProcessFunction(
                    myProcessFunction, x -> "1", Types.STRING);

    // Function time is initialized to 0
    testHarness.open();
    testHarness.processElement("world", 10);

    Assert.assertEquals(
            Lists.newArrayList(new StreamRecord<>("hello world", 10)),
            testHarness.extractOutputStreamRecords());
}

@Test
public void testOnTimer() throws Exception {
    MyProcessFunction myProcessFunction = new MyProcessFunction();
    OneInputStreamOperatorTestHarness<String, String> testHarness =
            ProcessFunctionTestHarnesses.forKeyedProcessFunction(
                    myProcessFunction, x -> "1", Types.STRING);

    testHarness.open();
    testHarness.processElement("world", 10);
    Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

    // Function time is set to 50
    testHarness.setProcessingTime(50);
    Assert.assertEquals(
            Lists.newArrayList(
                    new StreamRecord<>("hello world", 10),
                    new StreamRecord<>("Timer triggered at timestamp 50")),
            testHarness.extractOutputStreamRecords());
}

public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {
    @Override
    public void processElement(String in, Context context, Collector<String> collector)
            throws Exception {
        context.timerService().registerProcessingTimeTimer(50);
        String out = "hello " + in;
        collector.collect(out);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        out.collect(String.format("Timer triggered at timestamp %d", timestamp));
    }
}
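
The timer test above depends on one behaviour: a processing-time timer registered for timestamp 50 fires only once the processing time is advanced to 50 or later. The toy model below sketches that behaviour in plain Java. It is an assumption-laden illustration, not Flink's timer service: the class ProcessingTimeTimerTrace is hypothetical, it models a single key, and it borrows the harness method names only to make the mapping to the test obvious.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy single-key model of MyProcessFunction plus a processing-time clock.
final class ProcessingTimeTimerTrace {
    // Pending timer timestamps; a set, so registering the same timestamp twice keeps one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();

    // Mirrors processElement: register a timer for 50, then emit "hello <in>".
    void processElement(String in) {
        timers.add(50L);
        output.add("hello " + in);
    }

    // Advances the clock and fires every pending timer whose timestamp is <= time.
    void setProcessingTime(long time) {
        while (!timers.isEmpty() && timers.first() <= time) {
            long timestamp = timers.pollFirst();
            output.add(String.format("Timer triggered at timestamp %d", timestamp));
        }
    }

    int numProcessingTimeTimers() {
        return timers.size();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcessingTimeTimerTrace trace = new ProcessingTimeTimerTrace();
        trace.processElement("world");
        trace.setProcessingTime(50);
        // prints [hello world, Timer triggered at timestamp 50]
        System.out.println(trace.output());
    }
}
```

In this model, advancing the clock to 49 would leave the timer pending and the output unchanged, which matches why the test sets the processing time to exactly 50 before asserting on the timer record.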

In addition, here is the official test code for the map function, from the Flink source:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Tests for {@link StreamMap}. These test that:
 *
 * <ul>
 *   <li>RichFunction methods are called correctly
 *   <li>Timestamps of processed elements match the input timestamp
 *   <li>Watermarks are correctly forwarded
 * </ul>
 */
public class StreamFlatMapTest {

    private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (value % 2 == 0) {
                out.collect(value);
                out.collect(value * value);
            }
        }
    }

    @Test
    public void testFlatMap() throws Exception {
        StreamFlatMap<Integer, Integer> operator =
                new StreamFlatMap<Integer, Integer>(new MyFlatMap());

        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
                new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
        testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
        testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
        testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
        testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
        testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));

        expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
        expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));

        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        StreamFlatMap<String, String> operator =
                new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());

        OneInputStreamOperatorTestHarness<String, String> testHarness =
                new OneInputStreamOperatorTestHarness<String, String>(operator);

        long initialTime = 0L;

        testHarness.open();

        testHarness.processElement(new StreamRecord<String>("Hello", initialTime));

        testHarness.close();

        Assert.assertTrue(
                "RichFunction methods where not called.",
                TestOpenCloseFlatMapFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
    }

    // This must only be used in one test, otherwise the static fields will be changed
    // by several tests concurrently
    private static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {

        private static final long serialVersionUID = 1L;

        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        @Override
        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            out.collect(value);
        }
    }
}

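The official test covers both FlatMap and RichFlatMap functions, but it never touches state. My earlier example fills that gap by testing the state of a RichFlatMap function.

References: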
https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/
