详解 Flink 的 ProcessFunction API

一、Flink 不同级别的 API

在这里插入图片描述

  • Flink 拥有易于使用的不同级别分层 API 使得它是一个非常易于开发的框架
  • 最底层的 API 仅仅提供了有状态流处理,它将处理函数(Process Function )嵌入到了 DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某些操作进行抽象,允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
  • 核心 API(Core APIs),比如 DataStream API (用于处理有界或无界流数据)以及 DataSet API (用于处理有界数据集)在实际生产中一般使用较多。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。
  • Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。 Table API 遵循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、join、group-by、aggregate 等。
  • Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

二、ProcessFunction 介绍

  • 相较于 map、filter 和 window 等特定的具体的操作而言,Flink 在底层 API 中提炼出一个统一通用的 process 操作,它是所有转换算子的一个概括性的表达,可以在对应的接口中自定义处理逻辑,而这一层接口就被叫作“处理函数”(ProcessFunction)
  • 处理函数 (ProcessFunction) 提供了一个“定时服务”(TimerService),可以通过它访问流中的事件(event )、时间戳(timestamp )、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数 (ProcessFunction) 继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数 (ProcessFunction) 可以直接将数据输出到侧输出流(side output)中
  • 所以,处理函数 (ProcessFunction) 是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础

三、常见的 ProcessFunction 类

  • ProcessFunction:最基本的处理函数,基于 DataStream 直接调用 process() 时作为参数传入
  • KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用 process() 时作为参数传入
  • CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process() 时作为参数传入
  • ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process() 时作为参数传入
  • BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物
  • KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream 与广播流(BroadcastStream)做连接之后的产物
  • ProcessWindowFunction:KeyedStream 开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process() 时作为参数传入
  • ProcessAllWindowFunction:DataStream 开窗之后的处理函数,基于 AllWindowedStream 调用 process() 时作为参数传入

四、ProcessFunction API 实战

1. KeyedProcessFunction

1.1 解析
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {//1.两个核心方法://1.1 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs) public abstract void processElement(I value, Context ctx, Collector<O> out);//1.2 一个回调函数。当processElement中注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)public abstract void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out);//2.富函数的以下方法:open()/close()/getRuntimeContext()
}
1.2 ProcessFunction 的 Context
//Context的常用方法
context.timestamp(); //获取当前数据的时间戳
context.getCurrentKey(); //获取当前数据的 key
context.output(OutputTag<X> outputTag, X value); //输出侧输出流
context.timerService(); //获取 TimerService 对象
1.3 Timer 和 TimerService

ProcessFunction 的 Context 对象调用 timerService() 方法可以直接返回一个 TimerService 对象;定时器 Timer 只能在 KeyedStream 上面使用

//TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
//获取当前的处理时间
long currentProcessingTime();//获取当前的水位线(事件时间)
long currentWatermark();//注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);//注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);//删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);//删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);
1.4 案例

需求:监控温度传感器的温度值,如果温度值在 10 秒钟之内 (processing time) 连续上升,则报警

public class ProcessFunctionCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});dataStream.keyBy("id").process(new TempContIncreWarning(10)).print();env.execute();}//自定义处理函数,用于监测一段时间内某个传感器温度值是否连续上升,输出报警信息public static class TempContIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {//定义私有属性:监测的时间间隔private Integer interval;public TempContIncreWarning(Integer interval) {this.interval = interval;}//定义两个值状态属性,分别保存上一次的温度值和定时器的时间戳private ValueState<Double> lastTempState;private ValueState<Long> timerTsState;@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));}@Overridepublic void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {//获取状态值Double lastTemp = lastTempState.value();Long timerTs = timerTsState.value();//如果上一次的温度值为null或者上一次的温度值小于当前温度值并且定时器为null则注册定时器if(lastTemp == null || (lastTemp != null && value.getTemperature() > lastTemp && timerTs == null)) {Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;ctx.timerService().registerProcessingTimeTimer(ts);timerTsState.update(ts);} else if(value.getTemperature() < lastTemp && timerTs != null) {//如果上一次的温度值大于当前温度值且定时器不为null则删除定时器,清空定时器值状态ctx.timerService().deleteProcessingTimeTimer(timerTs);timerTsState.clear();}//更新温度值状态lastTempState.update(value.getTemperature());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {//定时器触发则输出报警信息out.collect("传感器" + ctx.getCurrentKey().getField(0) + "的温度在" + interval + "s内连续上升");timerTsState.clear();}@Overridepublic void close() throws Exception {lastTempState.clear();}}
}

2. 侧输出流

监控传感器温度值,将温度值低于 30 度的数据输出到 side output

/**核心方法:ProcessFunction中的 Context 对象的 output(OutputTag<X> outputTag, X value)
*/
public class SideOutputCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//定义OutputTag,用来标记侧输出流的低温流OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};//DataStream不做keyBy,使用ProcessFunction的侧输出流进行高低温分流SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>(){@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if(value.getTemperature() > 30) {//高温流,输出到主流out.collect(value);} else {//低温流,输出到侧输出流ctx.output(lowTempTag, value);}}});//高温流highTempStream.print("high-temp");//低温流highTempStream.getSideOutput(lowTempTag).print("low-temp");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25212.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS开发-鸿蒙UiAbility 组件间跳转

前言 随着春节假期结束各行各业复产复工&#xff0c;一年一度的春招也持续火热起来。最近&#xff0c;有招聘平台发布了《2024年春招市场行情周报&#xff08;第一期&#xff09;》。总体来说今年的就业市场还是人才饱和的状态&#xff0c;竞争会比较激烈。 但是&#xff0c;…

Unity编辑器扩展,快捷键的使用

代码部分 编辑器界面 使用方法&#xff1a; 使用方法和如图1一样&#xff0c;只需要在Menuitem的路径后面加上标识符号就行。 "#"对应的是shift "&"对应的是Alt "%"对应的是ctrl 比如我图中的是&#xff0c;%#s对应的是CtrlShifts&…

基于51单片机的串口乒乓球小游戏

基于51单片机的乒乓球小游戏 &#xff08;仿真&#xff0b;程序&#xff09; 功能介绍 具体功能&#xff1a; 1.用两块单片机串口进行通信&#xff1b; 2.一排LED模拟乒乓球运动&#xff08;哪里亮表示运动到哪&#xff09;&#xff1b; 3.当最左边LED亮&#xff0c;表示球…

【java、lucene、python】互联网搜索引擎课程报告二:建立搜索引擎

一、项目要求 建立并实现文本搜索功能 对经过预处理后的500个英文和中文文档/网页建立搜索并实现搜索功能对文档建立索引&#xff0c;然后通过前台界面或者已提供的界面&#xff0c;输入关键字&#xff0c;展示搜索结果前台可通过网页形式、应用程序形式、或者利用已有的界面…

Databricks Data Warehouse

Warehouse features 原来的data warehouse痛点&#xff1a; 用例不兼容的支持模型的安全和管理不兼容不相交和重复的数据 ETL workloads Streaming Architecture Data Science and ML

matplotlib 动态显示训练过程中的数据和模型的决策边界

文章目录 Github官网文档简介动态显示训练过程中的数据和模型的决策边界安装源码 Github https://github.com/matplotlib/matplotlib 官网 https://matplotlib.org/stable/ 文档 https://matplotlib.org/stable/api/index.html 简介 matplotlib 是 Python 中最常用的绘图…

js理解异步编程和回调

什么是异步 计算机在设计上是异步的。 异步意味着事情可以独立于主程序流发生。 当你打开一个网页&#xff0c;网页载入的过程&#xff0c;你又打开了编译器&#xff0c;那么你在网页载入时启动了编译器的行为就是计算机的异步&#xff0c; 可以看出计算机时一个超大的异步…

华为防火墙 1

华为防火墙1 实验拓扑&#xff1a; 实验步骤&#xff1a; 1.完成终端基本IP信息配置 2.配置防火墙&#xff1a; 2.1配置IP地址 sys Enter system view, return user view with CtrlZ. [USG6000V1]undo in e Info: Saving log files… Info: Information center is disabled. […

基于小波脊线的一维时间序列信号分解方法(MATLAB R2018A)

信号分解技术是把一个复杂信号分解为若干含有时频信息的简单信号&#xff0c;研可通过分解后的简单信号来读取和分析复杂信号的有效特征。因此&#xff0c;信号分解技术对分析结果的影响是不言而喻的。 傅里叶分解是早期常用的信号分解方法&#xff0c;最初被用于分析热过程&a…

JavaScript基础(十二)

截取字符串 //对象名.toLowerCase();将字符串转为小写 var strLAOWANG; strstr.toLowerCase(); console.log(str); //对象名.toUpperCase();将字符串转为大写 var str1laowang str1str1.toUpperCase(); console.log(str1); 截取字符串 //方法1&#xff1a;对象名.substr(a,b); …

Unity世界坐标下UI始终朝向摄像机

Unity世界坐标下UI始终朝向摄像机 1、第一种方法UI会反过来 void Update(){this.transform.LookAt(Camera.main.transform.position);}2、第二种方法 Transform m_Camera;void Start(){m_Camera Camera.main.transform;}void LateUpdate(){transform.rotation Quaternion.Lo…

kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)

文章目录 1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽 kafka debug 日志 logback.xml3.7、引入spring-kafka依赖3.8、控制台日志…

关于烫烫烫和屯屯屯

微较的msvc编译器&#xff0c;调试模式下为了方便检测内存的非法访问&#xff0c;对于不同的内存做了初始化&#xff0c; 未初始化栈&#xff1a; 0xCCCCCCCC 未初始化堆&#xff1a; 0xCDCDCDCD 已释放的堆&#xff1a; 0xDDDDDDDD 0xCCCC解释为GB2312字符即是烫&#xff…

Django 鸡与蛋问题

"Django 的鸡与蛋问题"通常指的是在开始 Django 项目时&#xff0c;你可能会遇到的一个困境&#xff1a;是先设计数据库模型还是先编写视图和控制器&#xff08;即视图函数&#xff09;&#xff1f; 这个问题的实质是在于&#xff0c;Django 的核心部分是由数据库模…

Qt5/6使用SqlServer用户连接操作SqlServer数据库

网上下载SQLServer2022express版数据库,这里没啥可说的,随你喜欢,也可以下载Develop版本。安装完后,我们可以直接连接尝试, 不过一般来说,还是下载SQLServer管理工具来连接数据更加方便。 所以直接下载ssms, 我在用的时候,一开始只能用Windows身份登录。 所以首先,我…

入门matlab

常识 如何建一个新文件 创建新文件&#xff0c;点击新建&#xff0c;我们就可以开始写代码了 为什么要在代码开头加入clear 假如我们有2个文件&#xff0c;第一个文件里面给x赋值100&#xff0c;第二个文件为输出x 依次运行&#xff1a; 结果输出100&#xff0c;这是因为它们…

ChatGPT Prompt技术全攻略-精通篇:Prompt工程技术的高级应用

系列篇章&#x1f4a5; No.文章1ChatGPT Prompt技术全攻略-入门篇&#xff1a;AI提示工程基础2ChatGPT Prompt技术全攻略-进阶篇&#xff1a;深入Prompt工程技术3ChatGPT Prompt技术全攻略-高级篇&#xff1a;掌握高级Prompt工程技术4ChatGPT Prompt技术全攻略-应用篇&#xf…

电脑缺失msvcp110.dll文件的解决方法,总结5种靠谱的方法

在计算机使用过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中之一就是“找不到msvcp110.dll”。这个错误提示通常出现在运行某些软件时&#xff0c;那么&#xff0c;它究竟会造成哪些问题呢&#xff1f; 一&#xff0c;msvcp110.dll文件概述 msvcp110.dll是Mic…

推荐云盘哪个好,各有各的优势

选择合适的云盘服务是确保数据安全、便捷分享和高效协作的关键。下面将从多个维度对目前主流的云盘服务进行详细的对比和分析&#xff1a; 速度性能 百度网盘青春版&#xff1a;根据测试&#xff0c;其上传和下载确实不限速&#xff0c;但主要定位是办公人群&#xff0c;适用于…

STM32F103C8T6 HAL库 USART1 DMA方式接收数据

前言&#xff1a; 前面的两篇文章都说关于发送的&#xff0c;HAL库发送数据可以调用现成的函数&#xff0c;而接收数据&#xff0c;现成函数不太好用。这里为了记录了一下自己参考了网上几个大佬的代码&#xff0c;整理了一下USART1 DMA方式接受数据的代码&#xff0c;…