目录
ChannelHandlerContext接口
使用ChannelHandlerContext
ChannelHandler和ChannelHandlerContext的高级用法
异常处理
处理入站异常
处理出站异常
本文继上文《ChannelHandler和ChannelPipeline之一》,接着讲ChannelHandlerContext接口。
ChannelHandlerContext接口
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。
ChannelHandlerContext有很多的方法,其中一些方法也存在于Channel和ChannelPipeline本身上,但是有一点重要的不同。如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播。而调用ChannelHandlerContext上的相同方法,则将从当前关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler。下表对ChannelHandlerContext的API进行了总结。
方法名称 | 描述 |
alloc | 返回和这个实例相关联的Channel所配置的ByteBufAllocator |
bind | 绑定到给定的SocketAddress,并返回ChannelFuture |
channel | 返回绑定到这个实例的Channel |
close | 关闭Channel,并返回ChannelFuture |
connect | 连接给定的SocketAddress,并返回ChannelFuture |
deregister | 从之前分配的EventExecutor注销,并返回ChannelFuture |
disconnect | 从远程节点断开,并返回ChannelFuture |
executor | 返回调度事件的EventExecutor |
fireChannelActive | 触发对下一个ChannelOutboundHandler上的channelActive()方法(已连接)的调用 |
fireChannelInactive | 触发对下一个ChannelOutboundHandler上的channelInactive()方法(已关闭)的调用 |
fireChannelRead | 触发对下一个ChannelOutboundHandler上的channelRead()方法(已接收的消息)的调用 |
fireChannelReadComplete | 触发对下一个ChannelOutboundHandler上的channelReadComplete()方法的调用 |
fireChannelRegistered | 触发对下一个ChannelOutboundHandler上的channelRegistered()方法的调用 |
fireChannelUnregistered | 触发对下一个ChannelOutboundHandler上的channelUnregistered()方法的调用 |
fireChannelWritabilityChanged | 触发对下一个ChannelOutboundHandler上的channelWritabilityChanged()方法的调用 |
fireExceptionCaught | 触发对下一个ChannelOutboundHandler上的channelExceptionCaught(Throwable)方法的调用 |
fireUserEventTriggered | 触发对下一个ChannelOutboundHandler上的userEventTriggered(Object)方法的调用 |
handler | 返回绑定到这个实例的ChannelHandler |
isRemoved | 如果所关联的ChannelHandler已经从所关联的ChannelPipeline中移除,则返回true |
name | 返回这个实例的唯一名称 |
pipeline | 返回这个实例所关联的ChannelPipeline |
read | 将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被读取完成后)通知ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法 |
write | 通过这个实例写入消息并经过ChannelPipeline |
writeAndFlush | 通过这个实例写入消息并冲刷并经过ChannelPipeline |
当使用ChannelHandlerContext的API的时候,请牢记以下两点:
- ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永远不会变的,所以缓存对它的引用是安全的;
- 相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流,应该尽可能地利用这个性能获得最大的性能。
使用ChannelHandlerContext
接下来我们讨论ChannelHandlerContext的用法,以及存在于ChannelHandlerContext、Channel和ChannelPipeline上的方法的行为。下图展示了它们之间的关系:
以下代码通过ChannelHandlerContext获得Channel,调用Channel上的write方法将会导致写入事件从尾端到头部地流经ChannelPipeline。
ChannelHandlerContext ctx = ...
Channel channel = ctx.channel();
channel.write(Unpooled.copiedBuffer("Netty In Action", CharsetUtil.UTF_8));
以下代码类似,但是这一次是写入ChannelPipeline。我们再次看到,(到ChannelPipeline的)引用是通过ChannelHandlerContext获得的。
ChannelHandlerContext ctx = ...
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(Unpooled.copiedBuffer("Netty In Action", CharsetUtil.UTF_8));
虽然被调用的Channel或ChannelPipeline上的write()方法将一直传播事件通过整个ChannelPipeline,但是在ChannelHandler的级别上,事件从一个ChannelHandler到下一个ChannelHandler的移动是由ChannelHandlerContext上的调用完成的。
为什么会想要从ChannelPipeline中的某个特定点开始传播事件呢?
- 为了减少事件传经对它不感兴趣的ChannelHandler所带来的开销。
- 为了避免将事件传经那些可能会对它产生兴趣的ChannelHandler。
要想调用从某个特定的ChannelHandler开始的处理过程,必须获取到在(ChannelPipeline)该ChannelHandler之前的ChannelHandler所关联的ChannelHandlerContext。这个ChannelHandlerContext将调用和它相关联的ChannelHandler之后的ChannelHandler。
如下图所示,消息将从下一个ChannelHandler开始流经ChannelPipeline,绕过了前面所有的ChannelHandler。
以上描述的用例是常见的,对于调用特定的ChannelHandler实例上的操作尤其有用。
ChannelHandler和ChannelHandlerContext的高级用法
你可以通过调用ChannelHandlerContext的pipeline()方法来获得被封闭的ChannelPipeline的引用。这使得运行时得以操作ChannelPipeline的ChannelHandler,我们可以利用这一点实现一些复杂的设计。例如,你可以通过将ChannelHandler添加到ChannelPipeline中实现动态的协议切换。
另一种高级的用法是缓存到ChannelHandlerContext的引用以供稍后使用,这可能会发生在任何的ChannelHandler之外,甚至来自于不同的线程。
public class WriteHandler extends ChannelHandlerAdapter {private ChannelHandlerContext ctx;public void handlerAdded(ChannelHandlerContext ctx) {this.ctx = ctx;}public void send(String msg) {ctx.writeAndFlush(msg);}
}
因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext。对于这种用法旨在多个ChannelPipeline中共享同一个ChannelHandler,对应的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。
public class UnsharableHandler extends ChannelInboundHandlerAdapter {private int count;public void channelRead(ChannelHandlerContext ctx, Object msg) {count++;System.out.println("channelRead(...) called the " + count + " time");ctx.fireChannelRead(msg);}
}
以上代码的时间将会导致问题,因为它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到一个ChannelPipeline将极有可能在它被多个并发的Channel访问时导致问题。(当然,这个简单的问题可以通过使channelRead()方法变位同步方法来修正。)
总之,只应该在你确认了你的ChannelHandler是线程安全后才使用@Sharable注解。
为何要共享同一个ChannelHandler
在多个ChannelPipeline中安装同一个ChannelHandler的一个常见原因是用于收集跨越多个Channel的统计信息。
异常处理
异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现。因此,Netty提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。
处理入站异常
如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。要想处理这种类型的入站异常,你需要在你的ChannelInboundHandler实现中重写下面的方法。
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
以下代码展示了一个简单的示例,其关闭了Channel并打印了异常的栈跟踪信息。
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}
因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻辑的ChannelInboundHandler通常位于ChannelPipeline的最后。这确保了所有的入站异常总是会被处理,无论它们可能发生在ChannelPipeline中的什么位置。
你应该如何响应异常,可能很大程度上取决于你的应用程序。你可能想要关闭Channel(和连接),也可能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑(或者没有消费该异常),那么Netty将会记录该异常没有被处理的事实。
- ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler;
- 如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理;
- 要想定义自定义的处理逻辑,你需要重写exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去。
处理出站异常
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。
- 每个出站操作都将返回一个ChannelFuture。注册到该ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了。
- 几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例。作为ChannelFuture的子类,ChannelPromse也可以被分配用于异步通知的监听器。但是,ChannelPromise还具有提供立即通知的可写方法。
ChannelPromise seSuccess();
ChannelPromise setFailure(Throwable cause);
添加ChannelFutureListener只需要调用ChannelFuture实例上的addListener(ChannelFutureListener)方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是调用出站操作(如write方法)所返回的ChannelFuture上的addListener()方法。
ChannelFuture future = channel.write(msg);
future.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture f) {if (!f.isSuccess()) {f.cause().printStackTrace();f.channel().close();}}
});
第二方式是將ChannelFutureListener添加到即將作为参数传递给ChannelOutboundHandler的方法的ChannelPromise。
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {promise.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture f) {if (!f.isSuccess()) {f.cause().printStackTrace();f.channel().close();}}});}
}
ChannelPromise的可写方法
通过调用ChannelPromise上的setSuccess和setFailure方法,可以使一个操作的状态在ChannelHandler的方法返回给其调用者时便即刻被感知到。
为什么选用一种方式而不是另一种呢? 对于细致的异常处理,你可能会发现,在调用出站操作时添加ChannelFutureListener更加合适。而对于一般的异常处理,你可能会发现,第一种实现方式更加简单。
如果你的ChannelOutboundHandler本身抛出异常会发生什么呢?在这种情况下,Netty本身会通知任何已经注册到对应ChannelPromise的监听器。