Flink源码之JobManager启动流程

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runClusterprivate void runCluster(Configuration configuration, PluginManager pluginManager)throws Exception {synchronized (lock) {initializeServices(configuration, pluginManager); //初始化服务// write host information into configurationconfiguration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());final DispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);//创建核心组件clusterComponent =dispatcherResourceManagerComponentFactory.create(configuration,ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,metricRegistry,executionGraphInfoStore,new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),this);...ignore code}
}

可以看出关键代码是调用initializeServices以及创建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)throws Exception {LOG.info("Initializing cluster services.");synchronized (lock) {rpcSystem = RpcSystem.load(configuration);commonRpcService =RpcUtils.createRemoteRpcService(rpcSystem,configuration,configuration.getString(JobManagerOptions.ADDRESS),getRPCPortRange(configuration),configuration.getString(JobManagerOptions.BIND_HOST),configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));// update the configuration used to create the high availability servicesconfiguration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());ioExecutor =Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration),new ExecutorThreadFactory("cluster-io"));haServices = createHaServices(configuration, ioExecutor, rpcSystem);blobServer = new BlobServer(configuration, haServices.createBlobStore());blobServer.start();heartbeatServices = createHeartbeatServices(configuration);metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration, commonRpcService.getAddress(), rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, null);final String hostname = RpcUtils.getHostname(commonRpcService);processMetricGroup =MetricUtils.instantiateProcessMetricGroup(metricRegistry,hostname,ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));executionGraphInfoStore =createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());}
}

在initializeServices中首先创建commonRpcService,这个RPCService实例是JobManager提供RPC服务的核心,可以看出它会有个地址和监听端口号,commonRpcService可将继承自Gateway的服务实例包装成AkkaActor对外提供RPC服务,比如ResourceManager、Dispatcher。此外还创建了其他服务:

haService: 可通过HAService获取ResourceManager/Dispatcher/RestEndpoint的地址,同时也提供选主服务,组件启动时需向HAService注册,如果被选主成功,则会调用监听器的grandLeadership回调函数
BlobServer: 可用来提供存储大对象存储服务
heartbeatServices:为组件间传递心跳信息
metricRegistry:提供metric上报和查询服务,监听端口不同,新建了一个RpcService专为Metric服务
processMetricGroup:注册系统运行状态信息的Metric,比如GC/Memory/Network运行时状况,添加Metric都是通过一个MetricGroup添加
executionGraphInfoStore:缓存Job执行时信息,比如ExecutionGrap

初始化服务创建完成后,通过DefaultDispatcherResourceManagerComponentFactory:create创建JobManager的三大核心组件:Dispacher/ResourceManager/RestEndpointServer, 都是通过工厂方法创建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

这些组件是JobManager向HAService注册获取leadership后,被ElectionService回调grantLeadership函数中创建出具体组件实例。

RestServer

RestServer并不是一个RPCServer,没有继承RpcGateway,只提供HTTP接口服务,然后将请求转交给Dispatcher处理,它的生成启动流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通过Netty启动Rest服务
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler处理客户端提交Job
WebMonitorEndpoint::initializeHandlers //关联Rest请求的Header和Handler
WebMonitorEndpoint::startInternal //竞选leader

ResourceManager

RM生成启动过程是ResourceManagerServiceImpl先竞选leader成功后再创建出具体的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//调用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成启动过程是DefaultDispatcherRunner选主后再创建出具体实例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//创建SessionDispatcherLeaderProcess并调用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//创建Dispatcher并调用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

总结

在这里插入图片描述
JobManager的启动过程就是创建三大组件RestServer/RM/Dispacher实例初始化的过程,RestSever通过Netty启动HTTP服务,RM/Dispacher被AkkaRpcService包装成AkkaActor提供本地或远程RPC服务,RestServer仅仅是接受请求解析消息后由具体Handler处理,JobGrap提交执行会转发给Dispatcher处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/30093.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

博客项目(Spring Boot)

1.需求分析 注册功能(添加用户操纵)登录功能(查询操作)我的文章列表页(查询我的文章|文章修改|文章详情|文章删除)博客编辑页(添加文章操作)所有人博客列表(带分页功能)…

FPGA外部触发信号毛刺产生及滤波

1、背景 最近在某个项目中,遇到输入给FPGA管脚的外部触发信号因为有毛刺产生,导致FPGA接收到的外部触发信号数量多于实际值。比如:用某个信号源产生1000个外部触发信号(上升沿触发方式)给到FPGA输入IO,实际…

冠达管理:股票注册制通俗理解?

目前我国A股商场正在进行股票注册制变革,相较之前的发行准则,股票注册制在理念上更为商场化,这意味着公司发行股票的门槛将下降,公司数量将添加,而股票流通的方式也将有所改变。那么股票注册制指的是什么,它…

使用vscode远程登录以及本地使用的配置(插件推荐)

1、远程登陆ssh 1.1打开vscode插件商店,安装remote-ssh插件 远程ssh添加第三方插件:vscode下链接远程服务器安装插件失败、速度慢等解决方法_vscode远程安装不上扩展_Emphatic的博客-CSDN博客 转到定义,选中代码->鼠标右键->转到定义…

2023/08/05【网络课程总结】

1. 查看git拉取记录 git reflog --dateiso|grep pull2. TCP/IP和OSI七层参考模型 3. DNS域名解析 4. 预检请求OPTIONS 5. 渲染进程的回流(reflow)和重绘(repaint) 6. V8解析JavaScript 7. CDN负载均衡的简单理解 8. 重学Ajax 重学Ajax满神 9. 对于XML的理解 大白话叙述XML是…

JavaScript中的几种常用循环方式对比

JavaScript中的几种循环方式 1. for 循环1.1 使用方式1.2 不支持遍历对象(但不会报错) 2. for-of 循环2.1 使用方式2.2 for-of 和 for 循环比较(不允许修改原数组元素)2.2.1 相同点2.2.1.2 都可以遍历数组,但不允许遍历…

灰度非线性变换之c++实现(qt + 不调包)

本章介绍灰度非线性变换,具体内容包括:对数变换、幂次变换、指数变换。他们的共同特点是使用非线性变换关系式进行图像变换。 1.灰度对数变换 变换公式:y a log(1x) / b,其中,a控制曲线的垂直移量;b为正…

快速解决IDEA中类的图标变成J,不是C的情况

有时候导入新的项目后,会出现如下情况,类的图标变成J,如图: 直接上解决方法: 找到项目的pom.xml,右键,在靠近最下方的位置找到Add as Maven Project,点击即可。 此时,一般类的图标就…

Wlan——射频和天线基础知识

目录 射频的介绍 射频和Wifi 射频的相关基础概念 射频的传输 信号功率的单位 射频信号传输行为 天线的介绍 天线的分类 天线的基本原理 天线的参数 射频的介绍 射频和Wifi 什么是射频 从射频发射器产生一个变化的电流(交流电),通过…

【信号生成器】从 Excel 数据文件创建 Simulink 信号生成器块研究(Simulink)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

Linux 文件基本属性

Linux 文件基本属性 Linux 系统是一种典型的多用户系统,不同的用户处于不同的地位,拥有不同的权限。 为了保护系统的安全性,Linux 系统对不同的用户访问同一文件(包括目录文件)的权限做了不同的规定。 在 Linux 中我…

【项目学习1】如何将java对象转化为XML字符串

如何将java对象转化为XML字符串 将java对象转化为XML字符串,可以使用Java的XML操作库JAXB,具体操作步骤如下: 主要分为以下几步: 1、创建JAXBContext对象,用于映射Java类和XML。 JAXBContext jaxbContext JAXBConte…

8.10CPI决战日来临,黄金会意外走高吗?

近期有哪些消息面影响黄金走势?黄金多空该如何研判? ​黄金消息面解析:周四(8月10日)亚市早盘,美元指数在102.50维持多头走势,黄金避险情绪消散,金价跌至1916美元,下破1900美元前景深化。周三黄…

分布式 - 服务器Nginx:一小时入门系列之静态网页配置

文章目录 1. 静态文件配置2. nginx listen 命令解析3. nginx server_name 命令解析4. nginx server 端口重复5. nginx location 命令 1. 静态文件配置 在 /home 文件下配置一个静态的AdminLTE后台管理系统: [rootnginx-dev conf.d]# cd /home [rootnginx-dev home…

「2024」预备研究生mem-论证推理强化:评价类

一、论证推理强化:评价类 二、课后题

0基础学习VR全景平台篇 第81篇:全景相机-临云镜如何直播推流

临云镜全景相机是阿里巴巴定制全景设备,实现空间三维信息的快速采集,与阿里云三维空间重建平台搭配,帮助品牌商与平台以较低的成本完成空间的快速采集,并支持对室内/室外空间的三维全景展示及空间漫游,同时支持VR浏览、…

jmeter创建一个压测项目

1.jemeter新建一个项目: 2.接下来对Thread进行描述,也可以先使用默认的Thread进行操作。 3.添加http请求头的信息。按照如图所示操作 4.在请求头里面添加必要的字段,可以只填必要字段就可以 5.添加Http请求信息,如下图&#xff…

LeetCode练习习题集【4月 - 7 月】

LEETCODE习题集【4月-7月总结】 简单 数组部分 1.重复数 题目: 在一个长度u为 n 的数组 nums 里的所有数字都在 0~n-1 的范围内。数组中某些数字是重复的,但不知道有几个数字重复了,也不知道每个数字重复了几次。请找出数组中…

【Java设计模式】建造者模式 注解@Builder

概念 将一个复杂对象的构造与它的表示分离,使同样的构建过程可以创建不同的表示。它使将一个复杂的对象分解成多个简单的对象,然后一步步构建而成。 每一个具体建造者都相对独立,而与其它的具体建造者无关,因此可以很方便地替换具…

nginx 负载均衡

1.环境准备 我使用的说centos7的系统 1.20版本的nginx 另外还有3台虚拟机 主机:192.168.163.142 两台服务器:服务器A--192.168.163.140 服务器B---192.168.163.141 2.配置服务器A和B 找到nginx下的html目录,编辑其中的index.html(在此…