需求分析
使用netty实现方法远程调用, 在client调用本地接口中方法时, 使用反射进行远程调用, server执行完结果后, 将执行结果进行封装, 发送到client
RPC调用模型:
1. 服务消费方(client)以本地调用方式调用服务
2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
3. client stub 将消息进行编码并发送到服务端
4. server stub 收到消息后进行解码
5. server stub 根据解码结果调用本地的服务
6. 本地服务执行并将结果返回给 server stub
7. server stub 将返回导入结果进行编码并发送至消费方
8. client stub 接收到消息并进行解码
9. 服务消费方(client)得到结果
自定义RPC结构
根据上面执行流程图, 编写代码
定义request消息结构:
/*** 封装class信息, 用于反射过程, 封装client的request*/
public class ClassInfo implements Serializable {private String className;private String methodName;//参数类型private Class<?>[] type;//参数列表private Object[] objects;public ClassInfo(String className, String methodName, Class<?>[] type, Object[] objects) {this.className = className;this.methodName = methodName;this.type = type;this.objects = objects;}public String getClassName() {return className;}public String getMethodName() {return methodName;}public Class<?>[] getType() {return type;}public Object[] getObjects() {return objects;}
}
Client
client接口:
public interface HelloRpc {String hello(String name);
}
client的代理类:
public class NettyRpcProxy {/*** 创建代理对象* @param target* @return*/public static Object create(Class target) {//动态代理, 在代理过程中执行远程数据发送return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target},(proxy, method, args) -> {//定义要调用哪一个方法的信息ClassInfo classInfo = new ClassInfo(target.getName(), method.getName(), method.getParameterTypes(), args);NioEventLoopGroup workGroup = new NioEventLoopGroup();MyClientResultHandler myClientResultHandler = new MyClientResultHandler();Bootstrap bootstrap = new Bootstrap().group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast(myClientResultHandler);}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();//client进行发送channelFuture.channel().writeAndFlush(classInfo).sync();channelFuture.channel().closeFuture().sync();return myClientResultHandler.getResponse();});}
}
client的handler:
public class MyClientResultHandler extends ChannelInboundHandlerAdapter {private Object response;public Object getResponse() {return response;}//读取从Server发送过来的执行结果@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response = msg;ctx.close();}
}
Server
server接口(该接口与client的接口是一样的):
public interface HelloRpc {String hello(String name);
}
server接口实现类:
public class HelloRpcImpl implements HelloRpc {@Overridepublic String hello(String name) {return "hello" + name;}
}
server端:
/*** 网络处理服务器*/
public class NettyRpcServer {private final int port;public NettyRpcServer(int port) {this.port = port;}public void start() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//对象编码器, 底层使用Java序列化, 效率低下, 通常使用protobufpipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast(new MyInvokeHandler());}});try {ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("server is ready");channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}public static void main(String[] args) {new NettyRpcServer(9999).start();}}
server的MyInvokeHandler(解析从client发送的内容):
/*** 封装client调用方法后执行结果*/
public class MyInvokeHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//使用反射机制, 调用本地方法实现类, 将方法执行结果封装, 发送至clientClassInfo classInfo = (ClassInfo) msg;Object clazz = Class.forName(this.getImplClassName(classInfo)).newInstance();Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getType());Object result = method.invoke(clazz, classInfo.getObjects());ctx.writeAndFlush(result);}//根据ClassInfo反射获取对应method执行结果private String getImplClassName(ClassInfo classInfo) throws ClassNotFoundException {String interfacePath = "com.regotto.test.netty_test_rpc.server.fun";int lastIndexOf = classInfo.getClassName().lastIndexOf(".");String interfaceName = classInfo.getClassName().substring(lastIndexOf);Class superClass = Class.forName(interfacePath + interfaceName);Reflections reflections = new Reflections(interfacePath);//获得该接口下的所有实现类Set<Class<?>> implClassSet = reflections.getSubTypesOf(superClass);if (implClassSet.size() == 0) {System.out.println("未找到实现类, erro");return null;} else if (implClassSet.size() > 1) {System.out.println("实现类存在多个, 未指明使用哪一个");return null;}return (implClassSet.toArray(new Class[0]))[0].getName();}
}
Client测试类
/*** 测试*/
public class TestNettyRpc {public static void main(String[] args) {//反射调用HelloNetty helloNetty = (HelloNetty) NettyRpcProxy.create(HelloNetty.class);System.out.println(helloNetty.hello());}
}
执行结果:
总结
RPC机制:
1.Client使用动态代理机制, 将本地接口信息编码封装发送至Server
2.Server将接收到的信息解码, 根据反射机制获得方法执行结果, 然后将执行结果编码封装发送至Client
3.Client解析Server发送的执行结果