六. 自定义协议
1. 需求分析
在目前的RPC框架中,采用Vert.x的HttpServer作为服务提供者的Web服务器,底层使用HTTP协议进行网络传输。但HTTP协议只是RPC框架网络传输的一种可选方式,还有其它更好的选择。
RPC框架注重性能,但HTTP协议中的头部信息、请求响应等复杂且繁重,会影响网络传输性能。
查看任意一个网页,即可发现大量响应标头和请求头信息,
因此,通过自定义一套RPC协议,来实现性能更高且更安全的RPC框架。
2. 设计方案
(1)自定义网络传输
目标:选择一个能够高性能通信的网络协议和传输方式。
由于HTTP协议头部信息较重,会影响传输性能,并且其本身属于无状态协议,即每个HTTP请求相互独立,每次响应都需要重新建立和关闭连接,也会影响性能。此外,HTTP属于应用层协议,性能不如传输层的TCP协议高。
因此,采用TCP协议进行网络传输,已追求更高性能与灵活性。
(2)自定义消息结构
目标:用最少的空间传递需要的信息。
最少的空间:
选择轻量级类型,如byte字节类型,占用1个字节,8个bit。
而其它常用的数据类型,如整型int,占用4个字节,32个bit;长整型long,占用8个字节,64个bit;浮点型float,占用4个字节等。这些类型相对较重,占用字节数较多。
需要的信息:
分析HTTP请求结构,能够得到RPC消息所需的信息,
- 魔数:用来安全校验,防止服务器处理非框架消息(类似于HTTPS的安全证书)。
- 版本号:保证请求和响应的一致性(类似于HTTP协议的1.0/2.0版本)。
- 序列化方式:告诉服务端和客户端如何解析数据(类似于HTTP的Content—Type)。
- 类型:标记消息是Request或Response,或者是heartBeat(类似于HTTP的请求响应头)。
- 状态:如果是响应,记录响应的结果(类似于HTTP的200状态码)。
- 请求id:用于唯一标识请求,因为TCP是双向通信,需要有唯一标识来追踪每个请求。
- 请求体:即数据内容(类似于HTTP请求中发送的RpcRequest)。
- 请求体数据长度:保证完整地获取到请求体内容。
HTTP协议有专门的key/value结构,比较容易找到完整的请求体数据。但TCP协议本身存在半包和粘包问题,每次传输的数据可能是不完整的,因此需要在消息头中增加一个字段请求体数据长度,保证能够完整地获取到信息内容。
综上,自定义消息结构设计如下,
这种消息结构本质上就是拼接在一起的一个字节数组。通过这种方式,我们不需要额外记录头部信息,而通过读取某个或某段字节来获取到具体内容。比如读取第一个字节,得到魔数。
该协议设计参考了Dubbo的协议架构,
3. 具体实现
(1)消息结构
创建protocol包,将所有与自定义协议有关的代码都放在该包下。
创建协议消息类ProtocolMessage。
消息头封装为内部类,消息体使用泛型:
package com.khr.krpc.protocol;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 协议消息结构*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {/*** 消息头*/private Header header;/*** 消息体(请求或响应对象)*/private T body;/*** 协议消息头*/@Datapublic static class Header {/*** 魔数,保证安全性*/private byte magic;/*** 版本号*/private byte version;/*** 序列化器*/private byte serializer;/*** 消息类型(请求/响应)*/private byte type;/*** 状态*/private byte status;/*** 请求 id*/private long requestId;/*** 消息体长度*/private int bodyLength;}
}
创建协议常量类ProtocolConstant。
记录与自定义协议有关的关键信息,如消息头长度、魔数、版本号:
package com.khr.krpc.protocol;/*** 协议常量*/
public interface ProtocolConstant {/*** 消息头长度*/int MESSAGE_HEADER_LENGTH = 17;/*** 协议魔数*/byte PROTOCOL_MAGIC = 0x1;/*** 协议版本号*/byte PROTOCOL_VERSION = 0x1;
}
创建消息字段的枚举类ProtocolMessageStatusEnum。
协议状态枚举,成功、请求失败、响应失败三类枚举值:
package com.khr.krpc.protocol;import lombok.Getter;/*** 协议消息的状态枚举*/
@Getter
public enum ProtocolMessageStatusEnum {OK("ok", 20),BAD_REQUEST("badRequest", 40),BAD_RESPONSE("badResponse", 50);private final String text;private final int value;ProtocolMessageStatusEnum(String text, int value){this.text = text;this.value = value;}/*** 根据 value 获取枚举** @param value* @return*/public static ProtocolMessageStatusEnum getEnumByValue(int value){for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()){if (anEnum.value == value){return anEnum;}}return null;}
}
创建消息类型的枚举类ProtocolMessageTypeEnum。
协议消息类型枚举,请求、响应、心跳、其它四类枚举值:
package com.khr.krpc.protocol;import lombok.Getter;/*** 协议消息的类型枚举*/
@Getter
public enum ProtocolMessageTypeEnum {REQUEST(0),RESPONSE(1),HEART_BEAT(2),OTHERS(3);private final int key;ProtocolMessageTypeEnum(int key){this.key = key;}/*** 根据 key 获取枚举** @param key* @return*/public static ProtocolMessageTypeEnum getEnumByType(int key){for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()){if (anEnum.key == key){return anEnum;}}return null;}
}
创建序列化器的枚举类ProtocolMessageSerializerEnum。
序列化器枚举,对应之前设计好的序列化器:
package com.khr.krpc.protocol;import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;/*** 协议消息的序列化器枚举*/
@Getter
public enum ProtocolMessageSerializerEnum {JDK(0, "jdk"),JSON(1, "json"),KRYO(2, "kryo"),HESSIAN(3, "hessian");private final int key;private final String value;ProtocolMessageSerializerEnum(int key, String value){this.key = key;this.value = value;}/*** 获取值列表** @return*/public static List<String> getValues(){return Arrays.stream(values()).map(item -> item.value).collect(Collectors.toList());}/*** 根据 key 获取枚举** @param key* @return*/public static ProtocolMessageSerializerEnum getEnumByKey(int key){for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()){if (anEnum.key == key){return anEnum;}}return null;}/*** 根据 value 获取枚举** @param value* @return*/public static ProtocolMessageSerializerEnum getEnumByValue(String value){if (ObjectUtil.isEmpty(value)){return null;}for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()){if (anEnum.value.equals(value)){return anEnum;}}return null;}
}
(2)网络传输
之前采用了基于HTTP协议的Vert.x服务器。同样,Vert.x也支持TCP服务器,简单易用。
在server包下创建tcp包,将所有与TCP服务相关的代码都放在该包下。
TCP服务器实现。
创建VertxTcpServer类,与之前的VertxHttpServer逻辑类似,先创建服务器实例,然后定义处理请求的方法,最后启动服务器:
package com.khr.krpc.server.tcp;import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class VertxTcpServer implements HttpServer{private byte[] handleRequest(byte[] requestData){//在此处编写处理请求的逻辑,根据 requestData 构造响应数据并返回String requestString = new String(requestData);System.out.println("Received request from client: " + requestString);//示例,实际逻辑根据具体业务需求实现return "Hello, client!".getBytes();}@Overridepublic void doStart(int port){//创建 Vert.x 实例Vertx vertx = Vertx.vertx();//创建 TCP 服务器NetServer server = vertx.createNetServer();//处理请求server.connectHandler(socket -> {//处理连接socket.handler(buffer -> {//处理接收到的字节数组byte[] requestData = buffer.getBytes();//自定义字节数组处理逻辑,比如解析请求、调用服务、构造响应等byte[] responseData =handleRequest(requestData);//发送响应socket.write(Buffer.buffer(responseData));});});//启动 TCP 服务器并监听指定端口server.listen(port, result ->{if (result.succeeded()){System.out.println("TCP server started on port "+ port);}else {System.out.println("Failed to start TCP server: "+ result.cause());}});}public static void main(String[] args){new VertxTcpServer().doStart(8888);}
}
其中,socket.write(Buffer.buffer(responseData)) 方法,就是在向连接到服务器的客服端发送数据。数据格式为Buffer,这是Vert.x提供的字节数组缓冲区实现。
TCP客户端实现。
创建VertxTcpClient类,先创建客户端实例,然后定义处理请求的方法,最后建立连接:
package com.khr.krpc.server.tcp;import io.vertx.core.Vertx;public void start(){//创建 Vert.x 实例Vertx vertx = Vertx.vertx();vertx.createNetClient().connect(8888,"localhost", result ->{if (result.succeeded()){System.out.println("Connected to TCP server");io.vertx.core.net.NetSocket socket = result.result();//发送数据socket.write("Hello,server!");//接收响应socket.handler(buffer -> {System.out.println("Received response from server: "+ buffer.toString());});} else {System.out.println("Failed to connect to TCP server");}});}public static void main(String[] args){new VertxTcpClient().start();}
运行测试可以看到服务器与客户端相互打招呼。
(3)编码器和解码器
Vert.x的TCP服务器收发的消息是 Buffer 类型,不能直接写入一个对象。因此需要编码器和解码器将Java的消息对象和 Buffer 进行相互转换。
编码器先 new 一个空的 Buffer 缓冲区,然后按照顺序向缓冲区依次写入数据;解码器在读取时也按照顺序依次读取,还原出编码前的数据。
实现消息编码器。
在protocol包下创建ProtocolMessageEncoder类,核心流程是依次向 Buffer 缓冲区写入消息对象里的字段:
package com.khr.krpc.protocol;import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;import java.io.IOException;public class ProtocolMessageEncoder {/***编码** @param protocolMessage* @return* @throws IOException*/public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException{if (protocolMessage == null || protocolMessage.getHeader() == null){return Buffer.buffer();}ProtocolMessage.Header header = protocolMessage.getHeader();//依次向缓冲区写入字节Buffer buffer = Buffer.buffer();buffer.appendByte(header.getMagic());buffer.appendByte(header.getVersion());buffer.appendByte(header.getSerializer());buffer.appendByte(header.getType());buffer.appendByte(header.getStatus());buffer.appendLong(header.getRequestId());//获取序列化器ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());if (serializerEnum == null){throw new RuntimeException("序列化协议不存在");}Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());//写入 body 长度和数据buffer.appendInt(bodyBytes.length);buffer.appendBytes(bodyBytes);return buffer;}
}
实现消息解码器。
在protocol包下创建ProtocolMessageDecoder类,核心流程是依次从 Buffer 缓冲区的指定位置读取字段,构造出完整的消息对象:
package com.khr.krpc.protocol;import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;import java.io.IOException;/*** 协议消息解码器*/
public class ProtocolMessageDecoder {/*** 解码** @param buffer* @return* @throws IOException*/public static ProtocolMessage<?> decode(Buffer buffer) throws IOException{//分别从指定位置读出 BufferProtocolMessage.Header header = new ProtocolMessage.Header();byte magic = buffer.getByte(0);//校验魔数if (magic != ProtocolConstant.PROTOCOL_MAGIC){throw new RuntimeException("消息 magic 非法");}header.setMagic(magic);header.setVersion(buffer.getByte(1));header.setSerializer(buffer.getByte(2));header.setType(buffer.getByte(3));header.setStatus(buffer.getByte(4));header.setRequestId(buffer.getLong(5));header.setBodyLength(buffer.getInt(13));//解决粘包问题,只读指定长度的数据byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());//解析消息体ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());if (serializerEnum == null) {throw new RuntimeException("序列化消息的协议不存在");}Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByType(header.getType());if (messageTypeEnum == null){throw new RuntimeException("序列化消息的类型不存在");}switch (messageTypeEnum){case REQUEST:RpcRequest request = serializer.deserialize(bodyBytes,RpcRequest.class);return new ProtocolMessage<>(header, request);case RESPONSE:RpcResponse response = serializer.deserialize(bodyBytes,RpcResponse.class);return new ProtocolMessage<>(header, response);case HEART_BEAT:case OTHERS:default:throw new RuntimeException("暂不支持该消息类型");}}
}
创建单元测试类测试。
编码解码均能正常工作:
package com.khr.rpc.protocol;import cn.hutool.core.util.IdUtil;
import com.khr.krpc.constant.RpcConstant;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.protocol.*;
import io.vertx.core.buffer.Buffer;
import org.junit.Assert;
import org.junit.Test;import java.io.IOException;public class ProtocolMessageTest {@Testpublic void testEncodeAndDecode() throws IOException{//构造消息ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();ProtocolMessage.Header header = new ProtocolMessage.Header();header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);header.setVersion(ProtocolConstant.PROTOCOL_VERSION);header.setSerializer((byte) ProtocolMessageSerializerEnum.JDK.getKey());header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());header.setRequestId(IdUtil.getSnowflakeNextId());header.setBodyLength(0);RpcRequest rpcRequest = new RpcRequest();rpcRequest.setServiceName("kService");rpcRequest.setMethodName("kMethod");rpcRequest.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);rpcRequest.setParameterTypes(new Class[]{System.class});rpcRequest.setArgs(new Object[]{"aaa","bbb"});protocolMessage.setHeader(header);protocolMessage.setBody(rpcRequest);Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);ProtocolMessage<?> message = ProtocolMessageDecoder.decode(encodeBuffer);Assert.assertNotNull(message);}
}
(4)请求处理器(服务提供者)
请求处理器的作用是接受请求,然后通过反射调用服务实现类。
类似于之前的HttpServerHandler,TCP服务器需要一个TcpServerHandler。通过实现Vert.x提供的Handler<NetSocket>接口,可以定义TCP请求处理器。
在server.tcp包下创建TcpServerHandler类:
package com.khr.krpc.server.tcp;import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.protocol.ProtocolMessage;
import com.khr.krpc.protocol.ProtocolMessageEncoder;
import com.khr.krpc.protocol.ProtocolMessageDecoder;
import com.khr.krpc.protocol.ProtocolMessageTypeEnum;
import com.khr.krpc.registry.LocalRegistry;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;import java.io.IOException;
import java.lang.reflect.Method;public class TcpServerHandler implements Handler<NetSocket> {/*** 处理请求** @param socket the event to handle*/@Overridepublic void handle(NetSocket socket){//处理连接netSocket.handler(buffer -> {//接受请求,解码ProtocolMessage<RpcRequest> protocolMessage;try {protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);} catch (IOException e){throw new RuntimeException("协议消息解码错误");}RpcRequest rpcRequest = protocolMessage.getBody();//处理请求//构造响应结果对象RpcResponse rpcResponse = new RpcResponse();try{//获取要调用的服务实现类,通过反射调用Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());Method method = implClass.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());Object result = method.invoke(implClass.newInstance(),rpcRequest.getArgs());//封装返回结果rpcResponse.setData(result);rpcResponse.setDataType(method.getReturnType());rpcResponse.setMessage("ok");} catch (Exception e){e.printStackTrace();rpcResponse.setMessage(e.getMessage());rpcResponse.setException(e);}//发送响应,编码ProtocolMessage.Header header = protocolMessage.getHeader();header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header,rpcResponse);try {Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);socket.write(encode);} catch (IOException e){throw new RuntimeException("协议消息编码错误");}});}
}
(5)请求发送(服务消费者)
调整消费者发送HTTP请求为TCP请求:
package com.khr.krpc.proxy;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.config.RpcConfig;
import com.khr.krpc.constant.RpcConstant;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.protocol.*;
import com.khr.krpc.registry.Registry;
import com.khr.krpc.registry.RegistryFactory;
import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import com.khr.krpc.server.tcp.VertxTcpClient;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.SocketAddress;import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;/*** 服务代理(JDK动态代理)*/public class ServiceProxy implements InvocationHandler {/*** 调用代理** @return* @throws Throwable*/@Overridepublic Object invoke(Object proxy,Method method,Object[] args) throws Throwable{//指定序列化器final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());//构造请求String serviceName = method.getDeclaringClass().getName();RpcRequest rpcRequest = RpcRequest.builder().serviceName(serviceName).methodName(method.getName()).parameterTypes(method.getParameterTypes()).args(args).build();try {//序列化byte[] bodyBytes = serializer.serialize(rpcRequest);//从注册中心获取服务提供者请求地址RpcConfig rpcConfig = RpcApplication.getRpcConfig();Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());if (CollUtil.isEmpty(serviceMetaInfoList)) {throw new RuntimeException("暂无服务地址");}//暂时先取第一个ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);//发送 TCP 请求Vertx vertx = Vertx.vertx();NetClient netClient = vertx.createNetClient();CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();netClient.connect(selectedServiceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),result -> {if (result.succeeded()) {System.out.println("Connect to TCP server");io.vertx.core.net.NetSocket socket = result.result();//发送数据//构造消息ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();ProtocolMessage.Header header = new ProtocolMessage.Header();header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);header.setVersion(ProtocolConstant.PROTOCOL_VERSION);header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());//生成全局请求IDheader.setRequestId(IdUtil.getSnowflakeNextId());protocolMessage.setHeader(header);protocolMessage.setBody(rpcRequest);//编码请求try {Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);socket.write(encodeBuffer);} catch (IOException e) {throw new RuntimeException("协议消息编码错误");}//接收响应TcpBufferHandlerWarpper bufferHandlerWarpper = new TcpBufferHandlerWarpper(buffer -> {try {ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);responseFuture.complete(rpcResponseProtocolMessage.getBody());} catch (IOException e) {throw new RuntimeException("协议消息解码错误");}});} else {System.out.println("Failed to connect to TCP server");}});RpcResponse rpcResponse = responseFuture.get();//关闭连接netClient.close();return rpcResponse.getData();} catch(IOException e){e.printStackTrace();}return null;}
}
重点关注发送TCP请求部分。由于Vert.x提供的请求处理器是异步、反应式的,为了更方便地获取结果,使用CompletableFuture转异步为同步。
CompleteableFuturn<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connet(XXX,result -> {// 完成响应responseFuture.complete(rpcResponseProtocolMessage.getBody());});
);
//阻塞,直到响应完成,才会继续向下执行
RpcResponse rpcResponse = responseFuture.get();
4. 测试
修改服务提供者ProviderExample代码,改为启动TCP服务器:
package com.khr.example.provider;import com.khr.example.common.service.UserService;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.config.RegistryConfig;
import com.khr.krpc.config.RpcConfig;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.registry.LocalRegistry;
import com.khr.krpc.registry.Registry;
import com.khr.krpc.registry.RegistryFactory;
import com.khr.krpc.server.HttpServer;
import com.khr.krpc.server.VertxHttpServer;
import com.khr.krpc.server.tcp.VertxTcpServer;
import io.vertx.core.Vertx;/*** 服务提供者示例*/
public class ProviderExample {public static void main(String[] args){//RPC框架初始化RpcApplication.init();//注册服务String serviceName = UserService.class.getName();LocalRegistry.registry(serviceName,UserServiceImpl.class);//注册服务到注册中心RpcConfig rpcConfig = RpcApplication.getRpcConfig();RegistryConfig registryConfig = rpcConfig.getRegistryConfig();Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());serviceMetaInfo.setServicePort(Integer.valueOf(rpcConfig.getServerPort()));try {registry.register(serviceMetaInfo);}catch (Exception e){throw new RuntimeException(e);}//启动 TCP 服务VertxTcpServer vertxTcpServer = new VertxTcpServer();vertxTcpServer.doStart(8080);//启动 Web 服务//HttpServer httpServer = new VertxHttpServer();//httpServer.doStart(Integer.parseInt(RpcApplication.getRpcConfig().getServerPort()));}
}
ConsumerExample项目不动,先后启动后如果不能正常完成调用,说明出现了半包粘包问题。
5. 解决半包粘包问题
(1)什么是半包和粘包
粘包:连续给对端发送两个或两个以上的数据包,对端在一次收取时可能收到的数据包大于一个,即可能是一个包和另一个包一部分的结合,或者是两个完整的数据包头尾相连。
半包:一次收取到的数据只是其中一个包的一部分。
比如,客户端连续2次发送消息:
//第一次
Hello, server!Hello, server!Hello, server!Hello, server!
//第二次
Hello, server!Hello, server!Hello, server!Hello, server!
服务端接收到的是:
半包:
//第一次
Hello, server!Hello, server!
//第二次
Hello, server!Hello, server!Hello, server!
粘包:
//第三次
Hello, server!Hello, server!Hello, server!Hello, server!Hello, server!
(2)半包粘包问题演示
修改TCP客户端代码,连续发送1000次消息:
public class VertxTcpClient{public void start(){//创建 Vert.x 实例Vertx vertx = Vertx.vertx();vertx.createNetClient().connect(8888,"localhost", result ->{if (result.succeeded()){System.out.println("Connected to TCP server");io.vertx.core.net.NetSocket socket = result.result();for (int i = 0; i < 1000; i++){//发送数据Buffer buffer = Buffer.buffer();String str = "Hello, server!Hello, server!Hello, server!Hello, server!";buffer.appendInt(0);buffer.appendInt(str.getBytes().length);buffer.appendBytes(str.getBytes());socket.write(buffer);}//接收响应socket.handler(buffer -> {System.out.println("Received response from server: "+ buffer.toString());});} else {System.out.println("Failed to connect to TCP server");}});}public static void main(String[] args){new VertxTcpClient().start();}
}
TCP服务端打印出每次收到的消息:
package com.khr.krpc.server.tcp;import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class VertxTcpServer implements HttpServer{@Overridepublic void doStart(int port){//创建 Vert.x 实例Vertx vertx = Vertx.vertx();//创建 TCP 服务器NetServer server = vertx.createNetServer();//处理请求server.connectHandler(socket -> {socket.handler(buffer -> {String testMessage = "Hello,server!Hello,server!Hello,server!"int messageLength = testMessage.getBytes().length;if (buffer.getBytes().length < messageLength){System.out.println("半包,length = " + buffer.getBytes().length);return;}if (buffer.getBytes().length > messageLength){System.out.println("粘包,length = " + buffer.getBytes().length);return;}String str = new String(buffer.getBytes(0, messageLength));System.out.println(str);if (testMessage.equals(str)){System.out.println("good");}});});//启动 TCP 服务器并监听指定端口server.listen(port, result ->{if (result.succeeded()){log.info("TCP server started on port "+ port);}else {log.info("Failed to start TCP server: "+ result.cause());}});}public static void main(String[] args){new VertxTcpServer().doStart(8888);}
}
运行后,再服务端会查看到类似这样的结果:
(3)Vert.x解决半包和粘包
解决半包的核心思路:在消息头中设置请求体的长度,服务端接收时,判断每次消息的长度是否符合预期,不完整就不读,留到下一次接收到消息时再读取。
if(buffer == null || buffer.length() == 0){throw new RuntimeException("消息 buffer 为空");
}
if(buffer.getBytes().length < ProtocolConstant.MESSAGE_HEADER_LENGTH){throw new RuntimeException("发生半包问题");
}
解决粘包的核心思路:每次只读取指定长度的数据,超过长度的留着下一次接收到消息时再读取。
//只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength())
Vert.x框架中,内置了RecordParser, 可以保证下次读取到特定长度的字符。
先修改TCP服务端代码进行测试,采用RecordParser读取固定长度的消息:
package com.khr.krpc.server.tcp;import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class VertxTcpServer implements HttpServer{@Overridepublic void doStart(int port){//创建 Vert.x 实例Vertx vertx = Vertx.vertx();//创建 TCP 服务器NetServer server = vertx.createNetServer();//处理请求server.connectHandler(socket -> {String testMessage = "Hello,server!Hello,server!Hello,server!"int messageLength = testMessage.getBytes().length;//构造parserRecordParser parser = RecordParser.newFixed(messageLength);//每次读取固定值长度的内容parser.setOutput(new Handler<Buffer>() {@Overridepublic void handle(Buffer buffer) {String str = new String(buffer.getBytes());System.out.println(str);if (testMessage.equals(str)) {System.out.println("good");}}});socket.handler(parser);});//启动 TCP 服务器并监听指定端口server.listen(port, result ->{if (result.succeeded()){log.info("TCP server started on port "+ port);}else {log.info("Failed to start TCP server: "+ result.cause());}});}public static void main(String[] args){new VertxTcpServer().doStart(8888);}
}
测试后发现,输出类似结果,非常整齐,解决了半包和粘包:
但在实际应用中,消息体的长度是不固定的,所以要调整RecordParser的固定长度。
将读取完整的消息拆分为2次:
- 先完整读取请求头信息。由于请求头信息长度是固定的,可以使用RecordParser保证每次都完整读取。
- 再根据请求头长度信息更改RecordParser的固定长度,保证完整获取到请求体。
package com.khr.krpc.server.tcp;import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class VertxTcpServer implements HttpServer{@Overridepublic void doStart(int port){//创建 Vert.x 实例Vertx vertx = Vertx.vertx();//创建 TCP 服务器NetServer server = vertx.createNetServer();//处理请求server.connectHandler(socket -> {RecordParser parser = RecordParser.newFixed(8);parser.setOutput(new Handler<Buffer>() {//初始化int size = -1;//一次完整的读取(头 + 体)Buffer resultBuffer = Buffer.buffer();@Overridepublic void handle(Buffer buffer) {if (-1 == size){//读取消息头长度size = buffer.getInt(4);parser.fixedSizeMode(size);//写入头信息到结果resultBuffer.appendBuffer(buffer);} else {//写入体信息到结果resultBuffer.appendBuffer(buffer);System.out.println(resultBuffer.toString());//重置一轮parser.fixedSizeMode(8);size = -1;resultBuffer = Buffer.buffer();}}});socket.handler(parser);});//启动 TCP 服务器并监听指定端口server.listen(port, result ->{if (result.succeeded()){log.info("TCP server started on port "+ port);}else {log.info("Failed to start TCP server: "+ result.cause());}});}public static void main(String[] args){new VertxTcpServer().doStart(8888);}
}
将size变量初始化为-1,表示尚未读取头部消息,还没去确定消息体的长度。因此接下来读取的数据就是消息的头部信息。
读取到消息体的长度信息后,将size设置为读取到的长度值,然后设置解析器进入固定大小模式,以读取消息体。消息体内容读取完毕并写入缓冲区后,再重置解析器和状态,准备处理下一条消息。
在TCP客户端中构造一个变长、长度信息不再Buffer最开头的消息:
public class VertxTcpClient{public void start(){//创建 Vert.x 实例Vertx vertx = Vertx.vertx();vertx.createNetClient().connect(8888,"localhost", result ->{if (result.succeeded()){System.out.println("Connected to TCP server");io.vertx.core.net.NetSocket socket = result.result();for (int i = 0; i < 1000; i++){//发送数据Buffer buffer = Buffer.buffer();String str = "Hello, server!Hello, server!Hello, server!Hello, server!";buffer.appendInt(0);//添加一个整数0,占4个字节,使长度信息不在Buffer最开头。buffer.appendInt(str.getBytes().length);//真正的消息体长度buffer.appendBytes(str.getBytes());//添加实际的消息体内容socket.write(buffer);//发给服务器}//接收响应socket.handler(buffer -> {System.out.println("Received response from server: "+ buffer.toString());});} else {System.out.println("Failed to connect to TCP server");}});}public static void main(String[] args){new VertxTcpClient().start();}
}
测试后发现能够正常读取到消息,不会出现半包粘包问题。
(4)封装半包粘包处理器
解决半包粘包问题有一定的代码量,并且由于ServiceProxy和请求Handler都需要接受Buffer,所以都要进行半包粘包处理。因此可以对代码进行封装复用。
采用装饰者模式,使用RecordParser对原有的Buffer处理器的能力进行增强。
在server.tcp包下新建TcpBufferHandlerWrapper类,实现并增强Handler<Buffer>接口:
package com.khr.krpc.server.tcp;import com.khr.krpc.protocol.ProtocolConstant;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;/*** 装饰者模式(使用 recordParser 对原有的 buffer 处理能力进行增强)*/public class TcpBufferHandlerWarpper implements Handler<Buffer>{private final RecordParser recordParser;public TcpBufferHandlerWarpper(Handler<Buffer> bufferHandler){recordParser = initRecordParser(bufferHandler);}@Overridepublic void handle(Buffer buffer){recordParser.handle(buffer);}private RecordParser initRecordParser(Handler<Buffer> bufferHandler){//构造 parserRecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);parser.setOutput(new Handler<Buffer>() {//初始化int size = -1;//一次完整的读取(头 + 体)Buffer resultBuffer = Buffer.buffer();@Overridepublic void handle(Buffer buffer) {if (-1 == size){//读取消息体长度size = buffer.getInt(13);parser.fixedSizeMode(size);//写入头信息到结果resultBuffer.appendBuffer(buffer);} else {//写入体信息到结果resultBuffer.appendBuffer(buffer);//已拼接为完整 Buffer,执行处理bufferHandler.handle(resultBuffer);//重置一轮parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);size = -1;resultBuffer = Buffer.buffer();}}});return parser;}
}
优化客户端调用代码。
修改TCP请求处理器,使用TcpBufferHandlerWrapper封装之前处理请求的部分:
package com.khr.krpc.server.tcp;import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.protocol.ProtocolMessage;
import com.khr.krpc.protocol.ProtocolMessageEncoder;
import com.khr.krpc.protocol.ProtocolMessageDecoder;
import com.khr.krpc.protocol.ProtocolMessageTypeEnum;
import com.khr.krpc.registry.LocalRegistry;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;import java.io.IOException;
import java.lang.reflect.Method;public class TcpServerHandler implements Handler<NetSocket> {/*** 处理请求** @param socket the event to handle*/@Overridepublic void handle(NetSocket socket){TcpBufferHandlerWarpper bufferHandlerWarpper = new TcpBufferHandlerWarpper(buffer -> {//处理请求逻辑不变……});socket.handler(bufferHandlerWrapper);}
}
修改客户端处理响应。之前是将所有发送请求、处理响应的代码都放到了ServiceProxy类中,使其变得复杂臃肿。因此将其优化,把所有的请求响应逻辑提出来,封装为单独的VertxTcpClient类,放在server.tcp包下。
修改后的VertxTcpClient:
package com.khr.krpc.server.tcp;import cn.hutool.core.util.IdUtil;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.protocol.*;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** Vertx TCP 请求客户端*/public class VertxTcpClient {/*** 发送请求** @param rpcRequest* @param serviceMetaInfo* @return* @throws InterruptedException* @throws ExecutionException*/public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws InterruptedException, ExecutionException {//发送TCP请求Vertx vertx = Vertx.vertx();NetClient netClient = vertx.createNetClient();CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),result -> {if (!result.succeeded()) {System.err.println("Failed to connect to TCP server");return;}NetSocket socket = result.result();//发送数据//构造消息ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();ProtocolMessage.Header header = new ProtocolMessage.Header();header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);header.setVersion(ProtocolConstant.PROTOCOL_VERSION);header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());//生成全局请求IDheader.setRequestId(IdUtil.getSnowflakeNextId());protocolMessage.setHeader(header);protocolMessage.setBody(rpcRequest);//编码请求try {Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);socket.write(encodeBuffer);} catch (IOException e) {throw new RuntimeException("协议消息编码错误");}//接收响应TcpBufferHandlerWarpper bufferHandlerWarpper = new TcpBufferHandlerWarpper(buffer -> {try {ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);responseFuture.complete(rpcResponseProtocolMessage.getBody());} catch (IOException e) {throw new RuntimeException("协议消息解码错误");}});socket.handler(bufferHandlerWarpper);});RpcResponse rpcResponse = responseFuture.get();//关闭连接netClient.close();return rpcResponse;}
}
而在ServiceProxy中,直接调用VertxTcpClient即可:
package com.khr.krpc.proxy;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.config.RpcConfig;
import com.khr.krpc.constant.RpcConstant;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.protocol.*;
import com.khr.krpc.registry.Registry;
import com.khr.krpc.registry.RegistryFactory;
import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import com.khr.krpc.server.tcp.VertxTcpClient;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.SocketAddress;import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;/*** 服务代理(JDK动态代理)*/public class ServiceProxy implements InvocationHandler {/*** 调用代理** @return* @throws Throwable*/@Overridepublic Object invoke(Object proxy,Method method,Object[] args) throws Throwable{//指定序列化器final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());//构造请求String serviceName = method.getDeclaringClass().getName();RpcRequest rpcRequest = RpcRequest.builder().serviceName(serviceName).methodName(method.getName()).parameterTypes(method.getParameterTypes()).args(args).build();try {//序列化byte[] bodyBytes = serializer.serialize(rpcRequest);//从注册中心获取服务提供者请求地址RpcConfig rpcConfig = RpcApplication.getRpcConfig();Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());if (CollUtil.isEmpty(serviceMetaInfoList)) {throw new RuntimeException("暂无服务地址");}ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);//发送 TCP 请求RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);return rpcResponse.getData();} catch (IOException e){throw new RuntimeException("调用失败");}}
}
至此,扩展功能,自定义协议完成。