文章目录
- 1. 概述
- 2. 代码实例
- 2.1 服务端
- 2.2 客户端
- 2.3 运行截图
- 3. 整体结构
- 4. 重要组件
- 4.1 EventLoopGroup、EventLoop
- 4.2 Handler & Pipeline
- 4.3 ByteBuf
- 参考文献
1. 概述
Netty 是一款用于高效开发网络应用的 NIO 网络框架,它大大简化了网络应用的开发过程。
Netty 相比 JDK NIO 的优势:
● 易用性:Netty 在 NIO 基础上进行了更高层次的封装,屏蔽了 NIO 的复杂性,大大降低了开发者的上手难度;
● 稳定性:Netty 修复和完善了 JDK NIO 较多已知问题,例:select 空转导致 CPU 消耗 100%,TCP 断线重连,keep-alive 检测等;
● 可扩展性: 可定制化的线程模型,用户可以通过启动的配置参数选择 Reactor 线程模型;另一个是可扩展的事件驱动模型,将框架层和业务层的关注点分离,开发者只需要关注 ChannelHandler
Netty 比 JDK NIO 更低的资源消耗:
● 对象池复用技术。 Netty 通过复用对象,避免频繁创建和销毁带来的开销;
● 零拷贝技术。 除了操作系统级别的零拷贝技术外,Netty 提供了更多面向用户态的零拷贝技术,例如 Netty 在 I/O 读写时直接使用 DirectBuffer,从而避免了数据在堆内存和堆外内存之间的拷贝;
2. 代码实例
2.1 服务端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;/*** Netty服务端示例01*/
public class NettyServer01 {/*** 主函数,服务器的入口点* @param args 命令行参数*/public static void main(String[] args) {// 创建BossGroup和WorkerGroup,分别处理连接接受和数据读写NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);new ServerBootstrap() // 初始化ServerBootstrap.group(bossEventLoopGroup, workerEventLoopGroup) // 设置EventLoopGroup.channel(NioServerSocketChannel.class) // 指定服务器通道类.childHandler(new ChannelInitializer<NioSocketChannel>() { // 设置通道初始化器/*** 初始化通道,添加处理器到通道的管道中* @param ch 当前初始化的通道*/protected void initChannel(NioSocketChannel ch) {// 添加多个处理器,分别处理入站和出站事件ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 处理入站数据* @param ctx 通道上下文* @param msg 接收到的消息对象*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = inbound((ByteBuf) msg, "1");ctx.fireChannelRead(byteBuf);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws CharacterCodingException {ByteBuf byteBuf = inbound((ByteBuf) msg, "2");ctx.fireChannelRead(byteBuf);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 处理入站数据,将处理后的数据写回通道* @param ctx 通道上下文* @param msg 接收到的消息对象*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = inbound((ByteBuf) msg, "3");ctx.channel().write(byteBuf);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {/*** 处理出站数据,在数据写出前进行加工* @param ctx 通道上下文* @param msg 要写出的消息对象* @param promise 写操作的承诺*/@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {ByteBuf byteBuf = outbound((ByteBuf) msg, "4");ctx.writeAndFlush(msg);ctx.write(byteBuf, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {ByteBuf byteBuf = outbound((ByteBuf) msg, "5");ctx.write(byteBuf, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {ByteBuf byteBuf = outbound((ByteBuf) msg, "6");ctx.write(byteBuf, promise);}});}}).bind(8080); // 绑定端口并启动服务器}/*** 对出站数据进行处理* @param msg 待处理的ByteBuf对象* @param no 数据标识号* @return 处理后的ByteBuf对象*/private static ByteBuf outbound(ByteBuf msg, String no) {ByteBuf byteBuf = msg;String output = byteBufToString(byteBuf);System.out.printf("\n\noutbound%s output: %s", no, output);stringWriteToByteBuf(byteBuf, String.format("\noutbound%s 已处理", no));return byteBuf;}/*** 对入站数据进行处理* @param msg 待处理的ByteBuf对象* @param no 数据标识号* @return 处理后的ByteBuf对象*/private static ByteBuf inbound(ByteBuf msg, String no) {String input = byteBufToString(msg);System.out.printf("\n\ninbound%s input: %s\n", no, input);stringWriteToByteBuf(msg, String.format("\ninbound%s 已处理", no));return msg;}/*** 将ByteBuf对象转换为字符串* @param msg 待转换的ByteBuf对象* @return 字符串表示的数据*/private static String byteBufToString(ByteBuf msg) {return msg.toString(StandardCharsets.UTF_8);}/*** 将字符串写入ByteBuf对象* @param byteBuf 待写入的ByteBuf对象* @param msg 要写入的字符串数据*/private static void stringWriteToByteBuf(ByteBuf byteBuf, String msg) {byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));}
}
2.2 客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;import java.nio.charset.StandardCharsets;public class NettyClient01 {public static void main(String[] args) {new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(((ByteBuf) msg).toString(StandardCharsets.UTF_8));}});}}).connect("127.0.0.1", 8080).addListener((ChannelFutureListener) future -> {future.channel().writeAndFlush("hello,world");});}
}
2.3 运行截图
服务端截图:
客户端截图:
3. 整体结构
● Boss EventLoopGroup: 负责监听网络连接事件,把新网络连接的Channel 注册到 Worker EventLoopGroup
● Worker EventLoopGroup: 分配一个 EventLoop 负责处理该 Channel 的读写事件,每个 EventLoop 都是单线程的,所以该连接线程安全的,通过 Selector 进行事件循环
● 客户端发起 I/O 读写事件时,服务端 EventLoop 会进行数据的读取,然后通过 Pipeline 触发各种监听器进行数据的加工处理。客户端数据会被传递到 ChannelPipeline 的第一个 ChannelInboundHandler 中,数据处理完成后,将加工完成的数据传递给下一个 ChannelInboundHandler。当数据写回客户端时,会将处理结果在 ChannelPipeline 的 ChannelOutboundHandler 中传播,最后到达客户端。
4. 重要组件
4.1 EventLoopGroup、EventLoop
EventLoop本质上是一个单线程执行器,维护了一个 Selector,里面有run方法处理Channel上源源不断的io事件。EventLoop继承了ScheduledExecutorService、和自己的OrderedEventExecutor,OrderedEventExecutor 提供了inEventLoop(java.lang.Thread thread)、EventExecutorGroup parent()、EventExecutor next() 等方法。
EventLoopGroup 是一组 EventLoop,Channel一般会调用 EventLoopGroup 的 register 方法绑定其中一个EventLoop,后续这个Channel上 的io都由这个EventLoop处理,EventLoop又是单线程的,保证了单个Channel io 事件处理的线程安全性。
4.2 Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
● 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
● 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
4.3 ByteBuf
传送
参考文献
- 黑马 Netty教程
- 拉钩教育 Netty 核心原理剖析与 RPC 实践 若地老师