54、Flink 测试工具测试 Flink 作业详解

测试 Flink 作业
a)JUnit 规则 MiniClusterWithClientResource

Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 Junit 规则,用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource.

要使用 MiniClusterWithClientResource,需要添加一个额外的依赖项(测试范围)。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>1.19.0</version>    <scope>test</scope>
</dependency>

示例:MapFunction

public class IncrementMapFunction implements MapFunction<Long, Long> {@Overridepublic Long map(Long record) throws Exception {return record + 1;}
}

在本地 Flink 集群使用这个 MapFunction 的简单 pipeline,如下所示。

public class ExampleIntegrationTest {@ClassRulepublic static MiniClusterWithClientResource flinkCluster =new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(2).setNumberTaskManagers(1).build());@Testpublic void testIncrementPipeline() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements(1L, 21L, 22L).map(new IncrementMapFunction()).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(2L, 22L, 23L));}// create a testing sinkprivate static class CollectSink implements SinkFunction<Long> {// must be staticpublic static final List<Long> values = Collections.synchronizedList(new ArrayList<>());@Overridepublic void invoke(Long value, SinkFunction.Context context) throws Exception {values.add(value);}}
}

使用 MiniClusterWithClientResource 进行集成测试的注意

  • 为了不将整个 pipeline 代码从生产复制到测试,请将 source 和 sink 在生产代码中设置成可插拔的,并在测试中注入特殊的测试 source 和测试 sink。
  • 这里使用 CollectSink 中的静态变量,是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者,可以使用测试的 sink 将数据写入临时目录的文件中。
  • 如果作业使用事件时间定时器,则可以实现自定义的 并行 源函数来发出 watermark。
  • 建议始终以 parallelism > 1 的方式在本地测试 pipeline,以识别只有在并行执行 pipeline 时才会出现的 bug。
  • 优先使用 @ClassRule 而不是 @Rule,这样多个测试可以共享同一个 Flink 集群。可以节省大量的时间,因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。
  • 如果 pipeline 包含自定义状态处理,则可以通过启用 checkpoint 并在小型集群中重新启动作业来测试其正确性。为此,需要在 pipeline 中(仅测试)抛出用户自定义函数的异常来触发失败。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/864724.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kafka-服务端-副本同步-源码流程

杂 在0.9.0.0之前&#xff0c;Kafka提供了replica lag.max.messages 来控制follower副本最多落后leader副本的消息数量&#xff0c;follower 相对于leader 落后当超过这个数量的时候就判定该follower是失效的&#xff0c;就会踢出ISR&#xff0c;这里的指的是具体的LEO值。 对…

墨烯的C语言技术栈-C语言基础-001

(最近报名了9月的计算机二级 得好好重温一下C语言 祝我计算机二级必过!) 学习视频为B站的哔哩大学计算机学院 参考书籍为C语言程序设计第五版(张磊主编) 一.什么是C语言C语言是一门通用计算机编程语言 广泛应用底层开发 C语言的设计目标是提供一种能以简易的方式编译 处理低级…

面试历程--工行外包,岗位高级测试工程师

岗位来源&#xff1a;朋友内推 面试形式&#xff1a;线上会议 面试官&#xff1a;开始三人&#xff0c;后续两人 面试历程&#xff1a; 1.自我介绍&#xff1a;老样子&#xff0c;没有突出重点 2.问题&#xff1a; 2.1 自动化测试框架用的什么&#xff1f; 回答springbo…

软考高级之系统分析师及系统架构设计师备考过程记录

0x00 前言 考了两次系分&#xff0c;一次架构&#xff0c;今年系分终于上岸。 在此记录备考过程和一些体会 先说我自己的情况&#xff0c;我本硕都是计算机科班出身&#xff0c;本科的时候搞Java后端开发&#xff0c;硕士搞python和深度学习&#xff0c;但工作之后就基本没碰过…

洞鉴-产品部署及其功能

网络策略&#xff1a;安装&#xff1a; 资源准备 ⼀、系统安装包 https://chaitin-release.oss-cnbeijing.aliyuncs.com/release%2Ff%2F66600aac66bcea13c086319c?Expires1719310707 &OSSAccessKeyIdLTAI5tBpSz7iLYLH51NrVx22&SignaturesOpuVYuKpm9ZBoEzfwiRlJ fLrhQ…

STM32 HAL库读取ID

在stm32f1xx_hal.c文件中由读取ID号的子函数&#xff0c;不同单片机的UID_BASE不同&#xff0c;本单片机用的是STM32F103CBT6,跳转之后可以看到地址为&#xff1a;0x1FFFF7E8 在程序中只需定义一个数组调用读取ID的函数即可 uint32_t UID[3]; while(1) { UID[0] HAL_GetUIDw0…

C盘清理和管理

本篇是C盘一些常用的管理方法&#xff0c;以及定期清理C盘的方法&#xff0c;大部分情况下都能避免C盘爆红。 C盘清理和管理 C盘存储管理查看存储情况清理存储存储感知清理临时文件清理不需要的 迁移存储 磁盘清理桌面存储管理应用存储管理浏览器微信 工具清理 C盘存储管理 查…

VUE3+ AntV Select 选择器:mode=“multiple“和mode=“tags“的区别是什么

文章目录 VUE3 AntV Select 选择器&#xff1a;mode"multiple"和mode"tags"的区别是什么一、解释二、对比演示 VUE3 AntV Select 选择器&#xff1a;mode"multiple"和mode"tags"的区别是什么 一、解释 “mode” 是一个参数&#xff…

SpringSecurity中文文档(Servlet Persisting Authentication)

Persisting Authentication 用户第一次请求受保护的资源时&#xff0c;系统会提示他们输入凭据。提示凭据的最常见方法之一是将用户重定向到登录页。对于请求受保护资源的未经身份验证的用户&#xff0c;总结的 HTTP 交换可能如下所示: Example 1. Unauthenticated User Requ…

VBA字典与数组第十六讲:行、列数不相同的数组间运算规律

《VBA数组与字典方案》教程&#xff08;10144533&#xff09;是我推出的第三套教程&#xff0c;目前已经是第二版修订了。这套教程定位于中级&#xff0c;字典是VBA的精华&#xff0c;我要求学员必学。7.1.3.9教程和手册掌握后&#xff0c;可以解决大多数工作中遇到的实际问题。…

【SpringCloud】Config源码解析

config是一个微服务配置组件&#xff0c;为微服务提供集中化的配置管理服务。config包含服务端和客户端&#xff0c;客户端在启动服务时从服务端拉取配置信息&#xff0c;服务端响应客户端的请求提供具体的配置。本章分析config组件配置信息的拉取过程 1、config服务端 服务端…

一键AI抠图太方便啦!不会ps也能成为修图大师

引言 在数字生活中&#xff0c;抠图技能已成为一项日常且必不可少的技能。无论是需要更换证件照的背景色&#xff0c;还是想要将图像中的主体精确分离。 但并非所有人都精通Photoshop&#xff0c;而且对于简单的任务来说&#xff0c;使用Photoshop可能显得过于复杂。因此&…

1077 韩信点兵

这是一个中国剩余定理的问题。中国剩余定理是数论中的一个定理&#xff0c;它给出了一组同余方程的解的存在性和唯一性。在这个问题中&#xff0c;我们需要找到一个数&#xff0c;使得它对给定的每个质数取余的结果等于给定的余数。 以下是一个使用C实现的解决方案&#xff1a…

Spark2.0

目录 10.3 Spark运行架构 10.3.1 基本概念 10.3.2 架构设计 ​编辑 10.3.3 Spark运行基本流程 Spark运行架构特点 10.3 Spark运行架构 10.3.1 基本概念 RDD &#xff1a;是 Resillient Distributed Dataset &#xff08;弹性分布式数据集&#xff09;的简称&#xff0c;是分…

elasticsearch入门基本知识+使用案例

1、ES逻辑结构 索引-index&#xff1a;相当于db中的数据库名。索引命名规则&#xff1a;小写字母。 类型-type&#xff1a;相当于数据库中的表名&#xff0c;为具有相同字段的文档定义的一个类型。 字段-field&#xff1a;相当于表字段名&#xff0c;文档数据的属性…

C 标准库 - <stdio.h>

C 标准库 - <stdio.h> 概述 <stdio.h> 是 C 语言标准库中的一个头文件&#xff0c;它包含了用于输入输出操作的函数声明。这些函数主要用于处理文件读写、格式化输出、字符输入输出等操作。stdio.h 是 "standard input/output" 的缩写&#xff0c;它提…

【Llama 2的使用方法】

Llama 2是Meta AI&#xff08;Facebook的母公司Meta的AI部门&#xff09;开发并开源的大型语言模型系列之一。Llama 2是在其前身Llama模型的基础上进行改进和扩展的&#xff0c;旨在提供更强大的自然语言处理能力和更广泛的应用场景。 以下是Llama 2的一些关键特性和更新点&am…

git主机仓库地址迁移后 git提交代码报错

找到本地电脑的文件known_hosts 2.在代码中git pull 此时终端会有提示 输入ye enter提交便成功了

EasyExcel动态表头多sheet导出,单元格操作样式,自动修改单元格格式

EasyExcel动态表头多sheet导出,单元格操作样式,自动修改单元格格式 说明 EasyExcel是一款开源的Java库&#xff0c;用于读取、写入和操作Excel文件。它是阿里巴巴集团开发的一款高效、功能丰富且易于使用的Excel操作工具。 EasyExcel提供了简洁的API&#xff0c;使得读写Excel…

springboot个人证书管理系统-计算机毕业设计源码16679

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了个人证书管理系统的开发全过程。通过分析个人证书管理系统管理的不足&#xff0c;创建了一个计算机管理个人证书管理系统的方案。文章介绍了个人证书管理系统的系…