Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析

FLink处理函数简介

在Flink底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的【处理】(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作【处理函数】(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

Flink几种处理函数简介

  1. ProcessFunction是用于处理数据流的通用函数。它是一个抽象类,定义了处理数据流的常用方法,如processElement,onTimer等。您可以扩展ProcessFunction类并重写这些方法,以便在Flink程序中执行复杂的数据流处理逻辑。
  2. KeyedProcessFunction是ProcessFunction的特殊类型,用于处理带有键的数据流。它定义了额外的方法,如getKey,context.timerService()等,用于访问数据流中每个元素的键以及在处理函数中安排定时器。
  3. ProcessWindowFunction和ProcessAllWindowFunction是用于处理时间窗口的特殊函数。它们提供了一个process方法,用于在每个窗口中对数据进行处理。ProcessWindowFunction接受带有键的数据流,并且每个窗口都对应于一个键,而ProcessAllWindowFunction接受不带键的数据流,并且每个窗口都包含整个数据流。

这里重点介绍KeyedProcessFunction,KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。它还提供了定时器的功能,在在预警、监控等场景特定场景下,非常适合。
KeyedProcessFunction定时器包分为两种:基于事件时间、基于处理时间。下面以统计计数的方式展示这两种定时器的用法,并附上详细的分析思路。以下用例基于Flink1.14

实例分析

KeyedProcessFunction基于事件时间的定时器

代码:


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @description:** @author pony* @date 2024/1/17 20:55* @version 1.0* nc -l 9999*/
public class KeyedProcessFunctionOnTimerEventTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(60)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {return Long.valueOf(element.split(",")[1]);}}).withIdleness(Duration.ofSeconds(1));DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999).assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));}});// apply the process function onto a keyed streamDataStream<Tuple2<String, Long>> result = stream0.keyBy(value -> value.f0).process(new CountEventTimeWithTimeoutFunction());result.print();env.execute("KeyedProcessFunction wordCount");}/*** The implementation of the ProcessFunction that maintains the count and timeouts*/static class CountEventTimeWithTimeoutFunctionextends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {private ValueState<Long> state;private static final Integer DELAY = 1000; //1s@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));}@Overridepublic void processElement(Tuple2<String, Long> value,Context ctx,Collector<Tuple2<String, Long>> out) throws Exception {Long current = state.value();if (current == null) {current = 0L;}current++;state.update(current);//获取当前数据流的水位线long currentWatermark = ctx.timerService().currentWatermark();//            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAYlong timer = currentWatermark + DELAY;//设置定时器的时间为当前水位线+DELAY//注册事件时间定时器,与watermark绑定,必须满足条件: watermark >= timer 来触发特定event的定时器ctx.timerService().registerEventTimeTimer(timer);//删除事件时间定时器if (currentWatermark < 0) {ctx.timerService().deleteEventTimeTimer(timer);}System.out.println("last Watermark: " + currentWatermark + ", format: " + time(currentWatermark));// 打印信息,用于核对数据System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",ctx.getCurrentKey(),current,ctx.timestamp(),time(ctx.timestamp()),timer,time(timer)));}@Overridepublic void onTimer(long timestamp, //定时器触发时间,等于以上的timerOnTimerContext ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前单词String currentKey = ctx.getCurrentKey();// get the state for the key that scheduled the timerLong result = state.value();// 打印数据,用于核对是否符合预期System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",currentKey,result,ctx.timestamp(),time(ctx.timestamp()),timestamp,time(timestamp)));System.out.println("current Watermark: " + ctx.timerService().currentWatermark() + ", format: " + time(ctx.timerService().currentWatermark()));out.collect(new Tuple2<String, Long>(currentKey, result));}@Overridepublic void close() throws Exception {super.close();state.clear();}}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));}
}

测试数据:

nc -l 9999
a1,1704038400000
a1,1704038401000
a1,1704038403000

运行结果:
在这里插入图片描述

KeyedProcessFunction基于处理时间的定时器

代码:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @description:** @author pony* @date 2024/1/17 20:55* @version 1.0* nc -l 9999*/
public class KeyedProcessFunctionOnTimerProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(60)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {
//                        return System.currentTimeMillis();return Long.valueOf(element.split(",")[1]);}}).withIdleness(Duration.ofSeconds(1));DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999).assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));}});// apply the process function onto a keyed streamDataStream<Tuple2<String, Long>> result = stream0.keyBy(value -> value.f0).process(new CountProcessTimeWithTimeoutFunction());result.print();env.execute("KeyedProcessFunction wordCount");}static class CountProcessTimeWithTimeoutFunctionextends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {private ValueState<Long> state;private static final Integer DELAY = 60 * 1000; //1s@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));}@Overridepublic void processElement(Tuple2<String, Long> value,Context ctx,Collector<Tuple2<String, Long>> out) throws Exception {Long current = state.value();if (current == null) {current = 0L;}current++;state.update(current);long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY//注册处理时间定时器, 与watermark无关,定时器触发条件:当前系统时间>timerctx.timerService().registerProcessingTimeTimer(timer);//删除处理时间定时器
//            ctx.timerService().deleteProcessingTimeTimer(timer);System.out.println("processElement currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));// 打印所有信息,用于核对数据System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",ctx.getCurrentKey(),current,ctx.timestamp(),time(ctx.timestamp()),timer,time(timer)));}@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前单词String currentKey = ctx.getCurrentKey();// get the state for the key that scheduled the timerLong result = state.value();System.out.println("onTimer currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));// 打印数据,用于核对是否符合预期System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",currentKey,result,ctx.timestamp(),time(ctx.timestamp()),timestamp,time(timestamp)));//另外还支持侧流OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("single"){};if (result < 2) {ctx.output(outputTag, new Tuple2<>(currentKey, result));} else {out.collect(new Tuple2<String, Long>(currentKey, result));}}@Overridepublic void close() throws Exception {super.close();state.clear();}}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));}
}

测试数据:

nc -l 9999
a,1705568024000    
a,1705568024000

运行结果:
在这里插入图片描述

总结

在真实业务场景中【 KeyedProcessFunction基于处理时间的定时器】用的比较多,比较符合业务场景,即根据事件的时间来指定处理时间去定时触发定时器。因此在此场景中,可以不指定watermarkStrategy,可以获取传输参数的时间时间来定时触发定时器。

参考:
Process Function
Generating Watermarks

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631837.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一键式Excel分词统计工具:如何轻松打包Python脚本为EXE

一键式Excel分词统计工具&#xff1a;如何轻松打包Python脚本为EXE 写在最前面需求分析直接用Python打包为什么大&#xff1f;为什么要使用conda环境&#xff1f; 将Python脚本打包为一个独立的应用程序1. 编写Python脚本&#xff1a;初步功能实现2. 初步图形用户界面&#xff…

Spark基础学习--基础介绍

1. Spark基本介绍 1.1 定义 Spark是可以处理大规模数据的统一分布式计算引擎。 1.2 Spark与MapReduce的对比 在之前我们学习过MapReduce&#xff0c;同样作为大数据分布式计算引擎&#xff0c;究竟这两者有什么区别呢&#xff1f; 首先我们回顾一下MapReduce的架构&#xf…

CC工具箱使用指南:【三调名称转用地用海名称】

一、简介 三调地类和用地用海地类之间有点相似但并不一致。 在做规划时&#xff0c;拿到的三调&#xff0c;都需要将三调地类转换为用地用海地类&#xff0c;然后才能做后续的工作。 一般情况下&#xff0c;三调转用地用海存在【一对一&#xff0c;多对一和一对多】3种情况。…

JWT渗透姿势一篇通

文章前言 企业内部产品应用使用JWT作为用户的身份认证方式&#xff0c;在对应用评估时发现了新的关于JWT的会话安全带来的安全问题&#xff0c;后期再整理时又加入了之前遗留的部分JWT安全问题&#xff0c;至此汇总成一篇完整的JWT文章 简单介绍 JWT(JSON Web Token)是一种用…

直播间的秒杀狂热背后,猫眼电影如何接住10w+并发运营活动?

“倒数&#xff0c;5、4、3、2、1” “10万张&#xff01;” “20秒没了” 上周末&#xff0c;张家辉和导演马浴柯带着新电影《怒潮》上了疯狂小杨哥的直播间&#xff0c;人数一度冲破80万人。 这次直播&#xff0c;是猫眼电影为新电影《怒潮》准备的一次宣传活动。 随着小…

洛谷P1161 开灯

这倒也是水题&#xff0c;我们可以建立一个数组&#xff0c;数组的下标就是编号&#xff0c;我们要注意的是浮点数乘法的结果要转化成整数&#xff0c;才能当做下标&#xff0c;因为题目给的是整数编号。 # include <stdio.h> int main() {int a[1000000] { 0 }, n, t,…

【linux】linux系统安装与更新软件

前言 linux系统安装软件有许多的方式&#xff0c;本文列举的是类似于windows从应用商店安装软件的方法。也是最常用最省事的方法。 但是呢linux系统是有许多发行版本的&#xff0c;不同版本的命令不同&#xff0c;但语法基本是一模一样。 概念插入 windows系统中&#xff0c…

IPv6自动隧道---6to4隧道

IPv6 over IPv4自动隧道特点 由于IPv4兼容IPv6隧道要求每一个主机都要有一个合法的IP地址,而且通讯的主机要支持双栈、支持IPv4兼容IPv6隧道,不适合大面积部署。目前该技术已经被6to4隧道所代替。 6to4隧道 集手动隧道和自动隧道的优点于一身,提出6to4的目的是为IPv4网络…

Linux如何创建文件

使用touch命令&#xff1a;使用touch命令可以创建一个新文件&#xff0c;如果文件已经存在&#xff0c;则只更新其访问时间和修改时间。例如&#xff0c;要创建一个名为test.txt的文件&#xff0c;请在终端中输入以下命令&#xff1a; touch test.txt使用echo命令&#xff1a;使…

【Spring源码分析】从源码角度去熟悉依赖注入(一)

从源码角度去熟悉依赖注入 一、全局出发引出各种依赖注入策略二、Autowired依赖注入源码分析属性注入源码分析&#xff08;AutowiredFieldElement.inject&#xff09;方法注入源码分析&#xff08;AutowiredMethodElement.inject&#xff09;流程图 其实在上篇阐述非懒加载单例…

UE5 独立程序的网络TCP/UDP服务器与客户端基础流程

引擎源码版&#xff0c;复制\Engine\Source\Programs\路径下的BlankProgram空项目示例。 重命名BlankProgram&#xff0c;例如CustomTcpProgram&#xff0c;并修改项目名称。 修改.Build.cs内容 修改Target.cs内容 修改Private文件夹内.h.cpp文件名并修改.cpp内容 刷新引擎 …

模拟StopWatch改写的一款耗时调试工具

StopWatch在单个文件的单个方法中确时还蛮好用的&#xff0c;但跨多个文件&#xff0c;多个方法的同一线程内调试就有明显的不舒服。一是要建立ThreadLocal共享StopWatch的实例。二是StopWatch的start和stop必须形成闭合。在方法嵌套的场景。比如要查看大方法执行时间&#xff…

性能测试中的基准测试

在性能测试中有一种测试类型叫做基准测试。这篇文章&#xff0c;就聊聊关于基准测试的一些事儿。 1、定义 通过设计合理的测试方法&#xff0c;选用合适的测试工具和被测系统&#xff0c;实现对某个特定目标场景的某项性能指标进行定量的和可对比的测试。 2、特质 ①、可重…

ChatGPT 如何解决 “Something went wrong. lf this issue persists ….” 错误

Something went wrong. If this issue persists please contact us through our help center at help.openai.com. ChatGPT经常用着用着就出现 “Something went wrong” 错误&#xff0c;不管是普通账号还是Plus账号&#xff0c;不管是切换到哪个节点&#xff0c;没聊两次就报…

Unity3D代码混淆方案详解

背景 Unity引擎使用Mono运行时&#xff0c;而C#语言易受反编译影响&#xff0c;存在代码泄露风险。本文通过《QQ乐团》项目实践&#xff0c;提出一种适用于Unity引擎的代码混淆方案&#xff0c;以保护代码逻辑。 引言 在Unity引擎下&#xff0c;为了防止代码被轻易反编译&a…

Vue created()和 activated()区别和作用调用顺序

目录 作用 区别 举个例子 调用顺序 作用 简单说都是用于页面初始化&#xff0c;土话说一进来页面&#xff0c;去进行数据加载或调用方法的 区别 首先activated 钩子只适用于被 <keep-alive> 包裹的组件。对于不被 <keep-alive> 包裹的组件&#xff0c;activate…

CSS实现的 Loading 效果

方式一、纯CSS实现 代码&#xff1a;根据需要复制 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>CSS Animation Library for Developers and Ninjas</title><style>/* ---------------…

2024年甘肃省职业院校技能大赛信息安全管理与评估 样题三 模块一

竞赛需要完成三个阶段的任务&#xff0c;分别完成三个模块&#xff0c;总分共计 1000分。三个模块内容和分值分别是&#xff1a; 1.第一阶段&#xff1a;模块一 网络平台搭建与设备安全防护&#xff08;180 分钟&#xff0c;300 分&#xff09;。 2.第二阶段&#xff1a;模块二…

ILI2130触控IC驱动

概述 ili2130触控网上有关的介绍很少&#xff0c;官网也没有有用的东西&#xff0c;所以记录一其驱动&#xff08;MCU驱动&#xff09;。此外ILI2520, ILI2521, ILI2322, ILI2323, ILI2316, ILI2326, ILI2130, ILI2131, ILI2132这几款触控IC使用的是同一个用户指导手册&#x…

吃瓜教程Task1:概览西瓜书+南瓜书第1、2章

由于本人之前已经学习过西瓜书&#xff0c;本次学习主要是对以往知识的查漏补缺&#xff0c;因此本博客记录了在学习西瓜书中容易混淆的点以及学习过程中的难点。更多学习内容可以参考下面的链接&#xff1a; 南瓜书的地址&#xff1a;https://github.com/datawhalechina/pumpk…