1 ChannelHandlerContext
每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系如下图:
2 Channel的声明周期
Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。
一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。
3 ChannelHandler常用的API
先看一个Netty中整个Handler体系的类关系图。
Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是ChannelHandler, ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表
Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。 ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个ChannelHandler往下传递到下一个ChannelHandler,直到全部ChannelHandler传递完毕。也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。
4 ChannelInboundHandler
ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。
5 异步处理Future
java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。
Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。
public interface Future<V> extends java.util.concurrent.Future<V> {boolean isSuccess();boolean isCancellable();Throwable cause();Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);Future<V> sync() throws InterruptedException;Future<V> syncUninterruptibly();Future<V> await() throws InterruptedException;Future<V> awaitUninterruptibly();boolean await(long timeout, TimeUnit unit) throws InterruptedException;boolean await(long timeoutMillis) throws InterruptedException;boolean awaitUninterruptibly(long timeout, TimeUnit unit);boolean awaitUninterruptibly(long timeoutMillis);V getNow();@Overrideboolean cancel(boolean mayInterruptIfRunning);
}
ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。
public interface ChannelFuture extends Future<Void> {/*** Returns a channel where the I/O operation associated with this* future takes place.*/Channel channel();@OverrideChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelFuture sync() throws InterruptedException;@OverrideChannelFuture syncUninterruptibly();@OverrideChannelFuture await() throws InterruptedException;@OverrideChannelFuture awaitUninterruptibly();boolean isVoid();
}
6 异步执行Promise
Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。
/*** Special {@link Future} which is writable.*/
public interface Promise<V> extends Future<V> {Promise<V> setSuccess(V result);boolean trySuccess(V result);/*** Marks this future as a failure and notifies all* listeners.** If it is success or failed already it will throw an {@link IllegalStateException}.*/Promise<V> setFailure(Throwable cause);/*** Marks this future as a failure and notifies all* listeners.** @return {@code true} if and only if successfully marked this future as* a failure. Otherwise {@code false} because this future is* already marked as either a success or a failure.*/boolean tryFailure(Throwable cause);/*** Make this future impossible to cancel.** @return {@code true} if and only if successfully marked this future as uncancellable or it is already done* without being cancelled. {@code false} if this future has been cancelled already.*/boolean setUncancellable();@OverridePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);@OverridePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);@OverridePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);@OverridePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);@OverridePromise<V> await() throws InterruptedException;@OverridePromise<V> awaitUninterruptibly();@OverridePromise<V> sync() throws InterruptedException;@OverridePromise<V> syncUninterruptibly();
}
ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {@OverrideChannel channel();@OverrideChannelPromise setSuccess(Void result);ChannelPromise setSuccess();boolean trySuccess();@OverrideChannelPromise setFailure(Throwable cause);@OverrideChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelPromise sync() throws InterruptedException;@OverrideChannelPromise syncUninterruptibly();@OverrideChannelPromise await() throws InterruptedException;@OverrideChannelPromise awaitUninterruptibly();/*** Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.*/ChannelPromise unvoid();
}
DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {checkNotNull(listener, "listener");synchronized (this) {addListener0(listener);}if (isDone()) {notifyListeners();}return this;}private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {if (listeners == null) {listeners = listener;} else if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).add(listener);} else {listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);}}private void notifyListeners() {EventExecutor executor = executor();if (executor.inEventLoop()) {final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();final int stackDepth = threadLocals.futureListenerStackDepth();if (stackDepth < MAX_LISTENER_STACK_DEPTH) {threadLocals.setFutureListenerStackDepth(stackDepth + 1);try {notifyListenersNow();} finally {threadLocals.setFutureListenerStackDepth(stackDepth);}return;}}safeExecute(executor, new Runnable() {@Overridepublic void run() {notifyListenersNow();}});}
从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。
再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。
public interface GenericFutureListener<F extends Future<?>> extends EventListener {/*** Invoked when the operation associated with the {@link Future} has been completed.** @param future the source {@link Future} which called this callback*/void operationComplete(F future) throws Exception;
}