Flink Job 执行流程

Flink On Yarn 模式

在这里插入图片描述

基于Yarn层面的架构类似 Spark on Yarn模式,都是由Client提交AppRM上面去运行,然后 RM分配第一个container去运行AM,然后由AM去负责资源的监督和管理。需要说明的是,FlinkYarn模式更加类似Spark on Yarncluster模式,在cluster模式中,dirver将作为AM中的一个线程去运行。Flink on Yarn模式也是会将JobManager启动在container里面,去做个driver类似的任务调度和分配,Yarn AMFlink JobManager在同一个Container,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在Yarn集群上,Flink Yarn Client就可以提交Flink JobFlink JobManager,并进行后续的映射、调度和计算处理。

Fink on Yarn 的缺陷

【1】资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】On-Yarn模式下,所有的container都是固定大小的,导致无法根据作业需求来调整container的结构。譬如CPU密集的作业或需要更多的核,但不需要太多内存,固定结构的container会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,需要两个步骤来启动Flink作业:1.启动Flink守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
【4】On-Yarn模式下,作业管理页面会在作业完成后消失不可访问。
【5】Flink推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行多个作业的session模式,令人疑惑。

Flink版本1.5中引入了DispatcherDispatcher是在新设计里引入的一个新概念。Dispatcher会从Client端接受作业提交请求并代表它在集群管理器上启动作业。引入Dispatcher的原因主要有两点:
【1】一些集群管理器需要一个中心化的作业生成和监控实例;
【2】能够实现Standalone模式下JobManager的角色,且等待作业提交。在一些案例中,Dispatcher是可选的Yarn或者不兼容的kubernetes

资源调度模型重构下的 Flink On Yarn 模式

[点击并拖拽以移动] ​

客户端提交JobGraph以及依赖jar包到YarnResourceManager,接着Yarn ResourceManager分配第一个container以此来启动AppMasterApplication Master中会启动一个FlinkResourceManager以及JobManagerJobManager会根据JobGraph生成的ExecutionGraph以及物理执行计划向FlinkResourceManager申请slotFlinkResoourceManager会管理这些slot以及请求,如果没有可用slot就向YarnResourceManager申请containercontainer启动以后会注册到FlinkResourceManager,最后JobManager会将subTask deploy到对应containerslot中去。
[点击并拖拽以移动] ​

在有Dispatcher的模式下:会增加一个过程,就是Client会直接通过HTTP Server的方式,然后用Dispatcher将这个任务提交到Yarn ResourceManager中。

新框架具有四大优势,详情如下:
【1】client直接在Yarn上启动作业,而不需要先启动一个集群然后再提交作业到集群。因此client再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的classpath,而不是用动态的用户代码classloader去加载。
【3】container在需要时才请求,不再使用时会被释放。
【4】“需要时申请”的container分配方式允许不同算子使用不同profile (CPU和内存结构)的container

新的资源调度框架下 single cluster job on Yarn 流程介绍

[点击并拖拽以移动] ​

single cluster job on Yarn模式涉及三个实例对象:
【1】clifrontend Invoke App code;生成StreamGraph,然后转化为JobGraph
【2】YarnJobClusterEntrypoint(Master) 依次启动YarnResourceManagerMinDispatcherJobManagerRunner三者都服从分布式协同一致的策略;JobManagerRunnerJobGraph转化为ExecutionGraph,然后转化为物理执行任务Execution,然后进行deploydeploy过程会向 YarnResourceManager请求slot,如果有直接deploy到对应的YarnTaskExecutiontorslot里面,没有则向YarnResourceManager申请,带container启动以后deploy
【3】YarnTaskExecutorRunner (slave) 负责接收subTask,并运行。

整个任务运行代码调用流程如下图

[点击并拖拽以移动] ​

subTask在执行时是怎么运行的?

调用StreamTaskinvoke方法,执行步骤如下:
【1】initializeState()operatorinitializeState()
【2】openAllOperators()operatoropen()方法;
【3】最后调用run方法来进行真正的任务处理;

我们来看下flatMap对应的OneInputStreamTaskrun方法具体是怎么处理的。

@Override
protected void run() throws Exception {// 在堆栈上缓存处理器引用,使代码更易于JITfinal StreamInputProcessor<IN> inputProcessor = this.inputProcessor;while (running && inputProcessor.processInput()) {// 所有的工作都发生在“processInput”方法中}
}

最终是调用StreamInputProcessorprocessInput()做数据的处理,这里面包含用户的处理逻辑。

public boolean processInput() throws Exception {if (isFinished) {return false;}if (numRecordsIn == null) {try {numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();} catch (Exception e) {LOG.warn("An exception occurred during the metrics setup.", e);numRecordsIn = new SimpleCounter();}}while (true) {if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycleBuffer();currentRecordDeserializer = null;}if (result.isFullRecord()) {StreamElement recordOrMark = deserializationDelegate.getInstance();//处理watermarkif (recordOrMark.isWatermark()) {// handle watermark//watermark处理逻辑,这里可能引起timer的triggerstatusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);continue;} else if (recordOrMark.isStreamStatus()) {// handle stream statusstatusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);continue;//处理latency watermark} else if (recordOrMark.isLatencyMarker()) {// handle latency markersynchronized (lock) {streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());}continue;} else {//用户的真正的代码逻辑// now we can do the actual processingStreamRecord<IN> record = recordOrMark.asRecord();synchronized (lock) {numRecordsIn.inc();streamOperator.setKeyContextElement1(record);//处理数据streamOperator.processElement(record);}return true;}}}//这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}else {isFinished = true;if (!barrierHandler.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return false;}}
}

streamOperator.processElement(record)最终会调用用户的代码处理逻辑,假如operatorStreamFlatMap的话。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {collector.setTimestamp(element);userFunction.flatMap(element.getValue(), collector);//用户代码
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/588639.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql间隙锁demo分析

概述 通常用的mysql都是innodb引擎&#xff1b; 一般在update的时候用id都会认为是给行记录加锁&#xff1b; 在使用非唯一索引更新时&#xff0c;会遇到临键锁&#xff08;范围锁&#xff09;&#xff1b; 临键锁和表中的数据有关&#xff1b; mysq版本:8 隔离级别&#xf…

jmeter的常用功能及在测试中的基本使用和压测实战

Jmeter基础功能 了解Jmeter的常用组件 元件&#xff1a;多个类似功能组件的容器&#xff08;类似于类&#xff09; 一&#xff1a;Test Plan&#xff08;测试计划&#xff09; 测试计划通常用来给测试的项目重命名&#xff0c;使用多线程脚本运行时还可以配置线程组运行方式…

【C++】STL 容器 - map 关联容器 ① ( std::map 容器简介 | std::map 容器排序规则 | std::map 容器底层实现 )

文章目录 一、std::map 容器1、std::map 容器简介2、std::map 容器排序规则3、std::map 容器底层实现 二、代码示例 - std::map 容器1、代码示例2、执行结果 一、std::map 容器 1、std::map 容器简介 std::map 容器 是 C 语言 标准模板库 ( STL , Standard Template Library ) …

分布式技术之数据复制技术

文章目录 什么是数据复制技术&#xff1f;数据复制技术原理及应用同步复制技术原理及应用异步复制技术原理及应用半同步复制技术原理及应用三种数据复制技术对比 什么是数据复制技术&#xff1f; 数据复制是一种实现数据备份的技术。数据复制技术&#xff0c;可以保证存储在不…

Plantuml之甘特图语法介绍(二十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

磁盘和文件系统管理

一&#xff1a;磁盘结构&#xff1a; 1.磁盘基础&#xff1a; 扇区固定大小&#xff0c;每个扇区4k。磁盘会进行磨损&#xff0c;损失生命周期。 设备文件&#xff1a; 一切皆文件 设备文件&#xff1a;关联至一个设备驱动程序&#xff0c;进而能够跟与之对应硬件设备进行通…

Rust学习笔记000 安装

安装命令 curl --proto https --tlsv1.2 -sSf https://sh.rustup.rs | sh $ curl --proto https --tlsv1.2 -sSf https://sh.rustup.rs | sh info: downloading installerWelcome to Rust!This will download and install the official compiler for the Rust programming la…

【基础】【Python网络爬虫】【3.chrome 开发者工具】(详细笔记)

Python网络爬虫基础 chrome 开发者工具元素面板&#xff08;Elements)控制台面板&#xff08;Console&#xff09;资源面板&#xff08;Source&#xff09;网络面板&#xff08;Network&#xff09;工具栏Requests Table详情 chrome 开发者工具 ​ 当我们爬取不同的网站是&…

javaWeb学生信息管理系统2

一、学生信息管理系统SIMS 一款基于纯Servlet技术开发的学生信息管理系统&#xff08;SIMS&#xff09;&#xff0c;在设计中没有采用SpringMVC和Spring Boot等框架。系统完全依赖于Servlet来处理HTTP请求和管理学生信息&#xff0c;实现了信息的有效存储、检索和更新&#xf…

JVM之jinfo虚拟机配置信息工具

jinfo虚拟机配置信息工具 1、jinfo jinfo&#xff08;Configuration Info for Java&#xff09;的作用是实时地查看和调整虚拟机的各项参数。 使用jps -v 可以查看虚拟机启动时显示指定的参数列表&#xff0c;但是如果想知道未被显示指定的参数的系统默认值&#xff0c;除 …

GCP 创建1个windows vm 并连接

有时需要临时使用1台windows 的机器 创建windows vm 既然是临时 直接用gcloud command gcloud compute instances create instance-windows \--zoneeurope-west2-c \--machine-typen2d-standard-4 \--boot-disk-size100GB \--image-projectwindows-cloud \--imagewindows-se…

Rust学习笔记001:HELLOW WORLD + Cargo

Rust介绍 Rust&#xff08;中文称为“锈”&#xff09;是一种由Mozilla开发的系统编程语言&#xff0c;它着力于提供安全性、并发性和实用性。Rust的设计目标是消除程序出现的内存安全性问题&#xff0c;如空指针引用、数据竞争等。它通过在编译时进行严格的所有权和借用检查来…

Vue(三):Vue 生命周期与工程化开发

2023 的最后一篇博客&#xff0c;祝大家元旦快乐&#xff0c;新的一年一起共勉&#xff01; 06. Vue 生命周期 6.1 基本介绍 生命周期就是一个 Vue 示例从 创建 到 销毁 的整个过程&#xff0c;创建、挂载、更新、销毁 有一些请求是必须在某个阶段完成之后或者某个阶段之前执行…

10TB海量JSON数据从OSS迁移至MaxCompute

前提条件 开通MaxCompute。 在DataWorks上完成创建业务流程&#xff0c;本例使用DataWorks简单模式。详情请参见创建业务流程。 将JSON文件重命名为后缀为.txt的文件&#xff0c;并上传至OSS。本文中OSS Bucket地域为华东2&#xff08;上海&#xff09;。示例文件如下。 {&qu…

zabbix通过自动发现-配置监控项、触发器(小白教程)

自动发现配置参考链接&#xff08;不小白&#xff0c;不友好&#xff09; zabbix-get介绍 1配置 zabbix server&#xff1a;版本7&#xff08;不影响&#xff09;,IP地址&#xff1a;192.168.0.60zabbix agent&#xff1a;版本agent1&#xff08;不影响&#xff09;&#xff…

Megatron-LM源码系列(六):Distributed-Optimizer分布式优化器实现Part1

1. 使用说明 在megatron中指定--use-distributed-optimizer就能开启分布式优化器, 参数定义在megatron/arguments.py中。分布式优化器的思路是将训练中的优化器状态均匀地分布到不同数据并行的rank结点上&#xff0c;相当于开启ZERO-1的训练。 group.add_argument(--use-distr…

SpringIOC之ClassPathXmlApplicationContext

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

PostgreSQL16.1(Windows版本)

1、卸载原有的PostgreSQL &#xfeff; &#xfeff; 点击Next即可。 &#xfeff;&#xfeff; 点击OK即可。 卸载完成。 2、安装 &#xff08;1&#xff09; 前两部直接Next&#xff0c;第二部可以换成自己想要安装的路径。 &#xff08;2&#xff09; 直接点击Next。…

雪花算法(Snowflake)介绍和Java实现

1、雪花算法介绍 (1) 雪花算法(SnowFlake)是分布式微服务下生成全局唯一ID&#xff0c;并且可以做到去中心化的常用算法&#xff0c;最早是Twitter公司在其内部的分布式环境下生成ID的方式。 雪花算法的名字可以这么理解&#xff0c;世界上没有两片完全相同的雪花&#xff0c;…

Select缺点及代码示例

一、Select缺点 二、服务器端 #include <stdio.h> #include <arpa/inet.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <sys/select.h>int main() {// 创建socketint lfd socket(PF_INET, SOCK_STREAM, 0)…