Reactor模型:网络线程模型演进

一,阻塞IO线程池模型(BIO)


这是传统的网络编程方案所采用的线程模型。
即有一个主循环,socket.accept阻塞等待,当建立连接后,创建新的线程/从线程池中取一个,把该socket连接交由新线程全权处理。
这种方案优点即实现简单,缺点则是方案的伸缩性受到线程数的限制。
// 循环监听
while (true) {// 阻塞监听客户端请求client = server.accept();System.out.println(client.getRemoteSocketAddress() + "客户端连接成功!");// 将该客户端请求通过线程池放入HandlMsg线程中进行处理executorService.execute(new HandleMsg(client));
}

二,Reactor单线程模型


有了NIO后,可以采用IO多路复用机制了。
这是一个单Reactor单线程模型,时序图见下文,该方案只有一个线程,所有Channel的连接均注册在了该Reactor上,由一个线程全权负责所有的任务。
这种方案实现简单,且不受线程数的限制,但受限于使用场景,仅适合于IO密集的应用,不太适合CPU密集的应用,且适合于CPU资源紧张的应用上。

三,Reactor线程池模型


Reactor负责全部IO任务(包括每个Channel的连接和读写),线程池负责业务逻辑的处理。
虽然该方案可以充分利用CPU资源,但是这个方案比单线程版本多了进出Thread Pool的两次上下文切换。

四,主从Reactor模型(Netty的线程模型)


  • MainReactor负责连接任务,SubReactor负责IO读写、业务计算。
  • MainReactor和每个SubReactor都是单独的线程,可以调整SubReactor的数量适应CPU资源紧张的应用。
  • 该方案有一个不太明显的缺点,即Session没有分优先级,所有Session平等对待均分到所有的线程中,这样可能会导致优先级低耗资源的Session堵塞高优先级的Session。( TODO 看下Netty的优化

五,主从Reactor线程池模型


和主从Reactor模型相比, 只是把业务计算放到线程池里了,IO读写还是在SubReactor线程里。
该模型可以更为灵活的适应大多应用场景,通过:调整SubReactor数量、调整Thread Pool参数等。
注意:
  1. 如果将IO读写放到线程池里,可能会出现问题:SubReactor选中读就绪事件立马交给线程池,但线程还没来得及read,Channel由于仍然读就绪被select出来重复执行。
  2. 上图这样把Channel的读写放在SubReactor,那么此SubReactor上不同Channel的读写会阻塞,但可能效率很高也问题不大。
主从Reactor线程池模型代码示例(调试过了,注意细节见注释)
客户端
public class ReactorClient {public static void main(String[] args) throws IOException, InterruptedException {for (int i = 0; i < 4; i++) {new Thread(() -> {try {send();} catch (IOException e) {e.printStackTrace();}}).start();}}static void send() throws IOException {// 阻塞模式读写SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("127.0.0.1", 9090));ByteBuffer writeBuff = ByteBuffer.allocate(20);/*** 分配太小时,客户端表现:接收数据不完整,但正常退出;服务端表现:业务读写正常,但业务结束后会收到2次读就绪事件,一次读返-1,关闭channel,一次读就会报java.io.IOException: Connection reset by peer* TODO 研究下这个原理和如何分配大小*/ByteBuffer readBuff = ByteBuffer.allocate(2000);writeBuff.put(("i am client " + Thread.currentThread().getName()).getBytes());writeBuff.flip();new Thread(new Runnable() {@Override@SneakyThrowspublic void run() {socketChannel.write(writeBuff);System.out.println(Thread.currentThread().getName() + " 已发送数据,等待返回");readBuff.clear();// 阻塞等服务端消息socketChannel.read(readBuff);readBuff.flip();System.out.println(Thread.currentThread().getName() + " 接受服务端消息:" + new String(readBuff.array()));// 正常来讲应放入finallysocketChannel.close();}}).start();}
}

服务端

/*** 主从Reactor多线程模型*/
public class MainSubReactorMultiThread {private static final int SUB_COUNT = 4;public static void main(String[] args) {MainSubReactorMultiThread.MainReactor mainReactor = new MainSubReactorMultiThread.MainReactor(9090);mainReactor.run();}/*** 选择就绪的连接事件*/public static class MainReactor implements Runnable {ServerSocketChannel serverSocketChannel;Selector selector;public MainReactor(int port) {try {serverSocketChannel = ServerSocketChannel.open();selector = Selector.open();serverSocketChannel.socket().bind(new InetSocketAddress(port));serverSocketChannel.configureBlocking(false);// 注册了连接事件SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 并且在selectionKey对象附加了一个Acceptor对象,这是用来处理连接请求的类selectionKey.attach(new MainSubReactorMultiThread.Acceptor(serverSocketChannel));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {while (true) {try {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 开始监听");selector.select();System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "mainSelector, 监听到连接件");Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();// 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型dispatcher(selectionKey);iterator.remove();}} catch (IOException e) {e.printStackTrace();}}}private void dispatcher(SelectionKey selectionKey) {Runnable runnable = (Runnable) selectionKey.attachment();// 同线程执行runnable.run();}}/*** 选择就绪的读写事件*/public static class SubReactor implements Runnable {Selector subSelector;int index;public SubReactor(Selector subSelector, int index) {this.subSelector = subSelector;this.index = index;}@Overridepublic void run() {while (true) {try {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 开始监听");int selectNum = subSelector.select();if (selectNum != 0) {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 监听到就绪事件:" + JSON.toJSONString(subSelector.selectedKeys()));} else {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", 未监听到事件,继续轮训");continue;}Set<SelectionKey> selectionKeys = subSelector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();// 这里因为是通过attach附加了事件响应的Runnable,所以不用区分事件类型dispatcher(selectionKey);iterator.remove();}} catch (IOException e) {e.printStackTrace();}}}@SneakyThrowsprivate void dispatcher(SelectionKey selectionKey) {while (true) {Runnable runnable = (Runnable) selectionKey.attachment();if (runnable != null) {// 同线程执行runnable.run();return;}System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "subSelector:" + index + ", runnable对象未添加完成,等待10ms");Thread.sleep(10);}/*** 可能在Acceptor里刚注册channel到selector就被reactor选中执行了,这时注册channel的地方还没执行attach方法,runnable会报NPE,所以要判空*/
//            Runnable runnable = (Runnable) selectionKey.attachment();
//            runnable.run();}}/*** 处理连接*/public static class Acceptor implements Runnable {private static Selector[] subSelector = new Selector[SUB_COUNT];private ServerSocketChannel serverSocketChannel;/*** 单线程不会冲突*/private int index = -1;@SneakyThrowspublic Acceptor(ServerSocketChannel serverSocketChannel) {for (int i = 0; i < SUB_COUNT; i++) {subSelector[i] = Selector.open();SubReactor subReactor = new SubReactor(subSelector[i], i);new Thread(subReactor).start();}this.serverSocketChannel = serverSocketChannel;}@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);int ind = getNextIndex();/*** 本来以为没必要的,但如果不wakeup,会在下一步register阻塞!底层在等待synchronized同步锁* TODO 研究下原理*/subSelector[ind].wakeup();SelectionKey selectionKey = socketChannel.register(subSelector[ind], SelectionKey.OP_READ);selectionKey.attach(new MainSubReactorMultiThread.ThreadPollWorkHandler(socketChannel));System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "thread:" + Thread.currentThread().getName() + ", " + "客户端已连接:" + socketChannel.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}}private int getNextIndex() {if (index++ == SUB_COUNT - 1) {index = 0;}return index;}}/*** 处理读写*/public static class ThreadPollWorkHandler implements Runnable {private static ExecutorService executorService = Executors.newCachedThreadPool();private SocketChannel socketChannel;public ThreadPollWorkHandler(SocketChannel socketChannel) {this.socketChannel = socketChannel;}@Overridepublic void run() {try {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "开始处理socket读");/*** 读数据*/ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readLength = socketChannel.read(byteBuffer);if (readLength == -1) {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "客户端已关闭,关闭此通道");socketChannel.close();return;}String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", socket读完成: " + message);/*** 线程池处理业务计算*/TaskHandler taskHandler = new TaskHandler(socketChannel, message);Future<String> taskResult = executorService.submit(taskHandler);/*** 写数据*/ByteBuffer writeBuffer = ByteBuffer.wrap((socketChannel.getRemoteAddress() + ":" + taskResult.get()).getBytes(StandardCharsets.UTF_8));socketChannel.write(writeBuffer);System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】" + "socketChannel:" + socketChannel.hashCode() + ", " + "已返回客户端数据,请求处理最终完成");} catch (Exception e) {e.printStackTrace();}}}static class TaskHandler implements Callable<String> {private SocketChannel socketChannel;private String parameter;public TaskHandler(SocketChannel socketChannel, String parameter) {this.socketChannel = socketChannel;this.parameter = parameter;}@Overridepublic String call() throws Exception {System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 开始处理业务计算 参数: " + parameter);Thread.sleep(1000);String result = String.format("response(%s) for (%s)", RandomStringUtils.randomAlphanumeric(30), parameter).trim();System.out.println("【" + DateUtil.getCurrentDateAndTime() + "】线程池Thread:" + Thread.currentThread().getName() + "socketChannel:" + socketChannel.hashCode() + ", 客户端:" + socketChannel.getRemoteAddress() + ", 业务计算完成 返回: " + result);return result;}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/35003.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【机器参数】安装适合的nvidia驱动

背景&#xff1a;我现在nvidia-smi没有显示&#xff0c;我的目标是让nvidia-smi正常显示 参考&#xff1a; nchttps://www.cnblogs.com/carle-09/p/11504544.html 可能是驱动版本不对&#xff0c;所以我重新去nvidia官网下载了驱动。 Official Drivers | NVIDIA 得到了NVID…

discuz迪恩cul!教育课程培训网站模板

Discuz x3.2模板 迪恩cul!教育课程培训 GBK&#xff0c;程序包中内附详细的安装教程&#xff0c;下载后按照教程安装即可 discuz迪恩cul!教育课程培训网站模板

如何将本地的Django项目部署到阿里云服务器上?

场景&#xff1a;在本地的pycharm上已经写好了一个Django架构的网站&#xff0c;现在要把它放到公网上 一、阿里云服务器 选择云服务器ECS&#xff0c;新用户可以免费使用三个月 购买时选择预装宝塔面板 买好后&#xff0c;进入云服务器控制台 重置实例密码 远程连接至服务…

python-17-零基础自学python-

学习内容&#xff1a;《python编程&#xff1a;从入门到实践》第二版 知识点&#xff1a; 类、子类、继承、调用函数 练习内容&#xff1a; 练习9-6&#xff1a;冰激凌小店 冰激凌小店是一种特殊的餐馆。编写一个名为IceCreamStand的类&#xff0c;让它继承为完成练习9-1或…

宝塔计划任务调用node程序时,log4js日志保存本地位置会发生变化

接我上一篇文章的情况 超简单的nodejs使用log4js保存日志到本地&#xff08;可直接复制使用&#xff09;-CSDN博客 原本应当保存在node项目目录下的日志文件&#xff0c;如果使用宝塔的计划任务来定时执行的话&#xff0c;日志保存路径会发生变化到如下图的位置&#xff1a; 如…

六款顶级原型设计工具推荐,满足你所有需求!

即时设计作为一款专业原型工具&#xff0c;无论是从功能还是插件库配备情况来看&#xff0c;都是毫无疑问可以进行原型图设计的&#xff0c;而且&#xff0c;即时设计内设海量资源库&#xff0c;可以支持大家通过关键词进行搜索相关资源&#xff0c;并且在线编辑使用&#xff0…

瑜伽馆管理系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;教练管理&#xff0c;用户管理&#xff0c;瑜伽管理&#xff0c;套餐管理&#xff0c;体测报告管理&#xff0c;基础数据管理 前台账户功能包括&#xff1a;系统首页&#xff0c…

51单片机STC89C52RC——8.1 8*8 LED点阵模块(点亮一个LED)

目录 目的/效果 一&#xff0c;STC单片机模块 二&#xff0c;8*8 LED点阵模块 2.1 电路图 2.1.1 8*8 点阵模块电路图 2.1.2 74HC595&#xff08;串转并&#xff09;模块 电路图 2.1.3 芯片引脚 2.2 引脚电平分析 2.3 74HC595 串转并模块 2.3.1 装弹&#xff08;移位…

2024最新免费版轻量级Navicat Premium Lite 下载和安装教程

2024最新免费版轻量级Navicat Premium Lite 下载和安装教程 关于猫头虎 大家好&#xff0c;我是猫头虎&#xff0c;别名猫头虎博主&#xff0c;擅长的技术领域包括云原生、前端、后端、运维和AI。我的博客主要分享技术教程、bug解决思路、开发工具教程、前沿科技资讯、产品评…

【C++】————类和对象(下)

作者主页&#xff1a; 作者主页 本篇博客专栏&#xff1a;C 创作时间 &#xff1a;2024年6月25日 一、日期类 首先我们先来看一下通过类实现对日期的一系列处理&#xff0c;同时给大家说一下当中存在的一些细节问题&#xff1a; 1.1 GetMonthDay函数 这个函数的作用就是…

客户有哪些封装案例,一句克服使用让PCBA工厂泪流满面

作者 | 高速先生成员--王辉东 天空下着雨&#xff0c;萧萧从窗前经过&#xff0c;看窗里。 翠萍那娇艳欲滴的脸上挂着两串泪滴。 萧萧一进去&#xff0c;问啥情况。 翠萍往电脑屏幕一指。 当萧萧看向屏幕一瞬间。 那些曾经以为早已遗忘的伤痛&#xff0c;会在某些时刻如潮…

新增题目接口开发

文章目录 1.基本设计2.生成CRUD代码1.生成五张表的代码1.subject_info2.subject_brief3.subject_judge4.subject_multiple5.subject_radio 2.将所有的dao放到mapper文件夹3.将所有实体类使用lombok简化4.删除所有mapper的Param("pageable") Pageable pageable5.删除所…

【大数据】—谁是世界上最富的人?

引言 在2024年&#xff0c;全球财富的分布再次成为公众和经济学家关注的焦点。随着经济的波动和新兴市场的崛起&#xff0c;亿万富翁的名单也在不断变化。本文将深入探讨这一现象&#xff0c;通过最新的数据可视化分析&#xff0c;揭示世界上最富有的人在2024年的财富状况和趋…

解锁品牌推广小妙招:如何高效塑造品牌影响力

在信息化大时代&#xff0c;企业如何做好品牌传播是一个复杂而重要的课题。随着信息爆炸和新媒体的兴起&#xff0c;传统的广告投放已经无法完全满足品牌的宣传需求&#xff0c;媒体公关传播越来越为企业所重视。今天投媒网就来与您分享在信息化大时代&#xff0c;企业如何做好…

在Mandelbrot 集中“缩放”特定区域

1、问题背景 在创建一个快速生成 Mandelbrot 集图像的 Python 程序时&#xff0c;程序开发者遇到一个问题&#xff1a;他想要渲染该集合的一个特定区域&#xff0c;但他不知道如何修改代码中的数学部分来实现 “缩放”。 2、解决方案 第一种解决方案 问题根源是代码中的一行…

SVN学习(007 svn安装Tortoise工具)

尚硅谷SVN高级教程(svn操作详解) 总时长 4:53:00 共72P 此文章包含第58p-第p72的内容 介绍 安装 选择自己想要装软件的文件夹 进入工作目录&#xff0c;发现无svn的图标&#xff0c;重启电脑即可 就能看到svn的图标 settings功能 进行图标的查看 修改subversion配置文件 …

安卓直装植物大战僵尸杂交版V2.1版完美运行

在移动游戏的世界里&#xff0c;植物大战僵尸无疑是一款深受玩家喜爱的经典游戏。如今&#xff0c;随着技术的发展和玩家需求的变化&#xff0c;植物大战僵尸杂交版V2.1版应运而生&#xff0c;为安卓用户带来了全新的游戏体验。 这一全新版本在原有游戏的基础上进行了多项创新…

SAP系统中的应付账款(与MM集成,关账操作)

1. 与物料管理的集成 Plant: 工厂是后勤中的位于中心位置的组织对象&#xff0c;一个“工厂”可以是公司内的一个作业区&#xff0c;或一个分支机构。一个“工厂”可以是一个中央交付仓库&#xff0c;可以是一个区域的销售营业部&#xff0c;一个制造工厂&#xff0c;一个集团…

合适的智能猫砂盆到底怎么挑?开放式封闭式一次说清!

想当初我也是在网上看了各种测评&#xff0c;纠结了好久才下定决心入手了智能猫砂盆。封闭式和开放式都用过&#xff0c;各有各的利与弊&#xff0c;不过最后的我还是选择了开放式的智能猫砂盆&#xff0c;因为开放式的设计结构会更加方便我观察小猫&#xff0c;哪个铲屎官不喜…

采购OLED透明屏指南

一、引言 OLED透明屏作为一种前沿的显示技术&#xff0c;以其独特的透明度和出色的显示效果&#xff0c;受到了众多行业的青睐。在采购OLED透明屏时&#xff0c;需要综合考虑多个因素&#xff0c;以确保选择到符合需求的高质量产品。以下是一份详细的采购OLED透明屏指南&#x…