【Flink网络通讯(一)】Flink RPC框架的整体设计

文章目录

  • 1. Akka基本概念与Actor模型
  • 2. Akka相关demo
    • 2.1. 创建Akka系统
    • 2.2. 根据path获取Actor并与之通讯
  • 3. Flink RPC框架与Akka的关系
  • 4.运行时RPC整体架构设计
  • 5. RpcEndpoint的设计与实现

我们从整体的角度看一下Flink RPC通信框架的设计与实现,了解其底层Akka通信框架的基础概念及二者之间的关系。

 

1. Akka基本概念与Actor模型

Akka是使用Scala语言编写的库,用于在JVM上简化编写具有可容错、高可伸缩性的Java或Scala的Actor模型。Akka基于Actor模型,提供了一个用于构建可扩展、弹性、快速响应的应用程序的平台。

Actor 模型是一种并发计算模型,Actor 模型的核心思想是将计算单元抽象为独立的并发实体,称为 “actors”,这些 actors 之间通过消息传递进行通信。

以下是 Actor 模型的一些关键概念:

  1. Actor:Actor 是计算模型的基本执行单元。每个 Actor 都有自己的状态、行为和邮箱(用于接收消息)。Actor 之间是相互独立的,它们通过消息传递进行通信。
  2. 消息传递:在 Actor 模型中,通信是通过消息传递来实现的。一个 Actor 可以向另一个 Actor 发送消息,消息包含了要执行的操作或者改变状态的请求。这种异步消息传递使得系统更具有弹性和可伸缩性。
  3. 地址:每个 Actor 都有一个唯一的地址,用于唯一标识该 Actor。其他 Actor 可以使用地址向目标 Actor 发送消息。
  4. 邮箱:每个 Actor 都有一个邮箱,用于存储接收到的消息。Actor 处理消息的速度可能不同,但由于消息传递是异步的,这不会阻塞发送者。
  5. 行为:Actor 的行为定义了对消息的响应方式,包括状态的修改、消息的处理等。行为可以随着时间和接收到的消息而动态变化。

 

Actor由状态(State)、行为(Behavior)和邮箱(Mailbox)三部分组成。

actors和其他actors通过发送异步消息通信。Actor模型的强大来自于异步。它也可以显式等待响应,这使得可以执行同步操作。但是,强烈不建议同步消息,因为它们限制了系统的伸缩性(?怎么实现的伸缩性)。

actor系统
在这里插入图片描述

每个actor是一个单一的线程,它不断地从其邮箱中poll(拉取)消息,并且连续不断地处理。对于已经处理过的消息的结果,actor可以改变它自身的内部状态或者发送一个新消息或者孵化一个新的actor

 

2. Akka相关demo

2.1. 创建Akka系统

Akka系统的核心组件包括ActorSystem和Actor,构建一个Akka系统,首先需要创建ActorSystem,然后通过ActorSystem创建Actor。

需要注意的是:

  • Akka不允许直接创建Actor实例,只能通过ActorSystem.actorOf和ActorContext.actorOf等特定接口创建Actor。
  • 只能通过ActorRef与Actor进行通信,ActorRef对原生Actor实例做了良好的封装,外界不能随意修改其内部状态。

如代码所示,Akka系统中包含了创建ActorSystem以及Actor的基本实例。

// 1. 构建ActorSystem
// 使用缺省配置
ActorSystem system = ActorSystem.create("sys");
// 也可显示指定appsys配置
// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));// 2. 构建Actor,获取该Actor的引用,即ActorRef
ActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");// 3. 给helloActor发送消息
helloActor.tell("hello helloActor", ActorRef.noSender());// 4. 关闭ActorSystem
system.terminate();

在Akka中,创建的每个Actor都有自己的路径,该路径遵循 ActorSystem 的层级结构,大致如下:

本地:akka://sys/user/helloActor
远程:akka.tcp://sys@l27.0.0.1:2020/user/remoteActor   - sys,创建的ActorSystem的名字;
- user,通过ActorSystem#actorOf和ActorContext#actorOf 方法创建的 Actor 都属于/user下,其是系统层面创建的,与系统整体行为有关,在开发阶段并不需要对其过多关注;
- helloActor,我们创建的HelloActor。其中远程部分路径含义如下:- akka.tcp,远程通信方式为tcp;
- sys@127.0.0.1:2020,ActorSystem名字及远程主机ip和端口号。

 

2.2. 根据path获取Actor并与之通讯

若提供了Actor的路径,可以通过路径获取到ActorRef,然后与之通信,代码如下所示:

ActorSystem system = ActorSystem.create("sys")ActorSelection as = system.actorSelection("/path/to/actor");Timeout timeout = new Timeout(Duration.create(2, "seconds"));
Future<ActorRef> fu = as.resolveOne(timeout);fu.onSuccess(new OnSuccess<ActorRef>() {@Overridepublic void onSuccess(ActorRef actor) {System.out.println("actor:" + actor);actor.tell("hello actor", ActorRef.noSender());}
}, system.dispatcher());fu.onFailure(new OnFailure() {@Overridepublic void onFailure(Throwable failure) {System.out.println("failure:" + failure);}
}, system.dispatcher());

 

3. Flink RPC框架与Akka的关系

Flink进行RPC通信的组件

如图所示,从Flink RPC节点关系中可以看出,集群运行时中实现了RPC通信节点功能的主要有Dispatcher、ResourceManager和TaskManager以及JobMaster等组件。
借助RPC通信,这些组件共同参与任务提交及运行的整个流程,例如通过客户端向Dispatcher服务提交JobGraph,JobManager向TaskManager提交Task请求,以及TaskManager向JobManager更新Task执行状态等。

在这里插入图片描述

通过AkkaRpcService实现远程通讯能力

从图中也可以看出,集群的RPC服务组件是(1)RpcEndpoint,每个RpcEndpoint包含一个内置的RpcServer负责执行本地和远程的代码请求,(2)RpcServer对应Akka中的Actor实例。RpcEndpoint中创建和启动RpcServer主要是基于集群中的(3)RpcService实现,(4)RpcService的主要实现是AkkaRpcService。
 
从图可以看出,AkkaRpcService将Akka中的ActorSystem进行封装,通过AkkaRpcService可以创建RpcEndpoint中的RpcServer,同时基于AkkaRpcService提供的connect()方法与远程RpcServer建立RPC连接,提供远程进程调用的能力。

 

4.运行时RPC整体架构设计

Flink的RPC框架设计非常复杂,除了基于Akka构建了底层通信系统之外,还会使用JDK动态代理构建RpcGateway接口的代理类。

在这里插入图片描述

Flink RPC UML关系图

这里我们简单梳理一下RPC架构涉及的组件以及每种组件的作用。

  1. 集群RPC组件的基本实现类:

RpcEndpoint提供了集群RPC组件的基本实现,所有需要实现RPC服务的组件都会继承RpcEndpoint抽象类。
RpcEndpoint中包含了endpointId,用于唯一标记当前的RPC节点。RpcEndpoint借助RpcService启动内部RpcServer,之后通过RpcServer完成本地和远程线程执行。

  1. 基本实现类与FencedToken对比

对于RpcEndpoint来讲,底层主要有FencedRpcEndpoint基本实现类。
实现FencedRpcEndpoint的RPC节点都会有自己的FencedToken,当进行远程RPC调用时,会对比访问者分配的FencedToken和被访问者的FencedToken,结果一致才会进行后续操作。

  1. RpcEndpoint的实现类有TaskExecutor组件,FencedRpcEndpoint的实现类有Dispatcher、JobMaster以及ResourceManager等组件。这些组件可以获取RpcService中ActorSystem的dispatcher服务,并直接通过dispatcher创建Task线程实例
  2. RpcService提供了创建和启动RpcServer的方法。

在启动RpcServer的过程中,通过RpcEndpoint的地址创建Akka Actor实例,并基于Actor实例构建RpcServer接口的动态代理类,向RpcServer的主线程中提交Runnable以及Callable线程等。
同时在RpcService中提供了连接远程RpcEndpoint的方法,并创建了相应RpcGateway接口的动态代理类,用于执行远程RPC请求。

  1. RpcServer接口通过AkkaInvocationHandler动态代理类实现,所有远程或本地的执行请求最终都会转换到AkkaInvocationHandler代理类中执行。

AkkaInvocationHandler实现了MainThreadExecutable接口,提供了runAsync(Runnable runnable)以及callAsync(Callable<V> callable, Time callTimeout)等在主线程中执行代码块的功能。例如在TaskExecutor中释放Slot资源时,会调用runAsync()方法将freeSlotInternal()方法提交到TaskExecutor对应的RpcServer中运行,此时就会调用AkkaInvocationHandler在主线程中执行任务.

 

5. RpcEndpoint的设计与实现

RpcEndpoint是集群中RPC组件的端点,每个RpcEndpoint都对应一个由endpointId和actorSystem确定的路径,且该路径对应同一个Akka Actor。

如图,所有需要实现RPC通信的集群组件都会继承RpcEndpoint抽象类,例如TaskExecutor、Dispatcher以及ResourceManager组件服务,还包括根据JobGraph动态创建和启动的JobMaster服务。
在这里插入图片描述

从图中我们可以看出,RpcEndpoint实现了RpcGateway和AutoCloseableAsync两个接口,其中 RpcGateway 提供了动态获取RpcEndpoint中Akka地址和HostName的方法。

因为JobMaster组件在任务启动时才会获取Akka中ActorSystem分配的地址信息,所以借助RpcGateway接口提供的方法就能获取Akka相关连接信息。

 
RpcEndpoint中包含RpcService、RpcServer以及MainThreadExecutor三个重要的成员变量,其中

  • RpcService是RpcEndpoint的后台管理服务
  • RpcServer是RpcEndpoint的内部服务类
  • MainThreadExecutor封装了MainThreadExecutable接口,其主要底层实现是AkkaInvocationHandler代理类。所有本地和远程的RpcGateway执行请求都会通过动态代理的形式转换到AkkaInvocationHandler代理类中执行。

在这里插入图片描述

 
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/693344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用nbsp设置空格

想要实现上面效果&#xff0c;一开始直接<el-col :span"8" >{{ item.name }} </el-col> 或者<el-col :span"8" >{{ item.name }}</el-col>或者<el-col :span"8" >{{ item.name }}</el-col> 都无…

深入浅出JVM(三)之HotSpot虚拟机类加载机制

HotSpot虚拟机类加载机制 类的生命周期 什么叫做类加载? 类加载的定义: JVM把描述类的数据从Class文件加载到内存,并对数据进行校验,解析和初始化,最终变成可以被JVM直接使用的Java类型(因为可以动态产生,这里的Class文件并不是具体存在磁盘中的文件,而是二进制数据流) 一个…

善于利用GPT确实可以解决许多难题

当我设计一个导出Word文档的功能时&#xff0c;我面临了一个挑战。在技术选型时&#xff0c;我选择了poi-tl这个模板引擎&#xff0c;因为在网上看到了很多关于它的推荐。poi-tl可以根据模板快速导出Word文档。虽然之前没有做过类似的功能&#xff0c;而且项目中也没有用过&…

开年喜报!Walrus成功入选CNCF云原生全景图

近日&#xff0c;数澈软件 Seal &#xff08;以下简称“Seal”&#xff09;旗下开源应用管理平台 Walrus 成功入选云原生计算基金会全景图&#xff08;CNCF Landscape&#xff09;并收录至 “App Definition and Development - Application Definition & Image Build”板块…

Encoder-decoder 与Decoder-only 模型之间的使用区别

承接上文&#xff1a;Transformer Encoder-Decoer 结构回顾 笔者以huggingface T5 transformer 对encoder-decoder 模型进行了简单的回顾。 由于笔者最近使用decoder-only模型时发现&#xff0c;其使用细节和encoder-decoder有着非常大的区别&#xff1b;而huggingface的借口为…

热阻基础理论 --NMOS温度评估

热阻基础理论 器件 温度差 功率 * 热阻 MOS应用实例 1.假如MOS管悬挂或者外壳贴到散热器上&#xff0c;就意味着用CASE到空气的散热热阻会很大&#xff0c; 如下图中的20。 2. 假如MOS管金属面焊接到PCB上&#xff0c;就意味着用CASE到空气的散热热阻会很校&#xff0c; 如…

计算机设计大赛 深度学习人脸表情识别算法 - opencv python 机器视觉

文章目录 0 前言1 技术介绍1.1 技术概括1.2 目前表情识别实现技术 2 实现效果3 深度学习表情识别实现过程3.1 网络架构3.2 数据3.3 实现流程3.4 部分实现代码 4 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习人脸表情识别系…

vmware的ubuntu虚拟机因空间满无法启动

正在虚拟机编译android源代码&#xff0c;没注意空间不足&#xff0c;结果回来发现了 Assuming drive cache: write through 的问题&#xff0c;经查是空间不足的原因 按照这个教程&#xff0c;清除出来部分空间&#xff0c;才能进去系统&#xff0c;并且对系统空间做下优化 …

为什么运维要转行

为什么运维要转行 粉丝提问&#xff1a; 在各种APP里经常看到&#xff0c;趁年轻赶紧远离运维&#xff0c;为什么&#xff1f; 互联网老兵是这样回答的&#xff1a; 运维有很多分类&#xff0c;有干实施运维的&#xff0c;有干交付运维的&#xff0c;也有自动化运维&#xf…

07 Redis之持久化(RDB+AOF)

4 Redis持久化 Redis 是一个内存数据库&#xff0c;然而内存中的数据是不持久的&#xff0c;若主机宕机或 Redis 关机重启&#xff0c;则内存中的数据全部丢失。 当然&#xff0c;这是不允许的。Redis 具有持久化功能&#xff0c;其会按照设置以快照或操作日志的形式将数据持…

Stable Diffusion WebUI 界面介绍

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里。 大家好&#xff0c;我是水滴~~ 本文主要对 Stable Diffusion WebUI 的界面进行简单的介绍&#xff0c;让你对该 WebUI 有个大致的了解&#xff0c;为后面的深入学习打下一个基础。主要内容包…

《VitePress 简易速速上手小册》第1章:VitePress 入门(2024 最新版)

文章目录 1.1 VitePress 简介与架构1.1.1 基础知识点解析1.1.2 重点案例&#xff1a;企业文档站点1.1.3 拓展案例 1&#xff1a;个人博客1.1.4 拓展案例 2&#xff1a;产品展示网站 1.2 安装与初次运行1.2.1 基础知识点解析1.2.2 重点案例&#xff1a;公司内部知识分享平台1.2.…

ts 枚举类型原理及其应用详解

ts 枚举类型介绍 TypeScript的枚举类型是一种特殊的数据类型&#xff0c;它允许开发者为一组相关值定义一个共同的名称&#xff0c;使我们可以更清晰、更一致地使用这些值。 枚举类型在TypeScript中用enum关键字定义&#xff0c;每个枚举值默认都是数字类型&#xff0c;从0开…

前端 webSocket 的使用

webSocket使用 注意要去监听websocket 对象事件&#xff0c;处理我们需要的数据 我是放在了最外层的index 内&#xff0c;监听编辑状态&#xff0c;去触发定义的方法。因为我这个项目是组件化开发&#xff0c;全部只有一个总编辑按钮&#xff0c;我只需监听是否触发了编辑即可…

为什么2023年是AI视频的突破年,以及对2024年的预期#a16z

2023年所暴露的AI生成视频的各种问题&#xff0c;大部分被OpenAI发布的Sora解决了吗&#xff1f;以下为a16z发布的总结&#xff0c;在关键之处&#xff0c;我做了OpenAI Sora的对照备注。 推荐阅读&#xff0c;了解视频生成技术进展。 Why 2023 Was AI Video’s Breakout Year,…

Qt|大小端数据转换(补充)

Qt|大小端数据转换-CSDN博客 之前这篇文章大小端数据转换如果是小数就会有问题。 第一个方法&#xff1a; template <typename T> static QByteArray toData(const T &value, bool isLittle) {QByteArray data;for (int i 0; i < sizeof(T); i) {int bitOffset…

vue3 用xlsx 解决 excel 低版本office无法打开问题

需求背景解决思路解决效果将json导出为excel将table导为excel导出样式 需求背景 原使用 vue3-json-excel &#xff0c;导致在笔记本office环境下&#xff0c;出现兼容性问题 <vue3-json-excel class"export-btn" :fetch"excelGetList" :fields"js…

【Python程序开发系列】利用git实现协同开发做开源贡献(完整过程)

一、问题 假如我在gitee或者github上看到了一个优质的项目&#xff0c;我想对这个项目做一些深入的研究&#xff0c;并对其进行优化&#xff0c;并最终提交PR做出贡献。但是这个项目需要或者最好在虚拟机上或服务器上运行&#xff0c;虚拟机或服务器没有IDE这种代码编辑器&…

2024-02-20(DataX,Spark)

1.Oracle利用DataX工具导出数据到Mysql。Oracle利用DataX工具导出数据到HDFS。 只是根据导入导出的目的地不同&#xff0c;DataX的Json文件书写内容有所不同。万变不离其宗。 书写的Json格式的导入导出规则文件存放再Job目录下的。 2.Spark概念 Apache Spark是用于大规模数…

智能风控体系之逻辑回归

逻辑回归就是这样的一个过程&#xff1a;面对一个回归或者分类问题&#xff0c;建立代价函数&#xff0c;然后通过优化方法迭代求解出最优的模型参数&#xff0c;然后测试验证我们这个求解的模型的好坏。在信贷风控领域最常用的广义线性模型就是逻辑回归。其实逻辑回归线性可分…