Flink 统计接入的数据量-滚动窗口和状态的使用

1、概述

在生产场景值,经常需要和上游、下游对数,离线场景可以直接 group by 再 count ,但是实时场景中,如果使用 kafka 作为中间件,中间经过几个 job 的过滤转化后,再对照像 Doris 或 Clickhouse 中最终层的数据,如果出现缺失,很难判断是哪一层缺失的。

2、使用 侧流输出+处理时间的滚动窗口+状态进行数据量级统计

package com.flink.feature.windowcount;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** 1、先输入数据* 1* 1* 1* 1* 输出结果* main=>:8> 业务处理=>1* main=>:1> 业务处理=>1* main=>:2> 业务处理=>1* main=>:3> 业务处理=>1* 每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)** 2、再输入数据* 1* 2* 2* 3* 3* 4* 4* 输出结果* main=>:4> 业务处理=>1* main=>:5> 业务处理=>2* main=>:6> 业务处理=>2* main=>:7> 业务处理=>3* main=>:8> 业务处理=>3* main=>:1> 业务处理=>4* main=>:2> 业务处理=>4* 每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)* 每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)*/public class UseWindowValidateData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();OutputTag<Tuple2<String,Integer>> windowCountTag = new OutputTag<Tuple2<String,Integer>>("window_count"){};DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String input, ProcessFunction<String, String>.Context ctx, Collector<String> collector) throws Exception {ctx.output(windowCountTag,new Tuple2<>("窗口统计=>"+input,1));collector.collect("业务处理=>" + input);}});process.getSideOutput(windowCountTag).keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> tp) throws Exception {return tp.f0;}}).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String,String,String,Integer>, String, TimeWindow>() {private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> stateDescriptor = new MapStateDescriptor<>("map-state", String.class, Integer.class);mapState = getRuntimeContext().getMapState(stateDescriptor);}@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String, String, String, Integer>, String, TimeWindow>.Context ctx, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple4<String, String, String, Integer>> out) throws Exception {for (Tuple2<String, Integer> tp : elements) {Integer res = mapState.get(tp.f0);if (res == null) {res = 0;}res += 1;mapState.put(tp.f0, res);}out.collect(new Tuple4<>(String.valueOf(ctx.window().getStart()),String.valueOf(ctx.window().getEnd()),key,mapState.get(key)));// 每个窗口计算后清空状态mapState.clear();}}).print("每10秒每个key接受到的数据量=>");process.print("main=>");env.execute();}
}

3、测试结果

1)先输入数据

1
1
1
1

输出结果

main=>:8> 业务处理=>1
main=>:1> 业务处理=>1
main=>:2> 业务处理=>1
main=>:3> 业务处理=>1

每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)

2)再输入数据

1
2
2
3
3
4
4

输出结果

main=>:4> 业务处理=>1
main=>:5> 业务处理=>2
main=>:6> 业务处理=>2
main=>:7> 业务处理=>3
main=>:8> 业务处理=>3
main=>:1> 业务处理=>4
main=>:2> 业务处理=>4

每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)
每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/127781.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2014年亚太杯APMCM数学建模大赛A题无人机创造安全环境求解全过程文档及程序

2014年亚太杯APMCM数学建模大赛 A题 无人机创造安全环境 原题再现 20 国集团&#xff0c;又称 G20&#xff0c;是一个国际经济合作论坛。2016 年第 11 届 20 国集团峰会将在中国召开&#xff0c;这是继 APEC 后中国将举办的另一个大型峰会。此类大型峰会&#xff0c;举办城市…

prometheus服务发现

Consul简介 ◼ 一款基于golang开发的开源工具&#xff0c;主要面向分布式&#xff0c;服务化的系统提供服务注册、服务发现和配置管理 的功能 ◼ 提供服务注册/发现、健康检查、Key/Value存储、多数据中心和分布式一致性保证等功能 部署 curl -LO https://releases.hashicorp…

保障效率与可用,分析Kafka的消费者组与Rebalance机制

系列文章目录 上手第一关&#xff0c;手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么&#xff0c;以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析&#xff0c;打破面试难关 防止消息丢失与消息重复——Kafka可…

opengl基础笔记1

1、opengl运行模式及opengl规范 运行模式&#xff1a;核心模式与立即渲染模式&#xff08;弃用&#xff09; 由于OpenGL的大多数实现都是由显卡厂商编写的&#xff0c;当产生一个bug时通常可以通过升级显卡驱动来解决。这些驱动会包括你的显卡能支持的最新版本的OpenGL&#xf…

YOLOv8将注意力机制融合进入C2f模块

1. 引言 1.1 YOLOv8添加注意力机制方法 yolov8添加注意力机制是一个非常常见的操作&#xff0c;常见的操作直接将注意力机制添加至YOLOv8的某一层之后&#xff0c;这种改进特别常见。 示例如下&#xff1a; 新版yolov8添加注意力机制&#xff08;以NAMAttention注意力机制为例…

10.windows系统:定时任务备份mysql数据库

1. 创建脚本 .bat内容如下&#xff1a; echo off ::设置编码格式utf-8否则有中文乱码 chcp 65001 echo. echo MySQL数据库开始备份echo ***************************** echo. echo 备份日期&#xff1a;%date% echo 备份时间&#xff1a;%time% echo. echo *****************…

鸿蒙问题记录

1、Variables decorated by Prop link, "Consume, and Obiectlink cannot be initialized locally 原因&#xff1a;被装饰器修饰的数据&#xff0c;不能初始化。这个应该是后续版本做了优化。当前使用 DevEco Studio 3.1.1 Release

Web3时代:探索DAO的未来之路

Web3 的兴起不仅代表着技术进步&#xff0c;更是对人类协作、创新和价值塑造方式的一次重大思考。在 Web3 时代&#xff0c;社区不再仅仅是共同兴趣的聚集点&#xff0c;而变成了一个价值交流和创新的平台。 去中心化&#xff1a;超越技术的革命 去中心化不仅仅是 Web3 的技术…

CRM系统如何帮助企业实现管理信息化?

21世纪的今天&#xff0c;企业不重视CRM信息化会导致什么后果&#xff1f;我们先来看这个例子—— 假设有一家中小型电子商务公司&#xff0c;他们销售各种电子产品&#xff0c;如手机、平板、电脑和配件等。在开始使用CRM系统之前&#xff0c;他们的客户数据分散在各个部门的…

Redis高可用解决方案之Redis集群,和Spring Cloud集成实战

专栏集锦&#xff0c;大佬们可以收藏以备不时之需 Spring Cloud实战专栏&#xff1a;https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏&#xff1a;https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏&#xff1a;https:/…

HDFS集群环境部署(超级详细!!)

一、部署Hadoop的关键点 1.上传&#xff0c;解压到/export/server,配置软链接 2.修改4个配置文件&#xff0c;workers&#xff0c;hadoop.env.sh&#xff0c;core-stie.xml&#xff0c;hdfs-site.xml 3.SCP分发到root2,root3&#xff0c;并设置环境变量 4.创建数据目录,并修改文…

TypeScript 中for in遍历,元素隐式具有 “any“ 类型,因为类型为 “string“ 的表达式不能用于索引类型

第一种方案、使用[key: string]&#xff1a;string 形式为键名声明类型 声明类型&#xff1a; interface FormInfoData {[materialCode: string]: stringmaterialName: stringmaterialUnit: stringmaterialItem: stringmaterialOwnership: stringmaterialclassCode: stringmat…

Mac pro解压rar文件

unrar 已经从homebrew移除了&#xff01;&#xff01; 在 MacOS 上解压 rar 文件&#xff0c;需要使用一个支持 rar 格式的第三方工具。有很多工具可以选择&#xff0c;但我会推荐一个名为 "Unarchiver" 的免费工具&#xff0c;它支持许多不同类型的压缩文件&#x…

diffusers-Load adapters

https://huggingface.co/docs/diffusers/main/en/using-diffusers/loading_adaptershttps://huggingface.co/docs/diffusers/main/en/using-diffusers/loading_adapters 有几种训练技术可以个性化扩散模型&#xff0c;生成特定主题的图像或某些风格的图像。每种训练方法都会产…

【论文阅读笔记】GLM-130B: AN OPEN BILINGUAL PRE-TRAINEDMODEL

Glm-130b:开放式双语预训练模型 摘要 我们介绍了GLM-130B&#xff0c;一个具有1300亿个参数的双语(英语和汉语)预训练语言模型。这是一个至少与GPT-3(达芬奇)一样好的100b规模模型的开源尝试&#xff0c;并揭示了如何成功地对这种规模的模型进行预训练。在这一过程中&#xff0…

ESXi for ARM 最新下载地址

由于VMware决定关闭 flings.vmware.com 网站&#xff0c;内容被迁移到不同的地方&#xff0c;网站跳转到 Code Samples and PowerCLI Example Scripts | VMware - VMware {code} ESXi for ARM的下载地址迁移到了 https://customerconnect.vmware.com/downloads/get-download?…

inquirer.js——交互式命令行用户界面

一、什么是inquirer.js 1、inquirer.js是一个开源的交互式命令行用户界面&#xff08;CLI&#xff09;库&#xff0c;可以让你轻松地与用户进行交互&#xff0c;获取用户输入并做出相应的处理。它的主要功能是提供了一系列常用的命令行交互界面组件&#xff0c;例如input、con…

单目标应用:进化场优化算法(Evolutionary Field Optimization,EFO)求解微电网优化MATLAB

一、微网系统运行优化模型 微电网优化模型介绍&#xff1a; 微电网多目标优化调度模型简介_IT猿手的博客-CSDN博客 二、进化场优化算法EFO 进化场优化算法&#xff08;Evolutionary Field Optimization&#xff0c;EFO&#xff09;由Baris Baykant Alagoz等人于2022年提出&…

C现代方法(第16章)笔记——结构、联合和枚举

文章目录 第16章 结构、联合和枚举16.1 结构变量16.1.1 结构变量的声明16.1.2 结构变量的初始化16.1.3 指示器(C99)16.1.4 对结构的操作 16.2 结构类型16.2.1 结构标记的声明16.2.2 结构类型的定义16.2.3 结构作为参数和返回值16.2.4 复合字面量(C99)16.2.5 匿名结构(C1X) 16.3…

mysql---事务

mysql事务 事务是一个机制&#xff0c;一个操作序列。包含了一组数据库的操作命令&#xff0c;所有命令都是一个整体&#xff0c;向系统提交或者撤销的操作&#xff0c;要么都执行&#xff0c;要么都不执行&#xff0c;不可分割的单位。 事务的特点ACID: A: 原子性 最小单位…