剖析 Netty 内部网络实现原理

4d72fdb8d9bc95ac2eb8c1c2a8cd70fc.gif

作者 | 张彦飞allen

来源 | 开发内功修炼

Netty 是一个在 Java 生态里应用非常广泛的的网络编程工具包,它在 2004 年诞生到现在依然是火的一塌糊涂,光在 github 上就有 30000 多个项目在用它。所以要想更好地掌握网络编程,我想就绕不开 Netty。所以今天我们就来分析分析 Netty 内部网络模块的工作原理。

一、Netty 用法

我们首先找一个 Netty 的例子,本篇文章整体都是围绕这个例子来展开叙述的。我们下载 Netty 的源码,并在 examples 中找到 echo 这个 demo。同时,为了防止代码更新导致对本文叙述的影响,我们切到 4.1 分支上来。

# git checkout https://github.com/netty/netty.git
# git checkout -b 4.1
# cd example/src/main/java/io/netty/example/echo

在这个 demo 的 EchoServer 中,展示了使用 Netty 写 Server 的经典用法。

public final class EchoServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();final EchoServerHandler serverHandler = new EchoServerHandler();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(serverHandler);}});// Start the server.ChannelFuture f = b.bind(PORT).sync();......}
}

如果你是一个 Java 新手,或者干脆像飞哥一样没用 Netty 写过服务,相信上述代码基本是看不懂的。究其根本原因是相比 C/C++ ,Java 的封装程度比较高。Java 语言本身的 JVM 中 NIO 对网络的封装就已经屏蔽了很多底层的概念了,再加上 Netty 又封装了一层,所以 Java 开发者常用的一些术语和概念和其它语言出入很大。

比如上面代码中的 Channel、NioEventLoopGroup 等都是其它语言中所没见过的。不过你也不用感到害怕,因为这其中的每一个概念都是 socket、进程等底层概念穿了一身不同的衣服而已。接下来我们分别细了解一下这些概念。

1.1 NioEventLoopGroup

如果你没接触过 Netty,可以简单把 NioEventLoopGroup 理解为一个线程池就可以。每一个 NioEventLoopGroup 内部包含一个或者多个 NioEventLoop。

fda739b91bed218f6deb246c701bdf5d.png

其中 NioEventLoop 是对线程、epoll 等概念进行了一个集中的封装。

首先,EventLoop 本身就是一个线程。为什么这么说,我们通过看 NioEventLoop 的继承关系就能看出来。NioEventLoop 继承于 SingleThreadEventLoop,而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor。SingleThreadEventExecutor 实现了在 Netty 中对本地线程的抽象。

public abstract class SingleThreadEventExecutor extends ... {private volatile Thread thread;private final Queue<Runnable> taskQueue;
}

在 SingleThreadEventExecutor 中不但封装了线程对象 Thread,而且还配置了一个任务队列 taskQueue,用于其它线程向它来放置待处理的任务。

1.2 selector

另外 NioEventLoopEventLoop 以 selector 的名义封装了 epoll(在 Linux 操作系统下)。

f5bf76696e780fa364b9f8a8ed96df86.png

在 NioEventLoop 对象内部,会有 selector 成员定义。这其实就是封装的 epoll 而来的。我们来看具体的封装过程。以及 selectedKeys,这是从 selector 上发现的待处理的事件列表。

public final class NioEventLoop extends SingleThreadEventLoop{// selectorprivate Selector selector;private Selector unwrappedSelector;// selector 上发现的各种待处理事件private SelectedSelectionKeySet selectedKeys;
}

NioEventLoopGroup 在构造的时候,会调用 SelectorProvider#provider 来生成 provider,在默认情况下会调用 sun.nio.ch.DefaultSelectorProvider.create 来创建。

//file:java/nio/channels/spi/SelectorProvider.java
public abstract class SelectorProvider {public static SelectorProvider provider() {// 1. java.nio.channels.spi.SelectorProvider 属性指定实现类// 2. SPI 指定实现类......// 3. 默认实现,Windows 和 Linux 下不同provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}
}

在 Linux 下,默认创建的 provider 使用的就是 epoll。

//file:sun/nio/ch/DefaultSelectorProvider.java
public class DefaultSelectorProvider {public static SelectorProvider create() {String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");}}

1.3 Channel

Channel 是 JavaNIO 里的一个概念。大家把它理解成 socket,以及在 socket 之上的一系列操作方法的封装就可以了。

c78fa3ff1acedbf140ceb9e16b925dca.png

Java 在 Channel 中把 connect、bind、read、write 等方法都以成员方法的形式给封装起来了。

public interface Channel extends ... {Channel read();Channel flush();......interface Unsafe {void bind(SocketAddress localAddress, ...);void connect(SocketAddress remoteAddress, ...);void write(Object msg, ...);......}
}

另外在 Java 中,习惯把 listen socket 叫做父 channel,客户端握手请求到达以后创建出来的新连接叫做子 channel,方便区分。

1.4 Pipeline

在每个 Channel 对象的内部,除了封装了 socket 以外,还都一个特殊的数据结构 DefaultChannelPipeline pipeline。在这个 pipeline 里是各种时机里注册的 handler。

Channel 上的读写操作都会走到这个 DefaultChannelPipeline 中,当 channel 上完成 register、active、read、readComplete 等操作时,会触发 pipeline 中的相应方法。

552ef4f2e28ee77e9efcbcb08b3dc402.png

这个 ChannelPipeline 其实就是一个双向链表,以及链表上的各式各样的操作方法。

public interface ChannelPipeline {ChannelPipeline addFirst(String name, ChannelHandler handler);ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);ChannelPipeline addLast(String name, ChannelHandler handler);ChannelPipeline fireChannelRead(Object msg);
}

1.5 EchoServer 解读

现在我们具备了对 Java、对 Netty 的初步理解以后,我们再会后来看一下开篇提到的 EchoServer 源码。

public final class EchoServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();final EchoServerHandler serverHandler = new EchoServerHandler();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(serverHandler);}});// Start the server.ChannelFuture f = b.bind(PORT).sync();......}
}

在该代码一开头,bossGroup = new NioEventLoopGroup(1)这一行是创建了一个只有一个线程的线程池。workerGroup = new NioEventLoopGroup又创建了 worker 线程池,没有指定数量,Netty 内部会根据当前机器的 CPU 核数来灵活决定。

ServerBootstrap 这是一个脚手架类,是为了让我们写起服务器程序来更方便一些。

b.group(bossGroup, workerGroup)这一行是将两个线程池传入,第一个作为 boss 只处理 accept 接收新的客户端连接请求。第二个参数作为 worker 线程池,来处理连接上的请求接收、处理以及结果发送发送。

我们注意下 childHandler是传入了一个 ChannelInitializer,这是当有新的客户端连接到达时会回调的一个方法。在这个方法内部,我们给这个新的 chaneel 的 pipeline 上添加了一个处理器 serverHandler,以便收到数据的时候执行该处理器进行请求处理。

上面的几个方法都是定义,在b.bind方法中真正开始启动服务,创建父 channel(listen socket),创建 boss 线程。当有新连接到达的时候 boss 线程再创建子 channel,为其 pipeline 添加处理器,并启动 worker 线程来进行处理。

二、Netty bootstrap 参数构建

简言之 bootstrap.group() .channel() .childHandler() .childOption() 就是在构建 Netty Server 的各种参数。

2.1 group 设置

ServerBootstrap 和其父类 AbstractBootstrap 内部分别定义了两个 EventLoopGroup group 成员。父类 AbstractBootstrap 的 group 是用来处理 accpet 事件的,ServerBootstrap 下的 childGroup 用来处理其它所有的读写等事件。

group() 方法就是把 EventLoopGroup 参数设置到自己的成员上完事。其中如果调用 group() 只传入了一个线程池,那么将来本服务下的所有事件都由这个线程池来处理。详情查看飞哥精简后的源码。

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {//用来处理非 accept 以外的线程池private volatile EventLoopGroup childGroup;public ServerBootstrap group(EventLoopGroup group) {return group(group, group);}public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;}
}public abstract class AbstractBootstrap ... {//用来处理 accept 的线程volatile EventLoopGroup group;public B group(EventLoopGroup group) {this.group = group;......}
}

2.2 channel 设置

再看 ServerBootstrap#channel 方法 是用来定义一个工厂方法,将来需要创建 channel 的时候都调用该工厂进行创建。

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {public B channel(Class<? extends C> channelClass) {return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));}
}

回头看本文开头 demo,.channel(NioServerSocketChannel.class)指的是将来需要创建 channel 的时候,创建 NioServerSocketChannel 这个类型的。

2.3 option 设置

再看 option 方法,只是设置到了 options 成员中而已

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {public <T> B option(ChannelOption<T> option, T value) {ObjectUtil.checkNotNull(option, "option");synchronized (options) {if (value == null) {options.remove(option);} else {options.put(option, value);}}return self();}
}

2.4 handler 方法

本文 demo 设置了两处 handler,一处是 handler,另一处是 childHandler。他们都是分别设置到自己的成员上就完事,看源码。

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends ...... {public B handler(ChannelHandler handler) {this.handler = ObjectUtil.checkNotNull(handler, "handler");return self();}public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");return this;}
}

三、Netty bootstrap 启动服务

ServerBootstrap 下的 bind 方法是服务启动过程中非常重要的一个方法。创建父 channel(listen socket),创建 boss 线程,为 boss 线程绑定 Acceptor 处理器,调用系统调用 bind 进行绑定和监听都是在这里完成的。

先来直接看一下 bind 相关的入口源码。

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap ... {......
}//file:io/netty/bootstrap/AbstractBootstrap.java
public abstract class AbstractBootstrap ... {public ChannelFuture bind(SocketAddress localAddress) {validate();return doBind(...);}private ChannelFuture doBind(final SocketAddress localAddress) {//创建父 channel、初始化并且注册final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();......//如果 Register 已经完成,则直接 doBind0if (regFuture.isDone()) {ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;//否则就注册一个 listener(回调),等 register 完成的时候调用 } else {final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {promise.registered();doBind0(regFuture, channel, localAddress, promise);}return promise;}}//创建 channel,对其初始化,并且 register(会创建 parent 线程)final ChannelFuture initAndRegister() {//3.1 创建父 channel(listen socket)channel = channelFactory.newChannel();//3.2 对父 channel(listen socket)进行初始化init(channel);//3.3 注册并启动 boss 线程ChannelFuture regFuture = config().group().register(channel);......}//3.4 真正的bindprivate static void doBind0(...) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);......}});}
}

在这个过程中,做了如下几件重要的事情

  • 创建父 channel(listen socket)

  • 对父 channel(listen socket)进行初始化

  • register父 channel(listen socket)到主 group,并启动主进程

  • 真正的 bind

接下来我们分开来看。

3.1 创建父 channel(listen socket)

在 initAndRegister() 方法中创建 channel(socket),它调用了 channelFactory.newChannel()。

public abstract class AbstractBootstrap//创建 channel,对其初始化,并且 register(会创建 parent 线程)final ChannelFuture initAndRegister() {//3.1 创建 listen socketchannel = channelFactory.newChannel();......}
}

回想下2.2节的channel 方法,返回的是一个反射 ReflectiveChannelFactory。没错这里的 newChannel 就是调用这个工厂方法来创建出来一个 NioServerSocketChannel 对象。

3.2 对父 channel(listen socket)进行初始化

在 initAndRegister 创建除了 channel 之后,需要调用 init 对其进行初始化。

public abstract class AbstractBootstrapfinal ChannelFuture initAndRegister() {//3.1 创建父 channel(listen socket)//3.2 对父 channel(listen socket)进行初始化init(channel);......}
}

在 init() 中对 channel 进行初始化,一是给 options 和 attrs 赋值,二是构建了父 channel 的 pipeline。

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {void init(Channel channel) {//设置 option 和 attrsetChannelOptions(channel, newOptionsArray(), logger);setAttributes(channel, newAttributesArray());//设置 pipelineChannelPipeline p = channel.pipeline();p.addLast(new ChannelInitializer<Channel>() {......});}
}

在 setChannelOptions 中对 channel 的各种 option 进行设置。回忆我们在使用 ServerBootstrap 时可以传入 SO_BACKLOG,这就是其中的一个 option。在这里会真正设置到 channel(socket)上。

ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 100)

在 init 中,稍微难理解一点是 p.addLast(new ChannelInitializer...)。这一段代码只是给父 channel 添加一个 handler 而已。其真正的执行要等到 register 后,我们待会再看。

3.3 register 父 channel

父 channel 在创建完,并且初始化之后,需要注册到 boss 线程上才可用。

public abstract class AbstractBootstrapfinal ChannelFuture initAndRegister() {//3.1 创建父 channel(listen socket)//3.2 对父 channel(listen socket)进行初始化//3.3 注册并启动 boss 线程ChannelFuture regFuture = config().group().register(channel);......}
}

其中 config().group() 最终会调用到 AbstractBootstrap#group,在这个方法里获取的是我们传入进来的 bossGroup。

public abstract class AbstractBootstrapvolatile EventLoopGroup group;public final EventLoopGroup group() {return group;}
}

其中 bossGroup 是一个 NioEventLoopGroup 实例,所以代码会进入到 NioEventLoopGroup#register 方法。

public class NioEventLoopGroup extends MultithreadEventLoopGroup {}public abstract class MultithreadEventLoopGroup extends ... {@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}@Overridepublic EventLoop next() {return (EventLoop) super.next();}
}

在 NioEventLoopGroup 里包含一个或多个 EventLoop。上面的 next 方法就是从中选择一个出来,然后将 channel 注册到其上。

对于本文来讲,我们使用的是 NioEventLoopGroup,其内部包含的自然也就是 NioEventLoop,我们继续查找其 register 方法。

public final class NioEventLoop extends SingleThreadEventLoop//在 eventloop 里注册一个 channle(socket)public void register(final SelectableChannel ch, ...) {......register0(ch, interestOps, task);}//最终调用 channel 的 registerprivate void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {ch.register(unwrappedSelector, interestOps, task);}
}

可见,NioEventLoop 的 register 最后又调用到 channel 的 register 上了。在我们本文中,我们创建的 channel 是 NioServerSocketChannel,我们就依照这条线索来查。

//file:src/main/java/io/netty/channel/AbstractChannel.java
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {public final void register(EventLoop eventLoop, final ChannelPromise promise) {......//关联自己到 eventLoopAbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});}......}}
}

在 channel 的父类 AbstractChannel 中的 register 中,先是把自己关联到传入的 eventLoop 上。接着调用 inEventLoop 来判断线程当前运行的线程是否是 EventExecutor的支撑线程,是则返回直接 register0。

一般来说,服务在启动的时候都是主线程在运行。这个时候很可能 boss 线程还没有启动。所以如果发现当前不是 boss 线程的话,就调用 eventLoop.execute 来启动 boss 线程。

NioEventLoop 的父类是 SingleThreadEventExecutor, 找到 execute 方法。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {public void execute(Runnable task) {execute0(task);}private void execute0(@Schedule Runnable task) {execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) {startThread();}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}
}

我们先来看 addTask(task),它是将 task 添加到任务队列中。等待线程起来以后再运行。

public abstract class SingleThreadEventExecutor extends ... {private final Queue<Runnable> taskQueue;protected void addTask(Runnable task) {(task);}final boolean offerTask(Runnable task) {return taskQueue.offer(task);}
}

inEventLoop() 是判断当前线程是不是自己绑定的线程,这时还在主线程中运行,所以 inEventLoop 为假,会进入 startThread 开始为 EventLoop 创建线程。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private void startThread() {doStartThread();......}private void doStartThread() {executor.execute(new Runnable() {@Overridepublic void run() {SingleThreadEventExecutor.this.run();......}}}  
}

在 doStartThread 中调用 Java 线程管理工具 Executor 来启动 boss 线程。

3.4 boss 线程启动

当线程起来以后就进入了自己的线程循环中了,会遍历自己的任务队列,然后开始处理自己的任务。

public final class NioEventLoop extends SingleThreadEventLoop {protected void run() {for (;;) {if (!hasTasks()) {strategy = select(curDeadlineNanos);}//如果有任务的话就开始处理runAllTasks(0);//任务处理完毕就调用 epoll_wait 等待事件发生processSelectedKeys();}}
}

前面我们在 3.3 节看到 eventLoop.execute 把一个 Runnable 任务添加到了任务队列里。当 EventLoop 线程启动后,它会遍历自己的任务队列并开始处理。这时会进入到 AbstractChannel#register0 方法开始运行。

//file:src/main/java/io/netty/channel/AbstractChannel.java
public abstract class AbstractChannel extends ... {public final void register(...) {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});......}private void register0(ChannelPromise promise) {doRegister();......}
}

函数 doRegister 是在 AbstractNioChannel 类下。

//file:io/netty/channel/nio/AbstractNioChannel.java
public abstract class AbstractNioChannel extends AbstractChannel {private final SelectableChannel ch;protected SelectableChannel javaChannel() {return ch;}public NioEventLoop eventLoop() {return (NioEventLoop) super.eventLoop();}protected void doRegister() throws Exception {boolean selected = false;for (;;) {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            }}
}

上面最关键的一句是selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);。这一句就相当于在 C 语言下调用 epoll_ctl 把 listen socket 添加到了 epoll 对象下。

其中 javaChannel 获取父 channel,相当于 listen socket。unwrappedSelector 获取 selector,相当于 epoll 对象。register 相当于使用 epoll_ctl 执行 add 操作。

当 channel 注册完后,前面 init 时注册的 ChannelInitializer 回调就会被执行。再回头看它的 回调定义。

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {void init(Channel channel) ......p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {......ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
}

在 ChannelInitializer#initChannel 里,又给 boss 线程的 pipeline 里添加了一个任务。该任务是让其在自己的 pipeline 上注册一个 ServerBootstrapAcceptor handler。将来有新连接到达的时候,ServerBootstrapAcceptor 将会被执行。

3.5 真正的 bind

再看 doBind0 方法,调用 channel.bind 完成绑定。

private static void doBind0(...) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);......}});}

四、新连接到达

我们再回到 boss 线程的主循环中。

public final class NioEventLoop extends SingleThreadEventLoop {protected void run() {for (;;) {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());//任务队列都处理完就开始 selectif (!hasTasks()) {strategy = select(curDeadlineNanos);}//处理各种事件if (strategy > 0) {processSelectedKeys();}}  }private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select();}// Timeout will only be 0 if deadline is within 5 microsecslong timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);}
}

假如线程任务队列中的任务都处理干净了的情况下,boss 线程会调用 select 来发现其 selector 上的各种事件。相当于 C 语言中的 epoll_wait。

当发现有事件发生的时候,例如 OP_WRITE、OP_ACCEPT、OP_READ 等的时候,会进入相应的处理

public final class NioEventLoop extends SingleThreadEventLoop {private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {......if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}}
}

对于服务端的 Unsafe.read()  这里会执行 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() 方法,它会调用 JDK 底层的 ServerSocketChannel.accept() 接收到客户端的连接后,将其封装成 Netty 的 NioSocketChannel,再通过 Pipeline 将 ChannelRead 事件传播出去,这样 ServerBootstrapAcceptor 就可以在 ChannelRead 回调里处理新的客户端连接了。

我们直接看 ServerBootstrapAcceptor#ChannelRead。

//file:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {......private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) {// 获取child channelfinal Channel child = (Channel) msg;// 设置 childHandler 到 child channelchild.pipeline().addLast(childHandler);// 设置 childOptions、 childAttrssetChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);// 将 child channel 注册到 childGroupchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});}}
}

在 channelRead 先是获取到了新创建出来的子 channel,并为其 pipeline 添加 childHandler。回头看 1.5 节,childHandler 是我们自定义的。

紧接着调用 childGroup.register(child) 将子 channel 注册到 workerGroup 上。这个 register 过程和 3.3节、3.5节过程一样。区别就是前面是父 channel 注册到 bossGroup 上,这里是子 channel 注册到 workerGroup上。

在 register 完成后,子 channel 被挂到了 workerGroup 其中一个线程上,相应的线程如果没有创建也会被创建出来并进入到自己的线程循环中。

当子 channel 注册完毕的时候,childHandler 中 ChannelInitializer#initChannel 会被执行

public final class EchoServer {public static void main(String[] args) throws Exception {...ServerBootstrap b = new ServerBootstrap();b.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(serverHandler);}});......}
}

在 initChannel 把子 channel 的处理类 serverHandler 添加上来了。Netty demo 中对这个处理类的定义非常的简单,仅仅只是打印出来而已。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(......) {ctx.write(msg);}......
}

五、用户请求到达

当 worker 线程起来以后,会进入线程循环(boss 线程和 worker 线程的 run 函数是一个)。在循环中会遍历自己的任务队列,如果没有任务可处理,便 select 来观察自己所负责的 channel 上是否有事件发生。

public final class NioEventLoop extends SingleThreadEventLoop {protected void run() {for (;;) {if (!hasTasks()) {strategy = select(curDeadlineNanos);}//如果有任务的话就开始处理runAllTasks(0);//任务处理完毕就调用 epoll_wait 等待事件发生processSelectedKeys();}}private int select(long deadlineNanos) throws IOException {selector.selectNow();......    }
}

worker 线程会调用 select 发现自己所管理的所有子 channel 上的可读可写事件。在发现有可读事件后,会调用 processSelectedKeys,最后触发 pipeline 使得 EchoServerHandler 方法开始执行。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(......) {ctx.write(msg);}......
}

六、总结

事实上,Netty 对网络封装的比较灵活。既支持单线程 Reactor,也支持多线程 Reactor、还支持主从多线程 Reactor。三种模型对应的用法如下:

public static void main(String[] args) throws Exception {//单线程 ReactorEventLoopGroup eventGroup = new NioEventLoopGroup(1);ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup);    ......//多线程 ReactorEventLoopGroup eventGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup);    ......//主从多线程 ReactorEventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup);    ......
}

为了表述的更全面,本文飞哥选择的是最为经典的 主从多线程 Reactor 模式。本文中所描述的内容可以用下面一幅图来表示。

e048bd61f0f964c2e78b6b37d49ed6b8.png

在 Netty 中的 boss 线程中负责对父 channel(listen socket)上事件的监听和处理,当有新连接到达的时候,选择一个 worker 线程把这个子 channel(连接 socket )叫给 worker 线程来处理。

其中 Worker 线程就是等待其管理的所有子 channel(连接 socket)上的事件的监听和处理。当发现有事件发生的时候,回调用户设置的 handler 进行处理。在本文的例子中,这个用户 handler 就是 EchoServerHandler#channelRead。

至此,Netty 网络模块的工作核心原理咱们就介绍完了。飞哥一直“鼓吹”内功的好处。只要你具备了坚实的内功,各种语言里看似风牛马不相及的东西,在底层实际上原理是想通的。我本人从来没用 Java 开发过服务器程序,更没碰过 Netty。但是当你多epoll有了深入理解的时候,再看Netty也能很容易看懂,很快就能理解它的核心。这就是锻炼内功的好处!

7581294a1a839f48e3b303771d21b1f1.gif

往期推荐

Redis 内存优化神技,小内存保存大数据

使用 nginx 轻松管理 kubernetes 资源文件

Redis 内存满了怎么办?这样置才正确!

实战 Kubectl 创建 Deployment 部署应用

df9fb48ec8737bf9d3fb9a081e934334.gif

点分享

4bd82916ab737bafb1c326bd90d35579.gif

点收藏

286ca922ca3d8072205000d0c0987570.gif

点点赞

b16a230ef6340056ad7794229f55e737.gif

点在看

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/511354.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

免费体验,阿里云智能LOGO帮你解决设计难题

简介&#xff1a;超实用&#xff01;零基础搞定一个高大上的智能logo设计 新年过后&#xff0c;往往是大家一年中士气最足的时候&#xff0c;散去了年末的疲惫和emo&#xff0c;重燃对新一年的热情和希望。 想创业的朋友们同样意气风发&#xff0c;趁着新年的劲头想大干一场。…

第十一届吴文俊人工智能科学技术奖颁奖盛典召开,66个项目成果摘得中国智能科学技术奖励最高殊荣

7月16日上午&#xff0c;我国智能科学技术最高奖“吴文俊人工智能科学技术奖”颁奖盛典在北京隆重举行&#xff0c;66个获奖项目及个人受到表彰奖励。中国工程院院士、浙江大学教授潘云鹤荣获“吴文俊人工智能最高成就奖”,并颁授荣誉奖牌和100万人民币奖金。欧洲科学院院士、华…

企业拥抱开源之前,必须了解的七件事

简介&#xff1a;新的时代&#xff0c;开源的发展越来越蓬勃&#xff0c;开源和云的关系越来越复杂&#xff0c;耦合度越来越高&#xff0c;云是开源软件允许的最佳环境之一&#xff0c;也为开源软件插上安全高效的腾飞引擎。本文试图从企业软件的历史&#xff0c;结合开源软件…

荔枝音质高保真AI降噪技术分享

“荔枝音频处理目标就是两个字&#xff1a;清静——清晰、安静。让用户听得更清晰、更真切、更好。”荔枝集团技术副总裁刘晓宇谈及音频技术要攻克的几个难点时一针见血地提到。 随着疫情下直播、在线社交、在线课堂、在线会议等快速发展和元宇宙产业不断壮大&#xff0c;音频…

19年兰州大学计算机分数线,兰州大学2019年在广东省录取分数线

免费申请学习规划请选择在读年级学前小学初中高中大学留学其他已为10472位学员提供学习规划*验证码*短信验证码{"text1":{"label":"薄弱科目","placeholder":"请输入你的薄弱科目","required":1,"formType&q…

最佳实践丨构建云上私有池(虚拟IDC)的5种方案详解

简介&#xff1a;云上私有池系列终篇终于来了&#xff0c;本文将重点介绍构建云上的私有池&#xff08;虚拟IDC&#xff09;的多种方案和各自的优缺点&#xff0c;并给出相关的性价比优化建议。 本文作者&#xff1a;阿里云技术专家李雨前 摘要 围绕私有池&#xff08;虚拟I…

阿里云万郁香:多样付费选择构筑成本最优的弹性体验

简介&#xff1a;云上成本优化三部曲&#xff1a;云上资源归属拆解、确定资源需求及购买优先级、选择最佳的付费方式。 2021年12月21日&#xff0c;阿里云弹性计算年度峰会在上海举行&#xff0c;本次峰会通过全实景直播的形式为大家呈现。峰会上&#xff0c;阿里云弹性计算高…

智能分层、满足更高工作负载,亚马逊云科技加速云端存储服务创新

编辑 | 宋慧 出品 | CSDN 云计算 分布式存储 Amazon S3、弹性计算 Amazon EC2&#xff0c;都是亚马逊云科技的当家产品。在云原生、人工智能、数据分析领域继续发挥技术优势&#xff0c;夯实云技术领先的地位之后&#xff0c;亚马逊云科技在存储领域的研究和发展也没有止步不前…

计算机开机黑屏,开机黑屏,电脑无法进入系统

win10开机进安全模式我相信经常用电脑的童鞋也遇到过这样的问题&#xff0c;就是桌面上什么都没有&#xff0c;电脑也开着。怎么了&#xff1f;通常&#xff0c;不显示桌面有很多原因。如何解决这个问题&#xff1f;等等&#xff0c;别担心&#xff0c;小...相信经常使用电脑的…

最佳实践丨三种典型场景下的云上虚拟IDC(私有池)选购指南

简介&#xff1a;业务上云常态化&#xff0c;业务在云上资源的选购、弹性交付、自助化成为大趋势。不同行业的不同客户&#xff0c;业务发展阶段不一样&#xff0c;云上资源的成本投入在业务整体成本占比也不一样&#xff0c;最小化成本投入、最大化业务收益始终是不同客户间的…

客流量总是少?是你门店选址出了问题!

零售行业最本质的需求就是降本增效、引流提销&#xff0c;实现利润最大化。如何利用大数据、人工智能、云计算、AIOT等前沿技术&#xff0c;助力企业数智化转型&#xff0c;全生态效率提升和可持续发展&#xff0c;是零售企业的核心诉求。 零售行业客流管理的现状 零售行业已进…

大数据领域的专精特新“小巨人”中科闻歌

客户故事 中科闻歌创立于2017年3月。在成立的四年内&#xff0c;它完成了五轮融资&#xff0c;被评为国家级专精特新“小巨人”企业。2020年&#xff0c;中科闻歌与阿里云展开合作&#xff0c;通过云服务器 ECS&#xff08;Elastic Compute Service&#xff09;为核心业务提供底…

计算机网络c类网络划分子网介绍,IP地址的子网划分详解

原标题&#xff1a;IP地址的子网划分详解来源&#xff1a;今日头条北京炫亿时代一、子网划分基础1、子网划分的若干个好处&#xff1a;①减少网络流量②提高网络性能③简化管理④可以更为灵活的形成大覆盖范围的网络2、你最好遵循以下步骤来进行子网划分&#xff1a;①确认所需…

直播回顾:准确性提升到 5 秒级,ssar 独创的 load5s 指标有多硬核?| 龙蜥技术

简介&#xff1a; 你还在为分析机器负载高而苦恼&#xff1f;这款 ssar 工具独创 load5s 指标精准定位超硬核。 编者按&#xff1a;本文整理自龙蜥SIG技术周会&#xff0c;作者闻茂泉&#xff0c;阿里云计算平台事业部SRE运维专家&#xff0c;是龙蜥社区跟踪诊断SIG核心成员…

dos系统重启计算机名,dos系统重启的命令是是

用DOS命令可以实现很多功能&#xff0c;而且有时候这相对于其他方法比较简单易行&#xff0c;下面就让学习啦小编教大家用dos系统重启的命令&#xff0c;还有自动关机和注销等功能。dos系统重启的命令按快捷键“winR”&#xff0c;输入cmd后按回车进入DOS环境&#xff1a;输入命…

如何搭建云原生大数据平台的K8s底座

作者 | 智领云科技云平台研发经理 金津 供稿 | 智领云科技 伴随着数字化转型脚步的加快&#xff0c;大数据已成为企业经营管理的主要手段之一&#xff0c;越来越多的行业也选择通过大数据来实现业绩增长。今年年初&#xff0c;CNCF中国区总监陈泽辉在2022云原生超级英雄会上表…

用好这28个工具,开发效率爆涨

简介&#xff1a;用好这28个工具&#xff0c;开发效率爆涨。本文我将主要从Terminal 和 Desktop 2个大类、8个核心开发场景介绍一下我最常使用的效率工具&#xff0c;及如何通过这些工具来提升程序员「幸福感」的实践。 大家好&#xff0c;我是秦世成&#xff0c;我在云效负责…

文石服务器维护,文石BOOX OS 2.0新系统即将上线,联合京东读书推出BOOX书城

原标题&#xff1a;文石BOOX OS 2.0新系统即将上线&#xff0c;联合京东读书推出BOOX书城作为行业领先的电纸书品牌&#xff0c;文石多年来一直致力于完善BOOX电纸书的功能&#xff0c;优化系统&#xff0c;提高性能&#xff0c;简化操作&#xff0c;力图在开放性和傻瓜式之间寻…

Docker 那些事儿:如何安全地停止、删除容器?

作者 | 飞向星的客机来源 | CSDN博客&#x1f31f; 前言本篇文章将会讲讲如何停止、删除容器和对容器进行资源限制。停止和删除容器&#x1f351; 停止容器在工作中&#xff0c;有时会需要将容器暂停&#xff0c;例如&#xff0c;要为容器文件系统做一个快照时。使用 docker pa…

独家专访阿里云存储负责人吴结生:我经历的三个重大决策

简介&#xff1a;云原生正在重新定义存储&#xff0c;而存储只是基础软件领域中的一环。本期 C 位面对面&#xff0c;我们有幸邀请到了阿里巴巴高级研究员&#xff0c;阿里云智能存储负责人吴结生&#xff08;Jason Wu&#xff09;&#xff0c;他亲历了阿里云存储技术高速发展的…