详解 Flink 的 ProcessFunction API

一、Flink 不同级别的 API

在这里插入图片描述

  • Flink 拥有易于使用的不同级别分层 API 使得它是一个非常易于开发的框架
  • 最底层的 API 仅仅提供了有状态流处理,它将处理函数(Process Function )嵌入到了 DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某些操作进行抽象,允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
  • 核心 API(Core APIs),比如 DataStream API (用于处理有界或无界流数据)以及 DataSet API (用于处理有界数据集)在实际生产中一般使用较多。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。
  • Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。 Table API 遵循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、join、group-by、aggregate 等。
  • Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

二、ProcessFunction 介绍

  • 相较于 map、filter 和 window 等特定的具体的操作而言,Flink 在底层 API 中提炼出一个统一通用的 process 操作,它是所有转换算子的一个概括性的表达,可以在对应的接口中自定义处理逻辑,而这一层接口就被叫作“处理函数”(ProcessFunction)
  • 处理函数 (ProcessFunction) 提供了一个“定时服务”(TimerService),可以通过它访问流中的事件(event )、时间戳(timestamp )、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数 (ProcessFunction) 继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数 (ProcessFunction) 可以直接将数据输出到侧输出流(side output)中
  • 所以,处理函数 (ProcessFunction) 是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础

三、常见的 ProcessFunction 类

  • ProcessFunction:最基本的处理函数,基于 DataStream 直接调用 process() 时作为参数传入
  • KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用 process() 时作为参数传入
  • CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process() 时作为参数传入
  • ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process() 时作为参数传入
  • BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物
  • KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream 与广播流(BroadcastStream)做连接之后的产物
  • ProcessWindowFunction:KeyedStream 开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process() 时作为参数传入
  • ProcessAllWindowFunction:DataStream 开窗之后的处理函数,基于 AllWindowedStream 调用 process() 时作为参数传入

四、ProcessFunction API 实战

1. KeyedProcessFunction

1.1 解析
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {//1.两个核心方法://1.1 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs) public abstract void processElement(I value, Context ctx, Collector<O> out);//1.2 一个回调函数。当processElement中注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)public abstract void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out);//2.富函数的以下方法:open()/close()/getRuntimeContext()
}
1.2 ProcessFunction 的 Context
//Context的常用方法
context.timestamp(); //获取当前数据的时间戳
context.getCurrentKey(); //获取当前数据的 key
context.output(OutputTag<X> outputTag, X value); //输出侧输出流
context.timerService(); //获取 TimerService 对象
1.3 Timer 和 TimerService

ProcessFunction 的 Context 对象调用 timerService() 方法可以直接返回一个 TimerService 对象;定时器 Timer 只能在 KeyedStream 上面使用

//TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
//获取当前的处理时间
long currentProcessingTime();//获取当前的水位线(事件时间)
long currentWatermark();//注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);//注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);//删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);//删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);
1.4 案例

需求:监控温度传感器的温度值,如果温度值在 10 秒钟之内 (processing time) 连续上升,则报警

public class ProcessFunctionCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});dataStream.keyBy("id").process(new TempContIncreWarning(10)).print();env.execute();}//自定义处理函数,用于监测一段时间内某个传感器温度值是否连续上升,输出报警信息public static class TempContIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {//定义私有属性:监测的时间间隔private Integer interval;public TempContIncreWarning(Integer interval) {this.interval = interval;}//定义两个值状态属性,分别保存上一次的温度值和定时器的时间戳private ValueState<Double> lastTempState;private ValueState<Long> timerTsState;@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));}@Overridepublic void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {//获取状态值Double lastTemp = lastTempState.value();Long timerTs = timerTsState.value();//如果上一次的温度值为null或者上一次的温度值小于当前温度值并且定时器为null则注册定时器if(lastTemp == null || (lastTemp != null && value.getTemperature() > lastTemp && timerTs == null)) {Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;ctx.timerService().registerProcessingTimeTimer(ts);timerTsState.update(ts);} else if(value.getTemperature() < lastTemp && timerTs != null) {//如果上一次的温度值大于当前温度值且定时器不为null则删除定时器,清空定时器值状态ctx.timerService().deleteProcessingTimeTimer(timerTs);timerTsState.clear();}//更新温度值状态lastTempState.update(value.getTemperature());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {//定时器触发则输出报警信息out.collect("传感器" + ctx.getCurrentKey().getField(0) + "的温度在" + interval + "s内连续上升");timerTsState.clear();}@Overridepublic void close() throws Exception {lastTempState.clear();}}
}

2. 侧输出流

监控传感器温度值,将温度值低于 30 度的数据输出到 side output

/**核心方法:ProcessFunction中的 Context 对象的 output(OutputTag<X> outputTag, X value)
*/
public class SideOutputCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//定义OutputTag,用来标记侧输出流的低温流OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};//DataStream不做keyBy,使用ProcessFunction的侧输出流进行高低温分流SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>(){@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if(value.getTemperature() > 30) {//高温流,输出到主流out.collect(value);} else {//低温流,输出到侧输出流ctx.output(lowTempTag, value);}}});//高温流highTempStream.print("high-temp");//低温流highTempStream.getSideOutput(lowTempTag).print("low-temp");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25212.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS开发-鸿蒙UiAbility 组件间跳转

前言 随着春节假期结束各行各业复产复工&#xff0c;一年一度的春招也持续火热起来。最近&#xff0c;有招聘平台发布了《2024年春招市场行情周报&#xff08;第一期&#xff09;》。总体来说今年的就业市场还是人才饱和的状态&#xff0c;竞争会比较激烈。 但是&#xff0c;…

Unity编辑器扩展,快捷键的使用

代码部分 编辑器界面 使用方法&#xff1a; 使用方法和如图1一样&#xff0c;只需要在Menuitem的路径后面加上标识符号就行。 "#"对应的是shift "&"对应的是Alt "%"对应的是ctrl 比如我图中的是&#xff0c;%#s对应的是CtrlShifts&…

确保数据完整性:使用 @NotNull 和 @NotBlank 注解

在 Java 应用程序中&#xff0c;数据验证是确保数据质量和应用稳定性的关键步骤。特别是在处理用户输入或持久化数据到数据库时&#xff0c;合适的数据验证策略能够预防错误数据的录入&#xff0c;提高系统的健壮性。本文将探讨两个常用的数据验证注解&#xff1a;NotNull 和 N…

ARM-V9 RME(Realm Management Extension)系统架构之系统安全能力的MTE

安全之安全(security)博客目录导读 ARM架构参考手册定义了在实现RME时对内存标记扩展&#xff08;FEAT_MTE&#xff09;所需的更改。内存标记使用分配标签&#xff08;Allocation Tags&#xff09;&#xff0c;可以为系统中的正常内存位置分配这些标签。存储分配标签的一种实现…

基于51单片机的串口乒乓球小游戏

基于51单片机的乒乓球小游戏 &#xff08;仿真&#xff0b;程序&#xff09; 功能介绍 具体功能&#xff1a; 1.用两块单片机串口进行通信&#xff1b; 2.一排LED模拟乒乓球运动&#xff08;哪里亮表示运动到哪&#xff09;&#xff1b; 3.当最左边LED亮&#xff0c;表示球…

【java、lucene、python】互联网搜索引擎课程报告二:建立搜索引擎

一、项目要求 建立并实现文本搜索功能 对经过预处理后的500个英文和中文文档/网页建立搜索并实现搜索功能对文档建立索引&#xff0c;然后通过前台界面或者已提供的界面&#xff0c;输入关键字&#xff0c;展示搜索结果前台可通过网页形式、应用程序形式、或者利用已有的界面…

程序员具备的职业素养(个人见解)

程序员应该有什么职业素养&#xff1f; 1. 技术能力 毫无疑问&#xff0c;优秀的技术是程序员的必备。 -扎实的编程基础&#xff1a;熟练掌握至少一门编程语言&#xff0c;并理解基本的数据结构和算法&#xff0c;要做到精通&#xff01;。 - 广泛的技术知识&#xff1a;了…

Databricks Data Warehouse

Warehouse features 原来的data warehouse痛点&#xff1a; 用例不兼容的支持模型的安全和管理不兼容不相交和重复的数据 ETL workloads Streaming Architecture Data Science and ML

matplotlib 动态显示训练过程中的数据和模型的决策边界

文章目录 Github官网文档简介动态显示训练过程中的数据和模型的决策边界安装源码 Github https://github.com/matplotlib/matplotlib 官网 https://matplotlib.org/stable/ 文档 https://matplotlib.org/stable/api/index.html 简介 matplotlib 是 Python 中最常用的绘图…

ghidra

https://github.com/NationalSecurityAgency/ghidra ghidra是一个so的逆向工具&#xff0c;功能和jadx-gui类似&#xff0c;但是和jadx-gui专注于java层的不同&#xff0c;ghidra专注于native层的代码反编译&#xff08;从二进制到c语言&#xff09;。 一、 安装 准备好java1…

解释一下I/O多路复用模型?

想象一下&#xff0c;你是一家小餐馆的老板&#xff0c;你的工作是接收顾客的订单&#xff0c;然后通知厨师开始准备。如果每次只能等一个顾客点完菜再接待下一个&#xff0c;那效率就太低了&#xff0c;顾客可能要等很久。 现在&#xff0c;有一种聪明的做法叫做“I/O多路复用…

js理解异步编程和回调

什么是异步 计算机在设计上是异步的。 异步意味着事情可以独立于主程序流发生。 当你打开一个网页&#xff0c;网页载入的过程&#xff0c;你又打开了编译器&#xff0c;那么你在网页载入时启动了编译器的行为就是计算机的异步&#xff0c; 可以看出计算机时一个超大的异步…

华为防火墙 1

华为防火墙1 实验拓扑&#xff1a; 实验步骤&#xff1a; 1.完成终端基本IP信息配置 2.配置防火墙&#xff1a; 2.1配置IP地址 sys Enter system view, return user view with CtrlZ. [USG6000V1]undo in e Info: Saving log files… Info: Information center is disabled. […

《科学,无尽的前沿》—— 程序员必读

一、总体概述 《科学&#xff0c;无尽的前沿》(Science The Endless Frontier)开创了大政治推动下的大科学工程新范式&#xff0c;被视为“美国科学政策的开山之作&#xff0c;促成了支持科学的“美国战后共识”&#xff0c;是“美国历史上最具影响力的政策文件之一”。 报告…

场内基金和场外基金的区别

场内基金就是只能在场内买卖的基金&#xff0c;只有股票账户才能买。场外就是在场内以外的交易场所买卖的基金。 场内基金和场外基金区别主要是费用和买卖价格。 场内基金和场外基金都有管理费和托管费。二者不同的主要是交易费用。场内基金买卖都要交交易费用&#xff0c;这…

基于小波脊线的一维时间序列信号分解方法(MATLAB R2018A)

信号分解技术是把一个复杂信号分解为若干含有时频信息的简单信号&#xff0c;研可通过分解后的简单信号来读取和分析复杂信号的有效特征。因此&#xff0c;信号分解技术对分析结果的影响是不言而喻的。 傅里叶分解是早期常用的信号分解方法&#xff0c;最初被用于分析热过程&a…

心链7 ----Redis的引入和实现以及缓存和定时任务应用

心链 — 伙伴匹配系统 Redis 数据查询慢怎么办&#xff1f; 用缓存&#xff1a;提前把数据取出来保存好&#xff08;通常保存到读写更快的介质&#xff0c;比如内存&#xff09;&#xff0c;就可以更快地读写。 缓存 Redis&#xff08;分布式缓存&#xff09;memcached&…

JavaScript基础(十二)

截取字符串 //对象名.toLowerCase();将字符串转为小写 var strLAOWANG; strstr.toLowerCase(); console.log(str); //对象名.toUpperCase();将字符串转为大写 var str1laowang str1str1.toUpperCase(); console.log(str1); 截取字符串 //方法1&#xff1a;对象名.substr(a,b); …

Unity世界坐标下UI始终朝向摄像机

Unity世界坐标下UI始终朝向摄像机 1、第一种方法UI会反过来 void Update(){this.transform.LookAt(Camera.main.transform.position);}2、第二种方法 Transform m_Camera;void Start(){m_Camera Camera.main.transform;}void LateUpdate(){transform.rotation Quaternion.Lo…

kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)

文章目录 1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽 kafka debug 日志 logback.xml3.7、引入spring-kafka依赖3.8、控制台日志…