netty基础_12.用 Netty 自己实现简单的RPC

用 Netty 自己实现简单的RPC

    • RPC 基本介绍
    • 我们的RPC 调用流程图
    • 己实现 Dubbo RPC(基于 Netty)
      • 需求说明
      • 设计说明
      • 代码
        • 封装的RPC
          • NettyServer
          • NettyServerHandler
          • NettyClientHandler
          • NettyClient
        • 接口
        • 服务端(provider)
          • HelloServiceImpl
          • ServerBootstrap
        • 客户端(消费者)
      • 调用过程
      • 效果

RPC 基本介绍

  1. RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图)

在这里插入图片描述

过程:

  1. 调用者(Caller),调用远程API(Remote API)

  2. 调用远程API会通过一个RPC代理(RpcProxy)

  3. RPC代理再去调用RpcInvoker(这个是PRC的调用者)

  4. RpcInvoker通过RPC连接器(RpcConnector)

  5. RPC连接器用两台机器规定好的PRC协议(RpcProtocol)把数据进行编码

  6. 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)

  7. PRC接收器通过PRC协议进行解码拿到数据

  8. 然后将数据传给RpcProcessor

  9. RpcProcessor再传给RpcInvoker

  10. RpcInvoker调用Remote API

  11. 最后推给被调用者(Callee)

  12. 常见的 RPC 框架有:比较知名的如阿里的 DubboGooglegRPCGo 语言的 rpcxApachethriftSpring 旗下的 SpringCloud

我们的RPC 调用流程图

在这里插入图片描述

RPC 调用流程说明

  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

己实现 Dubbo RPC(基于 Netty)

需求说明

  1. Dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
  2. 模仿 Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20

设计说明

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
  4. 开发的分析图

在这里插入图片描述

代码

封装的RPC

可以把这块代码理解成封装的dubbo

NettyServer
package com.atguigu.netty.dubborpc.netty;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class NettyServer {public static void startServer(String hostName, int port) {startServer0(hostName,port);}//编写一个方法,完成对NettyServer的初始化和启动private static void startServer0(String hostname, int port) {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler()); //业务处理器}});ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();System.out.println("服务提供方开始提供服务~~");channelFuture.channel().closeFuture().sync();}catch (Exception e) {e.printStackTrace();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
NettyServerHandler
package com.atguigu.netty.dubborpc.netty;import com.atguigu.netty.dubborpc.customer.ClientBootstrap;
import com.atguigu.netty.dubborpc.provider.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;//服务器这边handler比较简单
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("---服务端开始收到来自客户单的消息---");//获取客户端发送的消息,并调用服务System.out.println("原始消息:" + msg);/*1.客户端在调用服务器的api 时,我们需要定义一个协议,比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"2.Dubbo注册在Zookeeper里时,这种就是类的全路径字符串,你用IDEA的zookeeper插件就可以清楚地看到*/if(msg.toString().startsWith(ClientBootstrap.providerName)) {String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));ctx.writeAndFlush(result);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
NettyClientHandler
package com.atguigu.netty.dubborpc.netty;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.Callable;public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {private ChannelHandlerContext context;//上下文private String result; //返回的结果private String para; //客户端调用方法时,传入的参数//与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(" channelActive 被调用  ");context = ctx; //因为我们在其它方法会使用到 ctx}//收到服务器的数据后,调用方法 (4)//@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(" channelRead 被调用  ");result = msg.toString();notify(); //唤醒等待的线程}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}//被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5@Overridepublic synchronized Object call() throws Exception {System.out.println(" call1 被调用  ");context.writeAndFlush(para);//进行waitwait(); //等待channelRead 方法获取到服务器的结果后,唤醒System.out.println(" call2 被调用  ");return  result; //服务方返回的结果}//(2)void setPara(String para) {System.out.println(" setPara  ");this.para = para;}
}
NettyClient
package com.atguigu.netty.dubborpc.netty;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.lang.reflect.Proxy;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class NettyClient {//创建线程池private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static NettyClientHandler client;private int count = 0;//编写方法使用代理模式,获取一个代理对象public Object getBean(final Class<?> serivceClass, final String providerName) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serivceClass}, (proxy, method, args) -> {System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");//{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码if (client == null) {initClient();}//设置要发给服务器端的信息//providerName:协议头,args[0]:就是客户端要发送给服务端的数据client.setPara(providerName + args[0]);//return executor.submit(client).get();});}//初始化客户端private static void initClient() {client = new NettyClientHandler();//创建EventLoopGroupNioEventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(client);}});try {bootstrap.connect("127.0.0.1", 7000).sync();} catch (Exception e) {e.printStackTrace();}}
}
接口
package com.atguigu.netty.dubborpc.publicinterface;//这个是接口,是服务提供方和 服务消费方都需要
public interface HelloService {String hello(String mes);
}
服务端(provider)
HelloServiceImpl
package com.atguigu.netty.dubborpc.provider;import com.atguigu.netty.dubborpc.publicinterface.HelloService;public class HelloServiceImpl implements HelloService{private static int count = 0;//当有消费方调用该方法时, 就返回一个结果@Overridepublic String hello(String mes) {System.out.println("收到客户端消息=" + mes);System.out.println();//根据mes 返回不同的结果if(mes != null) {return "你好客户端, 我已经收到你的消息。消息为:[" + mes + "] ,第" + (++count) + " 次 \n";} else {return "你好客户端, 我已经收到你的消息 ";}}
}
ServerBootstrap
package com.atguigu.netty.dubborpc.provider;import com.atguigu.netty.dubborpc.netty.NettyServer;//ServerBootstrap 会启动一个服务提供者,就是 NettyServer
public class ServerBootstrap {public static void main(String[] args) {//代码代填..NettyServer.startServer("127.0.0.1", 7000);}
}
客户端(消费者)
package com.atguigu.netty.dubborpc.customer;import com.atguigu.netty.dubborpc.netty.NettyClient;
import com.atguigu.netty.dubborpc.publicinterface.HelloService;public class ClientBootstrap {//这里定义协议头public static final String providerName = "HelloService#hello#";public static void main(String[] args) throws  Exception{//创建一个消费者NettyClient customer = new NettyClient();//创建代理对象HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);for (;; ) {Thread.sleep(2 * 1000);//通过代理对象调用服务提供者的方法(服务)String res = service.hello("你好 dubbo~");System.out.println("调用的结果 res= " + res);}}
}

调用过程

  1. ClientBootstrap#main发起调用
  2. 走到下面这一行代码后
 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
  1. 调用NettyClient#getBean,在此方法里与服务端建立链接。

  2. 于是就执行NettyClientHandler#channelActive

  3. 接着回到NettyClient#getBean调用NettyClientHandler#setPara,调用完之后再回到NettyClient#getBean,用线程池提交任务

  4. 因为用线程池提交了任务,就准备执行NettyClientHandler#call线程任务

  5. NettyClientHandler#call中发送数据给服务提供者

    context.writeAndFlush(para);
    

    由于还没收到服务提供者的数据结果,所以wait住

  6. 来到了服务提供者这边,从Socket通道中收到了数据,所以执行NettyServerHandler#channelRead,然后因为此方法中执行了

    String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
    
  7. 就去HelloServiceImpl#hello中执行业务逻辑,返回数据给NettyServerHandler#channelReadNettyServerHandler#channelRead再把数据发给客户端

  8. NettyClientHandler#channelRead收到服务提供者发来的数据,唤醒之前wait的线程

  9. 所以之前wait的线程从NettyClientHandler#call苏醒,返回result给NettyClient#getBean

  10. NettyClient#getBeanget()到数据,ClientBootstrap#main中的此函数调用返回,得到服务端提供的数据。

     String res = service.hello("你好 dubbo~");
    

13.至此,一次RPC调用结束。

效果

ClientBootstrap打印

(proxy, method, args) 进入....1 次setPara  channelActive 被调用  call1 被调用  channelRead 被调用  call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第1(proxy, method, args) 进入....2 次setPara  call1 被调用  channelRead 被调用  call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第2(proxy, method, args) 进入....3 次setPara  call1 被调用  channelRead 被调用  call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第3(proxy, method, args) 进入....4 次setPara  call1 被调用  channelRead 被调用  call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第4(proxy, method, args) 进入....5 次setPara  call1 被调用  channelRead 被调用  call2 被调用  
调用的结果 res= 你好客户端, 我已经收到你的消息。消息为:[你好 dubbo~] ,第5

ServerBootstrap打印

服务提供方开始提供服务~~
---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~---服务端开始收到来自客户单的消息---
原始消息:HelloService#hello#你好 dubbo~
收到客户端消息=你好 dubbo~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/758718.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第四百一十四回

文章目录 1. 概念介绍2. 思路与方法2.1 实现思路2.2 实现方法 3. 示例代码4. 内容总结 我们在上一章回中介绍了"自定义标题栏"相关的内容&#xff0c;本章回中将介绍自定义Action菜单.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在这里提到的…

【呼市经开区建设服务项目水、电能耗监测 数采案例】

实施方案 针对能耗采集中的水、电能源数据采集&#xff0c;因客观因素条件&#xff0c;数据采集方面存在较大难度。大多数国网电表485接口由于封签限制&#xff0c;不能实施采集&#xff0c;不让拆机接线&#xff0c;采集实施存在困难。水量能耗采集&#xff0c;存在类似问题&a…

腾讯云GPU服务器深度计算怎么收费?1小时、一个月和一年报价

腾讯云GPU服务器怎么收费&#xff1f;GPU服务器1小时多少钱&#xff1f;一个月收费价格表和一年费用标准&#xff0c;腾讯云百科txybk.com分享腾讯云GPU服务器GPU计算型GN10Xp、GPU服务器GN7、GPU渲染型 GN7vw等GPU实例费用价格&#xff0c;以及NVIDIA Tesla T4 GPU卡和V100详细…

Jmeter Ultimate Thread Group 和 Stepping Thread Group

线程组&#xff1a;使用复杂场景的性能测试 有时候我们做性能测试时&#xff0c;只依靠自带的线程组&#xff0c;显示满足不了性能测试中比较复杂的场景&#xff0c;下面这两种线程组可以帮助你很好的完成复杂的场景 第一种&#xff1a;Stepping Thread Group 在取样器错误后…

Socket类

2.2 Socket类 Socket 类&#xff1a;该类实现客户端套接字&#xff0c;套接字指的是两台设备之间通讯的端点。 构造方法 public Socket(String host, int port) :创建套接字对象并将其连接到指定主机上的指定端口号。如果指定的host是null &#xff0c;则相当于指定地址为回送…

Appium —— 移动应用自动化测试开源工具!

Appium介绍 Appium是一个用于自动化移动应用程序的开源工具&#xff0c;它支持iOS和Android平台。通过Appium&#xff0c;开发人员可以使用各种编程语言&#xff08;如Java、Python、Ruby等&#xff09;编写测试脚本&#xff0c;以自动化测试移动应用程序的功能和用户界面。Ap…

基于springboot+vue的小区团购管理

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

如何在Windows系统使用VS Code制作游戏网页并实现无公网IP远程访问

文章目录 前言1. 编写MENJA小游戏2. 安装cpolar内网穿透3. 配置MENJA小游戏公网访问地址4. 实现公网访问MENJA小游戏5. 固定MENJA小游戏公网地址 前言 本篇教程&#xff0c;我们将通过VS Code实现远程开发MENJA小游戏&#xff0c;并通过cpolar内网穿透发布到公网&#xff0c;分…

《操作系统实践-基于Linux应用与内核编程》第10章--实验 Qt聊天程序

前言: 内容参考《操作系统实践-基于Linux应用与内核编程》一书的示例代码和教材内容&#xff0c;所做的读书笔记。本文记录再这里按照书中示例做一遍代码编程实践加深对操作系统的理解。 引用: 《操作系统实践-基于Linux应用与内核编程》 作者&#xff1a;房胜、李旭健、黄…

微信小程序调试、断点调试

1、wxml 查看对应的页面组件 2、console面板可以用来打印信息 3、sources 用来断点调试 4、network面板用来调试接口 5、storage面板 可以查看每个key对应的value内容&#xff0c;这些数据在用户使用小程序时被持久化保存在本地。

【mac M3】idea删除不用或者失效的jdk

【mac M3】idea删除不用或者失效的jdk 不用&#xff08;重复&#xff09;或者失效的jdk如下&#xff1a; 重复或者已失效的JDK版本出现在下拉列表中不仅影响美观&#xff0c;也影响效率&#xff0c;删除jdk的步骤如下&#xff1a; 步骤1.点击File 步骤2.选择Project Structure…

【C语言】文件操作揭秘:C语言中文件的顺序读写、随机读写、判断文件结束和文件缓冲区详细解析【图文详解】

欢迎来CILMY23的博客喔&#xff0c;本篇为【C语言】文件操作揭秘&#xff1a;C语言中文件的顺序读写、随机读写、判断文件结束和文件缓冲区详细解析【图文详解】&#xff0c;感谢观看&#xff0c;支持的可以给个一键三连&#xff0c;点赞关注收藏。 前言 欢迎来到本篇博客&…

Java AOP 简单实例演示

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

WT32-ETH02 plus 串口转以太网开发,WT32-ETH01网关开发板升级款!

广受欢迎的WT32-ETH01网关开发板迎来了升级。 就是这款启明云端新推出的嵌入式串口转以太网开发板——WT32-ETH02 plus。应广大客户的需求&#xff0c;在WT32-ETH01的基础上增加了POE供电&#xff0c;可广泛应用于智能家居和网关等应用。开发板搭载2.4GHz Wi-Fi和蓝牙双模的SO…

一键部署灵境矩阵,属于自己的ai智能平台。

灵境矩阵 | 想象即现实 “灵境杯”智能体创意大赛&#xff0c;瓜分百万超级奖励 打造专属AI智能平台&#xff1a;一键部署灵境矩阵的无限可能 在数字化浪潮席卷全球的今天&#xff0c;人工智能技术已逐渐成为推动社会进步的关键力量。面对这一趋势&#xff0c;许多企业和个人…

永续合约多空双开“戴套”策略的逻辑是什么,胜率惊人的96%是怎么做到的,其实并没有想的那么复杂,会代码的都可以写出来

为什么叫多空双开“戴套”量化策略呢&#xff0c;因为这个策略的特点是永远有一个仓位是被套的&#xff0c;但是这个不影响我们盈利&#xff0c;具体怎么实现大家看下面这个图就明白是怎么回事了。 这个策略的逻辑很简单也容易理解&#xff0c;就是多空双开&#xff0c;盈利平仓…

FREERTOS空闲任务和低功耗

空闲任务 空闲任务是 FreeRTOS 必不可少的一个任务&#xff0c;其他 RTOS 类系统也有空闲任务&#xff0c;比如uC/OS。看名字就知道&#xff0c;空闲任务是处理器空闲的时候去运行的一个任务&#xff0c;当系统中没有其他就绪任务的时候空闲任务就会开始运行&#xff0c;空闲任…

slab分配器

什么是slab分配器&#xff1f; 用户态程序可以使用malloc及其在C标准库中的相关函数申请内存&#xff1b;内核也需要经常分配内存&#xff0c;但无法使用标准库函数&#xff1b;linux内核中&#xff0c;伙伴分配器是一种页分配器&#xff0c;是以页为单位的&#xff0c;但这个…

基于 Echarts + Python Flask ,我搭建了一个动态实时大屏监管系统

一、效果展示 1. 动态实时更新数据效果图 2. 鼠标右键切换主题 二、确定需求方案 支持Windows、Linux、Mac等各种主流操作系统&#xff1b;支持主流浏览器Chrome&#xff0c;Microsoft Edge&#xff0c;360等&#xff1b;服务器采用python语言编写&#xff0c;配置好python环…

计算机设计大赛 题目: 基于深度学习的疲劳驾驶检测 深度学习

文章目录 0 前言1 课题背景2 实现目标3 当前市面上疲劳驾驶检测的方法4 相关数据集5 基于头部姿态的驾驶疲劳检测5.1 如何确定疲劳状态5.2 算法步骤5.3 打瞌睡判断 6 基于CNN与SVM的疲劳检测方法6.1 网络结构6.2 疲劳图像分类训练6.3 训练结果 7 最后 0 前言 &#x1f525; 优…