从零开始手写RPC框架(4)

这一节主要讲述网络传输模块的代码,并且几乎每一行代码都加上了我个人理解的注释,同时也讲述了其中一些以前没见过的函数,和大致的底层运行逻辑。

目录

  • 网络传输实体类
  • 网络传输实现
    • 基于Socket实现网络传输
    • 基于Netty实现网络传输
      • 客户端
      • 服务端

再重新梳理一下RPC的逻辑,当你需要调用远程方法时,你必须通过网络请求将目标类、方法信息以及方法参数等数据发送到服务器端。这就涉及到了网络传输的问题。
对于网络传输的具体实现,你可以选择使用 Socket —— 这是 Java 中最基础且最原始的网络通信方式。然而,Socket 是阻塞IO,其性能较低且功能单一。你也可以选择使用同步非阻塞的 I/O 模型 NIO,但是使用 NIO 进行网络编程可能会比较复杂。因此,你可以考虑使用基于 NIO 的网络编程框架 Netty,它是最佳选择。

网络传输模块整体结构如下:

请添加图片描述

一共被分为了 4 个包:

1. constants : 存放一些网络传输模块共用的常量
2. dto : 用于网络传输的类。
3. handler : 里面只有一个用于处理 rpc 请求的类RpcRequestHandler(根据 rpc 请求调用目标类的目标方法)。
4. transport : 用户网络传输相关类(真正传输网络请求的地方。提供了 Socket 和 Netty 两种网络传输方式)。

下面分别进行介绍。

网络传输实体类

网络传输实体类在 dto 包下,主要有两个类。RpcRequest.java和RpcResponse.java

首先是RpcRequest.java——rpc 请求实体类。当你要调用远程方法的时候,你需要先传输一个 RpcRequest 给对方, RpcRequest里面包含了要调用的目标方法和类的名称、参数等数据。

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder
@ToString
public class RpcRequest implements Serializable {private static final long serialVersionUID = 1905122041950251207L;// 序列化版本号private String requestId;// 请求IDprivate String interfaceName;// 接口名称private String methodName;// 方法名称private Object[] parameters;// 参数列表private Class<?>[] paramTypes;// 参数类型列表private String version;// 版本号 主要是为后续不兼容升级提供可能private String group;// 分组 主要用于处理一个接口有多个类实现的情况/*** 获取RPC服务名称** @return 返回接口名称、分组和版本号的组合*/public String getRpcServiceName() {return this.getInterfaceName() + this.getGroup() + this.getVersion();}
}

然后是RpcResponse.java——rpc 响应实体类,当服务端通过 RpcRequest 中的相关数据调用到目标服务的目标方法之后,调用结果就通过RpcResponse 返回给客户端。

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class RpcResponse<T> implements Serializable {private static final long serialVersionUID = 715745410605631233L;// 序列化版本号private String requestId;// 请求IDprivate Integer code;// 响应码private String message;// 响应消息private T data;// 响应体/*** 成功响应* @param data 响应数据* @param requestId 请求ID* @return 返回一个包含成功响应码、消息和请求ID的响应对象*/public static <T> RpcResponse<T> success(T data, String requestId) {RpcResponse<T> response = new RpcResponse<>();response.setCode(RpcResponseCodeEnum.SUCCESS.getCode());response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage());response.setRequestId(requestId);if (null != data) {response.setData(data);}return response;}/*** 失败响应* @param rpcResponseCodeEnum 响应码枚举* @return 返回一个包含失败响应码和消息的响应对象*/public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) {RpcResponse<T> response = new RpcResponse<>();response.setCode(rpcResponseCodeEnum.getCode());response.setMessage(rpcResponseCodeEnum.getMessage());return response;}
}



网络传输实现

这部分基于 Socket,和基于 Netty 的网络传输方式实现。因此,先定义一个发送 RPC 请求的顶层接口,然后分别使用 Socket 和 Netty 两种方式对这个
接口进行实现即可,RpcRequestTransport.java 传输请求的接口:

@SPI
public interface RpcRequestTransport {/*** send rpc request to server and get result** @param rpcRequest message body* @return data from server*/Object sendRpcRequest(RpcRequest rpcRequest);
}

其中@SPI 是一个自定义注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SPI {
}

在 Java 中,SPI 是一种服务发现机制。它允许第三方为应用程序提供插件或模块。在运行时,应用程序可以查询哪些插件或模块可用,并选择其中之一进行调用。
具体到我们的代码中,@SPI 被用于标记 RpcRequestTransport 接口。这意味着 RpcRequestTransport 的实现可以由第三方提供,并在运行时动态加载。这样,RPC 框架就可以支持多种传输协议,比如 HTTP、TCP、UDP 等,只要有相应的 RpcRequestTransport 实现即可。

下面,我们先来看一下比较简单点的使用 Socket 进行网络传输的方式。

基于Socket实现网络传输

客户端

客户端主要用于发送网络请求到服务端(目标方法所在的服务器)。当我们知道了服务端的地址之后,我们就可以通过 SocketRpcClient 发送 rpc 请求(RpcRequest) 到服务端了(如果我们要找到服务端的地址,涉及到了注册中心相关的知识,下一节会介绍)

@AllArgsConstructor
@Slf4j
public class SocketRpcClient implements RpcRequestTransport {private final ServiceDiscovery serviceDiscovery;// 服务发现组件/*** 构造函数* 默认使用 ZooKeeper 作为服务发现组件*/public SocketRpcClient() {//ExtensionLoader 是一个用于加载扩展实现的工具类,它实现了一种称为 SPI(Service Provider Interface)的设计模式。//getExtensionLoader(ServiceDiscovery.class):获取 ServiceDiscovery 接口的 ExtensionLoader。如果缓存中没有,就创建一个新的 ExtensionLoader 并放入缓存。//getExtension(ServiceDiscoveryEnum.ZK.getName()):从 ExtensionLoader 中获取名为 ServiceDiscoveryEnum.ZK.getName() 的扩展实现。如果缓存中没有,就创建一个新的实例并放入缓存。this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());}/*** 发送 RPC 请求** @param rpcRequest RPC 请求* @return 服务端返回的数据*/@Overridepublic Object sendRpcRequest(RpcRequest rpcRequest) {InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);// 通过服务发现组件获取服务端地址try (Socket socket = new Socket()) {socket.connect(inetSocketAddress);// 连接到服务端ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());// 通过输出流向服务端发送数据objectOutputStream.writeObject(rpcRequest);ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());// 通过输入流读取服务端返回的数据return objectInputStream.readObject();} catch (IOException | ClassNotFoundException e) {throw new RpcException("调用服务失败:", e);}}
}

服务端

Socket 服务端。用于等待客户端连接。当客户端成功连接之后,就可以发送 rpc 请求( RpcRequest ) 到服务端了。然后,服务端拿到 RpcRequest 就会去执行对应的方法。执行完对应的方法之后,就把执行得到的结果放在RpcResponse 中返回给客户端。

/*** Socket 服务端* 用于等待客户端连接并处理请求*/
@Slf4j
public class SocketRpcServer {private final ExecutorService threadPool;// 线程池,用于处理客户端请求private final ServiceProvider serviceProvider;// 服务提供者,用于注册和查找服务public SocketRpcServer() {threadPool = ThreadPoolFactoryUtil.createCustomThreadPoolIfAbsent("socket-server-rpc-pool");serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);}/*** 注册服务* @param rpcServiceConfig 服务配置*/public void registerService(RpcServiceConfig rpcServiceConfig) {serviceProvider.publishService(rpcServiceConfig);}/*** 启动服务* 开始监听客户端连接并处理请求*/public void start() {try (ServerSocket server = new ServerSocket()) { // 创建一个新的 ServerSocketString host = InetAddress.getLocalHost().getHostAddress();// 获取本地主机的 IP 地址server.bind(new InetSocketAddress(host, PORT));// 将 ServerSocket 绑定到指定的 IP 地址和端口号CustomShutdownHook.getCustomShutdownHook().clearAll();// 添加一个自定义的关闭钩子,当 JVM 关闭时,这个关闭钩子会执行 clearAll 方法//clearAll()方法为了告诉其他 RPC 服务或客户端,当前服务器已经不再提供服务,会清除注册中心中的当前服务器信息,并且关闭所有线程池,释放资源。Socket socket;while ((socket = server.accept()) != null) {// 循环接受客户端的连接log.info("client connected [{}]", socket.getInetAddress());// 记录客户端的 IP 地址threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));// 将新的客户端连接提交给线程池处理}threadPool.shutdown();// 关闭线程池} catch (IOException e) {log.error("occur IOException:", e);}}
}

其中SocketRpcRequestHandlerRunnable.java的代码如下

/*** Socket 请求处理器* 用于处理来自客户端的请求*/
@Slf4j
public class SocketRpcRequestHandlerRunnable implements Runnable {private final Socket socket;// 客户端 Socket 连接private final RpcRequestHandler rpcRequestHandler;// RPC 请求处理器public SocketRpcRequestHandlerRunnable(Socket socket) {this.socket = socket;this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);}/*** 处理客户端请求*/@Overridepublic void run() {log.info("server handle message from client by thread: [{}]", Thread.currentThread().getName());// 记录当前线程的名称try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();// 从输入流读取 RpcRequest 对象Object result = rpcRequestHandler.handle(rpcRequest);// 处理 RpcRequest,得到处理结果objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));// 将处理结果包装成 RpcResponse,写入到输出流objectOutputStream.flush();// 刷新输出流,确保 RpcResponse 对象被发送到客户端} catch (IOException | ClassNotFoundException e) {log.error("occur exception:", e);}}}

基于Netty实现网络传输

Netty 这部分的原理也差不多,不过实现代码差别很大。

客户端

Netty 客户端NettyClient.java主要提供了:

  • doConnect() :用于连接服务端(目标方法所在的服务器)并返回对应的 Channel 。当我们知道了服务端的地址之后,我们就可以通过 NettyClient 成功连接服务端了。(有了Channel 之后就能发送数据到服务端了)
  • sendRpcRequest() : 用于传输 rpc 请求( RpcRequest ) 到服务端。
@Slf4j
public final class NettyRpcClient implements RpcRequestTransport {private final ServiceDiscovery serviceDiscovery;// 服务发现组件private final UnprocessedRequests unprocessedRequests;// 未处理的请求private final ChannelProvider channelProvider;// 通道提供者private final Bootstrap bootstrap;// Netty 启动类private final EventLoopGroup eventLoopGroup;// Netty 事件循环组public NettyRpcClient() {//构造函数初始化资源,如 EventLoopGroup、Bootstrap 等eventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup)// 设置事件循环组.channel(NioSocketChannel.class)// 设置通道类型为 NioSocketChannel.handler(new LoggingHandler(LogLevel.INFO))// 添加日志处理器// 设置连接超时时间,如果超过这个时间还未连接成功,则连接失败.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 添加通道初始化器ChannelPipeline p = ch.pipeline();// 如果在 15 秒内没有向服务器发送数据,就发送一个心跳请求p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));p.addLast(new RpcMessageEncoder());// 添加 RPC 消息编码器p.addLast(new RpcMessageDecoder());// 添加 RPC 消息解码器p.addLast(new NettyRpcClientHandler());// 添加 Netty RPC 客户端处理器}});// 获取服务发现组件的实例this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());// 获取未处理的请求的实例this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);// 获取通道提供者的实例this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);}/*** 连接服务端并返回对应的 Channel* @param inetSocketAddress 服务端地址* @return 服务端的 Channel*/@SneakyThrowspublic Channel doConnect(InetSocketAddress inetSocketAddress) {// 创建一个 CompletableFuture 对象,用于存储 ChannelCompletableFuture<Channel> completableFuture = new CompletableFuture<>();// 使用 Bootstrap 的 connect 方法连接到服务端bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {// 如果连接成功记录连接成功的日志log.info("The client has connected [{}] successful!", inetSocketAddress.toString());completableFuture.complete(future.channel());// 将 Channel 存入 CompletableFuture} else {throw new IllegalStateException();}});return completableFuture.get();// 返回 CompletableFuture 中的 Channel}/*** 发送 RPC 请求* @param rpcRequest RPC 请求* @return 服务端返回的数据*/@Overridepublic Object sendRpcRequest(RpcRequest rpcRequest) {// 创建 CompletableFuture 对象,用于存储 RPC 响应的结果CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();// 通过服务发现组件获取服务端的地址InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);// 获取与服务端地址相关的通道Channel channel = getChannel(inetSocketAddress);if (channel.isActive()) {// 检查通道是否活跃// 将未处理的请求放入 unprocessedRequestsunprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);// 创建 RPC 消息对象RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest).codec(SerializationTypeEnum.HESSIAN.getCode()).compress(CompressTypeEnum.GZIP.getCode()).messageType(RpcConstants.REQUEST_TYPE).build();// 将 RPC 消息对象写入通道并刷新,同时添加一个监听器处理发送失败的情况channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {log.info("client send message: [{}]", rpcMessage);} else {// 如果消息发送失败,关闭通道,并将异常存入 resultFuturefuture.channel().close();resultFuture.completeExceptionally(future.cause());log.error("Send failed:", future.cause());}});} else {throw new IllegalStateException();}return resultFuture; // 返回 CompletableFuture 对象}/*** 获取通道* @param inetSocketAddress 服务端地址* @return 与服务端地址相关的通道*/public Channel getChannel(InetSocketAddress inetSocketAddress) {Channel channel = channelProvider.get(inetSocketAddress);if (channel == null) {channel = doConnect(inetSocketAddress);channelProvider.set(inetSocketAddress, channel);}return channel;}// 优雅地关闭事件循环组,即在关闭事件循环组之前,会等待所有任务都完成,包括正在执行的任务和提交的但还未执行的任务public void close() {eventLoopGroup.shutdownGracefully();}
}

CompletableFuture 是 Java 8 引入的一个类,它实现了 Future 和 CompletionStage 接口,提供了一种异步编程的方式。Future 是 Java 5 引入的一个接口,用于表示异步计算的结果。但是,Future 的功能比较有限,例如,它无法表示计算完成后的回调,也无法组合多个 Future 的结果。CompletableFuture 弥补了这些不足,提供了丰富的方法来处理异步计算的结果。


在我们的代码中,CompletableFuture 被用于存储 RPC 响应的结果。当 RPC 响应返回时,CompletableFuture 的 complete 方法被调用,将结果存入 CompletableFuture。然后,可以通过 CompletableFuture 的 get 方法来获取结果。如果结果还未返回,get 方法会阻塞,直到结果返回为止。


其中UnprocessedRequests.java用于存放未被服务端处理的请求(建议限制 map 容器大小,避免未处理请求过多 OOM)。

public class UnprocessedRequests {// 存储未处理的请求private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();/*** 将未处理的请求放入 UNPROCESSED_RESPONSE_FUTURES* @param requestId 请求 ID* @param future 未来的 RPC 响应*/public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);}/*** 完成 RPC 响应* @param rpcResponse RPC 响应*/public void complete(RpcResponse<Object> rpcResponse) {// 从 UNPROCESSED_RESPONSE_FUTURES 中移除请求并获取对应的 CompletableFutureCompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());// 如果 CompletableFuture 存在,则完成它if (null != future) {future.complete(rpcResponse);} else {throw new IllegalStateException();}}
}

自定义客户端 ChannelHandler 用于处理服务器返回的数据。下面是NettyRpcClientHandler.java

@Slf4j
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private final UnprocessedRequests unprocessedRequests;// 未处理的请求private final NettyRpcClient nettyRpcClient;// Netty RPC 客户端public NettyRpcClientHandler() {this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);this.nettyRpcClient = SingletonFactory.getInstance(NettyRpcClient.class);}/*** 读取服务器传输的消息*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 当从服务器接收到一条消息时被调用try {log.info("client receive msg: [{}]", msg);if (msg instanceof RpcMessage) {// 如果消息是RpcMessage类型RpcMessage tmp = (RpcMessage) msg;byte messageType = tmp.getMessageType();if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {// 如果是心跳响应类型log.info("heart [{}]", tmp.getData());} else if (messageType == RpcConstants.RESPONSE_TYPE) {// 如果是响应类型RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();// 获取响应数据unprocessedRequests.complete(rpcResponse);// 完成未处理的请求}}} finally {ReferenceCountUtil.release(msg);// 释放消息资源}}//userEventTriggered也是netty的一个回调方法 当发生空闲状态事件时被调用@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {// 如果事件是空闲状态事件IdleState state = ((IdleStateEvent) evt).state();// 获取空闲状态if (state == IdleState.WRITER_IDLE) {// 如果是写空闲状态log.info("write idle happen [{}]", ctx.channel().remoteAddress());// 记录写空闲事件Channel channel = nettyRpcClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());// 获取与远程地址相关的通道RpcMessage rpcMessage = new RpcMessage();// 创建一个新的RpcMessagerpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());// 设置编解码类型rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());// 设置压缩类型rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);// 设置消息类型为心跳请求类型rpcMessage.setData(RpcConstants.PING);// 设置数据为PINGchannel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);// 将消息写入并刷新到通道,如果失败则关闭通道}} else {super.userEventTriggered(ctx, evt);}}/*** 当处理客户端消息时发生异常时调用*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("client catch exception:", cause);cause.printStackTrace();// 打印异常堆栈跟踪ctx.close();}}

在Netty中,空闲状态事件是指在一段时间内没有进行读操作、写操作或者两者都没有进行的情况。这是通过IdleStateHandler来检测的,它会在指定的空闲时间后触发一个IdleStateEvent事件。
空闲状态有三种类型:

读空闲(READER_IDLE):在一段时间内没有读取到对方的数据,也就是说,如果在指定的时间内没有接收到对方的数据,那么就会触发这个事件。
写空闲(WRITER_IDLE):在一段时间内没有向对方写数据,也就是说,如果在指定的时间内没有向对方发送数据,那么就会触发这个事件。
读写空闲(ALL_IDLE):在一段时间内既没有读取到对方的数据,也没有向对方写数据,也就是说,如果在指定的时间内既没有接收到对方的数据,也没有向对方发送数据,那么就会触发这个事件。

在我们的代码中,当发生写空闲事件时,客户端会向服务器发送一个心跳请求。这是因为,如果客户端在一段时间内没有向服务器发送数据,可能会导致服务器认为客户端已经断开连接,从而关闭连接。为了保持连接的活跃,当发生写空闲事件时,客户端会向服务器发送一个心跳请求,告诉服务器它还在。这样,即使在没有数据交换的情况下,客户端和服务器之间的连接也能保持活跃。这就是心跳机制的作用。

从代码中,可以看出当 rpc 请求被成功处理(客户端收到服务端的执行结果)之后,我们调用了unprocessedRequests.complete(rpcResponse) 方法,这样的话,你只需要通过下面的方式就能成
功接收到服务端返回的结果。

CompletableFuture<RpcResponse> completableFuture =(CompletableFuture<RpcResponse>) clientTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();

然后是ChannelProvider.java用于存放Channel ( Channel 用于在服务端和客户端之间传输数据)。

@Slf4j
public class ChannelProvider {private final Map<String, Channel> channelMap;// 存储通道的映射public ChannelProvider() {channelMap = new ConcurrentHashMap<>();}/*** 获取通道* @param inetSocketAddress 服务端地址* @return 与服务端地址相关的通道*/public Channel get(InetSocketAddress inetSocketAddress) {String key = inetSocketAddress.toString();// 将服务端地址转换为字符串作为键// 判断是否存在对应地址的连接if (channelMap.containsKey(key)) {Channel channel = channelMap.get(key);// 如果存在,则判断连接是否可用,如果可用,则直接获取if (channel != null && channel.isActive()) {return channel;} else {channelMap.remove(key);}}return null;}/*** 设置通道* @param inetSocketAddress 服务端地址* @param channel 通道*/public void set(InetSocketAddress inetSocketAddress, Channel channel) {String key = inetSocketAddress.toString();channelMap.put(key, channel);}/*** 移除通道* @param inetSocketAddress 服务端地址*/public void remove(InetSocketAddress inetSocketAddress) {String key = inetSocketAddress.toString();channelMap.remove(key);log.info("Channel map size :[{}]", channelMap.size());}
}




服务端

NettyRpcServer.java,Netty 服务端,监听客户端的连接。另外,还提供了两个用户手动注册服务的方法(还可以通过注解RpcService 注册服务,这个后面也会介绍到)。

@Slf4j
@Component
public class NettyRpcServer {public static final int PORT = 9998;// 服务端口// 服务提供者,用于注册和查找服务private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);/*** 注册服务* @param rpcServiceConfig 服务配置*/public void registerService(RpcServiceConfig rpcServiceConfig) {serviceProvider.publishService(rpcServiceConfig);}/*** 启动服务*/@SneakyThrows// Lombok提供的注解,用于处理所有受检异常在方法体中自动捕获并处理异常,将异常转换为非受检异常(Unchecked Exception)并抛出。public void start() {// 添加一个自定义的关闭钩子,当 JVM 关闭时,这个关闭钩子会执行 clearAll 方法//clearAll()方法为了告诉其他 RPC 服务或客户端,当前服务器已经不再提供服务,会清除注册中心中的当前服务器信息,并且关闭所有线程池,释放资源。CustomShutdownHook.getCustomShutdownHook().clearAll();// 获取自定义关闭钩子实例并调用其clearAll方法String host = InetAddress.getLocalHost().getHostAddress();// 获取本地主机地址EventLoopGroup bossGroup = new NioEventLoopGroup(1);//创建一个bossGroup,它负责接收客户端的连接,指定线程数为1EventLoopGroup workerGroup = new NioEventLoopGroup();// 创建一个workerGroup,它负责处理已接受的连接DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(RuntimeUtil.cpus() * 2,// 创建一个线程数为CPU核数的两倍的DefaultEventExecutorGroup// 使用ThreadPoolFactoryUtil创建一个线程工厂ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false));try {ServerBootstrap b = new ServerBootstrap();// 创建一个ServerBootstrap实例b.group(bossGroup, workerGroup)// 设置bossGroup和workerGroup.channel(NioServerSocketChannel.class)// 设置通道为NioServerSocketChannel// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。.childOption(ChannelOption.TCP_NODELAY, true)// 设置TCP_NODELAY为true,禁用Nagle算法// 是否开启 TCP 底层心跳机制.childOption(ChannelOption.SO_KEEPALIVE, true)// 设置SO_KEEPALIVE为true,开启TCP底层心跳机制//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数.option(ChannelOption.SO_BACKLOG, 128).handler(new LoggingHandler(LogLevel.INFO))// 添加一个日志处理器// 当客户端第一次进行请求的时候才会进行初始化.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();// 获取通道的管道// 添加一个空闲状态处理器,如果30秒内没有收到客户端的请求,就关闭连接p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));p.addLast(new RpcMessageEncoder());// 添加一个RpcMessage编码器p.addLast(new RpcMessageDecoder());// 添加一个RpcMessage解码器p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());// 添加一个NettyRpcServerHandler处理器}});ChannelFuture f = b.bind(host, PORT).sync(); // 绑定主机和端口,并同步等待绑定成功// 等待服务端监听端口关闭f.channel().closeFuture().sync();} catch (InterruptedException e) {// 记录启动服务器时发生的异常log.error("occur exception when start server:", e);} finally {log.error("shutdown bossGroup and workerGroup");// 优雅地关闭事件循环组,即在关闭事件循环组之前,会等待所有任务都完成,包括正在执行的任务和提交的但还未执行的任务bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();serviceHandlerGroup.shutdownGracefully();}}
}


然后是NettyServerHandler.java,自定义服务端ChannelHandler 用于处理客户端发送的数据。当客户端发的 rpc 请求( RpcRequest ) 来了之后,服务端就会处理 rpc 请求( RpcRequest ) ,处理完之后就把得到 rpc 相应( RpcResponse )传输给客户端。

@Slf4j
public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private final RpcRequestHandler rpcRequestHandler;//RpcRequestHandler实例,用于处理Rpc请求public NettyRpcServerHandler() {this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 当从客户端接收到一条消息时被调用try {if (msg instanceof RpcMessage) {// 如果消息是RpcMessage类型log.info("server receive msg: [{}] ", msg);byte messageType = ((RpcMessage) msg).getMessageType();// 获取消息类型RpcMessage rpcMessage = new RpcMessage();// 创建一个新的RpcMessage用作响应rpcMessage.setCodec(SerializationTypeEnum.HESSIAN.getCode());// 设置编解码类型rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());// 设置压缩类型if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {// 如果是心跳请求类型rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);// 设置响应消息类型为心跳响应类型rpcMessage.setData(RpcConstants.PONG);//设置数据为PONG} else {//不是心跳请求类型 说明是rpc请求RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();// 获取请求数据Object result = rpcRequestHandler.handle(rpcRequest);// 执行目标方法(客户端需要执行的方法)并返回方法结果log.info(String.format("server get result: %s", result.toString()));// 记录获取到的结果rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);// 设置消息类型为响应类型if (ctx.channel().isActive() && ctx.channel().isWritable()) {// 如果通道是活跃的并且是可写的RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());rpcMessage.setData(rpcResponse);// 创建一个成功的Rpc响应并设置响应数据} else {// 如果通道是不活跃的或者不可写的RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);rpcMessage.setData(rpcResponse);// 创建一个失败的Rpc响应并// 设置响应数据log.error("not writable now, message dropped");}}// 将消息写入并刷新到通道,如果失败则关闭通道ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} finally {//释放消息资源仿真内存泄露ReferenceCountUtil.release(msg);}}// 当发生用户事件时被调用即一段时间内没有读取到客户端的数据,那么就关闭连接。@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {// 如果事件是空闲状态事件IdleState state = ((IdleStateEvent) evt).state();// 获取空闲状态if (state == IdleState.READER_IDLE) {// 如果是读空闲状态log.info("idle check happen, so close the connection");ctx.close();// 关闭通道处理上下文}} else {super.userEventTriggered(ctx, evt);// 如果不是空闲状态事件,调用父类的userEventTriggered方法//父类的这个方法默认实现是不做任何事情但,如果有其他的ChannelInboundHandler在管道中,// 并且这个ChannelInboundHandler重写了userEventTriggered方法,那么这个方法就会被调用。}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 当处理客户端消息时发生异常时调用log.error("server catch exception");// 记录异常信息cause.printStackTrace();// 打印异常堆栈跟踪ctx.close();// 关闭通道处理上下文}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/719029.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JavaEE进阶】 Linux常用命令

文章目录 &#x1f343;前言&#x1f334;ls 与 pwd&#x1f6a9;ls&#x1f6a9;pwd &#x1f38d;cd&#x1f6a9;认识Linux目录结构 &#x1f340;touch与cat&#x1f6a9;touch&#x1f6a9;cat &#x1f332;mkdir与rm&#x1f6a9;mkdir&#x1f6a9;rm &#x1f384;cp与…

GD库没有安装FreeType 支持Call to undefined function App\Services\imagettfbbox()

GD库是一个功能强大的图像处理库&#xff0c;广泛用于生成和处理图像。然而&#xff0c;默认情况下&#xff0c;GD库不包含FreeType扩展&#xff0c;该扩展用于处理字体和文本。如果您需要在GD库中使用更多的字体和文本效果&#xff0c;您可以按照以下步骤安装和启用FreeType扩…

十五、单词造句

描述 GG Bond在和妹妹做一个游戏&#xff0c;GG Bond给定了妹妹一些单词字符串&#xff0c;他想让妹妹把这些单词拼接成以空格间隔开的句子&#xff0c;很可惜妹妹Python没有学好&#xff0c;你能使用join函数帮帮她吗&#xff1f; 输入描述&#xff1a; 多行输入多个字符串…

Java基础 - 7 - 常用API(二)

API&#xff08;全称 Application Programming Interface&#xff1a;应用程序编程接口&#xff09; API就是Java帮我们已经写好的一些程序&#xff0c;如类、方法等&#xff0c;可以直接拿过来用 JDK8 API文档&#xff1a;Java Platform SE 8 一. Object Object类的作用 Ob…

mybatis多数据源切换

1.前提 项目中有可能需要去其他的数据库取其他的表的信息 2.思路 2.1 直接使用原生jdbc&#xff08;不推荐&#xff09; 2.2 不使用我们全局配置的mybatis&#xff0c;对指定文件夹下使用我们指定的Session 3.解决办法 指定该配置的范围 package com.maycur.openapi.dao.my…

『Linux从入门到精通』第 ㉓ 期 - 管道

文章目录 &#x1f490;专栏导读&#x1f490;文章导读&#x1f427;进程间通信的目的&#x1f427;如何进行进程间通信&#x1f427;进程间通信的分类&#x1f427;管道&#x1f426;什么是管道&#x1f426;管道原理 &#x1f427;实例代码&#x1f427;管道的特点&#x1f4…

Protobuf学习笔记以及序列化的一些概念要点(暂放C++笔记专栏)

Protobuf学习笔记以及序列化的一些概念要点 —— 杭州 2024-03-03 文章目录 Protobuf学习笔记以及序列化的一些概念要点1.Protobuf概念2.实际测试2.1.准备一个test.proto2.2.使用 protoc 命令行工具来编译一个 Protocol Buffers 文件 test.proto3.3.创建一个main.cpp写C++代码…

mysql 事务的隔离级别

一、事务的隔离级别要解决的问题&#xff1a; 1&#xff09;脏读&#xff1a;读到了其它事务未提交的数据即脏读&#xff0c;未提交意味着数据有可能会被回滚&#xff0c;也就是最终有可能不会存储到数据库中&#xff0c;即读到了最终不一定存在存在的数据&#xff0c;即为脏读…

如何选择程序员职业赛道:挑战与机遇并存的职业探索指南

程序员如何选择职业赛道&#xff1f; 作为程序员&#xff0c;选择职业赛道是一项重要的决策&#xff0c;不仅影响你的职业发展&#xff0c;也影响着你的工作生活。本文将为你介绍如何选择程序员职业赛道&#xff0c;以及每个方向的特点、挑战和机遇&#xff0c;帮助你做出明智…

《极客时间 - 左耳听风》【文章笔记 + 个人思考】

《极客时间 - 左耳听风》 原文链接 &#xff1a;https://time.geekbang.org/column/intro/100002201?tabcatalog 备注&#xff1a;加粗部分为个人思考 01 | 程序员如何用技术变现&#xff1f;&#xff08;上&#xff09; 备注&#xff1a;加粗部分为个人思考) 01 | 程序员如何…

Window系统部署Splunk Enterprise并结合内网穿透实现远程访问本地服务

文章目录 前言1. 搭建Splunk Enterprise2. windows 安装 cpolar3. 创建Splunk Enterprise公网访问地址4. 远程访问Splunk Enterprise服务5. 固定远程地址 前言 本文主要介绍如何简单几步&#xff0c;结合cpolar内网穿透工具实现随时随地在任意浏览器&#xff0c;远程访问在本地…

【24最新版PythonPycharm安装教程】小白保姆级别安装教程

今天&#xff0c;我就来教大家一下&#xff0c;如何去安装Python&#xff01; 需要博主打包好的一键激活版Pycharm&&Python也可扫下方直接获取 ​ 1 了解Python Python是一种面向对象的解释型计算机程序设计语言&#xff0c;由荷兰人Guido van Rossum于1989年发明&…

[C++]使用纯opencv去部署yolov9的onnx模型

【介绍】 部署 YOLOv9 ONNX 模型在 OpenCV 的 C 环境中涉及一系列步骤。以下是一个简化的部署方案概述&#xff0c;以及相关的文案。 部署方案概述&#xff1a; 模型准备&#xff1a;首先&#xff0c;你需要确保你有 YOLOv9 的 ONNX 模型文件。这个文件包含了模型的结构和权…

Flutter Gradle下载失败的解决方案

Flutter Gradle可能会由于网络原因下载失败,这个时候我们可以首先下载Gradle&#xff0c;然后再进行配置。具体步骤如下&#xff1a; 第一步&#xff1a;下载对应版本的gradle 可以通过下面地址下载&#xff0c;也可以百度里面搜对应的版本 【极速下载】gradle各版本快速下载地…

【HTML】HTML基础2(一些常用标签)

目录 例子 首先是网页图标 然后是一些常用标签 插入图片 例子 <!DOCTYPE html> <html><head><link rel"icon" href"img/银河护卫队-星爵.png" type"image/x-icon"><meta charset"utf-8"><title>…

大数据的分类分级管理

一.背景 为了公司给的师带徒&#xff0c;为培训写点材料。让徒弟做事情要有章法&#xff0c;有行业视野&#xff0c;知道方向和资料从哪里去找。 二.参考标准 要管理企业的大数据&#xff0c;从什么地方开始呢&#xff1f;首先应该完成企业数据的分类、分级&#xff0c;或者参…

一文掌握python常用的dict(字典)操作

目录 一、字典的创建与基本特性 1.创建字典 2.字典的基本特性 二、字典的常用操作 1.访问字典中的值 2.添加、修改键值对 3.删除键值对 4.获取字典中的所有键、值和键值对 5.遍历字典 6.查找键是否存在 7.使用 get() 方法获取值 8.合并字典 9.字典排序 10.使用字…

如何限制一个账号只在一处登陆

大家好&#xff0c;我是广漂程序员DevinRock&#xff01; 1. 需求分析 前阵子&#xff0c;和问答群里一个前端朋友&#xff0c;随便唠了唠。期间他问了我一个问题&#xff0c;让我印象深刻。 他问的是&#xff0c;限制同一账号只能在一处设备上登录&#xff0c;是如何实现的…

Javascript:数组的使用

目录 一、前言 二、正文 三、结语 一、前言 为了能存储多个数据&#xff0c;我们接下来介绍数组。 二、正文 数组是一种将一组数据存储在单个变量名下的方式。 let arr[] 声明语法: let 数组名[数据1,数据2,..数据n] 例 let names[小明,小刚,小红,小丽] 数组是按顺序保存…

【大厂AI课学习笔记NO.56】(9)模型评测

作者简介&#xff1a;giszz&#xff0c;腾讯云人工智能从业者TCA认证&#xff0c;信息系统项目管理师。 博客地址&#xff1a;https://giszz.blog.csdn.net 声明&#xff1a;本学习笔记来自腾讯云人工智能课程&#xff0c;叠加作者查阅的背景资料、延伸阅读信息&#xff0c;及学…