1. Testing user-defined functions
a) Unit testing stateless, timeless UDFs
Example: a stateless MapFunction.
import org.apache.flink.api.common.functions.MapFunction;

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}
Such a function is easy to test: pass suitable arguments and verify the output.
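import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class IncrementMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementMapFunction incrementer = new IncrementMapFunction();

        // call the methods that you have implemented;
        // unbox the result so the (long, long) overload of assertEquals is selected
        assertEquals(3L, incrementer.map(2L).longValue());
    }
}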
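User-defined functions that use an org.apache.flink.util.Collector (for example FlatMapFunction or ProcessFunction) can be tested by providing a mock object instead of a real collector. A FlatMapFunction with the same functionality as IncrementMapFunction can be tested as follows.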
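import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.apache.flink.util.Collector;
import org.junit.Test;
import org.mockito.Mockito;

public class IncrementFlatMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();

        // the function emits Long values, so the mocked collector is a Collector<Long>
        Collector<Long> collector = mock(Collector.class);

        // call the methods that you have implemented
        incrementer.flatMap(2L, collector);

        // verify collector was called with the right output
        Mockito.verify(collector, times(1)).collect(3L);
    }
}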
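If pulling in a mocking library is not desirable, a hand-written collector that records what it receives serves the same purpose. The following is a minimal sketch, not part of the original article: RecordingCollector and RecordingCollectorExample are hypothetical names, the IncrementFlatMapFunction body is an assumed implementation, and the small Collector interface is a stand-in that mirrors org.apache.flink.util.Collector so the snippet compiles without Flink on the classpath. In a real test you would implement Flink's interface instead.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.flink.util.Collector (assumption: same two methods),
// declared locally only to keep this sketch dependency-free.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Hypothetical helper: stores every emitted record in insertion order.
class RecordingCollector<T> implements Collector<T> {
    private final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }

    List<T> getRecords() {
        return records;
    }
}

// Assumed implementation of the function under test: emits record + 1.
class IncrementFlatMapFunction {
    public void flatMap(Long record, Collector<Long> out) {
        out.collect(record + 1);
    }
}

// Hypothetical driver showing how a test would use the recording collector.
class RecordingCollectorExample {
    static List<Long> run() {
        RecordingCollector<Long> collector = new RecordingCollector<>();
        new IncrementFlatMapFunction().flatMap(2L, collector);
        return collector.getRecords();
    }
}
```

Compared with a mock, this variant lets a test assert on the complete list of emitted records, including their order, with plain equality checks.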
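b) Unit testing stateful or timely UDFs and custom operators
Overview
Testing user-defined functions that use managed state or timers is more difficult, because it involves testing the interaction between user code and the Flink runtime. For this purpose, Flink provides a set of so-called test harnesses that can be used to test user-defined functions as well as custom operators: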
OneInputStreamOperatorTestHarness (for operators on DataStreams)
KeyedOneInputStreamOperatorTestHarness (for operators on KeyedStreams)
TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)
DataStream API test dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.19.0</version>
    <scope>test</scope>
</dependency>
This module provides MiniCluster, a lightweight, configurable Flink cluster that runs inside JUnit tests and can execute jobs directly.
Table API test dependencies
To test Table API and SQL programs locally in your IDE, add the following dependency in addition to the flink-test-utils dependency mentioned above.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.19.0</version>
    <scope>test</scope>
</dependency>
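This automatically brings in the query planner and the runtime, which are needed to plan and execute queries, respectively.
The test harnesses can be used to push records and watermarks into user-defined functions or custom operators, to control processing time, and finally to assert on the operator's output (including side outputs).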
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMapFunction statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {
        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user-defined function into the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));

        // optionally configure the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    @Test
    public void testingStatefulFlatMapFunction() throws Exception {
        // push (timestamped) elements into the operator (and hence user-defined function)
        testHarness.processElement(2L, 100L);

        // trigger event time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100L);

        // trigger processing time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100L);

        // retrieve list of emitted records for assertions
        // (containsInExactlyThisOrder is assumed to be a custom matcher; it is not a standard Hamcrest matcher)
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));

        // retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
        // assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
    }
}
KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector together with the TypeInformation for the key class.
public class StatefulFlatMapFunctionTest {
    // the keyed harness takes three type parameters: key, input, output
    private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
    private StatefulFlatMapFunction statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {
        // instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user-defined function into the corresponding operator
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new StreamFlatMap<>(statefulFlatMapFunction),
                new MyStringKeySelector(),
                Types.STRING);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    // tests
}
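The article does not show MyStringKeySelector. Given the harness type parameters above (String keys, Long inputs), one plausible shape is sketched below. The even/odd keying rule is purely illustrative, and the local KeySelector interface is a stand-in that mirrors the getKey method of org.apache.flink.api.java.functions.KeySelector so the snippet compiles on its own. A real selector would implement Flink's interface, which is also Serializable.

```java
// Stand-in for org.apache.flink.api.java.functions.KeySelector<IN, KEY>
// (assumption: same single method), declared locally to avoid a Flink dependency.
interface KeySelector<IN, KEY> {
    KEY getKey(IN value) throws Exception;
}

// Hypothetical selector: maps each Long input to a String key.
// The "even"/"odd" rule is an illustration, not taken from the article.
class MyStringKeySelector implements KeySelector<Long, String> {
    @Override
    public String getKey(Long value) {
        return value % 2 == 0 ? "even" : "odd";
    }
}
```

Whatever rule is chosen, it should be deterministic: the harness uses the returned key to scope keyed state and timers, so the same input must always yield the same key.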
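Many more examples of how these test harnesses are used can be found in the Flink code base. For example, org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest is a good example of testing operators and user-defined functions that depend on processing time or event time.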
Note: AbstractStreamOperatorTestHarness and its derived classes are currently not part of the public API and are subject to change.
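Unit testing ProcessFunction
In addition to the test harnesses above, which can be used directly to test a ProcessFunction, Flink provides a test harness factory class named ProcessFunctionTestHarnesses that simplifies instantiating a harness. Consider this example: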
public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {

    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        out.collect(value);
    }
}
With ProcessFunctionTestHarnesses, such a function is easy to unit test by passing suitable arguments and verifying the output.
public class PassThroughProcessFunctionTest {

    @Test
    public void testPassThrough() throws Exception {
        // instantiate user-defined function
        PassThroughProcessFunction processFunction = new PassThroughProcessFunction();

        // wrap user-defined function into the corresponding operator
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(processFunction);

        // push (timestamped) elements into the operator (and hence user-defined function)
        harness.processElement(1, 10);

        // retrieve list of emitted records for assertions (expected value first)
        assertEquals(Collections.singletonList(1), harness.extractOutputValues());
    }
}
For more examples of how to use ProcessFunctionTestHarnesses to test the different flavors of ProcessFunction, such as KeyedProcessFunction, KeyedCoProcessFunction, and BroadcastProcessFunction, see ProcessFunctionTestHarnessesTest.