Flink定时器

flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。简单来说事件时间就相当于人出生的时间,一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间,一条数据可能是2023年生成的,但是到2024年才被处理,这个2024年便被称为这个事件的处理时间。

一、事件时间定时器(event time),这是基于事件时间来触发的,这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器,那么10秒后,他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。

import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class EventTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);WatermarkStrategy<Temperature> twsDS= WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tSSODS.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);//System.out.printf("当前事件处理:"+DateFormat.getDateTime(ctx.timestamp()));//System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteEventTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}

下面我们做一个测试,来验证一下这个解释:前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟,但是时间并没有触发,而是下一个事件到的时候才触发的。

二、事件处理时间,事件处理时间触发有系统时间有关

package com.xcj;
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class ProcessTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
//        WatermarkStrategy<Temperature> twsDS
//                = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
//                .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
//
//        SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tDSSource.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());System.out.printf("当前事件时间:"+DateFormat.getDateTime(value.getTimestamp()));System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);temperature.update(ctx.timerService().currentProcessingTime());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteProcessingTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}

事件处理时间是不需要下一个事件触发的

三、总结

事件时间(event time) 与事件处理时间(process time)定时器整体代码其实差不多,主要是在注册定时器的时候选择的方法

//事件时间
ctx.timerService().registerEventTimeTimer(value.getTimestamp());
//事件处理事件            
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);

和不同定时器的逻辑。注意:事件时间定时器是需要下一个事件来触发上一个事件的定时任务,但是事件处理时间定时器是不需要下一个事件来触发的,他是根据注册时间和系统时间的差值来触发的。

上面我把注册时间改为了过去很久的时间,来一个就触发一次定时任务,因为注册时间与当前系统时间相差>10秒,所以会直接触发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/891042.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker中的分层(Layer)

docker中有分层的概念&#xff0c;如下图所示 上面是容器层&#xff08;Container layer&#xff09;&#xff0c;下面是镜像层&#xff08;Image layers&#xff09;。 镜像层的内容是静态的&#xff0c;读和写的操作&#xff0c;都是在容器层发生&#xff0c;专门为容器的读…

RoboMIND:多体现基准 机器人操纵的智能规范数据

我们介绍了 RoboMIND&#xff0c;这是机器人操纵的多体现智能规范数据的基准&#xff0c;包括 4 个实施例、279 个不同任务和 61 个不同对象类别的 55k 真实世界演示轨迹。 工业机器人企业 埃斯顿自动化 | 埃夫特机器人 | 节卡机器人 | 珞石机器人 | 法奥机器人 | 非夕科技 | C…

python报错ModuleNotFoundError: No module named ‘visdom‘

在用虚拟环境跑深度学习代码时&#xff0c;新建的环境一般会缺少一些库&#xff0c;而一般解决的方法就是直接conda install&#xff0c;但是我在conda install visdom之后&#xff0c;安装是没有任何报错的&#xff0c;conda list里面也有visdom的信息&#xff0c;但是再运行代…

C语言性能优化:从基础到高级的全面指南

引言 C 语言以其高效、灵活和功能强大而著称&#xff0c;被广泛应用于系统编程、嵌入式开发、游戏开发等领域。然而&#xff0c;要写出高性能的 C 语言代码&#xff0c;需要对 C 语言的特性和底层硬件有深入的了解。本文将详细介绍 C 语言性能优化的背后技术&#xff0c;并通过…

go多版本管理工具g win安装配置

go多版本管理工具g 基本介绍仓库安装配置配置环境配置系统变量配置path变量测试使用配置完环境变量之后&#xff0c;打开终端进行测试使用查看 g 的环境变量配置&#xff0c;g env 为环境变量配置&#xff0c;g -v为当前版本信息查看可下载列表下载安装指定版本go&#xff0c;并…

PlasmidFinder:质粒复制子的鉴定和分型

质粒&#xff08;Plasmid&#xff09;是一种细菌染色体外的线性或环状DNA分子&#xff0c;也是一种重要的遗传元素&#xff0c;它们具有自主复制能力&#xff0c;可以在细菌之间传播&#xff0c;并携带多种重要的基因(如耐药基因与毒力基因等)功能。根据质粒传播的特性&#xf…

细说STM32F407单片机通过IIC读写EEPROM 24C02

目录 一、操作说明 二、工程配置 1、时钟、DEBUG、GPIO、USART6、NVIC、Code Generator 2、 IIC2 &#xff08;1&#xff09;Master Features组&#xff0c;主设备参数 &#xff08;2&#xff09;Slave Features组&#xff0c;从设备参数 三、软件设计 1、KELED 2、E…

神经网络-Inception

Inception网络是由Google开发的一种深度卷积神经网络架构&#xff0c;旨在解决计算机视觉领域中的图像分类和物体识别任务。 Inception网络最初在2014年被提出&#xff0c;并在ImageNet图像分类挑战赛上取得了很好的结果。其设计灵感来自于模块化的思想&#xff0c;将不同尺度…

PyTorch Instance Normalization介绍

Instance Normalization(实例归一化) 是一种标准化技术,与 Batch Normalization 类似,但它对每个样本独立地对每个通道进行归一化,而不依赖于小批量数据的统计信息。这使得它非常适合小批量训练任务以及图像生成任务(如风格迁移)。 Instance Normalization 的原理 对每…

国内独立开发者案例及免费送独立开发蓝图书

独立开发者在国内越来越受到关注&#xff0c;他们追求的是一种自由且自给自足的工作状态。 送这个&#xff1a; 少楠light&#xff08;Flomo、小报童、如果相机&#xff09;&#xff1a;他们是独立开发者的典范&#xff0c;不仅开发了多款产品&#xff0c;还坚信“剩者为王”…

【小程序】自定义组件的data、methods、properties

目录 自定义组件 - 数据、方法和属性 1. data 数据 2. methods 方法 3. properties 属性 4. data 和 properties 的区别 5. 使用 setData 修改 properties 的值 自定义组件 - 数据、方法和属性 1. data 数据 在小程序组件中&#xff0c;用于组件模板渲染的私有数据&…

MATLAB用find函数结合all,any函数高效解决问题

如本节中最后提到的问题&#xff0c;我们输出后还需要判断&#xff0c;不是特别的一目了然&#xff0c;这时候我们可以再加上 f i n d find find函数直接标记序号并输出。首先我们先来了解 f i n d find find的用法&#xff0c; f i n d ( a ) find(a) find(a)表示将矩阵或向量…

2022博客之星年度总评选开始了

作者简介&#xff1a;陶然同学 专注于Java领域开发 熟练掌握Java、js等语言的“Hello World” CSDN原力计划作者、CSDN内容合伙人、Java领域优质作者、Java领域新星作者、51CTO专家、华为云专家、阿里云专家等 &#x1f3ac; 陶然同学&#x1f3a5; 由 陶然同学 原创&#…

vue2 升级为 vite 打包

VUE2 中使用 Webpack 打包、开发&#xff0c;每次打包时间太久&#xff0c;尤其是在开发的过程中&#xff0c;本文记录一下 VUE2 升级Vite 步骤。 安装 Vue2 Vite 依赖 dev 依赖 vitejs/plugin-vue2": "^2.3.3 vitejs/plugin-vue2-jsx": "^1.1.1 vite&…

20241227在ubuntu20.04.6系统中,如何用watch命令每秒钟调用nvidia-smi来监控GPU

watch -n 1 nvidia-smi 20241227在ubuntu20.04.6系统中&#xff0c;如何用watch命令每秒钟调用nvidia-smi来监控GPU 2024/12/27 17:04 缘起&#xff1a;在ubuntu20.04.6系统中&#xff0c;使用M6000显卡来跑whisper&#xff0c;显存拉满/占用巨大&#xff0c;但是CPU占用比低&…

[江科大STM32] 第五集STM32工程模板——笔记

保存&#xff0c;进去选芯片型号&#xff0c;我们是F10C8T6 再添加一些文件&#xff0c;自己看路径 然后去 复习这三文件 打开KEIL add existing那个&#xff0c;添加已经存在的文件 还有5个.c.h文件也要添加进来 回到KEIL 点击旁边的settings 如果你用寄存器开发就建到这里就可…

Bitmap(BMP)图像信息分析主要说明带压缩的形式

文章目录 参考资料Bitmap图片结构Bitmap图片组成实例说明 参考资料 微软官方-位图存储 Bitmap图片结构 序号名称说明1Bitmap File HeaderBitmap文件头2Bitmap Info HeaderBitmap信息头3Color Palette Data调色板数据4Bitmap Image Data图像数据 说明 Bitmap文件头的大小为…

二分和离散化

为什么把二分和离散化放一起&#xff1a;因为离散化其实是一种二分整数的过程。 二分 相信大家都接触过二分查找&#xff08;折半查找&#xff09;&#xff0c;这就是二分的思想。 二分通过每次舍弃一半并不存在答案的区间&#xff0c;进而快速锁定要求的答案&#xff08;二…

ESP-IDF学习记录(2)ESP-IDF 扩展的简单使用

傻瓜式记录一个示例的打开&#xff0c;编译&#xff0c;运行。后面我再一个个运行简单分析每个demo的内容。 1.打开示例代码 2.选择项目&#xff0c;文件夹 3.选择串口 4.选择调试方式 5.根据硬件GPIO口配置menuconfig 6.构建项目 7.烧录设备&#xff0c;选择串口UART方式 运行…

SpringMVC学习(一)——请求与响应处理

目录 一、SpringMVC简介 二、RequestMapping&#xff1a;请求路径映射 三、RestController 四、请求限定 五、请求处理 1.使用普通变量&#xff0c;收集请求参数 2.使用RequestParam明确指定获取参数 3.目标方法参数是一个pojo 4.RequestHeader&#xff1a;获取请求…