【Flink】Flink 处理函数之基本处理函数(一)

1. 处理函数介绍

流处理API,无论是基本的转换聚合、还是复杂的窗口操作,都是基于DataStream进行转换的,所以统称为DataStreamAPI,这是Flink编程的核心。

但其实Flink为了更强大的表现力和易用性,Flink本身提供了多层API,DataStreamAPI只是中间一环,如下图所示:在这里插入图片描述
在更底层,Flink可以不定义任何具体的算子(比如 mapfilter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)

在处理函数中,操作的就是数据流中最基本的元素:数据事件(event)状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

2. 处理函数的分类

DataStream 在调用一些转换方法之后,有可能生成新的流类型;例如调用.keyBy()之后得到 KeyedStream,进而再调用.window()之后得到 WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层 API,可彼此之间也会有所差异。

Flink 提供了 8 个不同的处理函数:

  • ProcessFunction
    最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
  • KeyedProcessFunction
    对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream
  • ProcessWindowFunction
    开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入。
  • ProcessAllWindowFunction
    同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
  • CoProcessFunction
    合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。
  • ProcessJoinFunction
    间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
  • BroadcastProcessFunction
    广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream与一个广播流(BroadcastStream)连接(conncet)之后的产物。
  • KeyedBroadcastProcessFunction
    按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream广播流(BroadcastStream)做连接之后的产物。

2.1 基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。在Flink 中几乎所有转换算子都提供了对应的函数类接口,处理函数也不例外;它所对应的函数类,就叫作 ProcessFunction

2.1.1 处理函数的功能和使用

转换算子一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如Map算子只能获取当前的数据;而想窗口聚合复杂的操作AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现)。另外还有富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度任务名称之类的运行时信息。

但无论那种算子,如果想要访问事件的时间戳,或者当前的水位线信息,都是获取不到的。但是处理函数可以获取,处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)时间戳(timestamp)水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunctionMyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

代码实例:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stream.process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {out.collect(value.toString());if (value.getUser().equals("Mary")) {out.collect(value.user + "click " + value.getUrl());} else if (value.getUser().equals("Alice")) {out.collect(value.user);out.collect(value.user);}System.out.println("timestamp:" + ctx.timestamp());System.out.println("watermark:" + ctx.timerService().currentWatermark());System.out.println(getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}}).print();env.execute();}

运行结果:
在这里插入图片描述

这里第一次的水位线的值其实是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;
在这里插入图片描述

然后每次下一次的水位线都是上一次的timestamp - 1

2.1.2 ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。

内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...
public abstract void processElement(I value, Context ctx, Collector<O> out) 
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception {}
...
}
2.1.2.1 抽象方法.processElement()

用于处理元素,定义了处理的核心逻辑。这个方法对流中的每个元素都会调用一次,参数包括三个: 输入数据值 value上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。

  • value: 当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  • cts:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()

Context 抽象类定义如下:

public abstract class Context {public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> outputTag, X value);
}
  • out: “收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

ProcessFunction 可以轻松实现flatMap这样的基本转换功能(当然 mapfilter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

2.1.2.2 非抽象方法.onTimer()
@Override
public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);
}

用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。

.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp)上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说 ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。

处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样;只不过处理真正的数据事件调用的是.processElement()方法,而处理水位线事件调用的是.onTimer()

.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文 ctx 中的定时服务。在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/771854.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习SpringBoot笔记--知识点(2)

目录 数据访问 基础特性 自定义banner Profiles Profiles组件 Profiles配置文件 外部化配置 单元测试 数据访问 整合SSM场景 SpringBoot 整合 Spring&#xff0c;SpringMVC&#xff0c;MyBatis 进行数据访问场景开发 需要的依赖&#xff1a; <!-- web启动…

leetcode 518.零钱兑换 II

思路&#xff1a;和第一道是一样的问题&#xff0c;也就是完全背包问题。 我们首先可以看到&#xff0c;每一个数都是可以重复使用的&#xff0c;而且&#xff0c;数的选择上有两种&#xff0c;一种就是选&#xff0c;一种就是不选。所以会想到完全背包问题。 上一个题的零钱…

如何从其他平台复制商品上传到自己店铺?官方授权接口,一键复制爆款同款

很多做一件代发的卖家在上新时会从其他平台选品铺货&#xff0c;看到某个商品卖得不错&#xff0c;直接复制到自己店铺去卖&#xff0c;可以节省测款成本。 现在平台严查无货源&#xff0c;直接从别人店铺搬运商品属于违规违规行为&#xff0c;想要复制这个爆款的话&#xff0…

鸿蒙HarmonyOS应用开发—AbilityStage组件容器

AbilityStage是一个Module级别的组件容器&#xff0c;应用的HAP在首次加载时会创建一个AbilityStage实例&#xff0c;可以对该Module进行初始化等操作。 AbilityStage与Module一一对应&#xff0c;即一个Module拥有一个AbilityStage。 DevEco Studio默认工程中未自动生成Abil…

ArcGIS二次开发(一)——搭建开发环境以及第一个简单的ArcGIS Engine 程序

Arcgis10.2、Arcgis Engine10.2与Microsoft Visual Studio 2012的版本进行安装 1、推荐教程与安装包2、安装顺序3、安装成功测试VS新建项目可以创建ArcGIS项目&#xff0c;并且在VS中拖拽ArcGIS工具 4、搭建第一个简单的ArcGIS Engine 程序 ArcEngine和VS版本是有对应的&#x…

东方 - 分支(1)

目录 解析部分&#xff1a;双分支1303. 冷饮的价格&#xff08;1&#xff09;问题描述解题思路代码实现代码解析 1033. 判断奇偶数问题描述解题思路代码实现代码解析 1302. 是否适合晨练&#xff1f;问题描述解题思路代码实现代码解析 1632. 需要几辆车问题描述解题思路代码实现…

Leetcode 453. 最小操作次数使数组元素相等

原题链接&#xff1a;Leetcode 453. minimum moves to equal array elements Given an integer array nums of size n, return the minimum number of moves required to make all array elements equal. In one move, you can increment n - 1 elements of the array by 1. …

【Python】enumerate函数的使用方法,小白一看就懂

enumerate函数的使用方法&#xff1a; season[‘a’,‘b’,‘c’,‘d’] for i in enumerate(season): print(i) season[‘a’,‘b’,‘c’,‘d’] for i,eliment in enumerate(season): print(i,eliment) 输出结果为&#xff1a; 练习题&#xff1a; 2.给出10个学生姓名…

5. C++ 局部静态变量在什么时候分配内存和初始化?

C局部静态变量在什么时候分配内存和初始化&#xff1f; 对于C语言的全局和静态变量&#xff0c;不管是否被初始化&#xff0c;其内存空间都是全局的&#xff1b;如果初始化&#xff0c;那么初始化发生在任何代码执行之前&#xff0c;属于编译期初始化。由于内置变量无须资源释…

从0到1:校园生活圈小程序开发笔记(一)

可行性研究 校园生活圈小程序是一种面向大学或学院校园的社交平台&#xff0c;旨在为校园内的师生提供交流、分享、互助和信息发布等功能。 为校园内的师生提供一个便捷的平台&#xff0c;帮助他们更好地了解校园生活、参与校园活动、交流学习和共享资源。 功能分解 公告资讯…

力扣HOT100 - 42. 接雨水

解题思路&#xff1a; 动态规划 感觉不是很好想 class Solution {public int trap(int[] height) {int n height.length;if (n 0) return 0;int[] leftMax new int[n];leftMax[0] height[0];for (int i 1; i < n; i) {leftMax[i] Math.max(leftMax[i - 1], height[i…

JS加密解密之应用如何保存到桌面书签

前言 事情起因是这样的&#xff0c;有个客户解密了一个js&#xff0c;然后又看不懂里边的一些逻辑&#xff0c;想知道它是如何自动拉起谷歌浏览器和如何保存应用到书签的&#xff0c;以及如何下载应用的。继而诞生了这篇文章&#xff0c;讲解一下他的基本原理。 渐进式Web应用…

docker使用教程

寒假用了docker 2个月没用 结果还重新安装docker 忘了怎么用 为了免得以后忘写下下面内容 # If you dont have a docker installed, youll need to install docker curl -s https://get.docker.com/ | sh # Use pip to install docker-compose pip install docker-compose…

【力扣hot100】207 课程表(c++、python)解析

相关题目&#xff1a; 210 课程表2 【力扣hot100】207 课程表&#xff08;c、python&#xff09;解析 1.官方题解&#xff1a;1.1深搜c版本python版本 1.2广搜c版本 1.官方题解&#xff1a; 这是一题经典的「拓扑排序」问题 给定一个包含 n 个节点的有向图 G&#xff0c;我们…

网络七层模型:理解网络通信的架构(〇)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

idea-创建java8的springboot项目

现在使用IDEA创建 Spring Boot 项目&#xff0c;jdk 版本最低要求为 17。Spring Boot 官方在全力维护 3.x 版本&#xff0c;而 Spring Boot 3.x 对 jdk 版本的最低要求为17。 如果需要继续使用 jdk8&#xff0c;则需要修改 Server URL &#xff0c;改成&#xff1a;https://st…

解决 vue activited 无效问题

当对页面APP.vue组件router-view标签使用了keep-alive之后在组件activated状态时不会发送请求&#xff0c;这时需要使用 keep-alive标签的 exclude属性排除需要重新发送请求的组件。需要注意exclude的值要和组件本身的name值要一致&#xff0c;如果不一致就会不生效。目前我出现…

Linux服务器安装部署Harbor

Linux服务器安装部署Harbor详细说明文档&#xff1a;https://gitee.com/WilliamWangmy/snail-knowledge/blob/master/Docker/%E5%AE%89%E8%A3%85Harbor.md ps&#xff1a;如果觉得作者写的还行&#xff0c;能够满足您的需求&#xff0c;请给作者的开源项目start。如果觉得文章存…

软件工程---软件设计模式和软件体系结构

软件设计模式 软件设计模式是针对解决特定问题的通用解决方案的指导性原则和规范。设计模式通常关注如何在代码级别解决问题&#xff0c;提供了一种在软件设计中反复使用的经验性方法。设计模式通过将问题和解决方案进行抽象&#xff0c;帮助开发人员更好地理解和应用面向对象…

C++运算符重载中的引用返回

文章目录 引言原因1.为了支持链式调用2.避免不必要的对象创建和复制3.保持语义一致性 引言 在C编程语言中&#xff0c;运算符重载是一项强大的特性&#xff0c;它允许程序员为自定义类型重新定义或重载已有的运算符&#xff0c;从而使得这些类型能够像内置类型一样使用运算符。…