Flink 处理函数(1)—— 基本处理函数

在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权

基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction


处理函数的功能和使用

对于常用的转换算子来说:

  • MapFunction只能获取到当前的数据;
  • AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
  • RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();

但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的

与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了

因此需要使用处理函数

  • 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数还可以直接将数据输出到侧输出流(side output)中

处理函数的简单使用:基于 DataStream 调用.process()方法就;方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑:

stream.process(new MyProcessFunction())

简单示例:

public class ProcessFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}

ProcessFunction 中重写了.processElement()方法(参数:输入,上下文对象,输出),自定义处理逻辑

ProcessFunction 解析

源码解析

源码如下:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a*     {@link TimerService} for registering timers and querying the time. The context is only*     valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,*     querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}*     for registering timers and querying the time. The context is only valid during the*     invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();}
}

可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:

I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型

其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()

  • .processElement():用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义
    • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
    • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()
    • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
  • .onTimer():用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作),而定时器是通过“定时服务”TimerService 来注册的
    • 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线

利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能

处理函数分类

  1. ProcessFunction:最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入
  2. KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream )
  3. ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入
  4. ProcessAllWindowFunction:开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入
  5. CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入
  6. ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入
  7. BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物)
  8. KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream 调用.process()时作为参数传入(这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物)

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625951.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SSM的项目监管系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

xtdrone用键盘控制无人机飞行 无法起飞

运行案例 解锁无人机螺旋桨转动但无法起飞 也未报错 解决方法&#xff1a; 在QGC中修改&#xff1a;PX4飞控EKF配置 将PX4使用的EKF配置为融合GPS的水平位置与气压计高度。 如果我们想使用视觉定位&#xff0c;就需要把修改配置文件。 此修改意味着EKF融合来自mavros/vision_…

基于SSM的网上招聘系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

AtCoder Beginner Contest 336 G. 16 Integers(图计数 欧拉路径转欧拉回路 矩阵树定理 best定理)

题目 给16个非负整数&#xff0c;x[i∈(0,1)][j∈(0,1)][k∈(0,1)][l∈(0,1)] 求长为n3的01串的方案数&#xff0c;满足长度为4的ijkl&#xff08;2*2*2*2&#xff0c;16种情况&#xff09;串恰为x[i][j][k][l]个 答案对998244353取模 思路来源 https://www.cnblogs.com/tz…

Go后端开发 -- 数组 slice map range

Go后端开发 – 数组 && slice && map && range 文章目录 Go后端开发 -- 数组 && slice && map && range一、数组1.数组的声明和初始化2.数组的传参 二、slice切片1.slice的定义和初始化2.len()和cap()函数3.空切片4.切片截取5…

【计算机组成与体系结构Ⅱ】指令调度与分支延迟(实验)

实验4&#xff1a;指令调度与分支延迟 一、实验目的 1. 加深对指令调度技术的理解。 2. 加深对分支延迟技术的理解。 3. 熟练采用指令调度技术解决流水线中的数据冲突的方法。 4. 进一步理解指令调度技术对CPU性能的改进。 5. 进一步理解延迟分支技术对CPU性能的改进。 二…

装完32G内存条 电脑飞跃提升!

我是南城余&#xff01;阿里云开发者平台专家博士证书获得者&#xff01; 欢迎关注我的博客&#xff01;一同成长&#xff01; 一名从事运维开发的worker&#xff0c;记录分享学习。 专注于AI&#xff0c;运维开发&#xff0c;windows Linux 系统领域的分享&#xff01; 大家…

MiniTab的拟合回归模型的分析

拟合回归模型概述 使用拟合回归模型和普通最小二乘法可以描述一组预测变量和一个连续响应之间的关系。可以包括交互作用项和多项式项、执行逐步回归和变换偏斜数据。 例如&#xff0c;房地产评估人员想了解城市公寓与多个预测变量&#xff08;包括建筑面积、可用单元数量、建…

【YOLO系列】 YOLOv4之Focal Loss损失函数

论文下载&#xff1a;Focal Loss for Dense Object Detection 一、简介 Focal Loss损失函数何凯明大神在RetinaNet网络中提出来的&#xff0c;主要是为了解决one-stage目标检测中正负样本比例严重失衡的问题。该损失函数降低了大量简单负样本在训练中所占的比重&#xff0c;也可…

安装Anaconda遇到的问题

报错如下&#xff1a; Anaconda3 5.1.0(64-bit) Setup Error:Due to incompatibility with several Pyth on libraries, Destination Folder’cannot contain non-ascii characters(special characters or diacritics). Please choose another location. 原因&#xff1a;安装…

基于ssm百货中心供应链管理系统+jsp论文

摘 要 社会发展日新月异&#xff0c;用计算机应用实现数据管理功能已经算是很完善的了&#xff0c;但是随着移动互联网的到来&#xff0c;处理信息不再受制于地理位置的限制&#xff0c;处理信息及时高效&#xff0c;备受人们的喜爱。本次开发一套百货中心供应链管理系统有管理…

transfomer中Decoder和Encoder的base_layer的源码实现

简介 Encoder和Decoder共同组成transfomer,分别对应图中左右浅绿色框内的部分. Encoder&#xff1a; 目的&#xff1a;将输入的特征图转换为一系列自注意力的输出。 工作原理&#xff1a;首先&#xff0c;通过卷积神经网络&#xff08;CNN&#xff09;提取输入图像的特征。然…

构建未来教育:在线培训系统开发的技术探讨

随着远程学习的崛起和数字化教育的普及&#xff0c;在线培训系统的开发成为了现代教育的核心。本文将深入讨论在线培训系统的关键技术要点&#xff0c;涵盖前后端开发、数据库管理、以及安全性和身份验证等关键方面。 前端开发&#xff1a;提供交互性与用户友好体验 在构建在…

京东ES支持ZSTD压缩算法上线了:高性能,低成本 | 京东云技术团队

1 前言 在《ElasticSearch降本增效常见的方法》一文中曾提到过zstd压缩算法[1]&#xff0c;一步一个脚印我们终于在京东ES上线支持了zstd&#xff1b;我觉得促使目标完成主要以下几点原因&#xff1a; Elastic官方原因&#xff1a;zstd压缩算法没有在Elastic官方的开发计划中&…

最新智能AI系统ChatGPT网站程序源码+详细图文搭建部署教程,Midjourney绘画,GPT语音对话+ChatFile文档对话总结+DALL-E3文生图

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

如何增加服务器的高并发

随着互联网的快速发展和普及&#xff0c;越来越多的应用程序需要支持高并发的请求处理。在这种情况下增加服务器的高并发能力成为了一个热门的话题。下面简单的介绍如果提高服务器的高并发能力。 负载均衡 是把请求分发到多个服务器上&#xff0c;来实现请求的平衡和分担。负…

(一)环境部署

Python虚拟环境 安装virtualenv pip install virtualenv 创建环境 virtualenv -p D:\python\python.exe(python解释器目录) env-py3.6(虚拟环境目录&#xff0c;名称随意) 在当前目录下生成env-py3.6目录。 激活环境 ...\env-py3.6\Scripts> .\activate 关闭&#xf…

STM32 CubeIDE 使用 CMSIS-DAP烧录 (方法2--外部小工具)

前言&#xff1a; 本篇所用方法&#xff0c;需要借助一个外部的工具小软件。 优点&#xff1a;烧录更稳定; 缺点&#xff1a;不能在线仿真调试。 下面链接&#xff0c;是另一种方法&#xff1a;修改CubeIDE调试文件。能在CubeIDE直接烧录、仿真&#xff0c;但不稳定。…

Bazel

简介&#xff1a; Bazel 是 google 研发的一款开源构建和测试工具,也是一种简单、易读的构建工具。 Bazel 支持多种编程语言的项目&#xff0c;并针对多个平台构建输出。 高级构建语言&#xff1a;Bazel 使用一种抽象的、人类可读的语言在高语义级别上描述项目的构建属性。与其…

uniapp 简易自定义日历

1、组件代码 gy-calendar-self.vue <template><view class"calendar"><view class"selsct-date">请选择预约日期</view><!-- 日历头部&#xff0c;显示星期 --><view class"weekdays"><view v-for"…