背景
在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点
flink中测试函数
首先我们根据我们要测试的是数据流的类型选择不同的测试套件,如下所示:
- OneInputStreamOperatorTestHarness:适用于 DataStreams 数据流
- KeyedOneInputStreamOperatorTestHarness:适用于 KeyedStreams 分组后的数据流
- TwoInputStreamOperatorTestHarness:适用于两个数据流DataStream的 ConnectedStream
- KeyedTwoInputStreamOperatorTestHarness:适用于两个 KeyedStream 的 ConnectedStream
其次,根据是测试map函数还是process函数,我们选择不同的操作符,如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子
map函数测试代码:
@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarness<String, String> testHarness =// KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarness<String, String, String>(new StreamFlatMap<>(statefulFlatMap),x -> "1", Types.STRING);testHarness.open();// test first recordtestHarness.processElement("world", 10);ValueState<String> previousInput =statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));String stateValue = previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals("world", stateValue);// test second recordtestHarness.processElement("parallel", 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("hello parallel world", 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals("parallel", previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
}
process处理函数代码:
@Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());}@Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("Timer triggered at timestamp 50")),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out = "hello " + in;collector.collect(out);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(String.format("Timer triggered at timestamp %d", timestamp));}}
此外附加官方的map函数的测试代码:
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;import org.junit.Assert;
import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {@link StreamMap}. These test that:** <ul>* <li>RichFunction methods are called correctly* <li>Timestamps of processed elements match the input timestamp* <li>Watermarks are correctly forwarded* </ul>*/
public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {private static final long serialVersionUID = 1L;@Overridepublic void flatMap(Integer value, Collector<Integer> out) throws Exception {if (value % 2 == 0) {out.collect(value);out.collect(value * value);}}}@Testpublic void testFlatMap() throws Exception {StreamFlatMap<Integer, Integer> operator =new StreamFlatMap<Integer, Integer>(new MyFlatMap());OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);long initialTime = 0L;ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();testHarness.open();testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));testHarness.processWatermark(new Watermark(initialTime + 2));testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));expectedOutput.add(new Watermark(initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());}@Testpublic void testOpenClose() throws Exception {StreamFlatMap<String, String> operator =new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarness<String, String> testHarness =new OneInputStreamOperatorTestHarness<String, String>(operator);long initialTime = 0L;testHarness.open();testHarness.processElement(new StreamRecord<String>("Hello", initialTime));testHarness.close();Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {private static final long serialVersionUID = 1L;public static boolean openCalled = false;public static boolean closeCalled = false;@Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail("Close called before open.");}openCalled = true;}@Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail("Open was not called before close.");}closeCalled = true;}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {if (!openCalled) {Assert.fail("Open was not called before run.");}out.collect(value);}}
}
包含同时测试FlatMap和RichFlatMap函数,但是其中没有操作状态,我前面的例子包含了RichFlatMap状态的测试
参考文献:
https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/