flink的KeyedBroadcastProcessFunction测试

背景

我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试,以确保上线之前这个函数的功能是正常的,包括里面的广播状态和键值分区状态

测试KeyedBroadcastProcessFunction类

    @Testpublic void testHarnessForKeyedBroadcastProcessFunction() throws Exception {KeyedBroadcastProcessFunction<String, String, String, String> function = new MyKeyedBroadcastProcessFunction();// 键值分区状态final ValueStateDescriptor<String> valueStateDescriptor =new ValueStateDescriptor<>("item", BasicTypeInfo.STRING_TYPE_INFO);// 广播状态final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);KeyedBroadcastOperatorTestHarness<String, String, String, String> harness =ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(function, x -> x,TypeInformation.of(String.class), ruleStateDescriptor);harness.processBroadcastElement("0", 1);harness.processBroadcastElement("000", 2);harness.processElement("1", 10);// 判断键值分区状态(注意这里最好就只是某个key下面,也就是分组key直接设置为x->"固定常数值"即可)ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);Assert.assertEquals(valueState.value(), "1");// 判断广播状态BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);Assert.assertTrue(broadcastState.contains("0"));Assert.assertTrue(broadcastState.contains("000"));// 判断输出的列表Assert.assertEquals(harness.extractOutputValues(), Arrays.asList("0", "000", "1"));}

关键代码:
1.获取键值分区状态

ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);

2.获取广播状态:

BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);

3.工具类

public class ProcessFunctionTestHarnesses {public ProcessFunctionTestHarnesses() {}public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(ProcessFunction<IN, OUT> function) throws Exception {OneInputStreamOperatorTestHarness<IN, OUT> testHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator((ProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.setup();testHarness.open();return testHarness;}public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(KeyedProcessFunction<K, IN, OUT> function, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedProcessOperator((KeyedProcessFunction)Preconditions.checkNotNull(function)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(CoProcessFunction<IN1, IN2, OUT> function) throws Exception {TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new TwoInputStreamOperatorTestHarness(new CoProcessOperator((CoProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness(new KeyedCoProcessOperator((KeyedCoProcessFunction)Preconditions.checkNotNull(function)), keySelector1, keySelector2, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor<?, ?>... descriptors) throws Exception {BroadcastOperatorTestHarness<IN1, IN2, OUT> testHarness = new BroadcastOperatorTestHarness(new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector, TypeInformation<K> keyType, MapStateDescriptor<?, ?>... descriptors) throws Exception {KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedBroadcastOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/136263.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu kill命令使用方法极简

例如 # 用法为&#xff1a;kill PID&#xff0c;示例如下&#xff1a; kill 25143大家一定也要把已经解决过的&#xff0c;查看其他的csdn过后解决完的总结成自己的博客&#xff0c;这样方便其他的查找。我就不用了&#xff0c;我有资格。

C语言 每日一题 11.9 day15

数组元素循环右移问题 一个数组A中存有N&#xff08; > 0&#xff09;个整数&#xff0c;在不允许使用另外数组的前提下&#xff0c;将每个整数循环向右移M&#xff08;≥0&#xff09;个位置&#xff0c;即将A中的数据由&#xff08;A0​A1⋯AN−1&#xff09;变换为&…

【Mysql】增删改查(基础版)

我使用的工具是Data Grip &#xff08;SQLyog Naivact 都行&#xff09; 使用Data Grip创建student表&#xff0c;具体步骤如下&#xff08;熟悉Data Grip或者使用SQLyog&#xff0c;Naivact可以跳过&#xff09; https://blog.csdn.net/m0_67930426/article/details/13429…

arima模型python代码

import pandas as pd import numpy as np from statsmodels.tsa.arima_model import ARIMA import matplotlib.pyplot as plt # 加载数据 data pd.read_csv(your_data.csv, index_coldate, parse_datesTrue) # 拟合ARIMA模型 model ARIMA(data, order(1, 1, 1)) result…

高速信号PCB布局怎么布?(电子硬件)

对于高速信号&#xff0c;pcb的设计要求会更多&#xff0c;因为高速信号很容易收到其他外在因素的干扰&#xff0c;导致实际设计出来的东西和原本预期的效果相差很多。 所以在高速信号pcb设计中&#xff0c;需要提前考虑好整体的布局布线&#xff0c;良好的布局可以很好的决定布…

物联网AI MicroPython学习之语法 ucollections集合和容器类型

学物联网&#xff0c;来万物简单IoT物联网&#xff01;&#xff01; ucollections 介绍 ucollections 模块用于创建一个新的容器类型&#xff0c;用于保存各种对象。 接口说明 namedtuple - 创建一个新namedtuple容器类型 函数原型&#xff1a; 创建一个具有特定名称和一组…

天津WEB前端培训哪家好?Web机构推荐!

05年以后&#xff0c;互联网已经进入了web2.0时代&#xff0c;同时也标志着网站的前端由此发生了翻天覆地的变化&#xff0c;现在市场上对WEB前端开发工程师岗位有着很大的需求&#xff0c;学习web前端开发的方式有很多种&#xff0c;对于初学者来说&#xff0c;选择自学还是培…

【Hive】分区表和分桶表相关知识点介绍

Hive中的分区表和分桶表是两种用于优化数据查询和管理的技术。它们可以提高查询性能、减少数据扫描量并提供更精细的数据组织方式。 分区表(Partitioned Table) Hive的分区表将数据按照一个或多个列的值进行逻辑分区。每个分区都是一个独立的子目录,其中包含符合该分区条件…

Spring Cloud - 手写 Gateway 源码,实现自定义局部 FilterFactory

目录 一、FilterFactory 分析 1.1、前置知识 1.2、分析源码 1.2.1、整体分析 1.2.2、源码分析 1.3、手写源码 1.3.1、基础框架 1.3.2、实现自定义局部过滤器 1.3.3、加参数的自定义局部过滤器器 一、FilterFactory 分析 1.1、前置知识 前面的学习我们知道&#xff0c…

AI:65-基于机器学习预测股市行情

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌在这个漫长的过程,中途遇到了不少问题,但是…

Go基础知识全面总结

文章目录 go基本数据类型bool类型数值型字符字符串 数据类型的转换运算符和表达式1. 算数运算符2.关系运算符3. 逻辑运算符4. 位运算符5. 赋值运算符6. 其他运算符运算符优先级转义符 go基本数据类型 bool类型 布尔型的值只可以是常量 true 或者 false。⼀个简单的例⼦&#…

win10语言切换调整为像win7一样,设置纯英文键盘切换,使用ctrol+shift切换键盘

文章目录 引入键盘布局说明安装美式键盘去掉微软键盘&#xff0c;修改布局切换快捷键最终效果 引入 我们在玩游戏或者写代码的时候&#xff0c;常常需要使用shift键&#xff0c;而输入法的shift键常常是中英切换按键&#xff0c;这就让人非常不爽了&#xff0c;这里仿照在win7…

【Git】快速入门安装及使用git与svn的区别常用命令

一、导言 1、什么是svn&#xff1f; SVN是Subversion的简称&#xff0c;是一个集中式版本控制系统。与Git不同&#xff0c;SVN没有分布式的特性。在SVN中&#xff0c;项目的代码仓库位于服务器上&#xff0c;团队成员通过向服务器提交和获取代码来实现版本控制。SVN记录了每个文…

Android Glide transform旋转rotate圆图CircleCrop,Kotlin

Android Glide transform旋转rotate圆图CircleCrop&#xff0c;Kotlin import android.graphics.Bitmap import android.os.Bundle import android.util.Log import android.widget.ImageView import androidx.appcompat.app.AppCompatActivity import com.bumptech.glide.load…

基于单片机的养殖场温度控制系统设计

博主主页&#xff1a;单片机辅导设计 博主简介&#xff1a;专注单片机技术领域和毕业设计项目。 主要内容&#xff1a;毕业设计、简历模板、学习资料、技术咨询。 文章目录 主要介绍一、控制系统设计二、系统方案设计2.1 系统运行方案设计2.1.1 羊舍环境温度的确定 三、 系统仿…

Leetcode—226.翻转二叉树【简单】

2023每日刷题&#xff08;二十四&#xff09; Leetcode—226.翻转二叉树 实现代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* …

【解决方案】vue 项目 npm run dev 时报错:‘cross-env‘ 不是内部或外部命令,也不是可运行的程序

报错 cross-env 不是内部或外部命令&#xff0c;也不是可运行的程序 或批处理文件。 npm ERR! code ELIFECYCLE npm ERR! errno 1 npm ERR! estate1.0.0 dev: cross-env webpack-dev-server --inline --progress --config build/webpack.dev.conf.js npm ERR! Exit status 1 np…

vue-router路由守卫进阶

vue-router路由守卫进阶 路由守卫&#xff0c;可以想象为古代御前侍卫&#xff0c;路由守卫&#xff0c;则是对路由进行权限控制 分类&#xff1a;全局守卫、独享守卫、组件内守卫 全局前置-路由守卫 作用&#xff1a;主要用来鉴权 用户点击导航区&#xff0c;随后引起路径的…

kubernetes (k8s)的使用

一、kubernetes 简介 谷歌2014年开源的管理工具项目&#xff0c;简化微服务的开发和部署。 提供功能&#xff1a;自愈和自动伸缩、调度和发布、调用链监控、配置管理、Metrics监控、日志监控、弹性和容错、API管理、服务安全等。官网&#xff1a;https://kubernetes.io/zh-cn…

后入能先出,一文搞懂栈

目录 什么是栈数组实现链表实现栈能这么玩总结 什么是栈 栈在我们日常编码中遇到的非常多&#xff0c;很多人对栈的接触可能仅仅局限在 递归使用的栈 和 StackOverflowException&#xff0c;栈是一种后进先出的数据结构(可以想象生化金字塔的牢房和生化角斗场的狗洞)。 栈&…