NioEventLoopGroup
可以看到NioEventLoopGroup继承了MultithreadEventExecutorGroup并且实现了EventLoopGroup接口,而这两个类被ExecutorService修饰,所以NioEventLoopGroup实际上是一个线程池,池中的对象其实就是单个的NioEventLoop。
源码解读
NioEventLoopGroup 的参数初始化
// 进入无参构造public NioEventLoopGroup() {this(0);}
public NioEventLoopGroup(int nThreads) {// group所包含的executorthis(nThreads, (Executor) null);}
public NioEventLoopGroup(int nThreads, Executor executor) {// 注入单例模式的提供者 this(nThreads, executor, SelectorProvider.provider());}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {// 注入默认的策略工厂实例this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) { // 注入拒绝处理器super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}
private static final int DEFAULT_EVENT_LOOP_THREADS;static {//默认线程数是cpu核数的两倍DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));if (logger.isDebugEnabled()) {logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);}}
构造真正的NioEventGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}//1、//executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接// 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从//NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}//2、//这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现 线程池;//数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {//3、//newChild(executor, args) 函数在NioEventLoopGroup类中实现了,// 实质就是就是存入了一个 NIOEventLoop类实例children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}// 终止所有eventLoop上所执行的任务for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}
//4、实例化线程工厂执行器选择器: 根据children获取选择器chooser = chooserFactory.newChooser(children);//5、为每个EventLoop线程添加 线程终止监听器final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}
//6、将children 添加到对应的set集合中去重, 表示只可读。Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}
children[i] = newChild(executor, args);
/*** newChild(executor, args) 里的方法* 我们可以看到 返回的就是一个 NioEventLoop*/
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
1. NioEventLoopGroup初始化时未指定线程数,那么会使用默认线程数,即 `线程数 = CPU核心数 * 2`;
2. 每个NioEventLoopGroup对象内部都有一组可执行的`NioEventLoop数组`,其大小是 nThreads, 这样就构成了一个线程池, `一个NIOEventLoop可以理解成就是一个线程`。
3. 所有的NIOEventLoop线程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是属于某一个NIOEventLoopGroup的。这一点从 newChild(executor, args); 方法就可以看出:newChild()的实现是在NIOEventLoopGroup中实现的。
4. 当有IO事件来时,需要从线程池中选择一个线程出来执行,这时候的NioEventLoop选择策略是由GenericEventExecutorChooser实现的,并调用该类的next()方法。
5. 每个NioEventLoopGroup对象都有一个NioEventLoop选择器与之对应,其会根据NioEventLoop的个数,动态选择chooser(如果是2的幂次方,则按位运算,否则使用普通的轮询)