一、RPC 的基本介绍
RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。
常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.
若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云
二、RPC 调用的过程
在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。
调用流程说明
- 服务消费方(client)以本地调用方式调用服务
- client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务端
- server stub 接收到消息后进行解码
- server stub 根据解码结果调用本地的服务
- 本地服务执行并将结果返回给server stub
- server stub 将返回导入结果进行编码并发送给消费方
- client stub 接收到消息并进行解码
- 服务消费方(client) 得到结果
- 其中,RPC 框架的目标就是把2-8 这些步骤封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。
三、dubbo RPC
1.需求说明
dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。
模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。
2.设计说明
创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:
3.代码实现
netty用的包:4.1.20.Final。pom.xml如下:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.20.Final</version>
</dependency>
1)公共接口
/*** @author: fqtang* @date: 2024/05/05/21:51* @description: 服务提供方和服务消费方都需要*/
public interface HelloService {String say(String mes);
}
2)公共接口实现类
import org.springframework.util.StringUtils;
import com.tfq.netty.dubborpc.publicinterface.HelloService;/*** @author: fqtang* @date: 2024/05/05/21:53* @description: 描述*/
public class HelloServiceImpl implements HelloService {private static int count = 0;/*** 当有消费方调用该方法时就返回一个结果** @param mes 传入消息* @return 返回结果*/@Overridepublic String say(String mes) {System.out.println("收到客户端消息=" + mes);if(StringUtils.isEmpty(mes)) {return "你好客户端,我已经收到你的消息 ";}else{return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";}}
}
3)服务提供者
import com.tfq.netty.dubborpc.netty.NettyServer;/*** @author: fqtang* @date: 2024/05/05/21:57* @description: 启动服务提供者,就是NettyServer*/
public class ServerBootstrap {public static void main(String[] args) {String hostName="127.0.0.1";int port = 8001;NettyServer.startServer(hostName,port);}}import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/*** @author: fqtang* @date: 2024/05/05/21:59* @description: 描述*/
public class NettyServer {public static void startServer(String hostName,int port){startServer0(hostName,port);}/*** 编写一个方法,完成对Netty Server的初始化工作和启动* @param hostName* @param port*/private static void startServer0(String hostName,int port){EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try{ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();System.out.println("服务提供方开始提供服务~~~");channelFuture.channel().closeFuture().sync();}catch(Exception e){e.printStackTrace();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
import com.tfq.netty.dubborpc.provider.HelloServiceImpl;/*** @author: fqtang* @date: 2024/05/05/22:03* @description: 描述*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//获取客户端调用的消息,并调用服务System.out.println("msg = " + msg);//客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头//比如:dubboserver#hello#xxxxif(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {String res = new HelloServiceImpl().say(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));ctx.writeAndFlush(res);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
4)消费者
import com.tfq.netty.dubborpc.netty.NettyClient;
import com.tfq.netty.dubborpc.publicinterface.HelloService;/*** @author: fqtang* @date: 2024/05/05/23:26* @description: 消费者*/
public class ClientBootstrap {/*** 这里定义协议头*/public static final String ProtocolHeader = "dubboserver#say#";public static void main(String[] args) throws InterruptedException {//创建一个消费者NettyClient customer = new NettyClient();//创建代理对象HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);while(true) {Thread.sleep(10 * 1000);//通过代理对象调用提供者的方法(服务)String res = helloService.say("你好 dubbo~");System.out.println("调用的结果 res = " + res);}}
}import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/*** @author: fqtang* @date: 2024/05/05/23:04* @description: 描述*/
public class NettyClient {//创建一个线程池private static ExecutorService executorService = Executors.newFixedThreadPool(2);private static NettyClientHandler clientHandler;/*** 编写方法使用代理模式,获取一个代理对象* @param serviceClass* @param protocolHeader* @return*/public Object getBean(final Class<?> serviceClass, final String protocolHeader) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass}, (proxy, method, args) -> {if(clientHandler == null) {initClient("127.0.0.1", 8001);}//设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],//args[0] 就是客户端调用api say(???),参数clientHandler.setParam(protocolHeader + args[0]);return executorService.submit(clientHandler).get();});}private static void initClient(String hostName, int port) {EventLoopGroup worker = new NioEventLoopGroup();try {clientHandler = new NettyClientHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.group(worker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline channelPipeline = ch.pipeline();channelPipeline.addLast(new StringDecoder());channelPipeline.addLast(new StringEncoder());channelPipeline.addLast(clientHandler);}});ChannelFuture channelFuture = bootstrap.connect(hostName, port).sync();/*channelFuture.channel().closeFuture().sync();*/} catch(InterruptedException e) {e.printStackTrace();} /*finally {worker.shutdownGracefully();}*/}
}package com.tfq.netty.dubborpc.netty;import java.util.concurrent.Callable;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;/*** @author: fqtang* @date: 2024/05/05/22:48* @description: 描述*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {private ChannelHandlerContext context;/*** 返回的结果*/private String result;/*** 客户端调用方法返回的参数*/private String param;/*** 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//因为在其他方法会使用到这个ctxcontext = ctx;System.out.println("调用(1) channelActive--->连接到服务器");}/*** 被调用(4)* 收到服务器的数据后,调用方法* @param ctx* @param msg* @throws Exception*/@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {result = (String) msg;System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);//唤醒等待的线程notify();System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}/*** 被调用(3), 被调用(5)* 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果* @return* @throws Exception*/@Overridepublic synchronized Object call() throws Exception {context.writeAndFlush(param);System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");//进行wait,等待channelRead 方法获取到服务器的结果后,唤醒wait();System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");return result;}/*** 被调用(2)* @param param*/void setParam(String param){System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");this.param = param;}
}
若有问题请留言。