Flink作业执行之 2.算子 StreamOperator

Flink作业执行之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {// 处理数据void processElement(StreamRecord<IN> element) throws Exception;
}// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {// 处理双流输入中第一个流上的元素void processElement1(StreamRecord<IN1> element) throws Exception;// 处理双流输入中第二个流上的元素void processElement2(StreamRecord<IN2> element) throws Exception;
}// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {// 封装Functionprotected final F userFunction;// 通过Function实现进行算子的实例化public AbstractUdfStreamOperator(F userFunction) {this.userFunction = requireNonNull(userFunction);checkUdfCheckpointingPreconditions();}// 算子生命周期的相关方法,实际上调用Function的方法@Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction<?>) userFunction).finish();}}@Overridepublic void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction);}
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {if (operator == null) {return null;} else if (operator instanceof StreamSource&& ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {// 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);} else if (operator instanceof StreamSink&& ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {// 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);} else if (operator instanceof AbstractUdfStreamOperator) {return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);} else {return new SimpleOperatorFactory<>(operator);}
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 获取StreamMap对应的StreamOperatorFactory工厂类return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {OneInputTransformation<T, R> resultTransform =new OneInputTransformation<>(this.transformation,operatorName,// 将StreamOperatorFactory工厂实例,传入到Transformation中operatorFactory,outTypeInfo,environment.getParallelism());@SuppressWarnings({"unchecked", "rawtypes"})SingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator(environment, resultTransform);getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {// 以下3个属性从父类继承// 函数实例protected final F userFunction;// 结果输出protected transient Output<StreamRecord<OUT>> output;// 默认算子链生成策略protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;public StreamMap(MapFunction<IN, OUT> mapper) {super(mapper);// 实例化StreamMap时,指定ALWAYS的算子链生成策略chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {// userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。output.collect(element.replace(userFunction.map(element.getValue())));}
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/*** StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 */
public enum ChainingStrategy {// 尽可能的将和上游算子链接到一起,大多数算子的默认值ALWAYS,// 当前算子不会上下游算子链接到一起NEVER,// 不会上游算子连接到一起,但是可以和下游算子链接到一起HEAD,// 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。HEAD_WITH_SOURCES;public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/28401.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

转让中字头控股集团公司步骤和条件

随着中国经济的不断发展&#xff0c;越来越多的企业开始积极寻求并购和收购机会。其中&#xff0c;国家总局中字头控股集团公司也是一个备受关注的对象。本篇文章将为您详细介绍国家总局中字头控股集团公司的收购流程及要求。详情致电咨询我或者来公司面谈。 中字头公司转让需满…

CSS实现前端小组件随笔

一.CSSJS实现打字机效果 1.1实现效果 1.2HTML部分 <span class"bottom-text"></span> 1.3CSS部分 .bottom-text {font-fanmily: "fangsong";display:inline-block;position:relative;font-size:20px;height:20px;inline-height:20px;color…

定个小目标之刷LeetCode热题(21)

这是道技巧题&#xff0c;利用了 &#xff08;num - 1&#xff09;% n 计算下标的形式来将数组元素与数组索引产生映射关系&#xff0c;代码如下&#xff0c;可以看下注释 class Solution {public List<Integer> findDisappearedNumbers(int[] nums) {int n nums.lengt…

pdf格式转成jpg图片,pdf格式如何转jpg

pdf转图片的方法&#xff0c;对于许多人来说可能是一个稍显陌生的操作。然而&#xff0c;在日常生活和工作中&#xff0c;我们有时确实需要将pdf文件转换为图片格式&#xff0c;以便于在特定的场合或平台上进行分享、展示或编辑。以下&#xff0c;我们将详细介绍一个pdf转成图片…

父亲节 | 10位名家笔下的父亲,读懂那份孤独而深沉的父爱

Fathers Day 母爱如水&#xff0c;父爱如山。 相对于母爱的温柔&#xff0c;父亲的爱多了几分静默和深沉。 读完10位名家笔下的父亲&#xff0c;我们就会明白&#xff0c;到底亏欠了父亲多少。 不要让自己有“子欲养而亲不待”的后悔和遗憾&#xff0c; 多给父亲一些爱的表示&a…

mySql的事务(操作一下)

目录 1. 简介2. 事务操作3. 四大特性4. 并发事务问题5. 脏读6. 不可重复读7. 幻读事务隔离级别参考链接 1. 简介 事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系统提交或撤销操作请求&#xff0c;即这些操作…

使用Java Spring Boot生成二维码与条形码

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务&#xff09; &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1…

牛客 第二十届西南科技大学ACM程序设计竞赛(同步赛):祖玛

题目描述 wzy 在玩一种很新的祖玛。 给定一个仅包含 小写字母 的字符串 sss , sss 由 mmm 个不同的小写字母组成&#xff0c;每个字母代表一种小球&#xff0c;在消去时会获得 相应 的分数&#xff1a; 两个及以上 相同的小球相碰就会消失&#xff08;在发射小球前因为无相碰&…

ffmpeg解封装rtsp并录制视频-(2)使用VLC模拟一个rtsp服务器并用ffmpeg解封装该rtsp流

VCL模拟服务器并打开播放该视频文件&#xff1a; - 准备好一个mp4文件&#xff0c;打开vlc软件 - 选择“媒体”》“流” - 添加一个mp4文件 - 点击下方按钮选择“串流” - 下一步目标选择rtsp 点击“添加” - 端口默认8554 - 路径设置 /test - 用…

SinNerf理解和效果

文章目录 SinNerf 解决的问题方法和结构自己训练的效果 SinNerf 解决的问题 该方法主要解决的问题是&#xff1a; 现有都使用多张照片来进行nerf 表示的学习&#xff0c;这篇文章的话&#xff0c;主要是想使用一张单视角的照片来Nerf表示的学习。通过从单张照片中得到的伪标签…

【SpringBoot集成Spring Security】

一、前言 Spring Security 和 Apache Shiro 都是安全框架&#xff0c;为Java应用程序提供身份认证和授权。 二者区别 Spring Security&#xff1a;重量级安全框架Apache Shiro&#xff1a;轻量级安全框架 关于shiro的权限认证与授权可参考小编的另外一篇文章 &#xff1a; …

IDEA模版快速生成Java方法体

新建模版组myLive 在模版组下新建模版finit 在模版text内输入以下脚本 LOGGER.info("$className$.$methodName$>$parmas1$", $parmas2$); try {} catch (Exception e) {LOGGER.error("$className$.$methodName$>error:", e); }LOGGER.info("$c…

win10没有Hyper-v的解决方法

win10没有Hyper-v的解决方法 问题&#xff1a;最近想装下docker&#xff0c;但是在控制面板-程序-启用或关闭Windows功能下找不到Hyper-v节点。 废话不多说&#xff0c;直接上实操教程 1.将下面命令复制到文本文档中&#xff0c;并将文档重命名Hyper.cmd pushd "%~dp0&q…

Spring Boot实战:图书信息网站

实战概述&#xff1a;Spring Boot图书信息网站开发 项目背景 随着数字化时代的到来&#xff0c;图书信息网站为用户提供了一个便捷的在线浏览和购买图书的平台。本实战项目旨在通过Spring Boot框架开发一个图书信息网站&#xff0c;实现图书展示、用户登录和管理等功能。 项…

Android 自定义View

我们所有的试图都是起源于自定义View&#xff0c;包括ViewGroup也是继承于它&#xff0c;可以说它是视图组件之父。 我们可以从它的大致流程来分为四个部分&#xff1a; 构造方法&#xff0c;onMeasure&#xff0c;onLayout&#xff0c;onDraw 构造方法&#xff1a; 它主要有…

如何判断一个js对象是否存在循环引用

一、背景 在前端JSON.stringfy是我们常用的一个方法&#xff0c;可以将一个对象序列化。 例如将如下对象序列化 const person { name: kalory, age:18}JSON.stringfy(person) // 结果 {"name":"kalory","age":18}将一个数组序列化const arr …

什么是分布式光伏系统?

随着全球对可再生能源和环保技术的日益重视&#xff0c;分布式光伏系统已成为电力领域中不可或缺的一部分。它代表了一种新兴的能源供应方式&#xff0c;具有显著的环保和经济价值。 一、定义与特点 分布式光伏系统是指将光伏组件安装在用户侧&#xff0c;如屋顶、墙面等建筑物…

ModuleNotFoundError: No module named ‘distutils‘的解决办法

最近想试试odoo17&#xff0c;在windows环境下&#xff0c;想安装试验一下&#xff0c;结果老出现oduleNotFoundError: No module named ‘distutils‘错误。查了一下&#xff0c;以为是python版本导致的&#xff0c;结果试了很多版本如下&#xff1a; 试了几个&#xff0c;每个…

Java——变量作用域和生命周期

一、作用域 1、作用域简介 在Java中&#xff0c;作用域&#xff08;Scope&#xff09;指的是变量、方法和类在代码中的可见性和生命周期。理解作用域有助于编写更清晰、更高效的代码。 2、作用域 块作用域&#xff08;Block Scope&#xff09;&#xff1a; 块作用域是指在…

软考中级|软件设计师-知识点整理

目录 计算机网络概论 计算机系统基础知识 中央处理单元 数据表示 校验码 计算机体系结构 计算机体系结构的发展 存储系统 输入/输出技术 安全性、可靠性与系统性能评测基础知识 加密技术和认证技术 计算机可靠性 程序设计语言基础知识 程序设计语言概述 程序设计…