1 Pipeline设计原理
在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应,它们的组成关系如下图:
通过上图可以看到,一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext又关联着一个ChannelHandler。
通过分析代码,已经知道了一个Channel初始化的基本过程,下面在回顾一下。AbstractChannel构造器的代码如下:
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}
AbstractChannel有一个pipeline属性,在构造器中会把它初始化为DefaultChannelPipeline的实例。这里的代码就印证了这一点:每个Channel都有一个ChannelPipeline。来看一下DefaultChannelPipeline的构造器,代码如下:
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}
在DefaultChannelPipeline构造器中,首先将与之关联的Channel保存到属性channel中。然后实例化两个ChannelHandlerContext:一个是HeadContext实例Head,另一个是TailContext实例Tail。接着将Head和Tail互相指向,构成一个双向链表。
特别注意的是:在开始的示意图中,Head和Tail并没有包含ChannelHandler,这是因为HeadContext和TailContext继承于AbstractChannelHandlerContext的同时,也实现了ChannelHandler接口,所以它们有Context和Handler的双重属性。
2 ChannelPipeline初始化
下面看一下ChannelPipeline的初始化具体做了哪些工作。先回顾一下,在实例化一个Channel时,会伴随着一个ChannelPipeline的实例化,并且此Channel会与这个ChannelPipeline相互关联,这一点可以通过NioEventLoop的父类AbstractChannel的构造器予以佐证,代码如下:
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}
当实例化一个NioSocketChannel时,其pipeline属性就是新创建的DefaultChannelPipeline对象,再来回顾一下DefaultChannelPipeline的构造方法,代码如下:
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}
上面代码中的Head实现了ChannelInboundHandler接口,而Tail实现了ChannelOutboundHandler接口,因此可以说Head和Tail就是ChannelHandler,又是ChannelHandlerContext。
3 ChannelInitializer的添加
前面分析过Channel的组成,最开始的时候ChannelPipeline中含有两个ChannelHandlerContext,但是此时的Pipeline并不能实现特定的功能,因为还没有添加自定义的ChannelHandler。通常来说,在初始化Bootstrap时,会添加自定义的ChannelHandler,下面就以具体的客户端启动代码片段举例:
Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE,true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());System.out.println("初始化channel:" + socketChannel);}});
在调用Handler时,传入ChannelInitializer对象,它提供了一个initChannel方法来初始化ChannelHandler。通过代码跟踪,发现ChannelInitializer是在Bootstrap的init方法中添加到ChannelPipiline中的,代码如下:
void init(Channel channel) throws Exception {ChannelPipeline p = channel.pipeline();p.addLast(new ChannelHandler[]{this.config.handler()});Map<ChannelOption<?>, Object> options = this.options0();synchronized(options) {Iterator i$ = options.entrySet().iterator();while(true) {if (!i$.hasNext()) {break;}Entry e = (Entry)i$.next();try {if (!channel.config().setOption((ChannelOption)e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable var10) {logger.warn("Failed to set a channel option: " + channel, var10);}}}Map<AttributeKey<?>, Object> attrs = this.attrs0();synchronized(attrs) {Iterator i$ = attrs.entrySet().iterator();while(i$.hasNext()) {Entry<AttributeKey<?>, Object> e = (Entry)i$.next();channel.attr((AttributeKey)e.getKey()).set(e.getValue());}}}
从上面的代码可见,将handler()方法返回的ChannelHandler添加到Pipeline中,而handler()方法返回的其实就是在初始化Bootstrap时通过handler方法设置的ChannelInitializer实例,因此这里就将ChannelInitializer插到了Pipieline的末端。此时Pipeline的结构如下图所示:
这是会有一个疑问,明明插入的是 ChannelInitializer实例,为什么在ChannelPipeline的双向链表中的元素却是一个ChannelHandlerContext呢?继续看源码,在Bootstrap的init方法中会调用p.addList()方法,将ChannelInitializer插入链表的末端,代码如下:
@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventloop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {newCtx.setAddPending();executor.execute(new Runnable() {@Overridepublic void run() {callHandlerAdded0(newCtx);}});return this;}}callHandlerAdded0(newCtx);return this;}
addList方法有很多重载的方法,只需要关注这个方法即可。上面的addList方法中,首先检查ChannelHandler的名字是否重复,如果不重复,则调用newContext方法为这个Handler创建一个对应的DefaultChannelHandlerContext实例,并与之关联起来。
为了添加一个Handler到Pipeline中,必须把此Handler包装成ChannelHandlerContext。因此在上面的代码中,我们新实例化一个newCx对象,并将Handler作为参数传递到构造方法中。下面来看一下DefaultChannelHandlerContext的构造器。
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;}
在DefaultChannelHandlerContext的构造器中,调用了isInbound()方法和isOutbound()方法,这两个方法的代码如下:
private static boolean isInbound(ChannelHandler handler) {return handler instanceof ChannelInboundHandler;}private static boolean isOutbound(ChannelHandler handler) {return handler instanceof ChannelOutboundHandler;}
从上面代码中可以看到,当一个Handler实现了ChannelInboundHandler接口,则isInbound返回true;类似的,当一个Handler实现了ChannelOutboundHandler接口,则isOuntbound返回true。而这两个boolean类型变量会传递给父类AbstractChannelHandlerContext中,并初始化父类的两个属性:inbound和outbound。
这里的ChannelInitializer所对应的DefaultChannelHandlerContext的inbound与outbound属性分别是什么呢?先来看ChannelInitializer的类层次结构图,如下图:
可以看到,ChannelInitializer仅仅实现了ChannelInboundHandler接口,因此这里实例化的DefaultChannelHandlerContext的inbound是true,outbound是false。
inbound和outbound这两个属性关系到Pipeline事件的流向与分类,因此十分关键。这里先记住一个结论:ChannelInitializer所对应的DefaultChannelHandlerContext的inbound=true,outbound=false。
当创建好Context之后,就将这个Context插入Pipeline的双向链表中。
DefaultChannelPipeline.java
private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}
添加完ChannelInitializer的Pipeline内部如下图所示
4 自定义ChannelHandler的添加过程
上面分析了ChannelInitializer是如何插入Pipeline中的,接下来探讨ChannelInitializer在哪里被调用、ChannelInitializer的作用以及自定义的ChannelHandler是如何插入Pipeline中的。
自定义ChannelHandler的添加过程,发生在AbstractUnsafe的register方法中,在这个方法中调用了pipeline.fireChannelRegister()方法,代码如下:
@Overridepublic final ChannelPipeline fireChannelRegistered() {AbstractChannelHandlerContext.invokeChannelRegistered(head);return this;}
再看AbstractChannelHandlerContext的invokeChannelRegister()方法。
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRegistered();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRegistered();}});}}
很显然,这个代码将从Head开始遍历Pipeline的双向链表,然后找到第一个属性inbound为true的ChannelHandlerContext实例。在分析ChannelInitializer时,专门分析了inbound和outbound属性,现在这里就用上了。回想一下,ChannelInitializer实现了ChannelInboundHandler,因此它所对应的ChannelHandlerContext的inbound属性为true,因此这里返回的就是ChannelInitializer实例所对应的ChannelHandlerContext对象,如下图所示
当获取inbound的Context后,就调用它的invokeChannelRegistered()方法。
private void invokeChannelRegistered() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRegistered(this);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRegistered();}}
我们已经知道,每个ChannelHandler都和一个ChannelHandlerContext关联,可以通过ChannelHandlerContext获取对应的ChannelHandler。很明显,这里handler()方法返回的对象其实就是一开始实例化的ChannelInitializer对象,接着调用了ChannelInitializer的channelRegister()方法。ChannelInitializer的channelRegister()方法的代码如下:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (this.initChannel(ctx)) {ctx.pipeline().fireChannelRegistered();} else {ctx.fireChannelRegistered();}}private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (this.initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {try {this.initChannel(ctx.channel());} catch (Throwable var6) {this.exceptionCaught(ctx, var6);} finally {this.remove(ctx);}return true;} else {return false;}
initChannel()方法就是在初始化Bootstrap时,调用handler方法传入的匿名内部类所实现的方法。因此,在调用这个方法之后,自定义的ChannelHandler就插入到Pipeline中,此时Pipeline的状态如下图:
当添加完自定义的ChannelHandler后,在finally代码块会删除自定义的ChannelHandler,也就是remove(ctx),最终调用ctx.pipeline().remove(this),因此最后Pipeline的状态如下图:
到此,自定义ChannelHandler的添加过程也就分析完成了。
5 给ChannelHandler命名
pipeline.addXXX()都有一个重载方法,例如addList()有一个重载的版本,代码如下:
public final ChannelPipeline addLast(String name, ChannelHandler handler) {return addLast(null, name, handler);}
第一个参数指定添加的是Handler的名字(更准确的说是ChannelHandlerContext的名字)。那么Handler的名字有什么用呢?如果不设置name,那么Handler默认的名字是怎么样的?上面的方法会调用重载的addLast()方法,代码如下:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventloop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {newCtx.setAddPending();executor.execute(new Runnable() {@Overridepublic void run() {callHandlerAdded0(newCtx);}});return this;}}callHandlerAdded0(newCtx);return this;}
第一个参数设置为null,不用关心。第二个参数就是Handler的名字。有代码可知,再添加一个Handler之前,需要调用checkMultiplicity()方法来确定新添加的Handler名字是否与已添加的Handler名字重复。