一. 基本概念
区别于传统的增删改查型的业务项目,本项目侧重于开发框架,并且涉及架构方面的技术知识点。
1. 什么是RPC?
远程过程调用(Remote Procedure Call),是一种计算机通信协议,它允许程序在不同的计算机之间交互通信,以实现本地调用的效果。
类似于如今的外卖平台,外卖平台出现之前消费者需要到线下店铺购买,耗时耗力。现在通过外卖平台,只需要下单选择商品,无需关心数据在网络中如何传输的、外卖平台是怎么转发的、骑手是如何配送的等事情,等待商品即可。
进一步说明:可以让客户端在不清楚调用细节的情况下,实现对远程计算机上某个方法或服务的调用,就像调用本地方法一样。
RPC既然是一种计算机协议,那么就需要开发者去遵循这套规范进行实现,目前市面上常见的RPC框架有:Dubbo、GRPC等。
2. 为什么需要RPC?
在现实场景中,随着企业中业务量和应用功能模块的增多,单机部署运行已经无法满足需求,可能需要将业务或功能模块拆分到不同的机器和服务器上,以减轻压力,但有些功能是通用的,因此可以将这些通用的模块划分成公共模块,组成独立的一部分。
RPC就允许一个程序(服务消费者)像调用自己的本地方法一样去调用这个公共模块的接口(服务提供者),不需要了解数据的传输处理过程、底层网络通信的细节等。RPC已经帮助做完了这些事情。
举例:
项目A提供了点餐服务,项目B需要调用点餐服务完成下单。
//伪代码
interface OrderService{//点餐,返回orderIdlong order(参数1,参数2,参数3);
}
如果没有RPC,并且由于项目A和项目B都是独立的系统,所以不能像SDK一样作为依赖包直接引入。那么就需要项目A提供Web服务,同时编写一个点餐服务接口提供给外界,比如访问http://xxx.com 就能调用点餐服务。之后项目B作为服务消费者,需要自己构造请求,并通过HttpClient请求上述地址。理论上来说,如果B需要多个第三方服务,那么每个服务和方法的调用都需要编写一个HTTP请求,耗时耗力。
//伪代码
url = "http://xxx.com"
req = new Req(参数1,参数2,参数3)
res = httpClient.post(url).body(req).execute()
orderId = res.data.orderId
而通过RPC框架,项目B可以像调用本地方法一样完成调用,一行代码即可解决。
//伪代码
orderId = orderService.order(参数1,参数2,参数3)
二. 简易版RPC框架
1. 设计流程
(1)现在有一个消费者和一个服务提供者。
(2)消费者想要调用服务,需要提供者启动一个web服务,然后消费者通过请求客户端发送HTTP请求或其它协议的请求来调用。比如请求xxx.com/order地址后,就会调用提供者的order方法。
(3) 但如果提供者提供了多个服务和方法,每个服务和方法都要单独写一个接口,消费者想调用的话需要对每个接口都写一段HTTP调用逻辑。效率很低。
因此,可以提供一个统一的服务调用接口,通过请求处理器根据客户端的请求参数来调用不同的方法。同时在服务提供者程序中维护一个本地服务注册器,用于记录服务和对应实现类的映射。
此外,由于Java对象无法直接在网络中传输,因此需要对传递的参数进行序列化和反序列化。
例如:消费者想要调用orderService接口的order方法,发送请求,参数为service=orderService,method=order,然后请求处理器会根据serv从服务注册器中找到对应的服务实现类,并通过Java的反射机制调用method指定的方法。
(4)再进一步,为了简化消费者发送请求的代码,实现类似本地的一行调用。可以基于代理模式,为消费者要调用的接口生成一个代理对象,由代理对象完成请求和响应的过程。
至此,简易版RPC框架完成:
网上的一个关于RPC响应流程的图(参考):
2. 构造初始项目
(1)项目初始化
简易版RPC框架目录:
- common模块包含示例代码的公共依赖,比如接口、Model等。
- consumer模块包含示例消费者代码。
- provider模块包含例服务提供者代码。
- khr-rpc-easy模块是简易版RPC框架。
简易版RPC框架侧重于整个调用流程的实现。
(2)common模块
公共模块需要同时被消费者和服务提供者引入,主要包含和服务相关的接口和数据模型代码。
用户实体类User:
package com.khr.example.common.model;import java.io.Serializable;/*** 用户*/// 继承Serializable,用于指示该类实例可以被序列化和反序列化。
// 这意味着User对象可以在程序之间进行持久化存储或在网络通信中传输。
public class User implements Serializable{private String name;public String getName(){return name;}public void setName(String name){this.name = name;}
}
用户接口服务UserService,提供一个获取用户的方法:
package com.khr.example.common.service;import com.khr.example.common.model.User;/*** 用户服务*/
public interface UserService {/*** 获取用户** @param user* @return*/User getUser(User user);}
(3)provider模块
服务提供者是真正实现了接口的模块。
先引入hutool、lombok依赖。
服务实现类UserServiceImpl:
实现公共模块中定义的用户接口服务UserService,功能是打印用户名称并返回参数中的用户对象。
package com.khr.example.provider;import com.khr.example.common.model.User;
import com.khr.example.common.service.UserService;/*** 用户服务实现类*/
public class UserServiceImpl implements UserService {public User getUser(User user){System.out.println("用户名:"+ user.getName());return user;}
}
服务提供者启动类EasyProviderExample:
提供服务的代码之后再补充。
package com.khr.example.provider;import com.khr.example.common.service.UserService;
import com.khr.krpc.server.HttpServer;
import com.khr.krpc.server.VertxHttpServer;
import com.khr.krpc.registry.LocalRegistry;/*** 简易服务提供者示例*/
public class EasyProviderExample {public static void main(String[] args){//提供服务}
}
(3)consumer模块
消费者模块是需要调用服务的模块。
同样先引入hutool、lombok等依赖。
创建消费者启动类EasyConsumerExample:
调用接口。目前无法获取到userService实例,先预留为null。之后的目标是能够通过RPC框架快速得到一个支持远程调用服务提供者的代理对象,像调用本地方法一样调用UserService的方法。
package com.khr.example.consumer;import com.khr.example.common.model.User;
import com.khr.example.common.service.UserService;
import com.khr.krpc.proxy.ServiceProxyFactory;/*** 简易服务消费者示例*/
public class EasyConsumerExample {public static void main(String[] args){//todo 需要获取UserService的实现类对象UserService userService = null;User user = new User();user.setName("KHR");//调用User newUser = userService.getUser(user);if (newUser != null){System.out.println(newUser.getName());} else {System.out.println("user == null");}}
}
3. Web服务器
消费者想要调用另一台机器上的服务提供者的方法,需要服务提供者开启可远程访问的服务。因此需要一个Web服务器,能够接受处理请求,并返回响应。
Spring Boot框架内置了Tomcat,还有NIO框架的Netty、Vert.x等都是Web服务器。本项目中使用高性能的NIO框架 Vert.x 作为RPC框架的Web服务器。关于Vert.x之后会另出文章总结。
先引入Vert.x的依赖。
创建Web服务器接口HttpServer:
定义统一的启动服务器方法,便于后续扩展,比如实现多种不同的Web服务器。
package com.khr.krpc.server;/*** HTTP服务器接口*/
public interface HttpServer {/*** 启动服务器** @param port*/void doStart(int port);
}
创建基于Vert.x实现的Web服务器VertxHttpServer:
监听指定端口并处理请求。
package com.khr.krpc.server;import io.vertx.core.Vertx;/*** Vertx HTTP 服务器*/
public class VertxHttpServer implements HttpServer{/*** 启动服务器** @param port*/public void doStart(final int port) {//创建Vert.x实例Vertx vertx = Vertx.vertx();//创建HTTP服务器io.vertx.core.http.HttpServer server = vertx.createHttpServer();//监听端口并处理请求server.requestHandler(request ->{//处理HTTP请求System.out.println("Received request:"+ request.method()+" "+ request.uri());//发送HTTP响应request.response().putHeader("content-type","text/plain").end("Hello from Vert.x HTTP server!")});//启动HTTP服务器并监听指定端口server.listen(port, result -> {if (result.succeeded()) {System.out.println("Server is now listening on port" + port);} else {System.out.println("Failed to start server:" + result.cause());}});}
}
验证web服务器能否启动成功并接受请求:
修改provider模块的EasyProviderExample类,编写启动Web服务的代码。
通过浏览器访问localhost:8080,即可查看到" Hello from Vert.x HTTP server! "字样。
package com.khr.example.provider;import com.khr.example.common.service.UserService;
import com.khr.krpc.server.HttpServer;
import com.khr.krpc.server.VertxHttpServer;
import com.khr.krpc.registry.LocalRegistry;/*** 简易服务提供者示例*/
public class EasyProviderExample {public static void main(String[] args){//启动web服务HttpServer httpServer = new VertxHttpServer();httpServer.doStart(8080);}
}
4. 本地服务注册器
简易版RPC框架暂时不使用第三方注册中心(Nacos、Zookeeper等),先跑通整个流程。因此直接把服务注册到服务提供者本地。
在RPC模块中创建本地服务注册器LocalRegistry:
使用线程安全的ConcurrentHashMap存储服务注册信息,key为服务名称,value为服务的实现类。之后就可以根据要调用的服务名称获取到对应的实现类,然后通过反射进行方法调用了。
package com.khr.krpc.registry;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 本地注册中心*/public class LocalRegistry {/*** 注册信息存储*/private static final Map<String,Class<?>> map = new ConcurrentHashMap<>();/*** 注册服务** @param serviceName* @param implClass*/public static void registry(String serviceName,Class<?> implClass){map.put(serviceName,implClass);}/*** 获取服务** @param serviceName* @return*/public static Class<?> get(String serviceName){return map.get(serviceName);}/*** 删除服务** @param serviceName*/public static void remove(String serviceName){map.remove(serviceName);}
}
修改EasyProviderExample:
给provider模块增加注册服务到注册器中的逻辑,也就是服务提供者启动时,会进行本地注册。
package com.khr.example.provider;import com.khr.example.common.service.UserService;
import com.khr.krpc.server.HttpServer;
import com.khr.krpc.server.VertxHttpServer;
import com.khr.krpc.registry.LocalRegistry;/*** 简易服务提供者示例*/
public class EasyProviderExample {public static void main(String[] args){//注册服务LocalRegistry.registry(UserService.class.getName(),UserServiceImpl.class);//启动web服务HttpServer httpServer = new VertxHttpServer();httpServer.doStart(8080);}
}
5. 序列化器
服务在本地注册后,就可以根据请求信息取出实现类并调用方法了。
但由于传递的参数是Java对象,仅运行在JVM中,如果想要在网络中传输,需要进行序列化与反序列化操作。序列化与反序列化不再做过多介绍,之前的Java八股中已经解释。
有多种不同的序列化方法,比如Java原生序列化、Json、Hessian等。为了方便实现,此处先选择Java原生序列化器。
在RPC模块中创建序列化接口Serializer:
提供序列化和反序列化两个方法,同时便于后续扩展。
package com.khr.krpc.serializer;import java.io.IOException;/*** 序列化器接口*/
public interface Serializer {/*** 序列化** @param object* @param <T>* @return* @throws IOException*/<T> byte[] serialize(T object) throws IOException;/*** 反序列化** @param bytes* @param type* @param <T>* @return* @throws IOException*/<T> T deserialize(byte[] bytes,Class<T> type) throws IOException;
}
基于Java自带的序列化器实现JdkSerializer(现用现查):
package com.khr.krpc.serializer;import java.io.*;/*** JDK序列化器*/public class JdkSerializer implements Serializer{/*** 序列化** @param object* @param <T>* @return* @throws IOException*/@Overridepublic <T> byte[] serialize(T object) throws IOException{ByteArrayOutputStream outputStream = new ByteArrayOutputStream();ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);objectOutputStream.writeObject(object);objectOutputStream.close();return outputStream.toByteArray();}/*** 反序列化** @param bytes* @param type* @param <T>* @return* @throws IOException*/@Overridepublic <T> T deserialize(byte[] bytes,Class<T> type) throws IOException{ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);try{return (T) objectInputStream.readObject();} catch (ClassNotFoundException e){throw new RuntimeException(e);} finally {objectInputStream.close();}}
}
6. 提供者处理调用 — 请求处理器
请求处理器用于处理接收到的请求,并根据请求参数找到对应的服务和方法,通过反射实现调用,最后封装返回结果并响应请求。
在RPC模块中创建请求和响应封装类:
请求类RpcRequest的作用是封装调用所需的信息,比如服务名称、方法名称、调用参数的类型列表、参数列表等。这些都是Java反射机制所需的参数。
package com.khr.krpc.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** RPC请求*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable{/*** 服务名称*/private String serviceName;/*** 方法名称*/private String methodName;/*** 参数类型列表*/private Class<?>[] parameterTypes;/*** 参数列表*/private Object[] args;
}
响应类RpcResponse的作用是封装调用方法后得到的返回值以及调用信息等。
package com.khr.krpc.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** RPC响应*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse implements Serializable{/*** 响应数据*/private Object data;/*** 响应数据类型(预留)*/private Class<?> dataType;/*** 响应信息*/private String message;/*** 异常信息*/private Exception exception;
}
创建请求处理器HttpServerHandler:
业务流程:
- 反序列化请求为对象(因为在消费者发送请求给Web服务器时已经将各类参数序列化),并从请求对象中获取参数。
- 根据服务名称从本地注册器中获取到对应的服务实现类。
- 通过反射机制调用方法,得到返回结果。
- 封装返回结果并序列化,然后写入到响应中。
package com.khr.krpc.server;import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.registry.LocalRegistry;
import com.khr.krpc.serializer.JdkSerializer;
import com.khr.krpc.serializer.Serializer;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;import java.io.IOException;
import java.lang.reflect.Method;/*** HTTP请求处理*/
public class HttpServerHandler implements Handler<HttpServerRequest> {@Overridepublic void handle(HttpServerRequest request) {//指定序列化器final Serializer serializer = new JdkSerializer();//记录日志System.out.println("Received request:" + request.method() + " " + request.uri());//异步处理HTTP请求request.bodyHandler(body -> {byte[] bytes = body.getBytes();RpcRequest rpcRequest = null;try {rpcRequest = serializer.deserialize(bytes, RpcRequest.class);} catch (Exception e) {e.printStackTrace();}//构造响应结果对象RpcResponse rpcResponse = new RpcResponse();//如果请求为null,直接返回if (rpcRequest == null) {rpcResponse.setMessage("rpcRequest is null");doResponse(request, rpcResponse, serializer);return;}try {//获取要调用的服务实现类,通过反射调用Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());//封装返回结果rpcResponse.setData(result);rpcResponse.setDataType(method.getReturnType());rpcResponse.setMessage("ok");} catch (Exception e) {e.printStackTrace();rpcResponse.setMessage(e.getMessage());rpcResponse.setException(e);}//响应doResponse(request, rpcResponse, serializer);});}/*** 响应** @param request* @param rpcResponse* @param serializer*/void doResponse(HttpServerRequest request,RpcResponse rpcResponse,Serializer serializer){HttpServerResponse httpServerResponse = request.response().putHeader("content-type","application/json");try{//序列化byte[] serialized = serializer.serialize(rpcResponse);httpServerResponse.end(Buffer.buffer(serialized));}catch (IOException e){e.printStackTrace();httpServerResponse.end(Buffer.buffer());}
}
}
不同的Web服务器对应的请求处理器实现方式也不同,Vert.x是通过实现Handler<HttpServerRequest>接口来自定义请求处理器,并且可以通过request.bodyHandler异步处理请求。
之后,再给HttpServer绑定请求处理器:
修改VertxHttpServer的代码,通过server.requestHandler绑定请求处理器。
package com.khr.krpc.server;import io.vertx.core.Vertx;/*** Vertx HTTP 服务器*/
public class VertxHttpServer implements HttpServer{/*** 启动服务器** @param port*/public void doStart(final int port) {//创建Vert.x实例Vertx vertx = Vertx.vertx();//创建HTTP服务器io.vertx.core.http.HttpServer server = vertx.createHttpServer();//监听端口并处理请求server.requestHandler(new HttpServerHandler());//启动HTTP服务器并监听指定端口server.listen(port, result -> {if (result.succeeded()) {System.out.println("Server is now listening on port" + port);} else {System.out.println("Failed to start server:" + result.cause());}});}
}
至此,服务提供者已经能够接受请求并完成服务调用。
7. 消费者发起调用 — 基于代理模式
在之前完成的consumer模块中,消费者无法获取userService实例,我们是希望通过代理对象来发起调用,从而实现一行代码调用。
只要能获取到UserService对象(实现类),就能完成调用,那么问题是如何获取呢?如果把provider模块的UserServiceImpl的代码复制粘贴到consumer模块,那么RPC框架就失去了它存在的意义。在分布式系统中,当开发者调用其它项目或团队提供的接口时,一般只关注请求参数和响应结果(点外卖),而不关注具体实现(信息传输、配送等)。
因此可以通过生成代理对象的方式来简化消费者的调用过程,即把事情都交给中介去做。
代理的实现方式:静态代理和动态代理。
静态代理:
为每个特定类型的接口或对象编写一个代理类,即在consumer模块中创建静态代理类UserServiceProxy,实现UserService接口和getUser方法。
但getUser方法的实现不是复制粘贴provider模块中UserServiceImpl的代码,而是通过构造HTTP请求去调用服务提供者。代理对象在发送请求时要先将参数进行序列化。
package com.khr.example.consumer;import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.khr.example.common.model.User;
import com.khr.example.common.service.UserService;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.serializer.JdkSerializer;
import com.khr.krpc.serializer.Serializer;import java.io.IOException;/*** 静态代理*/public class UserServiceProxy implements UserService {public User getUser(User user){//指定序列化器Serializer serializer = new JdkSerializer();//发请求RpcRequest rpcRequest = RpcRequest.builder().serviceName(UserService.class.getName()).methodName("getUser").parameterTypes(new Class[]{User.class}).args(new Object[]{user}).build();try{byte[] boduBytes = serializer.serialize(rpcRequest);byte[] result;try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080").body(boduBytes).execute()){result = httpResponse.bodyBytes();}RpcResponse rpcResponse = serializer.deserialize(result,RpcResponse.class);return(User)rpcResponse.getData();} catch (IOException e){e.printStackTrace();}return null;}
}
然后修改EasyConsumerExample:
添加一个代理对象并赋值给userService。
package com.khr.example.consumer;import com.khr.example.common.model.User;
import com.khr.example.common.service.UserService;
import com.khr.krpc.proxy.ServiceProxyFactory;/*** 简易服务消费者示例*/
public class EasyConsumerExample {public static void main(String[] args){//静态代理//UserService userService = new UserServiceProxy();User user = new User();user.setName("KHR");//调用User newUser = userService.getUser(user);if (newUser != null){System.out.println(newUser.getName());} else {System.out.println("user == null");}}
}
静态代理其实就是写一个实现类,但如果服务接口很多,需要为每个接口都写一个实现类,依然比较繁琐,灵活性较差。
因此RPC框架中一般采用动态代理。
动态代理的作用是根据要生成的对象类型,自动生成一个代理对象。常用的方式有JDK动态代理或CGLIB。此处使用JDK动态代理。
在RPC模块中创建动态代理类ServiceProxy:
需要实现InvocationHandler接口的invoke方法。
当用户调用某个接口的方法时,会改为调用invoke方法。在invoke方法中,可以获取到要调用的方法信息、传入的参数列表等,这些就是服务提供者需要的参数。用这些参数来构造请求对象即可。
package com.khr.krpc.proxy;import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.serializer.JdkSerializer;
import com.khr.krpc.serializer.Serializer;import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;/*** 服务代理(JDK动态代理)*/public class ServiceProxy implements InvocationHandler {/*** 调用代理** @return* @throws Throwable*/@Overridepublic Object invoke(Object proxy,Method method,Object[] args) throws Throwable{//指定序列化器Serializer serializer = new JdkSerializer();//构造请求RpcRequest rpcRequest = RpcRequest.builder().serviceName(method.getDeclaringClass().getName()).methodName(method.getName()).parameterTypes(method.getParameterTypes()).args(args).build();try{//序列化byte[] boduBytes = serializer.serialize(rpcRequest);//发送请求//todo 注意,这里地址被硬编码了,需要使用注册中心和服务发现机制解决try(HttpResponse httpResponse = HttpRequest.post("http://localhost:8080").body(boduBytes).execute()){byte[] result = httpResponse.bodyBytes();//反序列化RpcResponse rpcResponse = serializer.deserialize(result,RpcResponse.class);return rpcResponse.getData();}} catch (IOException e){e.printStackTrace();}return null;}
}
创建动态代理工厂ServiceProxyFactory:
根据指定类创建动态代理对象,使用了工厂设计模式,来简化对象的创建过程。
package com.khr.krpc.proxy;import java.lang.reflect.Proxy;/*** 服务代理工厂(用于创建代理对象)*/public class ServiceProxyFactory {/*** 根据服务类获取代理对象** @param serviceClass* @param <T>* @return*/public static <T> T getProxy(Class<T> serviceClass){return (T) Proxy.newProxyInstance( //为指定类型创建代理对象serviceClass.getClassLoader(),new Class[]{serviceClass},new ServiceProxy());}
}
修改EasyConsumerExample代码:
通过调用工厂为UserService获取动态代理对象。
package com.khr.example.consumer;import com.khr.example.common.model.User;
import com.khr.example.common.service.UserService;
import com.khr.krpc.proxy.ServiceProxyFactory;/*** 简易服务消费者示例*/
public class EasyConsumerExample {public static void main(String[] args){//静态代理//UserService userService = new UserServiceProxy();//动态代理UserService userService = ServiceProxyFactory.getProxy(UserService.class);User user = new User();user.setName("KHR");//调用User newUser = userService.getUser(user);if (newUser != null){System.out.println(newUser.getName());} else {System.out.println("user == null");}}
}
至此,简易版RPC框架开发完成。
8. 测试调用
首先debug启动服务提供者。
之后debug启动消费者,在ServiceProxy代理类中加断点,可以发现调用userService的时候,实际上是去调用了代理对象的invoke方法,并获取到了serviceName、methodName、参数类型和列表等信息。
继续dubug,可以看到序列化后的请求对象,结构式字节数组:
在请求处理器中打断点,可以看到接受并反序列化后的请求与发送时的内容一致:
说明代理对象将请求参数序列化后发送到Web服务器中,被请求处理器接收后进行反序列化,这一流程实现了。
最后在provider与consumer模块中都输出了用户名称,说明整个流程跑通了。
后续将会持续扩展本项目,以实现更为复杂的RPC框架。