【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信

文章目录

    • 零. RpcService服务概述
    • 1. AkkaRpcService的创建和初始化
    • 2.通过AkkaRpcService初始化RpcServer
    • 3. ResourceManager中RPC服务的启动
    • 4. 实现相互通讯能力

零. RpcService服务概述

RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。

 

RpcService主要包含如下两个重要方法。

  1. startServer():用于启动RpcEndpoint中的RpcServer。RpcServer实际上就是对Actor进行封装,启动完成后,RpcEndpoint中的RpcServer就能够对外提供服务了。
  2. connect():用于连接远端RpcEndpoint并返回给调用方RpcGateway接口的方法,建立连接后RPC客户端就能像本地一样调用RpcServer提供的RpcGateway接口了。

例如在JobMaster组件中创建与ResourceManager组件之间的RPC连接时。此时可以通过Akka发送消息到ResourceManager的RpcServer中,这样就使得JobMaster像调用本地方法一样在ResourceManager中执行请求任务。

 

1. AkkaRpcService的创建和初始化

在创建和启动ClusterEntrypoint及TaskManagerRunner的过程中,会调用AkkaRpcServiceUtils.createRpcService()方法创建默认的AkkaRpcService,接着启动RpcServer。

例如管理节点中会使用AkkaRpcService实例创建并启动ResourceManager、Dispatcher以及JobMaster等RPC服务。

创建AkkaRpcService主要包括如下步骤。

  1. 在ClusterEntrypoint中创建RpcService。
  2. 启动ActorSystem服务。
  3. 创建RobustActorSystem。RobustActorSystem实际上是对Akka的ActorSystem进行了封装和拓展,相比于原生Akka
    ActorSystem,RobustActorSystem包含了UncaughtExceptionHandler组件,能够对ActorSystem抛出的异常进行处理。
  4. 使用RobustActorSystem创建AkkaRpcService实例。
  5. 将AkkaRpcService返回到ClusterEntrypoint中,用于启动集群中各个RpcEndpoint组件服务

在这里插入图片描述

 

2.通过AkkaRpcService初始化RpcServer

在集群运行时中创建了共用的AkkaRpcService服务,相当于创建了Akka系统中的ActorSystem,接下来就是使用AkkaRpcService启动各个RpcEndpoint中的RpcServer实例。(AkkaRpcService服务作为共用的rpc服务,启动其他各个组件的RpcServer实例?)

 
这里先看通过AkkaRpcService初始化RpcEndpoint对应的RpcServer的逻辑。如下在org.apache.flink.runtime.rpc.RpcEndpoint的构造器中,执行了RpcServer的初始化

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");// 初始化RpcEndpoint中的RpcServerthis.rpcServer = rpcService.startServer(this);this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}

具体看下rpcService.startServer(this) 启动rpcServer的逻辑

  1. ActorSystem创建相应Actor的ActorRef引用类。创建完毕后会将RpcEndpoint和ActorRef信息存储在Actor键值对集合中。
  2. 启动RpcEndpoint对应的RPC服务,首先获取当前RpcEndpoint实现的RpcGateways接口。 RpcGateway接口最终通过RpcUtils.extractImplementedRpcGateways()方法从类定义抽取出来,例如JobMaster组件会抽取JobMasterGateway接口定义。
  3. 创建InvocationHandler代理类,根据InvocationHandler代理类提供的invoke()方法实现被代理类的具体方法。
  4. 根据RpcEndpoint是否为FencedRpcEndpoint,InvocationHandler分为FencedAkkaInvocationHandler和AkkaInvocationHandler两种类型。

FencedMainThreadExecutable代理的接口主要有FencedMainThreadExecutable和FencedRpcGateway两种。
AkkaInvocationHandler主要代理实现AkkaBasedEndpoint、RpcGateway、StartStoppable、MainThreadExecutable、RpcServer等接口。

  1. 创建好InvocationHandler代理类后,通过反射的方式(Proxy.newProxyInstance())创建代理类。创建的代理类会被转换为RpcServer实例,再返回给RpcEndpoint使用。

在RpcServer创建的过程中可以看出,实际上包含了创建RpcEndpoint中的Actor引用类ActorRef和AkkaInvocationHandler动态代理类。最后将动态代理类转换为RpcServer接口返回给RpcEndpoint实现类,此时实现的组件就能够获取到RpcServer服务,且通过RpcServer代理了所有的RpcGateways接口,提供了本地方法调用和远程方法调用两种模式。

@Override  
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {  checkNotNull(rpcEndpoint, "rpc endpoint");  final SupervisorActor.ActorRegistration actorRegistration =  registerAkkaRpcActor(rpcEndpoint);  final ActorRef actorRef = actorRegistration.getActorRef();  final CompletableFuture<Void> actorTerminationFuture =  actorRegistration.getTerminationFuture();  //启动RpcEndpoint对应的RPC服务LOG.info(  "Starting RPC endpoint for {} at {} .",  rpcEndpoint.getClass().getName(),  actorRef.path());  final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);  final String hostname;  Option<String> host = actorRef.path().address().host();  if (host.isEmpty()) {  hostname = "localhost";  } else {  hostname = host.get();  }  //解析EpcEndpoint实现的所有RpcGateway接口Set<Class<?>> implementedRpcGateways =  new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));  //额外添加RpcServer和AkkaBasedEndpoint类implementedRpcGateways.add(RpcServer.class);  implementedRpcGateways.add(AkkaBasedEndpoint.class);  final InvocationHandler akkaInvocationHandler;  //根据是否是FencedRpcEndpoint创建不同的动态代理对象if (rpcEndpoint instanceof FencedRpcEndpoint) {  // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler  akkaInvocationHandler =  new FencedAkkaInvocationHandler<>(  akkaAddress,  hostname,  actorRef,  configuration.getTimeout(),  configuration.getMaximumFramesize(),  actorTerminationFuture,  ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,  captureAskCallstacks);  implementedRpcGateways.add(FencedMainThreadExecutable.class);  } else {  akkaInvocationHandler =  new AkkaInvocationHandler(  akkaAddress,  hostname,  actorRef,  configuration.getTimeout(),  configuration.getMaximumFramesize(),  actorTerminationFuture,  captureAskCallstacks);  }  // Rather than using the System ClassLoader directly, we derive the ClassLoader  // from this class . That works better in cases where Flink runs embedded and all Flink    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader    ClassLoader classLoader = getClass().getClassLoader();  @SuppressWarnings("unchecked")  RpcServer server =  (RpcServer)  Proxy.newProxyInstance(  classLoader,  implementedRpcGateways.toArray(  new Class<?>[implementedRpcGateways.size()]),  akkaInvocationHandler);  return server;  
}

 

3. ResourceManager中RPC服务的启动

RpcServer在RpcEndpoint的构造器中完成初始化后,接下来就是启动RpcEndpoint和RpcServer,这里以ResourceManager为例进行说明。

在启动集群时,看下如何启动ResourceManager的RPC服务的。如下调用链

ClusterEntrypoint.startCluster->runCluster
->dispatcherResourceManagerComponentFactory.create
->resourceManager.start();
=>
public final void start() {  rpcServer.start();  
}

继续探索RPC服务是如何启动的

首先在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager实例,此时在ResourceManager.start()方法中会同步调用RpcServer.start()方法,启动ResourceManager所在RpcEndpoint中的RpcServer,如下。

在这里插入图片描述

  1. 调用ResourceManager.start()方法,此时会调用RpcEndpoint.start()父方法,启动ResourceManager组件的RpcServer。
  2. 通过动态代理AkkaInvocationHandler.invoke()方法执行流程,发现调用的是StartStoppable.start()方法,此时会直接调用AkkaInvocationHandler.start()本地方法。
  3. 在AkkaInvocationHandler.start()方法中,实际上会调用rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender())方法向ResourceManager对应的Actor发送控制消息,表明当前Actor实例可以正常启动并接收来自远端的RPC请求。
  4. AkkaRpcActor调用handleControlMessage()方法处理ControlMessages.START控制消息。
  5. 将AkkaRpcActor中的状态更新为StartedState,此时ResourceManager的RpcServer启动完成,ResourceManager组件能够接收来自其他组件的RPC请求。

在flink1.12中省略了AkkaInvocationHandler的干预。

经过以上步骤,指定组件的RpcEndpoint节点就正常启动,此时RpcServer会作为独立的线程运行在JobManager或TaskManager进程中,处理本地和远程提交的RPC请求

 

4. 实现相互通讯能力

当AkkaRpcService启动RpcEndpoint中的RpcServer后,RpcEndpoint组件仅能对外提供处理RPC请求的能力,RpcEndpoint组件需要在启动后向其他组件注册自己的RpcEndpoint信息,并完成组件之间的RpcConnection注册,才能相互访问和通信。而创建RPC连接需要调用RpcService.connect()方法。

如代码所示,在AkkaRpcService.connect()方法中,完成了RpcConnection对象的创建。

@Override  
public <C extends RpcGateway> CompletableFuture<C> connect(  final String address, final Class<C> clazz) {  return connectInternal(  address,  clazz,  (ActorRef actorRef) -> {  Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);  return new AkkaInvocationHandler(  addressHostname.f0,  addressHostname.f1,  actorRef,  configuration.getTimeout(),  configuration.getMaximumFramesize(),  null,  captureAskCallstacks);  });  
}

具体看AkkaRpcService.connectInternal()方法逻辑。

  1. 获取ActorRef引用对象。
  2. 调用Patterns.ask()方法,向actorRef对应的RpcEndpoint节点发送RemoteHandshakeMessage消息,确保连接的RpcEndpoint节点正常,如果成功,则RpcEndpoint会返回HandshakeSuccessMessage消息。
  3. 调用invocationHandlerFactory创建invocationHandler动态代理类,此时可以看到传递的接口列表为new Class<?>[]{clazz},也就是当前RpcEndpoint需要访问的RpcGateway接口。例如JobMaster访问ResourceManager时,这里就是ResourceManagerGateway接口。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(  final String address,  final Class<C> clazz,  Function<ActorRef, InvocationHandler> invocationHandlerFactory) {  checkState(!stopped, "RpcService is stopped");  LOG.debug(  "Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",  address,  clazz.getName());  //获取actorRef实例  final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);  //进行handshake操作,确保需要连接的RpcEndpoint节点正常  final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =  actorRefFuture.thenCompose(  (ActorRef actorRef) ->  FutureUtils.toJava( //调用Patterns.ask()方法,向actorRef对应的//RpcEndpoint节点发送RemoteHandshakeMessage消息,//确保连接的RpcEndpoint节点正常,如果成功,则//RpcEndpoint会返回HandshakeSuccessMessage消息。 Patterns.ask(  actorRef,  new RemoteHandshakeMessage(  clazz, getVersion()),  configuration.getTimeout().toMilliseconds())  .<HandshakeSuccessMessage>mapTo(  ClassTag$.MODULE$  .<HandshakeSuccessMessage>apply(  HandshakeSuccessMessage  .class))));  //创建RPC动态代理类  return actorRefFuture.thenCombineAsync(  handshakeFuture,  (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {  InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);  // Rather than using the System ClassLoader directly, we derive the ClassLoader  // from this class . That works better in cases where Flink runs embedded and                // all Flink                // code is loaded dynamically (for example from an OSGI bundle) through a custom                // ClassLoader                ClassLoader classLoader = getClass().getClassLoader();  @SuppressWarnings("unchecked")  C proxy =  (C)  Proxy.newProxyInstance(  classLoader, new Class<?>[] {clazz}, invocationHandler);  return proxy;  },  actorSystem.dispatcher());  
}

经过以上步骤,实现了创建RpcEndpoint组件之间的RPC连接,此时集群RPC组件之间可以进行相互访问,例如JobMaster可以向ResourceManager发送Slot资源请求。
RPC 服务启动的 Akka actor 能接收来自RpcGateway RPC 调用。

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/694757.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

32单片机基础:OLED调试工具的使用

下面会介绍OLED显示屏的驱动函数模块&#xff0c;先学会如何使用&#xff0c;至于OLED屏幕的原理和代码编写&#xff0c; 我们之后会再写一篇。 现在我们就是用OLED当一个调试的显示屏&#xff0c;方便我们调试程序。 为什么要调试呢&#xff0c;是为了方便我们看现象&#…

嵌入式学习之Linux入门篇——使用VMware创建Unbuntu虚拟机

目录 主机硬件要求 VMware 安装 安装Unbuntu 18.04.6 LTS 新建虚拟机 进入Unbuntu安装环节 主机硬件要求 内存最少16G 硬盘最好分出一个单独的盘&#xff0c;而且最少预留200G&#xff0c;可以使用移动固态操作系统win7/10/11 VMware 安装 版本&#xff1a;VMware Works…

CQT新里程碑:SOC 2 数据安全认证通过,加强其人工智能支持

Covalent Network&#xff08;CQT&#xff09;发展新里程碑&#xff1a;SOC 2 数据安全认证通过&#xff0c;进一步加强了其人工智能支持 Covalent Network&#xff08;CQT&#xff09;现已完成并通过了严格的 Service Organization Control&#xff08;SOC) 2 Type II 的合规性…

vivo 基于 StarRocks 构建实时大数据分析平台,为业务搭建数据桥梁

在大数据时代&#xff0c;数据分析和处理能力对于企业的决策和发展至关重要。 vivo 作为一家全球移动互联网智能终端公司&#xff0c;需要基于移动终端的制造、物流、销售等各个方面的数据进行分析以满足业务决策。 而随着公司数字化服务的演进&#xff0c;业务诉求和技术架构有…

ELK Stack 日志平台搭建

前言 最近在折腾 ELK 日志平台&#xff0c;它是 Elastic 公司推出的一整套日志收集、分析和展示的解决方案。 专门实操了一波&#xff0c;这玩意看起来简单&#xff0c;但是里面的流程步骤还是很多的&#xff0c;而且遇到了很多坑。在此记录和总结下。 本文亮点&#xff1a;…

如何添加或编辑自定义WordPress侧边栏

WordPress侧边栏是许多WordPress网站上的固定装置。它为您的内容提供了一个垂直空间&#xff0c;您可以在其中帮助读者导航、增加电子邮件列表或社交关注、展示广告等。 因为它是许多WordPress网站不可或缺的一部分&#xff0c;所以我们认为侧边栏值得拥有自己的大型指南。在这…

【AIGC】开源声音克隆GPT-SoVITS

GPT-SoVITS 是由 RVC 创始人 RVC-Boss 与 AI 声音转换技术专家 Rcell 共同开发的一款跨语言 TTS 克隆项目&#xff0c;被誉为“最强大中文声音克隆项目” 相比以往的声音克隆项目&#xff0c;GPT-SoVITS 对硬件配置的要求相对较低&#xff0c;一般只需 6GB 显存以上的 GPU 即可…

物体检测-系列教程8:YOLOV5 项目配置

1、项目配置 yolo的v1、v2、v3、v4这4个都有一篇对应的论文&#xff0c;而v5在算法上没有太大的改变&#xff0c;主要是对v4做了一个更好的工程化实现 1.1 环境配置 深度学习环境安装请参考&#xff1a;PyTorch 深度学习 开发环境搭建 全教程 要求torch版本>1.6&#xf…

【Java EE初阶二十一】http的简单理解(二)

2. 深入学习http 2.5 关于referer Referer 描述了当前页面是从哪个页面跳转来的&#xff0c;如果是直接在地址栏输入 url(或者点击收藏夹中的按钮) 都是没有 Referer。如下图所示&#xff1a; HTTP 最大的问题在于"明文传输”,明文传输就容易被第三方获取并篡改. …

#gStore-weekly | gStore最新版1.2之新增内置高级函数详解(一)

gStore1.2版本新增了七个高级函数&#xff0c;我们第2期将继续介绍的高级函数为&#xff1a;整体/局部集聚系数&#xff08;clusterCoeff&#xff09;、鲁汶算法&#xff08;louvain&#xff09;、K跳计数&#xff08;kHopCount&#xff09;/K跳邻居&#xff08;kHopNeighbor&a…

React之拖动组件的设计(一)

春节终结束了&#xff0c;忙得我头疼。终于有时间弄自己的东西了。今天来写一个关于拖动的实例讲解。先看效果&#xff1a; 这是一个简单的组件设计&#xff0c;如果用原生的js设计就很简单&#xff0c;但在React中有些事件必须要多考虑一些。这是一个系列的文章&#xff0c;…

Linux CAfile 文件下的/ca-bundle.crt怎么生成的

在配置Linux Nginx SSL证书后&#xff0c;通过服务器访问域名时发现&#xff0c;服务器返回的CA证书是&#xff1a;/etc/pki/tls/certs/ca-bundle.crt 正式我在使用Spring Native安装了Docker自动生成的&#xff0c;而且开启了Docker的自启动&#xff0c;如果你和我一样&#x…

10MARL深度强化学习 Value Decomposition in Common-Reward Games

文章目录 前言1、价值分解的研究现状2、Individual-Global-Max Property3、Linear and Monotonic Value Decomposition3.1线性值分解3.2 单调值分解 前言 中心化价值函数能够缓解一些多智能体强化学习当中的问题&#xff0c;如非平稳性、局部可观测、信用分配与均衡选择等问题…

从零开始学习Netty - 学习笔记 - NIO基础 - 文件编程:FileChannel,Path,Files

3.文件编程 3.1.FileChannel FileChannel只能工作在非阻塞模式下面&#xff0c;不能和selector一起使用 获取 不能直接打开FIleChannel&#xff0c;必须通过FileInputSream&#xff0c;或者FileOutputSetream &#xff0c;或者RandomAccessFile来获取FileChannel 通过FileIn…

互联网高科技公司领导AI工业化,MatrixGo加速人工智能落地

作者&#xff1a;吴宁川 AI&#xff08;人工智能&#xff09;工业化与AI工程化正在引领人工智能的大趋势。AI工程化主要从企业CIO角度&#xff0c;着眼于在企业生产环境中规模化落地AI应用的工程化举措&#xff1b;而AI工业化则从AI供应商的角度&#xff0c;着眼于以规模化方式…

Rust ?运算符 Rust读写txt文件

一、Rust &#xff1f;运算符 &#xff1f;运算符&#xff1a;传播错误的一种快捷方式。 如果Result是Ok&#xff1a;Ok中的值就是表达式的结果&#xff0c;然后继续执行程序。 如果Result是Err&#xff1a;Err就是整个函数的返回值&#xff0c;就像使用了return &#xff…

电脑wifi丢失修复

当你打开电脑突然发现wifi功能不见了&#xff0c;可以先查看一下网卡的状态 在控制面板中找到设备管理器&#xff0c;打开就能找到网络适配器&#xff0c; 我这里是修复过的&#xff0c;wifi丢失后这里可能会显示WALN是丢失的&#xff0c;其他项显示黄色感叹号。 如何修复呢…

Go语言中的TLS加密:深入crypto/tls库的实战指南

Go语言中的TLS加密&#xff1a;深入crypto/tls库的实战指南 引言crypto/tls库的核心组件TLS配置&#xff1a;tls.Config证书加载与管理TLS握手过程及其实现 构建安全的服务端创建TLS加密的HTTP服务器配置TLS属性常见的安全设置和最佳实践 开发TLS客户端应用编写使用TLS的客户端…

[游戏开发][虚幻5]新建项目注意事项

鼠标右键点击Client.uproject文件&#xff0c;可以看到三个比较关键的选项&#xff0c; 启动游戏&#xff0c;生成sln解决方案&#xff0c;切换引擎版本 断点调试 C代码重要步骤 如果你想断点调试C代码&#xff0c;则必须使用使用代码编译启动引擎&#xff0c;你需要做几个操作…

从零开始学习Netty - 学习笔记 - NIO基础 - 网络编程: Selector

4.网络编程 4.1.非阻塞 VS 阻塞 在网络编程中&#xff0c;**阻塞&#xff08;Blocking&#xff09;和非阻塞&#xff08;Non-blocking&#xff09;**是两种不同的编程模型&#xff0c;描述了程序在进行网络通信时的行为方式。 阻塞&#xff08;Blocking&#xff09;&#xff1…