Flink学习-处理函数

简介

处理函数是Flink底层的函数,工作中通常用来做一些更复杂的业务处理,处理函数分好几种,主要包括基本处理函数,keyed处理函数,window处理函数。

Flink提供了8种不同处理函数:

  • ProcessFunction:dataStream
  • KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
  • CoProcessFunction:用于connect连接的流
  • ProcessJoinFunction:用于join流操作
  • BroadcastProcessFunction:用于广播
  • KeyedBroadcastProcessFunction:keyBy之后的广播
  • ProcessWindowFunction:窗口增量聚合
  • ProcessAllWindowFunction:全窗口聚合
@Public
public interface RichFunction extends Function {void open(Configuration var1) throws Exception;void close() throws Exception;RuntimeContext getRuntimeContext();IterationRuntimeContext getIterationRuntimeContext();void setRuntimeContext(RuntimeContext var1);
}

基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。它所对应的函数 类,就叫作 ProcessFunction

处理函数的功能和使用

在很多应 用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。就必须使用处理函数进行实现。

处理函数提供了一个“定时服务” (TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线 (watermark),甚至可以注册“定时事件”。处理函数继承了 AbstractRichFunction 抽象类, 所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函 数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方 法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法 就可以了。ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction; 所以所有的处理函数,都是富函数(RichFunction), 富函数可以调用的东西这里同样都可以调用。

PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public ProcessFunction() {}public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {}//public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {public OnTimerContext() {super();}public abstract TimeDomain timeDomain();}//public abstract class Context {public Context() {}public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> var1, X var2);}
}

processElement会针对流中的每条记录调用一次。跟MapFunction一样,Collector发送出去。

Context可以访问时间戳,当前记录键值以及TimeService,支持将结果发送到副输出。

onTimer() 是一个回调函数,会在之前注册的计数器触发时调用。timestamp 参数给出了所触发计时器的时间戳,Collector可用来发出记录。

OnTimerContext能够提供和processElement()方法中Context对象相同的服务,它还会返回触发计时器的时间域(处理时间/事件时间)。

时间服务和计时器

ContextOnTimerContext对象中TimerService 提供时间相关的数据访问。

PublicEvolving
public interface TimerService {/** Error string for {@link UnsupportedOperationException} on registering timers. */String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";/** Error string for {@link UnsupportedOperationException} on deleting timers. */String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";/** 返回当前的处理时间。 */long currentProcessingTime();/** 返回当前水位线时间戳。 */long currentWatermark();/**针对当前键值注册一个处理时间计时器,当执行机器处理时间达到给定的时间戳,该计时器就会触发。*/void registerProcessingTimeTimer(long time);/*** 针对当前键值注册一个事件时间计时器,当更新后水位线时间戳大于或等于计时器时间戳时,它就会触发。*/void registerEventTimeTimer(long time);/*** 针对当前键值删除一个注册过的处理时间计时器。如果该计时器不存在,则方法不会有任何作用。*/void deleteProcessingTimeTimer(long time);/*** 针对当前键值删除一个注册过事件时间计时器,如果该计时器不存在,则方法不会有任何作用。*/void deleteEventTimeTimer(long time);
}

计时器触发时会调用onTimer()回调函数,系统对于processElement()onTimer()两个方法调用同步,防止并发。

每个键值和时间戳只能注册一个计时器,每个键值可以有多个计时器,但具体到每个时间戳就只能有一个。

副输出/侧输出(SideOutput)

大多数DataStream API 算子都只有一个输出,即只能生成一条某个数据类型的结果流。只有split算子可以将一条流拆分成多条类型相同的流。

处理函数提供的副输出功能允许从同一函数发出多条数据流,它们类型可以不同。

按键分区处理函数(KeyedProcessFunction)

按键分区处理函数是重点,用在keyby后面,对keyedStream进行处理,keyby将会按照Key进行分区,然后将不同key的数据分配到不同并行子任务上进行执行。

PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}public abstract class Context {public abstract Long timestamp();public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> outputTag, X value);/** Get key of the element being processed. */public abstract K getCurrentKey();}public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();/** Get key of the firing timer. */@Overridepublic abstract K getCurrentKey();}
}

窗口处理函数(ProcessWindowsFunction)

除了上面的按键分区处理函数之外,对于窗口也有函数,分两种,一种是窗口处理函数(ProcessWindowsFunction),另一种是全窗口处理函数(ProcessAllWindowsFunction),ProcessWindowFunction获得一个包含窗口所有元素的可迭代器以及一个具有时间和状态信息访问权的上下文对象,使得它比其他窗口函数提供更大的灵活性。是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止。

ProcessWindowsFunction:处理分区数据,每个窗口执行一次process方法.

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {private static final long serialVersionUID = 1L;public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;public void clear(Context context) throws Exception {}/** The context holding window metadata. */public abstract class Context implements java.io.Serializable {/** Returns the window that is being evaluated. */public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract <X> void output(OutputTag<X> outputTag, X value);}
}

全窗口处理函数(ProcessAllWindowFunction)

ProcessAllWindowFunctionProcessFunction类相似,都是用来对上游过来的元素做处理,不过ProcessFunction是每个元素执行一次processElement方法,ProcessAllWindowFunction是每个窗口执行一次process方法(方法内可以遍历该窗口内的所有元素);

public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window>extends AbstractRichFunction {private static final long serialVersionUID = 1L;public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out)throws Exception;public void clear(Context context) throws Exception {}/** The context holding window metadata. */public abstract class Context {public abstract W window();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract <X> void output(OutputTag<X> outputTag, X value);}
}

合并流处理函数(CoProcessFunction)

对于连接流ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)。与 CoMapFunction 类似,如果是调用.flatMap()就需要传入一个 CoFlatMapFunction,需要实现 flatMap1()flatMap2()两个方法;而调用.process()时,传入的则是一个 CoProcessFunction

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {private static final long serialVersionUID = 1L;public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out)throws Exception;public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out)throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}public abstract class Context {public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();public abstract <X> void output(OutputTag<X> outputTag, X value);}public abstract class OnTimerContext extends Context {public abstract TimeDomain timeDomain();}
}

连接流处理函数(ProcessJoinFunction)

ProcessJoinFunctionCoProcessFunction类似,但是有区别。

ProcessJoinFunction 用于join流操作,可以拿到两个流数据处理

CoProcessFunction 用于连接流处理,两个流数据分别处理

public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {private static final long serialVersionUID = -2444626938039012398L;public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out)throws Exception;public abstract class Context {public abstract long getLeftTimestamp();public abstract long getRightTimestamp();public abstract long getTimestamp();public abstract <X> void output(OutputTag<X> outputTag, X value);}
}

广播流处理函数(BroadcastProcessFunction)

广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {private static final long serialVersionUID = 8352559162119034453L;public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;public abstract class Context extends BaseBroadcastProcessFunction.Context {}public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
}

按键分区的广播连接流处理函数(KeyedBroadcastProcessFunction)

按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。

附录

参考

Flink官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/591600.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【algorithm】自动驾驶常见常考的几个模型和推导,顺便总结自己遇到的考题经验不断更新之———控制版

写在前面 本来快达成目标了&#xff0c;没想到公司遭受了问题&#xff0c;公司和同事我感觉还是挺好的&#xff0c;有国企的正规也有小企业的灵活&#xff0c;大家都很有学习欲望。 作为本次再次复习回忆如下&#xff1a; 把之前面试准备的 机器学习&#xff08;基本搬运到CSD…

JVM篇:JVM的简介

JVM简介 JVM全称为Java Virtual Machine&#xff0c;翻译过来就是java虚拟机&#xff0c;Java程序&#xff08;Java二进制字节码&#xff09;的运行环境 JVM的优点&#xff1a; Java最大的一个优点是&#xff0c;一次编写&#xff0c;到处运行。之所以能够实现这个功能就是依…

电脑突然不能使用win+x后的快捷键的解决方法

在一次使用电脑后我习惯性的winxuh进行休眠&#xff0c;但是失败了&#xff0c;我发现winx后并没有出现曾经常用的快捷键方式。 左边图片显示的是正常情况。我遇到的情况是图片右边快捷键位没有了&#xff0c;并且也不能进行快捷操作。 国内的网站我都搜索过了&#xff0c;甚至…

outlook邮箱群发邮件方法?邮箱如何群发?

outlook邮箱群发邮件如何使用&#xff1f;QQ邮箱设置群发的步骤&#xff1f; Outlook邮箱群发邮件&#xff1a;必要性 Outlook邮箱作为全球广泛使用的邮件服务之一&#xff0c;不仅提供了便捷的邮件收发功能&#xff0c;还支持多种附件、日历提醒及强大的联系人管理。Outlook…

Python 实现给 pdf 文件自动识别标题并增添大纲

一、背景&#xff1a; 客户方提供过来一个开放平台的pdf文档&#xff0c;文档里有几十个接口&#xff0c;没有大纲和目录可以定位到具体内容&#xff0c;了解整体的API功能&#xff0c;观看体验极度差劲&#xff0c;所以想使用Python代码自动解析pdf文档&#xff0c;给文档增添…

某人寿保险公司基础架构云化与小机数仓下移实践

随着数据中心 IT 基础架构的不断演进&#xff0c;云计算、大数据、移动互联的需求日益高涨&#xff0c;快速敏捷、易于维护以及扩展性&#xff0c;逐渐成为金融机构在升级数据中心时重点考虑的方面。 某人寿保险公司&#xff08;以下简称“客户”&#xff09;过往采用传统三层架…

PS插件一键生成超治愈向日葵花海

金黄色的向日葵总能给人带来治愈的感觉&#xff0c;仿佛在这里能够疗愈心灵所有的伤口。今天我们通过START AI来生成一片美丽的向日葵花海~ 这是小编使用的关键词&#xff0c;负面词需要填写你不想要拥有的&#xff0c;能够让生成的结果更贴合你的想法 最后的生成效果就如下图…

我的Spring Cloud学习之旅:原因、过程和收获

简介&#xff1a; 在这篇文章中&#xff0c;我将分享我学习Spring Cloud的经验和经历。我将谈到我开始学习Spring Cloud的原因&#xff0c;我是如何进行学习的&#xff0c;以及最终的学习成果。希望通过这篇文章&#xff0c;读者们可以从中获得一些有用的收获和启发。 正文&…

IC工程师级别与薪资是怎样的?资深工程师一文带你了解清楚

入行IC之后&#xff0c;想必大家更关心的就是工程师薪资和级别&#xff0c;因为入行的大多数也是工程师。 国际的一流企业基本上工程师分为以下几个级别&#xff1a;普通工程师&#xff0c;资深工程师&#xff0c;主管工程师&#xff0c;资深主管&#xff0c;总工, 资深总工&am…

电子电器架构(E/E)演化 —— 车载以太网

电子电器架构&#xff08;E/E&#xff09;演化 —— 车载以太网 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 本文13000字。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 屏蔽力是信息过载时代一…

笔记1:基于锚框(先验框)的目标检测

一、边缘框&#xff08;bounding box&#xff09; 1.1 定义 边缘框&#xff1a;真实标注的物体位置 2.1 表示方式 1、&#xff08;x1,y1)和(x2,y2) 2、&#xff08;x1,y1)和w,h 二、锚框(anchor box)/先验框&#xff08;prior bounding box&#xff09; 2.1 定义 对边缘…

如何高效使用Excel的SUMIF函数:掌握条件求和的技巧

背景&#xff1a; 在日常工作中&#xff0c;我们经常遇到这样的情况&#xff1a;需要根据特定条件对一系列数据进行求和。幸运的是&#xff0c;Excel提供了一个强大的工具来简化这一任务——SUMIF函数。本博客将带你深入了解如何使用SUMIF函数&#xff0c;包括一些实用的示例和…

MySQL5.7服务器 SQL 模式

官网地址&#xff1a;MySQL :: MySQL 5.7 Reference Manual :: 5.1.10 Server SQL Modes 欢迎关注留言&#xff0c;我是收集整理小能手&#xff0c;工具翻译&#xff0c;仅供参考&#xff0c;笔芯笔芯. MySQL 5.7 参考手册 / ... / 服务器 SQL 模式 5.1.10 服务器 SQL 模式…

Django 学习教程- Django模板(Template)

系列 Django 学习教程-介绍与安装-CSDN博客 Django 学习教程- Hello world入门案例-CSDN博客 前言 在上一章节中我们使用django.http.HttpResponse() 来输出 "Hello World&#xff01;"。该方式将数据与视图混合在一起&#xff0c;不符合 Django 的 MTV 思想。 本…

Linux network — 网络层收发包流程及 Netfilter 框架浅析

Linux network — 网络层收发包流程及 Netfilter 框架浅析 1. 前言2. 基础网络知识2.1 网络分层模型2.2 数据包协议分层2.3 sk_buff 结构2.4 收发包整体框架 3. 网络层&#xff08;IPv4&#xff09;收发包流程4. Netfilter 框架4.1 IPv4 网络层的 Netfilter Hook 点4.2 iptable…

算法——队列+宽搜(BFS)

队列这种数据结构大都服务于一个算法——宽搜&#xff08;BFS&#xff09;。宽搜还可以运用到二叉树、图、迷宫最短路径问题、拓扑排序等等 N叉数的层序遍历 N叉树的层序遍历 题目解析 给定一个 N 叉树&#xff0c;返回其节点值的_层序遍历_。&#xff08;即从左到右&#…

Word2Vec原理+gensim实现

链接&#xff1a;https://download.csdn.net/download/qq_60567426/88692270

Opencv图像灰度化,图像保存,按键事件(附带解析)

import cv2 import numpy as np """ 图像灰度化&#xff0c;图像保存&#xff0c;按键事件 1.读取本地图片 2.输出当前图片的宽、高、通道数 3.显示图片 4.判断用户输入如果输入为q则退出&#xff0c;如果输入为m则保存灰度化图片到本地否则继续等待 "&quo…

使用Wireshark进行网络流量分析

目录 Wireshark是什么&#xff1f; 数据包筛选 筛选指定ip 使用逻辑运算符筛选 HTTP模式过滤 端口筛选 协议筛选 包长度筛选 数据包搜索 数据流分析 数据包导出 Wireshark是什么&#xff1f; 通过Wireshark&#xff0c;我们可以捕获和分析网络数据包&#xff0c;查看…

【Maven】工程依赖下载失败错误解决

在使用 Maven 构建项目时&#xff0c;可能会发生依赖项下载错误的情况&#xff0c;主要原因有以下几种&#xff1a; 下载依赖时出现网络故障或仓库服务器宕机等原因&#xff0c;导致无法连接至 Maven 仓库&#xff0c;从而无法下载依赖。 依赖项的版本号或配置文件中的版本号错…