flink的带状态的RichFlatMapFunction函数使用

背景

使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法

RichFlatMapFunction使用示例

下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:

package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {// 键值分区状态,对应每个name一个值ValueState<StateEntity> nameState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);nameState = getRuntimeContext().getState(state);}@Overridepublic void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {// 判断状态值是否为空(状态默认值是空)if (Objects.isNull(nameState.value())) {StateEntity sFalg = new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value = nameState.value();if (Math.abs(value.count - input.f1) > 100) {collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX,YYY,ZZZ三种nameString name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");int count = RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}@Overridepublic void cancel() {}});dataStream.keyBy((f) -> {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/136132.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

经典矩阵试题(一)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、回型矩阵1、题目介绍2、思路讲解3、代码实现4、结果 二、蛇型矩阵1、题目介绍2、思路讲解…

[设计模式] 建造者模式

一、引言 起因是学习okhttp过程中遇到的这段代码 Request request original.newBuilder().url(original.url()).header("Authorization", "Bearer " BearerTokenUtils.getToken(configuration.getApiKey(), configuration.getApiSecret())).header(&quo…

视频特效编辑软件 After Effects 2022 mac中文版介绍 (ae 2022)

After Effects 2022 mac是一款视频特效编辑软件&#xff0c;被称为AE&#xff0c;拥有强大的特效工具&#xff0c;旋转&#xff0c;用于2D和3D合成、动画制作和视觉特效等&#xff0c;效果创建电影级影片字幕、片头和过渡&#xff0c;是一款可以帮助您高效且精确地创建无数种引…

2009-2018年全国各省财政透明度数据

2009-2018年全国各省财政透明度数据 1、时间&#xff1a;2009-2018年 2、指标&#xff1a;财政透明度 3、范围&#xff1a;31省 4、来源&#xff1a;财政透明度报告 5、指标解释&#xff1a; 财政透明度是公开透明的重要方面&#xff0c;体现了现代预算制度和法治政府的特…

GoLong的学习之路(二十二)进阶,语法之并发(go最重要的特点)(channel的主要用法)

这一章是接上一章内容继续&#xff0c;上一章说到协程也就是goroutine&#xff0c;如何使用它&#xff0c;这一张是讲一种数据结构。当然这个章节的数据结构非常重要。可以说这个数据结构就是为了方便协程&#xff0c;才制作出来的。 单纯地将函数并发执行是没有意义的。函数与…

Tomcat的类加载器

详情可以参考&#xff1a;https://tomcat.apache.org/tomcat-10.1-doc/class-loader-howto.html 简要说明 Tomcat安装了多种类加载器&#xff0c;以便容器的不同部分、容器中的应用访问能够不同的类和资源。 在Java环境中&#xff0c;类加载器被组织为父-子树的形式。通常情况…

GaN HEMT 电容的分析建模,包括寄生元件

标题&#xff1a;Analytical Modeling of Capacitances for GaN HEMTs, Including Parasitic Components 来源&#xff1a;IEEE TRANSACTIONS ON ELECTRON DEVICES&#xff08;14年&#xff09; 摘要&#xff1a;本文提出了一种基于表面势的终端电荷和电容模型&#xff0c;包…

【Git】Git安装入门使用常用命令Gitee远程仓库上传文件与下载

一&#xff0c;Git入门 1.1 Git是什么 Git是一款分布式版本控制系统&#xff0c;被广泛用于软件开发中的源代码管理。它由Linus Torvalds在2005年创造并发布&#xff0c;旨在解决传统版本控制系统&#xff08;如SVN&#xff09;的一些局限性。主要用于敏捷高效地处理任何或小或…

数组的存储结构、特殊矩阵和稀疏矩阵的压缩存储

数组的存储结构、特殊矩阵和稀疏矩阵的压缩存储 1.数组的存储结构、特殊矩阵、稀疏矩阵的压缩存储1.1 数组的存储结构1.1.1 一维数组的存储结构关系式1.1.2 多维数组的存储结构关系式 1.2 特殊矩阵的压缩存储1.2.1 对称矩阵1.2.2 下三角矩阵1.2.3 上三角矩阵1.2.4 三对角矩阵 1…

【MySQL】库操作和表操作

文章目录 一、库操作1. 创建数据库2. 数据库的编码问题查看系统默认支持的字符集查看数据库支持的字符集查看数据库支持的字符集校验规则校验规则对数据库的影响 3. 操纵数据库查看数据库显示创建语句修改数据库删除数据库 4. 数据库的备份和恢复数据库的备份数据库的恢复表的删…

二分详解(整数二分、浮点二分,附加例题)

一、二分 1.二分性质 二分的本质&#xff1a;二分可以将边界点二分出来&#xff08;即一部分满足这个条件&#xff0c;一部分不满足这个条件&#xff09; 二分的时候也一定有解 2.整数二分 1.寻找红色边界点x int l0,rn-1; while(l<r) {int mid(lr1)/2;if(check(mid))l…

MATLAB中Stem3函数用法

目录 语法 说明 向量和矩阵数据 表数据 其他选项 示例 行向量输入 列向量输入 矩阵输入 使用向量输入指定针状线条位置 使用矩阵输入指定针状线条位置 填充标记 线型、标记符号和颜色选项 线型、标记符号和颜色选项 其他样式选项 绘制表中的数据 特定坐标区上…

JS逆向爬虫---请求参数加密②【某麦数据analysis参数加密】

主页链接: https://www.qimai.cn/rank analysis逆向 完整参数生成代码如下&#xff1a; const {JSDOM} require(jsdom) const dom new JSDOM(<!DOCTYPE html><p>hello</p>) window dom.windowfunction customDecrypt(n, t) {t t || generateKey(); //…

kubernetes集群编排——k8s资源监控

资源限制 上传镜像 [rootk8s2 limit]# vim limit.yaml apiVersion: v1 kind: Pod metadata:name: memory-demo spec:containers:- name: memory-demoimage: stressargs:- --vm- "1"- --vm-bytes- 200Mresources:requests:memory: 50Milimits:memory: 100Mi [rootk8s2…

竞赛选题 深度学习机器视觉车道线识别与检测 -自动驾驶

文章目录 1 前言2 先上成果3 车道线4 问题抽象(建立模型)5 帧掩码(Frame Mask)6 车道检测的图像预处理7 图像阈值化8 霍夫线变换9 实现车道检测9.1 帧掩码创建9.2 图像预处理9.2.1 图像阈值化9.2.2 霍夫线变换 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分…

Vim编辑器学习

B站学习vim指令链接 1&#xff1a;vim下有两种模式&#xff0c;一种是命令模式&#xff0c;一种是编辑模式 2&#xff1a;命令到编辑模式&#xff0c;按键盘i&#xff0c;编辑到命令格式按Esc 3&#xff1a;&#xff1a;wq 保存并退出 &#xff1a;wq code.c保存并把文件命名为…

ubuntu, nvidia driver, cuda, cudnn, pytorch-gpu版本安装

文章目录 1.常用指令1.1查看cpu是intel还是amd:1.2.查看ubuntu版本1.3.查看架构1.4.查看已安装的nvidia驱动1.5.进入tty模式 2.安装ubuntu22.04 和 nvidia 驱动3.ubuntu 安装 anaconda4.安装pytorch gpu版本5.安装完整版cuda 和 cudnn6.nvidia-driver, cuda-toolkit, cudnn 1.常…

AI:66-基于机器学习房价预测

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌在这个漫长的过程,中途遇到了不少问题,但是…

机器视觉的试卷批改系统 - opencv python 视觉识别 计算机竞赛

文章目录 0 简介1 项目背景2 项目目的3 系统设计3.1 目标对象3.2 系统架构3.3 软件设计方案 4 图像预处理4.1 灰度二值化4.2 形态学处理4.3 算式提取4.4 倾斜校正4.5 字符分割 5 字符识别5.1 支持向量机原理5.2 基于SVM的字符识别5.3 SVM算法实现 6 算法测试7 系统实现8 最后 0…

【算法设计】回溯法算法设计——骑士游历问题(C++实现)

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 更多算法分析与设计知识专栏&#xff1a;算法分析&#x1f525; 给大家跳…