这一篇理解如果有难度,可能对netty不是很理解, 可以关注我netty专栏,还有另外一篇: 用 Netty 自己实现简单的RPC, 这一篇是学习netty的时候写的,更倾向于分析netty相关的知识, 今天我是学习dubbo,从一个rpc框架进行思考写的一篇文章
RPC 是“远程过程调用(Remote Procedure Call)”的缩写形式,比较通俗的解释是:像本地方法调用一样调用远程的服务。虽然 RPC 的定义非常简单,但是相对完整的、通用的 RPC 框架涉及很多方面的内容,例如注册发现、服务治理、负载均衡、集群容错、RPC 协议等,如下图所示:
简易 RPC 框架的架构图
本课时我们主要实现RPC 框架的基石部分——远程调用,简易版 RPC 框架一次远程调用的核心流程是这样的:
- Client 首先会调用本地的代理,也就是图中的 Proxy。
- Client 端 Proxy 会按照协议(Protocol),将调用中传入的数据序列化成字节流。
- 之后 Client 会通过网络,将字节数据发送到 Server 端。
- Server 端接收到字节数据之后,会按照协议进行反序列化,得到相应的请求信息。
- Server 端 Proxy 会根据序列化后的请求信息,调用相应的业务逻辑。
- Server 端业务逻辑的返回值,也会按照上述逻辑返回给 Client 端。
这个远程调用的过程,就是我们简易版本 RPC 框架的核心实现,只有理解了这个流程,才能进行后续的开发。
项目结构
了解了简易版 RPC 框架的工作流程和实现目标之后,我们再来看下项目的结构,为了方便起见,这里我们将整个项目放到了一个 Module 中了,如下图所示,你可以按照自己的需求进行模块划分。
那这各个包的功能是怎样的呢?我们就来一一说明。
- protocol:简易版 RPC 框架的自定义协议。
- serialization:提供了自定义协议对应的序列化、反序列化的相关工具类。
- codec:提供了自定义协议对应的编码器和解码器。
- transport:基于 Netty 提供了底层网络通信的功能,其中会使用到 codec 包中定义编码器和解码器,以及 serialization 包中的序列化器和反序列化器。
- registry:基于 ZooKeeper 和 Curator 实现了简易版本的注册中心功能。
- proxy:使用 JDK 动态代理实现了一层代理。
自定义协议
当前已经有很多成熟的协议了,例如 HTTP、HTTPS 等,那为什么我们还要自定义 RPC 协议呢?
从功能角度考虑,HTTP 协议在 1.X 时代,只支持半双工传输模式,虽然支持长连接,但是不支持服务端主动推送数据。从效率角度来看,在一次简单的远程调用中,只需要传递方法名和加个简单的参数,此时,HTTP 请求中大部分数据都被 HTTP Header 占据,真正的有效负载非常少,效率就比较低。
当然,HTTP 协议也有自己的优势,例如,天然穿透防火墙,大量的框架和开源软件支持 HTTP 接口,而且配合 REST 规范使用也是很便捷的,所以有很多 RPC 框架直接使用 HTTP 协议,尤其是在 HTTP 2.0 之后,如 gRPC、Spring Cloud 等。
这里我们自定义一个简易版的 Demo RPC 协议,如下图所示:
在 Demo RPC 的消息头中,包含了整个 RPC 消息的一些控制信息,例如,版本号、魔数、消息类型、附加信息、消息 ID 以及消息体的长度,在附加信息(extraInfo)中,按位进行划分,分别定义消息的类型、序列化方式、压缩方式以及请求类型。当然,你也可以自己扩充 Demo RPC 协议,实现更加复杂的功能。
Demo RPC 消息头对应的实体类是 Header,其定义如下:
public class Header {private short magic; // 魔数private byte version; // 版本号private byte extraInfo; // 附加信息private Long messageId; // 消息IDprivate Integer size; // 消息体长度
确定了 Demo RPC 协议消息头的结构之后,我们再来看 Demo RPC 协议消息体由哪些字段构成,这里我们通过 Request 和 Response 两个实体类来表示请求消息和响应消息的消息体:
public class Request implements Serializable {private String serviceName; // 请求的Service类名private String methodName; // 请求的方法名称private Class[] argTypes; // 请求方法的参数类型private Object[] args; // 请求方法的参数
}
public class Response implements Serializable {private int code = 0; // 响应的错误码,正常响应为0,非0表示异常响应private String errMsg; // 异常信息private Object result; // 响应结果
}
注意,Request 和 Response 对象是要进行序列化的,需要实现 Serializable 接口。为了让这两个类的对象能够在 Client 和 Server 之间跨进程传输,需要进行序列化和反序列化操作,这里定义一个 Serialization 接口,统一完成序列化相关的操作:
public interface Serialization {<T> byte[] serialize(T obj) throws IOException;<T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}
在 Demo RPC 中默认使用 Hessian 序列化方式,下面的 HessianSerialization 就是基于 Hessian 序列化方式对 Serialization 接口的实现:
public class HessianSerialization implements Serialization {@Overridepublic byte[] serialize(Object data) throws IOException {ByteArrayOutputStream bos = new ByteArrayOutputStream();Hessian2Output out = new Hessian2Output(bos);out.writeObject(data);out.flush();return bos.toByteArray();}public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));return (T) input.readObject(clz);}
}
在有的场景中,请求或响应传输的数据比较大,直接传输比较消耗带宽,所以一般会采用压缩后再发送的方式。在前面介绍的 Demo RPC 消息头中的 extraInfo 字段中,就包含了标识消息体压缩方式的 bit 位。这里我们定义一个 Compressor 接口抽象所有压缩算法:
public interface Compressor {byte[] compress(byte[] array) throws IOException;byte[] unCompress(byte[] array) throws IOException;
}
同时提供了一个基于 Snappy 压缩算法的实现,作为 Demo RPC 的默认压缩算法:
public class SnappyCompressor implements Compressor {public byte[] compress(byte[] array) throws IOException {if (array == null) { return null; }return Snappy.compress(array);}public byte[] unCompress(byte[] array) throws IOException {if (array == null) { return null; }return Snappy.uncompress(array);}
}
编解码实现
了解了自定义协议的结构之后,我们再来解决协议的编解码问题。
前面课时介绍 Netty 核心概念的时候我们提到过,Netty 每个 Channel 绑定一个 ChannelPipeline,并依赖 ChannelPipeline 中添加的 ChannelHandler 处理接收到(或要发送)的数据,其中就包括字节到消息(以及消息到字节)的转换。Netty 中提供了 ByteToMessageDecoder、 MessageToByteEncoder、MessageToMessageEncoder、MessageToMessageDecoder 等抽象类来实现 Message 与 ByteBuf 之间的转换以及 Message 之间的转换,如下图所示:
Netty 提供的 Decoder 和 Encoder 实现
在 Netty 的源码中,我们可以看到对很多已有协议的序列化和反序列化都是基于上述抽象类实现的,例如,HttpServerCodec 中通过依赖 HttpServerRequestDecoder 和 HttpServerResponseEncoder 来实现 HTTP 请求的解码和 HTTP 响应的编码。如下图所示,HttpServerRequestDecoder 继承自 ByteToMessageDecoder,实现了 ByteBuf 到 HTTP 请求之间的转换;HttpServerResponseEncoder 继承自 MessageToMessageEncoder,实现 HTTP 响应到其他消息的转换(其中包括转换成 ByteBuf 的能力)。
Netty 中 HTTP 协议的 Decoder 和 Encoder 实现
在简易版 RPC 框架中,我们的自定义请求暂时没有 HTTP 协议那么复杂,只要简单继承 ByteToMessageDecoder 和 MessageToMessageEncoder 即可。
首先来看 DemoRpcDecoder,它实现了 ByteBuf 到 Demo RPC Message 的转换,具体实现如下:
public class DemoRpcDecoder extends ByteToMessageDecoder {protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {if (byteBuf.readableBytes() < Constants.HEADER_SIZE) {return; // 不到16字节的话无法解析消息头,暂不读取}// 记录当前readIndex指针的位置,方便重置byteBuf.markReaderIndex();// 尝试读取消息头的魔数部分short magic = byteBuf.readShort();if (magic != Constants.MAGIC) { // 魔数不匹配会抛出异常byteBuf.resetReaderIndex(); // 重置readIndex指针throw new RuntimeException("magic number error:" + magic);}// 依次读取消息版本、附加信息、消息ID以及消息体长度四部分byte version = byteBuf.readByte();byte extraInfo = byteBuf.readByte();long messageId = byteBuf.readLong();int size = byteBuf.readInt();Object body = null;// 心跳消息是没有消息体的,无需读取if (!Constants.isHeartBeat(extraInfo)) {// 对于非心跳消息,没有积累到足够的数据是无法进行反序列化的if (byteBuf.readableBytes() < size) {byteBuf.resetReaderIndex();return;}// 读取消息体并进行反序列化byte[] payload = new byte[size];byteBuf.readBytes(payload);// 这里根据消息头中的extraInfo部分选择相应的序列化和压缩方式Serialization serialization = SerializationFactory.get(extraInfo);Compressor compressor = CompressorFactory.get(extraInfo);if (Constants.isRequest(extraInfo)) {// 得到消息体body = serialization.deserialize(compressor.unCompress(payload),Request.class);} else {// 得到消息体body = serialization.deserialize(compressor.unCompress(payload),Response.class);}}// 将上面读取到的消息头和消息体拼装成完整的Message并向后传递Header header = new Header(magic, version, extraInfo, messageId, size);Message message = new Message(header, body);out.add(message);}
}
public class DemoRpcEncoder extends MessageToByteEncoder<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx,Message message, ByteBuf byteBuf) throws Exception {Header header = message.getHeader();// 依次序列化消息头中的魔数、版本、附加信息以及消息IDbyteBuf.writeShort(header.getMagic());byteBuf.writeByte(header.getVersion());byteBuf.writeByte(header.getExtraInfo());byteBuf.writeLong(header.getMessageId());Object content = message.getContent();if (Constants.isHeartBeat(header.getExtraInfo())) {byteBuf.writeInt(0); // 心跳消息,没有消息体,这里写入0return;}// 按照extraInfo部分指定的序列化方式和压缩方式进行处理Serialization serialization = SerializationFactory.get(header.getExtraInfo());Compressor compressor = CompressorFactory.get(header.getExtraInfo());byte[] payload = compressor.compress(serialization.serialize(content));byteBuf.writeInt(payload.length); // 写入消息体长度byteBuf.writeBytes(payload); // 写入消息体}
}
transport 相关实现
正如前文介绍 Netty 线程模型的时候提到,我们不能在 Netty 的 I/O 线程中执行耗时的业务逻辑。在 Demo RPC 框架的 Server 端接收到请求时,首先会通过上面介绍的 DemoRpcDecoder 反序列化得到请求消息,之后我们会通过一个自定义的 ChannelHandler(DemoRpcServerHandler)将请求提交给业务线程池进行处理。
在 Demo RPC 框架的 Client 端接收到响应消息的时候,也是先通过 DemoRpcDecoder 反序列化得到响应消息,之后通过一个自定义的 ChannelHandler(DemoRpcClientHandler)将响应返回给上层业务。
DemoRpcServerHandler 和 DemoRpcClientHandler 都继承自 SimpleChannelInboundHandler,如下图所示:
下面我们就来看一下这两个自定义的 ChannelHandler 实现:
public class DemoRpcServerHandler extends SimpleChannelInboundHandler<Message<Request>> {// 业务线程池private static Executor executor = Executors.newCachedThreadPool();@Overrideprotected void channelRead0(final ChannelHandlerContext channelHandlerContext, Message<Request> message) throws Exception {byte extraInfo = message.getHeader().getExtraInfo();if (Constants.isHeartBeat(extraInfo)) { // 心跳消息,直接返回即可channelHandlerContext.writeAndFlush(message);return;}// 非心跳消息,直接封装成Runnable提交到业务线程池executor.execute(new InvokeRunnable(message, channelHandlerContext));}
}
public class DemoRpcClientHandler extends SimpleChannelInboundHandler<Message<Response>> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Message<Response> message) throws Exception {NettyResponseFuture responseFuture =Connection.IN_FLIGHT_REQUEST_MAP.remove(message.getHeader().getMessageId());Response response = message.getContent();// 心跳消息特殊处理if (response == null && Constants.isHeartBeat(message.getHeader().getExtraInfo())) {response = new Response();response.setCode(Constants.HEARTBEAT_CODE);}responseFuture.getPromise().setSuccess(response.getResult());}
}
注意,这里有两个点需要特别说明一下。一个点是 Server 端的 InvokeRunnable,在这个 Runnable 任务中会根据请求的 serviceName、methodName 以及参数信息,调用相应的方法:
class InvokeRunnable implements Runnable {private ChannelHandlerContext ctx;private Message<Request> message;public InvokeRunnable(Message<Request> message, ChannelHandlerContext ctx) {this.message = message;this.ctx = ctx;}@Overridepublic void run() {Response response = new Response();Object result = null;try {Request request = message.getContent();String serviceName = request.getServiceName();// 这里提供BeanManager对所有业务Bean进行管理,其底层在内存中维护了// 一个业务Bean实例的集合。感兴趣的同学可以尝试接入Spring等容器管// 理业务BeanObject bean = BeanManager.getBean(serviceName);// 下面通过反射调用Bean中的相应方法Method method = bean.getClass().getMethod(request.getMethodName(), request.getArgTypes());result = method.invoke(bean, request.getArgs());} catch (Exception e) {// 省略异常处理} finally {}Header header = message.getHeader();header.setExtraInfo((byte) 1);response.setResult(result); // 设置响应结果// 将响应消息返回给客户端ctx.writeAndFlush(new Message(header, response));}}
另一个点是 Client 端的 Connection,它是用来暂存已发送出去但未得到响应的请求,这样,在响应返回时,就可以查找到相应的请求以及 Future,从而将响应结果返回给上层业务逻辑,具体实现如下:
public class Connection implements Closeable {private static AtomicLong ID_GENERATOR = new AtomicLong(0);public static Map<Long, NettyResponseFuture<Response>> IN_FLIGHT_REQUEST_MAP = new ConcurrentHashMap<>();private ChannelFuture future;private AtomicBoolean isConnected = new AtomicBoolean();public Connection(ChannelFuture future, boolean isConnected) {this.future = future;this.isConnected.set(isConnected);}public NettyResponseFuture<Response> request(Message<Request> message, long timeOut) {// 生成并设置消息IDlong messageId = ID_GENERATOR.incrementAndGet();message.getHeader().setMessageId(messageId);// 创建消息关联的FutureNettyResponseFuture responseFuture = new NettyResponseFuture(System.currentTimeMillis(),timeOut, message, future.channel(), new DefaultPromise(new DefaultEventLoop()));// 将消息ID和关联的Future记录到IN_FLIGHT_REQUEST_MAP集合中IN_FLIGHT_REQUEST_MAP.put(messageId, responseFuture);try {future.channel().writeAndFlush(message); // 发送请求} catch (Exception e) {// 发送请求异常时,删除对应的FutureIN_FLIGHT_REQUEST_MAP.remove(messageId);throw e;}return responseFuture;}// 省略getter/setter以及close()方法
}
我们可以看到,Connection 中没有定时清理 IN_FLIGHT_REQUEST_MAP 集合的操作,在无法正常获取响应的时候,就会导致 IN_FLIGHT_REQUEST_MAP 不断膨胀,最终 OOM。你也可以添加一个时间轮定时器,定时清理过期的请求消息,这里我们就不再展开讲述了。
完成自定义 ChannelHandler 的编写之后,我们需要再定义两个类—— DemoRpcClient 和 DemoRpcServer,分别作为 Client 和 Server 的启动入口。DemoRpcClient 的实现如下:
public class DemoRpcClient implements Closeable {protected Bootstrap clientBootstrap;protected EventLoopGroup group;private String host;private int port;public DemoRpcClient(String host, int port) {this.host = host;this.port = port;// 创建并配置客户端BootstrapclientBootstrap = new Bootstrap();group = NettyEventLoopFactory.eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");clientBootstrap.group(group).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).channel(NioSocketChannel.class) // 创建的Channel类型// 指定ChannelHandler的顺序.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());ch.pipeline().addLast("demo-rpc-decoder", new DemoRpcDecoder());ch.pipeline().addLast("client-handler", new DemoRpcClientHandler());}});}public ChannelFuture connect() {// 连接指定的地址和端口ChannelFuture connect = clientBootstrap.connect(host, port);connect.awaitUninterruptibly();return connect;}@Overridepublic void close() {group.shutdownGracefully();}
}
通过 DemoRpcClient 的代码我们可以看到其 ChannelHandler 的执行顺序如下:
客户端 ChannelHandler 结构图
另外,在创建EventLoopGroup时并没有直接使用NioEventLoopGroup,而是在 NettyEventLoopFactory 中根据当前操作系统进行选择,对于 Linux 系统,会使用 EpollEventLoopGroup,其他系统则使用 NioEventLoopGroup。
接下来我们再看DemoRpcServer 的具体实现:
public class DemoRpcServer {private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private ServerBootstrap serverBootstrap;private Channel channel;protected int port;public DemoRpcServer(int port) throws InterruptedException {this.port = port;// 创建boss和worker两个EventLoopGroup,注意一些小细节,// workerGroup 是按照中的线程数是按照 CPU 核数计算得到的bossGroup = NettyEventLoopFactory.eventLoopGroup(1,"NettyServerBoss");workerGroup = NettyEventLoopFactory.eventLoopGroup(Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),"NettyServerWorker");serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)// 指定每个Channel上注册的ChannelHandler以及顺序.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("demp-rpc-decoder", new DemoRpcDecoder());ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());ch.pipeline().addLast("server-handler", new DemoRpcServerHandler());}});}public ChannelFuture start() throws InterruptedException {// 监听指定的端口ChannelFuture channelFuture = serverBootstrap.bind(port).sync();channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (channelFuture.isSuccess()) {System.out.println("监听端口 6668 成功");} else {System.out.println("监听端口 6668 失败");}}});channel = channelFuture.channel();channel.closeFuture().sync();return channelFuture;}public void startAndWait() throws InterruptedException {try {channel.closeFuture().await();} catch (InterruptedException e) {Thread.interrupted();}}public void shutdown() throws InterruptedException {channel.close().sync();if (bossGroup != null)bossGroup.shutdownGracefully().awaitUninterruptibly(15000);if (workerGroup != null)workerGroup.shutdownGracefully().awaitUninterruptibly(15000);}}
通过对 DemoRpcServer 实现的分析,我们可以知道每个 Channel 上的 ChannelHandler 顺序如下:
服务端 ChannelHandler 结构图
registry 相关实现
介绍完客户端和服务端的通信之后,我们再来看简易 RPC 框架的另一个基础能力——服务注册与服务发现能力,对应 demo-rpc 项目源码中的 registry 包。
registry 包主要是依赖 Apache Curator 实现了一个简易版本的 ZooKeeper 客户端,并基于 ZooKeeper 实现了注册中心最基本的两个功能:Provider 注册以及 Consumer 订阅。
这里我们先定义一个 Registry 接口,其中提供了注册以及查询服务实例的方法,如下图所示:
public interface Registry<T> {void registerService(ServiceInstance<T> service) throws Exception;void unregisterService(ServiceInstance<T> service) throws Exception;List<ServiceInstance<T>> queryForInstances(String name) throws Exception;
}
ZooKeeperRegistry 是基于 curator-x-discovery 对 Registry 接口的实现类型,其中封装了之前课时介绍的 ServiceDiscovery,并在其上添加了 ServiceCache 缓存提高查询效率。ZooKeeperRegistry 的具体实现如下:
public class ZookeeperRegistry<T> implements Registry<T> {private Map<String, List<ServiceInstanceListener<T>>> listeners = Maps.newConcurrentMap();private InstanceSerializer serializer = new JsonInstanceSerializer<>(ServerInfo.class);private ServiceDiscovery<T> serviceDiscovery;private ServiceCache<T> serviceCache;private String address = "localhost:2181";public void start() throws Exception {String root = "/demo/rpc";// 初始化CuratorFrameworkCuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));client.start(); // 启动Curator客户端// client.createContainers(root);// 初始化ServiceDiscoveryserviceDiscovery = ServiceDiscoveryBuilder.builder(ServerInfo.class).client(client).basePath(root).serializer(serializer).build();serviceDiscovery.start(); // 启动ServiceDiscovery// 创建ServiceCache,监Zookeeper相应节点的变化,也方便后续的读取serviceCache = serviceDiscovery.serviceCacheBuilder().name("/demoService").build();
// client.start(); // 启动Curator客户端client.blockUntilConnected(); // 阻塞当前线程,等待连接成功serviceDiscovery.start(); // 启动ServiceDiscoveryserviceCache.start(); // 启动ServiceCache}@Overridepublic void registerService(ServiceInstance<T> service) throws Exception {serviceDiscovery.registerService(service);}@Overridepublic void unregisterService(ServiceInstance service) throws Exception {serviceDiscovery.unregisterService(service);}@Overridepublic List<ServiceInstance<T>> queryForInstances(String name) throws Exception {// 直接根据name进行过滤ServiceCache中的缓存数据return serviceCache.getInstances().stream().filter(s -> s.getName().equals(name)).collect(Collectors.toList());}
}
通过对 ZooKeeperRegistry的分析可以得知,它是基于 Curator 中的 ServiceDiscovery 组件与 ZooKeeper 进行交互的,并且对 Registry 接口的实现也是通过直接调用 ServiceDiscovery 的相关方法实现的。在查询时,直接读取 ServiceCache 中的缓存数据,ServiceCache 底层在本地维护了一个 ConcurrentHashMap 缓存,通过 PathChildrenCache 监听 ZooKeeper 中各个子节点的变化,同步更新本地缓存。这里我们简单看一下 ServiceCache 的核心实现:
public class ServiceCacheImpl<T> implements ServiceCache<T>,
PathChildrenCacheListener{//实现PathChildrenCacheListener接口// 关联的ServiceDiscovery实例private final ServiceDiscoveryImpl<T> discovery;// 底层的PathChildrenCache,用于监听子节点的变化private final PathChildrenCache cache; // 本地缓存private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();public List<ServiceInstance<T>> getInstances(){ // 返回本地缓存内容return Lists.newArrayList(instances.values());}public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception{switch(event.getType()){case CHILD_ADDED:case CHILD_UPDATED:{addInstance(event.getData(), false); // 更新本地缓存notifyListeners = true;break;}case CHILD_REMOVED:{ // 更新本地缓存instances.remove(instanceIdFromData(event.getData()));notifyListeners = true;break;}}... // 通知ServiceCache上注册的监听器}
}
proxy 相关实现
在简易版 Demo RPC 框架中,Proxy 主要是为 Client 端创建一个代理,帮助客户端程序屏蔽底层的网络操作以及与注册中心之间的交互。
简易版 Demo RPC 使用 JDK 动态代理的方式生成代理,这里需要编写一个 InvocationHandler 接口的实现,即下面的 DemoRpcProxy。其中有两个核心方法:一个是 newInstance() 方法,用于生成代理对象;另一个是 invoke() 方法,当调用目标对象的时候,会执行 invoke() 方法中的代理逻辑。
下面是 DemoRpcProxy 的具体实现:
public class DemoRpcProxy implements InvocationHandler {private String serviceName; // 需要代理的服务(接口)名称public Map<Method, Header> headerCache = new ConcurrentHashMap<>();// 用于与Zookeeper交互,其中自带缓存private Registry<ServerInfo> registry;public DemoRpcProxy(String serviceName,Registry<ServerInfo> registry) throws Exception {this.serviceName = serviceName;this.registry = registry;}public static <T> T newInstance(Class<T> clazz, Registry<ServerInfo> registry) throws Exception {// 创建代理对象return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class[]{clazz},new DemoRpcProxy("demoService", registry));}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 从Zookeeper缓存中获取可用的Server地址,并随机从中选择一个List<ServiceInstance<ServerInfo>> serviceInstances =registry.queryForInstances(serviceName);ServiceInstance<ServerInfo> serviceInstance =serviceInstances.get(ThreadLocalRandom.current().nextInt(serviceInstances.size()));// 创建请求消息,然后调用remoteCall()方法请求上面选定的Server端String methodName = method.getName();Header header = headerCache.computeIfAbsent(method, h -> new Header(MAGIC, VERSION_1));Message<Request> message = new Message(header, new Request(serviceName, methodName, args));return remoteCall(serviceInstance.getPayload(), message);}protected Object remoteCall(ServerInfo serverInfo, Message message) throws Exception {if (serverInfo == null) {throw new RuntimeException("get available server error");}Object result;try {// 创建DemoRpcClient连接指定的Server端DemoRpcClient demoRpcClient = new DemoRpcClient(serverInfo.getHost(), serverInfo.getPort());ChannelFuture channelFuture = demoRpcClient.connect().awaitUninterruptibly();// 创建对应的Connection对象,并发送请求Connection connection = new Connection(channelFuture, true);NettyResponseFuture responseFuture = connection.request(message, Constants.DEFAULT_TIMEOUT);// 等待请求对应的响应result = responseFuture.getPromise().get(Constants.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);} catch (Exception e) {throw e;}return result;}
}
从 DemoRpcProxy 的实现中我们可以看到,它依赖了 ServiceInstanceCache 获取ZooKeeper 中注册的 Server 端地址,同时依赖了 DemoRpcClient 与Server 端进行通信,上层调用方拿到这个代理对象后,就可以像调用本地方法一样进行调用,而不再关心底层网络通信和服务发现的细节。当然,这个简易版 DemoRpcProxy 的实现还有很多可以优化的地方,例如:
- 缓存 DemoRpcClient 客户端对象以及相应的 Connection 对象,不必每次进行创建。
- 可以添加失败重试机制,在请求出现超时的时候,进行重试。
- 可以添加更加复杂和灵活的负载均衡机制,例如,根据 Hash 值散列进行负载均衡、根据节点 load 情况进行负载均衡等。
你若感兴趣的话可以尝试进行扩展,以实现一个更加完善的代理层。
使用方接入
介绍完 Demo RPC 的核心实现之后,下面我们讲解下Demo RPC 框架的使用方式。这里涉及Consumer、DemoServiceImp、Provider三个类以及 DemoService 业务接口。
相应的业务接口和实现比较简单,我们再来看Provider的实现,它的角色类似于 Dubbo 中的 Provider,其会创建 DemoServiceImpl 这个业务 Bean 并将自身的地址信息暴露出去,如下所示:
public class Provider {public static void main(String[] args) throws Exception {// 创建DemoServiceImpl,并注册到BeanManager中BeanManager.registerBean("demoService", new DemoServiceImpl());// 创建ZookeeperRegistry,并将Provider的地址信息封装成ServerInfo// 对象注册到ZookeeperZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();discovery.start();ServerInfo serverInfo = new ServerInfo("127.0.0.1", 6666);discovery.registerService(ServiceInstance.<ServerInfo>builder().name("demoService").payload(serverInfo).build());// 启动DemoRpcServer,等待Client的请求DemoRpcServer rpcServer = new DemoRpcServer(6666);rpcServer.start();Thread.sleep(100000000L);}
}
最后是Consumer,它类似于 Dubbo 中的 Consumer,其会订阅 Provider 地址信息,然后根据这些信息选择一个 Provider 建立连接,发送请求并得到响应,这些过程在 Proxy 中都予以了封装,那Consumer 的实现就很简单了,可参考如下示例代码:
public class Consumer {public static void main(String[] args) throws Exception {// 创建ZookeeperRegistr对象ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();discovery.start();// 创建代理对象,通过代理调用远端ServerDemoService demoService = DemoRpcProxy.newInstance(DemoService.class, discovery);// 调用sayHello()方法,并输出结果String result = demoService.sayHello("hello");System.out.println(result);// Thread.sleep(10000000L);}
}
总结
本课时我们首先介绍了简易 RPC 框架中的transport 包,它在上一课时介绍的编解码器基础之上,实现了服务端和客户端的通信能力。之后讲解了registry 包如何实现与 ZooKeeper 的交互,完善了简易 RPC 框架的服务注册与服务发现的能力。接下来又分析了proxy 包的实现,其中通过 JDK 动态代理的方式,帮接入方屏蔽了底层网络通信的复杂性。最后,我们编写了一个简单的 DemoService 业务接口,以及相应的 Provider 和 Consumer 接入简易 RPC 框架。