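As the startup script flink-daemon.sh shows, the entry class for a standalone session cluster is org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint. Its main method eventually reaches ClusterEntrypoint::runCluster, which creates the main services and components.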
StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runCluster

private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager); // initialize services

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);

        // create the core components
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);

        // ... remaining code omitted
    }
}
The key steps are the call to initializeServices and the creation of the cluster component.
protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {

    LOG.info("Initializing cluster services.");

    synchronized (lock) {
        rpcSystem = RpcSystem.load(configuration);

        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = createHeartbeatServices(configuration);
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration, commonRpcService.getAddress(), rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);

        final String hostname = RpcUtils.getHostname(commonRpcService);

        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                configuration));

        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}
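initializeServices first creates commonRpcService. This RpcService instance is the core of the JobManager's RPC layer, which is why it has an address and a listening port. commonRpcService wraps service instances that expose an RpcGateway interface, such as ResourceManager and Dispatcher, as Akka actors so that they can serve RPC requests. The method also creates the following services: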
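haServices: used to look up the addresses of the ResourceManager, Dispatcher and RestEndpoint, and also provides leader election. A component registers with the HA services on startup; if it is elected leader, the listener's grantLeadership callback is invoked
BlobServer: provides storage for large objects
heartbeatServices: exchanges heartbeat messages between components
metricRegistry: provides metric reporting and querying. It listens on a different port, so a separate RpcService is created just for metrics
processMetricGroup: registers metrics on the process's runtime state, such as GC, memory and network. All metrics are added through a MetricGroup
executionGraphInfoStore: caches job execution information, such as the ExecutionGraph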
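The idea of wrapping a gateway-implementing service so that callers only see an asynchronous interface can be illustrated with a small toy model. This is only a sketch under stated assumptions: GreeterGateway, GreeterEndpoint and startServer are hypothetical names invented here, and a single-threaded executor stands in for the actor's mailbox. It is not how Flink's RpcService is implemented.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only: none of these types are Flink classes.
final class RpcProxySketch {

    /** Hypothetical gateway: the interface that callers see. */
    interface GreeterGateway {
        CompletableFuture<String> greet(String name);
    }

    /** Hypothetical endpoint: its state is touched only from the main thread. */
    static final class GreeterEndpoint {
        private int calls = 0; // no locking needed, access is single-threaded

        String greet(String name) {
            calls++;
            return "hello " + name + " #" + calls;
        }
    }

    /** Wraps the endpoint so that every gateway call runs on one "main thread". */
    static GreeterGateway startServer(GreeterEndpoint endpoint, ExecutorService mainThread) {
        return (GreeterGateway) Proxy.newProxyInstance(
                GreeterGateway.class.getClassLoader(),
                new Class<?>[] {GreeterGateway.class},
                (proxy, method, args) -> {
                    // The sketch supports only the single gateway method.
                    if (!method.getName().equals("greet")) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    // Enqueue the call, much like sending a message to a mailbox.
                    return CompletableFuture.supplyAsync(
                            () -> endpoint.greet((String) args[0]), mainThread);
                });
    }

    public static void main(String[] args) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            GreeterGateway gateway = startServer(new GreeterEndpoint(), mainThread);
            System.out.println(gateway.greet("dispatcher").join());
            System.out.println(gateway.greet("resourcemanager").join());
        } finally {
            mainThread.shutdown();
        }
    }
}
```

Because all calls are funneled through one thread, the endpoint can keep mutable state without synchronization, which is the main appeal of the actor-style design.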
Once the services are initialized, DefaultDispatcherResourceManagerComponentFactory::create creates the three core components of the JobManager: Dispatcher, ResourceManager and the REST endpoint server. Each is created through a factory:
DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory
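The concrete component instances are created inside the grantLeadership callback, which the leader election service invokes after the JobManager has registered with the HA services and obtained leadership.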
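The "create the component only after leadership is granted" pattern can be sketched as follows. This is a toy model under stated assumptions: Contender, ToyElectionService and ComponentRunner are hypothetical names, and the election here simply lets the first registered contender win. Real elections in Flink go through the HA services.

```java
import java.util.UUID;

// Toy model only: none of these types are Flink classes.
final class LeaderElectionSketch {

    /** Hypothetical callback interface with a grant/revoke pair. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    /** Toy single-process election: the first registered contender wins. */
    static final class ToyElectionService {
        private Contender leader;

        void register(Contender contender) {
            if (leader == null) {
                leader = contender;
                contender.grantLeadership(UUID.randomUUID());
            }
        }

        void revokeCurrentLeader() {
            if (leader != null) {
                Contender old = leader;
                leader = null;
                old.revokeLeadership();
            }
        }
    }

    /** Runner that holds a concrete component only while it is the leader. */
    static final class ComponentRunner implements Contender {
        private final String name;
        private String component; // null until leadership is granted

        ComponentRunner(String name) {
            this.name = name;
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            // The concrete instance is created inside the callback.
            component = name + "@" + leaderSessionId;
        }

        @Override
        public void revokeLeadership() {
            component = null; // stop the component when leadership is lost
        }

        boolean isRunning() {
            return component != null;
        }
    }

    public static void main(String[] args) {
        ToyElectionService election = new ToyElectionService();
        ComponentRunner first = new ComponentRunner("dispatcher");
        ComponentRunner standby = new ComponentRunner("standby-dispatcher");
        election.register(first);
        election.register(standby);
        System.out.println("first running: " + first.isRunning());
        System.out.println("standby running: " + standby.isRunning());
    }
}
```

Separating the runner from the component it creates means a standby process can stay registered and cheap until it actually becomes leader.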
RestServer
RestServer is not an RPC server: it does not extend RpcGateway and only serves HTTP requests, which it hands over to the Dispatcher. It is created and started as follows:
SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start // starts the REST service via Netty
DispatcherRestEndpoint::initializeHandlers // JobSubmitHeaders and JobSubmitHandler handle job submission from clients
WebMonitorEndpoint::initializeHandlers // associates each REST request's headers with its handler
WebMonitorEndpoint::startInternal // runs for leader election
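The relationship between a request's headers (HTTP method plus URL) and its handler, with job submission forwarded to the Dispatcher, can be sketched in plain Java. This is a minimal sketch under stated assumptions: ToyRestEndpoint and the DispatcherGateway interface below are hypothetical stand-ins, there is no Netty involved, and the "POST /jobs" route is used purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only: none of these types are Flink classes.
final class RestRoutingSketch {

    /** Hypothetical stand-in for the Dispatcher's gateway. */
    interface DispatcherGateway {
        String submitJob(String jobGraph);
    }

    /** Maps "METHOD URL" keys to handlers, in the spirit of initializeHandlers. */
    static final class ToyRestEndpoint {
        private final Map<String, Function<String, String>> handlers = new HashMap<>();

        ToyRestEndpoint(DispatcherGateway dispatcher) {
            // The submit handler does no work itself; it forwards to the dispatcher.
            handlers.put("POST /jobs", dispatcher::submitJob);
        }

        String handle(String method, String url, String body) {
            Function<String, String> handler = handlers.get(method + " " + url);
            return handler == null ? "404 Not Found" : handler.apply(body);
        }
    }

    public static void main(String[] args) {
        ToyRestEndpoint endpoint = new ToyRestEndpoint(job -> "accepted:" + job);
        System.out.println(endpoint.handle("POST", "/jobs", "wordcount"));
        System.out.println(endpoint.handle("GET", "/unknown", ""));
    }
}
```

Keeping the endpoint as a thin routing layer is what lets the HTTP side stay stateless while the Dispatcher owns the job lifecycle.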
ResourceManager
The ResourceManager is created and started as follows: ResourceManagerServiceImpl first wins the leader election and only then creates the concrete ResourceManager.
ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader // calls the start method
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start
Dispatcher
The Dispatcher is created and started as follows: DefaultDispatcherRunner creates the concrete instance after it has been elected leader.
DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership
DefaultDispatcherRunner::startNewDispatcherLeaderProcess // creates a SessionDispatcherLeaderProcess and calls its start method
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess::onStart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create // creates the Dispatcher and calls its start method
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onStart
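Summary
JobManager startup is essentially the creation and initialization of three components: RestServer, ResourceManager and Dispatcher. RestServer starts an HTTP service via Netty, while ResourceManager and Dispatcher are wrapped by AkkaRpcService as Akka actors that serve local or remote RPC calls. RestServer only accepts requests, parses the messages and passes them to the matching handler; JobGraph submissions are forwarded to the Dispatcher for execution.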