Netty 使用和常用组件
简述
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId
<version>4.1.42.Final </version>
<scope>compile</scope>
</dependency>
Netty 的优势
1 、 API 使用简单,开发门槛低;
2 、功能强大,预置了多种编解码功能,支持多种主流协议;
3 、定制能力强,可以通过 ChannelHandler 对通信框架进行灵活地扩展;
4 、性能高,通过与其他业界主流的 NIO 框架对比, Netty 的综合性能最优;
5 、成熟、稳定, Netty 修复了已经发现的所有 JDK NIO BUG ,业务开发人员不需要再为 NIO 的 BUG 而烦恼;
6 、社区活跃,版本迭代周期短,发现的 BUG 可以被及时修复,同时,更多的新功能会 加入;
7 、经历了大规模的商业应用考验,质量得到验证。
为什么不用 Netty5
Netty5 已经停止开发了。
为什么 Netty 使用 NIO 而不是 AIO ?
Netty 不看重 Windows 上的使用,在 Linux 系统上, AIO 的底层实现仍使用 EPOLL ,没有 很好实现 AIO ,因此在性能上没有明显的优势,而且被 JDK 封装了一层不容易深度优化。 AIO 还有个缺点是接收数据需要预先分配缓存 , 而不是 NIO 那种需要接收时才需要分配 缓存, 所以对连接数量非常大但流量小的情况 , 内存浪费很多。 而且 Linux 上 AIO 不够成熟,处理回调结果速度跟不上处理需求。
第一个 Netty 程序
Bootstrap 、 EventLoop(Group) 、 Channel
Bootstrap 是 Netty 框架的启动类和主入口类,分为客户端类 Bootstrap 和服务器ServerBootstrap 两种。
Channel 是 Java NIO 的一个基本构造。 它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一 个或者多个不同的 I/O 操作的程序组件)的开放连接,如读操作和写操作 目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它 可以被打开或者被关闭,连接或者断开连接。
EventLoop 暂时可以看成一个线程、 EventLoopGroup 自然就可以看成线程组。
事件和 ChannelHandler 、 ChannelPipeline
Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于 已经发生的事件来触发适当的动作。
Netty 事件是按照它们与入站或出站数据流的相关性进行分类的。 可能由入站数据或者相关的状态更改而触发的事件包括: 连接已被激活或者连接失活;
数据读取;用户事件;错误事件。
出站事件是未来将会触发的某个动作的操作结果,这些动作包括:打开或者关闭到远程 节点的连接;将数据写到或者冲刷到套接字。 每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法,既然事件分为 入站和出站,用来处理事件的 ChannelHandler 也被分为可以处理入站事件的 Handler 和出站 事件的 Handler ,当然有些 Handler 既可以处理入站也可以处理出站。
Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议 (如 HTTP 和 SSL/TLS )的 ChannelHandler 。 基于 Netty 的网络应用程序中根据业务需求会使用 Netty 已经提供的 ChannelHandler 或 者自行开发 ChannelHandler ,这些 ChannelHandler 都放在 ChannelPipeline 中统一管理,事件 就会在 ChannelPipeline 中流动,并被其中一个或者多个 ChannelHandler 处理。
ChannelFuture
Netty 中所有的 I/O 操作都是异步的,我们知道“异步的意思就是不需要主动等待结果 的返回,而是通过其他手段比如,状态通知,回调函数等”,那就是说至少我们需要一种获 得异步执行结果的手段。
JDK 预置了 interface java.util.concurrent.Future , Future 提供了一种在操作完成时通知 应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时 刻完成,并提供对其结果的访问。但是其所提供的实现,只允许手动检查对应的操作是否已 经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以 Netty 提供了它自己的实现 ChannelFuture,用于在执行异步操作的时候使用。 一般来说,每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture 。
Netty 服务端
EchoServer
public class EchoServer {private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);private final int port;public EchoServer(int port) {this.port = port;}public static void main(String[] args) throws InterruptedException {int port = 9999;EchoServer echoServer = new EchoServer(port);LOG.info("服务器即将启动");echoServer.start();LOG.info("服务器关闭");}public void start() throws InterruptedException {/*线程组*/EventLoopGroup group = new NioEventLoopGroup();try {/*服务端启动必备*/ServerBootstrap b = new ServerBootstrap();b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoServerHandler());}});/*异步绑定到服务器,sync()会阻塞到完成*/ChannelFuture f = b.bind().sync();LOG.info("服务器启动完成。");/*阻塞当前线程,直到服务器的ServerChannel被关闭*/f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}}
服务端的业务Handler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;System.out.println("server accept :" + in.toString(CharsetUtil.UTF_8));ctx.writeAndFlush(in);//ctx.close();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("连接已建立");super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
基于Netty的客户端
public class EchoClient {private final int port;private final String host;public EchoClient(int port, String host) {this.port = port;this.host = host;}public void start() throws InterruptedException {/*线程组*/EventLoopGroup group = new NioEventLoopGroup();try {/*客户端启动必备,和服务器的不同点*/Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class)/*指定使用NIO的通信模式*//*指定服务器的IP地址和端口,和服务器的不同点*/.remoteAddress(new InetSocketAddress(host,port))/*和服务器的不同点*/.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoClientHandler());}});/*异步连接到服务器,sync()会阻塞到完成,和服务器的不同点*/ChannelFuture f = b.connect().sync();f.channel().closeFuture().sync();/*阻塞当前线程,直到客户端的Channel被关闭*/} finally {group.shutdownGracefully().sync();}}public static void main(String[] args) throws InterruptedException {new EchoClient(9999,"127.0.0.1").start();}
}
客户端的业务Handler
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {/*读取到网络数据后进行业务处理,并关闭连接*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept"+msg.toString(CharsetUtil.UTF_8));//关闭连接///ctx.close();}/*channel活跃后,做业务处理*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Netty",CharsetUtil.UTF_8));
// ctx.pipeline().write()
// ctx.channel().write()ctx.alloc().buffer();}
}
EventLoop 和 EventLoopGroup
回想一下我们在 NIO 中是如何处理我们关心的事件的?在一个 while 循环中 select 出事 件,然后依次处理每种事件。我们可以把它称为事件循环,这就是 EventLoop 。 interface io.netty.channel. EventLoop 定义了 Netty 的核心抽象,用于处理网络连接的生命周期中所发 生的事件。
io.netty.util.concurrent 包构建在 JDK 的 java.util.concurrent 包上。而 io.netty.channel 包 中类,为了与 Channel 的事件进行交互,扩展了这些接口 / 类。一个 EventLoop 将由一个 永远都不会改变的 Thread 驱动,同时任务( Runnable 或者 Callable )可以直接提交给 EventLoop 实现,以立即执行或者调度执行。
线程的分配
服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。 异步传输实现只使用了少量的 EventLoop(以及和它们相关联的 Thread),而且在当前 的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来 支撑大量的 Channel,而不是每个 Channel 分配一个 Thread。EventLoopGroup 负责为每个 新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方 式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。 一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个 EventLoop(以及相关联的 Thread)。
需要注意, EventLoop 的分配方式对 ThreadLocal 的使用的影响。因为一个 EventLoop 通 常会被用于支撑多个 Channel ,所以对于所有相关联的 Channel 来说, ThreadLocal 都将是 一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然以被用于在多个 Channel 之间共享一些重度的或者代价昂贵的对象,甚 至是事件。
线程管理
在内部,当提交任务到如果 ( 当前)调用线程正是支撑 EventLoop 的线程,那么所提交 的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入 到内部队列中。当 EventLoop 下次处理它的事件时,它会执行队列中的那些任务 / 事件。
Channel 、 EventLoop(Group) 和 ChannelFuture
Netty 网络抽象的代表:
Channel—Socket ;
EventLoop— 控制流、多线程处理、并发;
ChannelFuture— 异步通知。
Channel 和 EventLoop 关系如图:
从图上我们可以看出 Channel 需要被注册到某个 EventLoop 上,在 Channel 整个生命周 期内都由这个EventLoop 处理 IO 事件,也就是说一个 Channel 和一个 EventLoop 进行了绑定, 但是一个EventLoop 可以同时被多个 Channel 绑定。
Channel 接口
基本的 I/O 操作( bind() 、 connect() 、 read() 和 write() )依赖于底层网络传输所提供的原 语。在基于 Java 的网络编程中,其基本的构造是类 Socket 。 Netty 的 Channel 接口所提供 的 API ,被用于所有的 I/O 操作。大大地降低了直接使用 Socket 类的复杂性。此外, Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。 由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang.Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么 AbstractChannel 中的 compareTo() 方法的实现将会抛出一个 Error 。
Channel 的生命周期状态
ChannelUnregistered : Channel 已经被创建,但还未注册到 EventLoop
ChannelRegistered : Channel 已经被注册到了 EventLoop
ChannelActive : Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接 收和发送数据了
ChannelInactive : Channel 没有连接到远程节点 当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler ,其可以随后对它们做出响应。在我们的编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。
重要 Channel 的方法
eventLoop : 返回分配给 Channel 的 EventLoop pipeline : 返回 Channel 的 ChannelPipeline ,也就是说每个 Channel 都有自己的
ChannelPipeline 。
isActive : 如果 Channel 是活动的,则返回 true 。活动的意义可能依赖于底层的传输。 例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被 打开便是活动的。
localAddress : 返回本地的 SokcetAddress
remoteAddress : 返回远程的 SocketAddress
write : 将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正 写往 socket 。
flush : 将之前已写的数据冲刷到底层 socket 进行传输。
writeAndFlush : 一个简便的方法,等同于调用 write() 并接着调用 flush()
ChannelPipeline 和 ChannelHandlerContext
ChannelPipeline 接口
当 Channel 被创建时,它将会被自动地分配一个新的 ChannelPipeline ,每个 Channel 都 有自己的 ChannelPipeline 。这项关联是永久性的。在 Netty 组件的生命周期中,这是一项固 定的操作,不需要开发人员的任何干预。
ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播 入站(也 就是从网络到业务处理) 和 出站(也就是从业务处理到网络) ,各种事件流的 API ,我们 代码中的 ChannelHandler 都是放在 ChannelPipeline 中的。 使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化 或者引导阶段被安装的。这些 ChannelHandler 对象接收事件、执行它们所实现的处理逻辑, 并将数据传递给链中的下一个 ChannelHandler ,而且 ChannelHandler 对象也完全可以拦截 事件不让事件继续传递。它们的执行顺序是由它们被添加的顺序所决定的
ChannelHandler 的生命周期
在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调 用下面这些方法。这些方法中的每一个都接受一个 ChannelHandlerContext 参数。
handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
handlerRemoved 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
exceptionCaught 当处理过程中在 ChannelPipeline 中有错误产生时被调用 ChannelPipeline 中的 ChannelHandler
入站和出站 ChannelHandler 被安装到同一个 ChannelPipeline 中, ChannelPipeline 以双 向链表的形式进行维护管理。比如下图,我们在网络上传递的数据,要求加密,但是加密后 密文比较大,需要压缩后再传输,而且按照业务要求,需要检查报文中携带的用户信息是否 合法,于是我们实现了 5 个 Handler :解压(入) Handler 、压缩(出) handler 、解密(入) Handler、加密(出) Handler 、授权(入) Handler 。
如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部开 始流动,但是只被处理入站事件的 Handler 处理,也就是解压(入) Handler 、解密(入) Handler 、 授权(入) Handler ,最终,数据将会到达 ChannelPipeline 的尾端,届时,所有处理就都结
束了。
数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从 链的尾端开始流动,但是只被处理出站事件的 Handler 处理,也就是加密(出) Handler 、 压缩(出)handler ,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层, 也就是我们的 Socket 。
Netty 能区分入站事件的 Handler 和出站事件的 Handler ,并确保数据只会在具有相同定 向类型的两个 ChannelHandler 之间传递。
所以在我们编写 Netty 应用程序时要注意,分属出站和入站不同的 Handler , 在业务没 特殊要求的情况下 是无所谓顺序的,正如我们下面的图所示,比如‘压缩(出) handler ‘可 以放在‘解压(入)handler ‘和‘解密(入) Handler ‘中间,也可以放在‘解密(入) Handler ‘和‘授权(入) Handler ‘之间。 而同属一个方向的 Handler 则是有顺序的,因为上一个 Handler 处理的结果往往是下一 个 Handler 的要求的输入。比如入站处理,对于收到的数据,只有先解压才能得到密文,才 能解密,只有解密后才能拿到明文中的用户信息进行授权检查,所以解压-> 解密 -> 授权这个 三个入站 Handler 的顺序就不能乱。