Netty-选择器-监听

归档

  • GitHub: Netty-选择器-监听

介绍

  • 参考 NioEventLoop
    • 类结构:基础类介绍-NioEventLoop
  • 主要逻辑为:死循环监听 selector
  • 总结:
    • 创建的线程是 FastThreadLocalThread 实例

原理

  • io.netty.channel.nio.NioEventLoop
/*** NIO 事件轮循 */
public final class NioEventLoop extends SingleThreadEventLoop {private Selector selector;              // 封装的选择器。可参考:#openSelector()private Selector unwrappedSelector;     // JDK 底层选择器private SelectedSelectionKeySet selectedKeys;private final SelectorProvider provider;// 选择器 SPI 提供者/*** sign_m_511 钩子函数(任务逻辑),可理解为线程的 Runnable #run() 方法 */@Overrideprotected void run() {for (;;) { // 死循环监听try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {... // 省略其他 casecase SelectStrategy.SELECT:long curDeadlineNanos = ... // 一般设置为 NONE...try {if (!hasTasks()) {strategy = select(curDeadlineNanos); // select 监听}} ... // finallydefault:}} catch (IOException e) {rebuildSelector0(); // 有错误,重新构建选择器并将信道注册进去...continue;}...if (ioRatio == 100) {...} else if (strategy > 0) {try {processSelectedKeys(); // 处理 keys} ... // finally} else {ranTasks = runAllTasks(0); // 运行队列中所有积压的任务}...} ... // catch finally}}/*** 监听 */private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select(); // 相当于底层监听}long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); // 限时监听}/*** 处理 keys */private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys()); // 处理 keys}}/*** 处理 keys */private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {... // 校验空Iterator<SelectionKey> i = selectedKeys.iterator();for (;;) {final SelectionKey k = i.next();final Object a = k.attachment();i.remove(); // 需要移除(可参考 JDK-示例)if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a); // 处理单个 key} ... // else... // 没有下一项,退出循环... // needsToSelectAgain(再次选择)处理}}/*** 处理单个 key */private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();... // key 无效处理try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 客户端连接事件int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);     // 重置 key 的兴趣事件unsafe.finishConnect(); // 相当于 sc.finishConnect(),等待连接完成}if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 写事件unsafe.forceFlush();    // sign_o_001 出站处理(但测试时但没调用过)}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 读事件、接收连接事件unsafe.read();          // sign_o_002 入站处理}} ... // catch }
}
  • 可参考:读写原理

启动线程

  • 继续上面 NioEventLoop 原理介绍

    • 流程:调用 execute(Runnable) 执行任务时,若没有线程,则会启动线程并绑定
  • 默认其实例变量 thread(io.netty.util.concurrent.SingleThreadEventExecutor #thread) 为 null

    • 方法 inEventLoop(curThread) 会返回 false
  • io.netty.util.concurrent.SingleThreadEventExecutor

    • executor 设值参考 executor-设值
/*** 单线程-执行器 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private final Executor executor;/*** 执行任务 */@Overridepublic void execute(Runnable task) {execute0(task);}private void execute0(@Schedule Runnable task) {ObjectUtil.checkNotNull(task, "task");execute(task, wakesUpForTask(task)); // sign_m_101}protected boolean wakesUpForTask(Runnable task) {return true; // 直接返回 true,表示立即执行}// sign_m_121@Overridepublic boolean inEventLoop(Thread thread) {return thread == this.thread; // 第一次执行 this.thread 为 null,所有返回 false}// sign_m_101 执行任务private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop(); // sign_m_120 第一次执行时,返回 falseaddTask(task); // 添加到队列if (!inEventLoop) {startThread(); // 启动线程... // 省略停机判断和拒绝处理}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}// 启动线程private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // CAS 改状态boolean success = false;try {doStartThread(); // 启动线程success = true;} ... // finally}}}// 启动线程private void doStartThread() {assert thread == null;/*** sign_m_501 创建线程并执行任务* * executor 为 ThreadPerTaskExecutor 实例*   executor.execute() 会创建新线程*   executor 设值参考 #executor-设值*/executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread(); // 给 this.thread 设值try {SingleThreadEventExecutor.this.run(); // sign_m_511 调用钩子函数 run(子类实现具体的逻辑)} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {... // 省略停机(Shutdown)处理}}});}/*** 构造器 */protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, ...) {...this.executor = ThreadExecutorMap.apply(executor, this); // sign_m_301 封装 executor...}
}
  • io.netty.util.concurrent.AbstractEventExecutor
/*** 抽象事件执行器 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {// sign_m_120 判断是不是在当前线程中执行新任务@Overridepublic boolean inEventLoop() {return inEventLoop(Thread.currentThread()); // sign_m_121 用当前线程判断}
}
  • io.netty.util.internal.ThreadExecutorMap
/*** 线程执行器映射。* 用于获取调用线程的 EventExecutor*/
public final class ThreadExecutorMap {/*** sign_m_301 封装 executor */public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {... // 校验return new Executor() { // 封装@Overridepublic void execute(final Runnable command) {executor.execute(apply(command, eventExecutor)); // sign_m_302 封装 command}};}/*** sign_m_302 封装 command */public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {... // 校验return new Runnable() {@Overridepublic void run() {setCurrentEventExecutor(eventExecutor);try {command.run();} finally {setCurrentEventExecutor(null);}}};}
}
  • io.netty.util.concurrent.ThreadPerTaskExecutor
/*** 每任务一线程执行器 */
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory; // 线程工厂public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");}// sign_m_501 创建线程并执行任务@Overridepublic void execute(Runnable command) {/*** sign_m_601 创建线程* ------------------* 并启动线程。*/threadFactory.newThread(command).start();}
}
executor-设值
  • io.netty.channel.nio.NioEventLoopGroup
// NIO 事件轮循组
public class NioEventLoopGroup extends MultithreadEventLoopGroup {/*** 构造器 */public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null); // sign_m_401 executor 一直传 null 过去}// sign_m_401public NioEventLoopGroup(int nThreads, Executor executor) {this(nThreads, executor, SelectorProvider.provider()); // sign_m_402}// sign_m_402public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); // sign_m_403}// sign_m_403public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); // sign_m_411}/*** sign_m_4c1 创建子线程(钩子函数)* executor 一般是 ThreadPerTaskExecutor 实例*/@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {... // 省略构造参数设置逻辑return new NioEventLoop(this, executor, selectorProvider,selectStrategyFactory.newSelectStrategy(),rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);}
}
  • io.netty.channel.MultithreadEventLoopGroup
// 多线程事件轮循
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {// sign_m_411 构造器protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {/*** nThreads 为 0,则默认用 cpus * 2*/super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); // sign_m_421}
}
  • io.netty.util.concurrent.MultithreadEventExecutorGroup
/*** 多线程事件执行组 */
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {// sign_m_421protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); // sign_m_422}// sign_m_422protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {/*** executor 传值一直为 null,所以默认使用 ThreadPerTaskExecutor 实例*/if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads]; // 根据线程数创建数组for (int i = 0; i < nThreads; i ++) {boolean success = false;try {/*** sign_m_4c1 创建子线程(钩子函数)* executor 传上面创建的 ThreadPerTaskExecutor 实例*/children[i] = newChild(executor, args);success = true;} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);} finally {... // 未成功创建子线程的处理 (success = false)}}chooser = chooserFactory.newChooser(children); // 选取器(轮循选择下一个子线程)... // 省略事件监控处理}// 创建默认线程工厂protected ThreadFactory newDefaultThreadFactory() {return new DefaultThreadFactory(getClass());}
}
  • io.netty.util.concurrent.DefaultThreadFactory
// 默认线程工厂
public class DefaultThreadFactory implements ThreadFactory {// sign_m_601 创建线程@Overridepublic Thread newThread(Runnable r) {Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());... // 省略守护和优先级设置return t;}// 创建 FastThreadLocalThread 实例protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/17759.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vivado设置Vscode为默认编辑器

D:\vscode\Microsoft VS Code\Code.exe -g [file name]:[line number]

鸿蒙ArkUI-X跨平台开发:【资源分类与访问】

资源分类与访问 应用开发过程中&#xff0c;经常需要用到颜色、字体、间距、图片等资源&#xff0c;在不同的设备或配置中&#xff0c;这些资源的值可能不同。 应用资源&#xff1a;借助资源文件能力&#xff0c;开发者在应用中自定义资源&#xff0c;自行管理这些资源在不同…

话术巧妙分隔沟通效果更佳看看这个小技巧

客服回复客户咨询&#xff0c;如果遇到比较复杂的问题&#xff0c;经常会有大段的文字回复&#xff0c;用聊天宝的分段符功能&#xff0c;在需要分段的地方点击右上角的“插入分隔符”&#xff0c;就可以在指定位置分段&#xff0c;实现多段发送的目的。 前言 客服回复客户咨询…

干冰清洗机的清洗原理及应用

干冰清洗机的清洗原理及应用可以详细阐述如下&#xff1a; 一、清洗原理 干冰清洗机的清洗原理主要基于干冰的低温冷冻作用。干冰在常温下会迅速升华&#xff0c;吸收大量的热量&#xff0c;使周围的温度迅速降低。当干冰颗粒通过特殊的干冰清洗机喷射到清洗物体表面时&#…

系统架构设计师【第1章】: 绪论 (核心总结)

文章目录 1.1 系统架构概述1.1.1 系统架构的定义及发展历程1.1.2 软件架构的常用分类及建模方法1.1.3 软件架构的应用场景1.1.4 软件架构的发展未来 1.2 系统架构设计师概述1.2.1 架构设计师的定义、职责和任务1.2.2 架构设计师应具备的专业素质1.2.3 架构设计师的知识…

Java入门基础学习笔记45——String使用的注意事项

String使用时的注意事项&#xff1a; 1&#xff09;String对象的内容不可改变&#xff0c;被称为不可变字符串对象。 Strings are constant; their values cannot be changed after they are created. String buffers support mutable strings. Because String objects are im…

C++代码使用ClangCL编译注意事项

遇到cmake指定模板类工程使用msvc的clang编译器编译代码&#xff0c;代码变量出现与预期不符的问题&#xff1b; 如下&#xff1a; clangcl将实现放到头文件里则不会出现这样的情况&#xff1b; 最后按照pcl的模板类写法则解决这个问题&#xff1b;

前端常见的页面自适应布局方案

流式布局 使用百分比单位等单位来表示长度和宽度&#xff0c;这样在整体的长度和宽度变化的时候&#xff0c;里面的内容也会发生变化。 使用 % 直接使用百分比&#xff0c;在页面宽度变化的时候会对应放大缩小。 <!DOCTYPE html> <html lang"en"><he…

vue3 RouterLink路由query传参

vue route query传参 一、传参页面,需要传id、title、content三个参数 <ul> <li v-for"news in newsList" :key"news.id"> <!--/news/detail--> <RouterLink :to"{ path:/news/deta…

[SCTF2019]Who is he

unity 游戏&#xff0c;直接输入字符串 直接修改 if 判断&#xff0c;看能不能直接输出flag 修改了程序逻辑&#xff0c;但还是输出了 明明已经把这个 if 删了 不知道为什么还会输出这串字符 应该程序还有什么引入吧&#xff0c;看 wp 应该先查一下程序的动态链接库 DLL 是…

瓦罗兰特账号怎么注册 瓦罗兰特延迟高用什么加速器

《瓦罗兰特》&#xff08;Valorant&#xff09;是由拳头游戏&#xff08;Riot Games&#xff09;开发并发行的一款免费的多人在线第一人称射击游戏&#xff08;FPS&#xff09;&#xff0c;它结合了传统的硬核射击机制与英雄角色的能力系统&#xff0c;为玩家提供了独特的竞技体…

【加密与解密(第四版)】第十七章笔记

第十七章 软件保护技术 17.1 防范算法求逆 17.2 抵御静态分析 反汇编算法&#xff1a;线性扫描&#xff08;无法正确地将代码和数据分开&#xff09;、递归进行 巧妙构造代码和数据&#xff0c;在指令流中插入很多“数据垃圾"&#xff0c;干扰反汇编软件的判断&#xf…

为什么要学习c++?

你可能在想&#xff0c;“C&#xff1f;那不是上个时代的产物吗&#xff1f;” 哎呀&#xff0c;可别小看了这位“老将”&#xff0c;它在21世纪的科技舞台上依旧光芒万丈&#xff0c;是许多尖端技术不可或缺的基石&#xff01; 1. 无可替代 c源于c语言&#xff0c;它贴近于硬…

地下停车场FM信号覆盖系统技术原理用与应用

随着我国城市化水平的快速推进与房地产的快速发展&#xff0c;城市停车场称为每栋建筑物的硬性配套建筑&#xff0c;尤其是商业综合体、医院、政府机关、机场、高铁站等场所出现了超大规模停车场&#xff0c;停放车辆可达数千辆&#xff0c;停车场的智能化与信息化水平也越来越…

Java | Leetcode Java题解之第104题二叉树的最大深度

题目&#xff1a; 题解&#xff1a; class Solution {public int maxDepth(TreeNode root) {if (root null) {return 0;}Queue<TreeNode> queue new LinkedList<TreeNode>();queue.offer(root);int ans 0;while (!queue.isEmpty()) {int size queue.size();wh…

Llama模型家族之使用 Supervised Fine-Tuning(SFT)微调预训练Llama 3 语言模型(十) 使用 LoRA 微调常见问题答疑

LlaMA 3 系列博客 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;一&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;二&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;三&#xff09; 基于 LlaMA…

进程和用户管理

查看进程的命令 ps top pstree 发送信号命令 kill 使用是后加-l 用户管理命令 添加用户:sudo adduser 用户名 修改组:sudo usermod -G 用户名1 用户名2 修改家目录:sudo usermod -d /home/用户名 -m 用户名 删除用户名:sudo deluser --remove -home 用户名

Docker 快速搭建 MongoDB 4.x 集群(一主一从)

目录 1. 生成 mongo-file2. 启动主节点3. 启动从节点4. 配置副本集5. 注意事项 环境&#xff1a;MongoDB 4.0.25&#xff0c;Alma Linux&#xff08;建议使用 Linux&#xff09; 部署的时候是在同一个及其上操作的&#xff0c;实际可以放在不同机器上。 截止到 2024年05月&…

JAVA学习·String类的常用方法

String 类及其创建 String 类的创建 String 类是 Java 内置的一个类&#xff0c;其完全限定类名是java.lang.String。想要创建一个字符串有多重方式&#xff0c;比如创建字符串"Hello"&#xff1a; String s1 "Hello"; // 字面量创建 String s2 new St…

在组件外使用pinia的坑

来源 项目包含很多静态的类型&#xff0c;我新建了一个js来专门管理和使用这些类型&#xff0c;如下图这种&#xff0c;有一部分是固定的&#xff0c;千年不变&#xff0c;有一部分是偶尔会变&#xff08;需要后台获取&#xff09;&#xff0c;还有一部分是要登录后才能拿到的…