DangerWind-RPC-framework---二、动态代理

       RPC调用需要达到的效果是,远程调用某方法就像本地调用一样,以下列代码为例:

@Component
public class HelloController {@RpcReference(version = "version1", group = "test1")private HelloService helloService;public void test() throws InterruptedException {// 远程调用,就像本地调用String hello = this.helloService.hello(new Hello("111", "222"));Thread.sleep(12000);for (int i = 0; i < 10; i++) {System.out.println(helloService.hello(new Hello("111", "222")));}}

       调用HelloService的hello方法实际上是向RPC Server端发起远程调用,并且应该像本地调用一样。如Apache Thrift框架,通过配置调用的server端服务信息,即可像本地调用一样发起远程调用。这是如何做到的呢?实际上是使用的动态代理技术。在代理类中与RPC Server端建立连接进行通信。比如HelloService是RPC调用的interface,这样的interface还会有很多,如UserService、TradeService,我们没有办法为每个Service编写实现类并在每个实现类中均编写远程通信的逻辑,这样也失去了作为RPC框架的意义,因此需要使用统一的动态代理类,调用各个Service时实际上是调用生成的代理类,在动态代理类中统一封装远程调用逻辑以实现远程通信。

        代理类的生成与装配在Spring Bean的生命周期中完成,具体步骤计划在BeanPostProcessor的postProcessAfterInitialization中完成,代码如下:

    @Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> targetClass = bean.getClass();Field[] declaredFields = targetClass.getDeclaredFields();for (Field declaredField : declaredFields) {RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);if (rpcReference != null) {RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder().group(rpcReference.group()).version(rpcReference.version()).build();RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());declaredField.setAccessible(true);try {declaredField.set(bean, clientProxy);} catch (IllegalAccessException e) {e.printStackTrace();}}}return bean;}

       在各个Bean初始化的过程中,都会历经此步骤,重写方法后首先通过反射获取Bean的成员变量,并检查各个成员变量上是否有RpcReference注解,有的话证明是RPC调用相关interface,需要为该接口装配动态代理类,装配过程通过反射完成。

       接下来是动态代理的实现方式以及具体的调用逻辑,代码如下:

    @SneakyThrows@SuppressWarnings("unchecked")@Overridepublic Object invoke(Object proxy, Method method, Object[] args) {log.info("invoked method: [{}]", method.getName());RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()).parameters(args).interfaceName(method.getDeclaringClass().getName()).paramTypes(method.getParameterTypes()).requestId(UUID.randomUUID().toString()).group(rpcServiceConfig.getGroup()).version(rpcServiceConfig.getVersion()).build();RpcResponse<Object> rpcResponse = null;if (rpcRequestTransport instanceof NettyRpcClient) {// 考虑主线程与NIOEventLoopGroup工作线程间的通信CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);rpcResponse = completableFuture.get();}if (rpcRequestTransport instanceof SocketRpcClient) {rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);}this.check(rpcResponse, rpcRequest);return rpcResponse.getData();}

        RpcClientProxy通过实现InvocationHandler接口来进行动态代理,每次调用接口是实际上调用的是代理类的invoke方法。在该方法中首先封装RPC远程调用的Request信息,如接口名、方法名、参数类型、参数值、请求ID,group与version等信息。

        接下来是发送RPC请求的过程,代码如下:

    @Overridepublic Object sendRpcRequest(RpcRequest rpcRequest) {// build return valueCompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();// get server addressInetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);// get  server address related channelChannel channel = getChannel(inetSocketAddress);if (channel.isActive()) {// put unprocessed requestunprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest).codec(SerializationTypeEnum.HESSIAN.getCode()).compress(CompressTypeEnum.GZIP.getCode()).messageType(RpcConstants.REQUEST_TYPE).build();channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {log.info("client send message: [{}]", rpcMessage);} else {future.channel().close();// 发送失败,设置异常信息resultFuture.completeExceptionally(future.cause());log.error("Send failed:", future.cause());}});} else {throw new IllegalStateException();}return resultFuture;}

       以Netty框架实现的通信为例,大致过程如下:首先从注册中心拉取服务列表并通过负载均衡算法获取到具体server端的机器,接下来连接对应的IP与端口号获取交换信息的channel。

        之后将对应的future保存到对应的未处理完成请求的集合中(只有一行代码,却是关键的一步),最后设置序列化算法以及压缩方式,将消息发出。

  public class UnprocessedRequests {private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);}public void complete(RpcResponse<Object> rpcResponse) {CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());if (null != future) {future.complete(rpcResponse);} else {throw new IllegalStateException();}}
}

       CompletableFuture的使用是对这一过程的优化,更加优雅地实现了等待请求响应的过程。后续接收到server端响应时,处理线程是EventLoop线程,该线程拿到响应结果后可以为future设置结果,阻塞获取结果的主线程可以获取到结果。事实上,CompletableFuture的get方法也是一个同步阻塞的过程,但是这种获取响应的方式显得更加清晰,很好地实现了线程间通信,如接收到消息后可以调用complete方法设置响应结果,阻塞等待的主线程即可获取到结果。并且当消息发送失败时也可灵活设置异常信息。

       接下来对这一过程中的关键步骤进行介绍,首先是lookupService的过程,代码如下:

    @Overridepublic InetSocketAddress lookupService(RpcRequest rpcRequest) {String rpcServiceName = rpcRequest.getRpcServiceName();CuratorFramework zkClient = CuratorUtils.getZkClient();List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);if (CollectionUtil.isEmpty(serviceUrlList)) {throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);}// load balancingString targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);log.info("Successfully found the service address:[{}]", targetServiceUrl);String[] socketAddressArray = targetServiceUrl.split(":");String host = socketAddressArray[0];int port = Integer.parseInt(socketAddressArray[1]);return new InetSocketAddress(host, port);}

       不难看出,首先根据服务名拼接了zk的前缀,从zk上获取了子节点,对应路径下的子节点即为服务列表,之后通过负载均衡算法(后续介绍)从这些机器实例中选择一台实例进行访问。机器的IP地址和端口号也可以获取(注册时的叶子结点带上了对应信息)。

   private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {return SERVICE_ADDRESS_MAP.get(rpcServiceName);}List<String> result = null;String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;try {result = zkClient.getChildren().forPath(servicePath);SERVICE_ADDRESS_MAP.put(rpcServiceName, result);registerWatcher(rpcServiceName, zkClient);} catch (Exception e) {log.error("get children nodes for path [{}] fail", servicePath);}return result;}

        SERVICE_ADDRESS_MAP起到了本地缓存的作用,第一次访问时缓存中必然不会有数据,需要访问ZK进行获取,将获取到的结果进行缓存。并且需要注意的是,服务端随时可能有机器上下线,缓存列表也需要及时更新,这就需要使用ZK的Watcher机制,Curator框架对其进行了封装,代码如下:       

   private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);// Curator框架此处的作用// 1. 缓存节点到本地,当节点发生变化时,会使用监听机制自动更新框架缓存// 2. 监听节点变化,当节点发生变化时,会触发监听器,拉取最新的服务列表到本地缓存(CHM)// 3. ZK原生的Watcher机制触发一次回调后就会失效,框架会再次自动注册Watcher事件PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);};pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);pathChildrenCache.start();}

        Curator框架此处的作用如注释所言:首先是缓存节点到本地,当节点发生变化时,会使用监听机制自动更新框架缓存;监听节点变化,当节点发生变化时,会触发监听器,拉取最新的服务列表到本地缓存(ConcurrentHashMap);ZK原生的Watcher机制触发一次回调后就会失效,框架会再次自动注册Watcher事件。

        拿到一台具体的机器后需要和对应的机器建立连接,代码如下:

    @SneakyThrowspublic Channel doConnect(InetSocketAddress inetSocketAddress) {CompletableFuture<Channel> completableFuture = new CompletableFuture<>();//连接操作和回调是在 Netty 的 EventLoop 线程中执行的,而 CompletableFuture.get() 是在调用 doConnect 方法的线程中执行的,因此 CompletableFuture 起到了线程间通信的作用。bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {log.info("The client has connected [{}] successful!", inetSocketAddress.toString());completableFuture.complete(future.channel());} else {throw new IllegalStateException();}});return completableFuture.get();}

       这里同样使用了CompletableFuture,起到了线程间通信的作用,可以在与对应机器建立好连接后获取channel连接。将消息封装好后从channel刷出,如此便完成了RPC动态代理需要封装的逻辑,从而实现通过动态代理类屏蔽RPC远程调用细节的需求,并且可以在运行时完成代理类的装配。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/44039.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

树莓派采集系统

树莓派&#xff08;Raspberry Pi&#xff09;是一款非常受欢迎的小型单板计算机&#xff0c;因其低成本、低功耗以及丰富的I/O接口&#xff0c;非常适合用来搭建数据采集系统。无论是环境监测、智能家居、工业自动化&#xff0c;还是科学实验&#xff0c;树莓派都能胜任。以下是…

Spring Boot Vue 毕设系统讲解 7

数据仓库 HIVE实战 ConfigurationProperties(prefix "hive") Data public class HiveDruidConfig {private String url;private String user;private String password;private String driverClassName;private int initialSize;private int minIdle;private int ma…

gcc: options: -specs

spec *[spek] n. 投机, 投机事业, 规格, 说明书, 专业人员 【化】 加工单 编译器读取了file之后,处理这个文件,是为了覆盖默认的编译选项,这些选项会被gcc驱动传递给cc1/cc1plus/as/ld等程序。可以设置多个-specs文件,但是会根据顺序来处理,从左到右。 -specs=file Proce…

机器学习笔记:初始化0的问题

1 前言 假设我们有这样的两个模型&#xff1a; 第一个是逻辑回归 第二个是神经网络 他们的损失函数都是交叉熵 sigmoid函数的导数&#xff1a; 他们能不能用0初始化呢&#xff1f; 2 逻辑回归 2.1 求偏导 2.1.1 结论 2.1.2 L对a的偏导 2.1.3 对w1&#xff0c;w2求偏导 w2同…

提升SQL查询效率的终极指南

在面试中&#xff0c;SQL 调优经常是被问及的问题&#xff0c;它可以考察候选人对于 SQL 整体性能优化的理解和掌握程度。一般来说&#xff0c;SQL 调优的步骤可以从以下几个方面入手。 首先&#xff0c;需要准确地定位问题。在面试中&#xff0c;最好能结合具体的业务场景进行…

【组件库】element-plus组件库

文章目录 0. 启动项目1. gc.sh 新增组件2. 本地验证(组件注册的方式)3. 官方文档修改3-1. 左侧菜单3-2 . 配置md文档3-3. 代码问题:文档修改----------------------------------------------4. 将naiveui的split 分割组件【 复制、迁移】到 element-ui-plus组件库4.1 naiveu…

三级_网络技术_11_路由设计技术基础

1.以下协议中不属于内部网关协议的是()。 RIP OSPF BGP IGRP 2.下列关于路由协议的描述中&#xff0c;错误的是()。 RIP协议中&#xff0c;路由器在接收到更新报文后按照最短路径原则更新路由表 RIP协议中&#xff0c;要求路由器周期性的向外发送路由刷新报文 OSPF协议…

linux:命令执行过程【图表】

命令执行过程 步骤描述详细信息1启动终端在CentOS系统上打开终端窗口。可以通过快捷键 Ctrl Alt T 或在图形界面中找到并启动终端应用程序。2输入命令在终端中输入命令&#xff0c;如 ls -l&#xff0c;然后按下回车键。3Shell接收命令Shell&#xff08;如bash&#xff09;…

关于向日葵的P5旁路由

日常生活需要内网穿透的时候越来越多,买了两台P5,p2p 传输 时间延时在 20ms 左右,相当好用 现在的路由器添加静态路由之类的,得开启开发者模式 [ 官方手册中给了,双旁路的用法 (企业级部署)] 如果是个人,可以在常用的服务器上设置静态路由,不用非得在 内网的主要路由器中设置静…

FastReport 指定sql,修改数据源 ( 非DataSet修改 )

FastReport 指定sql&#xff0c;修改数据源&#xff0c;非DataSet修改 介绍报告文件&#xff1a; codetest.frx 文件核心代码&#xff1a;&#xff08;扩展&#xff09;小结一下&#xff1a; 介绍 在FastReport中&#xff0c;经常会遇到需要给 sql 加条件的情况。 &#xff0…

爆破器材期刊

《爆破器材》简介   《爆破器材》自1958年创刊以来&#xff0c;深受广大读者喜爱&#xff0c;是中国兵工学会主办的中央级技术刊物&#xff0c;在国内外公开发行&#xff0c;近几年已发行到10个国家和地区。《爆破器材》杂志被美国著名检索机构《化学文摘》&#xff08;CA&a…

相机光学(二十九)——显色指数(Ra)

显指Ra是衡量光源显色性的数值&#xff0c;表示光源对物体颜色的还原能力。显色性是指光源对物体颜色的呈现能力&#xff0c;即光源照射在同一颜色的物体上时&#xff0c;所呈现的颜色特性。通常用显色指数&#xff08;CRI&#xff09;来表示光源的显色性&#xff0c;而显指Ra是…

c# 基础习题答案 20240709

一、实现一个冒泡排序函数 using System;public class Program {public static void Main(){int[] arr { 22,11,33 };BubbleSort(arr);foreach (var item in arr){Console.Write(item " ");}Console.WriteLine();}// 冒泡排序函数public static void BubbleSort(i…

XTuner 微调 LLM:1.8B, 部署

扫码立刻参与白嫖A100&#xff0c;书生大模型微调部署学习活动。亲测有效 内容来源&#xff1a;Tutorial/xtuner/personal_assistant_document.md at camp2 InternLM/Tutorial GitHubLLM Tutorial. Contribute to InternLM/Tutorial development by creating an account on G…

从零手写实现 nginx-26-rewrite url 重写

前言 大家好&#xff0c;我是老马。很高兴遇到你。 我们为 java 开发者实现了 java 版本的 nginx https://github.com/houbb/nginx4j 如果你想知道 servlet 如何处理的&#xff0c;可以参考我的另一个项目&#xff1a; 手写从零实现简易版 tomcat minicat 手写 nginx 系列 …

设计无缝体验:交互设计流程全解析

完整的产品交互设计流程是什么&#xff1f;完整的产品交互设计流程包括研究用户需求、指定信息架构、制作产品原型、进行用户测试和实时发布产品。交互设计就是从人与产品之间的关系入手&#xff0c;通过产品设计来满足大众的日常需求。随着网络技术的流行&#xff0c;产品交互…

工业机床CNC设备如何上云?

工业机床CNC设备如何上云&#xff1f; 工业机床的计算机数控&#xff08;CNC&#xff09;设备实现远程监控数据上云&#xff0c;是现代制造业智能化转型的关键一环。这一过程不仅能够实时监测设备状态、优化生产流程&#xff0c;还能通过大数据分析提升生产效率与产品质量&…

Java包装类简单认识泛型

1 包装类 在 Java 中&#xff0c;由于基本类型不是继承自 Object &#xff0c;为了在泛型代码中可以支持基本类型&#xff0c; Java 给每个基本类型都对应了 一个包装类型。 例如我们之前的基本数据类型和包装类。 1. 装箱和拆箱 2.自动装箱和自动拆箱 2.泛型 1.什么是泛型 …

【C++项目】从零实现一个在线编译器

前言 身为一名程序员&#xff0c;想必大家都有接触过像leetcode这样的刷题网站&#xff0c;不知你们在刷题的过程中是否思考过一个问题&#xff1a;它们是如何实现在线编译运行的功能。如果你对此感到好奇&#xff0c;那么本文将一步步带你来实现一个简易在线编译器。 项目概…

vue3+antdv仿百度网盘样式文件夹管理组件

实现&#xff1a; 默认进入页面时&#xff0c;文件夹全选&#xff1b;文件夹状态&#xff0c;以及文件夹内的文件选择状态&#xff0c;与组件联动文件夹数量&#xff0c;根据后端数据动态生成 实现思路&#xff1a; 将后端数据存到vuex中&#xff0c;增加&#xff08;多选框…