【手撸IM】通讯协议设计与实现

  • 【手撸IM】专题由来:之前利用业余空闲时间写一个Java版Akka-Rpc,并且基于它写了一个分布式高性能文件服务,从反馈上来看,还是得到了一定的关注,甚至部分同学真的基于此直接抄作业和二开了。因此有了再进一步去手撸一个分布式IM系统脚手架原型项目的想法。要手撸一个分布式 IM 涉及的东西太多,例如:RPC,分布式策略,通讯协议,会话列表,消息存储(离线消息,历史消息,发件箱,收件箱),基础IM业务实现(用户,单聊,群聊等),基础消息体实现(文本消息,图片消息等),富文本消息文件存储。虽然RPC,文件服务等已经可以复用,但是工作量仍然是巨大的,希望《手撸IM》专题可以早日初具规模。作为一个开源学习交流用途项目,本身也没有什么商业考虑,因此如有不妥,还请不喜勿喷,同时欢迎各种技术类交流和反馈。
  • 源码地址:https://github.com/bossfriday/bossfriday-nubybear

【手撸IM】专栏导读
1.《【手撸IM】消息ID设计与实现》https://blog.csdn.net/camelials/article/details/136558285
2.《【手撸IM】通讯协议设计与实现》https://blog.csdn.net/camelials/article/details/136879608

1. 背景

之前说过要手撸一个 IM 需要考虑的东西太多了,以至于前期做的很多工作与 IM 本身其实并没有太多直接的关系,例如:分布式策略、接入服务等。上一篇抛砖引玉的介绍了消息ID的设计与实现,现在谈下如何设计与实现一个私有的通讯协议。

在 IM 系统中常用的开放协议有:XMPP(Extensible Messaging and Presence Protocol)、SIP(Session Initiation Protocol)、MQTT(Message Queuing Telemetry Transport),那么为什么不直接使用已有的开放协议非要自己去搞一个私有协议呢?原因如下:

  • 定制化需求:IM应用通常具有特定的功能和业务需求,这些需求可能无法完全通过已有的开放协议来满足。为了实现特定的功能和业务逻辑,开发者可能需要设计和实现自己的通讯协议。

  • 性能优化:自定义的私有协议通常可以更好地满足应用的性能需求,包括减少通讯延迟、降低带宽消耗等。通过优化协议设计,可以提升系统的通讯效率和用户体验。

  • 安全考虑:一些IM应用可能有较高的安全需求,需要采取特定的安全措施来保护用户数据和通讯隐私。使用自定义的私有协议可以更灵活地实现安全功能,如加密通讯、身份验证等。

  • 竞争优势:自定义的私有协议可以成为IM应用的竞争优势之一。通过独特的通讯协议设计,可以为用户提供独特的功能和体验,从而吸引更多用户和提升市场竞争力。

  • 控制权:使用自定义的私有协议可以使开发者对通讯协议有更多的控制权,可以根据实际需求和市场变化灵活地调整和优化协议设计,而不受外部协议规范的限制。

举个例子,早期很多 IM 应用,直接选择 XMPP ,虽然上手快,但是XMPP是一种基于XML的开放式即时通讯协议,首先的问题就是消息体臃肿,对于减少通讯延迟、降低带宽消耗极其不友好(XML标记臃肿性无法规避的问题)。另外,XML 的直接可读性又导致消息安全性几乎是裸奔:随便一个抓包,一切尽收眼底。

2. 私有通讯协议设计

2.1 设计思路

要设计一个私有的通讯协议,其实大致上走的就是一个传统套路:协议头如何表达和组织,协议体如何序列化。纵观那些常用的开放协议都是如此,例如下面的一个SIP消息示例:

INVITE sip:alice@example.com SIP/2.0
Via: SIP/2.0/UDP client.example.com:5060;branch=z9hG4bKnashds7
Max-Forwards: 70
To: Alice <sip:alice@example.com>
From: Bob <sip:bob@example.com>;tag=1928301774
Call-ID: a84b4c76e66710
CSeq: 314159 INVITE
Contact: <sip:bob@client.example.com>
Content-Type: application/sdp
Content-Length: 142v=0
o=bob 2890844526 2890844526 IN IP4 client.example.com
s=-
c=IN IP4 client.example.com
t=0 0
m=audio 49217 RTP/AVP 0
a=rtpmap:0 PCMU/8000

请求行:指定请求方法(INVITE)、请求的URI(目标用户的SIP地址)和SIP协议的版本。
Via:指定了发送请求的SIP终端的地址信息。
Max-Forwards:指定了请求可以被转发的最大次数。
To:指定了目标用户的SIP地址。
From:指定了发送者的SIP地址和标签。
Call-ID:指定了当前呼叫的唯一标识符。
CSeq:指定了请求的序列号和请求方法。
Contact:指定了发送者的联系地址。
Content-Type:指定了消息体的类型。
Content-Length:指定了消息体的长度。
消息体(SDP格式):指定了会话描述协议(SDP)内容,包括会话的属性、媒体类型和媒体参数。

从上面的例子可以看出,对于一个SIP协议的消息,光是消息头就几十个字节出去了。其实早年的飞信和微软MSN用的就是一个非标的SIP协议,然后称之为:SIP-C协议(本人在飞信服务端干了7年,后期主导了和飞信服务端的建设),其中的一个改动就是对SIP协议头的Key做了精简:例如:将“From”精简为“F”。目的其实也就是给消息体瘦身嘛,在那个智能手机刚刚兴起的年代人们尚且如此,现在更应当如此。

于此同时,我们也能发现:将常用的开放协议进行改造其实是设计和实现一个私有通讯协议的捷径。例如:CMPP(China Mobile Peer-to-Peer)中国移动短信协议其实就看作是一个非标的SMPP协议(Short Message Peer-to-Peer:一种专门用于短信消息传输的协议,最初由欧洲电信标准化机构(ETSI)制定)。

2.2 设计参照选型

既然将常用的开放协议进行改造是设计和实现一个私有通讯协议的捷径,那么放眼望去MQTT协议无疑是一个非常精简的协议,毕竟SMPP之类的太过于追求消息荷载的紧凑序列化了,PB或者其他基于TLV的序列化方式它不香吗?现在不是流行说:不是羽绒服买不起,而是军大衣更有性价比!
在这里插入图片描述
从上图中MQTT协议消息体结构中可以看出其固定头(FixHeader)仅仅只有2个字节,这对于传统的SIP简直太香了,毕竟基于应用开发的我们做到bit位级的精简已经是极致了!对于可变头(VariableHeader)来说包括的信息也比较少和紧凑(较常的应用是做为包的标识),对于消息荷载(Payload)来说,那么就是自己爱怎么定就怎么定就行了。

2.3 通讯协议设计思路

2.3.1 消息类型

标准的MQTT协议中的消息类型由固定头第一字节的高4位表达,那么限制了其范围最多为:2的4次方,即16种。标准MQTT消息类型为:
Reserved1(0);CONNECT(1);CONNACK(2);PUBLISH(3);PUBACK(4);PUBREC(5);PUBREL(6);PUBCOMP(7);SUBSCRIBE(8);SUBACK(9);UNSUBSCRIBE(10);UNSUBACK(11);PINGREQ(12);PINGRESP(13);DISCONNECT(14);Reserved2(15);

其实回过头来想想我们的实际需求就会发现好像16种表达足够用了:

  • 连接相关:连接,断连,重连
  • 信令相关:发消息专用一类(PUBLISH & PUBACK)、其他信令用共用一类(QUERY & QUERYACK),同时结合可变头中的Topic。
  • 保活相关:Ping & Pong

2.3.2 剩余长度

标准的MQTT协议中第2字节为剩余长度,官方解释为:固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。

这些官方的描述比较绕口,白话就是:使用变长方式表达消息长度,剩余长度字段的长度可以是一个字节,也可以是多个字节,取决于消息的实际长度。这无疑是一种非常好的做法。因此在实际的方案中将消息体长度约定为:用最多3个字节的无符号变长Int表达。3字节无符号Int的范围为:2的24次方 -1 = 16,777,215 字节,即:≈ 16 MB;因为对于一般系统和 IM 系统来说16 MB其实足够用了:

  • 文本消息:16MB那太够了。
  • 富媒体消息:用外链去表达富媒体、同时限制富媒体缩略图的上限。
  • 拉消息:限制一批最多拉多少条或者多少MB即可。例如:200条或者10MB。

2.3.3 校验和(Checksum)

在标准的MQTT协议中没有明确定义校验和(Checksum),但是在TCP协议中,校验和(Checksum)是一种错误检测机制,用于检测数据在传输过程中是否发生了损坏或修改。TCP校验和的计算方式是对TCP报文段中的数据字段和部分首部字段进行计算,然后将结果添加到TCP报文段的首部中。接收方会根据校验和对接收到的数据进行验证,以确保数据的完整性和可靠性。介于Checksum的重要性,决定使用协议固定头第2字节用来表达校验和(Checksum)

2.3.4 设计结果

最终基于MQTT协议的私有通讯协议可以简要表达为下面的描述,其中第1字段与标准MQTT协议完全一致,荷载则决定使用protostuff序列化方式,原因是.proto文件的编写有点恶心,并且它与Protocol Buffer性能几乎接近,另外的原因是为了和《Java版Akka-Rpc实现与应用》https://blog.csdn.net/camelials/article/details/123327236 的实现保持一致。

  • 设计结果
类型位置说明
固定头第1字节MsgType (消息类型:4 bits);DUP (重传标记:1 bits);QoS (质量等级:2 bits);RETAIN (保留位:1 bits)
固定头第2字节校验和(Checksum)
可变头第3-J字节剩余长度
可变头第K-L字节signature(鉴权信息) 、topic(可以理解为信令名)、targetId(目标用户ID)
荷载第 M-N 字节消息体Bytes
  • MessageType改造
public enum MqttMessageType {/*** connect*/CONNECT(1),CONNACK(2),DISCONNECT(14),/*** publish*/PUBLISH(3),PUBACK(4),/*** query*/QUERY(5),QUERYACK(6),QUERYCON(7),/*** reconnect*/RECONNECT(8),RECONNECTACK(9),/*** ping*/PINGREQ(12),PINGRESP(13),/*** reserve*/RESERVE1(0),RESERVE2(15);private final int value;MqttMessageType(int value) {this.value = value;}public int getValue() {return this.value;}/*** valueOf** @param i* @return*/public static MqttMessageType valueOf(int i) {for (MqttMessageType t : MqttMessageType.values()) {if (t.value == i) {return t;}}throw new IllegalArgumentException("Invalid MqttMessageType value: " + i);}
}
  • QoS(与标准MQTT一致)
public enum QoS {AT_MOST_ONCE(0),AT_LEAST_ONCE(1),EXACTLY_ONCE(2),DEFAULT(3);private int value;QoS(int value) {this.value = value;}public int getValue() {return this.value;}/*** valueOf** @param i* @return*/public static QoS valueOf(int i) {for (QoS q : QoS.values()) {if (q.value == i) {return q;}}throw new IllegalArgumentException("Invalid QoS value: " + i);}
}

3. 通讯协议主要实现

1、系统接入服务打算用Netty实现,因此通讯协议消息编解码器基于Netty实现。
2、消息体荷载打算使用基于TLV的protostuff序列化方式,不过介于荷载怎么样都可以,因此测试代码中仅使用Utf8字符串(protostuff的序列化与反序列化不需要在这里证明)。

3.1 消息头(MqttMessageHeader)

package cn.bossfriday.im.protocol.core;import cn.bossfriday.im.protocol.enums.MqttMessageType;
import cn.bossfriday.im.protocol.enums.QoS;/*** MqttMessageHeader* <p>* | MsgType (消息类型:4 bits) |  DUP (重传标记:1 bits)  |   QoS (质量等级:2 bits)  |  RETAIN (保留位:1 bits)  |** @author chenx*/
public class MqttMessageHeader {private MqttMessageType mqttMessageType;private boolean retain;private QoS qos = QoS.AT_MOST_ONCE;private boolean dup;public MqttMessageHeader(MqttMessageType mqttMessageType, boolean retain, QoS qos, boolean dup) {this.mqttMessageType = mqttMessageType;this.retain = retain;this.qos = qos;this.dup = dup;}public MqttMessageHeader(byte flags) {this.retain = (flags & 1) > 0;this.qos = QoS.valueOf((flags & 0x6) >> 1);this.dup = (flags & 8) > 0;this.mqttMessageType = MqttMessageType.valueOf((flags >> 4) & 0xF);}public MqttMessageType getType() {return this.mqttMessageType;}public MqttMessageType getMqttMessageType() {return this.mqttMessageType;}public boolean isRetained() {return this.retain;}public QoS getQos() {return this.qos;}public boolean isDup() {return this.dup;}public void setMqttMessageType(MqttMessageType mqttMessageType) {this.mqttMessageType = mqttMessageType;}public void setRetain(boolean retain) {this.retain = retain;}public void setQos(QoS qos) {this.qos = qos;}public void setDup(boolean dup) {this.dup = dup;}/*** encode* <p>* MsgType (消息类型:4 bits)* DUP (重传标记:1 bits)* QoS (质量等级:2 bits)* RETAIN (保留位:1 bits)** @return*/public byte encode() {byte b = 0;b = (byte) (this.mqttMessageType.getValue() << 4);b |= this.retain ? 1 : 0;b |= this.qos.getValue() << 1;b |= this.dup ? 8 : 0;return b;}@Overridepublic String toString() {return "MqttMessageHeader{" + "mqttMessageType=" + this.mqttMessageType + ", retain=" + this.retain + ", qos=" + this.qos + ", dup=" + this.dup + '}';}
}

3.2 消息基类(MqttMessage)

package cn.bossfriday.im.protocol.core;import cn.bossfriday.im.protocol.enums.MqttMessageType;
import cn.bossfriday.im.protocol.enums.QoS;import java.io.*;/*** Message** @author chenx*/
public abstract class MqttMessage {private final MqttMessageHeader header;private byte headerCode;private int lengthSize = 0;protected MqttMessage(MqttMessageType mqttMessageType) {this.header = new MqttMessageHeader(mqttMessageType, false, QoS.AT_MOST_ONCE, false);}protected MqttMessage(MqttMessageHeader header) {this.header = header;}/*** getMessageLength** @return*/protected abstract int getMessageLength();/*** writeMessage** @param out* @throws IOException*/protected abstract void writeMessage(OutputStream out) throws IOException;/*** readMessage** @param in* @param msgLength* @throws IOException*/protected abstract void readMessage(InputStream in, int msgLength) throws IOException;/*** read** @param in* @throws IOException*/public final void read(InputStream in) throws IOException {int msgLength = this.readMsgLength(in);this.readMessage(in, msgLength);}/*** write** @param out* @throws IOException*/public final void write(OutputStream out) throws IOException {this.headerCode = this.header.encode();out.write(this.headerCode);this.writeMsgCode(out);this.writeMsgLength(out);this.writeMessage(out);}/*** toBytes** @return*/public final byte[] toBytes() {ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();try {this.write(byteArrayOutputStream);} catch (IOException e) {throw new MqttException("Message.toBytes() error!");}return byteArrayOutputStream.toByteArray();}/*** toUtfBytes** @param s* @return*/public final byte[] toUtfBytes(String s) {if (s == null) {return new byte[0];}try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream();DataOutputStream dos = new DataOutputStream(byteOut)) {dos.writeUTF(s);dos.flush();return byteOut.toByteArray();} catch (IOException e) {throw new MqttException("MessageObfuscator.writeString() error!");}}/*** 消息长度为变长Int(为了省那么点字节)*/public final int getLengthSize() {return this.lengthSize;}public void setRetained(boolean retain) {this.header.setRetain(retain);}public boolean isRetained() {return this.header.isRetained();}public void setQos(QoS qos) {this.header.setQos(qos);}public QoS getQos() {return this.header.getQos();}public void setDup(boolean dup) {this.header.setDup(dup);}public boolean isDup() {return this.header.isDup();}public MqttMessageType getType() {return this.header.getMqttMessageType();}/*** readMsgLength*/private int readMsgLength(InputStream in) throws IOException {int msgLength = 0;int multiplier = 1;int digit;do {digit = in.read();msgLength += (digit & 0x7f) * multiplier;multiplier *= 128;} while ((digit & 0x80) > 0);return msgLength;}/*** writeMsgLength*/private void writeMsgLength(OutputStream out) throws IOException {int val = this.getMessageLength();do {this.lengthSize++;byte b = (byte) (val & 0x7F);val >>= 7;if (val > 0) {b |= 0x80;}out.write(b);} while (val > 0);}/*** writeMsgCode*/private void writeMsgCode(OutputStream out) throws IOException {int val = this.getMessageLength();int code = this.headerCode;do {byte b = (byte) (val & 0x7F);val >>= 7;if (val > 0) {b |= 0x80;}code = code ^ b;} while (val > 0);out.write(code);}
}

3.3 可重试消息基类(RetryableMqttMessage)

package cn.bossfriday.im.protocol.core;import cn.bossfriday.im.protocol.enums.MqttMessageType;import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;import static cn.bossfriday.im.protocol.core.MqttConstant.FIX_HEADER_LENGTH;/*** RetryableMqttMessage** @author chenx*/
public abstract class RetryableMqttMessage extends MqttMessage {/*** 在MQTT协议栈中,消息序号(messageSequence)通常用于标识消息的顺序和唯一性。这里的视线中消息序号是一个2字节的字段(最大可以表达无符号整型65535),用于发布(publish)消息和发布确认(publishAck)消息的对齐。* 具体而言,消息序号在协议栈中的用途包括:* 1、消息排序和唯一性:每个消息都有一个唯一的序号,用于在通信中标识消息的顺序和确保消息的唯一性。这在处理分片消息或者确保消息到达的顺序十分重要。* 2、重传机制:当消息需要重传时,消息序号可以用于标识需要重传的消息。例如,在发布消息(publish)时,如果没有收到确认,发送方可能会重新发送该消息,而消息序号可以确保接收方能够识别重传的消息。* 3、质量等级为1和2的消息传递:在MQTT中,质量等级(QoS)为1或2的消息需要确认。消息序号可以用于匹配发布消息和发布确认消息,从而实现消息的可靠传输。* 消息序号在MQTT协议栈中扮演了重要的角色,用于确保消息的顺序性、唯一性和可靠传输。*/private int messageSequence;protected RetryableMqttMessage(MqttMessageHeader header) {super(header);}protected RetryableMqttMessage(MqttMessageType mqttMessageType) {super(mqttMessageType);}@Overrideprotected int getMessageLength() {return FIX_HEADER_LENGTH;}@Overrideprotected void writeMessage(OutputStream out) throws IOException {int id = this.getMessageSequence();int lsb = id & 0xFF;int msb = (id & 0xFF00) >> 8;out.write(msb);out.write(lsb);}@Overrideprotected void readMessage(InputStream in, int msgLength) throws IOException {int msgId = in.read() * 0x100 + in.read();this.setMessageSequence(msgId);}public void setMessageSequence(int messageSequence) {this.messageSequence = messageSequence;}public int getMessageSequence() {return this.messageSequence;}
}

3.4 可重试消息实现示例(PublishMessage)

package cn.bossfriday.im.protocol.message;import cn.bossfriday.im.protocol.core.MqttMessageHeader;
import cn.bossfriday.im.protocol.core.RetryableMqttMessage;
import cn.bossfriday.im.protocol.enums.MqttMessageType;import java.io.*;import static cn.bossfriday.im.protocol.core.MqttConstant.FIX_HEADER_LENGTH;/*** PublishMessage** @author chenx*/
public class PublishMessage extends RetryableMqttMessage {private String topic;private byte[] data;private String targetId;private long signature;private int date;private boolean isServer;public PublishMessage(String topic, byte[] data, String targetId, boolean isServer) {super(MqttMessageType.PUBLISH);this.topic = topic;this.targetId = targetId;this.data = data;this.signature = 0xffL;this.isServer = isServer;}public PublishMessage(MqttMessageHeader header, boolean isServer) {super(header);this.isServer = isServer;}@Overrideprotected int getMessageLength() {int length = FIX_HEADER_LENGTH + Long.BYTES;if (this.isServer) {length += Integer.BYTES;}length += this.toUtfBytes(this.topic).length;length += this.toUtfBytes(this.targetId).length;length += this.data.length;return length;}@Overrideprotected void writeMessage(OutputStream out) throws IOException {DataOutputStream dos = new DataOutputStream(out);dos.writeLong(this.signature);if (this.isServer) {this.date = (int) (System.currentTimeMillis() / 1000);dos.writeInt(this.date);}dos.writeUTF(this.topic);dos.writeUTF(this.targetId);dos.flush();super.writeMessage(out);dos.write(this.data);dos.flush();}@Overrideprotected void readMessage(InputStream in, int msgLength) throws IOException {DataInputStream dis = new DataInputStream(in);int pos = 0;this.signature = dis.readLong();pos += 8;this.date = dis.readInt();pos += 4;this.topic = dis.readUTF();pos += this.toUtfBytes(this.topic).length;this.targetId = dis.readUTF();pos += this.toUtfBytes(this.targetId).length;super.readMessage(in, msgLength);pos += 2;this.data = new byte[msgLength - pos];dis.read(this.data);}public String getTopic() {return this.topic;}public byte[] getData() {return this.data;}public String getTargetId() {return this.targetId;}public int getDate() {return this.date;}
}

3.5 Netty自定义消息编码器(MessageEncoder)

package cn.bossfriday.im.protocol.codec;import cn.bossfriday.im.protocol.core.MqttException;
import cn.bossfriday.im.protocol.core.MqttMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;import java.util.Objects;/*** MessageEncoder** @author chenx*/
public class MessageEncoder extends MessageToByteEncoder<MqttMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, MqttMessage msg, ByteBuf out) throws Exception {this.encode(msg, out);}/*** encode** @param msg* @param out*/public void encode(MqttMessage msg, ByteBuf out) {if (Objects.isNull(msg)) {throw new MqttException("msg is null!");}if (Objects.isNull(out)) {throw new MqttException("outByteBuf is null!");}byte[] data = msg.toBytes();data = MessageObfuscator.obfuscateData(data, 2 + msg.getLengthSize());out.writeBytes(data);}
}

3.5 Netty自定义消息解码器(MessageDecoder)

package cn.bossfriday.im.protocol.codec;import cn.bossfriday.im.protocol.core.MqttMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;import static cn.bossfriday.im.protocol.core.MqttConstant.FIX_HEADER_LENGTH;
import static cn.bossfriday.im.protocol.core.MqttConstant.MAX_MESSAGE_LENGTH_SIZE;
import static cn.bossfriday.im.protocol.core.MqttException.BAD_MESSAGE_EXCEPTION;
import static cn.bossfriday.im.protocol.core.MqttException.READ_DATA_TIMEOUT_EXCEPTION;/*** MessageDecoder** @author chenx*/
public class MessageDecoder extends ByteToMessageDecoder {private final long timeoutMillis;private final String userId;private final boolean isServer;private volatile long lastReadTime;private volatile ScheduledFuture<?> timeout;private boolean closed;public MessageDecoder(long timeoutMillis, String userId, boolean isServer) {this.timeoutMillis = timeoutMillis;this.userId = userId;this.isServer = isServer;}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = this.decode(ctx, in);if (decoded != null) {out.add(decoded);}}/*** readTimedOut** @param ctx*/protected void readTimedOut(ChannelHandlerContext ctx) {if (!this.closed) {this.timeout.cancel(false);this.timeout = null;this.closed = true;ctx.fireExceptionCaught(READ_DATA_TIMEOUT_EXCEPTION);ctx.close();}}/*** decode** @param ctx* @param buf* @return* @throws IOException*/public Object decode(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {if (buf.readableBytes() == 0) {return null;}if (buf.readableBytes() < 3) {this.resumeTimer(ctx);return null;}buf.markReaderIndex();// read away headerint first = buf.readByte();int second = buf.readByte();int digit;int code = first;int msgLength = 0;int multiplier = 1;int lengthSize = 0;do {lengthSize++;digit = buf.readByte();code = code ^ digit;msgLength += (digit & 0x7f) * multiplier;multiplier *= 128;if ((digit & 0x80) > 0 && !buf.isReadable()) {this.resumeTimer(ctx);buf.resetReaderIndex();return null;}} while ((digit & 0x80) > 0);if (code != second) {this.close(ctx, buf);return null;}if (lengthSize > MAX_MESSAGE_LENGTH_SIZE) {this.close(ctx, buf);return null;}if (buf.readableBytes() < msgLength) {this.resumeTimer(ctx);buf.resetReaderIndex();return null;}byte[] data = new byte[FIX_HEADER_LENGTH + lengthSize + msgLength];buf.resetReaderIndex();buf.readBytes(data);this.pauseTimer();data = MessageObfuscator.obfuscateData(data, FIX_HEADER_LENGTH + lengthSize);MqttMessage msg = MessageInputStream.readMessage(new ByteArrayInputStream(data), this.isServer);if (msg == null) {this.close(ctx, buf);}return msg;}/*** resumeTimer** @param ctx*/private void resumeTimer(ChannelHandlerContext ctx) {this.lastReadTime = System.currentTimeMillis();if (this.timeoutMillis > 0 && (this.timeout == null || this.timeout.isCancelled()) && !this.closed) {this.timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), this.timeoutMillis, TimeUnit.MILLISECONDS);}}/*** pauseTimer*/private void pauseTimer() {if (this.timeout != null) {this.timeout.cancel(false);}}/*** close*/private void close(ChannelHandlerContext ctx, ByteBuf buf) {if (this.timeout != null) {this.timeout.cancel(false);}this.timeout = null;this.closed = true;buf.skipBytes(buf.readableBytes());ctx.fireExceptionCaught(BAD_MESSAGE_EXCEPTION);ctx.close();}public String getUserId() {return this.userId;}/*** ReadTimeoutTask*/private final class ReadTimeoutTask implements Runnable {private final ChannelHandlerContext ctx;ReadTimeoutTask(ChannelHandlerContext ctx) {this.ctx = ctx;}@Overridepublic void run() {if (!this.ctx.channel().isOpen()) {return;}long currentTime = System.currentTimeMillis();long nextDelay = MessageDecoder.this.timeoutMillis - (currentTime - MessageDecoder.this.lastReadTime);if (nextDelay <= 0) {// Read timed out - set a new timeout and notify the callback.MessageDecoder.this.timeout = this.ctx.executor().schedule(this, MessageDecoder.this.timeoutMillis, TimeUnit.MILLISECONDS);try {MessageDecoder.this.readTimedOut(this.ctx);} catch (Throwable t) {this.ctx.fireExceptionCaught(t);}} else {// Read occurred before the timeout - set a new timeout with// shorter delay.MessageDecoder.this.timeout = this.ctx.executor().schedule(this, nextDelay, TimeUnit.MILLISECONDS);}}}
}

3.7 主要测试代码

package cn.bossfriday.im.protocol.test;import cn.bossfriday.im.protocol.codec.MessageDecoder;
import cn.bossfriday.im.protocol.codec.MessageEncoder;
import cn.bossfriday.im.protocol.codec.MessageInputStream;
import cn.bossfriday.im.protocol.message.PublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;/*** PublishMessageTest** @author chenx*/
@RunWith(MockitoJUnitRunner.class)
public class PublishMessageTest {@Mockprivate ChannelHandlerContext mockCtx;@Beforepublic void mockInit() {}/*** 消息读写测试*/@Testpublic void readWriteMessageTest() throws IOException {String topic = "topic-1";String targetId = "targetId-1";byte[] data = "中文abc1234!@#$".getBytes(StandardCharsets.UTF_8);boolean isServer = true;PublishMessage pubMsg1 = new PublishMessage(topic, data, targetId, isServer);pubMsg1.setMessageSequence(123);// MqttMessage -> bytesbyte[] msgData = pubMsg1.toBytes();// bytes -> MqttMessagePublishMessage pubMsg2 = (PublishMessage) MessageInputStream.readMessage(new ByteArrayInputStream(msgData), isServer);System.out.println("topic: " + pubMsg2.getTopic());System.out.println("targetId: " + pubMsg2.getTargetId());System.out.println("dataString: " + new String(pubMsg2.getData(), StandardCharsets.UTF_8));Assert.assertEquals(pubMsg2.getTopic(), topic);Assert.assertEquals(pubMsg2.getTargetId(), targetId);Assert.assertEquals(new String(pubMsg2.getData(), StandardCharsets.UTF_8), new String(data, StandardCharsets.UTF_8));}/*** 消息编解码器测试*/@Testpublic void messageCodecTest() throws IOException {ByteBuf buf = Unpooled.buffer();String topic = "topic-1";String targetId = "targetId-1";byte[] data = "中文abc1234!@#$".getBytes(StandardCharsets.UTF_8);boolean isServer = true;PublishMessage pubMsg1 = new PublishMessage(topic, data, targetId, isServer);pubMsg1.setMessageSequence(123);// 编码MessageEncoder msgEncoder = new MessageEncoder();msgEncoder.encode(pubMsg1, buf);// 解码MessageDecoder msgDecoder = new MessageDecoder(6000L, targetId, isServer);PublishMessage pubMsg2 = (PublishMessage) msgDecoder.decode(this.mockCtx, buf);System.out.println("topic: " + pubMsg2.getTopic());System.out.println("targetId: " + pubMsg2.getTargetId());System.out.println("dataString: " + new String(pubMsg2.getData(), StandardCharsets.UTF_8));Assert.assertEquals(pubMsg2.getTopic(), topic);Assert.assertEquals(pubMsg2.getTargetId(), targetId);Assert.assertEquals(new String(pubMsg2.getData(), StandardCharsets.UTF_8), new String(data, StandardCharsets.UTF_8));}
}

单元测试运行结果通过可以基本判定:消息读写 & 消息编解码器 大致没有问题(MD,这篇Blog是三两白酒下肚后的醒酒之作,谁能保障没问题??),后续计划基于此,使用Netty去实现一个接入服务。完整代码参考:https://github.com/bossfriday/bossfriday-nubybear/tree/master/cn.bossfriday.im.protocol
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/758903.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Vue】el-select下选组件

系列文章 【Vue】vue增加导航标签 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/134965353 【Vue】Element开发笔记 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/133947977 【Vue】vue&#xff0c;在Windows IIS平台…

修改约束

目录 修改约束 创建数据库 添加约束 删除约束 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 修改约束 如果说表结构的修改还在可以容忍的范畴之内&#xff0c;那么约束的修改是绝对 100% 禁止的 所有的约束一定要在…

【Flutter 面试题】讲一讲 Dart 的一些重要概念?

【Flutter 面试题】讲一讲 Dart 的一些重要概念&#xff1f; 文章目录 写在前面口述回答补充说明完整代码运行结果详细说明 写在前面 &#x1f64b; 关于我 &#xff0c;小雨青年 &#x1f449; CSDN博客专家&#xff0c;GitChat专栏作者&#xff0c;阿里云社区专家博主&#…

UE5 TPS开发p25 设置大厅,销毁会话,快速退出,检查按钮

这节课主要是完善了菜单选择地图作为游戏大厅,MultiPlayerSessionSubsystem的销毁会话函数,然后就是给Menu菜单添加了Quit和禁止使用按钮 同时还完善了创建房间的漏洞,因为在创建房间的时候如果退出后就马上加入就会发现自己无法创建房间,这是因为服务器判断房间销毁是有一定延…

shell的介绍以及Linux权限的讲解

1 shell命令以及运行原理 1.1 shell命令行的概念 大家在以前的学习中可能会有一个固态的印象&#xff1a; 一台计算机的大概分层为下图 也就是说人可以直接调用计算机的操作系统&#xff0c;但真的是这样吗&#xff1f; 答案是否定的&#xff01; 其实操作系统还有一个“外壳…

31.网络游戏逆向分析与漏洞攻防-网络通信数据包分析工具-其它消息的实现与使用优化

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 如果看不懂、不知道现在做的什么&#xff0c;那就跟着做完看效果 内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;30.数据搜索功能…

如果QQ当年有AI,当年的经典头像会是什么样子?-WEBUI有趣案例分享

&#x1f454;背景介绍 IP_Adapter是个很有趣的模型&#xff0c;可以实现一定程度上的角色一致性&#xff0c;忽然有个想法&#xff0c;用IP_Adapter还原一下QQ当年的经典头像会是什么样子呢&#xff1f; 不过&#xff0c;大家还记得这些古早时期的头像吗&#xff1f;或者&…

优雅的 Markdown

Markdown浅尝 一、勾选框 注意[]前后都要有空格 - [x] 干的漂亮 - [x] 吃饭 - [x] 写代码 - [ ] 睡觉 干的漂亮 吃饭 写代码 睡觉 二、列表 #无序列列表 * 换成 - 也行 * 你 * 你好 * 你好呀 - 你很好啊 你你好你好呀你很好啊 #有序列表 . 后面有个空格 1. 我 2. 是我 3.…

Day73:WEB攻防-支付逻辑篇篡改属性值并发签约越权盗用算法溢出替换对冲

目录 SRC-支付逻辑测试 购买支付-修改数量&篡改价格&订单对冲 修改数量 篡改价格 产品替换对冲 订单替换对冲 购买支付-优惠券复用盗用&积分对冲溢出 优惠卷复用 优惠卷盗用 积分对冲溢出 SRC实战案例分享 越权让他人支付 四舍五入半价购 并发提前全…

python字典:打印字典中的键、值、打印所有字典、python遍历字典

1.定义 字典以花括号&#xff08;{}&#xff09;括起来&#xff0c;里面的元素是成对出现的&#xff0c;不同对元素用逗号&#xff08;,&#xff09;分开&#xff1b;一对元素用 冒号&#xff08;&#xff1a;&#xff09;分割。 2.解读 字典里的每一对元素准确的来说是键值对&…

javaSwing连连看

一、简介 基于java的连连看游戏设计和实现&#xff0c;基本功能包括&#xff1a;消除模块&#xff0c;重新开始模块&#xff0c;刷新模块&#xff0c;选择难度模块&#xff0c;计时模块。本系统结构如下&#xff1a; &#xff08;1&#xff09;消除模块&#xff1a; 完成连连…

python socket 实时通信,多对多,一对一,转发

研究一下python socket 实时通信&#xff0c;多对多&#xff0c;一对一&#xff0c;转发 C>S 单独通信 server1 import socket import threading# 在线客户端列表 online_clients {}def broadcast(message, sender):"""向所有在线客户端广播消息,除了发送…

面试算法-58-求根节点到叶节点数字之和

题目 给你一个二叉树的根节点 root &#xff0c;树中每个节点都存放有一个 0 到 9 之间的数字。 每条从根节点到叶节点的路径都代表一个数字&#xff1a; 例如&#xff0c;从根节点到叶节点的路径 1 -> 2 -> 3 表示数字 123 。 计算从根节点到叶节点生成的 所有数字之和…

2024年腾讯云优惠券全解析、云服务器代金券领取、查询和使用方法

腾讯云代金券领取渠道有哪些&#xff1f;腾讯云官网可以领取、官方媒体账号可以领取代金券、完成任务可以领取代金券&#xff0c;大家也可以在腾讯云百科蹲守代金券&#xff0c;因为腾讯云代金券领取渠道比较分散&#xff0c;腾讯云百科txybk.com专注汇总优惠代金券领取页面&am…

Dockerfile Docker Compose(实战总结)

Dockerfile & Docker Compose&#xff08;实战总结&#xff09; Dockerfile Dockerfile 是用来构建Docker镜像文件&#xff0c;是由一条条构建镜像所需的指令构成的脚步。 步骤&#xff1a; 编写Dockerfile 文件docker build 构建镜像docker run 运行镜像docker push 发…

python coding with ChatGPT 打卡第23天| 回溯算法:理论基础

文章目录 视频讲解回溯法的效率解决的问题如何理解回溯法回溯框架 视频讲解 回溯算法理论篇 回溯是递归的副产品&#xff0c;只要有递归就会有回溯。 回溯法的效率 回溯的本质是穷举&#xff0c;穷举所有可能&#xff0c;然后选出我们想要的答案&#xff0c;如果想让回溯法…

区域规划(Regional Planning)的学习笔记

目录 一、概念题 1.区域的概念、类型、特性 2.区域分析的概念、主要内容 3.自然环境、自然资源的概念 4.区域自然资源评价的内容 5.可持续发展理论定义 6.经济增长、经济结构定义 7.产业结构概念 8.人口增长分析的含义、指标 9.技术进步概念、类型 10.技术进步对区域…

【C++ leetcode】双指针问题(续)

3. 202 .快乐数 题目 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为&#xff1a; 对于一个正整数&#xff0c;每一次将该数替换为它每个位置上的数字的平方和。然后重复这个过程直到这个数变为 1&#xff0c;也可能是 无限循环 但始终变不到 1。如果这个过程 结…

ArcGIS Pro、R和INVEST:三位一体的生态系统服务评估框架

生态系统服务是指生态系统所形成的用于维持人类赖以生存和发展的自然环境条件与效用&#xff0c;是人类直接或间接从生态系统中得到的各种惠益。联合国千年生态系统评估&#xff08;Millennium ecosystem assessment&#xff0c;MA&#xff09;提出生态系统服务包括供给、调节、…

智慧水务:雨季山区水域水务智能化监控与监测管理方案

一、方案背景 雨季的水务管理对于各区县来说&#xff0c;无疑是一项至关重要的任务。夏季雨水充沛&#xff0c;江河湖泊水位上涨&#xff0c;山洪、上游排水等情况时有发生&#xff0c;给各地的水务设施和防汛工作带来了严峻的挑战。针对区县的各类水域监管场景&#xff0c;需…