Flink任务如何跑起来之 2.算子 StreamOperator

Flink任务如何跑起来之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {// 处理数据void processElement(StreamRecord<IN> element) throws Exception;
}// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {// 处理双流输入中第一个流上的元素void processElement1(StreamRecord<IN1> element) throws Exception;// 处理双流输入中第二个流上的元素void processElement2(StreamRecord<IN2> element) throws Exception;
}// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {// 封装Functionprotected final F userFunction;// 通过Function实现进行算子的实例化public AbstractUdfStreamOperator(F userFunction) {this.userFunction = requireNonNull(userFunction);checkUdfCheckpointingPreconditions();}// 算子生命周期的相关方法,实际上调用Function的方法@Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction<?>) userFunction).finish();}}@Overridepublic void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction);}
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {if (operator == null) {return null;} else if (operator instanceof StreamSource&& ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {// 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);} else if (operator instanceof StreamSink&& ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {// 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);} else if (operator instanceof AbstractUdfStreamOperator) {return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);} else {return new SimpleOperatorFactory<>(operator);}
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 获取StreamMap对应的StreamOperatorFactory工厂类return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {OneInputTransformation<T, R> resultTransform =new OneInputTransformation<>(this.transformation,operatorName,// 将StreamOperatorFactory工厂实例,传入到Transformation中operatorFactory,outTypeInfo,environment.getParallelism());@SuppressWarnings({"unchecked", "rawtypes"})SingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator(environment, resultTransform);getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {// 以下3个属性从父类继承// 函数实例protected final F userFunction;// 结果输出protected transient Output<StreamRecord<OUT>> output;// 默认算子链生成策略protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;public StreamMap(MapFunction<IN, OUT> mapper) {super(mapper);// 实例化StreamMap时,指定ALWAYS的算子链生成策略chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {// userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。output.collect(element.replace(userFunction.map(element.getValue())));}
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/*** StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 */
public enum ChainingStrategy {// 尽可能的将和上游算子链接到一起,大多数算子的默认值ALWAYS,// 当前算子不会上下游算子链接到一起NEVER,// 不会上游算子连接到一起,但是可以和下游算子链接到一起HEAD,// 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。HEAD_WITH_SOURCES;public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/26464.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot整合hibernate-validator实现数据校验

文章目录 概念基本概念常用校验注解 前置内容整合Hibernate Validator快速入门优雅处理参数校验异常其余注解校验自定义校验注解 参考来源 概念 基本概念 Hibernate Validator 是一个参数校验框架&#xff0c;可以非常方便地帮助我们校验应用程序的入参&#xff0c;实现了参数…

解决:selenium运行时driver初始化失败 DevToolsActivePort file doesn‘t exist的问题

解决&#xff1a;selenium运行时driver初始化失败 DevToolsActivePort file doesn‘t exist的问题 DevToolsActivePort file doesnt exist报错信息&#xff1a;![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/b3f8acc1c47d45e3912575896e421567.png)现象&#xff1…

10.4 Go 并发模式

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

【Android】基于webView打造富文本编辑器(H5)

目录 前言一、实现效果二、具体实现1. 导入网页资源2. 页面设计3. 功能调用4. 完整代码 总结 前言 HTML5是构建Web内容的一种语言描述方式。HTML5是Web中核心语言HTML的规范&#xff0c;用户使用任何手段进行网页浏览时看到的内容原本都是HTML格式的&#xff0c;在浏览器中通过…

幸狐RV1106开发板烧录Ubuntu系统与配置SDK,RV1106 LuckFox Pico Max——最新的操作

资料&#xff1a;上手教程 | LUCKFOX WIKI 以及SDK内的文档资料 开发板型号&#xff1a;RV1106 LuckFox Pico Max 烧录系统&#xff1a; Ubuntu 虚拟机系统&#xff1a;Ubuntu 20.04&&Ubuntu22.04 PC系统&#xff1a;win11 占用空间&#xff1a;大概15G 本文主要记…

解决IDEA报错Could not find resource mybatis-config.xml最全排错解决收录

解决IDEA报错:Could not find resource mybatis-config.xml最全排错解决收录 1.问题产生 迁移新项目的Java web开发测试数据库时IDEA爆Could not find resource mybatis-config.xml 这个错误表明Mybatis无法找到名为mybatis-config.xml的配置文件。 需要确保该文件存在于cla…

【Qt 学习笔记】Qt窗口 | 对话框 | 创建自定义对话框

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt窗口 | 对话框 | 创建自定义对话框 文章编号&#xff1a;Qt 学习笔记…

鸿蒙轻内核A核源码分析系列七 进程管理 (3)

本文记录下进程相关的初始化函数&#xff0c;如OsSystemProcessCreate、OsProcessInit、OsProcessCreateInit、OsUserInitProcess、OsDeInitPCB、OsUserInitProcessStart等。 1、LiteOS-A内核进程创建初始化通用函数 先看看一些内部函数&#xff0c;不管是初始化用户态进程还…

Spring 内置BeanPostProcessor 的子子孙孙

Spring 框架已经实现了很多BeanPostProcessor的类&#xff0c;如下是关于BeanPostProcessor 的类图&#xff0c;图片过大&#xff0c;可以下载资源包看。 要能说清楚这些类&#xff0c;挺难&#xff0c;我也不知道怎么写&#xff0c;这几个类都分布在不同的包中&#xff0c;我感…

Spring系统学习 - Bean的作用域

bean作用域介绍 Spring框架提供了不同的作用域来管理Bean的生命周期和可见性&#xff0c;这对于控制不同类型的组件和处理并发请求尤其重要。 singleton&#xff08;默认&#xff09;&#xff1a; 每个Spring IoC容器只有一个bean实例。当容器创建bean后&#xff0c;它会被缓存…

C#聊天室①

聊天室服务器&#xff1a; 创建项目 桌面不需要使用控件 Program.cs internal class Program {static TcpListener server;[STAThread]static void Main(){Program p new Program(); p.start();}void start(){server new TcpListener(IPAddress.Parse(GetIP()), 33…

iText7——画发票PDF(完整)

显示描述&#xff1a; 1、每页显示必须带有发票头、“销售方和购买方信息” 2、明细填充为&#xff1a;当n≤8 行时&#xff0c;发票总高度140mm&#xff0c;每条发票明细行款高度4.375mm&#xff1b; 当8<n≤12行时&#xff0c;发票高度增加17.5mm&#xff0c;不换页&#…

【模拟-BM100 设计LRU缓存结构】

题目 BM100 设计LRU缓存结构 描述 设计LRU(最近最少使用)缓存结构&#xff0c;该结构在构造时确定大小&#xff0c;假设大小为 capacity &#xff0c;操作次数是 n &#xff0c;并有如下功能: Solution(int capacity) 以正整数作为容量 capacity 初始化 LRU 缓存get(key)&am…

【PyTorch 新手基础】Regularization -- 减轻过拟合 overfitting

Overfit 过拟合&#xff0c;效果如最右图所示 常见应对方案如下&#xff1a; 增大数据集入手&#xff1a;More data or data argumentation简化模型参数入手&#xff1a;Constraint model complexity (shallow model, regularization) or dropout dropout: torch.nn.Dropout(0…

搭建一个好玩的 RSS 订阅网站记录

全文相关链接 Github仓库创建链接Railway官网Supabase官网f-droid上的co.appreactor.news应用下载链接Railway账户使用量估算链接 全文相关代码 原文地址: https://blog.taoshuge.eu.org/p/270/ Dockerfile FROM docker.io/miniflux/miniflux:2.1.3环境变量 DATABASE_URL…

Java线程池参数和处理流程

线程池是一种管理和重用线程资源的机制&#xff0c;是利用池化思想设置和管理多线程的工具。线程池维护一定数量的线程&#xff0c;当有任务需要时&#xff0c;就从中选择一个的线程用来执行任务&#xff0c;当使用完成后该线程就会被重新放回线程池中&#xff0c;通过这样循环…

Apollo配置中心最佳实践

携程配置中心地址&#xff1a;GitCode - 全球开发者的开源社区,开源代码托管平台 1.1 Apollo配置中心介绍 Apollo&#xff08;阿波罗&#xff09;是开源配置管理中心&#xff0c;能够集中化管理应用不同环境、不同集群的配置&#xff0c;配置修改后能够实时推送到应用端…

ASM字节码插桩实现点击防抖

思路&#xff1a;在点击事件onclick的时候&#xff0c;将view的onclick在给定的时间给拦截掉。以前我们可能都是用一个util来拦截&#xff0c;这样在每个点击事件都得去判断&#xff0c;那么这里就用字节码插桩的形式来实现一下。 ASM的引入 dependencies {implementation gr…

QT day01

思维导图 QT编程 实现一个账号登录界面 代码&#xff1a; myweidget.h #ifndef MYWEIDGET_H #define MYWEIDGET_H#include <QWidget> #include <QIcon> //图标类 #include <QLineEdit> //行编辑器类 #include <QLabel> //标签类 #…

【Redis】安装和命令行客户端

https://www.bilibili.com/video/BV1cr4y1671t https://www.oz6.cn/articles/58 redis 非结构化有&#xff1a; 键值类型(Redis)文档类型(MongoDB)列类型(HBase)Graph:类型(Neo4j) 扩展性&#xff1a;水平即为分布式扩展 redis特征 键值&#xff08;key-value&#xff09;型…