Netty学习(二)——黏包半包、协议设计解析、聊天室

一、粘包与半包

1.1 粘包和半包复现

1、粘包复现:

Server代码:

public class ProblemServer {public static void main(String[] args) throws InterruptedException {new ServerBootstrap()//若是指定接收缓冲区大小:就会出现黏包、半包情况// .option(ChannelOption.SO_RCVBUF, 10)  //设置指定大小的接收缓冲区(TCP)(定义接收的系统缓冲区buf字节大小).group(new NioEventLoopGroup(), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch){//添加日志处理器(会打印每次接收包得到的数据)ch.pipeline().addLast(new LoggingHandler());}}).bind(8080).sync();System.out.println("服务器启动成功!");}
}

client代码:

public class ProblemClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();Channel channel = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());//String=>ByteBufch.pipeline().addLast(new ChannelInboundHandlerAdapter() {//channelActive:连接建立之后会执行会触发Active事件@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//连续发送10次16字节的内容for (int i = 0; i < 10; i++) {final ByteBuf buffer = ctx.alloc().buffer(16);buffer.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16});ctx.writeAndFlush(buffer);}System.out.println("finish!");}});}}).connect("127.0.0.1", 8080).sync().channel();System.out.println("客户端连接成功:" + channel);channel.closeFuture().addListener(future -> {group.shutdownGracefully();});}}

效果:

image-20240526174249399

半包复现:

服务器代码

//对ServerBootstrap进行配置,在server的18行添加接收缓冲区配置
.option(ChannelOption.SO_RCVBUF, 10)  //设置指定大小的接收缓冲区(TCP)(定义接收的系统缓冲区buf字节大小)

image-20240526174831342

说明:由于我们客户端每次发送的数据长度都为16个字节,而服务端每次接收到的有50,有10就说明出现了粘包、半包情况。这里出现这种情况是,对系统接收的网络缓冲区进行了设置,而ByteBuf每次设置的容量没有限制就会出现这种情况。

注意

serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍

1.2 现象分析

1.2.1 粘包、半包情况分析

粘包:

  • 现象:发送 abc def,接收 abcdef。(明明是多次发送请求,服务器端一次就全部接收了)
  • 原因
    • 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024),直接将多个请求的数据统一直接处理。
    • 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包。
    • Nagle 算法:会造成粘包。(出现原因:因为只要是传输层都会加上一个报头,IP层的报头20个字节,tcp的也是20个,此时就会出现一个问题,若是只是发送一个1个字节数据,那么总体也会发送41个字节,此时报头的长度远远大于内容长度,造成了浪费,此时就出现了该算法,其就是尽可能多的发送数据,攒够了一批再发,也就是说若是待发送的数据量太少会先进行积攒,之后攒够了统一再发!)

半包:

  • 现象,发送 abcdef,接收 abc def。(明明是一次发送的请求,服务器端却使用了两次或多次接收到请求的一部分数据)

  • 原因

    • 应用层:接收方 ByteBuf 小于实际发送数据量

    • TCP(滑动窗口):假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包。

    • 链路层(MSS限制):当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包。

      • 网络层网卡设备对于数据包的大小是有限制的,(MTU)笔记本普通的网卡是1500个字节,抛开TCP、IP的报文头,那么最大能够传1460个字节,超过这个数据就会将数据切分发送。MTU是数据链路层最大载荷长度,其中MTU包含了MSS。
      • 在自己电脑上一般都是使用localhost(回环地址)来进行测试的,而回环地址对于MSS没有限制,大小为65535,所以本地开发时不好复现。若是向局域网的另一台电脑发送,就会有限制了

所以黏包、半包是在网络编程时必须要解决的问题!本质是因为TCP是流式协议,消息无边界。

1.2.2 滑动窗口、MSS限制、Nagle算法介绍

滑动窗口:

  • TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差

  • 为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值

  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用

    • 图中深色的部分即要发送的数据,高亮的部分即窗口

    • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动

    • 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动

    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

MSS 限制:

  • 链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如

  • 以太网的 MTU 是 1500

  • FDDI(光纤分布式数据接口)的 MTU 是 4352

  • 本地回环地址的 MTU 是 65535 - 本地测试不走网卡

  • MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数

  • ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460

  • TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送

  • MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS

    image-20240602155142417

Nagle 算法:

  • 即使发送一个字节,也需要加入 tcp 头和 ip 头,也就是总字节数会使用 41 bytes,非常不经济。因此为了提高网络利用率,tcp 希望尽可能发送足够大的数据,这就是 Nagle 算法产生的缘由
  • 该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
    • 如果 SO_SNDBUF 的数据达到 MSS,则需要发送
    • 如果 SO_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭
    • 如果 TCP_NODELAY = true,则需要发送
    • 已发送的数据都收到 ack 时,则需要发送
    • 上述条件不满足,但发生超时(一般为 200ms)则需要发送
    • 除上述情况,延迟发送

1.3 解决办法

方法列举:

  1. 短链接,发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
  2. 每一条消息采用固定长度,缺点浪费空间
  3. 每一条消息采用分隔符,例如 \n,缺点需要转义
  4. 每一条消息分为 head 和 body,head 中包含 body 的长度

1.3.1 短链接

客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

客户端代码改进ctx.channel().close();

public class StudyClient {static final Logger log = LoggerFactory.getLogger(StudyClient.class);public static void main(String[] args) {for (int i = 0;i < 10;i++) {send();}System.out.println("finish");}public static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 每次发送16个字节的数据,共发送10次for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);// 使用短链接,每次发送完毕后就断开连接ctx.channel().close();}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}

image-20240414180815988

客户端每次向服务器发送了16B的数据,发送后断开连接,未出现粘包现象

1.3.2 定长解码器

Netty中提供了一个FixedLengthFrameDecoder(固定长度解析器),是一个特殊的handler,只不过是专门用来进行解码的。

  • 客户端给每个发送的数据封装成定长的长度(多余的使用分隔符,统一规定)最后统一通过一个ByteBuf发送出去;服务端的话通过使用FixedLengthFrameDecoder来进行固定长度解析,那么每次自然也就解析到定长的Bytebuf来进行处理。
  • 服务器与客户端作一个长度约定,服务端只有收到固定长度的才会接收完毕,否则也会进行等待直到够一定长度才向下一个handler传递;若是一次接收到的长度过大,ByteBuf也只会截取固定长度的内容并对下一个handler进行传递,多出来的部分会留着后序发来的数据再进行组合。

优缺点:虽然能够解决黏包、半包问题,但是客户端要构成定长长度有时候无效内容占用的字节数比较多(若是传递的内容比较少,则为了构成定长长度那么就会产生资源浪费)。

代码示例:

server:

/*** 使用定长解码器解决黏包、半包*/
public class StudyServerV2 {static final Logger log = LoggerFactory.getLogger(StudyServerV2.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 使用FixedLengthFrameDecoder对粘包数据进行拆分,该handler需要添加在LoggingHandler之前,保证数据被打印时已被拆分ch.pipeline().addLast(new FixedLengthFrameDecoder(16));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 连接建立时会执行该方法log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 连接断开时会执行该方法log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());// 关闭channelchannelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stopped");}}public static void main(String[] args) {new StudyServerV2().start();}
}

Client:

public class StudyClientV2 {static final Logger log = LoggerFactory.getLogger(StudyClientV2.class);public static void main(String[] args) {send();System.out.println("finish");}public static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 约定最大长度为16final int maxLength = 16;// 被发送的数据char c = 'a';// 向服务器发送10个报文for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer(maxLength);// 定长byte数组,未使用部分会以0进行填充byte[] bytes = new byte[maxLength];// 生成长度为0~15的数据for (int j = 0; j <= i; j++) {bytes[j] = (byte) c;}System.out.println(new String(bytes));buffer.writeBytes(bytes);c++;// 将数据发送给服务器ctx.writeAndFlush(buffer);}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}

执行结果:

image-20240602160821335

image-20240602160850619

1.3.3 分隔符解码器

在Netty中提供了两个解码器:

  • LineBasedFrameDecoder:指定以换行符作为分隔符。\n或者\r\n,使用它的时候,会有一个最大长度限制,若是超过了字长长度还没有找到换行符就会抛出一个异常

  • DelimiterBasedFrameDecoder:可以自定义符号来作为分隔符,在构造方法中有最大长度何一个Bytebuf类型的分隔符.

缺点:效率比较低,需要一个一个字节去找消息的边界!

代码:

server:

/*** 使用行解码器解决黏包、半包*/
public class StudyServerV3 {static final Logger log = LoggerFactory.getLogger(StudyServerV3.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 通过行解码器对粘包数据进行拆分,以 \n 为分隔符。需要指定最大长度ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 连接建立时会执行该方法log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 连接断开时会执行该方法log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());// 关闭channelchannelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stopped");}}public static void main(String[] args) {new StudyServerV3().start();}
}

client:

public class StudyClientV3 {static final Logger log = LoggerFactory.getLogger(StudyClientV3.class);public static void main(String[] args) {send();System.out.println("finish");}public static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");// 约定最大长度为 64final int maxLength = 64;// 被发送的数据char c = 'a';for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer(maxLength);// 生成长度为0~62的数据Random random = new Random();StringBuilder sb = new StringBuilder();for (int j = 0; j < (int)(random.nextInt(maxLength-2)); j++) {sb.append(c);}// 数据以 \n 结尾sb.append("\n");buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));c++;// 将数据发送给服务器ctx.writeAndFlush(buffer);}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}}}

执行结果:

image-20240602163250360

1.3.4 LTC解码器(基于长度字段的帧解码器,长度+内容组成)

在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的。

LengthFieldBasedFrameDecoder解码器可以提供更为丰富的拆分方法,其构造方法有五个参数

public LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip)Copy

参数解析

  • maxFrameLength 数据最大长度
    • 表示数据的最大长度(包括附加信息、长度标识等内容)
  • lengthFieldOffset 数据长度标识的起始偏移量
    • 用于指明数据第几个字节开始是用于标识有用字节长度的,因为前面可能还有其他附加信息
  • lengthFieldLength 数据长度标识所占字节数(用于指明有用数据的长度)
    • 数据中用于表示有用数据长度的标识所占的字节数
  • lengthAdjustment 长度表示与有用数据的偏移量
    • 用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
  • initialBytesToStrip 数据读取起点
    • 读取起点,不读取 0 ~ initialBytesToStrip 之间的数据
public class TestLengthFieldDecoder {public static void main(String[] args) {// 模拟服务器// 使用EmbeddedChannel测试handlerEmbeddedChannel channel = new EmbeddedChannel(// 数据最大长度为1KB,长度标识前后各有1个字节的附加信息,长度标识长度为4个字节(int)new LengthFieldBasedFrameDecoder(1024, 1, 4, 1, 0),new LoggingHandler(LogLevel.DEBUG));// 模拟客户端,写入数据ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();send(buffer, "Hello");send(buffer, "World");System.out.println("发送的数据:");log(buffer);System.out.println("解析的数据:");channel.writeInbound(buffer);channel.writeInbound(buffer);}private static void send(ByteBuf buf, String msg) {// 得到数据的长度int length = msg.length();byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);// 将数据信息写入buf// 写入长度标识前的其他信息buf.writeByte(0xCA);// 写入数据长度标识buf.writeInt(length);// 写入长度标识后的其他信息buf.writeByte(0xFE);// 写入具体的数据buf.writeBytes(bytes);}
}

image-20240602164918427

二、协议设计与解析

协议的作用:

TCP/IP 中消息传输基于流的方式,没有边界

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。例如HTTP协议、redis通信协议、websocket协议等等

如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用

定长字节表示内容长度 + 实际内容

2.1 redis协议示例

redis对于整个命令会看成一个数组。

例:set key value

//举例:set name changlu   //下面每个命令都由一个回车符、换行符分割 字节对应13,10
*3
$3
set
$4
name 
$7
changlu
12345678
  • *3:首先需要让你发送数组的长度 *表示的是命令的数量,3则是命令组成的长度。
  • $3:$表示的是某个命令参数的长度,3表示该命令参数长度为3。
  • 每个命令参数都由\r\n来进行分割

代码示例:使用redis协议模拟与redis服务端进行通信,执行一条set、get命令。

public class RedisClient {static final Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) {NioEventLoopGroup group =  new NioEventLoopGroup();try {ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 打印日志ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 回车与换行符final byte[] LINE = {'\r','\n'};// 获得ByteBufByteBuf buffer = ctx.alloc().buffer();// 连接建立后,向Redis中发送一条指令,注意添加回车与换行// set name Nyimabuffer.writeBytes("*3".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$3".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("set".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$4".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("name".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$5".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("Nyima".getBytes());buffer.writeBytes(LINE);ctx.writeAndFlush(buffer);}});}}).connect(new InetSocketAddress("localhost", 6379));channelFuture.sync();// 关闭channelchannelFuture.channel().close().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 关闭groupgroup.shutdownGracefully();}}
}

2.2 HTTP协议

HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec作为服务器端的解码器与编码器,来处理HTTP请求

//CombinedChannelDuplexHandler组合其他两个handler,分别是InBound和OutBound 编解码处理器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>implements HttpServerUpgradeHandler.SourceCodec {

使用方式

ch.pipeline().addLast(new HttpServerCodec());

代码示例一:

ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//DefaultHttpRequest实现了HttpRequest接口if (msg instanceof HttpRequest){System.out.println("请求行、头");//LastHttpContent实现了HttpContent接口}else if (msg instanceof HttpContent){System.out.println("请求体");}super.channelRead(ctx, msg);}
});

image-20240602171317658

结果:发现浏览器发送一次请求(无论什么方法请求)实际上会解析成两部分。若是我们重写channelRead方法,那么一个http请求就会走两次该handler方法,每次执行方法其中的Object msg分别为不同部分的解析对象

  • DefaultHttpRequest:解析出来请求行和请求头。
  • LastHttpContent$1:表示请求体。(即便是get请求,请求体内容为空也会专门解析一个请求体对象)

**代码示例二:**访问8080,显示Hello, World!

若是我们只对某个特定类型感兴趣的话,例如只对解析出来的DefaultHttpRequest请求体对象感兴趣,可以实现一个SimpleChannelInboundHandler

public class HttpServer {static final Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();new ServerBootstrap().group(group).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 作为服务器,使用 HttpServerCodec 作为编码器与解码器ch.pipeline().addLast(new HttpServerCodec());// 服务器只处理HTTPRequestch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {// 获得请求urilog.debug("msg:{}",msg.uri());// 获得完整响应,设置版本号与状态码DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);// 设置响应内容byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);// 设置响应体长度,避免浏览器一直接收响应内容response.headers().setInt(CONTENT_LENGTH, bytes.length);// 设置响应体response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});}}).bind(8080);}
}

效果:

image-20240427155210821

控制台:

15:42:48.586 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x4d47e317, L:/127.0.0.1:8080 - R:/127.0.0.1:62298] READ: 864B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 69 6e 64 65 78 2e 68 74 6d 6c 20 |GET /index.html |
|00000010| 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a 20 |HTTP/1.1..Host: |
|00000020| 31 32 37 2e 30 2e 30 2e 31 3a 38 30 38 30 0d 0a |127.0.0.1:8080..|
|00000030| 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 70 |Connection: keep|
|00000040| 2d 61 6c 69 76 65 0d 0a 43 61 63 68 65 2d 43 6f |-alive..Cache-Co|
|00000050| 6e 74 72 6f 6c 3a 20 6d 61 78 2d 61 67 65 3d 30 |ntrol: max-age=0|
|00000060| 0d 0a 73 65 63 2d 63 68 2d 75 61 3a 20 22 43 68 |..sec-ch-ua: "Ch|
|00000070| 72 6f 6d 69 75 6d 22 3b 76 3d 22 31 32 34 22 2c |romium";v="124",|
.......15:42:48.621 [nioEventLoopGroup-2-2] DEBUG com.zb.netty.c4.StudyServer - msg:/index.html
15:42:48.626 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x4d47e317, L:/127.0.0.1:8080 - R:/127.0.0.1:62298] WRITE: 61B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e          |, World!</h1>   |
+--------+-------------------------------------------------+----------------+

2.3 自定义协议

2.3.1 组成要素

  • 魔数:用来在第一时间判定接收的数据是否为无效数据包

  • 版本号:可以支持协议的升级

  • 序列化算法:消息正文到底采用哪种序列化反序列化方式

    • 如:json、protobuf、hessian、jdk
  • 指令类型:是登录、注册、单聊、群聊… 跟业务相关

  • 请求序号:为了双工通信,提供异步能力

  • 正文长度:正文的长度

  • 消息正文:正文内容(根据序列化算法进行序列化成字节)

2.3.2 自定义消息对象(编解码器、消息抽象类、具体消息类)

image-20240602173150447

  • Message:消息抽象类,定义了消息相关的一些字段内容。

  • LoginRequestMessage:一条业务消息,实现了Message抽象类,是登陆请求消息的抽象。

  • MessageCodec:实现了ByteToMessageCodec执行器,需要传入一个泛型,该泛型就是你要将Bytebuf转换的对象,并且其中需要你重写编解码方法,也就是解析、封装你自定义的一些协议。

Message:

@Data
public abstract class Message implements Serializable {/*** 根据消息类型字节,获得对应的消息 class* @param messageType 消息类型字节* @return 消息 class*/public static Class<? extends Message> getMessageClass(int messageType) {return messageClasses.get(messageType);}private int sequenceId = 0;private int messageType;public int getSequenceId() {return sequenceId;}public void setSequenceId(int sequenceId) {this.sequenceId = sequenceId;}public abstract int getMessageType();private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
}

LoginRequestMessage:

@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {private String username;private String password;public LoginRequestMessage() {}public LoginRequestMessage(String username, String password) {this.username = username;this.password = password;}@Overridepublic int getMessageType() {return LoginRequestMessage;}
}

MessageCodec:实现了对自定义协议的编解码

public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 设置魔数 4个字节out.writeBytes(new byte[]{'N','Y','I','M'});// 设置版本号 1个字节out.writeByte(1);// 设置序列化方式 1个字节。out.writeByte(1);// 设置指令类型 1个字节。out.writeByte(msg.getMessageType());// 设置请求序号 4个字节out.writeInt(msg.getSequenceId());// 为了补齐为16个字节,填充1个字节的数据out.writeByte(0xff);// 获得序列化后的msgByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 获得并设置正文长度 长度用4个字节标识out.writeInt(bytes.length);// 设置消息正文out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 获取魔数int magic = in.readInt();// 获取版本号byte version = in.readByte();// 获得序列化方式byte seqType = in.readByte();// 获得指令类型byte messageType = in.readByte();// 获得请求序号int sequenceId = in.readInt();// 移除补齐字节in.readByte();// 获得正文长度int length = in.readInt();// 获得正文byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();// 将信息放入List中,传递给下一个handlerout.add(message);// 打印获得的信息正文System.out.println("===========魔数===========");System.out.println(magic);System.out.println("===========版本号===========");System.out.println(version);System.out.println("===========序列化方法===========");System.out.println(seqType);System.out.println("===========指令类型===========");System.out.println(messageType);System.out.println("===========请求序号===========");System.out.println(sequenceId);System.out.println("===========正文长度===========");System.out.println(length);System.out.println("===========正文===========");System.out.println(message);}
}

2.3.3 案例测试

案例一:自定义编解码器测试

public class TestMessageCodec {public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel();channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));channel.pipeline().addLast(new MessageCodec());// encodeLoginRequestMessage user = new LoginRequestMessage("zhangsan", "123");channel.writeOutbound(user);// decodeByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();new MessageCodec().encode(null, user, byteBuf);channel.writeInbound(byteBuf);}
}

执行结果:

image-20240602174544803

案例二:解码出现半包问题及解决方案

半包问题出现原因:若是我们将一个编码过后的ByteBuf分为两个包来入站,那么每发一个包就会走一个decode()也就是解码方法,那么此时可以肯定的是由于包没有发完整,序列化字符串肯定也不完整,那么此时进行解序列化肯定就会报错出现异常!

解决半包思路:我们可以使用LTC解码器来进行解决,按照指定的长度规则来进行解码,那么之前半包会走两次handler再使用了解码器之后,由于半包不完整就会进行等待继续接收包,直到取到完整的包才会走handler那么此时执行decode解码自然不会出现序列化问题!

// 添加长度字段解码器,避免粘包半包问题
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));

2.4 @Sharable注解

@Sharable作用:Netty中原生的handler中用@Sharable注解来标明,该handler能否在多个channel中共享。加了注解可以被共享,不加则不能被共享。

@Sharable的引入

为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作。但是并不是所有的handler都能通过这种方法来提高复用率的,例如:LengthFieldBasedFrameDecoder。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题:

  • channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播。
  • 此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误。

为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable注解来标明,该handler能否在多个channel中共享。加了注解可以被共享,不加则不能被共享。

那么,我们自定义编解码器能否使用@Sharable注解,这需要根据自定义的handler的处理逻辑进行分析。我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的,

  • 但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared,因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下:

    image-20240602175511538

  • 这就意味着ByteToMessageCodec不能被多个channel所共享的。

原因:因为该类的目标是将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过。所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题。

如果想要加注解共享怎么办呢?

  • 继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。

  • 代码示例:

    @ChannelHandler.Sharable
    public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {}
    }
    

总结:什么时候可以加 @Sharable?

  • 当 handler 不保存状态时,就可以安全地在多线程下被共享
  • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
  • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类

三、在线聊天室案例

3.1 业务介绍

客户端、服务端定义好指定的传输协议,之后根据指定的传输协议来进行传输数据。实现简单的登录、单聊、拉群、群聊、加入退出群、退出登录等功能。

整体架构:

image-20240625165705030

客户端代码:

public class ChatClient {static final Logger log = LoggerFactory.getLogger(ChatClient.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//登陆消息通知计数器CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);//成功状态变量AtomicBoolean LOGIN = new AtomicBoolean(false);AtomicBoolean EXIT = new AtomicBoolean(false);try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);// 用来判断是不是 读空闲时间过长,或 写空闲时间过长// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));// ChannelDuplexHandler 可以同时作为入站和出站处理器ch.pipeline().addLast(new ChannelDuplexHandler() {// 用来触发特殊事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{IdleStateEvent event = (IdleStateEvent) evt;// 触发了写空闲事件if (event.state() == IdleState.WRITER_IDLE) {// log.debug("3s 没有写数据了,发送一个心跳包");ctx.writeAndFlush(new PingMessage());}}});ch.pipeline().addLast("client handle", new ChannelInboundHandlerAdapter(){// 在两节监理后触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//负责接收用户在控制台上的输入,负责向服务器发送数据new Thread(()-> {Scanner scanner = new Scanner(System.in);System.out.println("请输入用户名:");String username = scanner.nextLine();System.out.println("请输入密码:");String password = scanner.nextLine();//构造登陆消息对象发送给服务端Message message = new LoginRequestMessage(username, password);ctx.channel().writeAndFlush(message);System.out.println("等待后续操作...");try {// 等待其他线程进行计数为0,此时才会唤醒向下执行WAIT_FOR_LOGIN.await();} catch (InterruptedException e) {e.printStackTrace();}// 如果登录失败if (!LOGIN.get()) {ctx.channel().close();return;}while (true) {System.out.println("==================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("==================================");String command = null;try {command = scanner.nextLine();} catch (Exception e) {break;}if(EXIT.get()){return;}String[] s = command.split(" ");switch (s[0]){case "send":ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));break;case "gsend":ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));break;case "gcreate":Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));set.add(username); // 加入自己ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));break;case "gmembers":ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));break;case "gjoin":ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));break;case "gquit":ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));break;case "quit":ctx.channel().close();return;}}}, "system in").start();}//负责接收服务器的响应数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("msg: {}", msg);System.out.println(msg);//单独处理登陆的响应结果,其他结果直接输出消息内容if (msg instanceof LoginResponseMessage) {LoginResponseMessage response = (LoginResponseMessage) msg;if (response.isSuccess()){LOGIN.set(true);//设置登陆状态为true}WAIT_FOR_LOGIN.countDown();//计数-1,若是为0,则会通知使用该计数器阻塞等待的线程}}// 在连接断开时触发@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("连接已经断开,按任意键退出..");EXIT.set(true);}// 在出现异常时触发@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.debug("连接已经断开,按任意键退出..{}", cause.getMessage());EXIT.set(true);}});}});Channel channel = bootstrap.connect("localhost",8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}

服务端代码:

public class ChatServer {static final Logger log = LoggerFactory.getLogger(ChatServer.class);public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();QuitHandler QUIT_HANDLER = new QuitHandler();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker);bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);// 用来判断是不是 读空闲时间过长,或 写空闲时间过长// 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件ch.pipeline().addLast(new IdleStateHandler(5,0,0));// ChannelDuplexHandler 可以同时作为入站和出站处理器ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;// 触发了读空闲事件if (event.state() == IdleState.READER_IDLE) {log.debug("已经 5s 没有读到数据了");ctx.channel().close();}}});ch.pipeline().addLast(LOGIN_HANDLER);ch.pipeline().addLast(CHAT_HANDLER);ch.pipeline().addLast(GROUP_CREATE_HANDLER);ch.pipeline().addLast(GROUP_JOIN_HANDLER);ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);ch.pipeline().addLast(GROUP_QUIT_HANDLER);ch.pipeline().addLast(GROUP_CHAT_HANDLER);ch.pipeline().addLast(QUIT_HANDLER);}});Channel channel = bootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

注意:由于handler可以在多个channel中共享,所以代码中实现SimpleChannelInboundHandler的消息处理类需要加@Sharable注解。当第二个客户端连接时就会立刻执行INACTIVEUNREGISTERED事件,直接就会连接失败!

3.2 登录

客户端:

  • **发送:**自定义线程在channelActive事件中运行一个线程来主要与我们控制台进行交互,登陆业务同样也是如此,首先需要输入用户名密码,接着将其包装成预先设置好的LoginRequestMessage对象由channel发送出去。
  • 接收:eventloop中的线程接收到经过自定义协议解码取到的对象,将其转为LoginResponseMessage对象,判断其是否登陆成功。
    • 核心:对于如何让eventloop中线程来进行通知主线程登陆成功,我们可以使用一个countdownlatch+AtomicBoolean,前者用于通知主线程拿到登陆结果,后者用于表示登陆的状态成功与否!

服务端:

  • 编写一个实现SimpleChannelInboundHandler的子类,指定接收LoginRequestMessage对象,接着来编写对应的channelRead()方法来进行业务操作,最终根据实际情况来向客户端返回一个LoginResponseMessage

客户端代码:

public class ChatClient {static final Logger log = LoggerFactory.getLogger(ChatClient.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//登陆消息通知计数器CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);//成功状态变量AtomicBoolean LOGIN = new AtomicBoolean(false);AtomicBoolean EXIT = new AtomicBoolean(false);try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);// 用来判断是不是 读空闲时间过长,或 写空闲时间过长// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));// ChannelDuplexHandler 可以同时作为入站和出站处理器ch.pipeline().addLast(new ChannelDuplexHandler() {// 用来触发特殊事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{IdleStateEvent event = (IdleStateEvent) evt;// 触发了写空闲事件if (event.state() == IdleState.WRITER_IDLE) {// log.debug("3s 没有写数据了,发送一个心跳包");ctx.writeAndFlush(new PingMessage());}}});ch.pipeline().addLast("client handle", new ChannelInboundHandlerAdapter(){// 在两节监理后触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//负责接收用户在控制台上的输入,负责向服务器发送数据new Thread(()-> {Scanner scanner = new Scanner(System.in);System.out.println("请输入用户名:");String username = scanner.nextLine();System.out.println("请输入密码:");String password = scanner.nextLine();//构造登陆消息对象发送给服务端Message message = new LoginRequestMessage(username, password);ctx.channel().writeAndFlush(message);System.out.println("等待后续操作...");try {// 等待其他线程进行计数为0,此时才会唤醒向下执行WAIT_FOR_LOGIN.await();} catch (InterruptedException e) {e.printStackTrace();}// 如果登录失败if (!LOGIN.get()) {ctx.channel().close();return;}....}, "system in").start();}//负责接收服务器的响应数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("msg: {}", msg);System.out.println(msg);//单独处理登陆的响应结果,其他结果直接输出消息内容if (msg instanceof LoginResponseMessage) {LoginResponseMessage response = (LoginResponseMessage) msg;if (response.isSuccess()){LOGIN.set(true);//设置登陆状态为true}WAIT_FOR_LOGIN.countDown();//计数-1,若是为0,则会通知使用该计数器阻塞等待的线程}}});}});Channel channel = bootstrap.connect("localhost",8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}

服务端代码:

@ChannelHandler.Sharable // 必须添加该注解
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {// 获得登录信息String username = msg.getUsername();String password = msg.getPassword();// 校验登录信息boolean login = UserServiceFactory.getUserService().login(username, password);LoginResponseMessage message;if (login) {message = new LoginResponseMessage(true, "登陆成功");// 绑定channel与userSessionFactory.getSession().bind(ctx.channel(), username);} else {message = new LoginResponseMessage(false, "登陆失败");}ctx.writeAndFlush(message);}
}
// 该handler处理登录请求
LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
ch.pipeline().addLast(new LoginRequestMessageHandler());

3.3 单聊

客户端输入send username content即可发送单聊消息,需要服务器端添加处理ChatRequestMessage的handler

客户端:

  • 读取到控制台输入的命令信息,封装成一个ChatRequestMessage发送出去。

服务端:

  • 保存登录信息,用户名和channel的映射

    //保存用户名与channel映射的map集合
    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    
  • 订阅经过自定义协议解码得到ChatRequestMessage对象,并对其进行处理。

客户端代码:

case "send" :
ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
break;

服务端代码:

@ChannelHandler.Sharable // 必须添加该注解
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {// 获得user所在的channelChannel channel = SessionFactory.getSession().getChannel(msg.getTo());// 如果双方都在线if (channel != null) {// 通过接收方与服务器之间的channel发送信息channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));} else {// 通过发送方与服务器之间的channel发送消息ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或离线,发送失败"));}}
}C
// 该handler处理单聊请求
ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
ch.pipeline().addLast(chatRequestMessageHandler);

3.4 群聊

客户端:解析命令,封装成GroupChatRequestMessage对象发送出去。

case "gsend" :ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));break;

服务端:

  • 维护一个群名称和对应成员的映射关系

    public class Group {// 聊天室名称private String name;// 聊天室成员private Set<String> members;
    }
    
  • 编写对GroupChatRequestMessage感兴趣的handler,紧接着根据群名获取到所有的channel,接着依次根据channel向外发送出去数据。

@ChannelHandler.Sharable
public class GroupChatMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {String groupName = msg.getGroupName();GroupSession groupSession = GroupSessionFactory.getGroupSession();// 判断群聊是否存在boolean isCreated = groupSession.isCreated(groupName);if (isCreated) {// 给群员发送信息List<Channel> membersChannel = groupSession.getMembersChannel(groupName);for(Channel channel : membersChannel) {channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));}} else {ctx.writeAndFlush(new GroupChatResponseMessage(false, "群聊不存在"));}}
}
// 该handler处理群聊聊天
GroupChatMessageHandler groupChatMessageHandler = new GroupChatMessageHandler();
ch.pipeline().addLast(groupChatMessageHandler);

3.5 空闲检测(发送心跳)

连接假死:

原因:

  • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
  • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
  • 应用程序线程阻塞,无法进行数据读写

问题

  • 假死的连接占用的资源不能自动释放
  • 向假死的连接发送数据,得到的反馈是发送超时

解决办法:

  • 服务器端解决:

    • 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死。
  • 客户端解决:

    • 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器

netty解决方案:

  • netty提供了这中假死的方式,就是空闲检测器。(就是一个handler,IdleStateHandler)

    • IdleStateHandler·:三个参数构造,参数1检测读的空闲时间超过了某秒,参数2检测写的空闲时间超过了多少秒,参数3检测读写都空闲的时间上线。单位秒。

    • 若是指定秒数中没有收到channel发来数据,那么就会触发事件(read or write …),可以编写ChannelDuplexHandler重写其中的userEventTriggered来进行判断触发了什么事件。

  • 当指定时间内未发生读或写事件时,会触发特定事件。想要处理这些事件,需要自定义事件处理函数

    • 读空闲会触发READER_IDLE
    • 写空闲会触发WRITE_IDLE
    • 读和写空闲会触发ALL_IDEL

服务端代码:

// 用于空闲连接的检测,5s内未读到数据,会触发READ_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// 添加双向处理器,负责处理READER_IDLE事件
ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 获得事件IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {// 断开连接ctx.channel().close();}}
});
  • 使用IdleStateHandler进行空闲检测
  • 使用双向处理器,ChannelDuplexHandler

    • 对入站与出站事件进行处理。IdleStateHandler中的事件为特殊事件,需要实现ChannelDuplexHandleruserEventTriggered方法,判断事件类型并自定义处理方式,来对事件进行处理
  • 为避免因非网络等原因引发的READ_IDLE事件,比如网络情况良好,只是用户本身没有输入数据,这时发生READ_IDLE事件,直接让服务器断开连接是不可取的

    为避免此类情况,需要在客户端向服务器发送心跳包,发送频率要小于服务器设置的IdleTimeSeconds,一般设置为其值的一半

客户端代码:

// 发送心跳包,让服务器知道客户端在线
// 3s未发生WRITER_IDLE,就像服务器发送心跳包
// 该值为服务器端设置的READER_IDLE触发时间的一半左右
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.WRITER_IDLE) {// 发送心跳包ctx.writeAndFlush(new PingMessage());}}
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/35317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python3 学习笔记——基本知识入门 | 菜鸟教程

Python3 学习笔记——基本知识入门 | 菜鸟教程 目录 Python3 学习笔记——基本知识入门 | 菜鸟教程基础知识标识符python保留字注释行与缩进多行语句数字(Number)类型字符串(String)空行等待用户输入同一行显示多条语句多个语句构成代码组print 输出import 与 from...import命令…

DDR3控制器(一)DDR3 IP调用

目录 一、DDR3 IP核简介 二、DDR3 IP核调用 在千兆以太网通信中用到了DDR3控制器&#xff0c;但是并没有对其做相关介绍。这次准备重新整理一下DDR3控制相关知识&#xff0c;复习巩固一下。 一、DDR3 IP核简介 MIG IP核&#xff08;Memory Interface Generator&#xff09;是…

气膜游泳馆建造成本要多少—轻空间

随着人们对健康生活的追求和游泳运动的普及&#xff0c;游泳馆的需求不断增加。传统游泳馆的建设周期长、成本高&#xff0c;而气膜游泳馆以其独特的优势成为一种新的选择。轻空间将详细分析气膜游泳馆的建造成本及其优势。 气膜游泳馆的基本结构 气膜游泳馆主要由以下几个部分…

静电场的基本方程

目录 场积分方程 通量&#xff08;高斯定理&#xff09; 环量 场微分方程 散度 旋度 小结 补充知识 立体角 场积分方程 通量&#xff08;高斯定理&#xff09; 环量 场微分方程 散度 旋度 小结 补充知识 立体角

Open3D 删除点云中重复的点

目录 一、算法原理1、重叠点2、主要函数二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理 1、重叠点 原始点云克隆一份   构造重叠区域   合并点云获得重叠点 2、主要…

玄子Share-本地部署 AI 大模型与构建知识库

玄子Share-本地部署 AI 大模型与构建知识库 部署环境概述 警告&#xff01;OpenAI 宣布全面封锁中国 API 接入 昨天&#xff0c;许多开发者从 OpenAI 那收到了一份警告信 您好&#xff0c; 据我们的数据监测&#xff0c;贵组织正从 OpenAl 当前未支持的区域产生 API 访问流量…

ai智能写作助手有哪些?3款AI工具推荐

ai智能写作助手有哪些&#xff1f;在数字化时代的浪潮中&#xff0c;AI智能写作助手如同智慧的灯塔&#xff0c;照亮了创作者们的道路。它们不仅极大地提升了写作效率&#xff0c;让文字流淌更加顺畅&#xff0c;更能够深入挖掘和激发创作者的内在灵感&#xff0c;将创意的火花…

github配置可拉取项目到本地

首先配置用户名和邮箱&#xff1a; git config --global user.name 自己的名字git config --global user.email 自己的邮箱配置完之后检查一下&#xff1a; git config --global user.namegit config --global user.email如果提示的是自己配置好的名字和邮箱就Ok 然后拉取githu…

Luminar Neo 1.20.0 (macOS Universal) - 创新 AI 图像编辑器

Luminar Neo 1.20.0 (macOS Universal) - 创新 AI 图像编辑器 利用尖端的人工智能生成技术&#xff0c;轻松增强照片效果 请访问原文链接&#xff1a;https://sysin.org/blog/luminar-neo/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1…

Linux系统学习——指令四

Linux系统学习——指令四 Linux 系统学习——指令四查看文件MD5校验和fuser 指令基本语法常用选项访问类型使用示例 系统信息 Linux 系统学习——指令四 查看文件MD5校验和 在Linux中&#xff0c;你可以使用 md5sum 命令来查看一个文件的MD5校验和。以下是具体的操作方法&…

河南资信乙级预评价:人员需缴唯一社保吗?

河南资信乙级预评价中&#xff0c;人员确实需要缴纳唯一社保。以下是详细的解读和归纳&#xff1a; 一、社保唯一性的定义 社保唯一性指的是参与河南资信乙级预评价的咨询工程师&#xff08;投资&#xff09;必须在申请单位有唯一且连续的社保缴纳记录。这一要求旨在确保咨询…

【python013】pyinstaller打包PDF提取脚本为exe工具

1.在日常工作和学习中&#xff0c;遇到类似问题处理场景&#xff0c;如pdf文件核心内容截取&#xff0c;这里将文件打包成exe可执行文件&#xff0c;实现功能简便使用。 2.欢迎点赞、关注、批评、指正&#xff0c;互三走起来&#xff0c;小手动起来&#xff01; 3.欢迎点赞、关…

Pura 70 系列超高速风驰闪拍,捕捉美好,告别抓拍模糊

及时而准确的将画面定格&#xff0c;把事件最具有表现力的瞬间直观、真实地传达给观者&#xff0c;以使将抓拍影响的意义发挥最大化&#xff0c;由于抓拍摄影作品大多反映的是比较自然&#xff0c;真实的人和事&#xff0c;得到了社会的广泛认可&#xff0c;抓拍摄影也正日益成…

vue项目无后台版本打包上传到服务器

打包项目 也可以在文件目录下npm run build 生成dist文件夹 将dist文件夹里的所有文件拷贝到站点的根目录&#xff0c;这里使用宝塔面板进行操作 前提你得先创建站点&#xff0c;域名绑定等操作

#03动态规划

要点&#xff1a; 动态规划方法与贪心法、分治法的异同&#xff1b; 动态规划方法的基本要素与求解步骤&#xff1b; 动态规划方法的应用。 难点&#xff1a; 如何根据问题的最优子结构性质构造构造动态规划方法中的递归公式或动态规划方程。 动态规划的基本思想 动态规…

【计算机网络仿真】b站湖科大教书匠思科Packet Tracer——实验9 IPv4地址 — 划分子网

一、实验目的 1.学习划分子网的方法&#xff1b; 2.验证子网掩码的作用。 二、实验要求 1.使用Cisco Packet Tracer仿真平台&#xff1b; 2.观看B站湖科大教书匠仿真实验视频&#xff0c;完成对应实验。 三、实验内容 1.构建网络拓扑&#xff1b; 2.划分子网&#xff1b; …

vscode_cmake_stm32_lvgl移植及显示优化

1 LVGL移植 本文使用的环境如下&#xff1a; STM32H743FreeRTOSst7789 lcd(320*240) 下载 LVGL源码&#xff0c;本文使用Release v9.1.0&#xff1b; 将压缩包解压到工程目录&#xff0c;例如stm32h7xx_cmake_project/components/lvgl-9.1.0&#xff0c;如下所示&#xff1a; …

算法04 模拟算法之一维数组相关内容详解【C++实现】

大家好&#xff0c;我是bigbigli&#xff0c;模拟算法我们将分为几个章节来讲&#xff0c;今天我们只看一维数组相关的题目 目录 模拟的概念 训练&#xff1a;开关灯 解析 参考代码 训练&#xff1a;数组变化 解析 参考代码 训练&#xff1a;折叠游戏 解析 参考代码 …

[leetcode]rotate-array 轮转数组

. - 力扣&#xff08;LeetCode&#xff09; class Solution { public:void reverse(vector<int>& nums, int start, int end) {while (start < end) {swap(nums[start], nums[end]);start 1;end - 1;}}void rotate(vector<int>& nums, int k) {k % num…

酷开系统丨酷开科技AI赋能数字大屏,开启智能家居新纪元

在当今数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;技术的崛起无疑为科技领域带来了革命性的变化。酷开科技&#xff0c;正以其独特的"AI数字大屏"战略&#xff0c;将创新理念转化为现实&#xff0c;引领行业发展新潮流。 酷开科技的智能电视操作系统…