【BlossomRPC】服务端与客户端请求Handler

文章目录

  • 客户端Handler
  • 服务端Handler

RPC项目

配置中心项目

网关项目

客户端Handler

承接上文,客户端的Handler其实就比较简单了,因为客户端作为接收数据的时候,我们只需要从上文提到的Cache中通过reqId的方式拿到Future/Promise对象,然后设置他们的值,就可以马上进行返回。
客户端代码如下:

import blossom.project.rpc.core.entity.RpcDto;
import blossom.project.rpc.core.entity.RpcCache;
import blossom.project.rpc.core.entity.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;import java.util.Objects;/*** @author: ZhangBlossom* @date: 2023/12/17 02:43* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* NettyRpcServerHandler类** 备忘录* 有点烧脑 分析一下这个类怎么用 先睡了* 1.1:当前类是客户端接收到服务器的response了* 1.2:如果没有报错,那么我就要从我的cache中拿到* 我特定reqId对应的promise* 1.3:设定promise的值* 1.3.1:promise一旦被设定,promise.get()的阻塞马上就会结束* 1.3.2:也就是我成功拿到了Server的响应值* 1.3.3:那么Client的这次调用就是成功的* 1.3.4:否则失败* 1.4:删除promise再缓存中的reqId* 1.5:这里如果对future/promise进行设置值之后,代理应该马上返回* 1.6:用promise的setXxx类型方法比较合适**/
@Slf4j
public class NettyRpcClientHandler extends SimpleChannelInboundHandler<RpcDto<RpcResponse>> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcDto<RpcResponse> msg) throws Exception {if (Objects.isNull(msg)){log.info("the RpcDto<Response> is Null,return...");return;}log.info("receive the Rpc Server Data, msg is: {}",msg);long reqId = msg.getHeader().getReqId();//TODO 得到并且删除 考虑一下DefaultPromise是否需要封装DefaultPromise defaultPromise = RpcCache.RESPONSE_CACHE.remove(reqId);defaultPromise.setSuccess(msg.getData());}
}

这里比较重要的是了解一下Future和Promise的特性。
Netty中的异步模型广泛地使用了Future和Promise来处理异步操作。以下是它们的核心作用:

  1. Future:
    ○ Future代表了一个可能还没有完成的异步操作的结果。
    ○ 在Netty中,当你执行一个异步操作(如发送数据),你会得到一个Future对象。这个对象可以用来在未来某个时刻获取操作的结果。
  2. Promise:
    ○ Promise是Future的一个子接口,它不仅代表了异步操作的结果,还可以被操作的执行者显式地标记为成功或失败。
    ○ 在Netty中,Promise用于在操作完成时设置操作的结果(成功或失败)。这是一个写入结果的Future。
    这里使用Promise会更加方便。
    因为Promise提供了setXxx类型方法,这个方法确保一旦被设置值,get/exception就会马上进行返回从而结束阻塞。因此Promise类型非常适合我们当前的场景,同时,还有一个点,就是因为,Netty那边的返回值。
    我们知道Netty使用的是异步处理。
    当我们发送一个请求的时候,我们会拿到一个返回值如下:
ChannelFuture sendFuture = future.channel().writeAndFlush(requestRpcDto);
//继承了Future
public interface ChannelFuture extends Future<Void> 

对于ChannelFuture的处理,有非常非常多种的处理方法。
监听器,sync同步处理,await/get异步处理等。
这里由于我们希望是能显式的拿到客户端请求的返回值,同时减少阻塞等待。
我们不使用原生的方法,也就是我们只是用Netty发送完毕请求,而请求返回值最后的处理,我们通过对上面Cache的处理来进行。
我们只要确保,Cache中对于一个reqId,唯一对应一个Future/Promise对象即可。
然后再客户端拿到数据的时候,通过reqId对Promise进行设置值即可。
这样子就能结束Promise的get方法的阻塞等待。
参考思路和Promise的测试代码如下:

package blossom.project.rpc.core.entity;import io.netty.channel.DefaultEventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.concurrent.ExecutionException;/*** @author: ZhangBlossom* @date: 2023/12/16 23:39* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* RpcPromise类* Promise用来异步处理* 1:Future代表了一个可能还没有完成的异步操作的结果。* 2:在Netty中,当你执行一个异步操作(如发送数据),* 会得到一个Future对象。这个对象可以用来在未来某个时刻获取操作的结果。* 3:Promise是Future的一个子接口,它不仅代表了异步操作的结果,* 还可以被操作的执行者显式地标记为成功或失败。* 4:在Netty中,Promise用于在操作完成时设置操作的结果(成功或失败)。* 这是一个写入结果的Future。***/
@Data
@NoArgsConstructor
public class RpcPromise<T>  extends DefaultPromise<T>
{//private Promise<T> promise;////public RpcPromise(Promise<T> promise) {//    this.promise = promise;//}/*** 思考一下* 1: 我的代码在这里是异步处理的返回结果* 2: 什么时候这个返回结果可以被设置值?* 3: 应该就是在我client接收到server的返回值的时候* 4: 也就是说我可以再clienthandler里面添加一个对promise的处理* 5: 也就是说我得有一个cache一样的东西能缓存我的promise* 6: 然后再client得到数据的时候去设置promise的值* 7: 不论成功失败都如main函数里面一样操作就行* 8: promise应该也要被server去使用* 9: cache应该是map结构* @param args*/public static void main(String[] args) throws ExecutionException, InterruptedException {//1:使用Promise作为属性//RpcPromise<RpcResponse> promise1=new RpcPromise<>//        (new DefaultPromise<RpcResponse>//                (new DefaultEventLoop()));//promise1.promise.setSuccess(new RpcResponse());//promise1.setSuccess(new RpcResponse());//第二种方式 直接用原生defaultpromiseRpcPromise promise = new RpcPromise();promise.setSuccess("success");promise.get();}
}

到此为止,我想我们就已经顺利的完成了对于客户端的代理请求的处理。
总结一下完整流程:
NettyRpcClientHandler 设置 Promise 的状态

  1. 接收响应: 当RPC响应从服务器端返回时,Netty通过我设置的pipeline中的RpcDecoder解码这个响应,并将其传递到RpcClientHandler。
  2. 处理响应: 在RpcClientHandler的channelRead0方法中,代码处理接收到的响应。这个方法首先从响应消息中提取出请求ID。
  3. 查找对应的 Promise: 使用这个请求ID,RpcClientHandler从CACHE中查找之前存储的与该请求ID对应的Promise。
  4. 设置Promise的状态: 一旦找到相应的RpcFuture,RpcClientHandler就会调promise.setSuccess(msg.getData()),将Promise的状态设置为成功,并附上从响应中获取的数据。如果在处理响应的过程中发生了错误,也可能会调用setFailure方法来标记Promise为失败,并传递错误信息(懒得失败了)。

服务端Handler

服务端的Handler要做的事情也很简单,其实就是拿到请求数据之后,通过反射的方式去调用我们本地的方法即可。
这里按照我之前的思路,先编写一个RPC服务方法的缓存,缓存所有的服务信息,然后到时候服务端接收到客户端请求的时候,先从缓存中判断是否存在有方法可以被调用。

package blossom.project.rpc.core.proxy.spring.rpcmethod;import blossom.project.rpc.core.entity.RpcRequest;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;/*** @author: ZhangBlossom* @date: 2023/12/18 22:10* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* RpcServiceMethodCache类*/
public class RpcServiceMethodCache {/*** rpc方法cache* 规则:使用 class.getClass()+"."+methodName的方式保存方法路径*/public static Map<String, RpcServiceMethod> METHOD_CACHE =new ConcurrentHashMap<>();/*** 使用饿汉式单例*/private static RpcServiceMethodCache INSTANCE = new RpcServiceMethodCache();private RpcServiceMethodCache(){}public static RpcServiceMethodCache getInstance(){return INSTANCE;}/*** 当前方法用于调用rpcmethod* 这里的invoke方法最终目的就是真正的去调用client发送过来的rpc请求,* 从cache里面拿到那些有注解的rpc方法即可* @param request* @return*/public Object rpcMethodInvoke(RpcRequest request){String key=request.getClassName()+"."+request.getMethodName();RpcServiceMethod rpcServiceMethod= METHOD_CACHE.get(key);if(Objects.isNull(rpcServiceMethod)){return null;}Object methodPath =rpcServiceMethod.getMethodPath();Method method=rpcServiceMethod.getMethod();try {return method.invoke(methodPath,request.getParams());} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}

然后接下来我们就要考虑使用哪几种反射方法来进行方法的调用了。
很容易想到的三种方法。

  1. jdk动态代理
  2. cglib动态代理
  3. spring容器获取bean反射代理
    再源码中,我对这三种方法都进行了实现。不过最简单的肯定还是用反射了,简单量小。
package blossom.project.rpc.core.netty.handler;import blossom.project.rpc.common.enums.ReqTypeEnum;
import blossom.project.rpc.core.entity.RpcDto;
import blossom.project.rpc.core.entity.RpcHeader;
import blossom.project.rpc.core.entity.RpcRequest;
import blossom.project.rpc.core.entity.RpcResponse;
import blossom.project.rpc.core.proxy.spring.rpcmethod.RpcServiceMethodCache;
import blossom.project.rpc.core.proxy.spring.server.SpringRpcProxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** @author: ZhangBlossom* @date: 2023/12/16 19:43* @contact: QQ:4602197553* @contact: WX:qczjhczs0114* @blog: https://blog.csdn.net/Zhangsama1* @github: https://github.com/ZhangBlossom* NettyRpcServerHandler类* 1:服务端接收到请求数据之后,需要进行解析* 2:解析后确定具体要调用的请求服务是哪一个* 2.1:这里应该要用到动态代理了* 2.2:分析使用那种动态代理 JDK/CGLIB/SpringIoC* 2.3:分析这三种方法的代码实现* 1:对于JDK直接用正常的反射* 2:对于CGLIB那么就是走CGLIB的常规写法* 3:对于Spring就要考虑把这些类存到容器中,* 然后要使用的时候从容器中进行获取*/
public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcDto<RpcRequest>> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcDto<RpcRequest> msg) throws Exception {RpcHeader header = msg.getHeader();//当前是响应数据header.setReqType(ReqTypeEnum.RESPONSE.getCode());//使用反射的方式在运行时调用对应的类的方法//这里你可以思考一下用什么方式可以最快的找到我想要的类并且调用方法//目前我提供了:JDK CGLIB SpringIOC容器 HashMap自制工厂//Object data = SpringRpcProxy.invoke(msg.getData());//使用JDK动态代理//Object data = RpcInvocationHandler.invoke(msg.getData());//使用CGLIB动态代理//Object data = RpcCglibProxy.invoke(msg.getData());//使用封装好的rpc对象去发送请求Object data = RpcServiceMethodCache.getInstance().rpcMethodInvoke(msg.getData());RpcDto<RpcResponse> dto = new RpcDto();RpcResponse response = new RpcResponse();response.setData(data);response.setMsg("success!!!");dto.setData(response);dto.setHeader(header);//写出数据ctx.writeAndFlush(dto);}}

至此,我们就完成了服务端接收到请求并反射调用本地方法之后得到返回数据,并将返回数据返回给客户端的代码。

到此为止,RPC项目中最重要的几个功能我们其实就都完成了。
接下来的注册中心模块其实就只是一个简单的锦上添花了。
如果不使用注册中心,那么其实直接再项目服务启动的时候,通过application.yml文件的方式对项目的ip/port进行配置即可,然后直接从固定ip/port拿到数据即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/781898.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接口自动化框架搭建(一):框架介绍

1&#xff0c;背景目的 最近在搭建接口自动化框架&#xff0c;打算写个专栏&#xff0c;方便自己查找和他人学习。有不正确的地方&#xff0c;欢迎指正。 2&#xff0c;技术框架 pythonpytestalllurejenkins多进程钉钉消息通知 3&#xff0c;版本 推荐方法&#xff1a;创建…

【数据结构与算法】排序

概述 最容易想到的排序&#xff0c;从旧数组中找到一个最小的&#xff0c;不断放入新的数组中。&#xff08;不使用数组的slice等方法是因为会使效率变得更慢&#xff09; let arr [3, 5, 6, 7, 1, 2, 4, 9, 8]function getMin(arr) {if (arr null || arr.length 0) retur…

Oracle VM(虚拟机)性能监控工具

Oracle VM是一个独立的虚拟化环境&#xff0c;由 Oracle 提供支持和设计&#xff0c;旨在为运行虚拟机提供轻量级、安全的基于服务器的平台。Oracle VM 能够在受支持的虚拟化环境中部署操作系统和应用软件&#xff0c;Oracle VM 将用户和管理员与底层虚拟化技术隔离开来&#x…

cmake编译faiss源码记录

文章目录 简介下载源码安装环境编译 简介 Faiss&#xff08;Facebook AI Similarity Search&#xff09;是一个由Facebook AI研发并开源的&#xff0c;用于大规模向量检索的库。其核心算法采用了聚类、压缩和量化等技术&#xff0c;以优化最近邻搜索的效率和准确性。Faiss提供…

ctfshow xxe web373-378

web373 libxml_disable_entity_loader(false)&#xff1a;这行代码用于启用实体加载器&#xff0c;允许加载外部实体。 $xmlfile file_get_contents(php://input)&#xff1a;从输入流中读取XML数据并存储在 $xmlfile 变量中。 $dom->loadXML($xmlfile, LIBXML_NOENT |…

异常信息泄露 应用程序错误泄露 原理以及修复方法

漏洞名称&#xff1a;未自定义统一错误页面导致信息泄露&#xff0c;抛出异常信息泄露&#xff0c;错误详情信息泄漏&#xff0c;AWVS漏洞名称如下&#xff1a; Application error message Error message on page ASP.NET error message 漏洞描述&#xff1a;攻击者可通过构造…

FebHost:意大利.IT域名一张意大利网络名片

.IT域名是意大利的国家顶级域名&#xff0c;对于意大利企业和个人而言,拥有一个属于自己的”.IT”域名无疑是件令人自豪的事。这个被誉为意大利互联网标志性代表的域名,不仅隐含着浓厚的意大利文化特色,还为使用者在当地市场的推广铺平了道路。 对于那些希望在意大利市场建立强…

微信开发者工具接入短剧播放器插件

接入短剧播放插线 申请添加插件基础接入app.jsonapp.jsplayerManager.js数据加密跳转到播放器页面运行出错示例小程序页面页面使用的方法小程序输入框绑定申请添加插件 添加插件:登录微信开发者平台 ——> 设置 ——> 第三方设置 ——> 插件管理 ——> 搜索“短剧…

基于SpringBoot + Vue实现的养老院管理系统设计与实现+毕业论文(12000字)+搭建视频

介绍 养老院管理系统是一款运用软件开发技术设计实现的应用系统&#xff0c;在信息处理上可以达到快速的目的&#xff0c;不管是针对数据添加&#xff0c;数据维护和统计&#xff0c;以及数据查询等处理要求&#xff0c;养老院管理系统都可以轻松应对。 系统包含登录、注册、…

蓝桥杯每日不知道多少题之更小的数

题目链接&#xff1a;更小的数 - 洛谷 P9232 - Virtual Judge (vjudge.net) 解题思路&#xff1a;这个题看数据量是可以过n方的算法的&#xff0c;首先考虑dp&#xff0c;那么则是两层for循环&#xff0c;即枚举所有情况&#xff0c;那么怎么进行状态转移呢&#xff0c;当 s[i…

Linux 系统 docker搭建LNMP环境

1、安装nginx docker pull nginx (默认安装的是最新版本) 2、运行nginx docker run --name nginx -p 80:80 -d nginx:latest 备注&#xff1a;--name nginx 表示容器名为 nginx -d 表示后台运行 -p 80:80 表示把本地80端口绑定到Nginx服务端的 80端口 nginx:lates…

【YOLOv5改进系列(9)】高效涨点----使用CAM(上下文增强模块)替换掉yolov5中的SPPF模块

文章目录 &#x1f680;&#x1f680;&#x1f680;前言一、1️⃣ CAM模块详细介绍二、2️⃣CAM模块的三种融合模式三、3️⃣如何添加CAM模块3.1 &#x1f393; 添加CAM模块代码3.2 ✨添加yolov5s_CAM.yaml文件3.3 ⭐️修改yolo.py文相关文件 四、4️⃣实验结果4.1 &#x1f39…

【阅读笔记】《你的第一本博弈论》

博弈论入门书&#xff0c;很多例子方便理解 副标题: 用博弈论解决工作和生活的难题 作者&#xff1a;欧俊 笔记 CH1 博弈论&#xff1a;最高级的思维和生存策略 博弈的分类&#xff1a; • 负和博弈 • 零和博弈 • 正和博弈 博弈论带给我们的启示&#xff1a; • 要会选择 …

Leetcode 82. 删除排序链表中的重复元素 II

给定一个已排序的链表的头 head &#xff0c; 删除原始链表中所有重复数字的节点&#xff0c;只留下不同的数字 。返回 已排序的链表 。 输入&#xff1a;head [1,2,3,3,4,4,5] 输出&#xff1a;[1,2,5] 提示&#xff1a; 链表中节点数目在范围 [0, 300] 内 -100 < Node.…

一些实用的功能函数

1. 【算法】求两个数中&#xff0c;bit位不同的个数&#xff08;在计网那道题中用过&#xff09; 解法&#xff1a;首先把两位数异或&#xff0c;得到的结果&#xff0c;通过自身跟自身-1相与&#xff0c;直到等于0为止 代码如下&#xff1a; int calculateNotSam(int a,int …

uniapp怎么使用接口返回的iconfont图标

uniapp怎么使用接口返回的iconfont图标 首先在你的项目中添加该图标&#xff0c;名称要对应 实际应用 item.ICONFONT_NAME“tools”; item.ICONFONT_COLOR“FA5151”; <view class"iconfont" :class"icon-item.ICONFONT_NAME" :color"item.ICON…

ORACLE 存中文

笔记 oracle 存中文要用 nvarchar2 &#xff0c;涉及长度校验 NVARCHAR2和VARCHAR2的区别&#xff0c;从使用角度来看区别在于&#xff1a; NVARCHAR2在计算长度时和字符集相关&#xff0c; 例如数据库是中文字符集时&#xff0c;以长度10为例&#xff0c;则 NVARCHAR2(10)可…

Kubernetes Pod深度解析:构建可靠微服务的秘密武器(上)

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《Kubernetes航线图&#xff1a;从船长到K8s掌舵者》 &#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、引言 1、Kubernetes概述 2、Pod概述 二、Po…

输出单链表倒数第K个结点值

方法一&#xff1a; 两次遍历链表。第一次遍历&#xff0c;计算链表长度&#xff0c;然后计算链表倒数第m个结点的正数位置k&#xff0c;判断位置是否合法&#xff0c;如果不合法&#xff0c;输出NOT FOUND&#xff0c;否则&#xff0c;进行第二次遍历链表&#xff0c;查找链表…

手写SpringBoot(三)之自动配置

系列文章目录 手写SpringBoot&#xff08;一&#xff09;之简易版SpringBoot 手写SpringBoot&#xff08;二&#xff09;之动态切换Servlet容器 手写SpringBoot&#xff08;三&#xff09;之自动配置 手写SpringBoot&#xff08;四&#xff09;之bean动态加载 手写SpringBoot…