Netty优化-扩展自定义协议中的序列化算法
- 一. 优化与源码
- 1. 优化
- 1.1 扩展自定义协议中的序列化算法
一. 优化与源码
1. 优化
1.1 扩展自定义协议中的序列化算法
序列化,反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
- 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
// Deserialization: copy the message body out of the buffer and rebuild the
// Message with the JDK's built-in object serialization.
byte[] body = new byte[bodyLength];
// NOTE(review): `byteByf` looks like a typo for the ByteBuf variable being read;
// it is declared outside this excerpt, so confirm against the real codec.
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// Serialization: write the Message with the JDK's built-in object serialization
// and take the resulting byte array.
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口,提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
/*** 用于扩展序列化、反序列化算法*/
public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);enum Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("反序列化失败", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException("序列化失败", e);}}},/*** <dependency>* <groupId>com.google.code.gson</groupId>* <artifactId>gson</artifactId>* <version>2.8.5</version>* </dependency>*/Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}@Overridepublic <T> byte[] serialize(T object) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}}}class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {@Overridepublic Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {String str = json.getAsString();return Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}@Override // String.classpublic JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {// class -> jsonreturn new JsonPrimitive(src.getName());}}
}
增加配置类和配置文件
/**
 * Static access to the settings in {@code /application.properties} on the classpath.
 * Every getter falls back to a default when its key is absent, so a missing file
 * is treated the same as an empty one.
 */
public abstract class Config {

    static Properties properties;

    static {
        properties = new Properties();
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            // getResourceAsStream returns null when the file is not on the classpath;
            // keep the empty Properties so the getters' defaults apply.
            if (in != null) {
                properties.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * @return the value of {@code server.port}, or 8080 when it is not configured
     * @throws NumberFormatException if the configured value is not an integer
     */
    public static int getServerPort() {
        String value = lookup("server.port");
        if (value == null) {
            return 8080;
        }
        return Integer.parseInt(value);
    }

    /**
     * @return the algorithm named by {@code serializer.algorithm}, or
     *         {@code Serializer.Algorithm.Java} when it is not configured
     * @throws IllegalArgumentException if the configured name is not a known algorithm
     */
    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = lookup("serializer.algorithm");
        if (value == null) {
            return Serializer.Algorithm.Java;
        }
        try {
            return Serializer.Algorithm.valueOf(value);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported serializer.algorithm: " + value, e);
        }
    }

    /** Returns the trimmed value for {@code key}, or null when it is missing or blank. */
    private static String lookup(String key) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            return null;
        }
        // Trailing whitespace in a .properties value is preserved by Properties.load.
        String trimmed = raw.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }
}
配置文件
serializer.algorithm=Json
修改编解码器
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerAlgorithm = in.readByte(); // 0 或 1byte messageType = in.readByte(); // 0,1,2...int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);// 找到反序列化算法Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];// 确定具体消息类型Class<? extends Message> messageClass = Message.getMessageClass(messageType);Message message = algorithm.deserialize(messageClass, bytes);
// log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
// log.debug("{}", message);out.add(message);}
}
其中,确定具体消息类型时,可以根据消息类型字节获取到对应的消息 class
/**
 * Base class of all protocol messages. Also maps each message-type byte to the
 * concrete message class registered for it.
 */
@Data
public abstract class Message implements Serializable {

    // Message-type codes.
    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;

    /** Message-type code to concrete message class. */
    private static final Map<Integer, Class<? extends Message>> TYPE_TO_CLASS = new HashMap<>();

    static {
        TYPE_TO_CLASS.put(LoginRequestMessage, LoginRequestMessage.class);
        TYPE_TO_CLASS.put(LoginResponseMessage, LoginResponseMessage.class);
        TYPE_TO_CLASS.put(ChatRequestMessage, ChatRequestMessage.class);
    }

    private int sequenceId;

    private int messageType;

    /**
     * Looks up the message class registered for a message-type byte.
     *
     * @param messageType message-type byte
     * @return the message class, or {@code null} if none is registered for that type
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return TYPE_TO_CLASS.get(messageType);
    }

    /**
     * @return the message-type code of the concrete message
     */
    public abstract int getMessageType();
}
测试类
public class TestSerializer {public static void main(String[] args) {MessageCodecSharable CODEC = new MessageCodecSharable();LoggingHandler LOGGING = new LoggingHandler();EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123","张三");//channel.writeOutbound(message);ByteBuf buf = messageToByteBuf(message);channel.writeInbound(buf);}public static ByteBuf messageToByteBuf(Message msg) {int algorithm = Config.getSerializerAlgorithm().ordinal();ByteBuf out = ByteBufAllocator.DEFAULT.buffer();out.writeBytes(new byte[]{1, 2, 3, 4});out.writeByte(1);out.writeByte(algorithm);out.writeByte(msg.getMessageType());out.writeInt(msg.getSequenceId());out.writeByte(0xff);byte[] bytes = Serializer.Algorithm.values()[algorithm].serialize(msg);out.writeInt(bytes.length);out.writeBytes(bytes);return out;}
}
json序列化结果
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123, name=张三)
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 103B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 57 |...............W|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
|00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
|00000030| 22 3a 22 31 32 33 22 2c 22 6e 61 6d 65 22 3a 22 |":"123","name":"|
|00000040| e5 bc a0 e4 b8 89 22 2c 22 73 65 71 75 65 6e 63 |......","sequenc|
|00000050| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
|00000060| 79 70 65 22 3a 30 7d |ype":0} |
+--------+-------------------------------------------------+----------------+
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
16:59:45 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
json反序列化结果
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 103B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 57 |...............W|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 7a 68 61 |{"username":"zha|
|00000020| 6e 67 73 61 6e 22 2c 22 70 61 73 73 77 6f 72 64 |ngsan","password|
|00000030| 22 3a 22 31 32 33 22 2c 22 6e 61 6d 65 22 3a 22 |":"123","name":"|
|00000040| e5 bc a0 e4 b8 89 22 2c 22 73 65 71 75 65 6e 63 |......","sequenc|
|00000050| 65 49 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 |eId":0,"messageT|
|00000060| 79 70 65 22 3a 30 7d |ype":0} |
+--------+-------------------------------------------------+----------------+
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123, name=张三)
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
16:57:11 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE