public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
public abstract class ChannelHandlerAdapter implements ChannelHandler {

    // Not using volatile because it's used only for a sanity check.
    boolean added;

    /**
     * Throws {@link IllegalStateException} if {@link ChannelHandlerAdapter#isSharable()} returns {@code true}
     */
    protected void ensureNotSharable() {
        if (isSharable()) {
            throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared");
        }
    }

    /**
     * Return {@code true} if the implementation is {@link Sharable} and so can be added
     * to different {@link ChannelPipeline}s.
     */
    public boolean isSharable() {
        /**
         * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
         * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
         * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
         * {@link Thread}s are quite limited anyway.
         *
         * See <a href="">#2289</a>.
         */
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

    /**
     * Do nothing by default, sub-classes may override this method.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    /**
     * Do nothing by default, sub-classes may override this method.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    /**
     * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     *
     * @deprecated is part of {@link ChannelInboundHandler}
     */
    @Skip
    @Override
    @Deprecated
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
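ChannelHandlerAdapter is an abstract class that implements the ChannelHandler interface. It provides no-op implementations of handlerAdded and handlerRemoved, and its exceptionCaught implementation forwards the exception to the next handler in the pipeline.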
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
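ChannelInboundHandlerAdapter is a concrete implementation for the inbound side. It extends ChannelHandlerAdapter and implements the ChannelInboundHandler interface. By default, each callback simply calls the matching fire method on ChannelHandlerContext, which passes the event on to the next handler.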
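The forwarding default can be illustrated with a small stand-alone sketch. The types below (InboundHandler, InboundAdapter, Context) are hypothetical stand-ins written for this example, not Netty's classes, and they only model two of the callbacks. The point is that a subclass overrides just the callback it cares about, while every other event keeps flowing to the next handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Toy stand-ins for illustration only; these are NOT Netty's types.
final class AdapterSketch {

    /** Hypothetical, trimmed-down inbound callback interface. */
    interface InboundHandler {
        void channelActive(Context ctx);
        void channelRead(Context ctx, Object msg);
    }

    /** Adapter: every callback forwards the event unchanged to the next context. */
    static class InboundAdapter implements InboundHandler {
        @Override public void channelActive(Context ctx) { ctx.fireChannelActive(); }
        @Override public void channelRead(Context ctx, Object msg) { ctx.fireChannelRead(msg); }
    }

    /** Context holding one handler and a link to the next context. */
    static final class Context {
        final InboundHandler handler;
        Context next;
        Context(InboundHandler handler) { this.handler = handler; }
        void fireChannelActive() { if (next != null) next.handler.channelActive(next); }
        void fireChannelRead(Object msg) { if (next != null) next.handler.channelRead(next, msg); }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        // Overrides only channelRead; channelActive keeps the forwarding default.
        InboundHandler upper = new InboundAdapter() {
            @Override public void channelRead(Context ctx, Object msg) {
                ctx.fireChannelRead(msg.toString().toUpperCase(Locale.ROOT));
            }
        };
        // Last handler: records what reaches the end of the chain.
        InboundHandler sink = new InboundHandler() {
            @Override public void channelActive(Context ctx) { log.add("active"); }
            @Override public void channelRead(Context ctx, Object msg) { log.add("read:" + msg); }
        };
        Context head = new Context(new InboundAdapter());
        Context mid = new Context(upper);
        Context tail = new Context(sink);
        head.next = mid;
        mid.next = tail;
        head.handler.channelActive(head);
        head.handler.channelRead(head, "ping");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [active, read:PING]
    }
}
```

If the overriding handler did not call ctx.fireChannelRead, the message would stop there and never reach the sink.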
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    /**
     * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
     */
    Channel channel();

    /**
     * Returns the {@link EventExecutor} which is used to execute an arbitrary task.
     */
    EventExecutor executor();

    /**
     * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
     * was added to the {@link ChannelPipeline}. This name can also be used to access the registered
     * {@link ChannelHandler} from the {@link ChannelPipeline}.
     */
    String name();

    /**
     * The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
     */
    ChannelHandler handler();

    /**
     * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
     * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
     * {@link EventLoop}.
     */
    boolean isRemoved();

    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    @Override
    ChannelHandlerContext read();

    @Override
    ChannelHandlerContext flush();

    /**
     * Return the assigned {@link ChannelPipeline}
     */
    ChannelPipeline pipeline();

    /**
     * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
     */
    ByteBufAllocator alloc();

    /**
     * @deprecated Use {@link Channel#attr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> boolean hasAttr(AttributeKey<T> key);
}
ChannelHandlerContext extends both ChannelInboundInvoker and ChannelOutboundInvoker, so a single context can propagate inbound events as well as outbound operations.
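A rough way to picture the two directions: contexts form a doubly linked list, inbound events travel toward the tail and outbound operations travel toward the head. The sketch below is a toy model under that assumption. The Ctx class and its two methods are made up for illustration, and unlike Netty it treats every context as handling both directions.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a doubly linked context chain; NOT Netty's implementation.
final class DirectionSketch {

    static final class Ctx {
        final String name;
        final List<String> trace;
        Ctx prev;
        Ctx next;

        Ctx(String name, List<String> trace) {
            this.name = name;
            this.trace = trace;
        }

        /** Inbound: hand the event to the following context (toward the tail). */
        void fireChannelRead(Object msg) {
            if (next != null) {
                trace.add("in:" + next.name);
                next.fireChannelRead(msg);
            }
        }

        /** Outbound: hand the operation to the preceding context (toward the head). */
        void write(Object msg) {
            if (prev != null) {
                trace.add("out:" + prev.name);
                prev.write(msg);
            }
        }
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Ctx a = new Ctx("a", trace);
        Ctx b = new Ctx("b", trace);
        Ctx c = new Ctx("c", trace);
        a.next = b; b.prev = a;
        b.next = c; c.prev = b;

        a.fireChannelRead("request");  // visits b, then c
        c.write("response");           // visits b, then a
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [in:b, in:c, out:b, out:a]
    }
}
```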
public interface ChannelInboundInvoker {

    /**
     * A {@link Channel} was registered to its {@link EventLoop}.
     *
     * This will result in having the {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)} method
     * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelInboundInvoker fireChannelRegistered();

    /**
     * A {@link Channel} was unregistered from its {@link EventLoop}.
     *
     * This will result in having the {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)} method
     * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelInboundInvoker fireChannelUnregistered();

    /**
     * A {@link Channel} is active now, which means it is connected.
     *
     * This will result in having the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)} method
     * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelInboundInvoker fireChannelActive();

    /**
     * A {@link Channel} is inactive now, which means it is closed.
     *
     * This will result in having the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} method
     * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelInboundInvoker fireChannelInactive();

    /**
     * A {@link Channel} received an {@link Throwable} in one of its inbound operations.
     *
     * This will result in having the {@link ChannelInboundHandler#exceptionCaught(ChannelHandlerContext, Throwable)}
     * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);

    /**
     * A {@link Channel} received an user defined event.
     *
     * This will result in having the {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
     * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelInboundInvoker fireUserEventTriggered(Object event);

    /**
     * A {@link Channel} received a message.
     *
     * This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}
     * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelInboundInvoker fireChannelRead(Object msg);

    /**
     * Triggers an {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)}
     * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     */
    ChannelInboundInvoker fireChannelReadComplete();

    /**
     * Triggers an {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}
     * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     */
    ChannelInboundInvoker fireChannelWritabilityChanged();
}
public interface ChannelOutboundInvoker {

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method
     * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture bind(SocketAddress localAddress);

    /**
     * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     * <p>
     * If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
     * a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
     * will be used.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture connect(SocketAddress remoteAddress);

    /**
     * Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    /**
     * Request to disconnect from the remote peer and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of an error.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture disconnect();

    /**
     * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of
     * an error.
     *
     * After it is closed it is not possible to reuse it again.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture close();

    /**
     * Request to deregister from the previous assigned {@link EventExecutor} and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     *
     */
    ChannelFuture deregister();

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     * The given {@link ChannelPromise} will be notified.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method
     * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    /**
     * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     * The given {@link ChannelFuture} will be notified.
     *
     * <p>
     * If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
     * a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
     * will be used.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

    /**
     * Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     *
     * The given {@link ChannelPromise} will be notified and also returned.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    /**
     * Request to disconnect from the remote peer and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of an error.
     *
     * The given {@link ChannelPromise} will be notified.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture disconnect(ChannelPromise promise);

    /**
     * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes,
     * either because the operation was successful or because of
     * an error.
     *
     * After it is closed it is not possible to reuse it again.
     * The given {@link ChannelPromise} will be notified.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture close(ChannelPromise promise);

    /**
     * Request to deregister from the previous assigned {@link EventExecutor} and notify the
     * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
     * an error.
     *
     * The given {@link ChannelPromise} will be notified.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelFuture deregister(ChannelPromise promise);

    /**
     * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
     * {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} event if data was
     * read, and triggers a
     * {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the
     * handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
     * <p>
     * This will result in having the
     * {@link ChannelOutboundHandler#read(ChannelHandlerContext)}
     * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the
     * {@link Channel}.
     */
    ChannelOutboundInvoker read();

    /**
     * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     */
    ChannelFuture write(Object msg);

    /**
     * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     */
    ChannelFuture write(Object msg, ChannelPromise promise);

    /**
     * Request to flush all pending messages via this ChannelOutboundInvoker.
     */
    ChannelOutboundInvoker flush();

    /**
     * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
     */
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    /**
     * Shortcut for call {@link #write(Object)} and {@link #flush()}.
     */
    ChannelFuture writeAndFlush(Object msg);

    /**
     * Return a new {@link ChannelPromise}.
     */
    ChannelPromise newPromise();

    /**
     * Return an new {@link ChannelProgressivePromise}
     */
    ChannelProgressivePromise newProgressivePromise();

    /**
     * Create a new {@link ChannelFuture} which is marked as succeeded already. So {@link ChannelFuture#isSuccess()}
     * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    ChannelFuture newSucceededFuture();

    /**
     * Create a new {@link ChannelFuture} which is marked as failed already. So {@link ChannelFuture#isSuccess()}
     * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    ChannelFuture newFailedFuture(Throwable cause);

    /**
     * Return a special ChannelPromise which can be reused for different operations.
     * <p>
     * It's only supported to use
     * it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)}.
     * </p>
     * <p>
     * Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used
     * if you want to save an object allocation for every write operation. You will not be able to detect if the
     * operation was complete, only if it failed as the implementation will call
     * {@link ChannelPipeline#fireExceptionCaught(Throwable)} in this case.
     * </p>
     * <strong>Be aware this is an expert feature and should be used with care!</strong>
     */
    ChannelPromise voidPromise();
}
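1. Methods whose names begin with fire are implemented by calling the matching invoke method. The difference is the ChannelHandlerContext they pass in: it is the next context in the pipeline that handles the event, which findContextInbound locates for inbound events (outbound operations use the corresponding lookup in the opposite direction). For example, fireChannelRead: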
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        // Step to the following context in the pipeline's linked list.
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}
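To make the lookup easier to follow, here is a simplified, stand-alone sketch of the idea: each context carries a bit mask of the events its handler actually handles, and the search walks forward until it reaches a context whose mask contains the requested bit. The mask values, the Ctx class and the skip test are assumptions made for this example. The real skipContext also takes the executor and MASK_ONLY_INBOUND into account, which this sketch leaves out.

```java
// Simplified illustration of mask-based context lookup; NOT Netty's code.
final class MaskSketch {

    // Hypothetical bit values chosen for this example.
    static final int MASK_CHANNEL_ACTIVE = 1;
    static final int MASK_CHANNEL_READ = 1 << 1;
    static final int MASK_ALL = MASK_CHANNEL_ACTIVE | MASK_CHANNEL_READ;

    static final class Ctx {
        final String name;
        final int executionMask; // events this context's handler handles
        Ctx next;

        Ctx(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }

        /** Walk forward, skipping contexts whose mask lacks the requested event bit. */
        Ctx findContextInbound(int mask) {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while ((ctx.executionMask & mask) == 0);
            return ctx;
        }
    }

    /** Link the contexts in order and return the first one. */
    static Ctx link(Ctx... ctxs) {
        for (int i = 0; i < ctxs.length - 1; i++) {
            ctxs[i].next = ctxs[i + 1];
        }
        return ctxs[0];
    }

    /** Name of the first context after the head that handles the given event. */
    static String nextFor(int mask) {
        Ctx head = link(
                new Ctx("head", MASK_ALL),
                new Ctx("activeOnly", MASK_CHANNEL_ACTIVE),
                new Ctx("reader", MASK_CHANNEL_READ),
                new Ctx("tail", MASK_ALL)); // tail handles everything, so the loop always ends
        return head.findContextInbound(mask).name;
    }

    public static void main(String[] args) {
        System.out.println(nextFor(MASK_CHANNEL_READ));   // prints reader
        System.out.println(nextFor(MASK_CHANNEL_ACTIVE)); // prints activeOnly
    }
}
```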
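2. Methods whose names begin with invoke are static and run the handler method for the corresponding event. They first check whether the calling thread is the event-loop thread of the target context's EventExecutor (a NioEventLoop when the NIO transport is used). If it is, the handler method is called directly; if not, the call is wrapped in a task and added to that event loop's task queue. For example, invokeChannelRead: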
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
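The run-inline-or-enqueue decision can be reproduced with plain java.util.concurrent. The sketch below is only an approximation: SingleThreadLoop is a hypothetical stand-in for an event loop, built on a single-thread ExecutorService. A call made from the main thread is queued, while a call made from the loop thread itself runs immediately, which is why the inner entry appears before "outer done".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy event loop used to illustrate the inEventLoop() check; NOT Netty's code.
final class EventLoopSketch {

    static final class SingleThreadLoop {
        private volatile Thread thread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-event-loop");
            thread = t; // remember the loop thread so callers can be compared against it
            return t;
        });

        boolean inEventLoop() { return Thread.currentThread() == thread; }
        void execute(Runnable task) { executor.execute(task); }
        void shutdown() { executor.shutdown(); }
    }

    /** Run the call inline when already on the loop thread, otherwise enqueue it. */
    static void invoke(SingleThreadLoop loop, Runnable handlerCall) {
        if (loop.inEventLoop()) {
            handlerCall.run();
        } else {
            loop.execute(handlerCall);
        }
    }

    static List<String> run() {
        SingleThreadLoop loop = new SingleThreadLoop();
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch done = new CountDownLatch(1);

        // Called from the caller's thread: the task is queued on the loop.
        invoke(loop, () -> {
            log.add("outer on " + Thread.currentThread().getName());
            // Called from the loop thread: runs inline, before "outer done".
            invoke(loop, () -> log.add("inner on " + Thread.currentThread().getName()));
            log.add("outer done");
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // prints [outer on toy-event-loop, inner on toy-event-loop, outer done]
        System.out.println(run());
    }
}
```

Keeping all handler calls for a channel on one thread is what lets handlers avoid their own synchronization in this model.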
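1. handlerAdded: called when a new ChannelHandler is added to a ChannelPipeline.
2. channelRegistered: called when the Channel has been successfully registered with its EventLoop.
3. channelActive: called when the Channel becomes active and can start receiving and sending data.
4. channelRead: called when data has been read from the Channel.
5. channelReadComplete: called when a read operation has completed.
6. channelInactive: called when the Channel becomes inactive.
7. channelUnregistered: called when the Channel is unregistered from its EventLoop.
8. handlerRemoved: called when the ChannelHandler is removed from the ChannelPipeline.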
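The order above can be modelled with a tiny state machine. ToyChannel below is a made-up class for illustration, not a Netty type. It simply records the callback names in the order listed above and shows that channelReadComplete fires once after a batch of channelRead calls.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the callback order listed above; NOT Netty's implementation.
final class LifecycleSketch {

    static final class ToyChannel {
        private final List<String> events = new ArrayList<>();
        private boolean registered;
        private boolean active;

        void addHandler() { events.add("handlerAdded"); }

        void register() {
            registered = true;
            events.add("channelRegistered");
        }

        void activate() {
            if (!registered) throw new IllegalStateException("not registered");
            active = true;
            events.add("channelActive");
        }

        /** One read operation: a channelRead per message, then one channelReadComplete. */
        void read(String... messages) {
            if (!active) throw new IllegalStateException("not active");
            for (int i = 0; i < messages.length; i++) {
                events.add("channelRead");
            }
            events.add("channelReadComplete");
        }

        /** Closing unwinds the states in reverse order of setup. */
        void close() {
            if (active) {
                active = false;
                events.add("channelInactive");
            }
            if (registered) {
                registered = false;
                events.add("channelUnregistered");
            }
            events.add("handlerRemoved");
        }

        List<String> events() { return events; }
    }

    static List<String> run() {
        ToyChannel ch = new ToyChannel();
        ch.addHandler();
        ch.register();
        ch.activate();
        ch.read("a", "b");
        ch.close();
        return ch.events();
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```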