深度了解flink(七) JobManager(1) 组件启动流程分析

前言

JobManager是Flink的核心进程,主要负责Flink集群的启动和初始化,包含多个重要的组件(JboMaster,Dispatcher,WebEndpoint等),本篇文章会基于源码分析JobManagr的启动流程,对其各个组件进行介绍,希望对JobManager有一个更全面的了解。

集群启动模式

ClusterEntryPoint是Flink集群的入口点的基类,该类是抽象类,类继承关系UML图如下

通过上图可知道,Flink有3种集群模式

Flink Session集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneSessionClusterEntrypoint Standalone session模式下集群的入口类
  • KubernetesSessionClusterEntrypoint K8s session模式下集群的入口类
  • YarnSessionClusterEntrypoint Yarn session模式下集群的入口类

集群生命周期

在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。

资源隔离:

Flink作业共享集群的ResourceManager和Dispacher等组件,TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。

适用场景:

因为组件共享,session集群资源使用率高,集群预先存在,不需要额外申请资源,适合一些比较小的,不是长期运行的作业,例如SQL预览,交互式查询,实时任务测试环境等

Flink Per Job集群

只要Yarn提供了继承的子类:YarnJobClusterEntrypoint

集群生命周期

在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。一旦作业完成,Flink Job 集群将被拆除。

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

实时由于Per Job模式下用户应用程序的main方法在客户端执行生成JobGraph,任务量大情况下存在性能瓶颈,目前已被标记为废弃状态。

Flink Application集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneApplicationClusterEntryPoint Standalone Application模式下集群的入口类
  • KubernetesApplicationClusterEntrypoint K8s Application模式下集群的入口类
  • YarnApplicationClusterEntryPoint Yarn Application模式下集群的入口类

集群生命周期

Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且main方法在集群上而不是客户端上运行。应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用main方法来提取 JobGraph

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

Application模式资源隔离性好,Per Job模式的替换方案,适合长期运行、具有高稳定性的大型作业

JobManager启动流程

JobManger启动流程在不同模式下基本相同,Standalone模式可以在本地运行(可以参考),方便Debug,因为使用Standalone模式的入口类StandaloneSessionClusterEntrypoint进行启动流程的分析。

main方法入口

public static void main(String[] args) {// 打印系统相关信息EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);//信号注册器,注册系统级别的信号,接收到系统级别终止信号优雅的关闭SignalHandler.register(LOG);//注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭JvmShutdownSafeguard.installAsShutdownHook(LOG);// 解析命令行参数,获取配置信final EntrypointClusterConfiguration entrypointClusterConfiguration =ClusterEntrypointUtils.parseParametersOrExit(args,new EntrypointClusterConfigurationParserFactory(),StandaloneSessionClusterEntrypoint.class);//加载config.yaml,构建Configuration对象Configuration configuration = loadConfiguration(entrypointClusterConfiguration);StandaloneSessionClusterEntrypoint entrypoint =new StandaloneSessionClusterEntrypoint(configuration);ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

主要步骤

  1. 打印系统信息。
  2. 注册信号处理器,注册系统级别的信号,确保优雅关闭。
  3. 注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭。
  4. 解析命令行参数,加载配置文件。
  5. 初始化 StandaloneSessionClusterEntrypoint
  6. 调用 ClusterEntrypoint#runClusterEntrypoint 方法启动集群。

ClusterEntrypoint#runClusterEntrypoint

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {//clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);
}//无关代码 无需关注
}

核心步骤

  • 调用 clusterEntrypoint.startCluster() 启动集群。

ClusterEntrypoint#startCluster

public void startCluster() throws ClusterEntrypointException {//无关代码 无需关注try {FlinkSecurityManager.setFromConfiguration(configuration);//插件管理类,用来加载插件。插件加载两种方式。//1).通过如下参数配置FLINK_PLUGINS_DIR。//2).将插件jar包放入到plugins下PluginManager pluginManager =PluginUtils.createPluginManagerFromRootFolder(configuration);//初始化文件系统的配置configureFileSystems(configuration, pluginManager);//初始化安全上下文环境 默认HadoopSecurityContext,Hadoop安全上下文,//使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos。//总结:初始化安全环境,创建安全环境的时候会做一系列的检查。SecurityContext securityContext = installSecurityContext(configuration);ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);//安全的情况下调用runCluster开始初始化组件securityContext.runSecured((Callable<Void>)() -> {runCluster(configuration, pluginManager);return null;});} catch (Throwable t) {//异常处理代码 无需关注}}

startCluster方法主要做了一些环境和配置初始化的工作

主要步骤

  1. 初始化插件管理器,用来加载插件。
  2. 初始化文件系统设置 例如 hdfs、本地file。此时只是初始化的配置。
  3. 初始化安全环境。
  4. 安全环境下调用 runCluster 方法。

ClusterEntrypoint#runCluster

private void  runCluster(Configuration configuration, PluginManager pluginManager)throws Exception {synchronized (lock) {//初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等initializeServices(configuration, pluginManager);// write host information into configurationconfiguration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());//创建Dispatcher和ResourceManger组件的工厂类final DispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);//创建Dispatcher和ResourceManger组件clusterComponent =dispatcherResourceManagerComponentFactory.create(configuration,resourceId.unwrap(),ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,delegationTokenManager,metricRegistry,executionGraphInfoStore,new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),failureEnrichers,this);//组件停止运行后的异步方法clusterComponent.getShutDownFuture().whenComplete(//代码省略)}}

主要步骤

1.初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等

2.创建Dispatcher和ResourceManger组件的工厂类

3.创建Dispatcher和ResourceManger组件

4.定义组件停止运行后的异步方法

总结

本篇文章分享了Flink任务的集群模式,通过源码的方式分析了JobManger的启动流程,后续会对JobManger相关的服务和组件进行更详细的分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/883956.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

.NET内网实战:通过白名单文件反序列化漏洞绕过UAC

01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏&#xff0c;主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧&#xff0c;对内网和后渗透感兴趣的朋友们可以订阅该电子报刊&#xff0c;解锁更多的报刊内容。 02基本介绍 03原理分析 在渗透测试和红…

ELK之路第三步——日志收集筛选logstash和filebeat

logstash和filebeat&#xff08;偷懒版&#xff09; 前言logstash1.下载2.修改配置文件3.测试启动4.文件启动 filebeat1.下载2.配置3.启动 前言 上一篇&#xff0c;我们说到了可视化界面Kibana的安装&#xff0c;这一篇&#xff0c;会简单介绍logstash和filebeat的安装和配置。…

植物源UDP-糖基转移酶及其分子改造-文献精读75

植物源UDP-糖基转移酶及其分子改造 摘要 糖基化能够增加化合物的结构多样性,有效改善水溶性、药理活性和生物利用度,对植物天然产物的药物开发至关重要。UDP-糖基转移酶(UGTs)能够催化糖基从活化的核苷酸糖供体转移到受体形成糖苷键,植物中天然产物的糖基化修饰主要通过UGTs实…

[MySQL#7] CRUD(2) | 更新 | 删除 | 聚合函数 | group by

目录 3. 更新 4. 删除 截断表 日志的作用 5. (实验) 插入查询结果 6. 聚合函数 7. 分组查询 接着上篇文章[MySQL#6] 表的CRUD (1) | Create | Retrieve(查) | where继续讲解~ 3. 更新 语法&#xff1a; UPDATE table_name SET column expr [, column expr ...][WHE…

RegCM模式运行./bin/regcmMPI报错

1、报错 在运行RegCM时到截止模拟时间段时&#xff0c;不显示successfully&#xff0c;而是报错&#xff1a; MPI_ABORT was invoked on rank 0 in communicator MPI COMMUNICATOR 3 DUP FROM 0 with errorcode 1. NOTE: invoking MPI_ABORT causes Open MPI to kill all MP…

日本也有九九乘法表?你会读吗?柯桥零基础学日语到蓝天广场

日本也有“九九乘法表”&#xff1f; 九九乘法表起源于中国&#xff0c;可以追溯到春秋战国时代。 日本奈良县橿原市境内的“藤原京”遗址&#xff0c;出土了日本目前可找到最古老的“九九乘法表”木简。 根据日本奈良研究所的研究&#xff0c;其内容可能是1300多年前的官吏用…

Python(包和模块)

包 定义 包是将模块以文件夹的组织形式进行分组管理的方法&#xff0c;以便更好地组织和管理相关模块。 包是一个包含一个特殊的__init__.py文件的目录&#xff0c;这个文件可以为空&#xff0c;但必须存在&#xff0c;以标识目录为Python包。 包可以包含子包&#xff08;子…

集群聊天服务器——逻辑梳理

网络聊天服务器项目&#xff0c;该项目分为4个模块&#xff1a; 首先是网络模块&#xff1a;我使用了muduo高性能网络库&#xff0c;解耦合网络与业务之间这两部分代码&#xff0c;可以更加专注与业务的功能开发其次是服务层模块&#xff1a;我使用了基于C11的技术比如绑定器和…

前沿技术与未来发展第一节:C++与机器学习

第六章&#xff1a;前沿技术与未来发展 第一节&#xff1a;C与机器学习 1. C在机器学习中的应用场景 C在机器学习中的应用优势主要体现在高效的内存管理、强大的计算能力和接近底层硬件的灵活性等方面。以下是 C 在机器学习领域的几个主要应用场景&#xff1a; 1.1 深度学习…

项目解决方案:在弱网(低带宽、高延迟、有丢包的网络)环境下建设视频监控平台的设计方案(下)

目录 一、需求分析 1、业务需求分析 2、功能需求分析 二、建设目标 三、设计原则 四、标准规范建设 五、系统架构 1、视频接入管理系统 2、资源管理调度平台 3、视频转码解码服务器 4、媒体输出引擎 5、媒体录制引擎 6、智能联动引擎 7、API开发引擎 六、部署架构 七、产…

操作系统笔记(四)进程间通信,竞争条件与解决方案

进程间通信(IPC) 如何在进程间传递信息? 如何防止两个进程冲突&#xff1f; 如何实现进程执行的先后顺序&#xff1f; 竞争条件&#xff08;Race conditions&#xff09; 竞争条件&#xff08;Race conditions&#xff09; 多个进程访问一个共享数据&#xff0c;而数据最…

jmeter的基本使用

Jmeter基本使用 一、变量 1.用户定义变量 2.用户参数 二、函数 1.计数器${__counter(,)} 2.时间函数 3.加密函数${__digest(,,,,)} 4. 整数相加${__intSum(,,)} 5.属性函数&#xff0c;${__P(,)}、${__property(,,)}、${__setProperty(,,)} 6.V函数 三、获取响应数据…

Go语言基础教程:指针

在 Go 中&#xff0c;函数参数默认是按值传递的。若要改变变量的原始值&#xff0c;可以使用指针。本教程将通过示例代码来演示如何使用值传递和指针传递。 package mainimport "fmt"// 传值方式 - 函数内改变 ival 的值不会影响原始变量 func zeroval(ival int) {i…

2024年优秀的天气预测API

准确、可操作的天气预报对于许多组织的成功至关重要。 事实上&#xff0c;在整个行业中&#xff0c;天气条件会直接影响日常运营&#xff0c;包括航运、按需、能源和供应链&#xff08;仅举几例&#xff09;。 以公用事业为例。根据麦肯锡的数据&#xff0c;在 1.4 年的时间里…

HCIP-HarmonyOS Application Developer V1.0 笔记(二)

类Web开发范式自定义组件基本用法 自定义组件通过element引入到宿主页面。 Props自定义属性 自定义属性支持类型 String&#xff0c;Number&#xff0c;Boolean&#xff0c;Array&#xff0c;Object。 命名规范&#xff1a; 命名时禁止以on、、on:、grab:等保留关键字为开头…

天润融通突破AI客服局限,三大关键提升文本机器人问答效果

近期&#xff0c;AI客服再次登上热搜&#xff0c;引发网友集体吐槽&#xff0c;比如AI客服虽然态度客气&#xff0c;但听不懂客户诉求&#xff0c;回答问题驴唇不对马嘴&#xff0c;解决不了问题...... 更有网友将这些问题升级到&#xff0c;企业就是不想解决问题才交给AI客服…

aarch64-opencv341交叉编译,并在arm上部署helloopencv

背景 当需要在jetson xavier nx或者rk 3562等平台上开发关于视觉检测的工程时&#xff0c;由于arm板子资源不足或者不能联网等原因&#xff0c;通常在虚拟机上利用交叉编译器编译得到可执行程序&#xff0c;然后部署到arm板上。 aarch64-opencv341交叉编译 ubuntu虚拟机中先…

mysql中redolog、binlog

我们中说删库跑路&#xff0c;那么数据库删除后&#xff0c;里面的数据怎么恢复呢&#xff1f; 这里就涉及到了redolog和binlog了 一、什么是存储引擎和缓冲池 存储引擎是 MySQL 中直接与磁盘交互部分。也是存储引擎读写数据的最小单位&#xff0c;一个页里可以有一条或多条…

【分布式技术】分布式事务深入理解

文章目录 概述产生原因关键点 分布式事务解决方案3PC3PC的三个阶段&#xff1a;3PC相比于2PC的改进&#xff1a;3PC的缺点&#xff1a; TCCTCC事务的三个阶段&#xff1a;TCC事务的设计原则&#xff1a;TCC事务的适用场景&#xff1a;TCC事务的优缺点&#xff1a;如何解决TCC模…

字节青训-找出最长的神奇数列

问题描述 小F是一个好学的中学生&#xff0c;今天他学习了数列的概念。他在纸上写下了一个由 0 和 1 组成的正整数序列&#xff0c;长度为 n。这个序列中的 1 和 0 交替出现&#xff0c;且至少由 3 个连续的 0 和 1 组成的部分数列称为「神奇数列」。例如&#xff0c;10101 是一…