Reactor反应器模式

文章目录

    • 一、单线程Reactor反应器模式
    • 二、多线程Reactor反应器模式

在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。

为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。

这种模式的优点是解决了前面的新连接被严重阻塞的问题,在一定程度上极大地提高了服务器的吞吐量。但是对于大量的连接,需要消耗大量的现成资源,如果线程数太多,系统无法承受。而且线程的反复创建、销毁、线程的切换也需要代价。因此高并发应用场景下多线程OIO的缺陷是致命的,因此引入了Reactor反应器模式。

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:

  1. Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器
  2. Handlers处理器的职责:非阻塞的执行业务处理逻辑

一、单线程Reactor反应器模式

Reactor反应器模式有点儿类似事件驱动模式,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理。反应器模式中的反应器角色类似于事件驱动模式中的dispatcher事件分发器角色。

  • Reactor反应器:负责查询IO事件,当检测到一个IO时间,将其发送给对应的Handler处理器处理,这里的IO事件就是NIO选择器监控的通道IO事件。
  • Handler处理器:与IO事件绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。

基于NIO实现单线程版本的反应器模式需要用到SelectionKey选择键的几个重要的成员方法:

  1. void attach(Object o):将任何的Java对象作为附件添加到SelectionKey实例,主要是将Handler处理器实例作为附件添加到SelectionKey实例
  2. Object attachment():取出之前通过attach添加到SelectionKey选择键实例的附件,一般用于取出绑定的Handler处理器实例。

Reactor实现示例:

package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:29*/
public class Reactor implements Runnable {final private Selector selector;final private ServerSocketChannel serverSocketChannel;public Reactor() {try {this.selector = Selector.open();this.serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8088));// 注册ServerSocket的accept事件SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 为事件绑定处理器sk.attach(new AcceptHandler());} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectedKey : selectionKeys) {dispatch(selectedKey);}selectionKeys.clear();}} catch (Exception e) {throw new RuntimeException(e);}}private void dispatch(SelectionKey selectedKey) {Runnable handler = (Runnable) selectedKey.attachment();// 此处返回的可能是AcceptHandler也可能是IOHandlerhandler.run();}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel != null) {new IOHandler(selector, socketChannel); // 注册IO处理器,并将连接加入select列表}} catch (IOException e) {throw new RuntimeException(e);}}}public static void main(String[] args) {new Reactor().run();}
}

Handler实现示例:

package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 14:53*/
public class IOHandler implements Runnable {final private SocketChannel socketChannel;final private ByteBuffer buffer;public IOHandler(Selector selector, SocketChannel channel) {buffer = ByteBuffer.allocate(1024);socketChannel = channel;try {channel.configureBlocking(false);SelectionKey sk = channel.register(selector, 0); // 此处没有注册感兴趣的事件sk.attach(this);sk.interestOps(SelectionKey.OP_READ); // 注册感兴趣的事件,下一次调用select时才生效selector.wakeup(); // 立即唤醒当前阻塞select操作,使得迅速进入下次select,从而让上面注册的读事件监听可以立即生效} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {int length;while ((length = socketChannel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));}} catch (IOException e) {throw new RuntimeException(e);}}
}

在单线程反应器模式中,Reactor反应器和Handler处理器都执行在同一条线程上(dispatch方法是直接调用run方法,没有创建新的线程),因此当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。

二、多线程Reactor反应器模式

既然Reactor反应器和Handler处理器在一个线程会造成非常严重的性能缺陷,那么可以使用多线程对基础的反应器模式进行改造。

  1. 将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样业务处理线程与负责服务监听和IO时间查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
  2. 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器线程,同时引入多个选择器,每一个SubReactor子线程负责一个选择器。

MultiReactor:

package cn.ken.jredis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 16:51*/
public class MultiReactor {private final ServerSocketChannel server;private final Selector[] selectors = new Selector[2];private final SubReactor[] reactors = new SubReactor[2];private final AtomicInteger index = new AtomicInteger(0);public MultiReactor() {try {server = ServerSocketChannel.open();selectors[0] = Selector.open();selectors[1] = Selector.open();server.bind(new InetSocketAddress(8080));server.configureBlocking(false);SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);register.attach(new AcceptHandler());reactors[0] = new SubReactor(selectors[0]);reactors[1] = new SubReactor(selectors[1]);} catch (IOException e) {throw new RuntimeException(e);}}private void startService() {new Thread(reactors[0]).start();new Thread(reactors[1]).start();}class SubReactor implements Runnable {final private Selector selector;public SubReactor(Selector selector) {this.selector = selector;}@Overridepublic void run() {while (!Thread.interrupted()) {try {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {dispatch(selectionKey);}selectionKeys.clear();} catch (IOException e) {throw new RuntimeException(e);}}}}private void dispatch(SelectionKey selectionKey) {Runnable attachment = (Runnable) selectionKey.attachment();if (attachment != null) {attachment.run();}}class AcceptHandler implements Runnable {@Overridepublic void run() {try {SocketChannel socketChannel = server.accept();new MultiHandler(selectors[index.getAndIncrement()], socketChannel);if (index.get() == selectors.length) {index.set(0);}} catch (IOException e) {throw new RuntimeException(e);}}}
}

MultiHandler:

package cn.ken.jredis;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** <pre>** </pre>** @author <a href="https://github.com/Ken-Chy129">Ken-Chy129</a>* @since 2023/10/14 17:28*/
public class MultiHandler implements Runnable {final private Selector selector;final private SocketChannel channel;final ByteBuffer buffer = ByteBuffer.allocate(1024);static ExecutorService pool = Executors.newFixedThreadPool(4);public MultiHandler(Selector selector, SocketChannel channel) {this.selector = selector;this.channel = channel;try {channel.configureBlocking(false);SelectionKey register = channel.register(selector, SelectionKey.OP_READ);register.attach(this);selector.wakeup();} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {pool.execute(() -> {synchronized (this) {int length;try {while ((length = channel.read(buffer)) > 0) {System.out.println(new String(buffer.array(), 0, length));buffer.clear();}} catch (IOException e) {throw new RuntimeException(e);}}   });}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/121722.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分享一波操作系统、谢希仁版本计算机网络学习笔记【思维导图】

操作系统复习笔记 - 幕布第一章引论第二章处理器管理进程同步与通信https://www.mubu.com/doc/58qrnf20ndg 大纲 - 幕布物理层数据链路层网络层https://www.mubu.com/doc/1eo9_8TyUdg计算机网络-语雀https://www.yuque.com/yuqueyonghu6nc56e/dgg1dl/wx34gx72xpgmt598?singleD…

HackTheBox-Starting Point--Tier 1---Crocodile

文章目录 一 题目二 实验过程 一 题目 Tags Web、Network、Custom Applications、Protocols、Apache、FTP、Reconnaissance、Web Site Structure Discovery、Clear Text Credentials、Anonymous/Guest Access译文&#xff1a;Web、网络、定制应用程序、协议、Apache、FTP、侦…

Python的random随机模块相关学习记录

random是有关随机功能的一个内置模块 import random# 获取0-1之间的随机小数 print(random.random()) # 0.6224750165089413 # 获取0-1之间的随机小数# a-----b之间的随机小数 a 0 b 10 print(random.uniform(a, b)) # 1.25491670861257# 两边的值都包含在内&#xff0c;获…

html和css中图片加载与渲染的规则是什么?

浏览器渲染web页面的过程 解析html&#xff0c;构成dom树 2.加载css&#xff0c;构成样式规则树 3.加载js&#xff0c;解析js代码 4.dom树和样式树进行匹配&#xff0c;构成渲染树 5.计算元素位置进行页面布局 5.绘制页面&#xff0c;呈现到浏览器中 图片加载和渲染的过程 1.解…

java后端请求过滤options方式,亲测有效

前端每次发出post 请求时&#xff0c;浏览器会默认请求2次&#xff0c;一次是options类型&#xff0c;一次是真实的请求&#xff0c;为了避免这种情况发生&#xff0c;需在后端过滤器中拦截下options请求&#xff0c;代码如下&#xff1a; import java.io.IOException; import …

华为eNSP配置专题-策略路由的配置

文章目录 华为eNSP配置专题-策略路由的配置0、概要介绍1、前置环境1.1、宿主机1.2、eNSP模拟器 2、基本环境搭建2.1、终端构成和连接2.2、终端的基本配置 3、配置接入交换机上的VLAN4、配置核心交换机为网关和DHCP服务器5、配置核心交换机和出口路由器互通6、配置PC和出口路由器…

【软件安装环境配置】vscode 安装界面没有出现安装路径的选择 的解决,以及vscode的删除的问题

由于vscode 没有删除干净&#xff0c;就会出现vscode 安装的时候&#xff0c;没有出现安装路径的界面&#xff0c;所以可以来到vscode的安装路径&#xff0c;点击 unins000.exe 文件就可以 实现将vscode 相关的文件删除&#xff0c; 如果是删除了整个vscode 安装下的文件&…

Win11 安装wsl遇到的问题解决

Win11 安装wsl遇到的问题解决 Win11 安装wsl遇到的问题解决WslRegisterDistribution failed:0x8007019eWslRegisterDistribution failed:0x800701bcUbuntu换源WSL通过网络访问Windows Win11 安装wsl遇到的问题解决 WslRegisterDistribution failed:0x8007019e 参考Link WslR…

软考高项-计算题(3)

题10 问题一 EV50*0.525 问题二 EACBAC/CPI CPIEV/AC25/28 EAC50*28/2556 问题三 因为CPI<1&#xff0c;所以项目实际费用超支 题11 PV2000500010000750006500020000177000 AC2100450012000860006000015000179600 EV200050001000075000*0.965000*0.720000*0.351370…

智能终端界面自动化测试操作工具 - Appium常见用法

1. Appium 是什么可以做什么&#xff1f; Appium 是一款开源的移动应用自动化测试框架&#xff0c;用于测试移动应用程序的功能和用户界面。它支持多种移动平台&#xff0c;包括 Android 和 iOS&#xff0c;可以使用多种编程语言进行脚本编写&#xff0c;如 Python、Java、Jav…

网络协议--TCP的成块数据流

20.1 引言 在第15章我们看到TFTP使用了停止等待协议。数据发送方在发送下一个数据块之前需要等待接收对已发送数据的确认。本章我们将介绍TCP所使用的被称为滑动窗口协议的另一种形式的流量控制方法。该协议允许发送方在停止并等待确认前可以连续发送多个分组。由于发送方不必…

SpringMVC Day02 : 请求方式

前言 欢迎阅读 Spring MVC 系列教程的第二篇文章&#xff01;在上一篇文章中&#xff0c;我们介绍了 Spring MVC 的基本概念和使用方法。今天&#xff0c;我们将深入探讨 Spring MVC 中不同的请求方式&#xff0c;以及如何在你的应用程序中正确地处理它们。 在 Web 开发中&am…

nlp与知识图谱代码解读_词嵌入

目录 词嵌入简单原理代码案例解读专业原理介绍场景 词嵌入 简单原理 可以使用一些比喻和生活中的例子&#xff1a; 老师&#xff1a; 你们还记得玩乐高积木的时候&#xff0c;每个积木块代表了一个特定的事物或形状吗&#xff1f;现在&#xff0c;想象一下&#xff0c;每个词…

day01:数据库DDL

一:基础概念 数据库:存储数据的仓库&#xff0c;数据是有组织的进行存储 数据库管理系统:操纵和管理数据库的大型软件 SQL&#xff1a;操作关系型数据库的编程语言&#xff0c;定义了一套操作关系型数据库统一标准 关系图 二:数据模型 关系型数据库:建…

vue的双向绑定的原理,和angular的对比

目录 前言 Vue的双向绑定用法 代码 Vue的双向绑定原理 Angular的双向绑定用法 代码 Angular的双向绑定原理 理解 图片 关于Vue的双向绑定原理和与Angular的对比&#xff0c;我们可以从以下几个方面进行深入探讨&#xff1a; 前言 双向绑定是现代前端框架的核心特性之…

经典卷积神经网络 - ResNet

ResNet是一种残差网络&#xff0c;咱们可以把它理解为一个子网络&#xff0c;这个子网络经过堆叠可以构成一个很深的网络。 我们一直在加深神经网络&#xff0c;但是加深不一定只会带来好处。 残差块 串联一个层改变函数类&#xff0c;我们希望能扩大函数类残差块加入快速通…

计算机网络【CN】子网划分与子网掩码

一个子网定义(X.X.X.X/n) 子网掩码为 n 个 1&#xff0c;32-n 个 0包含的 IP 地址数&#xff1a;232−n 主机号全 0 表示本网段主机号全 1 表示网段的广播地址可分配的 IP 地址数 :232−&#x1d45b;−2 子网划分原则 满足子网定义子网&#x1d434;1…&#x1d434;&#x…

分布式合集

1.Spring 的事务 Spring框架为应用程序提供了强大的事务管理功能。它通过将事务逻辑与业务逻辑分离&#xff0c;使得开发者可以专注于业务逻辑的实现&#xff0c;而不必过多关注事务的管理。Spring事务的核心是基于AOP&#xff08;面向切面编程&#xff09;的声明式事务管理&a…

iOS插件

把平时看到或项目用到的一些插件进行整理&#xff0c;文章后面分享一些不错的实例&#xff0c;若你有其它的插件欢迎分享&#xff0c;不断的进行更新&#xff1b; 一&#xff1a;第三方插件 1:基于响应式编程思想的oc 地址&#xff1a;https://github.com/ReactiveCocoa/Rea…

Three.js 基础纹理贴图

本文简介 带尬猴&#xff0c;我嗨德育处主任 尽管 Three.js 文档已经比较详细了&#xff0c;但对于刚接触 Three.js 的工友来说&#xff0c;最麻烦的还是不懂如何组合。Three.js 的功能实在太多了&#xff0c;初学者很容易被大量的新概念冲晕。 本文主要讲解入门 Three.js 必…