归档
- GitHub: Netty-选择器-监听
介绍
- 参考
NioEventLoop
- 类结构:基础类介绍-NioEventLoop
- 主要逻辑为:死循环监听
selector
- 总结:
- 创建的线程是
FastThreadLocalThread
实例
- 创建的线程是
原理
io.netty.channel.nio.NioEventLoop
/*** NIO 事件轮循 */
public final class NioEventLoop extends SingleThreadEventLoop {private Selector selector; // 封装的选择器。可参考:#openSelector()private Selector unwrappedSelector; // JDK 底层选择器private SelectedSelectionKeySet selectedKeys;private final SelectorProvider provider;// 选择器 SPI 提供者/*** sign_m_511 钩子函数(任务逻辑),可理解为线程的 Runnable #run() 方法 */@Overrideprotected void run() {for (;;) { // 死循环监听try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {... // 省略其他 casecase SelectStrategy.SELECT:long curDeadlineNanos = ... // 一般设置为 NONE...try {if (!hasTasks()) {strategy = select(curDeadlineNanos); // select 监听}} ... // finallydefault:}} catch (IOException e) {rebuildSelector0(); // 有错误,重新构建选择器并将信道注册进去...continue;}...if (ioRatio == 100) {...} else if (strategy > 0) {try {processSelectedKeys(); // 处理 keys} ... // finally} else {ranTasks = runAllTasks(0); // 运行队列中所有积压的任务}...} ... // catch finally}}/*** 监听 */private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select(); // 相当于底层监听}long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); // 限时监听}/*** 处理 keys */private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys()); // 处理 keys}}/*** 处理 keys */private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {... // 校验空Iterator<SelectionKey> i = selectedKeys.iterator();for (;;) {final SelectionKey k = i.next();final Object a = k.attachment();i.remove(); // 需要移除(可参考 JDK-示例)if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a); // 处理单个 key} ... // else... // 没有下一项,退出循环... // needsToSelectAgain(再次选择)处理}}/*** 处理单个 key */private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();... // key 无效处理try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 客户端连接事件int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops); // 重置 key 的兴趣事件unsafe.finishConnect(); // 相当于 sc.finishConnect(),等待连接完成}if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 写事件unsafe.forceFlush(); // sign_o_001 出站处理(但测试时但没调用过)}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 读事件、接收连接事件unsafe.read(); // sign_o_002 入站处理}} ... // catch }
}
- 可参考:读写原理
启动线程
-
继续上面
NioEventLoop
原理介绍- 流程:调用
execute(Runnable)
执行任务时,若没有线程,则会启动线程并绑定
- 流程:调用
-
默认其实例变量 thread(
io.netty.util.concurrent.SingleThreadEventExecutor #thread
) 为null
- 方法
inEventLoop(curThread)
会返回false
- 方法
-
io.netty.util.concurrent.SingleThreadEventExecutor
executor
设值参考 executor-设值
/*** 单线程-执行器 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private final Executor executor;/*** 执行任务 */@Overridepublic void execute(Runnable task) {execute0(task);}private void execute0(@Schedule Runnable task) {ObjectUtil.checkNotNull(task, "task");execute(task, wakesUpForTask(task)); // sign_m_101}protected boolean wakesUpForTask(Runnable task) {return true; // 直接返回 true,表示立即执行}// sign_m_121@Overridepublic boolean inEventLoop(Thread thread) {return thread == this.thread; // 第一次执行 this.thread 为 null,所有返回 false}// sign_m_101 执行任务private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop(); // sign_m_120 第一次执行时,返回 falseaddTask(task); // 添加到队列if (!inEventLoop) {startThread(); // 启动线程... // 省略停机判断和拒绝处理}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}// 启动线程private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // CAS 改状态boolean success = false;try {doStartThread(); // 启动线程success = true;} ... // finally}}}// 启动线程private void doStartThread() {assert thread == null;/*** sign_m_501 创建线程并执行任务* * executor 为 ThreadPerTaskExecutor 实例* executor.execute() 会创建新线程* executor 设值参考 #executor-设值*/executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread(); // 给 this.thread 设值try {SingleThreadEventExecutor.this.run(); // sign_m_511 调用钩子函数 run(子类实现具体的逻辑)} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {... // 省略停机(Shutdown)处理}}});}/*** 构造器 */protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, ...) {...this.executor = ThreadExecutorMap.apply(executor, this); // sign_m_301 封装 executor...}
}
io.netty.util.concurrent.AbstractEventExecutor
/*** 抽象事件执行器 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {// sign_m_120 判断是不是在当前线程中执行新任务@Overridepublic boolean inEventLoop() {return inEventLoop(Thread.currentThread()); // sign_m_121 用当前线程判断}
}
io.netty.util.internal.ThreadExecutorMap
/*** 线程执行器映射。* 用于获取调用线程的 EventExecutor*/
public final class ThreadExecutorMap {/*** sign_m_301 封装 executor */public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {... // 校验return new Executor() { // 封装@Overridepublic void execute(final Runnable command) {executor.execute(apply(command, eventExecutor)); // sign_m_302 封装 command}};}/*** sign_m_302 封装 command */public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {... // 校验return new Runnable() {@Overridepublic void run() {setCurrentEventExecutor(eventExecutor);try {command.run();} finally {setCurrentEventExecutor(null);}}};}
}
io.netty.util.concurrent.ThreadPerTaskExecutor
/*** 每任务一线程执行器 */
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory; // 线程工厂public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");}// sign_m_501 创建线程并执行任务@Overridepublic void execute(Runnable command) {/*** sign_m_601 创建线程* ------------------* 并启动线程。*/threadFactory.newThread(command).start();}
}
executor-设值
io.netty.channel.nio.NioEventLoopGroup
// NIO 事件轮循组
public class NioEventLoopGroup extends MultithreadEventLoopGroup {/*** 构造器 */public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null); // sign_m_401 executor 一直传 null 过去}// sign_m_401public NioEventLoopGroup(int nThreads, Executor executor) {this(nThreads, executor, SelectorProvider.provider()); // sign_m_402}// sign_m_402public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); // sign_m_403}// sign_m_403public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); // sign_m_411}/*** sign_m_4c1 创建子线程(钩子函数)* executor 一般是 ThreadPerTaskExecutor 实例*/@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {... // 省略构造参数设置逻辑return new NioEventLoop(this, executor, selectorProvider,selectStrategyFactory.newSelectStrategy(),rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);}
}
io.netty.channel.MultithreadEventLoopGroup
// 多线程事件轮循
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {// sign_m_411 构造器protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {/*** nThreads 为 0,则默认用 cpus * 2*/super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); // sign_m_421}
}
io.netty.util.concurrent.MultithreadEventExecutorGroup
/*** 多线程事件执行组 */
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {// sign_m_421protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); // sign_m_422}// sign_m_422protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {/*** executor 传值一直为 null,所以默认使用 ThreadPerTaskExecutor 实例*/if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads]; // 根据线程数创建数组for (int i = 0; i < nThreads; i ++) {boolean success = false;try {/*** sign_m_4c1 创建子线程(钩子函数)* executor 传上面创建的 ThreadPerTaskExecutor 实例*/children[i] = newChild(executor, args);success = true;} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);} finally {... // 未成功创建子线程的处理 (success = false)}}chooser = chooserFactory.newChooser(children); // 选取器(轮循选择下一个子线程)... // 省略事件监控处理}// 创建默认线程工厂protected ThreadFactory newDefaultThreadFactory() {return new DefaultThreadFactory(getClass());}
}
io.netty.util.concurrent.DefaultThreadFactory
// 默认线程工厂
public class DefaultThreadFactory implements ThreadFactory {// sign_m_601 创建线程@Overridepublic Thread newThread(Runnable r) {Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());... // 省略守护和优先级设置return t;}// 创建 FastThreadLocalThread 实例protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);}
}