Netty学习(一)——基础组件

根据黑马程序员netty视频教程学习所做笔记。

笔记demo:https://gitee.com/jobim/netty_learn_demo.git

参考博客:https://blog.csdn.net/cl939974883/article/details/122550345

一、概述

1.1 什么是Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.Copy

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

  • 异步是一种独特的网络模型,在这里的指的是调用时的异步与异步IO不同(netty使用多线程来完成方法的调用和处理结果相分离),是指的方法调用和处理结果交由多个线程来进行处理的方式(调用方法的线程可以腾出手来做其他的事情),依旧是基于多路复用。
  • 事件驱动指的是底层采用的是多路复用技术,也就是selector,当发生响应请求时才会被处理

1.2 Netty的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

  • Cassandra - nosql 数据库
  • Spark - 大数据分布式计算框架
  • Hadoop - 大数据分布式存储框架
  • RocketMQ - ali 开源的消息队列
  • ElasticSearch - 搜索引擎
  • gRPC - rpc 框架
  • Dubbo - rpc 框架
  • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
  • Zookeeper - 分布式协调框架

1.3 Netty的优势

  • Netty vs NIO
    • 传统NIO,工作量大,bug 多。需要自己构建协议
    • epoll 空轮询导致 CPU 100%。(netty通过一些方式解决了这个bug)
    • Netty,解决 TCP 传输问题,如粘包、半包
    • Netty对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
  • Netty vs 其它网络应用框架
    • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
    • 久经考验,16年,Netty 版本
      • 2.x 2004
      • 3.x 2008
      • 4.x 2013
      • 5.x 已废弃(没有明显的性能提升,维护成本高)

二、netty入门程序Hello World

2.1 代码示例

目标:客户端向服务端发送一个"helloworld",服务器进行接收打印!

加入依赖:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version>
</dependency>

服务器端:

public class HelloServer {public static void main(String[] args) {// 1、启动器,负责装配netty组件,启动服务器new ServerBootstrap()// 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector.group(new NioEventLoopGroup())// 3、选择服务器的ServerSocketChannel实现。其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有.channel(NioServerSocketChannel.class)// 4、child 负责处理读写,该方法决定了 child 执行哪些操作// 是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel// ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {System.out.println("initChannel");// 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>StringnioSocketChannel.pipeline().addLast(new StringDecoder());// 6、SocketChannel的业务处理,使用上一个处理器的处理结果nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println(s);}});}// 7、ServerSocketChannel绑定8080端口}).bind(8080);}
}

客户端代码:

public class HelloClient {public static void main(String[] args) throws InterruptedException {new Bootstrap()// 创建 NioEventLoopGroup,同 Server.group(new NioEventLoopGroup())// 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现.channel(NioSocketChannel.class)// ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {System.out.println("initChannel");// 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出channel.pipeline().addLast(new StringEncoder());}})// 指定要连接的服务器和端口.connect(new InetSocketAddress("localhost", 8080))// Netty 中很多方法都是异步的,如 connect// 这时需要使用 sync 方法等待 connect 建立连接完毕.sync()// 获取 channel 对象,它即为通道抽象,可以进行数据读写操作.channel()// 写入消息并清空缓冲区.writeAndFlush("hello world");}
}

执行结果:

image-20240619224824799

2.2 流程梳理

step1:服务端server启动:

  • 首先会创建group组
  • 接着指定channel实现类(这里是serversocketchannel,其中会处理accept()事件),并且来添加一些handler处理器。这里的添加的是初始化handler,该handler会在客户端发起连接时执行初始化操作也就是方法内内容。
  • 监听端口。

step2:客户端client启动

  • 同样创建group组。
  • 指定连接的channel。同样也添加了一个初始化处理器,该处理器同样也在连接建立之后会被执行init方法。
  • 执行connect(),发起连接(下面经过debug测试)
    • 首先触发自己客户端的initChannel()事件执行初始化,这里添加了一个编码器(用于将发送的字符串=>ByteBuf传输出去)
    • 接着触发server的initchannel来为pipeline(流水线)添加一些必要工序操作,这里添加了一个字符串解码器(用于接收客户端数据后将ByteBuf=>String);还有一个是InBound适配器,可进行一系列事件的自定义重写,这里的话重写了read()事件,之后客户端发送数据就会执行我们自定义的内容。
  • 紧接着连接完毕之后sync()取到连接对象也就是之前定义的NioSocketChannel,取到之后向服务器发送一个字符串
    • 发送过程中会先走StringEncoder中的编码方法,将String=>ByteBuf之后发送出去
    • 接着服务端的read()事件接收好之后,同样也会走StringDecoder中的解码方法,将ByteBuf=>String,接着会执行channelRead()方法,其中的msg就是转换之后的字符串,我们这里仅仅只是打印即可!

组件介绍:

  • 把 channel 理解为数据的通道
  • 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • 把 handler 理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
    • handler 分 Inbound 和 Outbound 两类
  • 把 eventLoop 理解为处理数据的工人
    • 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
    • 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人

三、组件

3.1 EventLoop

3.1.1、认识EventLoop和EventLoopGroup

事件循环对象 EventLoop

  • EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件

  • image-20240620231402480

  • 它的继承关系如下

    • 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法

    • 继承自 netty 自己的 OrderedEventExecutor

      • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
        • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

事件循环组 EventLoopGroup

  • EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop
3.1.2、执行普通、定时任务

目的:通过NioEventLoopGroup事件循环组来去执行普通和定时任务。

public class TestEventLoop {public static void main(String[] args) {// 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程EventLoopGroup group = new NioEventLoopGroup(2);// 通过next方法可以获得下一个 EventLoopSystem.out.println(group.next());System.out.println(group.next());// 第三次调用的和第一次一样(初始化两个的情况)System.out.println(group.next());// 通过EventLoop执行普通任务group.next().execute(()->{System.out.println(Thread.currentThread().getName() + " hello");});// 通过EventLoop执行定时任务group.next().scheduleAtFixedRate(()->{System.out.println(Thread.currentThread().getName() + " hello2");}, 0, 1, TimeUnit.SECONDS);// 优雅地关闭// group.shutdownGracefully();}
}

结果:

image-20240620232314965

对于demo中主线程结束了还能运行的原因是,线程中开辟的用户线程依旧在运行中。

  • 分析:ThreadPoolExecutor中的runWorker方法里有一个getTask()方法,该方法不断从队列中拿任务执行,没有就阻塞,这也就是为什么主线程结束了,程序依旧在运行中的原因。

关闭 EventLoopGroup:

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

3.1.3、执行IO任务(含2点细化)

服务端代码:

public class MyIOServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));}});}}).bind(8080);}
}

客户端代码:

public class MyIOClient {public static void main(String[] args) throws IOException, InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080)).sync().channel();System.out.println(channel);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("zhangsan3".getBytes()));Thread.sleep(2000);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("lisi3".getBytes()));// 此处打断点调试,调用 channel.writeAndFlush(...);System.in.read();}
}

多个客户端执行:分别修改发送字符串为 zhangsan1(第一次),zhangsan2(第二次),zhangsan3(第三次)

image-20240620233455102

每当来临一个连接,此时就会将该channel去绑定到指定的一个EventLoop中的selector中,每个NioEventLoop都是一个线程,之后该channel的其他事件都由这个EventLoop来去处理执行,这就与我们之前手写多线程NIO多路复用的思路完全一致

image-20240620233545686

分工细化

  1. Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件。

    • Boos、worker各指定一个组,Boos只负责serversocketchannel的accept监听,worker负责建立连接后得到的channel均衡绑定到各个eventloop的selector上。
  2. 若是执行handler中间有一些较耗时的操作,那么可以添加一个新的handler并交由一个处理普通事件的eventloop来进行异步处理!

/*** 1:细化工作组。* 2:耗时较长的任务交给指定组进行异步执行*/
public class MyServerV2 {public static void main(String[] args) {// 增加自定义的非NioEventLoopGroupEventLoopGroup group = new DefaultEventLoopGroup();new ServerBootstrap()// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件//1. Boss对应一个组(不用传递参数也没事),负责NioServerSocketChannel的accept监听;//          worker对应一个组,之后来临连接的channel都会绑定其某个EventLoop.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));// 调用下一个handlerctx.fireChannelRead(msg);}})// 2.该handler绑定自定义的Group.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));}});}}).bind(8080);}
}

执行结果:分别修改发送字符串为 zhangsan1(第一次),zhangsan2(第二次),zhangsan3(第三次)

image-20240620235326283

可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理

image-20240620235343399

3.1.4、源码分析(不同eventLoop,线程如何切换)

关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程EventExecutor executor = next.executor();// 返回下一个handler的eventLoop// 当前handler中的线程,是否和eventLoop是同一个线程if (executor.inEventLoop()) {// 是,直接调用next.invokeChannelRead(m);} // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

3.2 Channel

Channel 的常用方法:

  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭
    • sync 方法作用是同步等待 Channel 关闭
    • 而 addListener 方法是异步等待 Channel 关闭
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)

3.2.1 ChannelFuture连接建立(同步、异步)

ChannelFuture作用:专门用于记录异步方法状态的返回结果。

public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}})// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程// NIO线程:NioEventLoop 中的线程.connect(new InetSocketAddress("localhost", 8080));// 该方法用于等待连接真正建立channelFuture.sync();// 获取客户端-服务器之间的Channel对象Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");System.in.read();}
}

如果我们去掉channelFuture.sync()方法,会服务器无法收到hello world

这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程(调用线程),等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,还未真正与服务器建立好连接,也就没法将信息正确的传输给服务器端

解决方法:

  • 方法一:所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程

    //方式一:同步阻塞等待连接
    //阻塞方法,直到连接建立之后再会停止阻塞继续向下执行。
    // 若是不调用该方法,直接去获取channel来发送数据,很有可能因为没有建立好连接导致发送失败
    channelFuture.sync();//底层源码保护性暂停,主线程await(),另一个线程创建成功之后唤醒
    Channel channel = channelFuture.channel();
    log.info("channel {}",channel);
    //测试:channel.writeAndFlush("hello")
  • 方法二:用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO(eventLoop) 线程(去执行connect操作的线程)

    //方式二:添加一个监听器,来异步处理结果
    channelFuture.addListener(new ChannelFutureListener() {//当连接完成就会执行该回调方法:执行完成事件,其中channelFuture就是本身对象@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {Channel channel = channelFuture.channel();log.info("channel {}",channel);channel.writeAndFlush("hello!");}
    });

代码示例:

public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}})// 1.链接到服务器// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程(NioEventLoop 中的线程).connect(new InetSocketAddress("localhost", 8080));// 2.1 使用sync方法同步处理结果// 该方法用于等待连接真正建立channelFuture.sync();// 获取客户端-服务器之间的Channel对象Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");System.in.read();// 2.2 使用addListener(回调对象)方法一步处理结果channelFuture.addListener(new ChannelFutureListener() {@Override// 当connect方法执行完毕后,也就是连接真正建立后,会在NIO线程中调用operationComplete方法public void operationComplete(ChannelFuture channelFuture) throws Exception {Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");}});System.in.read();}
}

3.2.2 CloseFuture连接关闭(同步、异步)

当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法,调用方法返回一个ChannelFuture。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作

如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现

  • 通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作

    // 获得closeFuture对象
    ChannelFuture closeFuture = channel.closeFuture();// 同步等待NIO线程执行完close操作
    closeFuture.sync();
    // 关闭EventLoopGroup
    group.shutdownGracefully();
    
  • 调用closeFuture.addListener方法,添加close的后续操作

    closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {// 等待channel关闭后才执行的操作System.out.println("关闭之后执行一些额外操作...");// 关闭EventLoopGroupgroup.shutdownGracefully();}
    });
    

代码示例:

public class CloseFutureClient {public static void main(String[] args) throws InterruptedException {// 创建EventLoopGroup,使用完毕后关闭NioEventLoopGroup group = new NioEventLoopGroup();ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 可以打印channel的一些运行流程、状态信息。需要配置logbacksocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));socketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080));channelFuture.sync();Channel channel = channelFuture.channel();Scanner scanner = new Scanner(System.in);// 创建一个线程用于输入并向服务器发送new Thread(()->{while (true) {String msg = scanner.next();if ("q".equals(msg)) {// 关闭操作是异步的,在NIO线程中执行channel.close();break;}channel.writeAndFlush(msg);}}, "inputThread").start();// 获得closeFuture对象ChannelFuture closeFuture = channel.closeFuture();System.out.println("waiting close...");// 方式一:同步等待NIO线程执行完close操作closeFuture.sync();// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的System.out.println("关闭之后执行一些额外操作...");// 关闭EventLoopGroupgroup.shutdownGracefully();//方式2:异步处理关闭结果// closeFuture.addListener(new ChannelFutureListener() {//     @Override//     public void operationComplete(ChannelFuture future) throws Exception {//         System.out.println("处理关闭之后的操作");//         // 关闭EventLoopGroup//         group.shutdownGracefully();//     }// });}
}

3.2.3 为什么netty要用异步?异步提升了什么?

结论:对每个操作步骤进行合理的拆解并且通过多线程+异步执行,在一定时间内能够提升吞吐量,但是对于总体响应时间不减反增。(这里吞吐量实际上我们可以看成来建立连接处理的个数!)

3.3 Future & Promise

在异步处理时,经常用到这两个接口。netty的future继承了JDK的future;netty的promise继承了netty的future。

3.3.1、介绍Future与Promise

说明: netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

本质都是等待唤醒机制,这个机制一个应用就是保护性暂停,另一个就是生产者消费者,都是线程通信。

额外:

  • 对于promise,netty比es6出来早
  • jdk中的future不能够区分任务是成功还是失败!
  • future就是在线程间传递一个结果或者传递一个数据的容器。
  • 该future中的数据是由执行任务的线程来进行填充进去的,我们自己没有机会去填,之后我们可以使用promise来去自己填充进去!

3.3.2、JDK的Future示例(线程间取值)

案例目的:主线程中获取线程池中某个线程处理任务的结果!

public class JdkFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadFactory factory = new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "JdkFuture");}};// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);// 获得Future对象Future<Integer> future = executor.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {TimeUnit.SECONDS.sleep(1);return 50;}});// 通过阻塞的方式,获得运行结果System.out.println("运行结果:"+future.get());}
}

3.3.3、netty的Future示例(同步、异步)

public class NettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();// 获得 EventLoop 对象EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {System.out.println(Thread.currentThread().getName() + " 执行任务...");return 50;}});// 主线程中获取结果System.out.println(Thread.currentThread().getName() + " 获取结果");// 获取任务结果,非阻塞,还未产生结果时返回 nullSystem.out.println("getNow " + future.getNow());// 方式一:同步取得结果(主线程阻塞获取)// System.out.println("get " + future.get());// NIO线程中异步获取结果future.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {System.out.println(Thread.currentThread().getName() + " 获取结果");System.out.println("getNow " + future.getNow());}});}
}

同步方法执行:

image-20240623162005117

异步方法执行:可以看到执行任务和获取结果都是同一个线程处理

image-20240623161943016

3.3.4、netty的promise示例

描述

  • 前面的future不能主动来装数据
  • 使用promise可以准确的知道数据是处理正常还是异常!
  • 开发网络框架,例如RPC,Promise的重要性比较大
  • setSuccess()表示结果正确,setFailure(e)表示结果不正确会抛出异常!

案例目的:通过使用promise来去表示执行某个任务的结果是成功还是失败!主线程可以来进行接收。

public class NettyPromise {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建EventLoopNioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();// 创建Promise对象,用于存放结果DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);new Thread(()->{try {TimeUnit.SECONDS.sleep(1);// 自定义线程向Promise中存放结果promise.setSuccess(50);} catch (Exception e) {e.printStackTrace();promise.setFailure(e);}}).start();// 主线程从Promise中获取结果(如果调用setFailure设置的会抛出异常)System.out.println(Thread.currentThread().getName() + " " + promise.get());}
}

执行结果:

image-20240623163159363

3.4 handler & pipeline

pipeline:类似于流水线,handler则是一道道工序,流动的内容就是要处理的数据。

handler:handler是最为重要的,之后编写一些业务我们都直接在handler中进行,并且在netty中包含了许多内置的handler给我们简化工作(例如netty提供的StringEncoder是OutBoundHandler,StringDecode是InBoundHandler,日志new LoggingHandler()若是使用了logback需要进行额外配置)。

3.4.1 入站、出站handler执行顺序

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

服务端代码:

public class PipeLineServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel socketChannel) throws Exception {// 在socketChannel的pipeline中添加handler// pipeline中handler是带有head与tail节点的双向链表,的实际结构为// head <-> handler1 <-> ... <-> handler4 <->tail// Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法// 入站时,handler是从head向后调用的socketChannel.pipeline().addLast("inboundHandler1" ,new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 1");// 父类该方法内部会调用fireChannelRead// 将数据传递给下一个入栈处理器handler。如果不调用,调用链会断开super.channelRead(ctx, msg);// 1}});socketChannel.pipeline().addLast("inboundHandler2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 2");super.channelRead(ctx, msg);//2}});socketChannel.pipeline().addLast("inboundHandler3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 3");// 执行write操作,使得Outbound的方法能够得到调用socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));//3// 会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler// 如果是最后一个入栈处理器可以不用执行 super.channelRead(ctx, msg)// super.channelRead(ctx, msg);}});// Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法// 出站时,handler的调用是从tail向前调用的socketChannel.pipeline().addLast("outboundHandler4" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 1");super.write(ctx, msg, promise);//4}});socketChannel.pipeline().addLast("outboundHandler5" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 2");super.write(ctx, msg, promise);//5}});socketChannel.pipeline().addLast("outboundHandler6" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 3");super.write(ctx, msg, promise);//6}});}}).bind(8080);}
}

执行结果:

image-20240623171458487

可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。

  • ChannelPipeline结构是一个带有head与tail指针的双向链表,其中的节点为handler。要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止

image-20240623172053482

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
    • 如果注释掉 1 处代码,则仅会打印 1
    • 如果注释掉 2 处代码,则仅会打印 1 2
  • 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
    • 如果注释掉 3 处代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
    • 如果注释掉 6 处代码,则仅会打印 1 2 3 6

3.4.2 InBoundHandler案例(加工数据)

  1. 若是想要InBoundHandler依次执行,那么需要调用一个super.channelRead(ctx, data);ctx.fireChannelRead(data);来进行调用下一个handler,前者源码实际就是调用的后者!
  2. handler之间可以传递数据,那么可以来使用多个handler可以进行对数据加工处理!
  3. 最后一个InBoundHandler不需要去调用super.channelRead了,因为已经是最后一个执行结果了!

案例目的:通过三个自定义InBoundHandler,来对Bytebuf 进行如Bytebuf -> String对象进行加工处理。

server:

public class InboundHandlerTest {public static void main(String[] args) throws InterruptedException {new ServerBootstrap().group(new NioEventLoopGroup(),new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//添加入站事件//第一个handler:将ByteBuf => Stringch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Inbound handler 1");System.out.println("解析得到的数据:" + msg);ByteBuf buf = (ByteBuf)msg;final String data = buf.toString(Charsets.UTF_8);super.channelRead(ctx, data);//方式一:执行下一个handler}});//第二个handler:将String封装到Result对象中ch.pipeline().addLast("h2", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("Inbound handler 2");System.out.println("解析得到的数据:" + msg);ctx.fireChannelRead(msg);}});}}).bind(8080).sync();System.out.println("服务器启动成功!");}
}

client:

public class MyClient {public static void main(String[] args) throws IOException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder());}})// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程(NioEventLoop 中的线程).connect(new InetSocketAddress("localhost", 8080));// 该方法用于等待连接真正建立channelFuture.sync();// 获取客户端-服务器之间的Channel对象Channel channel = channelFuture.channel();channel.writeAndFlush("hello world");}
}

image-20240624105951904

3.4.3 OutBoundHandler案例(不同对象发出数据效果不一致)

ctx.channel().write(msg) VS ctx.write(msg)

  1. 执行OutBoundHandler的顺序是从后往前依次执行的,对于使用channel来写或者ChannelHandlerContext来写handler的处理也有区别。
  2. 通过ChannelHandlerContext来发送数据效果,实际会从当前的handler向前开始依次执行handler来进行数据的额外处理,若是原本在该handler之后的boundhandler就不会被执行到!
  3. 通过channel来写数据,一定会从tail(最后一个handler)开始向前依次执行OutBoundHandler。
  4. 发送数据一定要发出去bytebuf,若是直接writeAndFlush(“字符串”),服务端不会接收到,除非再添加一个handler处理器也就是StringEncoder(),会将String转为ByteBuf。

Server:对比两种write()的执行顺序

public class OutBoundHandlerTest {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast("inboundHandler1" ,new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(Thread.currentThread().getName() + " Inbound handler 1");super.channelRead(ctx, msg);// 向客户端写数据// 方式一:调用NioSocketChannel来进行发送数据。(从tail末尾向前依次执行outhandler)// socketChannel.writeAndFlush("hello,client!");// 方式二:调用ctx来进行发送数据。(从当前handler向前依次执行outhandler)ctx.writeAndFlush("hello,client");}});socketChannel.pipeline().addLast("outboundHandler4" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 1");super.write(ctx, msg, promise);}});socketChannel.pipeline().addLast("outboundHandler5" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 2");super.write(ctx, msg, promise);}});socketChannel.pipeline().addLast("outboundHandler6" ,new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(Thread.currentThread().getName() + " Outbound handler 3");super.write(ctx, msg, promise);}});}}).bind(8080);}
}

通过channel来发送数据效果

image-20240624114715117

通过ctx,也就是ChannelHandlerContext发送数据效果:

image-20240624114852036

image-20240624115419260

3.4.4 EmbeddedChannel

EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可

public class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("4");super.write(ctx, msg, promise);}};// 用于测试Handler的ChannelEmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);// 执行Inbound操作 channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));System.out.println("======");// 执行Outbound操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));}
}

执行结果:

image-20240413181139427

3.5 ByteBuf

netty中的ByteBuf的容量可以动态扩容,相比较于在NIO中的ByteBuffer一旦指定初始容量之后就无法更改了!若是写入超过容量的数据则会出现覆盖的情况!

3.5.1 创建

public class ByteBufLearn {public static void main(String[] args) {// 创建byteBuf(直接内存,而不是堆内存),默认大小是256,通过该方式创建的ByteBuf对象会自动扩容,与nio的ByteBuffer不一样,nio达到融期限之后,会报错ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(32);// 使用ByteBufAllocator创建直接内容,直接内容不需要考虑GC回收,默认获取的 byteBuf 便是直接内存,需要主动释放关闭// ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();// 使用ByteBufAllocator创建堆内存// ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();ByteBuffer byteBuffer = ByteBuffer.allocate(32);// 往byteBuf里面写入内容StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i < 32; i++) {stringBuilder.append(i).append("-");}// 往ByteBuf对象中写入数据,当达到容量界限之后,会自动扩容byteBuf.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));ByteBufUtil.log(byteBuf);// 往nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常byteBuffer.put(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));}
}

运行结果:

image-20240624162720705

  • ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

  • 当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作。而nio的byteBuffer对象中写入数据,达到界限之后,会抛出 BufferOverflowException 的异常

  • 如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

3.5.2 直接内存与堆内存

堆内存与直接内存区别:

  • 堆内存的分配效率比较高,但是读写内存的效率比较低

  • 直接内存分配效率比较低,但是读写效率高(少一次内存复制),适合配合池化功能一起用。直接内存使用的是系统内存

    • 直接内存使用的是系统内存,若是从磁盘中读取文件时会将数据直接读入到系统内存,那么系统内存呢就会用直接内存的方式映射到java内存中,java里面访问的和操作系统访问的是同一块内存,那么就可以减少一次内存的复制,所以读取效率会高于堆内存。
    • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

创建基于直接内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);

创建基于堆的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

3.5.3 池化与非池化

池化的最大意义在于可以重用 ByteBuf,优点:

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现

4.1 之前,池化功能还不成熟,默认是非池化实现

池化功能是否开启,可以通过下面的系统环境变量来设置

// unpooled:非池化 pooled:池化
-Dio.netty.allocator.type={unpooled|pooled}

3.5.4 组成

ByteBuf由四部分组成

image-20240624161139435

ByteBuf 是一个字节容器,容器里面的的数据分为四个部分

  • 第一个部分是已经丢弃的字节,这部分数据是无效的;(已经读过的内容)

  • 第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;(已经写入但还未读取的内容)

  • 第三部分数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段;(剩余可写入数据的空间大小)

  • 最后一部分表示的是该 ByteBuf 最多还能扩容多少容量

四个重要属性:

  • readerIndex(读指针):指示读取的起始位置
  • writerIndex(写指针):指示写入的起始位置
  • capacity(当前容量):当前容量。当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错。
  • maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。

3.5.5 写入

方法列表,省略一些不重要的方法

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value)写入 int 值Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
  • 网络传输,默认习惯是 Big Endian

代码示例:

public class ByteBufStudy {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);ByteBufUtil.log(buffer);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4});ByteBufUtil.log(buffer);buffer.writeInt(5);ByteBufUtil.log(buffer);buffer.writeIntLE(6);ByteBufUtil.log(buffer);buffer.writeLong(7);ByteBufUtil.log(buffer);}
}

运行结果:

image-20240624162239910

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

3.5.6 扩容

从上方写入数据示例根据结果可以看到,在如下的代码中产生了动态扩容操作

buffer.writeLong(7);
ByteBufUtil.log(buffer);

image-20240624154645292

扩容规则:

image-20240624164304928

  • 如果需要的容量等于门限阈值,则直接使用阈值作为新的缓存区容量。
  • 如果需要的容量大于阈值,则采用每次步进4MB的方式进行内存扩张,即将需要扩容值除以4MB后乘以4MB,然后将结果与最大容量进行比较,取其中的较小值作为目标容量。
  • 如果需要的容量小于阈值,则采用倍增的方式,以64字节作为基本数值,每次翻倍增长(如64,128,256…),直到倍增后的结果大于或等于所需的容量值。
  • 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常

3.5.7 读取

参数含义
buffer.readByte()每次读取一个字节
buffer.readInt()每次读取一个整数,也就是四个字节
buffer.markReaderIndex()为读指针做一个标记,配合下面的方法可以实现重复读取某个数
buffer.resetReaderIndex()将读指针跳到上一个标记过的地方实现重复读取某个数

除了上面一些了read开头的方法以外,还有一系列get开头的方法也可以读取数据,只不过get开头的方法不会改变读指针位置。相当于是按索引去获取。

示例:如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

public class ByteBufReadTest {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4});buffer.writeInt(5);ByteBufUtil.log(buffer);// 读取4个字节System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());ByteBufUtil.log(buffer);// 通过mark与reset实现重复读取buffer.markReaderIndex();System.out.println(buffer.readInt());ByteBufUtil.log(buffer);// 恢复到mark标记处buffer.resetReaderIndex();ByteBufUtil.log(buffer);}
}

执行结果:

image-20240624165031776

3.5.8 内存回收(retain & release)

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

内存回收规则:

  • 因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release,详细分析如下

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站 ByteBuf 处理原则
    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则
    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则
    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

TailContext 释放未处理消息逻辑

// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg);} finally {ReferenceCountUtil.release(msg);}
}

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

具体代码

// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {if (msg instanceof ReferenceCounted) {return ((ReferenceCounted) msg).release();}return false;
}

3.5.9 零拷贝

切片
  • 【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针,修改子分片,会修改原ByteBuf。
  • 得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

image-20240624231851082

public class TestSlice {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});// 将buffer分成两部分。在切片过程中,没有发生数据复制// 注意此时切片最大容量做了限制,不需要新增数据ByteBuf slice1 = buffer.slice(0, 5);ByteBuf slice2 = buffer.slice(5, 5);// 需要让分片的buffer引用计数加一// 避免原Buffer释放导致分片buffer无法使用slice1.retain();slice2.retain();ByteBufUtil.log(slice1);ByteBufUtil.log(slice2);// 释放原有byteBuf 内存//若是在release()之后也想正常使用,可以在此之前使用retain()进行引用+1,release()相对于会引用-1,此时就不会真正释放内存,自然也就能欧使用buffer.release();ByteBufUtil.log(slice1);// 更改原始buffer中的值System.out.println("===========修改原buffer中的值===========");buffer.setByte(0,5);System.out.println("===========打印slice1===========");ByteBufUtil.log(slice1);}
}

运行结果:

image-20240624232326066

注意:slice后的分片,不能再次写入新的数据,这会影响原ByteBuf。

duplicate:整块

【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的。

代码示例:

public class ByteBufDuplicateDemo {public static void main(String[] args) {ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);byteBuf.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,0});// 拷贝一块bufByteBuf duplicate = byteBuf.duplicate();ByteBufUtil.log(duplicate);// 将最后一位0修改成10duplicate.setByte(9,10);// 打印byteBufByteBufUtil.log(byteBuf);// 写入新数据11duplicate.writeByte(11);// 打印byteBufByteBufUtil.log(byteBuf);}}

运行结果:

image-20240624233218884

CompositeByteBuf:组装ByteBuf

【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。

CompositeByteBuf是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

代码示例:将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。注意要设置true来让其调整读,写指针。

public class TestCompositeByteBuf {public static void main(String[] args) {ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});ByteBufUtil.log(buf1);ByteBufUtil.log(buf2);// 效率较低方案:直接通过writeBytes()写入字节方式写入// ByteBufUtil.log(ByteBufAllocator.DEFAULT.buffer(20).writeBytes(buf1).writeBytes(buf2));// 零拷贝:合并两个Buffer到一个Buffer中,使用的共享内存CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0buf3.addComponents(true, buf1, buf2);System.out.println("结果:");ByteBufUtil.log(buf3);}
}

执行结果:

image-20240624233933557

Unpooled
  • Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
  • 这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf。

代码示例:

public class UnpooledTest {public static void main(String[] args) {ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBufByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);buf3.setByte(0,6);ByteBufUtil.log(buf3);}
}

执行结果:

image-20240624234350352

3.5.10 copy:深度拷贝

ByteBuf提供了copy方法,这一类方法是真正的拷贝原ByteBuf到新的内存,返回一个新的ByteBuf,与原ByteBuf没有关系。

提供两个拷贝:

  • 一个是全量。public abstract ByteBuf copy();
  • 一个指定位置和长度。public abstract ByteBuf copy(int index, int length);

代码示例:

public class ByteBufCopyDemo {public static void main(String[] args) {ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);byteBuf.writeBytes(new byte[]{1,2,3,4,5,6,7,8,9,0});ByteBuf copy1 = byteBuf.copy();ByteBufUtil.log(copy1);ByteBuf copy2 = byteBuf.copy(5, 5);ByteBufUtil.log(copy2);}
}

结果:

image-20240624235030168

3.5.11 ByteBuf 优势

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/35031.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis-哨兵模式-主机宕机-推选新主机的过程

文章目录 1、为哨兵模式准备配置文件2、启动哨兵3、主机6379宕机3.4、查看sentinel控制台日志3.5、查看6380主从信息 4、复活63794.1、再次查看sentinel控制台日志 1、为哨兵模式准备配置文件 [rootlocalhost redis]# ll 总用量 244 drwxr-xr-x. 2 root root 150 12月 6 2…

label studio数据标注平台的自动化标注使用

&#xff08;作者&#xff1a;陈玓玏&#xff09; 开源项目&#xff0c;欢迎star哦&#xff0c;https://github.com/tencentmusic/cube-studio 做图文音项目过程中&#xff0c;我们通常会需要进行数据标注。label studio是一个比较好上手的标注平台&#xff0c;可以直接搜…

突出显示列,重点内容一目了然!

老师在发布查询时&#xff0c;希望学生家长一眼就能看到重要的信息&#xff0c;应该如何设置&#xff1f; 易查分的新功能&#xff1a;突出显示列&#xff0c;就可以轻松实现&#xff01;老师可以个性化设置突出显示列的样式&#xff0c;包括颜色、字体大小、隐藏标题等&#x…

P2实验室装修标准都有哪些

P2实验室&#xff08;也称为生物安全二级实验室&#xff0c;BSL-2实验室&#xff09;的装修标准需要满足一系列的设计和施工要求&#xff0c;以确保实验室的安全性和功能性。因此&#xff0c;P2实验室装修标准不仅要满足一般实验室的要求&#xff0c;还需符合生物安全的特殊规定…

餐厅点餐系统JAVA全栈开发(SSM框架+MYSQL)

代码仓库 GitHub - JJLi0427/Online_Order_SystemContribute to JJLi0427/Online_Order_System development by creating an account on GitHub.https://github.com/JJLi0427/Online_Order_System 项目介绍 餐厅点餐系统包含用户使用界面和功能实现&#xff0c;后台店员和管…

C++初学者指南-2.输入和输出---文件输入和输出

C初学者指南-2.输入和输出—文件输入和输出 文章目录 C初学者指南-2.输入和输出---文件输入和输出1.写文本文件2.读文本文件3.打开关闭文件4.文件打开的模式 1.写文本文件 使用&#xff1a; std::ofstream&#xff08;输出文件流&#xff09; #include <fstream> // 文…

YOLOv8关键点pose训练自己的数据集

这里写自定义目录标题 YOLOv8关键点pose训练自己的数据集一、项目代码下载二、制作自己的关键点pose数据集2.1 标注(非常重要)2.1.1 标注软件2.1.2 标注注意事项a.多类别检测框b.单类别检测框2.2 格式转换(非常重要)2.3 数据集划分三、YOLOv8-pose训练关键点数据集3.1 训练…

通过frp实现内外网映射

frp介绍和使用方法可以参考官网:安装 | frp 1、准备两台服务器&#xff0c;一台内网服务器A&#xff0c;一台有公网ip的外网服务器B(47.12.13.15) 2、去官方仓库下载frp安装包&#xff1a;Releases fatedier/frp GitHub 下载包根据自己服务系统选择 ​ 3、先在外网服务器…

《昇思25天学习打卡营第1天|onereal》

昇思25天学习打卡营第1天;有点一头雾水的感觉&#xff0c;说是要在jupyter中签到打卡&#xff0c;是不是就是复制粘贴。我以为是要在终端机器中运行代码呢。 如果只是粘贴代码&#xff0c;那未免太简单了。 我还是想运行这个算力机器&#xff0c;但是他们说每次只能2小时。太…

AI播客下载:Eye on AI(AI深度洞察)

"Eye on A.I." 是一档双周播客节目&#xff0c;由长期担任《纽约时报》记者的 Craig S. Smith 主持。在每一集中&#xff0c;Craig 都会与在人工智能领域产生影响的人们交谈。该播客的目的是将渐进的进步置于更广阔的背景中&#xff0c;并考虑发展中的技术的全球影响…

pp 学习一 生产模块主数据

生产成本&#xff1a;原材料是什么&#xff0c;价格多少&#xff0c;人工耗费时间&#xff0c;以及其他的费用 离散制造&#xff1a;有生产订单。工序是分开的&#xff08;可以停&#xff09; 重复制造&#xff1a;没有生产订单&#xff08;可能有客户下达的任务单或者计划订…

一分钟彻底掌握Java多线程生产者与消费者模型

代码 package com.example.KFC; public class Cooker extends Thread { public void run() { while (true) { synchronized (Desk.lock) { if (Desk.maxCount 0) { break; } else { if (!Desk.flag) { System.out.println("Cooker makes a hamburger"); …

unity中使用commandbuffer将自定义画面渲染到主相机上

CommandBuffer 保存渲染命令列表&#xff08;例如设置渲染目标或绘制给定网格&#xff09;。您可以指示 Unity 在内置渲染管线中的各个点安排和执行这些命令&#xff0c;因此&#xff0c;您可以自定义和扩展 Unity 的渲染功能。 这句话意味着你可以通过command buffer让相机渲…

计算机基础知识——面向对象:封装+继承+多态整理

面向对象三大特性&#xff1a;封装、继承、多态。 1.封装 将一系列相关事物的共同的属性和行为提取出来&#xff0c;放到一个类中&#xff0c;同时隐藏对象的属性和实现细节&#xff0c;仅对外提供公共的访问方式。 【JavaBean类就可以看作是封装的完美案例。】 setter和get…

云动态摘要 2024-06-25

给您带来云厂商的最新动态&#xff0c;最新产品资讯和最新优惠更新。 最新产品更新 Web应用防火墙 - 验证码支持微信小程序接入 阿里云 2024-06-25 支持客户从微信小程序场景下接入&#xff0c;提供人机识别的安全防护。 工业数字模型驱动引擎 - iDME控制台换新升级 华为云…

[20] Opencv_CUDA应用之 关键点检测器和描述符

Opencv_CUDA应用之 关键点检测器和描述符 本节中会介绍找到局部特征的各种方法&#xff0c;也被称为关键点检测器关键点(key-point)是表征图像的特征点&#xff0c;可用于准确定义对象 1. 加速段测试特征功能检测器 FAST算法用于检测角点作为图像的关键点&#xff0c;通过对…

轻松掌握:工科生如何高效阅读国际期刊和撰写论文(上)

⭐️我叫忆_恒心&#xff0c;一名喜欢书写博客的研究生&#x1f468;‍&#x1f393;。 如果觉得本文能帮到您&#xff0c;麻烦点个赞&#x1f44d;呗&#xff01; 近期会不断在专栏里进行更新讲解博客~~~ 有什么问题的小伙伴 欢迎留言提问欧&#xff0c;喜欢的小伙伴给个三连支…

七天速通javaSE:第三天 程序控制结构:练习题

文章目录 前言一、基础1.计算从0~100之间奇数之和和偶数之和2. 用for循环输出0~1000之间能被5整除的数&#xff0c;每行输出三个 二、进阶1. 九九乘法表2.等边三角形 前言 本文主要讲解三种基本程序控制结构的练习题&#xff0c;以期熟练掌握顺序、选择、循环三种基本结构 一、…

怎么用Python接口发送推广短信

群发短信平台推广&#xff0c;有不少优点。其中通过正规106运营商平台推送&#xff0c;信息更加正规性。尤其是对接接口短信&#xff0c;比如验证码之类的&#xff0c;个人手机号码下发的验证码一般都不靠谱。 支持点对点一对一群发&#xff0c;方便工资条、物业通知等变量信息…

Android开发系列(十)Jetpack Compose之Card

Card是一种常用的UI组件&#xff0c;用于显示一个具有卡片样式的容器。Card组件通常用于显示列表项、卡片式布局或任何需要显示边框和阴影的UI元素。 使用Card组件&#xff0c;您可以轻松地创建带有卡片效果的UI元素。以下是一些Card组件的常见属性和功能&#xff1a; elevati…