聊聊分布式架构04——RPC通信原理

目录

RPC通信的基本原理

RPC结构

手撸简陋版RPC

知识点梳理

1.Socket套接字通信机制

2.通信过程的序列化与反序列化

3.动态代理

4.反射

思维流程梳理

码起来

服务端时序图

服务端—Api与Provider模块

客户端时序图


RPC通信的基本原理

RPC(Remote Procedure Call)是一种远程过程调用协议,用于在分布式系统中进行远程通信,允许一个计算机程序调用另一个地址空间(通常是在不同的机器上)的函数或过程,就像调用本地函数一样。下面是RPC通信的基本原理:

  1. 客户端调用:远程客户端(调用方)希望调用远程服务器(提供方)上的一个或多个远程过程(函数)。客户端在本地创建一个请求,并指定要调用的远程过程的名称以及传递给该过程的参数。

  2. 参数序列化:在将请求发送到远程服务器之前,客户端需要将参数序列化为字节流或其他适合传输的格式。序列化是将数据转换为可以在网络上传输的形式的过程。

  3. 网络传输:客户端通过网络将请求发送到远程服务器。通常,这涉及到将请求数据打包成网络消息,然后通过网络协议(如HTTP、TCP/IP)将消息发送到服务器的地址。

  4. 服务器接收:远程服务器接收到客户端的请求消息,通常通过网络协议(如HTTP服务器、Socket服务器)监听特定的端口。

  5. 参数反序列化:服务器从请求消息中提取参数数据,并将其反序列化为本地数据结构,以便将其传递给远程过程。

  6. 远程过程调用:服务器调用相应的远程过程,将参数传递给该过程并执行相应的操作。远程过程可以位于服务器的本地代码中或远程服务器上的远程服务中。

  7. 结果序列化:远程过程执行完毕后,服务器将结果序列化为字节流或其他适合传输的格式。

  8. 结果传输:服务器通过网络将结果数据打包成响应消息,并将其发送回客户端。

  9. 客户端接收:客户端接收到服务器的响应消息。

  10. 结果反序列化:客户端从响应消息中提取结果数据,并将其反序列化为本地数据结构。

  11. 客户端处理:客户端可以根据远程过程的执行结果采取相应的行动,可能是继续执行本地代码或返回结果给调用方。

  12. 通信完成:一次RPC调用完成后,客户端和服务器之间的通信过程结束。

RPC结构

  1. 客户端模块代理所有远程方法的调用

  2. 将目标服务、目标方法、调用目标方法的参数等必要信息序列化

  3. 序列化之后的数据包进一步压缩,压缩后的数据包通过网络通信传输到目标服务节点

  4. 服务节点将接受到的数据包进行解压

  5. 解压后的数据包反序列化成目标服务、目标方法、目标方法的调用参数

  6. 通过服务端代理调用目标方法获取结果,结果同样需要序列化、压缩然后回传给客户端

手撸简陋版RPC

总觉得文字描述太干了,有点咽不下去,还是手撸下试试吧。毕竟艾瑞莉娅的奶奶总说:

纸上得来终觉浅,绝知此事要躬行。

知识点梳理
1.Socket套接字通信机制

在Java中,Socket是用于网络通信的基础类之一,它提供了一种机制,通过该机制,计算机程序可以在网络上建立连接、发送数据、接收数据和关闭连接。

  • 服务器套接字(ServerSocket)

    • 服务器套接字用于在服务器端监听并接受客户端的连接请求。

    • 使用以下步骤创建和使用服务器套接字:

      • 创建ServerSocket对象,并绑定到一个特定的端口号。

      • 使用ServerSocket对象的accept()方法来等待客户端的连接请求,并接受连接。

      • 一旦接受连接,可以创建新的Socket对象来处理客户端的通信。

      • 完成通信后,关闭SocketServerSocket连接。

    ServerSocket serverSocket = new ServerSocket(port_number);
    Socket clientSocket = serverSocket.accept(); // 等待连接
    InputStream inputStream = clientSocket.getInputStream();
    OutputStream outputStream = clientSocket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    clientSocket.close();
    serverSocket.close();
    ​
  • 客户端套接字(Socket)

    • 客户端套接字用于连接到远程服务器,发送请求并接收响应。

    • 使用以下步骤创建和使用客户端套接字:

      • 创建Socket对象,指定远程服务器的主机名或IP地址以及端口号。

      • 使用Socket对象的输入流和输出流来进行数据的读取和写入。

      • 完成通信后,关闭Socket连接。

    Socket socket = new Socket("server_hostname", port_number);
    InputStream inputStream = socket.getInputStream();
    OutputStream outputStream = socket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    socket.close();
2.通信过程的序列化与反序列化

在Java中通过 JDK 提供了 Java 对象的序列化方式实现对象序列化传输,主要通过输出流java.io.ObjectOutputStream和输入流java.io.ObjectInputStream来实现;

java.io.ObjectOutputStream:表示对象输出流 , 它的 writeObject(Object obj)方法可以对参数指定的 obj 对象进行序列化,把得到的字节序列写到一个目标输出流中;

java.io.ObjectInputStream:表示对象输入流 ,它的 readObject()方法源输入流中读取字节序列,再把它们反序列化成为一个对象,并将其返回;

3.动态代理

动态代理是在运行时动态生成代理类的方式。Java提供了java.lang.reflect.Proxy类和java.lang.reflect.InvocationHandler接口来实现动态代理。JDK中提供了基于接口动态代理的方法:

        // 创建代理对象MyInterface proxyObject = (MyInterface) Proxy.newProxyInstance(MyInterface.class.getClassLoader(),new Class[]{MyInterface.class},new MyInvocationHandler(realObject));

使用动态代理的目的:

  1. 透明远程调用: 动态代理可以隐藏底层的远程调用细节,使得远程过程调用看起来像本地方法调用一样简单。客户端无需关心网络通信、数据序列化和反序列化等细节,因为这些都由代理对象处理。

  2. 减少重复性代码: 使用动态代理可以减少编写和维护远程调用代码的工作量。代理类负责处理通用的远程调用逻辑,开发者只需关注具体的业务逻辑。

  3. 集中管理远程调用逻辑: 动态代理将远程调用逻辑集中在一个地方,这样可以更容易地管理和维护,例如添加统一的错误处理、日志记录或性能监控等功能。

4.反射

在RPC(Remote Procedure Call,远程过程调用)中使用反射的主要目的是实现透明的远程调用,即使在客户端和服务器之间存在远程分离,也可以像调用本地方法一样调用远程服务。反射在RPC中的具体目的和用途如下:

  1. 动态代理生成代理对象: 反射机制允许在运行时生成代理对象,这些代理对象可以代替实际的远程服务对象执行方法调用。这样,客户端代码不需要提前知道要调用的具体远程方法和对象,而可以动态生成代理并执行方法。

  2. 动态识别方法和参数: 反射允许在运行时识别远程方法和方法参数的名称、类型和数量。这对于将方法调用信息打包成请求并传递给远程服务器非常有用,服务器可以根据这些信息正确地解析请求并调用相应的方法。

  3. 动态序列化和反序列化: 反射可以用于动态地序列化请求和响应数据。在RPC中,请求和响应数据通常需要以某种格式进行序列化和反序列化,以便在网络上进行传输。反射可以帮助动态识别数据类型,将数据转换为适当的格式,并在服务器端进行反序列化。

思维流程梳理

码起来

假设需求是客户端想要访问服务端(ip:prot/helloService/sayHello)上的helloService调用sayHello()。先定义服务端的实现。

服务端时序图

服务端—Api与Provider模块

服务端作为服务提供者,自然需要具备常规的业务模块:Api与Provider模块

Api模块定义简单的HelloService接口,包含两个方法sayHello(String content)和saveUSer(User user)

public interface IHelloService {String sayHello(String content);String saveUSer(User user);
}

定义一个简单对象User类

public class User {private String name;private int age;
​// getter和setterpublic String getName() {return name;}
​public void setName(String name) {this.name = name;}
​public int getAge() {return age;}
​public void setAge(int age) {this.age = age;}
​@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}

Provider模块定义HelloService接口的实现类HelloServiceImpl(rpc-server-provider模块的pom中需要添加rpc-server-api模块的依赖)

public class HelloServiceImpl implements IHelloService {@Overridepublic String sayHello(String content) {System.out.println("request in sayHello:" + content);return "Say hello:" + content;}
​@Overridepublic String saveUSer(User user) {System.out.println("request in saveUser:" + user);return "Save user success";}
}

定义RpcServerProxy通过代理的方式使用Socket对外暴露服务接口

public class RpcServerProxy {private ExecutorService executorService = Executors.newCachedThreadPool();
​public void publisher(Object service, int port) {ServerSocket serverSocket = null;try {serverSocket = new ServerSocket(port);while (true) {Socket socket = serverSocket.accept(); // 使用accept()等待客户端连接,接收请求executorService.execute(new ProcessorHandler(socket, service)); // 每一个socket交给一个processorHandler处理}} catch (IOException e) {e.printStackTrace();} finally {// 关闭资源,jdk1.7后提供了try-with可以自动关闭if (serverSocket != null) {try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}}
}

需要定义一个线程ProcessorHandler对每次的socket请求进行处理

public class ProcessorHandler implements Runnable{
​private Socket socket;private Object service;
​public ProcessorHandler(Socket socket, Object service) {this.socket = socket;this.service = service;}
​
​@Overridepublic void run() {// 使用ObjectOutputStream和ObjectInputStream配合socket的输入流输出流进行序列化和反序列化ObjectOutputStream objectOutputStream = null;ObjectInputStream objectInputStream = null;try {objectInputStream = new ObjectInputStream(socket.getInputStream());
​// 客户端传过来的信息:请求哪个类,哪个方法,方法的参数  ——>  封装为RpcRequest类RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); // 拿到客户端通信传递的请求类Object result = invoke(rpcRequest); // 反射调用本地服务
​objectOutputStream = new ObjectOutputStream(socket.getOutputStream());objectOutputStream.writeObject(result);objectOutputStream.flush(); // 手动将缓冲区中的数据强制刷新到输出流中,以确保数据被立即写入底层的输出流。
​} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();} catch (NoSuchMethodException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} finally {// 关闭关联资源,jdk1.7后提供了try-with可以自动关闭if (objectInputStream != null) {try {objectInputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}if (objectOutputStream != null) {try {objectOutputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
​private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {// 反射调用Object[] args = rpcRequest.getParameters(); // 获取客户端传递的RpcRequest中的参数Class<?>[] types = new Class[args.length]; // 获得每个参数的类型for (int i = 0; i < args.length; i++) {types[i] = args.getClass();}Class clazz = Class.forName(rpcRequest.getClassName()); // 根据请求的类名反射加载类Method method = clazz.getMethod(rpcRequest.getMethodName(), types); // 获取类中的方法Object result = method.invoke(service, args); // 进行反射调用方法return result;}
}

定义一个通用的RpcRequest类参与通信过程中的请求处理,这里是需要序列化的

public class RpcRequest implements Serializable {private String className;private String methodName;private Object[] parameters;// getter and setterpublic String getClassName() {return className;}
​public void setClassName(String className) {this.className = className;}
​public String getMethodName() {return methodName;}
​public void setMethodName(String methodName) {this.methodName = methodName;}
​public Object[] getParameters() {return parameters;}
​public void setParameters(Object[] parameters) {this.parameters = parameters;}
}

Provider的app中发布下服务,运行没有报错,服务端OK

public class App 
{public static void main( String[] args ){IHelloService helloService = new HelloServiceImpl();RpcServerProxy rpcServerProxy = new RpcServerProxy();rpcServerProxy.publisher(helloService, 8080); // 把服务发布出去}
}

客户端时序图

定义动态代理RpcClientProxy,JDK的接口代理方式就是一句话:

public class RpcClientProxy {public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));}
}

定义被代理接口RemoteInvocationHandler

public class RemoteInvocationHandler implements InvocationHandler {private String host;private int port;
​public RemoteInvocationHandler(String host, int port) {this.host = host;this.port = port;}
​@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 客户端请求会进入这里进行包装RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParameters(args);// 远程通信交给RpcNetTransportRpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);Object result = rpcNetTransport.send(rpcRequest);return result;}
}

定义通信传输类RpcNetTransport

public class RpcNetTransport {private String host;private int port;
​public RpcNetTransport(String host, int port) {this.host = host;this.port = port;}
​public Object send(RpcRequest rpcRequest) {Socket socket = null;Object result = null;ObjectInputStream objectInputStream = null;ObjectOutputStream objectOutputStream = null;try {socket = new Socket(host, port); // 建立连接
​objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); // 客户端通信写入objectOutputStream.writeObject(rpcRequest);objectOutputStream.flush();
​objectInputStream = new ObjectInputStream(socket.getInputStream()); // 客户端通信输出result = objectInputStream.readObject();return result;} catch (UnknownHostException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();} finally {// 关闭关联资源if (objectInputStream != null) {try {objectInputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}if (objectOutputStream != null) {try {objectOutputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}}return result;}
}

客户端请求远程服务

public class App 
{public static void main( String[] args ){RpcClientProxy rpcClientProxy = new RpcClientProxy();IHelloService helloService = rpcClientProxy.clientProxy(IHelloService.class, "localhost", 8080);String result = helloService.sayHello("Elaine");System.out.println(result);}
}

请求成功,客户端获得返回结果

服务端打印了请求日志

到这里,一个简陋且粗糙的RPC通信版本就好了。

别灰心,要记住艾瑞莉娅的奶奶总说

路漫漫其修选兮,吾将上下而求索。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97481.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023-10-07 LeetCode每日一题(股票价格跨度)

2023-10-07每日一题 一、题目编号 901. 股票价格跨度二、题目链接 点击跳转到题目位置 三、题目描述 设计一个算法收集某些股票的每日报价&#xff0c;并返回该股票当日价格的 跨度 。 当日股票价格的 跨度 被定义为股票价格小于或等于今天价格的最大连续日数&#xff08…

全栈开发笔记2:项目部署上线的三种方式

文章目录 最原始的方式宝塔Docker 部署其他 本文为编程导航实战项目学习笔记。 项目部署的三种方式&#xff1a; 最原始方式✅ yum 手动安装 jdk mysql tomcat nginx打包前端项目&#xff0c;放到某个目录&#xff0c;修改 nginx 配置修改线上的 mysql 配置&#xff0c;打包 j…

Mysql bin-log日志恢复数据与物理备份-xtrabackup

主打一个数据备份与恢复 binlog与xtarbackup bin-log日志恢复开启bin-log配置bin-log日志恢复 物理备份-xtrabackup三种备份方式安装xtrabackup备份全量备份增量备份差异备份 bin-log日志恢复 bin-log 日志&#xff0c;就记录对数据库进行的操作&#xff0c;什么增删改的操作全…

深度学习笔记之优化算法(三)动量法的简单认识

机器学习笔记之优化算法——动量法的简单认识 引言回顾&#xff1a;条件数与随机梯度下降的相应缺陷动量法简单认识动量法的算法过程描述附&#xff1a;动量法示例代码 引言 上一节介绍了随机梯度下降 ( Stochastic Gradient Descent,SGD ) (\text{Stochastic Gradient Descen…

基础算法之——【动态规划之路径问题】1

今天更新动态规划路径问题1&#xff0c;后续会继续更新其他有关动态规划的问题&#xff01;动态规划的路径问题&#xff0c;顾名思义&#xff0c;就是和路径相关的问题。当然&#xff0c;我们是从最简单的找路径开始&#xff01; 动态规划的使用方法&#xff1a; 1.确定状态并…

vue3+vite+uniapp 封装一个省市区组件

一、预览图 二、使用前的一些注意事项 只支持在 uniapp vue3 项目中使用支持微信小程序和h5 (app端没有测试过)ui库用的 uview-plus省市区数据用的是 vant-ui 提供的一个赖库 vant/area-data 三、组件代码 <template><u-popup :show"show" type"botto…

软件工程与计算总结(二)软件工程的发展

本章开始介绍第二节内容&#xff0c;主要是一些历史性的东西~ 一.软件工程的发展脉络 1.基础环境因素的变化及其对软件工程的推动 抽象软件实体和虚拟计算机都是软件工程的基础环境因素&#xff0c;它们能从根本上影响软件工程的生产能力&#xff0c;而且是软件工程无法反向…

Zabbix 监控系统安装和部署

Zabbix 监控系统安装和部署 一、zabbix 是什么&#xff1f;1.1、zabbix 监控原理&#xff08;重点&#xff09;1.2、Zabbix 6.0 新特性1.3、Zabbix 6.0 功能组件1.4、数据库1.5、Web 界面1.6、Zabbix Agent1.7、Zabbix Proxy1.8、Java Gateway 二、部署Zabbix 6.02.1、 解决 za…

Configuration of phpstudy and sqli-labs

Go download the app&#xff1a; 小皮面板(phpstudy) - 让天下没有难配的服务器环境&#xff01; (xp.cn) Have done. Then enter the program. Enable both functions&#xff1a; Apache and MySQL. Open the website&#xff1a; Next, Lets make the sqli-liab. GitHub…

基于MDK-Keil环境如何把STM32程序直接下载到SRAM运行

1. 前言 对于 Cortex-M 内核的微控制器&#xff0c;它们都可以支持在 RAM 中执行程序&#xff0c;有些非 ARM 的微控制器是不支持的。 在内部 SRAM 执行程序&#xff0c;有基于以下几方面的原因&#xff1a; 1、所使用的设备可能具有OTP&#xff08;One-time Programmable&a…

笔记一:odoo透视表和图表

透视表 1、首先在xml文件添加pivot 说明&#xff1a;&#xff08;1&#xff09;根元素pivot中属性&#xff1a; disable_linking&#xff1a;设置为True&#xff0c;删除表格单元格到列表视图的链接 display_quantity&#xff1a;设置为True&#xff0c;默认显示“数量”列 d…

关于IDEA中gradle项目bootrun无法进入断点以及gradle配置页面不全的解决方案

问题背景 在使用gradle编写的bootrun&#xff0c;采用debug方式启动项目时&#xff0c;无法进入断点&#xff0c;程序正常运行 并发现象1 此处无法识别为大象图标 点击右键后&#xff0c;没有圈中的这个选项 并发现象2 图片圈中的位置缺失 问题原因 正常的 run 命令是通过…

SoapUI实践:自动化测试、压力测试、持续集成

因为项目的原因&#xff0c;前段时间研究并使用了 SoapUI 测试工具进行自测开发的 api。下面将研究的成果展示给大家&#xff0c;希望对需要的人有所帮助。 SoapUI 是什么&#xff1f; SoapUI 是一个开源测试工具&#xff0c;通过 soap/http 来检查、调用、实现 Web Service …

springboot和vue:十、vue2和vue3的差异+组件间的传值

首先用vue-cli创建一个vue2的项目。 vue2和vue3的差异 main.js的语法有所差别。 vue2是 import Vue from vue import App from ./App.vuenew Vue({render: h > h(App), }).$mount(#app)vue3是 import { createApp } from vue import App from ./App.vuecreateApp(App).…

Java虚拟机内存模型

JVM虚拟机将内存数据分为&#xff1a; 程序计数器、虚拟机栈、本地方法栈、Java堆、方法区等部分。 程序计数器用于存放下一条运行的指令&#xff1b; 虚拟机栈和本地方法栈用于存放函数调用堆栈信息&#xff1b; Java堆用于存放Java程序运行时所需的对象等数据&#xff1b…

学习笔记|串口通信的基础知识|同步/异步|RS232|常见的串口软件的参数|STC32G单片机视频开发教程(冲哥)|第二十集:串口通信基础

目录 1.串口通信的基础知识串口通信(Serial Communication)同步/异步&#xff1f;全双工&#xff1f;常见的串口软件的参数 2.STC32的串口通信实现原理引脚选择&#xff1a;实现分时复用模式选择串口1模式1&#xff0c;模式1波特率计算公式 3.串口通信代码实现编写串口1通信程序…

前端 | AjaxAxios模块

文章目录 1. Ajax1.1 Ajax介绍1.2 Ajax作用1.3 同步异步1.4 原生Ajax 2. Axios2.1 Axios下载2.2 Axios基本使用2.3 Axios方法 1. Ajax 1.1 Ajax介绍 Ajax: 全称&#xff08;Asynchronous JavaScript And XML&#xff09;&#xff0c;异步的JavaScript和XML。 1.2 Ajax作用 …

vue3+elementPlus:el-tree复制粘贴数据功能,并且有弹窗组件

在tree控件里添加contextmenu属性表示右键点击事件。 因右键自定义菜单事件需要获取当前点击的位置&#xff0c;所以此处绑定动态样式来控制菜单实时跟踪鼠标右键点击位置。 //html <div class"box-list"><el-tree ref"treeRef" node-key"id…

python+selenium实现UI自动化(入门篇)

一、基础准备。 python环境安装&#xff0c;参考&#xff1a;CSDN pycharm安装&#xff0c;参考&#xff1a;CSDN 谷歌浏览器驱动配置&#xff0c;参考&#xff1a;CSDN二、新建pycharm项目 截图中&#xff0c;上面是项目地址&#xff08;可以提前在指定位置创建文件夹&#xf…

Redis 主从复制及哨兵模式

目录 1 Redis 主从复制 1.1 主从复制的作用 1.2 主从复制流程 2 搭建Redis 主从复制 2.1 安装 Redis 2.2 修改 Redis 配置文件&#xff08;Master节点操作&#xff09; 2.3 修改 Redis 配置文件&#xff08;Slave节点操作&#xff09; 2.4 验证主从效果 3 Redis 哨兵模…