大数据-玩转数据-Flink定时器

一、说明

基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

二、基于处理时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class ProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});stream.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println(timestamp);out.collect("wo be chu fa le ");}}).print();env.execute();}
}

三、基于事件时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTime_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);stream.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {System.out.println(ctx.timestamp());ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println("定时器被触发了");}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/68875.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【linux命令讲解大全】056.updatedb命令:创建或更新slocate数据库文件

文章目录 updatedb补充说明语法选项实例 从零学 python updatedb 创建或更新slocate命令所必需的数据库文件 补充说明 updatedb命令用来创建或更新slocate命令所必需的数据库文件。updatedb命令的执行过程较长&#xff0c;因为在执行时它会遍历整个系统的目录树&#xff0c;…

leetcode1两数之和

题目&#xff1a; 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能重复出现。 你…

触摸屏与多台 PLC之间无线Ethernet通信

在实际系统中&#xff0c;同一个车间里分布多台PLC&#xff0c;由触摸屏集中控制。通常所有设备距离在几十米到上百米不等。在有通讯需求的时候&#xff0c;如果布线的话&#xff0c;工程量较大且不美观&#xff0c;这种情况下比较适合采用无线通信方式。本方案以威纶通触摸屏和…

使用Python进行健身手表数据分析

健身手表(Fitness Watch)数据分析涉及分析健身可穿戴设备或智能手表收集的数据&#xff0c;以深入了解用户的健康和活动模式。这些设备可以跟踪所走的步数、消耗的能量、步行速度等指标。本文将带您完成使用Python进行Fitness Watch数据分析的任务。 Fitness Watch数据分析是健…

(18)线程的实例认识:线程的控制,暂停,继续,停止,线程相互控制,协作

话不多&#xff0c;但比较中肯&#xff0c;本文参照c# 线程暂停继续的实现方式_哔哩哔哩_bilibili 一、老方式 1、这是一个老的实现方式&#xff0c;基本不推荐&#xff0c;背后控制的原理需要了解。 界面&#xff1a;三个button一个textbox …

计算机视觉:轨迹预测综述

计算机视觉&#xff1a;轨迹预测综述 轨迹预测的定义轨迹预测的分类基于物理的方法&#xff08;Physics-based&#xff09;基于机器学习的方法&#xff08;Classic Machine Learning-based&#xff09;基于深度学习的方法&#xff08;Deep Learning-based&#xff09;基于强化学…

说说CDN和负载均衡具体是怎么实现的

分析&回答 什么是 CDN CDN (全称 Content Delivery Network)&#xff0c;即内容分发网络。 构建在现有网络基础之上的智能虚拟网络&#xff0c;依靠部署在各地的边缘服务器&#xff0c;通过中心平台的负载均衡、内容分发、调度等功能模块&#xff0c;使用户就近获取所需…

智安网络|探索物联网架构:构建连接物体与数字世界的桥梁

物联网是指通过互联网将各种物理设备与传感器连接在一起&#xff0c;实现相互通信和数据交换的网络系统。物联网架构是实现这一连接的基础和框架&#xff0c;它允许物体与数字世界之间的互动和协作。 一、物联网架构的概述 物联网架构是一种分层结构&#xff0c;它将物联网系…

innovus:route within pin 和限制pin shape内via 数量

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 setNanoRouteMode -routeWithViaInPin "1:1" setNanoRouteMode -routeWithViaOnlyForStandardCellPin "1:1"

c++20新特性

个人博客地址: https://cxx001.gitee.io 前言 自C11这个大版本更新以来&#xff0c;后来陆续有两次小版本迭代C14、C17&#xff0c;它们主要是对C11的补充扩展&#xff0c;并没有增加太多大的特性。 而这次的C20&#xff0c;和当年C11一样&#xff0c;又是一次重大更新&#…

多个pdf怎么合并在一起?跟着我的步骤一起合并

多个pdf怎么合并在一起&#xff1f;利用PDF文档合并功能可以帮助您更有效地管理文件&#xff0c;将多个相关文件整合成一个文件&#xff0c;避免分散在多个文件中。此外&#xff0c;合并后的文件更便于共享和传输&#xff0c;因为只需共享一个文件而不是多个文件。虽然合并文件…

IntelliJ IDEA(Windows 版)的所有快捷键

&#x1fa81;&#x1f341; 希望本文能够给您带来一定的帮助&#x1f338;文章粗浅&#xff0c;敬请批评指正&#xff01;&#x1f341;&#x1f425; 大家好 本文参考了 IntelliJ IDEA 的官网&#xff0c;列举了IntelliJ IDEA&#xff08;Windows 版&#xff09;的所有快捷…

数据脱敏sensitive(前端或数据库加密,解密)

可以对数据加密&#xff0c;解密&#xff0c;对数据库加密的数据进行解密显示&#xff0c;对数据库没有加密的数据进行加密处理展示前端等待 1&#xff1a;引入数据如下结构 1-1&#xff1a;SensitiveDecode脱敏解密注解 package com.example.poi.desensitization.annotation;…

将 Python 与 RStudio IDE 配合使用(R与Python系列第一篇)

目录 前言&#xff1a; 1-安装reticulate包 2-安装Python 3-选择Python的默认版本&#xff08;配置Python环境&#xff09; 4-使用Python 4.1 运行一个简单的Python脚本 4.2 在RStudio上安装Python模块 4.3 在 R 中调用 Python 模块 4.4 在RStudio上调用Python脚本写的…

ssh无法启动 报错:sshd:Missing privilege separation directory:/var/empty/sshd

ssh无法启动 报错&#xff1a;sshd:Missing privilege separation directory:/var/empty/sshd 根据提示检查/var/empty/sshd /var/empty&#xff1a;默认是sshd程序用到的这个目录&#xff0c;当建立ssh连接&#xff0c;ssh服务器必须使用该目录下的sshd子目录&#xff1b; …

Linux使用bonding实现双网冗余

1、简介 linux bonding 是一种将多个物理网卡绑定为一个逻辑网卡的技术&#xff0c;它可以实现网络的冗余、负载均衡和带宽扩展等功能。linux bonding 是 linux 内核中提供的一个模块&#xff0c;它支持七种工作模式&#xff0c;不同的模式有不同的特点和适用场景。linux bond…

vue前端解决跨域

1,首先 axios请求&#xff0c;看后端接口路径&#xff0c;http://122.226.146.110:25002/api/xx/ResxxList&#xff0c;所以baseURL地址改成 ‘/api’ let setAxios originAxios.create({baseURL: /api, //这里要改掉timeout: 20000 // request timeout}); export default s…

【C++ 二叉搜索树】

目录 1.什么是二叉搜索树2.构建二叉搜索树2.1首先搭建树的框架2.2搭建搜索树的框架 3.二叉搜索树的插入3.1非递归式插入3.2递归式插入 4.二叉搜索树的查找4.1非递归查找4.2递归查找 5.二叉搜索树的删除5.1非递归删除5.2递归删除 6.整个代码实现 1.什么是二叉搜索树 简单来讲就…

NSSCTF web 刷题记录1

文章目录 前言题目[GXYCTF 2019]禁止套娃方法一方法二 [NCTF 2019]Fake XML cookbook[NSSRound#7 Team]ec_RCE[NCTF 2018]Flask PLUS 前言 今天是2023.9.3&#xff0c;大二开学前的最后一天。老实说ctf的功力还是不太够做的题目太少&#xff0c;新学期新气象。不可急于求成&am…

java中ThreadPoolExecutor线程池如何设置核心线程数和最大线程数,跟cpu核数有关系吗?

在 ThreadPoolExecutor 中&#xff0c;可以通过设置核心线程数和最大线程数来控制线程池的行为。这两个参数可以根据实际需求进行调整&#xff0c;并且它们与 CPU 核数之间存在一定的关系。 通常情况下&#xff0c;可以根据 CPU 核数来设置线程池的核心线程数和最大线程数。以…