Flink (九):DataStream API (六) Process Function

1. ProcessFunction

ProcessFunction 是一种底层的流处理操作,基于它用户可以访问(无环)流应用程序的所有基本构建块

  • 事件(流元素)
  • 状态(容错,一致性,仅在 keyed stream 上)
  • 定时器(事件时间和处理时间,仅在 keyed stream 上)

可以将 ProcessFunction 视为一种可以访问 keyed state 和定时器的 FlatMapFunction。Flink 为收到的输入流中的每个事件都调用该函数来进行处理。对于容错,与其它有状态的函数类似,ProcessFunction 可以通过 RuntimeContext 访问 Flink 的keyed state。定时器允许应用程序对处理时间和 事件时间中的更改做出反应。 每次调用 processElement(...) 时参数中都会提供一个 Context 对象,该对象可以访问元素的事件时间戳和 TimerService。 

TimerService 可用于为将来特定的事件时间/处理时间注册回调。 特定事件时间的 onTimer(...) 回调函数会在当前对齐的 watermark 超过所注册的时间戳时调用。 特定处理时间的 onTimer(...) 回调函数则会在系统物理时间超过所注册的时间戳时调用。 在该调用期间,所有状态会被再次绑定到创建定时器时的键上,从而允许定时器操作与之对应的 keyed state。如果想要访问 keyed state 和定时器,需要在 keyed stream 上使用 ProcessFunction

stream.keyBy(...).process(new MyProcessFunction());

2. 底层 Join

为了在两个输入上实现底层操作,应用程序可以使用 CoProcessFunction 或 KeyedCoProcessFunction。 这些函数绑定两个不同的输入,从两个不同的输入中获取元素并分别调用 processElement1(...) 和 processElement2(...) 进行处理。

实现底层 join 一般需要遵循以下模式:

  • 为一个输入(或两者)创建状态对象。
  • 从某个输入接收元素时更新状态。
  • 从另一个输入接收元素时,查询状态并生成 join 结果。

例如,你可能会将客户数据与金融交易进行 join,同时想要保留客户数据的状态。如果你希望即使在出现乱序事件时仍然可以得到完整且确定的 join 结果,你可以通过注册一个定时器在客户数据流的 watermark 已经超过当前这条金融交易记录时计算和发送 join 结果。

在下面的例子中,KeyedProcessFunction 维护每个键的计数,并且每次超过一分钟(事件时间)没有更新时输出一次键/计数对。

  • 计数,键和最后修改时间存储在 ValueState 中,它由键隐式限定范围。
  • 对于每条记录,KeyedProcessFunction 递增计数器并设置最后修改时间。
  • 对于每条记录,该函数还会注册了一个一分钟后(事件时间)的回调函数。
  • 在每次回调时,它会根据注册的时间和最后修改时间进行比较,如果正好差一分钟则 输出键/计数对(即,在该分钟内没有进一步更新)

这个简单的例子本身可以用会话窗口(session window)实现, 这里我们使用 KeyedProcessFunction 来展示使用它的基本模式。

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;// 源数据流
DataStream<Tuple2<String, String>> stream = ...;// 使用 process function 来处理一个 Keyed Stream 
DataStream<Tuple2<String, Long>> result = stream.keyBy(value -> value.f0).process(new CountWithTimeoutFunction());/*** 在状态中保存的数据类型*/
public class CountWithTimestamp {public String key;public long count;public long lastModified;
}/*** 用来维护数量和超时的 ProcessFunction 实现*/
public class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {/** 由 process function 管理的状态 */private ValueState<CountWithTimestamp> state;@Overridepublic void open(OpenContext openContext) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));}@Overridepublic void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {// 获得当前的数量CountWithTimestamp current = state.value();if (current == null) {current = new CountWithTimestamp();current.key = value.f0;}// 更新状态中的数量current.count++;// 将状态中的最后修改时间改为记录的事件时间current.lastModified = ctx.timestamp();// 将更新后的状态写回state.update(current);// 注册一个 60s 之后的事件时间回调 ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {// 获得注册该回调时使用的键对应的状态CountWithTimestamp result = state.value();// 检查当前回调时否是最新的回调还是后续注册了新的回调if (timestamp == result.lastModified + 60000) {// 超时后发送状态out.collect(new Tuple2<String, Long>(result.key, result.count));}}
}

在 Flink 1.4.0 之前,在调用处理时间定时器时,ProcessFunction.onTimer() 方法将当前的处理时间设置为事件时间的时间戳。此行为非常不明显,用户可能不会注意到。 然而,这样做是有害的,因为处理时间的时间戳是不确定的,并且和 watermark 不一致。此外,用户依赖于此错误的时间戳来实现逻辑很有可能导致非预期的错误。 因此,我们决定对其进行修复。在 1.4.0 后,使用此错误的事件时间时间戳的 Flink 作业将失败,用户应将其作业更正为正确的逻辑。

3. KeyedProcessFunction

KeyedProcessFunction 是 ProcessFunction 的一个扩展, 可以在其 onTimer(...) 方法中访问定时器的键。

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {K key = ctx.getCurrentKey();// ...
}

4. Timers

两种定时器(处理时间定时器和事件时间定时器)都在 TimerService 内部维护,并排队等待执行。对于相同的键和时间戳,TimerService 会删除重复的定时器,即每个键和时间戳最多有一个定时器。如果为同一时间戳注册了多个定时器,则只调用一次 onTimer() 方法。Flink 会同步 onTimer() 和 processElement() 的调用,因此用户不必担心状态的并发修改。

4.1 Fault Tolerance

定时器支持容错,它会和应用程序的状态一起进行 checkpoint。当进行故障恢复或从保存点启动应用程序时,定时器也会被恢复。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。即:在恢复之前就应该触发的处理时间定时器会立即触发。

除了使用基于 RocksDB backend 的增量 snapshots 并使用基于 Heap 的定时器的情况外,Flink 总是会异步执行计算器的快照操作。 大量定时器会增加 checkpoint 的时间,因为定时器是需要 checkpoint 的状态的一部分。

4.2 Timer Coalescing

由于 Flink 中每个键和时间戳只保存一个定时器,因此可以通过降低定时器的精度来合并它们,从而减少定时器的数量。

对于精度为 1 秒(事件或处理时间)的定时器,可以将目标时间向下舍入为整秒。定时器最多会提前 1 秒,但不迟于要求的毫秒精度。 这样,每个键在每秒内最多有一个定时器。

long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);

由于事件时间定时器仅在 watermark 到来时才触发,因此还可以将下一个 watermark 到达前的定时器与当前定时器合并:

long coalescedTime = ctx.timerService().currentWatermark() + 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);

定时器也可以按照以下方式被停止或者删除:

停止处理时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

停止事件时间定时器:

long timestampOfTimerToStop = ...;
ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

如果没有注册给定时间戳的定时器,则停止定时器不会产生影响。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893271.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux Bash 中使用重定向运算符的 5 种方法

注&#xff1a;机翻&#xff0c;未校。 Five ways to use redirect operators in Bash Posted: January 22, 2021 | by Damon Garn Redirect operators are a basic but essential part of working at the Bash command line. See how to safely redirect input and output t…

C语言内存之旅:从静态到动态的跨越

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文一 动态内存管理的必要性二 动态…

AI时代:弯道超车的新思维与实践路径

大家好&#xff0c;我是herosunly。985院校硕士毕业&#xff0c;现担任算法研究员一职&#xff0c;热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名&#xff0c;CCF比赛第二名&#xff0c;科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的…

完整地实现了推荐系统的构建、实验和评估过程,为不同推荐算法在同一数据集上的性能比较提供了可重复实验的框架

{"cells": [{"cell_type": "markdown","metadata": {},"source": ["# 基于用户的协同过滤算法"]},{"cell_type": "code","execution_count": 1,"metadata": {},"ou…

【Spring】定义的Bean缺少隐式依赖

问题描述 初学 Spring 时&#xff0c;我们往往不能快速转化思维。例如&#xff0c;在程序开发过程中&#xff0c;有时候&#xff0c;一方面我们把一个类定义成 Bean&#xff0c;同时又觉得这个 Bean 的定义除了加了一些 Spring 注解外&#xff0c;并没有什么不同。所以在后续使…

基于本地消息表实现分布式事务

假设我们有一个电商系统,包含订单服务和库存服务。当用户下单时,需要在订单服务中创建订单,同时在库存服务中扣减库存。这是一个典型的分布式事务场景,我们需要保证这两个操作要么都成功,要么都失败,以保证数据的最终一致性。 项目结构: 订单服务(Order Service)库存服务(Inv…

『 实战项目 』Cloud Backup System - 云备份

文章目录 云备份项目服务端功能服务端功能模块划分客户端功能客户端模块划分 项目条件Jsoncpp第三方库Bundle第三方库httplib第三方库Request类Response类Server类Client类搭建简单服务器搭建简单客户端 服务端工具类实现 - 文件实用工具类服务器配置信息模块实现- 系统配置信息…

网络编程 | UDP组播通信

1、什么是组播 在上一篇博客中&#xff0c;对UDP的广播通信进行了由浅入深的总结梳理&#xff0c;本文继续对UDP的知识体系进行探讨&#xff0c;旨在将UDP的组播通信由浅入深的讲解清楚。 组播是介于单播与广播之间&#xff0c;在一个局域网内&#xff0c;将某些主机添加到组中…

第9章:Python TDD解决货币对象相等性比较难题

写在前面 这本书是我们老板推荐过的&#xff0c;我在《价值心法》的推荐书单里也看到了它。用了一段时间 Cursor 软件后&#xff0c;我突然思考&#xff0c;对于测试开发工程师来说&#xff0c;什么才更有价值呢&#xff1f;如何让 AI 工具更好地辅助自己写代码&#xff0c;或许…

【无标题】微调是迁移学习吗?

是的&#xff0c;微调&#xff08;Fine-Tuning&#xff09;可以被视为一种迁移学习&#xff08;Transfer Learning&#xff09;的形式。迁移学习是一种机器学习方法&#xff0c;其核心思想是利用在一个任务上学到的知识来改进另一个相关任务的性能。微调正是通过在预训练模型的…

Jenkins-获取build用户信息

需求&#xff1a; 代码发布后&#xff0c;将发布结果发送至相关运维同学邮箱&#xff0c;需要获取发布人的信息。jenkins默认是没有相关内置变量的。 需要通过插件的方式进行解决&#xff1a; 插件&#xff1a; user build vars plugin 部署后&#xff0c;可使用的变量&…

【HarmonyOS NAPI 深度探索12】创建你的第一个 HarmonyOS NAPI 模块

【HarmonyOS NAPI 深度探索12】创建你的第一个 HarmonyOS NAPI 模块 在本篇文章中&#xff0c;我们将一步步走过如何创建一个简单的 HarmonyOS NAPI 模块。通过这个模块&#xff0c;你将能够更好地理解 NAPI 的工作原理&#xff0c;并在你的应用中开始使用 C 与 JavaScript 的…

OpenCV相机标定与3D重建(62)根据两个投影矩阵和对应的图像点来计算3D空间中点的坐标函数triangulatePoints()的使用

加粗样式- 操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 这个函数通过使用立体相机对3维点的观测&#xff0c;重建这些点的三维坐标&#xff08;以齐次坐标表示&#xff09;。 cv::triangula…

为AI聊天工具添加一个知识系统 之58 从文字块构造到数据库创建

本文要点 要点 文字块 为了项目(为AI聊天工具的聊天者 开挂知识系统) ,我们探索一下知识的“种子”到底“藏”在哪&#xff0c;知识树又是如何“生根发芽开会结果”的。 看看下面的四组表达&#xff08;仔细体会理解消化每一个字 以及上下文 和标题&#xff09;&#xff1a;…

【电视盒子】HI3798MV300刷机教程笔记/备份遥控码修复遥控器/ADB/线刷卡刷/电视盒子安装第三方应用软件

心血来潮&#xff0c;看到电视机顶盒满天飞的广告&#xff0c;想改造一下家里的电视盒子&#xff0c;学一下网上的人刷机&#xff0c;但是一切都不知道怎么开始&#xff0c;虽然折腾了一天&#xff0c;以失败告终&#xff0c;还是做点刷机笔记。 0.我的机器 年少不会甄别&…

Python基于OpenCV和PyQt5的人脸识别上课签到系统【附源码】

博主介绍&#xff1a;✌Java老徐、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&…

【FPGA】MIPS 12条整数指令【1】

目录 修改后的仿真结果 修改后的完整代码 实现bgtz、bltz、jalr 仿真结果&#xff08;有问题&#xff09; bltz------并未跳转&#xff0c;jCe&#xff1f; 原因是该条跳转语句判断的寄存器r7&#xff0c;在该时刻并未被赋值 代码&#xff08;InstMem修改前&#xff09; i…

uniapp(小程序、app、微信公众号、H5)预览下载文件(pdf)

1. 小程序、app 在uniapp开发小程序环境或者app环境中,都可以使用以下方式预览文件 之前其实写过一篇,就是使用uniapp官网提供文件下载、文件保存、文件打开的API, uniapp文件下载 感兴趣也可以去看下 uni.downloadFile({// baseURL 是

Java面试专题——常见面试题1

引入 本文属于专题中的常见面试题模块&#xff0c;属于面试时经常遇到的&#xff0c;适合需要面试的小伙伴做面试前复习准备用&#xff0c;后续会持续补充 1.面向对象基本特征 面向对象的基本特征是什么&#xff1f;怎么理解&#xff1f; 面向对象的基本特征是封装、继承、…

【博客之星2024】技术洞察:前沿技术趋势与创新实践

前言 随着科技的飞速发展&#xff0c;技术的前沿趋势和跨领域的融合创新已经成为影响现代社会的重要因素。无论是 人工智能 (AI)、区块链 还是 量子计算&#xff0c;这些新兴技术不仅仅改变了行业的运行方式&#xff0c;还深入影响了社会、文化及个人的生活方式。因此&#xf…