Flink处理函数(2)—— 按键分区处理函数

 按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作

1.定时器(Timer)和定时服务(TimerService)

  • 定时器(timers)是处理函数中进行时间相关操作的主要机制
  • 定时服务(TimerService)提供了注册定时器的功能

TimerService 是 Flink 关于时间和定时器的基础服务接口:

// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的事件时间定时器
void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器

尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间

对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳保存起来,排队等待执行;可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的;利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次:

long coalescedTime = time / 1000 * 1000; //时间戳(定时器默认的区分精度是毫秒)
ctx.timerService().registerProcessingTimeTimer(coalescedTime); //注册定时器

2.KeyedProcessFunction 的使用

基础用法:

stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

这里的MyKeyedProcessFunction即是KeyedProcessFunction的一个实现类;

源码解析


KeyedProcessFunction源码如下:

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a*     {@link TimerService} for registering timers and querying the time. The context is only*     valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link*     TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for*     registering timers and querying the time. The context is only valid during the invocation*     of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);/** Get key of the element being processed. */public abstract K getCurrentKey();}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();/** Get key of the firing timer. */@Overridepublic abstract K getCurrentKey();}
}

可以看到和ProcessFunction类似,都有一个processElement()onTimer()方法,并且定义了一个Context抽象类;不同点在于类型参数多了一个K,也就是key的类型;

代码示例

①处理时间语义

public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 处理时间语义,不需要分配时间戳和watermarkSingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 要用定时器,必须基于KeyedStreamstream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {Long currTs = ctx.timerService().currentProcessingTime();out.collect("数据到达,到达时间:" + new Timestamp(currTs));// 注册一个10秒后的定时器ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));}}).print();env.execute();}
}

通过ctx.timerService().currentProcessingTime()获取当前处理时间;

通过ctx.timerService().registerProcessingTimeTimer来设置一个定时器;

运行结果如下:

由于定时器是处理时间的定时器,不用考虑水位线延时问题,因此10s后能够准时触发定时操作;


②事件时间语义:

public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 基于KeyedStream定义事件时间定时器stream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {out.collect("数据到达,时间戳为:" + ctx.timestamp());out.collect("数据到达,水位线为:" + ctx.timerService().currentWatermark() + "\n -------分割线-------");// 注册一个10秒后的定时器ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect("定时器触发,触发时间:" + timestamp);}}).print();env.execute();}// 自定义测试数据源public static class CustomSource implements SourceFunction<Event> {@Overridepublic void run(SourceContext<Event> ctx) throws Exception {// 直接发出测试数据ctx.collect(new Event("Mary", "./home", 1000L));// 为了更加明显,中间停顿5秒钟Thread.sleep(5000L);// 发出10秒后的数据ctx.collect(new Event("Mary", "./home", 11000L));Thread.sleep(5000L);// 发出10秒+1ms后的数据ctx.collect(new Event("Alice", "./cart", 11001L));Thread.sleep(5000L);}@Overridepublic void cancel() { }}
}

运行结果如下:

运行结果解释:

①第一条数据到来时,时间戳为1000,但由于水位线的生成是周期性的(默认200ms),因此水位线不会立即发送改变,仍然是Long.MIN_VALUE,之后只要到了水位线生成的时间周期,就会依据当前最大的时间戳来生成水位线(默认减1)

②第二条数据到来时,显然水位线已经推进到了999,但仍然不会立即改变;

③在事件时间语义下,定时器触发的条件就是水位线推进到设定的时间;第一条数据到来之后,设定的定时器时间为11000,而当时间戳为11000的数据到来时,水位线还停留在999的位置,因此不会立即触发定时器;之后水位线会推进到10999(11000-1),同样无法触发定时器;

④第三条数据到来时,时间戳为11001,此时水位线推进到了10999,等到水位线周期性更新后,推进到11000(11001-1),这样第一个定时器就会触发

⑤然后等待5s后,没有新的数据到来,整个程序结束,将要退出,此时会将水位线推进到Long.MAX_VALUE,所以所有没有触发的定时器统一触发;

 学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/642843.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode_11_中等_盛最多水的容器

文章目录 1. 题目2. 思路及代码实现&#xff08;Python&#xff09;2.1 双指针 1. 题目 给定一个长度为 n n n 的整数数组 h e i g h t height height 。有 n n n 条垂线&#xff0c;第 i i i 条线的两个端点是 ( i , 0 ) (i, 0) (i,0) 和 ( i , h e i g h t [ i ] ) (i…

联想M7268、7208打印机加粉清零方法

联想小新M7268激光一体机基本参数 产品类型 黑白激光多功能商用一体机 涵盖功能 打印/复印/扫描 最大处理幅面 A4 耗材类型 鼓粉分离 耗材容量 硒鼓LD2268&#xff1a;10000页&#xff0c;墨粉LT2268&#xff1a;1000页 双面功能 手…

Linux常用的管线命令(pipe)

只介绍命令和对应的功能&#xff0c;详细用法可针对性的自行搜索 管线命令基本上都是对文本进行截取的功能&#xff0c;据我观察&#xff0c;他们基本上会以行为单位。 以下命令都可以用在管道上&#xff0c;但是有些也可以单独使用。 以下演示的文件是用last | head -n 12 >…

你知道Mysql的架构吗?

msyql分为server曾和存储引擎层 server层包括了连接器(管理连接&#xff0c;权限验证)、查询缓存&#xff08;命中直接返回结果&#xff09;、分析器&#xff08;词法分析&#xff0c;语法分析&#xff09;、优化器&#xff08;执行计划生成&#xff0c;索引选择&#xff09;、…

java.lang.IllegalArgumentException: When allowCredentials is true

1.遇到的错误 java.lang.IllegalArgumentException: When allowCredentials is true, allowedOrigins cannot contain the special value "*" since that cannot be set on the "Access-Control-Allow-Origin" response header. To allow credentials to a…

vue echarts地图

下载地图文件&#xff1a; DataV.GeoAtlas地理小工具系列 范围选择器右侧行政区划范围中输入需要选择的省份或地市&#xff0c;选择自己想要的数据格式&#xff0c;这里选择了geojson格式&#xff0c;点右侧的蓝色按钮复制到浏览器地址栏中&#xff0c;打开的geojson文件内容…

gRPC-gateway使用介绍

gRPC-gateway 参考资料&#xff1a;gRPC-Gateway使用指南 服务中&#xff0c;使用了gRPC gateway&#xff08;代理&#xff09;来将外部的http请求映射为内部rpc调用。 proto文件示例&#xff1a; // 导入google/api/annotations.proto import "google/api/annotations…

Oracle 19c rac集群管理 -------- 集群启停操作过程

Oracle rac集群启停操作过程 首先查看数据库的集群的db_unique_name SQL> show parameter nameNAME TYPE VALUE ------------------------------------ ----------- --------------------------- cdb_cluster_name …

Android Dialog 显示不全的问题

前言&#xff1a;开发的时候发现一些运行到手机里的dialog显示不全&#xff0c;只显示一半左右 问了下chatgpt发现没有任何头绪&#xff0c;于是开始自己慢慢分析 显示去掉了原有的dialog的style发现问题解决了&#xff0c;但在原有基础上如何解决呢&#xff1f; 先看看xml&a…

MYSQL之索引语法与使用

索引分类 分类 含义 特点 关键字 主键索引 针对表中主键创建的索引 默认自动创建&#xff0c;只能有一个 PRIMARY 唯一索引 …

【UE】在控件蓝图中通过时间轴控制材质参数变化

效果 步骤 1. 新建一个控件蓝图和一个材质 2. 打开材质&#xff0c;设置材质域为用户界面&#xff0c;混合模式设置为“半透明” 在材质图表中添加两个参数来控制材质的颜色和不透明度 3. 对材质创建材质实例 4. 打开控件蓝图&#xff0c;在画布面板中添加一个图像控件 将刚…

DC-8靶机做题记录

靶机下载地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1jPMYoyFZXqr7sVMElHqGcw?pwdypq9 提取码&#xff1a;ypq9 参考&#xff1a; 【DC系列靶机DC8通关讲解】 https://www.bilibili.com/video/BV1R84y1H7rk/?share_sourcecopy_web&vd_source12088c392…

指针数组与数组指针

数组指针与指针数组 动态数组 扩容&#xff1a;空间不够&#xff0c;重新申请2倍大小的连续空间&#xff0c;拷贝元素后&#xff0c;释放旧空间 动态数组区别于静态数组&#xff0c;其不具备begin(),end()操作 //动态一维数组int n 10;int *a new int[n];//可以输入n值&…

(完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子

前言 自用生信代码&#xff0c; 花费一个多月写下来的。自学R以来第一次写600多行的代码。我的文章已经发表&#xff0c;如对您的研究有帮助希望可以引用一下。文章点我 SVM-RFE 主要是借助e1071包&#xff0c; 实现mSVM-REF识别并筛选关键基因&#xff0c;没有安装的小伙伴…

SpringBoot3(一)动力节点总结

目录 0、有用的新特性 一、Record 1.1、Record的介绍 1.2、Record的声明 1.3、Record的创建 0、有用的新特性 JDK8-19 新增了不少新特性&#xff0c;这里我们把实际常用的新特性&#xff0c;给大家介绍一下&#xff0c;包括以下几个方面&#xff1a; Java RecordSwich 开…

Gold-YOLO(NeurIPS 2023)论文与代码解析

paper&#xff1a;Gold-YOLO: Efficient Object Detector via Gather-and-Distribute Mechanism official implementation&#xff1a;https://github.com/huawei-noah/Efficient-Computing/tree/master/Detection/Gold-YOLO 存在的问题 在过去几年里&#xff0c;YOLO系列已经…

东南大学博士,华为上班5年,月薪达到4万4000,年终奖近10万

东南大学博士&#xff0c;华为上班5年&#xff0c;月薪达到4万4000&#xff0c;年终奖近10万 近日有华为员工爆料真实薪资&#xff0c;该网友是东南大学2018级博士&#xff0c;华为工作近5年&#xff0c;薪资达到4万4000&#xff0c;年终奖近10W。 该网友华为职场履历如下&am…

Qt6入门教程 10:菜单栏、工具栏和状态栏

目录 一.菜单栏 1.Qt Designer 1.1添加菜单和菜单项 1.2添加二级菜单 1.3给菜单和菜单项添加图标 1.4给菜单项添加功能 2.纯手写 二.工具栏 1.Qt Designer 1.1添加工具栏按钮 1.2工具栏的几个重要属性 2.纯手写 三.状态栏 1.Qt Designer 2.纯手写 用Qt Creator新…

iptables命令详解

简介 iptables 是 Linux 系统中用于配置 IPv4 数据包过滤规则的工具。它是 Linux 内核中 Netfilter 框架的一部分&#xff0c;通过设置规则&#xff0c;可以实现网络包的过滤、NAT 转发、端口映射等功能。 基本概念 表&#xff08;Tables&#xff09;&#xff1a; filter 表…

GPU与SSD间的P2P DMA访问机制

基于PCIe&#xff08;Peripheral Component Interconnect Express&#xff09;总线连接CPU、独立GPU和NVMe SSD的系统架构。 在该架构中&#xff0c;PCIe Swicth支持GPU与SSD之间快速的点对点直接内存访问&#xff08;peer-to-peer, p2p DMA&#xff09;。通常情况下&#xff0…