Flink状态应用测试程序示例

Flink状态应用测试程序示例

1. 创建执行环境
        // 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);   

2. 创建数据流
        // 2. 创建数据流DataStream<Tuple2<String, Integer>> inputStream = env.fromElements(Tuple2.of("a", 1),Tuple2.of("b", 2),Tuple2.of("c", 3),Tuple2.of("a", 4),Tuple2.of("b", 5),Tuple2.of("c", 6)).uid("source").name("source");

3. 对数据流进行keyBy()操作
        // 3. 对数据流进行keyBy()操作DataStream<Tuple2<String, Integer>> keyedStream = inputStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}});

4. 使用RichFlatMapFunction来实现状态的维护
        // 4. 使用RichFlatMapFunction来实现状态的维护keyedStream.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 定义状态变量private ValueState<Integer> countState;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态变量ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("count", Integer.class);countState = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> out) throws Exception {// 获取状态变量的值Integer currentCount = countState.value();// 更新状态变量的值if (currentCount == null) {currentCount = 0;}currentCount++;countState.update(currentCount);// 输出结果out.collect(Tuple2.of(input.f0, currentCount));}}).print(); 

5.执行任务
        // 5.执行任务env.execute("Flink State Test"); 

6.执行结果
(a,1)
(b,1)
(c,1)
(a,2)
(b,2)
(c,2)Process finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/645257.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringMVC第二天

今日内容 能够掌握SSM整合的流程 能够编写SSM整合功能模块类 能够使用Result统一表现层响应结果 能够编写异常处理器进行项目异常 能够完成SSM整合前端页面发送请求实现增删改查操作 能够编写拦截器并配置拦截器 一、SSM整合【重点】 1 SSM整合配置 问题导入 请描述“SSM整…

加速应用开发:低代码云SaaS和源码交付模式如何选

随着数字化转型的加速&#xff0c;企业对于快速开发和交付高质量应用的需求也越来越迫切。为了满足这一需求&#xff0c;开发者们开始探索采用低代码平台进行软件开发工作&#xff0c;以加速应用开发过程。 目前&#xff0c;市场上的低代码产品众多&#xff0c;但基本可分为简单…

黑马Java——面向对象进阶(static继承)

1.static静态变量 静态变量是随着类的加载而加载的&#xff0c;优先与对象出现的

大模型面试题总结

文章目录 一、大模型(LLMs)基础面二、大模型(LLMs)进阶面三、大模型(LLMs)微调面四、大模型(LLMs)langchain面1. 基于LLM+向量库的文档对话 基础面2. 基于LLM+向量库的文档对话 优化面3. LangChain的概念面试问题4.LangChain的一些模块提问5.LangChain的业务提问6.Lang…

写一份简单的产品说明书:格式和排版建议

现在的市场竞争那么激烈&#xff0c;拥有一份简洁明了的产品说明书可以说是很重要的。产品说明书不仅向用户提供了对产品的详细了解&#xff0c;还能够树立品牌形象&#xff0c;提升用户体验。 | 一、写一份简单的产品说明书—一些建议 1.创意封面设计 一个吸引人的封面设计能…

Kong工作原理 - 代理参考 (Proxy Reference)

在本文档中&#xff0c;我们详细介绍了Kong Gateway的代理功能&#xff0c;包括其路由能力和内部工作原理。 Kong Gateway提供了一些接口&#xff0c;可以通过以下配置属性进行调整&#xff1a; proxy_listen&#xff0c;定义了Kong Gateway将接受来自客户端的公共HTTP&#…

#Uniapp:map地图组件

示例 <map class"map" :latitude"mapOptions.latitude" :longitude"mapOptions.longitude" :scale"mapOptions.scale" :markers"mapOptions.markers"></map>mapOptions: {longitude: 108.95, // 中心经度latit…

uniapp 用web-view嵌套网页地址并传参

小程序登陆后把token和openId 对应传到pc端 pc端有两套一套pc端代码和适应移动端的代码 嵌套的是适应移动端的代码 1.uniapp <template><view class"main"><u-navbar :fixed"true" :autoBack"false" leftClick"goBack&quo…

Day44 动态规划part06 完全背包理论基础 518. 零钱兑换 II 377. 组合总和 Ⅳ

动态规划part06 完全背包理论基础 518. 零钱兑换 II 377. 组合总和 Ⅳ 完全背包理论基础 acm可运行代码&#xff08;先遍历物品再遍历背包&#xff0c;一维dp&#xff09; #include<iostream> #include<vector> using namespace std;int Solution(vector<int…

Spring框架与反射

Spring框架是Java开发中广泛使用的一个强大且全面的框架&#xff0c;它在其核心和各个组件中大量利用了Java反射机制。反射在Spring中的应用主要体现在依赖注入、数据绑定、AOP&#xff08;面向切面编程&#xff09;等方面。 1. 依赖注入&#xff08;Dependency Injection&…

【数据结构】 循环队列的基本操作 (C语言版)

目录 一、顺序队列 1、顺序队列的定义&#xff1a; 2、顺序队列的优缺点&#xff1a; 二、循环队列 1、循环队列的定义&#xff1a; 2、循环队列的优缺点&#xff1a; 三、循环队列的基本操作算法&#xff08;C语言&#xff09; 1、宏定义 2、创建结构体 3、循环队…

ReactNative进阶(三十六):iPad横屏适配

文章目录 一、前言二、实现思路三、延伸阅读四、拓展阅读 一、前言 应用RN技术栈实现APP上线后&#xff0c;业务部门领导会上反馈未实现ipad横屏全屏展示&#xff0c;用户体验较差。由此&#xff0c;一场pad横屏全屏展示的APP调优工作由此开展。 二、实现思路 时间紧任务重&…

跨平台同步 Shell 历史记录,无缝切换会话 | 开源日报 No.154

atuinsh/atuin Stars: 14.3k License: MIT Atuin 是一个用 SQLite 数据库替换现有 shell 历史记录的工具&#xff0c;可以记录命令的额外上下文&#xff0c;并提供可选且完全加密的历史同步功能。其主要功能和核心优势包括&#xff1a; 重新绑定 ctrl-r 和 up (可配置) 到全屏…

书生·浦语大模型--第五节课笔记作业--LMDeploy 大模型量化部署实践

文章目录 大模型部署背景LMDeploy简介动手实践创建环境服务部署在线转换离线转换TurboMind推理API服务Gradio 作为前端 Demo演示TurboMind 服务作为后端TurboMind 推理作为后端 作业 大模型部署背景 部署&#xff1a;将训练好的模型在特定软硬件环境中启动的过程 挑战&#x…

Leetcode—2788. 按分隔符拆分字符串【简单】(stringstream的应用)

2023每日刷题&#xff08;八十六&#xff09; Leetcode—2788. 按分隔符拆分字符串 实现代码 class Solution { public:vector<string> splitWordsBySeparator(vector<string>& words, char separator) {vector<string> res;for(auto word: words) {st…

ubuntu-base(arm64与riscv64) 根文件系统

ubuntu-base&#xff08;arm64与riscv64&#xff09; 根文件系统 有个小需求&#xff0c;是在 arm64 和 riscv64 上都跑起来 speccpu 2017 首先在 Qemu 上跑起来&#xff0c;需要考虑需要的【Linux 内核、根文件系统、Bootloader】&#xff0c;其中 Bootloader 在 Qemu 中可以很…

关于标准那些事——第十篇 分类标准

最近要赶一个极其重要的CANS认证项目&#xff0c;这会是全中国第一个完全数字化CNAS认证的实验室项目&#xff0c;内容分享进度会比较慢。其实&#xff0c;大多数情况也并不是没有时间&#xff0c;俗话说&#xff1a;时间嘛&#xff0c;挤挤总是有的&#xff01;其实影响进度更…

imgaug库图像增强指南(38):从入门到精通——图像卷积的全面解析

引言 在深度学习和计算机视觉的世界里&#xff0c;数据是模型训练的基石&#xff0c;其质量与数量直接影响着模型的性能。然而&#xff0c;获取大量高质量的标注数据往往需要耗费大量的时间和资源。正因如此&#xff0c;数据增强技术应运而生&#xff0c;成为了解决这一问题的…

Java- @FunctionalInterface声明一个接口为函数式接口

基本介绍 FunctionalInterface 是 Java 8 中引入的注解&#xff0c;用于声明一个接口是函数式接口。函数式接口是指仅包含一个抽象方法的接口&#xff0c;可以用于支持 Lambda 表达式和方法引用。FunctionalInterface 注解确保该接口只包含一个抽象方法&#xff0c;从而确保其…

VSCode Python调试运行:json编写

对于需要在命令行传参运行的项目&#xff0c;如果想要调试运行&#xff0c;则需要编写对应的launch.json文件这里记录一下json文件的编写格式&#xff1a; {"version": "0.2.0","configurations": [{"python": "/data/xxx/minic…