45、Flink 的 Process Function 详解

Process Function
1.概述

ProcessFunction 是底层的数据流处理操作,可访问所有(非循环)流应用程序的基本模块。

  • 事件 (数据流中的元素)
  • 状态(容错、一致、仅在 keyed stream 上)
  • 定时器(事件时间和处理时间,仅在 keyed stream 上)

ProcessFunction 可以被认为是一个可以访问 keyed State 和定时器的 FlatMapFunction,它通过对输入流中接收到的每个元素进行调用来处理事件。

对于状态,ProcessFunction 允许访问 keyed State,可通过 RuntimeContext 访问。

定时器支持处理时间和事件时间,对函数 processElement(…)的每次调用都会获得一个 Context 对象,该对象可以访问元素的事件时间戳和 TimerService;TimerService 可用于注册处理时间和事件时间定时器;对于事件时间定时器,当 watermark 到达或超过定时器的时间戳时,会调用 onTimer(…),在该调用过程中,所有状态的作用域再次限定为创建定时器时使用的 key,从而允许定时器操作 keyed State。

注意:如果要访问 keyed State 和定时器,必须在 keyed stream 上应用 ProcessFunction。

stream.keyBy(...).process(new MyProcessFunction());
2.底层 Joins

对于两个输入的底层 Join 可以使用 CoProcessFunction 或 KeyedCoProcessFunction,通过调用 processElement1(…)和processElement2(…)分别处理两个输入。

底层 Join 流程如下

为一个输入或两个输入创建状态对象;

从输入中接收元素时更新状态;

从另一个输入接收元素后,查询状态并产生 Join 结果;

示例:将客户数据加入金融交易,同时保留客户数据的状态;如果关心在面对无序事件时具有完整和确定性的 Join,那么当客户数据流的watermark 超过交易时间时,可以使用定时器评估并触发交易的 Join。

3.案例

示例:KeyedProcessFunction 维护每个 key 的 count,并在一分钟后(事件时间)发出一个 key/count,而不更新该 key。

  • count、key 和上次修改时间戳存储在 ValueState 中;
  • 对于每条记录,KeyedProcessFunction 递增计数器并设置最后一次修改的时间戳;
  • 一分钟后(事件时间)触发定时器;
  • 定时器触发时,会根据存储计数的最后修改时间检查回调的事件时间戳,并在它们匹配的情况下发出 key/计数。

注意:也可以用会话窗口实现,此处使用 KeyedProcessFunction。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;// the source data stream
DataStream<Tuple2<String, String>> stream = ...;// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream.keyBy(value -> value.f0).process(new CountWithTimeoutFunction());/*** The data type stored in the state*/
public class CountWithTimestamp {public String key;public long count;public long lastModified;
}/*** The implementation of the ProcessFunction that maintains the count and timeouts*/
public class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {/** The state that is maintained by this process function */private ValueState<CountWithTimestamp> state;@Overridepublic void open(OpenContext openContext) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));}@Overridepublic void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {// retrieve the current countCountWithTimestamp current = state.value();if (current == null) {current = new CountWithTimestamp();current.key = value.f0;}// update the state's countcurrent.count++;// set the state's timestamp to the record's assigned event time timestampcurrent.lastModified = ctx.timestamp();// write the state backstate.update(current);// schedule the next timer 60 seconds from the current event timectx.timerService().registerEventTimeTimer(current.lastModified + 60000);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {// get the state for the key that scheduled the timerCountWithTimestamp result = state.value();// check if this is an outdated timer or the latest timerif (timestamp == result.lastModified + 60000) {// emit the state on timeoutout.collect(new Tuple2<String, Long>(result.key, result.count));}}
}
4.KeyedProcessFunction

KeyedProcessFunction 作为 ProcessFunction 的扩展,在其 onTimer() 方法中提供了对定时器 key 的访问。

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {K key = ctx.getCurrentKey();// ...
}
5.定时器
a)概述

处理时间和事件时间定时器都由 TimerService 内部维护,并排队等待执行。

TimerService 按 key 和时间戳消除重复的定时器,即每个 key 和时间戳最多有一个定时器,如果为同一时间戳注册了多个计时器,那么 onTimer() 方法将只被调用一次。

Flink 会同步 onTimer() 和 processElement() 的调用,无需担心同时修改状态。

b)容错

定时器是容错的,并与应用程序的状态一起进行 Checkpoint,如果发生故障恢复或从保存点启动应用程序,则会恢复定时器。

当应用程序从故障中恢复或从保存点启动时,本应在恢复前启动的检查点中的处理时间定时器将立即启动。

定时器使用异步检查点,除了 RocksDB 后端/与增量快照/与基于堆的定时器的组合,但是大量的定时器会增加检查点时间,因为定时器是检查点状态的一部分。

c)合并定时器

由于 Flink 为每个 key 和时间戳只维护一个定时器,可以通过降低定时器的精度来合并定时器以减少定时器的数量

对于1秒(事件或处理时间)的定时器精度,可以将目标时间四舍五入到整秒,定时器最多会提前1秒触发,但不会晚于要求的毫秒精度,每个键和秒最多有一个计时器。

long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);

由于事件时间定时器只在 watermark 到达时触发,可以使用当前 watermark 来注册定时器,并将其与下一个 watermark 合并。

long coalescedTime = ctx.timerService().currentWatermark() + 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);

定时器也可以按如下方式停止和移除

停止处理时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

停止事件时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

如果没有注册具有给定时间戳的定时器,则停止定时器无效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/18494.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java EE】网络原理——HTTP响应

目录 1.认识“状态码”&#xff08;status code&#xff09; 1.1 200 OK 1.2 404 Not Found 1.3 403 Forbodden 1.4 Method Not Allowed 1.5 Internal Sever Error 1.6 504 Gsteway Timeout 1.7 Move temporarily 1.8 Moved Permanently 1.9状态码小结 2.认识响应“报…

树莓派部署harbor_arm64

文章目录 树莓派4b部署Harbor-arm64版本docker-compose维护命令访问harbor 192.168.1.111认用户名密码admin/Harbor12345 树莓派4b部署Harbor-arm64版本 harbor-arm版本 部署&#xff1a;参考 wget https://github.com/hzliangbin/harbor-arm64/releases/download/v1.9.3/ha…

java项目之高校教师科研管理系统源码(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的高校教师科研管理系统源码。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 高校教师科研管…

linux下常用软件

文章目录 1. GIMP图片编辑工具&#xff0c;用于处理文档图片2. 星火字典stardict 1. GIMP图片编辑工具&#xff0c;用于处理文档图片 install 在linux应用商店里搜索GIMP 就能出来 语言设置 辑—首选项—界面—语言&#xff0c;在下拉列表中选择[汉语_zh-CN]&#xff0c;对应的…

评测 香橙派OrangePi在智能交通上的应用

1、OrangePi应用场景 关于 Orange Pi AI Pro 开发板是香橙派联合华为精心打造的高性能 AI 开发板&#xff0c;其搭载了昇腾 AI 处理器&#xff0c;可提供 8TOPS INT8 的计算能力&#xff0c;内存提供了 8GB 和 16GB两种版本。可以实现图像、视频等多种数据分析与推理计算&#…

OpenMV的VisionBoard视觉识别开发板学习记录

此篇博客仅用于对VisionBoard的开发板的学习研究记录&#xff0c;没有教学内容。 一、资料来源 开发板资料链接 开发板环境搭建手册 开发板视频教程 板子的资料网站 openmv官方的网站 目录 一、资料来源二、针对 VisionBoard的目标识别和定位总结1. 目标识别功能1.1 物体检测…

延迟队列的时间轮算法实现

业务背景 很多时候&#xff0c;业务需要在一段时间之后完成一个工作任务。例如&#xff0c;滴滴打车订单完成后&#xff0c;如果用户一直不评价&#xff0c;会在48小时后自动评价为5星。 一般来说&#xff0c;实现这类需求需要设置一个定时器&#xff0c;在规定的时间后自动执…

芯片原厂驱动开发工程师:初学到精通,如何快速成长?

01 前言 大家好&#xff0c;我是XX&#xff0c;来自湖南XX学院&#xff0c;电子信息18级&#xff0c;也曾在创新基地控制组学习过两三年&#xff0c;毕业后就职于一家芯片原厂的解决方案部&#xff0c;担任驱动工程师的职位&#xff0c;算上实习期&#xff0c;我的工作时长已有…

Redis操作之Jedis

Jedis是Redis官方推荐的Java连接开发工具&#xff0c;它是一个流行的Redis客户端中间件&#xff0c;提供了简单易用的API和高性能的连接池管理。Jedis是一个轻量级的库&#xff0c;适用于大多数Redis应用场景&#xff0c;包括数据缓存、消息队列等。 一、简介 功能全面&#…

htb-Mailing

因为做windows服务器渗透较少&#xff0c;不妥的地方还请师傅们指出 可先看思路&#xff0c;实在不行再看writeup 任意文件下载拿pop3登录邮箱——》利用邮件服务器漏洞拿下NTLM——》利用组件版本漏洞拿下 拿shell 端口扫描开放服务 Host is up (0.91s latency). Not shown:…

CSS学习笔记:rem实现移动端适配的原理——媒体查询

移动端适配 移动端即手机端&#xff0c;也称M端 移动端适配&#xff1a;同一套移动端页面在不同屏幕尺寸的手机上可以实现宽度和高度的自适应&#xff0c;也就是页面中元素的宽度和高度可以根据屏幕尺寸的变化等比缩放 rem配合媒体查询可实现移动端适配 rem单位 媒体查询 …

SpringAdminClient如何将Httpbasic账号密码告知SpringAdminServer

场景&#xff0c;因为Config Service开了权限校验&#xff0c;注册到eureka之后&#xff0c;SpringAdmin查看信息会报错401&#xff0c;如果想在SpringAdmin中正确的看到Config Service的actuator信息则需要将账号密码告知给SpringAdmin&#xff0c;磁力用的是Eureka作为发现服…

1045. 买下所有产品的客户

1045. 买下所有产品的客户 题目链接&#xff1a;1045. 买下所有产品的客户 代码如下&#xff1a; # Write your MySQL query statement below select customer_id from Customer where product_key in (select product_key from Product) group by customer_id having count(…

WPF 如何调试

简述 它是一种系统机制&#xff0c;用于识别和修复一段代码中的错误或缺陷&#xff0c;这些错误或缺陷的行为与您的预期不同。调试子系统紧密耦合的复杂应用程序并不容易&#xff0c;因为修复一个子系统中的错误可能会在另一个子系统中创建错误。 在 C# 中调试 在 WPF 应用程序…

javaIO流知识点概况

一、前言&#xff1a; 1.1.流的概念: java将输入与输出比喻为"流"&#xff0c;英文:Stream. 就像生活中的"电流","水流"一样,它是以同一个方向顺序移动的过程.只不过这里流动的是字节(2进制数据).所以在IO中有输入流和输出流之分,我们理解他们…

Vue3中使用 filter 方法通过 id 删除数组中的指定对象

Vue中使用 filter 方法通过 id 删除数组中的指定对象 一、前言1、示例2、案例 一、前言 在 Vue3 中&#xff0c;我们经常需要处理数据并进行相应操作&#xff0c;比如删除数组中的特定对象。在这篇文章中&#xff0c;我们将学习如何使用 filter 方法和响应式变量来实现这一目标…

单点11.2.0.3备份恢复到单点11.2.0.4

保命法则&#xff1a;先备份再操作&#xff0c;磁盘空间紧张无法备份就让满足&#xff0c;给自己留退路。 场景说明&#xff1a; 1.本文档的环境为同平台、不同版本&#xff08;操作系统版本可以不同&#xff0c;数据库小版本不同&#xff09;&#xff0c;源机器和目标机器部…

swiftui基础组件Image加载图片,以及记载gif动图示例

想要在swiftui中展示图片&#xff0c;可以使用Image这个组件&#xff0c;这个组件可以加载本地图片和网络图片&#xff0c;也可以调整图片大小等设置。先大概看一下Image的方法有哪些可以用。 常用的Image属性 1.调整图像尺寸&#xff1a; 使用 resizable() 方法使图像可调整…

Java如何分块读取大文件

在Java中&#xff0c;分块读取大文件通常使用FileInputStream或BufferedInputStream结合循环来实现。以下是一个基本的示例&#xff0c;展示如何分块读取大文件&#xff1a; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException…

黑龙江等保测评:强化网络安全的北方防线

在数字化时代&#xff0c;网络空间已成为国家发展的新领域&#xff0c;其安全直接关系到国家安全、社会稳定和个人隐私。作为中国东北的重要省份&#xff0c;黑龙江省积极响应国家网络安全战略&#xff0c;深入实施信息安全等级保护制度&#xff08;简称“等保”&#xff09;&a…