提示:dubbo底层实现,手写dubbo框架。手写rpc框架、用servlet实现dubbo、用servlet实现rpc框架
文章目录
- 前言
- 一、实现步骤描述
- 1.1、provider的原理
- 1.2、consumer的原理:
- 二、代码实现
- 2.1、api项目
- 2.1.1
- 2.1.2
- 2.1.3
- 2.2、provider项目
- 2.2.1、provider项目的调用流程
- 2.2.2、代码
- 2.2.2.1、
- 2.2.2.2、
- 2.2.2.3、
- 2.2.2.4、
- 2.3、consumer
- 2.3.1、consumer项目的流程
- 2.3.2、代码
- 2.3.2.1、
- 2.3.2.2、
- 2.3.2.3、
- 2.4、测试
- 总结
前言
上次开会,同时们讨论了一个dubbo框架的事务问题。才猛然意识到,自己之前手写过dubbo框架,结果都还给老师了。趁此机会,回忆并记录一下,方便自己日后查阅。本人水平有限,如有误导,欢迎斧正,一起学习,共同进步!
一、实现步骤描述
手写rpc原理时,一共有三个项目,分别是api项目、consumer项目、provider项目。其中consumer和provider都依赖了api。其中的
api项目中:请求的参数定义、接口的定义 等一些公共规范。
provider服务提供方:接口的具体实现(具体的功能)、socket的服务器端
consumer服务消费方:准备了请求参数、socket的客户端。
1.1、provider的原理
项目启动起来,创建一个socket的服务端,不断的监听某一个端口,等待着socket的客户端的链接,然后通过socket.getInputSream来拿到socket传递过来的对象,拿到对象以后,就是具体实现的参数(比如说UserServiceImpl的addUser的方法的请求参数),通过这个参数,去调用这个方法,调用完成以后,拿到一个返回结果。将这个返回结果以socket.getOutputSream的writeObject方法返回给请求端。
1.2、consumer的原理:
项目通过动态代理来生成一个目标(UserService)的代理对象(此次是jdk动态代理)。因为是jdk动态代理,所以是实现了InvocationHandler接口,具体的执行的invoke方法,在invoke方法中,去发起了socket的调用(指定了socket的host、port,参数,然后socket服务器那边就会监听到)。拿到这个代理对象以后,调用这个对象的addUser方法,并拿到响应结果。因为这个动态代理对象有这个目标对象的全部方法,所以可以直接调用。
二、代码实现
2.1、api项目
2.1.1
这个是UserService。也就是定义的规范
/*** @author: ZhengTianLiang* @date: 2021/07/11 17:16* @desc: 服务的标准(对外暴露的api规范)*/
public interface UserService {public UserDTO addUser(UserDTO userDTO);
}
2.1.2
这个是UserDTO 也就是addUser方法的请求参数
/*** @author: ZhengTianLiang* @date: 2021/07/11 17:16* @desc: 是在网络中传输的dto,*/@Data
public class UserDTO implements Serializable {/*** 为了保证系列化和反序列化的安全性,可以加一个id*/private static final long serialVersionUID = -7085411221862236858L;private String name;private String age;private String userId;
}
2.1.3
这个是RPCCommonReqDTO,也就是网络传输中的对象,里面有一个Object类型的参数,这个里面可以放上面的UserDTO
/*** @author: ZhengTianLiang* @date: 2021/07/12 22:34* @desc: 定义的统一的在网络中数据传输的规则*/@Data
public class RPCCommonReqDTO implements Serializable {private static final long serialVersionUID = 666960806401175269L;// 方法名称private String methodName;// 类的权限定路径private String classpath;// 方法的参数private Object[] args;
}
2.2、provider项目
2.2.1、provider项目的调用流程
项目跑起来,调用startup方法,监听一个端口。监听某一个端口时它会等着某个客户端的链接。然后来一个socket对象,就创建一个线程去处理这个socket对象,所以具体的操作这个socket的方法,就需要写在runnable接口的run方法里面。run方法中,通过构造器的方式,将socket注入进来了,然后通过socket获取到输入流对象,然后拿到传输过来的对象,然后将这个对象,交给ServiceDispatch对象进行服务的分发,然后将ServiceDispatch对象分发完以后,返回的对象,通过网络(socket,OutputStream)返回给请求端
2.2.2、代码
2.2.2.1、
这个是serviceImpl,是上面的规范的具体实现
/*** @author: ZhengTianLiang* @date: 2021/07/11 17:28* @desc: 这个是,api项目中的。UserService 接口的实现类* 就是说,rpc-api项目,只是单纯的定义规范,* rpc-provider项目,是具体实现(提供服务的)* prc-consumer项目,是服务的消费方*/public class UserServiceImpl implements UserService {/*** @author: ZhengTianLiang* @date: 2021/07/11 17:29* @desc: 服务的具体实现*/public UserDTO addUser(UserDTO userDTO) {System.out.println("接收到的dto:" + userDTO);userDTO.setUserId(new Random().nextInt(1000000) + "");System.out.println("返回的dto:" + userDTO);return userDTO;}
}
2.2.2.2、
dispatch,进行服务的分发,有点类似于nginx的服务转发
package com.csdn.dispatch;import com.csdn.dao.RPCCommonReqDTO;import java.lang.reflect.Method;/*** @author: ZhengTianLiang* @date: 2021/07/11 18:00* @desc: 用来做网络的分发*/
public class ServiceDispatch {/*** @author: ZhengTianLiang* @date: 2021/07/11 18:00* @desc: 做服务的分发* 若是不用反射的话,可能是* 当type=1 ---> 调用某个方法(比如说addUser)* 当type=2 ---> 调用某个方法(比如说selectUser)* ...* 你至少要写一个枚举,type=1,2,3.。。 可能还要提供一份文档** 但是若是用了反射,则省下了这些操作,直接提供不同的参数,就调用了不同的方法*/public static Object dispatch(Object reqObj) {// 基于方法的反射的调用,来实现服务的分发。RPCCommonReqDTO rpcCommonReqDTO = (RPCCommonReqDTO) reqObj;Object[] args = rpcCommonReqDTO.getArgs();String classpath = rpcCommonReqDTO.getClasspath();String methodName = rpcCommonReqDTO.getMethodName();// 这个是存放形参的类型的数组的,是因为反射调用方法时,要传递的Class [] types = new Class[args.length];for (int i=0;i<args.length;i++){types[i] = args[i].getClass();}Object respObj = null;try {Class<?> clazz = Class.forName(classpath);Method method = clazz.getDeclaredMethod(methodName, types);// 调用方法 第一个参数是对象,第二个参数是方法的参数值(参数值,不是参数的clazz对象)// Object obj = clazz.getConstructor().newInstance(); Object result = m2.invoke(obj,10,20);respObj = method.invoke(clazz.newInstance(),args);}catch (Exception e){e.printStackTrace();}return respObj;}
}
2.2.2.3、
netServer,也就是socket的服务端,其中的main,是让socket服务端启动,开始监听socket请求
package com.csdn.net;import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author: ZhengTianLiang* @date: 2021/07/11 17:34* @desc: 提供网络上的服务的传输 的功能的类*/
public class NetServer {// 总不能说,我这个服务器,只能处理一个线程吧,所以此时需要一个socket对应一个线程,因此我们创建一个线程池对象private static final ExecutorService threadPool = Executors.newFixedThreadPool(100);/*** @author: ZhengTianLiang* @date: 2021/07/11 17:35* @desc: 提供服务的实现。port代表的是端口** 流程是:项目跑起来,调用startup方法,监听一个端口。监听某一个端口时它会等着某个客户端的链接* 然后来一个socket对象,就创建一个线程去处理这个socket对象,* 所以具体的操作这个socket的方法,就需要写在runnable接口的run方法里面。* run方法中,通过构造器的方式,将socket注入进来了,然后通过socket获取到输入流对象,* 然后拿到传输过来的对象,然后将这个对象,交给ServiceDispatch对象进行服务的分发,* 然后将ServiceDispatch对象分发完以后,返回的对象,通过网络(socket,OutputStream)返回给请求端*/public static void startup(int port) throws IOException {ServerSocket serverSocket = new ServerSocket(port);while (true){// 阻塞的等待着,socket客户端的链接Socket socket = serverSocket.accept();// 让线程池去提交一个任务。submit中可以是 实现了runnable接口的类threadPool.submit(new RPCThreadProcessor(socket));// socket是通过输入、输出流的操作来实现通讯。这样是一种典型的nio(同步阻塞io),nio是会导致线程等待的。
// socket.getInputStream();
// socket.getOutputStream();}}/*** @author: ZhengTianLiang* @date: 2021/07/11 17:34* @desc: 写一个启动类,去启动这个socket的服务端,去不断的监听 是否有socket请求进来*/public static void main(String[] args) throws IOException {startup(9999);}}
2.2.2.4、
RPCThreadProcessor,每一个线程的具体的操作
package com.csdn.net;import com.csdn.dispatch.ServiceDispatch;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;/*** @author: ZhengTianLiang* @date: 2021/07/11 17:44* @desc: 线程的处理类(线程池中的线程对象 , 都这样处理)*/public class RPCThreadProcessor implements Runnable {/*** 因为我们这个线程是要操作socket对象的,所以我们为了方便,自己通过构造器方式注入一个socker对象* spring中的注入方式有三种,getter方法、构造器方式、注解方式(@Component/@Repository/@Controller/@Service)*/private Socket socket;public RPCThreadProcessor(Socket socket) {this.socket = socket;}/*** @author: ZhengTianLiang* @date: 2021/07/11 17:54* @desc: 具体的操作应该写在这里,流程的步骤我写在了:NetServer的解释中*/public void run() {// 因为在网络中传输的是java对象(dto对象),所以可以用ObjectInputStream而不是InputStreamObjectInputStream ois = null;ObjectOutputStream oos = null;try {ois = new ObjectInputStream(socket.getInputStream());// 在网络传输的过程中,传输过来的一个对象Object reqObj = ois.readObject();// 将网络中传输过来的对象,交给这个分发器去分发。拿到的返回对象,是要往请求端返回的Object respObj = ServiceDispatch.dispatch(reqObj);oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(respObj);oos.flush();}catch (Exception e){e.printStackTrace();}finally {try {ois.close();oos.close();} catch (IOException e) {e.printStackTrace();}}}
}
2.3、consumer
2.3.1、consumer项目的流程
首先 通过动态工厂的方式,拿到一个目标对象的动态代理对象。然后封装请求参数,发起调用。调用的过程中,最终会跑到实现了InvocationHandler接口的 invoke 方法中,所以我们在invoke方法中调用了 socket的工具类,去发起socket请求。并接收到服务提供方的返回结果,然后拿到这个返回结果。完成一次rpc远程调用。
2.3.2、代码
2.3.2.1、
NetClient,也就是socket客户端的工具类
package com.csdn.net;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;/*** @author: ZhengTianLiang* @date: 2021/07/25 10:24* @desc: 是socket的工具类*/
public class NetClient {/*** @author: ZhengTianLiang* @date: 2021/07/25 10:24* @desc: 是socket的实现,发出具体的网络请求。 host代表目标主机、port代表目标端口,obj是请求入参*/public static Object callRemoteService(String host,int port,Object obj){ObjectInputStream ois = null;ObjectOutputStream oos = null;Object o = null;try {Socket socket = new Socket(host,port);oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(obj);oos.flush();ois = new ObjectInputStream(socket.getInputStream());// 网络传过来的响应对象o = ois.readObject();}catch (Exception e){e.printStackTrace();}finally {try {ois.close();oos.close();} catch (IOException e) {e.printStackTrace();}}return o;}
}
2.3.2.2、
ProxyFactory,也就是生成目标对象(UserServiceImpl)的动态工厂
package com.csdn.proxy;import java.lang.reflect.Proxy;/*** @author: ZhengTianLiang* @date: 2021/07/25 9:56* @desc: 代理工厂对象,用来动态的生成 各种 serviceImpl 对象*/
public class ProxyFactory {/*** @author: ZhengTianLiang* @date: 2021/07/25 9:57* @param: 想创建什么类型的对象,就传入什么类型的clazz对象* @desc: 使用jdk动态代理来生成 传入参数的 动态代理对象*/public static <T> T getProxyInstance(Class<T> interfaceClazz){/*这个里面有三个参数,分别是:classLoader、clazz数组、InvocationHandler当前类的clazzLoader: 类名.class().getClassLoaderclass数组:接口列表(要创建的对象),是一个数组InvocationHandler:实现了InvocationHandler 接口的实现类*/return (T) Proxy.newProxyInstance(ProxyFactory.class.getClassLoader(),new Class[]{interfaceClazz},new RPCInvocationHandler());}
}
2.3.2.3、
InvocationHandler,其中的invoke方法,也就是具体的执行(里面调用了socket工具类去发请求)。
package com.csdn.proxy;import com.csdn.dao.RPCCommonReqDTO;
import com.csdn.net.NetClient;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;/*** @author: ZhengTianLiang* @date: 2021/07/25 10:04* @desc: 是实现了InvocationHandler接口的实现类,jdk动态代理需要实现这个接口。* 其中的 invoke 方法是具体的实现*/
public class RPCInvocationHandler implements InvocationHandler {/*** @author: ZhengTianLiang* @date: 2021/07/25 10:04* @desc: 动态代理可以对 保护目标对象、实例对象进行强化,* 这个invoke 方法内就是强化的具体* 这个invoke方法,进行网络请求的封装(因为你调用目标对象serviceImpl的addUser方法,* 最终是执行的这个invoke方法,所以我们可以把网络请求封装到这个invoke方法中去)*/public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 封装网络请求RPCCommonReqDTO rpcCommonReqDTO = new RPCCommonReqDTO();rpcCommonReqDTO.setMethodName(method.getName());rpcCommonReqDTO.setArgs(args);// 这个目前是写死的,二期优化会是动态的rpcCommonReqDTO.setClasspath("com.csdn.api.impl.UserServiceImpl");// 调用socket的工具类来发起请求Object responseObj = NetClient.callRemoteService("localhost", 9999, rpcCommonReqDTO);return responseObj;}
}
2.4、测试
测试如下:
package com.csdn;import com.csdn.api.UserService;
import com.csdn.dao.UserDTO;
import com.csdn.proxy.ProxyFactory;/*** @author: ZhengTianLiang* @date: 2021/07/25 9:55* @desc: 测试发起实例(这个项目是一个rpc Consumer项目 , 是测试消费方的代码)*/
public class Test {/*** @author: ZhengTianLiang* @date: 2021/07/25 9:55* @desc: 测试发起消费。通过自己动态代理工厂生成的,服务提供方的serviceImpl(我本身是服务消费方),* 通过这个动态代理生成的 服务提供方的动态代理对象,来具体的执行serviceImpl中的方法,来实现rpc远程调用*** 首先 通过动态工厂的方式,拿到一个目标对象的动态代理对象。然后封装请求参数,发起调用。* 调用的过程中,最终会跑到实现了InvocationHandler接口的 invoke 方法中,所以我们在invoke方法* 中调用了 socket的工具类,去发起socket请求。并接收到服务提供方的返回结果,然后拿到这个返回结果。* 完成一次rpc远程调用。*/public static void main(String[] args) {UserService proxyInstance = ProxyFactory.getProxyInstance(UserService.class);// 封装请求参数UserDTO userDTO = new UserDTO();userDTO.setName("userName");userDTO.setAge("18");UserDTO responsePojo = proxyInstance.addUser(userDTO);System.out.println("传入的参数是:" + userDTO);System.out.println("传入的参数是:" + responsePojo);}
}
总结
这只是dubbo框架的一个简单的实现,内部东西还很多。此次只是分享一下他最核心的rpc调用的流程。以后有机会了,会继续分享一些更细致的东西。