【Flink集群RPC通讯机制(三)】AkkaRpcActor设计与实现:接收RPC消息以及处理逻辑

文章目录

    • 1. 创建Receiver
    • 2. 进行消息处理

RPC请求发送后接收方的处理逻辑

在RpcEndpoint中创建的RemoteRpcInvocation消息,最终会通过Akka系统传递到被调用方。例如TaskExecutor向ResourceManager发送SlotReport请求的时候,会在TaskExecutor中将ResourceManagerGateway的方法名称和参数打包成RemoteRpcInvocation对象。然后经过网络发送到ResourceManager中的AkkaRpcActor,处理请求。

接下来深入了解AkkaRpcActor的设计与实现,了解在AkkaRpcActor中如何接收RemoteRpcInvocation消息并执行后续的操作。

 

1. 创建Receiver

如代码所示,首先在AkkaRpcActor中创建Receive对象,用于处理Akka系统接收的其他Actor发送过来的消息。

Receiver相关能力

在AkkaRpcActor中主要创建了RemoteHandshakeMessage、ControlMessages等消息对应的处理器,

  • 其中RemoteHandshakeMessage主要用于进行正式RPC通信之前的网络连接检测,保障RPC通信正常。
  • ControlMessages用于控制Akka系统,例如启动和停止Akka Actor等控制消息。这里我们重点关注第三种类型的消息,即在集群运行时中RPC组件通信使用的Message类型,此时会调用handleMessage()方法对这类消息进行处理。
public Receive createReceive() {return ReceiveBuilder.create().match(RemoteHandshakeMessage.class, this::handleHandshakeMessage).match(ControlMessages.class, this::handleControlMessage).matchAny(this::handleMessage).build();
}

 

2. 进行消息处理

在AkkaRpcActor.handleMessage()方法中,最终会调用handleRpcMessage()方法继续对RPC消息进行处理。

如下代码:


//根据RPC消息类型,进行不同方式处理
protected void handleRpcMessage(Object message) {if (message instanceof RunAsync) {//将代码块提交到本地线程池中执行handleRunAsync((RunAsync) message);} else if (message instanceof CallAsync) {handleCallAsync((CallAsync) message);} else if (message instanceof RpcInvocation) {handleRpcInvocation((RpcInvocation) message);} else {// 省略部分代码sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +  " of type " +  message.getClass().getSimpleName() + '.'));}
}

接着看AkkaRpcActor.handleRpcInvocation()方法逻辑:

  1. 判断当前RpcEndpoint是否实现了指定rpcMethod。

例如JobMaster调用ResourceManagerGateway.requestSlot()方法,会在lookupRpcMethod()方法中判断当前ResourceManager实现的Endpoint是否提供了该方法的实现。

  1. 当rpcMethod不为空时,rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs())
  2. 调用sendAsyncResponse()、sendSyncResponse()方法通过Akka系统将RpcMethod返回值返回给调用方。
private void handleRpcInvocation(RpcInvocation rpcInvocation) {Method rpcMethod = null;try {String methodName = rpcInvocation.getMethodName();Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();rpcMethod = lookupRpcMethod(methodName, parameterTypes);} catch (ClassNotFoundException e) {// 省略部分代码}if (rpcMethod != null) {try {rpcMethod.setAccessible(true);if (rpcMethod.getReturnType().equals(Void.TYPE)) {// 没有返回值的情况rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());}else {// 有返回值的情况final Object result;try {result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());}catch (InvocationTargetException e) {getSender().tell(new Status.Failure(e.getTargetException()), getSelf());return;}final String methodName = rpcMethod.getName();if (result instanceof CompletableFuture) {final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;sendAsyncResponse(responseFuture, methodName);} else {sendSyncResponse(result, methodName);}}} catch (Throwable e) {log.error("Error while executing remote procedure call {}.", rpcMethod, e);// 通知错误信息getSender().tell(new Status.Failure(e), getSelf());}}
}

接下来从更加宏观的角度了解各组件之间如何基于已经实现的RPC框架进行通信,进一步加深对Flink中RPC框架的了解。

 
 
参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696975.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

petalinux_zynq7 驱动DAC以及ADC模块之二:petalinux

petalinux_zynq7 C语言驱动DAC以及ADC模块之一&#xff1a;建立IPhttps://blog.csdn.net/qq_27158179/article/details/136234296在上一篇&#xff0c;建立了ADC和DAC两个IP。这里继续。本文在 petalinux默认配置的基础上&#xff0c;添加了python和qt。再编译出sdk可以给x86主…

汽车智能座舱中 显示屏市场战略趋势分析 中篇

今天主要讲讲主流车厂显示屏的趋势。 主流车厂的中控&液晶仪表屏的尺寸及趋势汇总 奔驰 奔驰A级 10.2510.25 奔驰C级 12.310.25 奔驰GLA 10.2510.25 奔驰E级 12.312.3 奔驰S级 12.312.8 1、奔驰的仪表几乎都为液晶仪表&#xff0c;几乎所有车型都有HUD的选配&#xff…

大功率应用中的厚膜电阻散热器的设计?

在许多大功率应用中&#xff0c;例如电机和电源&#xff0c;电源电阻器位于主电源线中。它们的目的是防止损坏或提供一定程度的控制。 在这些应用中&#xff0c;电阻器承受恒定的、相对较高的电流。当电流流过电阻器时&#xff0c;它会产生热量。这种热能必须消散到环境中&…

1、WEB攻防-通用漏洞SQL注入MYSQL跨库ACCESS偏移

用途&#xff1a;个人学习笔记&#xff0c;欢迎指正&#xff01; 前言&#xff1a; 为了网站和数据库的安全性&#xff0c;MYSQL 内置有 ROOT 最高用户&#xff0c;划分等级&#xff0c;每个用户对应管理一个数据库&#xff0c;这样保证无不关联&#xff0c;从而不会影响到其他…

Autosar-WdgM配置详解-3

1.11生成代码解析 1.11.1MasterSWC代码解析 在MasterSWC的RE_TestRun这个runnable里会调用两个检测点函数,我们可以在两个检测点函数之间,加入我们所需要监控的代码。 Rte_Call_RPort_StartCheckPoint_CheckpointReached(); Rte_Call_RPort_EndCheckPoint_CheckpointReac…

C#串口 Modbus通讯工具类

一、安装Modbus包 二、创建modbushelper类 1、打开串口 public bool IfCOMOpend; //用于实例内的COM口的状态 public SerialPort OpenedCOM;//用于手动输入的COM转成SERIAL PORT /// <summary> /// 打开串口 /// </summary> /// <param name="COMname&quo…

unity小工具-非实时的值变化监听器

项目里有代码专门监听网络环境的变化&#xff0c;特别是在下载中&#xff0c;如果遇到断网或者切换为移动网络&#xff0c;可能需要触发提醒等等。这种需求可能是通用的&#xff0c;于是便写了一个通用的监听代码。是 using System; using System.Collections; using System.C…

c++服务器开源项目Tinywebserver运行

c服务器开源项目Tinywebserver运行 一、Tinywebserver介绍二、环境搭建三、构建数据库四、编译Tinywebserver五、查看效果 Tinywebserver是github上一个十分优秀的开源项目&#xff0c;帮助初学者学习如何搭建一个服务器。 本文讲述如何在使用mysql跟该项目进行连接并将项目运行…

python 层次分析(AHP)

文章目录 一、算法原理二、案例分析2.1 构建指标层判断矩阵2.2 求各指标权重2.2.1 算术平均法&#xff08;和积法&#xff09;2.2.2 几何平均法&#xff08;方根法&#xff09; 2.3 一致性检验2.3.1 求解最大特征根值2.3.2 求解CI、RI、CR值2.3.3 一致性判断 2.4 分别求解方案层…

利用Ubuntu22.04启动U盘对电脑磁盘进行格式化

概要&#xff1a; 本篇演示利用Ubuntu22.04启动U盘的Try Ubuntu模式对电脑磁盘进行格式化 一、说明 1、电脑 笔者的电脑品牌是acer(宏碁/宏基) 开机按F2进入BIOS 开机按F12进入Boot Manager 2、Ubuntu22.04启动U盘 制作方法参考笔者的文章&#xff1a; Ubuntu制作Ubun…

【OpenAI官方课程】第五课:ChatGPT文本转换Transforming

欢迎来到ChatGPT 开发人员提示工程课程&#xff08;ChatGPT Prompt Engineering for Developers&#xff09;&#xff01;本课程将教您如何通过OpenAI API有效地利用大型语言模型&#xff08;LLM&#xff09;来创建强大的应用程序。 本课程由OpenAI 的Isa Fulford和 DeepLearn…

缓存篇—缓存雪崩

什么是缓存雪崩 通常我们为了保证缓存中的数据与数据库中的数据一致性&#xff0c;会给 Redis 里的数据设置过期时间&#xff0c;当缓存数据过期后&#xff0c;用户访问的数据如果不在缓存里&#xff0c;业务系统需要重新生成缓存&#xff0c;因此就会访问数据库&#xff0c;并…

QEMU源码全解析 —— virtio(22)

接前一篇文章&#xff1a;QEMU源码全解析 —— virtio&#xff08;21&#xff09; 前几回讲解了virtio驱动的加载。本回开始讲解virtio驱动的初始化。 在讲解virtio驱动的初始化之前&#xff0c;先要介绍virtio配置的函数集合变量virtio_pci_config_ops。实际上前文书也有提到…

c# HttpCookie操作,建立cookie工具类

HttpCookie 是一个在.NET Framework中用于管理和操作HTTP Cookie的类。它提供了一种方便的方式来创建、设置、读取和删除Cookie。 Cookie是一种在客户端和服务器之间传递数据的机制&#xff0c;用于跟踪用户的会话状态和存储用户相关的信息。它通常由服务器发送给客户端&#…

万字干货-京东零售数据资产能力升级与实践

开篇 京东自营和商家自运营模式&#xff0c;以及伴随的多种运营视角、多种组合计算、多种销售属性等数据维度&#xff0c;相较于行业同等量级&#xff0c;数据处理的难度与复杂度都显著增加。如何从海量的数据模型与数据指标中提升检索数据的效率&#xff0c;降低数据存算的成…

parallels配置centos虚拟环境

parallels Desktop M1/M2芯片Parallels Desktop 19虚拟机安装使用教程&#xff08;超详细&#xff09;-CSDN博客 下镜像记得找和mac芯片匹配的 安装就选第一个centos7不要选第二个 安装有问题就选回退重启 parallel desktop 18/19安装centos7.2009教程_parallels desktop 19…

echarts多y轴样式重叠问题

1、主要属性设置 yAxis: [{//y轴1nameTextStyle: {align: "right",padding: 0}},{//y轴2nameTextStyle: {align: "left",padding: 0}},{//y轴3axisLabel: {margin: 50},nameTextStyle: {align: "left",padding: [0, 0, 0, 50]},axisPointer: {l…

Python Web开发记录 Day2:CSS

名人说&#xff1a;莫道桑榆晚&#xff0c;为霞尚满天。——刘禹锡&#xff08;刘梦得&#xff0c;诗豪&#xff09; 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 二、CSS1、CSS-初始入门①快速了解②CSS应用方式…

【C语言】sizeof()函数

前言 sizeof函数用于获取数据类型或变量在内存中所占的字节数。 sizeof函数返回的是编译时确定的值&#xff0c;不会计算动态分配的内存大小。 sizeof函数可以用于多种类型的数据&#xff0c;包括数组、指针、结构体、枚举等。 1.数组 int arr[5];printf("%zu ", siz…

文件上传与下载

文件上传与下载 1. 文件上传 为了能上传文件&#xff0c;必须将表单的 method 设置为 POST&#xff0c;并将 enctype 设置为 multipart/form-data 。 有两种实现文件上传的方式&#xff1a; 底层使用 Apache Commons FileUpload 包 底层使用 Servlet 3.1 内置的文件上传功能…