根据黑马程序员netty视频教程学习所做笔记。
笔记demo:https://gitee.com/jobim/netty_learn_demo.git
参考博客:https://blog.csdn.net/cl939974883/article/details/122550345
一、概述
1.1 什么是Netty
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.Copy
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
- 异步是一种独特的网络模型,在这里的指的是调用时的异步与异步IO不同(netty使用多线程来完成方法的调用和处理结果相分离),是指的方法调用和处理结果交由多个线程来进行处理的方式(调用方法的线程可以腾出手来做其他的事情),依旧是基于多路复用。
- 事件驱动指的是底层采用的是多路复用技术,也就是selector,当发生响应请求时才会被处理
1.2 Netty的地位
Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty,因为它们有网络通信需求!
- Cassandra - nosql 数据库
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- RocketMQ - ali 开源的消息队列
- ElasticSearch - 搜索引擎
- gRPC - rpc 框架
- Dubbo - rpc 框架
- Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
- Zookeeper - 分布式协调框架
1.3 Netty的优势
- Netty vs NIO
- 传统NIO,工作量大,bug 多。需要自己构建协议
- epoll 空轮询导致 CPU 100%。(netty通过一些方式解决了这个bug)
- Netty,解决 TCP 传输问题,如粘包、半包
- Netty对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
- Netty vs 其它网络应用框架
- Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
- 久经考验,16年,Netty 版本
- 2.x 2004
- 3.x 2008
- 4.x 2013
- 5.x 已废弃(没有明显的性能提升,维护成本高)
二、netty入门程序Hello World
2.1 代码示例
目标:客户端向服务端发送一个"helloworld",服务器进行接收打印!
加入依赖:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version>
</dependency>
服务器端:
public class HelloServer {public static void main(String[] args) {// 1、启动器,负责装配netty组件,启动服务器new ServerBootstrap()// 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector.group(new NioEventLoopGroup())// 3、选择服务器的ServerSocketChannel实现。其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有.channel(NioServerSocketChannel.class)// 4、child 负责处理读写,该方法决定了 child 执行哪些操作// 是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel// ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {System.out.println("initChannel");// 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>StringnioSocketChannel.pipeline().addLast(new StringDecoder());// 6、SocketChannel的业务处理,使用上一个处理器的处理结果nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println(s);}});}// 7、ServerSocketChannel绑定8080端口}).bind(8080);}
}
客户端代码:
public class HelloClient {public static void main(String[] args) throws InterruptedException {new Bootstrap()// 创建 NioEventLoopGroup,同 Server.group(new NioEventLoopGroup())// 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现.channel(NioSocketChannel.class)// ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {System.out.println("initChannel");// 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出channel.pipeline().addLast(new StringEncoder());}})// 指定要连接的服务器和端口.connect(new InetSocketAddress("localhost", 8080))// Netty 中很多方法都是异步的,如 connect// 这时需要使用 sync 方法等待 connect 建立连接完毕.sync()// 获取 channel 对象,它即为通道抽象,可以进行数据读写操作.channel()// 写入消息并清空缓冲区.writeAndFlush("hello world");}
}
执行结果:
2.2 流程梳理
step1:服务端server启动:
- 首先会创建group组
- 接着指定channel实现类(这里是serversocketchannel,其中会处理accept()事件),并且来添加一些handler处理器。这里的添加的是初始化handler,该handler会在客户端发起连接时执行初始化操作也就是方法内内容。
- 监听端口。
step2:客户端client启动
- 同样创建group组。
- 指定连接的channel。同样也添加了一个初始化处理器,该处理器同样也在连接建立之后会被执行init方法。
- 执行connect(),发起连接(下面经过debug测试)
- 首先触发自己客户端的initChannel()事件执行初始化,这里添加了一个编码器(用于将发送的字符串=>ByteBuf传输出去)
- 接着触发server的initchannel来为pipeline(流水线)添加一些必要工序操作,这里添加了一个字符串解码器(用于接收客户端数据后将ByteBuf=>String);还有一个是InBound适配器,可进行一系列事件的自定义重写,这里的话重写了read()事件,之后客户端发送数据就会执行我们自定义的内容。
- 紧接着连接完毕之后sync()取到连接对象也就是之前定义的NioSocketChannel,取到之后向服务器发送一个字符串
- 发送过程中会先走StringEncoder中的编码方法,将String=>ByteBuf之后发送出去
- 接着服务端的read()事件接收好之后,同样也会走StringDecoder中的解码方法,将ByteBuf=>String,接着会执行channelRead()方法,其中的msg就是转换之后的字符串,我们这里仅仅只是打印即可!
组件介绍:
- 把 channel 理解为数据的通道
- 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
- 把 handler 理解为数据的处理工序
- 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler 分 Inbound 和 Outbound 两类
- 把 eventLoop 理解为处理数据的工人
- 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
- 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
三、组件
3.1 EventLoop
3.1.1、认识EventLoop和EventLoopGroup
事件循环对象 EventLoop
-
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件
-
它的继承关系如下
-
继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
-
继承自 netty 自己的 OrderedEventExecutor
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
-
事件循环组 EventLoopGroup
- EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
3.1.2、执行普通、定时任务
目的:通过NioEventLoopGroup事件循环组来去执行普通和定时任务。
public class TestEventLoop {public static void main(String[] args) {// 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程EventLoopGroup group = new NioEventLoopGroup(2);// 通过next方法可以获得下一个 EventLoopSystem.out.println(group.next());System.out.println(group.next());// 第三次调用的和第一次一样(初始化两个的情况)System.out.println(group.next());// 通过EventLoop执行普通任务group.next().execute(()->{System.out.println(Thread.currentThread().getName() + " hello");});// 通过EventLoop执行定时任务group.next().scheduleAtFixedRate(()->{System.out.println(Thread.currentThread().getName() + " hello2");}, 0, 1, TimeUnit.SECONDS);// 优雅地关闭// group.shutdownGracefully();}
}
结果:
对于demo中主线程结束了还能运行的原因是,线程中开辟的用户线程依旧在运行中。
- 分析:ThreadPoolExecutor中的runWorker方法里有一个getTask()方法,该方法不断从队列中拿任务执行,没有就阻塞,这也就是为什么主线程结束了,程序依旧在运行中的原因。
关闭 EventLoopGroup:
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
3.1.3、执行IO任务(含2点细化)
服务端代码:
public class MyIOServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));}});}}).bind(8080);}
}
客户端代码:
public class MyIOClient {public static void main(String[] args) throws IOException, InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080)).sync().channel();System.out.println(channel);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("zhangsan3".getBytes()));Thread.sleep(2000);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("lisi3".getBytes()));// 此处打断点调试,调用 channel.writeAndFlush(...);System.in.read();}
}
多个客户端执行:分别修改发送字符串为 zhangsan1(第一次),zhangsan2(第二次),zhangsan3(第三次)
每当来临一个连接,此时就会将该channel去绑定到指定的一个EventLoop中的selector中,每个NioEventLoop都是一个线程,之后该channel的其他事件都由这个EventLoop来去处理执行,这就与我们之前手写多线程NIO多路复用的思路完全一致
分工细化
-
Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件。
- Boos、worker各指定一个组,Boos只负责serversocketchannel的accept监听,worker负责建立连接后得到的channel均衡绑定到各个eventloop的selector上。
-
若是执行handler中间有一些较耗时的操作,那么可以添加一个新的handler并交由一个处理普通事件的eventloop来进行异步处理!
/*** 1:细化工作组。* 2:耗时较长的任务交给指定组进行异步执行*/
public class MyServerV2 {public static void main(String[] args) {// 增加自定义的非NioEventLoopGroupEventLoopGroup group = new DefaultEventLoopGroup();new ServerBootstrap()// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件//1. Boss对应一个组(不用传递参数也没事),负责NioServerSocketChannel的accept监听;// worker对应一个组,之后来临连接的channel都会绑定其某个EventLoop.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));// 调用下一个handlerctx.fireChannelRead(msg);}})// 2.该handler绑定自定义的Group.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));}});}}).bind(8080);}
}
执行结果:分别修改发送字符串为 zhangsan1(第一次),zhangsan2(第二次),zhangsan3(第三次)
可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理
3.1.4、源码分析(不同eventLoop,线程如何切换)
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程EventExecutor executor = next.executor();// 返回下一个handler的eventLoop// 当前handler中的线程,是否和eventLoop是同一个线程if (executor.inEventLoop()) {// 是,直接调用next.invokeChannelRead(m);} // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}
- 如果两个 handler 绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
3.2 Channel
Channel 的常用方法:
- close() 可以用来关闭Channel
- closeFuture() 用来处理 Channel 的关闭
- sync 方法作用是同步等待 Channel 关闭
- 而 addListener 方法是异步等待 Channel 关闭
- pipeline() 方法用于添加处理器
- write() 方法将数据写入
- 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
- 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
- writeAndFlush() 方法将数据写入并立即发送(刷出)
3.2.1 ChannelFuture连接建立(同步、异步)
ChannelFuture
作用:专门用于记录异步方法状态的返回结果。
public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}})// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程// NIO线程:NioEventLoop 中的线程.connect(new InetSocketAddress("localhost", 8080));// 该方法用于等待连接真正建立channelFuture.sync();// 获取客户端-服务器之间的Channel对象Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");System.in.read();}
}
如果我们去掉channelFuture.sync()
方法,会服务器无法收到hello world
这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()
方法阻塞主线程(调用线程),等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,还未真正与服务器建立好连接,也就没法将信息正确的传输给服务器端
解决方法:
-
方法一:所以需要通过
channelFuture.sync()
方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程//方式一:同步阻塞等待连接 //阻塞方法,直到连接建立之后再会停止阻塞继续向下执行。 // 若是不调用该方法,直接去获取channel来发送数据,很有可能因为没有建立好连接导致发送失败 channelFuture.sync();//底层源码保护性暂停,主线程await(),另一个线程创建成功之后唤醒 Channel channel = channelFuture.channel(); log.info("channel {}",channel); //测试:channel.writeAndFlush("hello")
-
方法二:用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO(eventLoop) 线程(去执行connect操作的线程)
//方式二:添加一个监听器,来异步处理结果 channelFuture.addListener(new ChannelFutureListener() {//当连接完成就会执行该回调方法:执行完成事件,其中channelFuture就是本身对象@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {Channel channel = channelFuture.channel();log.info("channel {}",channel);channel.writeAndFlush("hello!");} });
代码示例:
public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}})// 1.链接到服务器// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程(NioEventLoop 中的线程).connect(new InetSocketAddress("localhost", 8080));// 2.1 使用sync方法同步处理结果// 该方法用于等待连接真正建立channelFuture.sync();// 获取客户端-服务器之间的Channel对象Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");System.in.read();// 2.2 使用addListener(回调对象)方法一步处理结果channelFuture.addListener(new ChannelFutureListener() {@Override// 当connect方法执行完毕后,也就是连接真正建立后,会在NIO线程中调用operationComplete方法public void operationComplete(ChannelFuture channelFuture) throws Exception {Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");}});System.in.read();}
}
3.2.2 CloseFuture连接关闭(同步、异步)
当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法,调用方法返回一个ChannelFuture。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现
-
通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作
// 获得closeFuture对象 ChannelFuture closeFuture = channel.closeFuture();// 同步等待NIO线程执行完close操作 closeFuture.sync(); // 关闭EventLoopGroup group.shutdownGracefully();
-
调用closeFuture.addListener方法,添加close的后续操作
closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {// 等待channel关闭后才执行的操作System.out.println("关闭之后执行一些额外操作...");// 关闭EventLoopGroupgroup.shutdownGracefully();} });
代码示例:
public class CloseFutureClient {public static void main(String[] args) throws InterruptedException {// 创建EventLoopGroup,使用完毕后关闭NioEventLoopGroup group = new NioEventLoopGroup();ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 可以打印channel的一些运行流程、状态信息。需要配置logbacksocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));socketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080));channelFuture.sync();Channel channel = channelFuture.channel();Scanner scanner = new Scanner(System.in);// 创建一个线程用于输入并向服务器发送new Thread(()->{while (true) {String msg = scanner.next();if ("q".equals(msg)) {// 关闭操作是异步的,在NIO线程中执行channel.close();break;}channel.writeAndFlush(msg);}}, "inputThread").start();// 获得closeFuture对象ChannelFuture closeFuture = channel.closeFuture();System.out.println("waiting close...");// 方式一:同步等待NIO线程执行完close操作closeFuture.sync();// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的System.out.println("关闭之后执行一些额外操作...");// 关闭EventLoopGroupgroup.shutdownGracefully();//方式2:异步处理关闭结果// closeFuture.addListener(new ChannelFutureListener() {// @Override// public void operationComplete(ChannelFuture future) throws Exception {// System.out.println("处理关闭之后的操作");// // 关闭EventLoopGroup// group.shutdownGracefully();// }// });}
}
3.2.3 为什么netty要用异步?异步提升了什么?
结论:对每个操作步骤进行合理的拆解并且通过多线程+异步执行,在一定时间内能够提升吞吐量,但是对于总体响应时间不减反增。(这里吞吐量实际上我们可以看成来建立连接处理的个数!)
3.3 Future & Promise
在异步处理时,经常用到这两个接口。netty的future继承了JDK的future;netty的promise继承了netty的future。
3.3.1、介绍Future与Promise
说明: netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
本质都是等待唤醒机制,这个机制一个应用就是保护性暂停,另一个就是生产者消费者,都是线程通信。
额外:
- 对于promise,netty比es6出来早
- jdk中的future不能够区分任务是成功还是失败!
- future就是在线程间传递一个结果或者传递一个数据的容器。
- 该future中的数据是由执行任务的线程来进行填充进去的,我们自己没有机会去填,之后我们可以使用promise来去自己填充进去!
3.3.2、JDK的Future示例(线程间取值)
案例目的:主线程中获取线程池中某个线程处理任务的结果!
public class JdkFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadFactory factory = new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "JdkFuture");}};// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);// 获得Future对象Future<Integer> future = executor.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {TimeUnit.SECONDS.sleep(1);return 50;}});// 通过阻塞的方式,获得运行结果System.out.println("运行结果:"+future.get());}
}
3.3.3、netty的Future示例(同步、异步)
public class NettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();// 获得 EventLoop 对象EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {System.out.println(Thread.currentThread().getName() + " 执行任务...");return 50;}});// 主线程中获取结果System.out.println(Thread.currentThread().getName() + " 获取结果");// 获取任务结果,非阻塞,还未产生结果时返回 nullSystem.out.println("getNow " + future.getNow());// 方式一:同步取得结果(主线程阻塞获取)// System.out.println("get " + future.get());// NIO线程中异步获取结果future.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {System.out.println(Thread.currentThread().getName() + " 获取结果");System.out.println("getNow " + future.getNow());}});}
}
同步方法执行:
异步方法执行:可以看到执行任务和获取结果都是同一个线程处理
3.3.4、netty的promise示例
描述:
- 前面的future不能主动来装数据
- 使用promise可以准确的知道数据是处理正常还是异常!
- 开发网络框架,例如RPC,Promise的重要性比较大
- setSuccess()表示结果正确,setFailure(e)表示结果不正确会抛出异常!
案例目的:通过使用promise来去表示执行某个任务的结果是成功还是失败!主线程可以来进行接收。
public class NettyPromise {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建EventLoopNioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();// 创建Promise对象,用于存放结果DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);new Thread(()->{try {TimeUnit.SECONDS.sleep(1);// 自定义线程向Promise中存放结果promise.setSuccess(50);} catch (Exception e) {e.printStackTrace();promise.setFailure(e);}}).start();// 主线程从Promise中获取结果(如果调用setFailure设置的会抛出异常)System.out.println(Thread.currentThread().getName() + " " + promise.get());}
}
执行结果:
3.4 handler & pipeline
pipeline
:类似于流水线,handler则是一道道工序,流动的内容就是要处理的数据。
handler
:handler是最为重要的,之后编写一些业务我们都直接在handler中进行,并且在netty中包含了许多内置的handler给我们简化工作(例如netty提供的StringEncoder是OutBoundHandler,StringDecode是InBoundHandler,日志new LoggingHandler()若是使用了logback需要进行额外配置)。
3.4.1 入站、出站handler执行顺序
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
服务端代码:
public class PipeLineServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel socketChannel) throws Exception {// 在socketChannel的pipeline中添加handler// pipeline中handler是带有head与tail节点的双向链表,的实际结构为// head <-> handler1 <-> ... <-> handler4 <->tail// Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法// 入站时,handler是从head向后调用的socketChannel.pipeline().addLast("inboundHandler1" ,new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 1");// 父类该方法内部会调用fireChannelRead// 将数据传递给下一个入栈处理器handler。如果不调用,调用链会断开super.channelRead(ctx, msg);// 1}});socketChannel.pipeline().addLast("inboundHandler2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 2");super.channelRead(ctx, msg);//2}});socketChannel.pipeline().addLast("inboundHandler3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 3");// 执行write操作,使得Outbound的方法能够得到调用socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));//3// 会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler// 如果是最后一个入栈处理器可以不用执行 super.channelRead(ctx, msg)// super.channelRead(ctx, msg);}});// Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法// 出站时,handler的调用是从tail向前调用的socketChannel.pipeline().addLast("outboundHandler4" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 1");super.write(ctx, msg, promise);//4}});socketChannel.pipeline().addLast("outboundHandler5" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 2");super.write(ctx, msg, promise);//5}});socketChannel.pipeline().addLast("outboundHandler6" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 3");super.write(ctx, msg, promise);//6}});}}).bind(8080);}
}
执行结果:
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。
- ChannelPipeline结构是一个带有head与tail指针的双向链表,其中的节点为handler。要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
- 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
- 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止
- 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 如果注释掉 1 处代码,则仅会打印 1
- 如果注释掉 2 处代码,则仅会打印 1 2
- 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
- 如果注释掉 3 处代码,则仅会打印 1 2 3
- 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
- 如果注释掉 6 处代码,则仅会打印 1 2 3 6
3.4.2 InBoundHandler案例(加工数据)
- 若是想要InBoundHandler依次执行,那么需要调用一个
super.channelRead(ctx, data);
或ctx.fireChannelRead(data);
来进行调用下一个handler,前者源码实际就是调用的后者! - handler之间可以传递数据,那么可以来使用多个handler可以进行对数据加工处理!
- 最后一个InBoundHandler不需要去调用super.channelRead了,因为已经是最后一个执行结果了!
案例目的:通过三个自定义InBoundHandler,来对Bytebuf 进行如Bytebuf -> String对象进行加工处理。
server:
public class InboundHandlerTest {public static void main(String[] args) throws InterruptedException {new ServerBootstrap().group(new NioEventLoopGroup(),new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//添加入站事件//第一个handler:将ByteBuf => Stringch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Inbound handler 1");System.out.println("解析得到的数据:" + msg);ByteBuf buf = (ByteBuf)msg;final String data = buf.toString(Charsets.UTF_8);super.channelRead(ctx, data);//方式一:执行下一个handler}});//第二个handler:将String封装到Result对象中ch.pipeline().addLast("h2", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Inbound handler 2");System.out.println("解析得到的数据:" + msg);ctx.fireChannelRead(msg);}});}}).bind(8080).sync();System.out.println("服务器启动成功!");}
}
client:
public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}})// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程(NioEventLoop 中的线程).connect(new InetSocketAddress("localhost", 8080));// 该方法用于等待连接真正建立channelFuture.sync();// 获取客户端-服务器之间的Channel对象Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");}
}
3.4.3 OutBoundHandler案例(不同对象发出数据效果不一致)
ctx.channel().write(msg)
VS ctx.write(msg)
- 执行OutBoundHandler的顺序是从后往前依次执行的,对于使用channel来写或者ChannelHandlerContext来写handler的处理也有区别。
- 通过ChannelHandlerContext来发送数据效果,实际会从当前的handler向前开始依次执行handler来进行数据的额外处理,若是原本在该handler之后的boundhandler就不会被执行到!
- 通过channel来写数据,一定会从tail(最后一个handler)开始向前依次执行OutBoundHandler。
- 发送数据一定要发出去bytebuf,若是直接writeAndFlush(“字符串”),服务端不会接收到,除非再添加一个handler处理器也就是StringEncoder(),会将String转为ByteBuf。
Server:对比两种write()的执行顺序
public class OutBoundHandlerTest {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast("inboundHandler1" ,new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 1");super.channelRead(ctx, msg);// 向客户端写数据// 方式一:调用NioSocketChannel来进行发送数据。(从tail末尾向前依次执行outhandler)// socketChannel.writeAndFlush("hello,client!");// 方式二:调用ctx来进行发送数据。(从当前handler向前依次执行outhandler)ctx.writeAndFlush("hello,client");}});socketChannel.pipeline().addLast("outboundHandler4" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 1");super.write(ctx, msg, promise);}});socketChannel.pipeline().addLast("outboundHandler5" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 2");super.write(ctx, msg, promise);}});socketChannel.pipeline().addLast("outboundHandler6" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 3");super.write(ctx, msg, promise);}});}}).bind(8080);}
}
通过channel来发送数据效果
通过ctx,也就是ChannelHandlerContext发送数据效果:
3.4.4 EmbeddedChannel
EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可
public class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("4");super.write(ctx, msg, promise);}};// 用于测试Handler的ChannelEmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);// 执行Inbound操作 channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));System.out.println("======");// 执行Outbound操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));}
}
执行结果:
3.5 ByteBuf
netty中的ByteBuf
的容量可以动态扩容,相比较于在NIO中的ByteBuffer一旦指定初始容量之后就无法更改了!若是写入超过容量的数据则会出现覆盖的情况!
3.5.1 创建
public class ByteBufLearn {public static void main(String[] args) {// 创建byteBuf(直接内存,而不是堆内存),默认大小是256,通过该方式创建的ByteBuf对象会自动扩容,与nio的ByteBuffer不一样,nio达到融期限之后,会报错ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(32);// 使用ByteBufAllocator创建直接内容,直接内容不需要考虑GC回收,默认获取的 byteBuf 便是直接内存,需要主动释放关闭// ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();// 使用ByteBufAllocator创建堆内存// ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();ByteBuffer byteBuffer = ByteBuffer.allocate(32);// 往byteBuf里面写入内容StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i < 32; i++) {stringBuilder.append(i).append("-");}// 往ByteBuf对象中写入数据,当达到容量界限之后,会自动扩容byteBuf.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));ByteBufUtil.log(byteBuf);// 往nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常byteBuffer.put(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));}
}
运行结果:
-
ByteBuf通过
ByteBufAllocator
选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小 -
当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作。而nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常
-
如果在handler中创建ByteBuf,建议使用
ChannelHandlerContext ctx.alloc().buffer()
来创建
3.5.2 直接内存与堆内存
堆内存与直接内存区别:
-
堆内存的分配效率比较高,但是读写内存的效率比较低
-
直接内存分配效率比较低,但是读写效率高(少一次内存复制),适合配合池化功能一起用。直接内存使用的是系统内存
- 直接内存使用的是系统内存,若是从磁盘中读取文件时会将数据直接读入到系统内存,那么系统内存呢就会用直接内存的方式映射到java内存中,java里面访问的和操作系统访问的是同一块内存,那么就可以减少一次内存的复制,所以读取效率会高于堆内存。
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
创建基于直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
创建基于堆的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
3.5.3 池化与非池化
池化的最大意义在于可以重用 ByteBuf,优点:
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
4.1 之前,池化功能还不成熟,默认是非池化实现
池化功能是否开启,可以通过下面的系统环境变量来设置
// unpooled:非池化 pooled:池化
-Dio.netty.allocator.type={unpooled|pooled}
3.5.4 组成
ByteBuf由四部分组成
ByteBuf 是一个字节容器,容器里面的的数据分为四个部分
-
第一个部分是已经丢弃的字节,这部分数据是无效的;(已经读过的内容)
-
第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;(已经写入但还未读取的内容)
-
第三部分数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段;(剩余可写入数据的空间大小)
-
最后一部分表示的是该 ByteBuf 最多还能扩容多少容量
四个重要属性:
- readerIndex(读指针):指示读取的起始位置
- writerIndex(写指针):指示写入的起始位置
- capacity(当前容量):当前容量。当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错。
- maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。
3.5.5 写入
方法列表,省略一些不重要的方法
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
注意
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
- 网络传输,默认习惯是 Big Endian
代码示例:
public class ByteBufStudy {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);ByteBufUtil.log(buffer);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4});ByteBufUtil.log(buffer);buffer.writeInt(5);ByteBufUtil.log(buffer);buffer.writeIntLE(6);ByteBufUtil.log(buffer);buffer.writeLong(7);ByteBufUtil.log(buffer);}
}
运行结果:
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
3.5.6 扩容
从上方写入数据示例根据结果可以看到,在如下的代码中产生了动态扩容操作
buffer.writeLong(7);
ByteBufUtil.log(buffer);
扩容规则:
- 如果需要的容量等于门限阈值,则直接使用阈值作为新的缓存区容量。
- 如果需要的容量大于阈值,则采用每次步进4MB的方式进行内存扩张,即将需要扩容值除以4MB后乘以4MB,然后将结果与最大容量进行比较,取其中的较小值作为目标容量。
- 如果需要的容量小于阈值,则采用倍增的方式,以64字节作为基本数值,每次翻倍增长(如64,128,256…),直到倍增后的结果大于或等于所需的容量值。
- 扩容不能超过 maxCapacity,否则会抛出
java.lang.IndexOutOfBoundsException
异常
3.5.7 读取
参数 | 含义 |
---|---|
buffer.readByte() | 每次读取一个字节 |
buffer.readInt() | 每次读取一个整数,也就是四个字节 |
buffer.markReaderIndex() | 为读指针做一个标记,配合下面的方法可以实现重复读取某个数 |
buffer.resetReaderIndex() | 将读指针跳到上一个标记过的地方实现重复读取某个数 |
除了上面一些了read开头的方法以外,还有一系列get开头的方法也可以读取数据,只不过get开头的方法不会改变读指针位置。相当于是按索引去获取。
示例:如果需要重复读取,需要调用buffer.markReaderIndex()
对读指针进行标记,并通过buffer.resetReaderIndex()
将读指针恢复到mark标记的位置
public class ByteBufReadTest {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4});buffer.writeInt(5);ByteBufUtil.log(buffer);// 读取4个字节System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());ByteBufUtil.log(buffer);// 通过mark与reset实现重复读取buffer.markReaderIndex();System.out.println(buffer.readInt());ByteBufUtil.log(buffer);// 恢复到mark标记处buffer.resetReaderIndex();ByteBufUtil.log(buffer);}
}
执行结果:
3.5.8 内存回收(retain & release)
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
内存回收规则:
- 因为
pipeline
的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release,详细分析如下
- 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
- 入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
- 出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
- 异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
TailContext 释放未处理消息逻辑
// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg);} finally {ReferenceCountUtil.release(msg);}
}
当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中
具体代码
// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {if (msg instanceof ReferenceCounted) {return ((ReferenceCounted) msg).release();}return false;
}
3.5.9 零拷贝
切片
- 【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针,修改子分片,会修改原ByteBuf。
- 得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用
public class TestSlice {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});// 将buffer分成两部分。在切片过程中,没有发生数据复制// 注意此时切片最大容量做了限制,不需要新增数据ByteBuf slice1 = buffer.slice(0, 5);ByteBuf slice2 = buffer.slice(5, 5);// 需要让分片的buffer引用计数加一// 避免原Buffer释放导致分片buffer无法使用slice1.retain();slice2.retain();ByteBufUtil.log(slice1);ByteBufUtil.log(slice2);// 释放原有byteBuf 内存//若是在release()之后也想正常使用,可以在此之前使用retain()进行引用+1,release()相对于会引用-1,此时就不会真正释放内存,自然也就能欧使用buffer.release();ByteBufUtil.log(slice1);// 更改原始buffer中的值System.out.println("===========修改原buffer中的值===========");buffer.setByte(0,5);System.out.println("===========打印slice1===========");ByteBufUtil.log(slice1);}
}
运行结果:
注意:slice后的分片,不能再次写入新的数据,这会影响原ByteBuf。
duplicate:整块
【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的。
代码示例:
public class ByteBufDuplicateDemo {public static void main(String[] args) {ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);byteBuf.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,0});// 拷贝一块bufByteBuf duplicate = byteBuf.duplicate();ByteBufUtil.log(duplicate);// 将最后一位0修改成10duplicate.setByte(9,10);// 打印byteBufByteBufUtil.log(byteBuf);// 写入新数据11duplicate.writeByte(11);// 打印byteBufByteBufUtil.log(byteBuf);}}
运行结果:
CompositeByteBuf:组装ByteBuf
【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。
CompositeByteBuf
是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
- 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
- 缺点,复杂了很多,多次操作会带来性能的损耗
代码示例:将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。注意要设置true来让其调整读,写指针。
public class TestCompositeByteBuf {public static void main(String[] args) {ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});ByteBufUtil.log(buf1);ByteBufUtil.log(buf2);// 效率较低方案:直接通过writeBytes()写入字节方式写入// ByteBufUtil.log(ByteBufAllocator.DEFAULT.buffer(20).writeBytes(buf1).writeBytes(buf2));// 零拷贝:合并两个Buffer到一个Buffer中,使用的共享内存CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0buf3.addComponents(true, buf1, buf2);System.out.println("结果:");ByteBufUtil.log(buf3);}
}
执行结果:
Unpooled
- Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
- 这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf。
代码示例:
public class UnpooledTest {public static void main(String[] args) {ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBufByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);buf3.setByte(0,6);ByteBufUtil.log(buf3);}
}
执行结果:
3.5.10 copy:深度拷贝
ByteBuf提供了copy方法,这一类方法是真正的拷贝原ByteBuf到新的内存,返回一个新的ByteBuf,与原ByteBuf没有关系。
提供两个拷贝:
- 一个是全量。
public abstract ByteBuf copy();
- 一个指定位置和长度。public abstract ByteBuf copy(int index, int length);
代码示例:
public class ByteBufCopyDemo {public static void main(String[] args) {ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);byteBuf.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,0});ByteBuf copy1 = byteBuf.copy();ByteBufUtil.log(copy1);ByteBuf copy2 = byteBuf.copy(5, 5);ByteBufUtil.log(copy2);}
}
结果:
3.5.11 ByteBuf 优势
- 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf