目录
ChannelHandler
Channel的生命周期
ChannelHandler的生命周期
ChannelInboundHandler接口
ChannelOutboundHandler接口
ChannelHandler适配器
资源管理
ChannelPipeline接口
修改ChannelPipeline
触发事件
ChannelHandler
Channel的生命周期
Channel定义了一组和ChannelInboundHandler API密切相关的简单但功能强大的状态模型,如下:
状态 | 描述 |
ChannelUnregistered | channel已经被创建,但还未注册到EventLoop |
ChannelRegistered | channel已经被注册到EventLoop |
ChannelActive | channel处于活跃状态(已经连接到它的远程节点),它现在可以接收和发送数据了 |
ChannelInactive | channel没有连接到远程节点 |
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们作出响应。
ChannelHandler的生命周期
下表列出了ChannelHandler定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者从ChannelPipeline中移除时会调用这些操作。这些方法都接受一个ChannelHandlerContext参数。
类型 | 描述 |
handlerAdd | 当把ChannelHandler添加到ChannelPipeline中时被调用 |
handlerRemoved | 当把ChannelHandler从ChannelPipeline中移除时被调用 |
exceptionCaught | 当处理过程中在ChannelPipeline中有错误产生时被调用 |
Netty定义了下面两个重要的ChannelHandler子接口:
- ChannelInboundHandler——处理入站数据以及各种状态变化
- ChannelOutboundHandler——处理出站数据并且允许拦截所有操作
ChannelInboundHandler接口
下表列出了ChannelInboundHandler的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel状态发生变更时被调用。这些方法和Channel的生命周期状态密切相关。
类型 | 描述 |
channelRegistered | 当Channel已经注册到它的EventLoop并且能够处理IO时被调用 |
channelUnregistered | 当Channel从它的EventLoop注销并且无法处理任何IO时被调用 |
channelActive | 当Channel处于活动状态时被调用 |
channelInactive | 当Channel离开活动状态并且不再连接它的远程节点时被调用 |
channelReadComplete | 当Channel上的一个读操作完成时被调用 |
channelRead | 当从Channel读取数据时被调用 |
channelWritabilityChanged | 当Channel的可写状态发生变化时被调用。用户可以确保写操作不会发生地太快(以避免发生OutOfMemoryError)或者在Channel变得再次可写时恢复写入。可以调用Channel的isWritable()方法来检测Channel的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()来设置。 |
userEventTriggered | 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时被调用,因为一个POJO被传经了ChannelPipeline。 |
当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化的ByteBuf实例的内存。Netty为此提供了一个实用方法ReferenceCountUtil.release()。
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) {ReferenceCountUtil.release(msg);}
}
Netty将使用WARN级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐,一个更简单的方式是使用SimpleChannelInboundHandler。
@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {public void channelRead0(ChannelHandlerContext ctx, Object msg) {//不需要显式释放资源}
}
由于SimpleChannelInboundHandler会自动释放资源,所以你不应该存储任何消息的引用供将来使用,因为这些引用都将会失效。
ChannelOutboundHandler接口
出站操作和数据将由ChannelOutboundHandler处理。它的方法将被Channel、ChannelPipeline和ChannelHandlerContext调用。
ChannelOutboundHandler的一个强大功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。下表显示了所有由ChannelOutboundHandler本身所定义的方法(忽略了那些从ChannelHandler继承的方法)。
类型 | 描述 |
bind(ChannelHandlerContext,SocketAddress,ChannelPromise) | 当请求将Channel绑定到本地地址时被调用 |
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) | 当请求将Channel连接到远程节点时被调用 |
disconnect(ChannelHandlerContext,ChannelPromise) | 但请求将Channel从远程节点断开时被调用 |
close(ChannelHandlerContext,ChannelPromise) | 当请求关闭Channel时被调用 |
deregister(ChannelHandlerContext,ChannelPromise) | 当请求将Channel从它的EventLoop注销时被调用 |
read(ChannelHandlerContext) | 当请求从Channel读取更多的数据时被调用 |
flush(ChannelHandlerContext) | 当请求通过Channel将入队数据冲刷到远程节点时被调用 |
write(ChannelHandlerContext,Object,ChannelPromise) | 当请求通过Channel将数据写到远程节点时被调用 |
ChannelPromise与ChannelFuture ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不
可变。
接下来我们看一些简化了编写ChannelHandler的任务的类。
ChannelHandler适配器
你可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter作为自己实现ChannelHandler的起点,这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler的方法。生成的类层次结构如下图所示:
ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为@Sharable,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline中。
你想要在自己的ChannelHandler中使用这些适配器类,只需要简单地扩展它们,并且重写那些你想要自定义的方法。
资源管理
每当通过ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法来处理数据时,你都需要确保没有任何的资源泄露。Netty使用引用计数来处理池化的ByteBuf,所以在完全使用完某个ByteBuf后,调整其引用计数是很重要的。
为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector,它将对你应用程序的缓冲区分配做大约1%的采样来检测内存泄漏,相关的开销是非常小的。
如果检测到了内存泄漏,将会产生类似于下面的日志消息:
LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option'-Dio.netty.leakDetectionLevel=ADVANCED' or call ResourceLeakDetector.setLevel().
Netty目前定义了四种泄漏检测级别,如下表所示:
级别 | 描述 |
DISABLED | 禁用泄漏检测,只有在详尽的测试之后才应设置为该值。 |
SIMPLE | 使用1%的默认采用率检测并报告任何发现的泄漏。这是默认级别,适用于绝大部分的情况。 |
ADVANCED | 使用默认的采用率,报告所发现的任何泄漏以及对应的消息被访问的位置。 |
PARANOID | 类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大的影响,应该只在调试阶段使用。 |
泄漏级别可以通过将下面的java系统属性设置为表中的一个值来设定。
java -Dio.netty.leakDetectionLevel=ADVANCED
如果带着该JVM选项重新启动你的应用程序,你将看到自己的应用程序最近被泄漏的缓冲区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告。
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
实现ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法时,应用如何使用这个诊断工具来防止泄漏呢?让我们看看你的channelRead()操作直接消费入站消息的情况:也就是说,它不会通过调用ChannelHandlerContext.fireChannelRead()方法将入站消息转发给下一个ChannelInboundHandler。
@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) {ReferenceCountUtil.release(msg);}
}
消费入站消息的简单方式 由于消费入站数据是一项常规任务,所以Netty提供了一个特殊的被称为SimpleChannelInboundHandler的ChannelInboundHandler实现。这个实现会在消息被channelRead0()方法消费之后自动释放消息。
在出站方向这边,如果你处理了write()操作并丢弃了一个消息,那么你也应该负责释放它。
@Sharable
public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {ReferenceCountUtil.release(msg);promise.setSuccess();}
}
重要的是,不仅要释放资源,还要通知ChannelPromise。否则可能会出现ChannelFutureListener收不到某个消息已经被处理了的通知的情况。
总之,如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。
ChannelPipeline接口
如果你认为ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler实例链,那么就很容易看出这些ChannelHandler之间的交互是如何组成一个应用程序数据和事件处理逻辑的核心的。
每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另一个ChannelPipeline,也不能分离其当前的。在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理。随后通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline。ChannelHandlerContext具有丰富的用于处理事件和执行IO操作的API。
下图展示了一个典型的同时具有出站和入站ChannelHandler的ChannelPipeline布局,并且印证了我们之前的关于ChannelPipeline主要由一系列的ChannelHandler所组成的说法。ChannelPipeline还提供了通过ChannelPipeline本身传播事件的方法。如果一个入站事件被触发,它将被从ChannelPipeline的头部开始一直被传播到ChannelPipeline的尾端。
你可能会说,从事件途经ChannelPipeline的角度来看,ChannelPipeline的头部和尾部取决于该事件是入站的还是出站的。然而Netty总是将ChannelPipeline的入站口(左端)作为头部,而将出站口 (右端)作为尾部。
当你完成了通过调用ChannelPipeline.add*()方法将入站处理器(ChannelInboundHandler)和出站处理器(ChannelOutboundHandler)混合添加到ChannelPipeline之后,每一个ChannelHandler从头部到尾部的顺序位置正如方才我们所定义它们的一样。因此,如果你将上图中的处理器从左到右进行编号,那么第一个被入站事件看到的ChannelHandler是1,第一个被出站事件看到的ChannelHandler是5。
在ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个ChannelHandler的类型是否和事件的方向一致。如果不一致ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向一致的为止。(当然,ChannelHandler可以同时实现ChannelInboundHandler和ChannelOutboundHandler)。
修改ChannelPipeline
ChannelHandler可以通过添加、删除或者替换其他的ChannelHandler来实时地修改ChannelPipeline的布局。这是ChannelHandler最重要的能力之一,下表罗列了这些方法。
名称 | 描述 |
addFirst addBefore addAfter addLast | 将一个ChannelHandler添加到ChannelPipeline中 |
remove | 将一个ChannelHandler从ChannelPipeline中移除 |
replace | 将ChannelPipeline中的一个ChannelHandler替换为另一个ChannelHandler |
ChannelHandler的执行和阻塞
通常ChannelPipeline中的每一个ChannelHandler都是通过它的EventLoop(IO线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的IO处理产生负面的影响。
但有时可能需要与那些使用阻塞API的遗留代码进行交互。对于这种情况,ChannelPipeline有一些接受一个EventExecutorGroup的add()方法。如果一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel本身的EventLoop中移除。对于这种用例,Netty提供了一个叫做DefaultEventExecutorGroup的默认实现。
除了这些操作,还有别的通过类型或名称来访问ChannelHandler的方法,如下表。
名称 | 描述 |
get | 通过类型或者名称返回ChannelHandler |
context | 返回和ChannelHandler绑定的ChannelHandlerContext |
names | 返回ChannelPipeline中所有的ChannelHandler名称 |
触发事件
ChannelPipeline的API公开了用于调用入站和出站操作的附加方法。下表列出了入站操作,用于通知ChannelInboundHandler在ChannelPipeline中发生的事件。
方法名称 | 描述 |
fireChannelRegistered | 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法 |
fireChannelUnregistered | 调用ChannelPipeline中下一个ChannelInboundHandler的channelUnregistered(ChannelHandlerContext)方法 |
fireChannelActive | 调用ChannelPipeline中下一个channelActive(ChannelHandlerContext)方法 |
fireChannelInactive | 调用ChannelPipeline中下一个channelInactive(ChannelHandlerContext)方法 |
fireExceptionCaught | 调用ChannelPipeline中下一个exceptionCaught(ChannelHandlerContext, Throwable)方法 |
fireUserEventTriggered | 调用ChannelPipeline中下一个userEventTriggered(ChannelHandlerContext, Object)方法 |
fireChannelRead | 调用ChannelPipeline中下一个channelRead(ChannelHandlerContext, Object)方法 |
fireChannelReadComplete | 调用ChannelPipeline中下一个channelReadComplete(ChannelHandlerContext)方法 |
fireChannelWritabilityChanged | 调用ChannelPipeline中下一个channelWritabilityChanged(ChannelHandlerContext)方法 |
在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。下表列出了ChannelPipeline的出站操作。
方法名称 | 描述 |
bind | 将Channel绑定到一个本地地址,这将调用ChannelPipeline中下一个ChannelOutboundHandler的bind(ChannelHandlerContext, SocketAddress, ChannelPromise)方法 |
connect | 将Channel连接到一个远程地址,这将调用ChannelPipeline中下一个ChannelOutboundHandler的connect(ChannelHandlerContext, SocketAddress, ChannelPromise)方法 |
disconnect | 将Channel断开连接,这将调用ChannelPipeline中下一个ChannelOutboundHandler的disconnect(ChannelHandlerContext, ChannelPromise)方法 |
close | 将Channel关闭,这将调用ChannelPipeline中下一个ChannelOutboundHandler的connect(ChannelHandlerContext, ChannelPromise)方法 |
deregister | 将Channel从它先前所分配的EventExecutor(即EventLoop)中注销,这将调用ChannelPipeline中下一个ChannelOutboundHandler的deregister(ChannelHandlerContext, ChannelPromise)方法 |
flush | 冲刷Channel所有挂起的写入,这将调用ChannelPipeline中下一个ChannelOutboundHandler的flush(ChannelHandlerContext)方法 |
write | 将消息写入Channel,这将调用ChannelPipeline中下一个ChannelOutboundHandler的write(ChannelHandlerContext, Object, ChannelPromise)方法。注意,这并不会将消息写入底层的Socket,而只会将它放入队列中。要将它写入Socket,需要调用flush或者writeAndFlush方法。 |
writeAndFlush | 这是一个先调用write然后调用flush的便利方法 |
read | 请求从Channel读取更多的数据。这将调用ChannelPipeline下一个ChannelOutboundHandler的read(ChannelHandlerContext)方法 |