Netty源码解析——服务端启动
- Netty案例复习
- Netty原理复习
- Netty服务端启动源码解析
- bind(int)
- initAndRegister()
- channelFactory.newChannel()
- init(channel)
- config().group().register(channel)
- startThread()
- run()
- register0(ChannelPromise promise)
- doBind0(...)
今天我们一起来学习Netty源码,对Netty有一个深入的认知,既能掌握其使用和原理,又能对它底层的设计有一个大概的认知
Netty案例复习
在阅读源码前,我们再看一下Netty服务端的启动代码
public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new EchoServerHandler());}});ChannelFuture f = b.bind(PORT).sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
- 我们创建了两个EventLoopGroup已经,然后设置到ServerBootstrap上,bossGroup中的EventLoop处理accept事件,workerGroup中的EventLoop处理read事件时。
- 再通过ServerBootstrap设置好一些其他的参数,其中包括Netty服务端接收连接请求用的NioServerSocketChannel,NioServerSocketChannel会注册到bossGroup上,监听accept事件
- 然后设置好ChannelInitializer,服务端接收到客户端连接之后,会创建一个NioSocketChannel,ChannelInitializer就是用于初始化连接建立成功后的NioSocketChannel
- 最后就是通过ServerBootstrap的bind(int inetPort)绑定一个端口
Netty原理复习
我们再来复习一下Netty的原理,带着对Netty原理的认知去看源码,效率才会高,不至于走偏。
- 当调用了ServerBootstrap的bind(int inetPort)绑定端口后,我们注册的NioServerSocketChannel就会被注册到bossGroup上的唯一一个NioEventLoop上,然后NioEventLoop会把ServerSocketChannel注册到Selector上监听accept事件,并启动事件循环
- 当有客户端连接后,boosGroup中的EventLoop会获得一个SocketChannel然后将其包装成NioSocketChannel,然后NioServerSocketChannel对应的ChannelHandler会把它注册到workerGroup中
- workerGroup会将NioSocketChannel注册到其中一个NioEventLoop上,并使用我们配置的ChannelInitializer初始化NioSocketChannel
- workerGroup中的NioEventLoop的事件循环中监听到Channel有read事件时发生,就会调用Channel对应的ChannelPipeline进行处理
- ChannelPipeline通过责任链模式,逐一调用pipeline上的ChannelHandler处理事件
Netty服务端启动源码解析
接下来我们就开始读源码了,我们从ServerBootstrap的bind方法走起。
bind(int)
/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}
bind方法的作用就是创建一个Channel并绑定端口,创建的Channel类型就是我们指定的NioServerSocketChannel。
bind方法会调用doBind方法。
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();...doBind0(regFuture, channel, localAddress, promise);...}
省略了非关键代码后,剩下的就是关键的两行:
- initAndRegister()方法做的事情就是创建并初始化Channel然后注册到NioEventLoop上。
- doBind0(…)方法给NioServerSocockChannel中的ServerSocketChannel绑定端口。
initAndRegister()
我们进入initAndRegister()方法,同样是只看核心代码,其他的代码不关心。
final ChannelFuture initAndRegister() {...channel = channelFactory.newChannel();init(channel);...ChannelFuture regFuture = config().group().register(channel);...}
- channelFactory.newChannel():反射调用NioServerSocketChannel的构造方法进行实例化。
- init(channel):为NioServerSocketChannel的ChannelPipeline添加ChannelInitializer。
- config().group().register(channel):把NioServerSocketChannel中的ServerSocketChannel注册到workerGroup的NioEventLoop的Selector上。
channelFactory.newChannel()
channelFactory.newChannel()会进入到ReflectiveChannelFactory的newChannel()方法,里面就是调用Constructor构造器的newInstance()方法反射实例化一个Channel,这个构造器就是NioServerSocketChannel的构造器。是在我们调用ServerBootstrap的channel(NioServerSocketChannel.class)方法时设置进去的。
我们进入NioServerSocketChannel的构造方法看看。
public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}
newSocket(…)方法就是创建了一个NIO原生的ServerSocketChannel,然后进入重载的构造方法。
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
继续调用父类的构造方法,可以看到指定了事件类型是accept事件。然后进入到父类AbstractNioChannel的构造方法。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;...ch.configureBlocking(false);...}
AbstractNioChannel的构造方法除了继续调用父类的构造方法以外,还保存了ServerSocketChannel和关注的事件类型OP_ACCEPT,然后设置ServerSocketChannel为非阻塞。
继续进入父类的构造方法。
protected AbstractChannel(Channel parent) {...unsafe = newUnsafe();pipeline = newChannelPipeline();}
创建了一个unsafe对象,和一个ChannelPipeline。unsafe其实是Netty内部自己使用的一个工具类,先不用管,后面会看到怎么用,这里看看如何创建ChannelPipeline。
protected DefaultChannelPipeline(Channel channel) {...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}
我们知道ChannelPipeline中的ChannelHandler都是包在ChannelHandlerContext里面的,而这里先创建了一个头部Context和尾部Context。
也就是这样:
channelFactory.newChannel()的大体逻辑如下:
init(channel)
我们回到initAndRegister()方法,再来看init方法。
@Overridevoid init(Channel channel) {...ChannelPipeline p = channel.pipeline();...p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {...ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
init方法就是拿到NioServerSocketChannel的ChannelPipeline,往里面添加了一个ChannelInitializer,ChannelInitializer会在ServerSocketChannel被注册到Selector后,异步的给ChannelPipeline添加一个ServerBootstrapAcceptor,这个ServerBootstrapAcceptor是一个专门处理连接事件的Handler。当然ChannelInitializer还会从ChannelPipeline中把自己删除。
config().group().register(channel)
我们回到initAndRegister()方法,再往后看一下config().group().register(channel)。
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
进入NioEventLoopGroup的父类MultithreadEventLoopGroup的register方法中。next()方法返回一个NioEventLoop,然后调用NioEventLoop的register(channel)方法。进入到NioEventLoop的父类SingleThreadEventLoop的register方法中。
@Overridepublic ChannelFuture register(final ChannelPromise promise) {...promise.channel().unsafe().register(this, promise);...}
promise.channel().unsafe()获取到NioServerSocketChannel中的Unsafe对象,然后调用Unsafe对象的register(…)
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...if (eventLoop.inEventLoop()) {register0(promise);} else {...eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});... }}
Unsafe的register方法中,判断当前线程是否是eventLoop的线程,如果是,直接调用register0方法注册Channel,否则调用eventLoop的execute方法开启一个任务异步注册Channel。
private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) {startThread();...}...}
execute方法首先调用addTask(task)方法,把这个任务放入taskQueue中。然后判断当选线程不是当前EventLoop的线程,因此调用startThread()开启EventLoop的线程,启动事件循环。
addTask方法就是把这个Runnable放入到NioEventLoop内部的队列当中taskQueue中,在NioEventLoop的事件循环的每一轮的最后,会处理taskQueue中的任务,我们就不细看了。我们接下来看一下startThread方法。
startThread()
private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {...doStartThread();...}}}
startThread方法先修改NioEventLoop的状态state为开启,防止再次调用startThread方法的时候重复开启线程。然后调用doStartThread()方法。
private void doStartThread() {...executor.execute(new Runnable() {@Overridepublic void run() {...SingleThreadEventExecutor.this.run();...}});}
doStartThread()方法里面调用了executor的execute方法,异步执行SingleThreadEventExecutor.this.run()方法,这个run方法就会进入到NioEventLoop的run方法中,里面就是事件循环。
这个executor其实是一个单线程的线程池,可以看做就是NioEventLoop的唯一一个线程,startThread方法并没有再创建一个线程,而是往NioEventLoop预先创建好的线程提交一个任务,这个任务会启动NioEventLoop的事件循环。
run()
接下来进入NioEventLoop的run方法看看,所谓的事件循环是什么。
@Overrideprotected void run() {...for (;;) {...strategy = select(curDeadlineNanos);if (strategy > 0) {processSelectedKeys();}...runAllTasks();...}}
可以看到,就是先调用select方法,这个select方法自然是调用NioEventLoop的Selector的select方法,当然现在还没有任何Channel被注册,因此这里是不会select到东西的。然后processSelectedKeys()是处理所有就绪事件的方法,因为这里还没有Channel被注册,自然也没有就绪事件发生。因此最后只有runAllTasks()被执行,也就是获取taskQueue中刚刚放进去的任务,这个任务的执行就会触发ServerSocketChannel的注册。
runAlllTasks方法执行taskQueue中目前唯一一个的任务,也就是刚刚放进去的用于注册ServerSocketChannel的任务,这个任务会调用Unsafe的register0(ChannelPromise promise)方法。
register0(ChannelPromise promise)
private void register0(ChannelPromise promise) {...doRegister();...}
register0调用doRegister()方法,进入AbstractNioChannel的doRegister()方法。
@Overrideprotected void doRegister() throws Exception {...selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);...}
doRegister方法中把Channel注册到Selector的代码就是上面这一行,javaChannel()返回NIO原生的Channel类型,然后调用register方法,eventLoop().unwrappedSelector()返回的就是Selector,这个Selector作为register方法的参数,这样NIO的Channel就被注册到Selector中了,这里的Channel当然是ServerSocketChannel。
这里注册完ServerSocketChannel之后,就会调用ChannelInitializer初始化ChannelPipeline。
doBind0(…)
我们回调doBind方法,initAndRegister()方法就已经看完了,再看下面的doBind0方法。
private static void doBind0(...,final Channel channel,...) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {...channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);...}});}
dobind0()方法也是通过NioEventLoop执行异步任务的方式去绑定端口,channel.bind(localAddress, promise)方法最终会进入NioServerSocketChannel的doBind(SocketAddress localAddress)方法。
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}
然后就是调用ServerSocketChannel的bind(SocketAddress local, int backlog)方法绑定指定端口。
至此,Netty服务端就启动起来,下面是Netty服务端启动的源码流程图。
服务端启动之后,就可以接收并处理客户端的请求了,后续的流程就放到下一次再作分析。