文章目录
- EventLoop
- Channel
- Future 与 Promise
- Handler与Pipeline
- ByteBuf
Netty的核心组件包括以下几种:
- EventLoop:负责处理注册到其上的channel的所有I/O事件。
- Channel:表示数据传输的网络通道。
- Future 与 Promise:Future用于等待任务完成后获取结果,Promise类似于Future,但可以主动设置操作的结果。
- Handler与Pipeline:Handle用于处理Channel上的各种事件,如数据读取和写入,Pipeline由若干个Handler组成,负责事件的传播和处理。
- ByteBuf:Netty中的字节容器,可以用于高效地读写数据。
EventLoop
- EventLoop(事件循环对象)本质上是一个单线程执行器,里面的 run 方法会处理注册到其上Channel的所有I/O事件。
- EventLoopGroup(事件循环组)由多个 EventLoop 组成,Channel 会通过调用 EventLoopGroup 的register 方法来与其中一个 EventLoop 进行绑定,后续该 Channel 上的所有 I/O 事件都由此 EventLoop进行处理,从而保证了处理 I/O 事件时的线程安全。
DefaultEventLoopGroup()与NioEventLoopGroup()区别
DefaultEventLoopGroup()可以执行普通任务和定时任务,NioEventLoopGroup()除了可以执行以上两种任务外,还可以执行IO任务。
NioEventLoopGroup执行普通任务与定时任务
public static void main(String[] args) {// 1. 创建事件循环组EventLoopGroup group = new NioEventLoopGroup(2);//EventLoopGroup group = new DefaultEventLoopGroup(); // 2. 获取下一个事件循环对象System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());// 3. 执行普通任务group.next().execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("ok");});// 4. 执行定时任务group.next().scheduleAtFixedRate(() -> {log.debug("ok");}, 0, 2, TimeUnit.SECONDS);log.debug("main");}
NioEventLoopGroup执行IO任务
服务端代码
以下服务端创建了1个boss线程,2个worker线程,boss线程用于处理客户端的连接,worker线程用于处理客户端的IO事件。
new ServerBootstrap()
//1个boss线程,2个worker线程.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;if (byteBuf != null) {byte[] buf = new byte[16];ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());log.debug(new String(buf));}}});}}).bind(8080).sync();
客户端代码
启动三次客户端,分别发送字符串 hello,hello2,hello3。
public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup(1)).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.println("init...");ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}}).channel(NioSocketChannel.class).connect("localhost", 8080).sync().channel();channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello3".getBytes()));Thread.sleep(2000);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello3".getBytes()));}
服务端运行结果
从结果可以看出,EventLoop可以处理多个Channel,当EventLoop与某个Channel确立了绑定关系,后续该Channel所产生的事件都由绑定的EventLoop处理。
服务端再增加两个非IO的work线程,客户端保持不变
public static void main(String[] args) throws InterruptedException {DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(normalWorkers,"handler",new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;if (byteBuf != null) {byte[] buf = new byte[16];ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());log.debug(new String(buf));}}});}}).bind(8080).sync();}
服务端执行结果
从结果可以看到,nio事件循环对象和 非nio事件循环对象都绑定了 channel。
不同EventLoopGroup切换的实现原理如下
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 判断下一个handler的事件循环是否与当前的事件循环是同一个线程EventExecutor executor = next.executor();// 是的话直接调用if (executor.inEventLoop()) {next.invokeChannelRead(m);} // 否则将要执行的代码作为任务提交给下一个事件循环处理else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}
Channel
channel 的主要方法
- close() :关闭 channel
- closeFuture() :灵活处理 channel 的关闭,分为同步和异步两种方式,sync 方法用于同步等待 channel 关闭,而 addListener 方法可以异步等待 channel 的关闭
- pipeline() :添加处理器
- write() :将数据写入channel,不会立即发送,当缓冲满了或者调用了flush()方法后才会发送出去
- writeAndFlush() :将数据写入channel并刷出
ChannelFuture
ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder());}})//异步非阻塞,主线程发起了调用,真正执行connect方法的是nio线程.connect("127.0.0.1", 8080); // 该方法等待连接真正建立channelFuture.sync(); //1
channelFuture.channel().writeAndFlush(new Date() + ": hello world!");
若注释掉代码1,服务端可能收不到客户端的信息,因为connect 方法是异步的,可能出现连接还没建立,方法执行完就返回了,导致channelFuture 不能得到正确的 Channel 对象。正确的方式应该使用sync 方法同步阻塞等待,或者使用回调的方式。
addListener回调方法
ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder());}}).connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel());
channelFuture.addListener((ChannelFutureListener) future -> {System.out.println(future.channel());
});
CloseFuture
CloseFuture 可以用于注册回调,当 Channel 关闭时,这些回调会被触发执行,适用于channel关闭后做一些额外操作的场景。
@Slf4j
public class CloseFutureClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup group new NioEventLoopGroup();ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080));Channel channel = channelFuture.sync().channel();log.debug("{}", channel);new Thread(()->{Scanner scanner = new Scanner(System.in);while (true) {String line = scanner.nextLine();if ("q".equals(line)) {channel.close();
// log.debug("关闭之后的额外操作"); // 不能在这里执行break;}channel.writeAndFlush(line);}}, "input").start();// 获取 CloseFuture 对象,关闭分为两种方式, 1) 同步关闭, 2) 异步关闭ChannelFuture closeFuture = channel.closeFuture();/*log.debug("waiting close...");//同步关闭closeFuture.sync();log.debug("处理关闭之后的操作");*///异步回调的方式关闭closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.debug("关闭之后的额外操作");group.shutdownGracefully();}});}
}
Future 与 Promise
这两个接口通常用于处理异步操作,netty 的 Future 继承自 jdk 的 Future,Promise 对 netty Future 进行了功能扩展 。JDK 的 Future 只能同步等待任务完成后获取结果,Netty 的 Future 既可以同步等待,也可以异步等待任务完成后获取结果,但都需要等待任务完成,Netty 的 Promise 拥有 Netty Future 的所有功能,常用于不同线程间传递结果。
JDK Future
public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 获取线程池ExecutorService service = Executors.newFixedThreadPool(2);// 2. 提交任务Future<Integer> future = service.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("JDK Future");Thread.sleep(1000);return 50;}});// 3. 主线程通过 future 来获取结果log.debug("等待结果");log.debug("结果是 {}", future.get());}
Netty Future
public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("Netty Future");Thread.sleep(1000);return 70;}});// 同步阻塞获取结果
// log.debug("结果是 {}", future.get());
// 异步获取结果future.addListener(new GenericFutureListener<Future<? super Integer>>(){@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {log.debug("接收结果:{}", future.getNow());}});}
Netty Promise
Promise是一个容器,可以让线程获取执行结果,将结果存到该容器。
public static void main(String[] args) throws ExecutionException, InterruptedException {EventLoop eventLoop = new NioEventLoopGroup().next();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);new Thread(() -> {// 3. 线程执行计算,任务完成向 promise 放结果log.debug("开始计算...");try {// int i = 1 / 0;Thread.sleep(1000);promise.setSuccess(80);} catch (Exception e) {e.printStackTrace();promise.setFailure(e);}}).start();log.debug("等待结果...");log.debug("结果是: {}", promise.get());}
Handler与Pipeline
ChannelHandler 用于处理 Channel 上的事件,事件分为两类:入站事件和出站事件,Pipeline采用责任链模式,由多个ChannelHandler 实例组成双向链表。入站处理器继承自 ChannelInboundHandlerAdapter 类,用于读取客户端的数据,并将处理结果写回;出站处理器继承 ChannelOutboundHandlerAdapter 类,对写回的数据进行处理。
public class ServerPipeline {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("ln_1", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}});pipeline.addLast("ln_2", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {log.debug("2");//该方法里面会调用ctx.fireChannelRead(msg);将数据传递给下个 handler,如果不调用,调用链会断开 super.channelRead(ctx, name);}});pipeline.addLast("ln_3", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3");// ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});pipeline.addLast("Out_4", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}});pipeline.addLast("Out_5", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("5");super.write(ctx, msg, promise);}});pipeline.addLast("Out_6", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("6");super.write(ctx, msg, promise);}});}}).bind(8080);}}
Pipeline的结构如下
hannder的调用顺序如下
- 调用ctx.fireChannelRead(msg)方法可以将当前处理器的处理结果传递给Pipeline的下一个处理器。
- 对于入站(Inbound)事件,会从Pipeline的head向后依次调用每个入站处理器。
- 对于出站(Outbound)事件,有两种情况,一种是从Pipeline的tail,向前依次调用每个出站处理器,另一种是从当前处理器往前调用出站处理器。
对于出站事件,有两种触发方式
NioSocketChannel.writeAndFlush():该方法是从tail开始向前查找OutboundHandler。
ChannelHandlerContext.writeAndFlush():该方法是从当前handler向前寻找OutboundHandler。
ByteBuf
ByteBuf分为两种,分别为基于堆和基于直接内存的 ByteBuf
//基于堆的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
//基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
直接内存对 GC 压力小,读写性能高,适合池化功能一起用,但是创建和销毁的成本高,需要及时主动释放。
public class TestByteBuf {public static void main(String[] args) {// 创建一个容量为10的ByteBufByteBuf buf = Unpooled.buffer(10);// 写入数据buf.writeInt(10);buf.writeBytes("Learn Netty!".getBytes());// 标记当前读指针的位置buf.markReaderIndex();// 读取数据int value = buf.readInt();System.out.println("Value: " + value);// 逐字节读取while (buf.isReadable()) {System.out.print((char) buf.readByte());}System.out.println();// 重置读指针到标记的位置buf.resetReaderIndex();// 读取数据value = buf.readInt();System.out.println("Value: " + value);// 创建一个切片,从当前读指针开始,长度为5ByteBuf slicedBuf = buf.slice(buf.readerIndex(), 5);System.out.println("切片内容: " + slicedBuf.toString(io.netty.util.CharsetUtil.UTF_8));// 释放ByteBuf占用的资源buf.release();}
}
池化 vs 非池化
开启池化,可以重用池中 ByteBuf 的实例,若没开启,每次都得创建新的 ByteBuf 实例,所以池化功能更节约内存,减少内存溢出的发生,通过系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
ByteBuf组成
写入方法
扩容规则
- 若写入的数据未超过 512字节,则选择下一个 16 的整数倍,比如写入后容量为 15 ,则扩容后 capacity 是 16
- 若写入后数据超过 512字节,则选择下一个 2 的n次方,比如写入后容量为 513,则扩容后 capacity 是 1024,扩容不能超过maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常
内存回收
Netty 采用了引用计数法来控制内存的回收,每个 ByteBuf 都实现了 ReferenceCounted 接口,每个 ByteBuf 对象的初始计数为 1,每调用 retain 方法计数加 1,调用 release 方法计数减 1,当计数为 0时,ByteBuf 对象被回收。
slice
slice使用了零拷贝,slice将原ByteBuf 切分为多个 ByteBuf实例,切片后的 ByteBuf 仍然使用原来ByteBuf 的内存,只是切片后的 ByteBuf 维护各自独立的读写指针。
public class TestSlice {public static void main(String[] args) {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});log(buf);// 从位置0开始切片,长度为5ByteBuf f1 = buf.slice(0, 5);f1.retain();ByteBuf f2 = buf.slice(5, 5);f2.retain();log(f1);log(f2);System.out.println("释放原来的 byteBuf 内存");buf.release();log(f1);f1.release();f2.release();}
}
CompositeByteBuf
CompositeByteBuf也使用了零拷贝,常用于将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了拷贝。
public class TestCompositeByteBuf {public static void main(String[] args) {ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();buffer.addComponents(true, buf1, buf2);log(buffer);}
}
优点
内存池化 : 通过重复使用池内的ByteBuf实例,节省了内存并降低内存溢出的可能。
读写指针分离: 不需要像使用NIO中的ByteBuffer那样在不同读写模式间切换,提高了操作效率。
动态扩容:可以根据大小进行自动扩容。
零拷贝 : 比如slice和CompositeByteBuf等操作中使用了零拷贝,减少数据复制的次数。