【netty】三万字详解!JAVA高性能通信框架,关于netty,看这一篇就够了

目录

1.概述

2.hello world

3.EventLoop

4.channel

4.1.同步

4.2.异步

4.3.调试

4.4.关闭

4.5.为什么要用异步

5.future

6.promise

7.pipeline

8.byteBuf

8.1.创建

8.2.内存模式和池化

8.2.1.内存模式

8.2.2.池化

8.3.组成

8.4.操作

8.4.1.读写

8.4.2.释放

8.5.零拷贝

8.5.1.slice

8.5.2.composite

8.6.工具类

9.双向通信

10.粘包半包

10.1.问题成因

10.2.解决办法

10.2.1.短连接

10.2.2.解码器

1.概述

2.定长解码器

3.行解码器

4.固定帧长的解码器

11.协议解析

11.1.Redis

11.2.Http

12.协议设计

12.1.概述

12.2.编码


1.概述

netty,说人话就是封装NIO做出来的一个JAVA高性能通信框架。在JAVA领域,有高性能网络通信需求的时候,绝大多数都会选择netty作为通信框架。

关于JAVA的通信,我猜想可能博主的另外两篇关于BIO和NIO的文章作为本文的导读会不错:

详解TCP-CSDN博客

详解JAVA Socket-CSDN博客

JAVA BIO_java的bio有哪些-CSDN博客

全网最清晰JAVA NIO,看一遍就会-CSDN博客

netty底层就是封装的NIO。如果自己使用NIO的话至少会有以下的不便:

  • 需要自己构建协议。

  • 需要自己解决TCP传输问题,如粘包、半包。

  • API过于底层,不便于使用。

netty其实就是封装了一下NIO,使得NIO更便于使用。

2.hello world

依赖:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version>
</dependency>

服务器:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
​
public class HelloServer {public static void main(String[] args) {//ServerBootstrap,启动器,负责组装netty组件new ServerBootstrap()//1.怎样去接收IO?//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件.group(new NioEventLoopGroup())//2.接收成什么?//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel.channel(NioServerSocketChannel.class)//3.做什么处理?//支持用责任链模式来对收到的IO进行链式处理.childHandler(new ChannelInitializer<NioSocketChannel>() {//连接建立后才会调用初始化方法@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定解码方式nioSocketChannel.pipeline().addLast(new StringDecoder());//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}})//4.绑定监听端口.bind(8080);}
}

客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
​
import java.net.InetSocketAddress;
​
public class HelloCleint {public static void main(String[] args) throws InterruptedException {new Bootstrap().group(new NioEventLoopGroup())//用什么进行发送?//可以是BIO,也可以是NIO,也可以是epoll.channel(NioSocketChannel.class)//处理器.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定编码方式nioSocketChannel.pipeline().addLast(new StringEncoder());}})//连接到服务器.connect(new InetSocketAddress("localhost",8080))//同步通信.sync()//代表连接对象.channel()//发送数据.writeAndFlush("hello world");}
}

3.EventLoop

eventLoop,事件循环对象,是一个单线程执行器,本质上就是一条线程+一个selector,用来单线程监听处理IO事件。

实际使用上很少直接使用EventLoop,而是使用EventLoopGroup,EventLoopGroup的构造方法中可以指定其中的EventLoop数量。

eventLoop除了继承Netty体系类的一些标准化接口外,还继承了JDK中的ScheduledExecutorService,使得其自身具备线程池一切的能力。既然是线程池,就可以用来执行任务。

eventLoop执行普通任务:

EventLoopGroup group =new NioEventLoopGroup(5);
​group.next().submit(()->{try {Thread.sleep(10000);System.out.println("success!");} catch (Exception e) {e.printStackTrace();}});

eventLoop执行IO任务:

一个EventGroupLoop其实就是一条线程,用来处理一条通信连接。

public static void main(String[] args) {//ServerBootstrap,启动器,负责组装netty组件new ServerBootstrap()//1.怎样去接收IO?//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件.group(new NioEventLoopGroup())//2.接收成什么?//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel.channel(NioServerSocketChannel.class)//3.做什么处理?//支持用责任链模式来对收到的IO进行链式处理.childHandler(new ChannelInitializer<NioSocketChannel>() {//连接建立后才会调用初始化方法@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定解码方式nioSocketChannel.pipeline().addLast(new StringDecoder());//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}})//4.绑定监听端口.bind(8080);}

netty还给了一种更加细粒度的分层,就是让一部分EventLoop来选择IO,一部分EventLoop来处理IO,说白了就是一部分EventLoop出selector,一部分EventLoop出Thread。

public static void main(String[] args) {//ServerBootstrap,启动器,负责组装netty组件new ServerBootstrap()//boss线程只负责accept事件,worker线程只负责io读写.group(new NioEventLoopGroup(),new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringDecoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}}).bind(8080);}

4.channel

channel,对NIO的channel的二次封装,内核段缓冲区的抽象。不管是服务端还是客户端,只要调用channel()方法都能获取当前工作的这条channel。channel无非要注意的点就是它的同步和异步。

在实际应用中我们要知道在读的时候同步和异步是没有意义的,不可能在读IO的时候还区分同步读或者异步读,只可能是准备好了就读。只有写IO的时候区分同步和异步才是意义。所以在netty体系里很少会去服务端操作channel的同步和异步,一般都是在客户端操作channel的同步和异步。

4.1.同步

服务端:

在服务端让建立连接的时候休眠3秒。

public static void main(String[] args) {//ServerBootstrap,启动器,负责组装netty组件new ServerBootstrap()//1.怎样去接收IO?//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件.group(new NioEventLoopGroup())//2.接收成什么?//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel.channel(NioServerSocketChannel.class)//3.做什么处理?//支持用责任链模式来对收到的IO进行链式处理.childHandler(new ChannelInitializer<NioSocketChannel>() {//连接建立后才会调用初始化方法@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定解码方式nioSocketChannel.pipeline().addLast(new StringDecoder());//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Thread.sleep(3000);System.out.println(msg);}});}})//4.绑定监听端口.bind(8080);}

客户端:

客户端使用channel的sync来进行同步通信,同步模式下在connect建立连接的时候,主线程会同步等待,连接建立后再向下执行。

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
​
import java.net.InetSocketAddress;
​
public class HelloCleint {public static void main(String[] args) throws InterruptedException {new Bootstrap().group(new NioEventLoopGroup())//用什么进行发送?//可以是BIO,也可以是NIO,也可以是epoll.channel(NioSocketChannel.class)//处理器.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定编码方式nioSocketChannel.pipeline().addLast(new StringEncoder());}})//连接到服务器.connect(new InetSocketAddress("localhost",8080))//同步通信.sync()//代表连接对象.channel()//发送数据.writeAndFlush("hello world");}
}

4.2.异步

channel默认处于异步通信模式,connect建立连接的时候,不会同步等待,而是会继续向下执行,由于服务器端延迟了3秒来建立连接,所以客户端发送这条“hello server”发送时,连接并未建立完成,最终效果就是丢包,服务器收不到这条数据。

public static void main(String[] args) throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup())//用什么进行发送?//可以是BIO,也可以是NIO,也可以是epoll.channel(NioSocketChannel.class)//处理器.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定编码方式nioSocketChannel.pipeline().addLast(new StringEncoder());}})//连接到服务器.connect(new InetSocketAddress("localhost", 8080));//异步channelFuture.channel().writeAndFlush("hello world");}

当然,在异步通信上,netty支持了监听器,建立连接完成后,用事件回调的方式触发监听器。利用监听器,可以使得异步通信不丢包:

//异步
channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {channelFuture.channel().writeAndFlush("hello world");}
});

用监听器发送数据后,在当前业务场景下,即使服务端延迟了三秒才建立连接,但是任然能收到“hello world”这条消息。

4.3.调试

EmbeddedChannel是Netty中提供的一种特殊类型的Channel实现,主要用于单元测试。它允许你在测试中模拟输入事件(例如读取数据、写入数据)并检查输出事件(例如读取到的数据)。使用EmbeddedChannel可以在不启动真实的网络连接的情况下测试你的ChannelHandler逻辑。

代码示例:

自定义一个handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
​
public class UpperCaseHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {String upperCaseMsg = msg.toUpperCase();ctx.writeAndFlush(upperCaseMsg);}
}

测试:

import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.Test;
​
import static org.junit.jupiter.api.Assertions.assertEquals;
​
public class UpperCaseHandlerTest {@Testpublic void testUpperCaseHandler() {// 创建EmbeddedChannel,并添加要测试的HandlerEmbeddedChannel channel = new EmbeddedChannel(new UpperCaseHandler());
​// 写入一个字符串消息到Channelchannel.writeInbound("hello");
​// 读取Channel的输出String output = channel.readOutbound();
​// 验证处理后的消息是否符合预期assertEquals("HELLO", output);
​// 关闭Channelchannel.finish();}
}

4.4.关闭

由于channel的close方法是异步的,所以在关闭资源时会存在风险。比如代码顺序为:

  • close掉channel

  • close掉其它资源

有可能在close掉其它资源的时候,channel并没有close掉,也就可能出现,channel中还有数据没处理完,其它资源被关掉了,导致数据处理失败的问题。所以更为稳妥的方式是用同步的机制来关闭channel。netty中封装了CloseFuture来同步关闭channel。

ChannelFuture closeFuture = channelFuture.channel().closeFuture();
//同步关闭
closeFuture.sync();

要注意的是channel停止后如果EventLoopGroup还有其它线程时,程序是不会中止的,想要中止程序,必须再close掉group,EventLoopGroup提供了优雅停机的API——shutdownGracefully,会先停止接收请求,驻留的请求处理完成后,关掉group。

4.5.为什么要用异步

我们可以看到channel里面大量的用到了异步,对一个channel的操作,connect是一条线程,write是一条线程,close也是一条线程......

用异步的方式来处理,不仅不会加快单个IO任务的速度,反而还会略微拉长一个IO的响应时间,但是异步能明显提高吞吐量。

举个例子,一个病人看病,分为挂号、看病、缴费。取药,同步的方式就是一个医生走完一个病人的所有流程:

而异步的方式就是医生分工合作,每个医生单独负责一个项目,这样一个时间段内虽然处理的任务综合是一样的,但是在峰值的吞吐量上,异步是同步的四倍:

5.future

JDK的future是表示一个任务,netty的future是对JDK的future做了二次封装。

同步:

public static void main(String[] args) throws Exception {NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();Future<String> future = nioEventLoopGroup.submit(new Callable<String>() {public String call() throws Exception {Thread.sleep(1000);return "success!";}});//future的get方法是同步的,同步等待线程返回返回值为止System.out.println(future.get());}

异步:

用监听器实现异步

public static void main(String[] args) throws Exception {NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();Future<String> future = nioEventLoopGroup.submit(new Callable<String>() {public String call() throws Exception {Thread.sleep(1000);return "success!";}});//用监听器来实现异步future.addListener(new GenericFutureListener<Future<? super String>>() {public void operationComplete(Future<? super String> future) throws Exception {System.out.println(future.get());}});}

6.promise

光是有future是不够的,因为future必须处理完了,才能拿到结果,有些时候需要提前拿到结果开始处理,就需要在两个线程间进行通信,通信就需要一个存放数据的地方,也就有了promise,其可以理解为一个数据容器,可以向该容器中手动的存放数据、拿数据。

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;public static void main(String[] args) {EventLoopGroup eventLoopGroup=new NioEventLoopGroup();EventLoop eventLoop = eventLoopGroup.next();final DefaultPromise<String> promise=new DefaultPromise<String>(eventLoop);eventLoop.execute(new Runnable() {public void run() {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}promise.setSuccess("success!");}});//默认是同步try {System.out.println(promise.get());} catch (Exception e) {e.printStackTrace();}//可以用监听器来实现异步//promise.addListener(new GenericFutureListener<Future<? super String>>() {//public void operationComplete(Future<? super String> future) throws Exception {//System.out.println(promise.get());//}//});}

promise支持向外抛异常:

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;public class PromiseDemo {public static void main(String[] args) {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();EventLoop eventLoop = eventLoopGroup.next();final DefaultPromise<String> promise = new DefaultPromise<String>(eventLoop);eventLoop.execute(new Runnable() {public void run() {try {int i = 1 / 0;} catch (Exception e) {promise.setFailure(e);}}});try {System.out.println(promise.get());} catch (Exception e) {e.printStackTrace();}}
}

7.pipeline

netty中使用了责任链来处理对channel的读写请求,链上每一个节点都是一个处理器,有两种处理器:

  • 出站处理器,用来处理write操作。

  • 入站处理器,用来处理read操作。

这里要注意,是出战、入站,不是出栈、入栈。Netty 的设计参考了这种网络协议栈的思想,所以出站(Outbound)和入站(Inbound)这两个概念是遵循网络协议栈的传统命名。

  • 出站(Outbound):数据从应用程序流向网络的过程被称为“出站”,因为数据是从应用程序向外发送,穿越协议栈的各个层级,最终到达网络。

    在 Netty 中,ChannelOutboundHandlerAdapter 处理的是数据从应用程序到网络的过程,即数据从上层(应用层或业务层)向下层(传输层、网络层、数据链路层等)传递的过程。

  • 入站(Inbound):数据从网络流向应用程序的过程被称为“入站”,因为数据是从外部网络进入应用程序,穿越协议栈的各个层级,最终到达应用程序。

    在 Netty 中,ChannelInboundHandlerAdapter 处理的是数据从网络到应用程序的过程,即数据从下层(传输层、网络层、数据链路层等)向上层(应用层或业务层)传递的过程。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class Server {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//H1->H2->H3->h4->h5->h6//入站处理器nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 							{System.out.println("H1");//向下走super.channelRead(ctx,msg);}});nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 							{System.out.println("H2");//向下走super.channelRead(ctx,msg);}});nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 							{System.out.println("H3");//写操作,用来触发后面的出站处理器nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Servers......".getBytes()));}});//出站处理器nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object o, ChannelPromise 		channelPromise) throws Exception {System.out.println("h4");super.write(ctx,o,channelPromise);}});nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {System.out.println("h5");super.write(ctx,o,channelPromise);}});nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {System.out.println("h6");super.write(ctx,o,channelPromise);}});}}).bind(8080);}
}

client:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;public class Client {public static void main(String[] args) throws InterruptedException {new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080)).sync().channel().writeAndFlush("hello world");}
}

server端的输出结果:

入站处理器顺序执行,出栈处理器逆序执行。

8.byteBuf

8.1.创建

在Java NIO(New I/O)中,ByteBuffer 是一个用来处理字节数据的缓冲区类,是对NIO的byteBuffer的二次封装和扩展,可以直接理解为用户段内存的抽象。

开辟byteBuf:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;public class test {public static void main(String[] args) {//可以通过传参来指定大小ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();}
}

8.2.内存模式和池化

8.2.1.内存模式

根据所开辟的内存空间的位置的不同,byteBuf分为两类:

  • 直接缓冲区

  • 非直接缓冲区

直接缓冲区:

直接创建在物理机的缓冲区中,创建和销毁的代价昂贵,但是读写性能高。要注意的是直接内存不受GC的管理,需要注意手动释放内存,避免内存泄露。

创建池化的直接缓冲区:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;public class test {public static void main(String[] args) {ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();}
}

非直接缓冲区:

创建在JVM中的缓冲区,创建和销毁的代价相对没那么高,但是读写性能相对较低。

创建池化的非直接缓冲区:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;public class test {public static void main(String[] args) {ByteBuf directBuffer = ByteBufAllocator.DEFAULT.heapBuffer();}
}

8.2.2.池化

ByteBuf 的池化是指将 ByteBuf 实例预先分配并存储在内存池中,以便在需要时进行重复使用。池化 ByteBuf 的主要目的是减少内存分配和垃圾回收的开销,从而提高性能。Netty 提供了池化 ByteBuf 的功能,它内置了两种 ByteBuf 池化的实现:PooledByteBufAllocator 和 UnpooledByteBufAllocator。

1.PooledByteBufAllocator(池化的内存分配器)

PooledByteBufAllocator 是 Netty 提供的默认的 ByteBuf 池化实现。它通过预先分配一些 ByteBuf 实例,并将它们存储在池中。当需要创建新的 ByteBuf 实例时,它会从池中获取已有的实例,而不是每次都重新分配内存。

使用 PooledByteBufAllocator 可以减少频繁的内存分配和释放操作,避免了堆内存的碎片化,提高了性能。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;public class PooledByteBufExample {public static void main(String[] args) {// 使用 PooledByteBufAllocator 创建 ByteBufByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;ByteBuf pooledBuffer = allocator.buffer(1024); // 创建1KB的池化 ByteBuf// 使用 pooledBuffer...// 释放 ByteBuf,将其返回到池中pooledBuffer.release();}
}

2.UnpooledByteBufAllocator(非池化的内存分配器)

UnpooledByteBufAllocator 是 Netty 提供的非池化的 ByteBuf 实现。它每次都会分配新的内存,不会重用已有的 ByteBuf 实例。虽然不会涉及到池的管理,但在一些短期存活或者需要手动管理内存的场景下使用非池化内存分配器可能更合适。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;public class UnpooledByteBufExample {public static void main(String[] args) {// 使用 UnpooledByteBufAllocator 创建 ByteBufByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;ByteBuf unpooledBuffer = allocator.buffer(1024); // 创建1KB的非池化 ByteBuf// 使用 unpooledBuffer...// 释放 ByteBuf(注意:在非池化情况下,需要手动释放 ByteBuf)unpooledBuffer.release();}
}

在使用池化 ByteBuf 时,需要注意在不再使用 ByteBuf 时调用 release() 方法,将它返回到池中,以便被重用。这样可以避免内存泄漏和提高性能。

8.3.组成

10.Nettyä¹ByteBufä»ç»ï¼äºï¼01.png

bytebuf一开始有个初始化容量(capacity),可以手动指定,没有手动指定时也有个默认值。

bytebuf是自动扩容的,扩容的上限(max capacity)其实就是机器的物理内存。

读写指针一开始在0位,随着读写,读写指针向后移动。要注意,bytebuf的读写,只涉及指针的移动,不涉及内存的回收,也就是读过的区域(废弃字节)并不会被释放,除非调用特殊的API(discardReadBytes())。

netty的bytebuf相较于NIO的bytebuffer,有以下优势:

  • bytebuffer读写公用一个指针,所以,读之前要切换到读模式;写之前要切换到写模式。

  • bytebuf自动扩容,而bytebuffer不行。

8.4.操作

8.4.1.读写

写操作:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.charset.StandardCharsets;public class test {public static void main(String[] args) {//写入数字ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeInt(666);// 写入字符串String stringValue = "Hello, World!";byte[] stringBytes = stringValue.getBytes(StandardCharsets.UTF_8);buffer.writeBytes(stringBytes);}
}

读操作:

// 读取整数
int readIntValue = buffer.readInt();// 读取字符串
int readableBytes = buffer.readableBytes();
byte[] stringBytes = new byte[readableBytes];
buffer.readBytes(stringBytes);
String readStringValue = new String(stringBytes, StandardCharsets.UTF_8);

需要注意的是,在读取数据之前,你需要确保 ByteBuf 中有足够的可读字节数。可以使用 readableBytes() 方法来检查 ByteBuf 中的可读字节数。

此外,ByteBuf 还提供了其他的读写操作,比如 readableBytes() 用于获取可读字节数,writerIndex()readerIndex() 用于获取写入和读取的索引位置等。在使用 ByteBuf 时,请确保在读写时不越界,并且注意释放 ByteBuf 以避免内存泄漏。在Netty中,通常会使用 ReferenceCountUtil.release(buffer) 来释放 ByteBuf,确保资源得到正确释放。

读写指针:

8.4.2.释放

bytebuf要特别注意资源的释放,以避免内存泄漏。Netty使用引用计数(Reference Counting)来管理 ByteBuf 的生命周期,确保在不再需要使用时及时释放资源。

在Netty中,release()retain() 是用于管理 ByteBuf 引用计数的方法。

release() 方法用于将 ByteBuf 的引用计数减少1。当引用计数减至0时,Netty会释放 ByteBuf 的内存(如果使用了池化的 ByteBuf,则将它归还给池)。

ByteBuf buffer = //... 从某个地方获取ByteBuf实例
buffer.release(); // 引用计数减少1,如果引用计数为0,释放ByteBuf的内存

retain() 方法用于将 ByteBuf 的引用计数增加1。当你调用 retain() 方法时,你告诉Netty你对这个 ByteBuf 感兴趣,即使在你使用完后,其他代码也可能继续使用它。

ByteBuf buffer = //... 从某个地方获取ByteBuf实例
buffer.retain(); // 引用计数增加1,防止在使用完后被提前释放

8.5.零拷贝

零拷贝其实没有严格的定义,指的是减少IO过程中数据在内存中拷贝的次数这样一个大致目标。在netty的ByteBuf中也存在一些零拷贝机制,用来在多个ByteBuf之间进行数据传递。

8.5.1.slice

在 Netty 中,ByteBufslice() 方法用于创建一个与原始 ByteBuf 共享数据的新 ByteBuf。换句话说,slice() 方法返回一个从原始 ByteBuf 中截取出来的视图,这个视图与原始 ByteBuf 共享底层数据,但拥有自己的独立读写指针。由于是直接通过读写指针指向同一块内存的,所以slice出来的bytebuf并没有发送数据拷贝,是0拷贝。

如何理解拥有自己的独立读写指针喃?因为slice出来的buf和元buf共享内存,为了避免slice出来的buf通过写指针来进行写,进而影响元buf,netty在设计时故意就禁止了slice动用写指针来向元buf中进行写。只能通过读指针来读。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;public class test {public static void main(String[] args) {//开一个容量为10字节的ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);//写入数据buffer.writeBytes(new byte[]{1,2});//sliceByteBuf slice = buffer.slice(0,2);//因为slice不与元buf共享读写指针,所以write会报错,因为write是用的读写指针来进行读写,但是set不会报错,因为set不是用的读写指针来进行读写的。//slice.writeByte(1);slice.setByte(0,2);while(slice.isReadable()){System.out.println(slice.readByte());}}
}

8.5.2.composite

slice是将一个大的bytebuf划分成多个小的bytebuff,composite是将多个小的bytebuf聚合成一个大的bytebuf。

在 Netty 中,CompositeByteBufByteBuf 的一个特殊实现,它提供了一种能够组合多个 ByteBuf 实例的方式。CompositeByteBuf 允许将多个 ByteBuf 视为一个单一的逻辑缓冲区,而不需要将它们合并成一个实际的连续内存块。这种设计可以提高内存的利用率和降低内存拷贝的次数。

public static void main(String[] args) {ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();buffer1.writeBytes(new byte[]{1,2,3,4,5});ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();buffer2.writeBytes(new byte[]{6,7,8,9,10});CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();//可变参数,可以有多个compositeBuffer.addComponents(buffer1,buffer2);while (compositeBuffer.isReadable()){System.out.println(compositeBuffer.readByte());}}

8.6.工具类

Unpooled 是 Netty 提供的一个工具类,用于创建不需要池化的 ByteBuf 实例。在 Netty 中,ByteBuf 是用来操作字节数据的缓冲区类。通常,Unpooled 类提供了一些静态方法,用于创建不同类型的 ByteBuf 实例,包括堆缓冲区(heap buffer)、直接缓冲区(direct buffer)、组合缓冲区(composite buffer)等。

也就是说可以用unpooled来开辟各类型的bytebuf。

9.双向通信

服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;public class Server {public static void main(String[] args) {//ServerBootstrap,启动器,负责组装netty组件new ServerBootstrap()//1.怎样去接收IO?//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件.group(new NioEventLoopGroup())//2.接收成什么?//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel.channel(NioServerSocketChannel.class)//3.做什么处理?//支持用责任链模式来对收到的IO进行链式处理.childHandler(new ChannelInitializer<NioSocketChannel>() {//连接建立后才会调用初始化方法@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定解码方式nioSocketChannel.pipeline().addLast(new StringDecoder());//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);ByteBuf response = ctx.alloc().buffer();response.writeBytes(msg.toString().getBytes());ctx.writeAndFlush(response);}});}})//4.绑定监听端口.bind(8080);}
}

客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;public class Client {public static void main(String[] args) throws InterruptedException {new Bootstrap().group(new NioEventLoopGroup())//用什么进行发送?//可以是BIO,也可以是NIO,也可以是epoll.channel(NioSocketChannel.class)//处理器.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定编码方式nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes("hello".getBytes());ctx.writeAndFlush(buffer);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg.toString());}});}})//连接到服务器.connect(new InetSocketAddress("localhost", 8080));}
}

10.粘包半包

10.1.问题成因

粘包:发送abc def,接收到abcdef

半包:发送abcdef,接收到abc或者def

原因:

  • 没有清晰的结束符,导致不知道收到何处才是一个完成的包。

  • IO缓冲区大小过大或者过小,导致收太多或者收不完。

解决粘包和半包问题通常需要在设计通信协议时采取一些策略。

10.2.解决办法

10.2.1.短连接

解决粘包半包问题的其中一个办法是——短连接。

所谓短连接就是当一次完整的报文返送完成后,客户端主动断开TCP连接。粘包半包的根本原因其实就是不知道一个完整的报文何时收完,通过客户端发送完一次完整的信息后主动断开连接,让服务器端感知到,一次完整的信息发送完成。

客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class Client {public static void main(String[] args) throws InterruptedException {for (int i=0;i<10;i++){send();}}public static void send(){NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//指定编码方式nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10});ctx.writeAndFlush(buffer);ctx.channel().close();}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();}catch(Exception e){e.printStackTrace();}finally {worker.shutdownGracefully();}}
}

服务器:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class Server {public static void main(String[] args) {NioEventLoopGroup boss=new NioEventLoopGroup();NioEventLoopGroup worker=new NioEventLoopGroup();try{ServerBootstrap serverBootstrap=new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss,worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

10.2.2.解码器

1.概述

解码器是netty自带的一类用来从请求报文中解析出数据的handler。其底层原理都是从指定位置开始,解析出定长的字节内容来。

2.定长解码器

FixedLengthFrameDecoder,定长解码器,用来在报文中获取出指定长度的字节。

server:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;import java.nio.charset.StandardCharsets;public class server {public static void main(String[] args) throws InterruptedException {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 定长解码器,每个消息长度固定为10个字节pipeline.addLast(new FixedLengthFrameDecoder(10));// 业务处理器pipeline.addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = (ByteBuf) msg;String content = byteBuf.toString(StandardCharsets.UTF_8);System.out.println("Received message: " + content);byteBuf.release(); // 释放ByteBuf资源}});}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();}
}
3.行解码器

Netty的行处理器(LineBasedFrameDecoder)是一种用于处理以换行符(\n)或回车换行符(\r\n)为消息分隔符的情况。它会按照换行符或回车换行符将接收到的数据切分成消息,适用于处理文本协议中每行代表一个消息的场景。

server:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.LineBasedFrameDecoder;public class LineBasedServerHandler extends ChannelInboundHandlerAdapter {public LineBasedServerHandler() {// 添加行处理器到ChannelPipeline中,使用换行符作为消息分隔符ctx.pipeline().addLast(new LineBasedFrameDecoder(1024));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里的msg是一个ByteBuf,表示一个完整的行消息ByteBuf buf = (ByteBuf) msg;String line = buf.toString(io.netty.util.CharsetUtil.UTF_8);System.out.println("Received message: " + line);buf.release(); // 释放ByteBuf资源}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class LineBasedClient {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 添加行处理器到ChannelPipeline中,使用换行符作为消息分隔符pipeline.addLast(new LineBasedFrameDecoder(1024));// 客户端的业务处理器pipeline.addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 发送带换行符的消息String message = "Hello, Netty!\n";ctx.writeAndFlush(message);}});}});bootstrap.connect("localhost", 8080).sync().channel().closeFuture().sync();}
}
4.固定帧长的解码器

Netty中的LengthFieldBasedFrameDecoder是一种用于解决粘包和半包问题的解码器。通信报文的结构说白了无非就是头部+身体,头部中记录关于消息长度等信息,身体中携带要传递的消息。LengthFieldBasedFrameDecoder就是根据设置的参数来准确的切分消息的头部和身体,就能确保每个消息被正确地接收和处理。

构造方法如下:

public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip)
  • maxFrameLength:指定消息的最大长度,超过这个长度的消息将被丢弃。

  • lengthFieldOffset:指定长度字段在消息中的偏移量。

  • lengthFieldLength:指定长度字段的长度,可以是1、2、3、4、8等字节。

  • lengthAdjustment:指定长度字段的值需要进行调整的偏移量,通常为消息头的长度。

  • initialBytesToStrip:指定解码时需要跳过的字节数,通常为长度字段的长度。

代码示例:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;public class LengthFieldServerHandler extends ChannelInboundHandlerAdapter {public LengthFieldServerHandler() {// 添加LengthFieldBasedFrameDecoder,指定各个参数ctx.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里的msg是一个ByteBuf,表示一个完整的消息ByteBuf buf = (ByteBuf) msg;String message = buf.toString(io.netty.util.CharsetUtil.UTF_8);System.out.println("Received message: " + message);buf.release(); // 释放ByteBuf资源}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

11.协议解析

11.1.Redis

11.2.Http

名字里带codec的,在业内基本都是编解码器。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class TestHttp {public static void main(String[] args) {NioEventLoopGroup boss=new NioEventLoopGroup();NioEventLoopGroup worker=new NioEventLoopGroup();try{ServerBootstrap serverBootstrap=new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss,worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));//Http的解码器socketChannel.pipeline().addLast(new HttpServerCodec());socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//经过http解码器解码后,请求会被解析为请求头实体或者请求体实体if(msg instanceof HttpRequest){//请求行、请求头}else if(msg instanceof HttpContent){//请求体}}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

当然上面这种写法太繁琐了,netty提供了SimpleChannelInboundHandler,用泛型来指定处理请求头还是请求体:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class TestHttp {public static void main(String[] args) {NioEventLoopGroup boss=new NioEventLoopGroup();NioEventLoopGroup worker=new NioEventLoopGroup();try{ServerBootstrap serverBootstrap=new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss,worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));//Http的解码器socketChannel.pipeline().addLast(new HttpServerCodec());socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>hello world!</h1>".getBytes();//响应头设置返回的消息的长度,否则浏览器不知道消息有多长,会一直刷新response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,bytes.length);//响应体设置返回的消息response.content().writeBytes(bytes);//写回响应ctx.writeAndFlush(response);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

12.协议设计

12.1.概述

自定义协议要素:

  • 魔术,用来判断数据包是否有效。

  • 版本号,协议版本号,用来支持协议升级。

  • 序列化算法,消息正文采用的序列化方式。

  • 指令类型,是登录、注册、还是其他........

  • 请求序号,用来支持双工通信,如TCP之类的。

  • 正文长度

  • 消息正文

12.2.编码

编解码,netty自带编解码器接口ByteToMessageCodec,允许开发者将数据报文转为自己想要的类型。

注意:想要转为的目标类型,必须是实现了序列化接口,可序列化的,不然会报错。

public class MyCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {//4字节的魔数out.writeBytes(new byte[]{1, 2,3,4});//1字节的版本out.writeByte(1);//1字节的序列化方式jdkout.writeByte(0);//1字节的指令类型out.writeByte(msg.getMessageType());//4个字节序号out.writeInt(msg.getSequenceId());//填充字段out.writeByte(0xff);//消息内容ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();//消息长度out.writeInt(bytes.length);//写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//魔数int magicNum = in.readInt();//版本号byte version = in.readByte();//序列化类型byte serializerType = in.readByte();//指令类型byte messageType = in.readByte();//序号int sequenceId = in.readInt();//读填充字节in.readByte();//消息长度int length = in.readInt();//读消息byte[] bytes = new byte[length];in.readBytes(bytes, 0,length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();out.add(message);}
}

测试:

public static void main(String[] args) throws Exception {EmbeddedChannel channel=new EmbeddedChannel(new LoggingHandler(),new MyCodec());Message message=new Message();message.setData("hello".getBytes());//出站会调用codec的encode()channel.writeOutbound(message);ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MyCodec().encode( null,message, buf);//入站会调用codec的decode()channel.writeInbound(buf);}

测试半包、粘包问题:

public static void main(String[] args) throws Exception {EmbeddedChannel channel=new EmbeddedChannel(new LengthFieldBasedFrameDecoder(1024,12,4,0,0),new LoggingHandler(),new MyCodec());Message message=new Message();message.setData("hello".getBytes());ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MyCodec().encode( null,message, buf);ByteBuf s1=buf.slice(0,100);ByteBuf s2=buf.slice(100,buf.readableBytes()-100);//writeInbound后ByteBuf的引用计数会被-1,导致ByteBuf被释放掉,这里需要手动维持一下s1.retain();channel.writeInbound(s1);channel.writeInbound(s2);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/856788.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习神经网络协同过滤模型(NCF)与用户协同过滤(UCF)的区别

一、效果图 点我查看在线demo 二、启发式推荐系统 推荐系统的核心是根据用户的兴趣需求&#xff0c;给用户推荐喜欢的内容。常用的推荐算法有启发式推荐算法&#xff0c;可分为基于用户的 协同过滤&#xff0c;基于物品的协同过滤。 1、基于用户的协同过滤&#xff08;UCF…

【笔记】打卡01 | 初学入门

初学入门:01-02 01 基本介绍02 快速入门库处理数据集网络构建模型训练保存模型加载模型打卡-时间 01 基本介绍 MindSpore Data&#xff08;数据处理层&#xff09; ModelZoo&#xff08;模型库&#xff09; MindSpore Science&#xff08;科学计算&#xff09;&#xff0c;包含…

DP:完全背包+多重背包问题

完全背包和01背包的区别就是&#xff1a;可以多次选 一、完全背包&#xff08;模版&#xff09; 【模板】完全背包_牛客题霸_牛客网 #include <iostream> #include<string.h> using namespace std; const int N1001; int n,V,w[N],v[N],dp[N][N]; //dp[i][j]表示…

【机器学习 复习】第6章 支持向量机(SVM)

一、概念 1.支持向量机&#xff08;support vector machine&#xff0c;SVM&#xff09;&#xff1a; &#xff08;1&#xff09;基于统计学理论的监督学习方法&#xff0c;但不属于生成式模型&#xff0c;而是判别式模型。 &#xff08;2&#xff09;支持向量机在各个领域内的…

CentOS Linux 7系统中离线安装MySQL5.7步骤

预计数据文件存储目录为&#xff1a;/opt/mysql/data 1、文件下载&#xff1a; 安装文件下载链接&#xff1a;https://downloads.mysql.com/archives/community/ 2、检查当前系统是否安装过MySQL [rootcnic51 mysql]# rpm -qa|grep mariadb mariadb-libs-5.5.68-1.el7.x86_6…

跨区域文件管控解决方案,一文了解

跨区域文件管控是一个涉及在不同地域或区域之间管理和控制文件的过程&#xff0c;它包括安全性、合规性和管理效率等多个方面。以下是一些关键的考量因素&#xff1a; 1.安全性&#xff1a;确保在传输过程中文件不被截获、篡改或泄露。使用加密技术保护文件&#xff0c;并确保传…

【初阶数据结构】深入解析带头双向循环链表:探索底层逻辑

&#x1f525;引言 本篇将介绍带头双向循环链表底层实现以及在实现中需要注意的事项&#xff0c;帮助各位在使用过程中根据底层实现考虑到效率上问题和使用时可能会导致的错误使用 &#x1f308;个人主页&#xff1a;是店小二呀 &#x1f308;C语言笔记专栏&#xff1a;C语言笔…

C#使用NPOI库实现Excel的导入导出操作——提升数据处理效率的利器

文章目录 一、NPOI库简介二、安装与引入三、Excel的导入操作1.CSV格式导入2.XLS格式导入3. XLSX格式导入 四、Excel的导出操作1. CSV格式导出2. XLS格式导出3. XLSX格式导出 五、NPOI库的应用优势与改进方向总结 在日常工作学习中&#xff0c;我们经常需要处理Excel文件&#x…

紫光展锐芯片进入烧录模式

实验平台&#xff1a;移远通信SC200L搭载SMART-EVB-G5开发板 软件进入&#xff1a; SPRD平台芯片可以通过adb进入fastboot模式&#xff0c;由fastboot flash boot等指令烧录&#xff1a; $ adb root $ adb reboot fastboot $ fastboot flash boot boot.img 由于usb传输一般都…

使用kibana创建索引的时候报错处理

报错信息&#xff1a;The index pattern youve entered doesnt match any indices. You can match your 1 index, below. 使用kibana创建索引的时候&#xff0c;无法进行下一步创建操作&#xff0c;出现这种情况有很多种情况&#xff0c;每个人遇到的问题会不一样。 第一种&am…

python基础篇(3):print()补偿知识点

1 print输出不换行 默认print语句输出内容会自动换行&#xff0c;如下&#xff1a; print("hello") print(" world") 结果&#xff1a; 在print语句中&#xff0c;加上 end’’ 即可输出不换行了 print("hello",end) print(" world&quo…

Java | Leetcode Java题解之第171题Excel表列序号

题目&#xff1a; 题解&#xff1a; class Solution {public int titleToNumber(String columnTitle) {int number 0;int multiple 1;for (int i columnTitle.length() - 1; i > 0; i--) {int k columnTitle.charAt(i) - A 1;number k * multiple;multiple * 26;}ret…

Python重力弹弓流体晃动微分方程模型和交直流电阻电容电路

&#x1f3af;要点 &#x1f3af;计算地球大气层中热层金属坠物运动轨迹 | &#x1f3af;计算炮弹最佳弹射角度耦合微分方程 | &#x1f3af;计算电磁拉莫尔半径螺旋运动 | &#x1f3af;计算航天器重力弹弓运动力学微分方程 | &#x1f3af;计算双摆的混沌运动非线性微分方程…

【C++】类和对象(四)拷贝构造、赋值运算符重载

文章目录 四、拷贝构造函数干嘛的&#xff1f;写拷贝构造函数的注意事项正确写法 不显示定义拷贝构造函数的情况浅拷贝:one:示例&#xff1a;内置类型:two:示例&#xff1a;自定义类型一个提问 深拷贝 五、赋值运算符重载运算符重载函数原型注意调用时的两种书写方式完整实现代…

SAFEnet加密机的加密算法和技术

SAFEnet加密机是一款功能强大、安全可靠的加密设备&#xff0c;它在网络安全领域发挥着不可替代的作用。下面将从特点、功能、应用及优势等方面对SAFEnet加密机进行详细介绍。 一、特点 先进的加密算法和技术&#xff1a;SAFEnet加密机采用了最先进的加密算法和技术&#xff0c…

12 物理层解析

物理层解析 一、物理层功能 ​ 物理层主要功能 功能一&#xff1a;为数据端设备提供传送数据的通路 功能二&#xff1a;传输数据 二、物理层关心的问题 &#xff08;一&#xff09;信号 ​ 信息是人对现实世界事物存在方式或运动状态的某种认识 ​ 数据是用于描述事物的…

网络安全:什么是SQL注入

文章目录 网络安全&#xff1a;什么是SQL注入引言SQL注入简介工作原理示例代码 攻击类型为什么SQL注入危险结语 网络安全&#xff1a;什么是SQL注入 引言 在数字化时代&#xff0c;数据安全成为了企业和个人最关心的问题之一。SQL注入&#xff08;SQL Injection&#xff09;是…

【面试干货】Java的基础类型和字节大小

【面试干货】Java的基础类型和字节大小 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java编程语言中&#xff0c;有八种基本数据类型&#xff0c;它们分别是&#xff1a;布尔型&#xff08;boolean&#xff09;、字节型&#xff08;byt…

厚膜电阻电路丝网印刷

厚膜丝网印刷 该技术用于需要长寿命、热耐久性、机械强度、导热性、高密度电气互连、低介电损耗等的苛刻应用 特征&#xff1a; 陶瓷标准工艺从前到后的通孔连接 正面和背面的丝网印刷电阻器是标准工艺 金导体可以用金线和/或氧化铝线进行线键合 可焊接金属化&#xff0c;…

oracle 数据库导入dmp文件

荆轲刺秦王 从线上正式环境导出的 dmp 文件&#xff0c;导入到本地 oracle 数据库。 1. 创建用户: CREATE USER hf_chip IDENTIFIED BY hf_chip; 2. 授予 CONNECT 和 RESOURCE 基本权限给新用户。 GRANT CONNECT, RESOURCE TO hf_chip; 3. 创建表空间 CREATE TABLESPACE…