深入理解Reactor模型的原理与应用

1、什么是Reactor模型

        Reactor意思是“反应堆”,是一种事件驱动机制。

        和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

        对于刚开始接触这个机制,个人感觉翻译成“感应器”可能会更好理解一点,因为注册在Reactor上的函数就像感应器一样,只要有事件到达,就会触发它开始工作。

        Reactor 模式是编写高性能网络服务器的必备技术之一。


2、Reactor模型的优点

  • 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
  • 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
  • 可扩展性强,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
  • 可复用性高,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;
           Reactor 模型开发效率上比起直接使用 IO 复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源。
            优点即每个事件处理中很多时候可以不考虑共享资源的互斥访问。可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力,当程序需要使用多核资源时,Reactor 模型就会悲剧 , 为什么呢?
            如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆,每个反应堆对应一颗 CPU 核心,这些反应堆上跑的请求互不相关,这是完全可以利用多核的。例如 Nginx 这样的 http 静态服务器。

3、通过对网络编程(epoll)代码的优化,深入理解Reactor模型

1、epoll的普通版本,根据fd类型(listen_fd和client_fd)分为两大类处理。

        如果是listen_fd,调用accept处理连接请求;

        如果是client_fd,调用recv或者send处理数据。

         代码实现:


#include <stdio.h>
#include <string.h>
#include <stdlib.h>#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>#include <errno.h>int main(int argc, char* argv[])
{if (argc < 2)return -1;int port = atoi(argv[1]);   //字符串转换为整型int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0)return -1;struct sockaddr_in addr;memset(&addr, 0, sizeof(struct sockaddr_in));   //新申请的空间一定要置零addr.sin_family = AF_INET;addr.sin_port = htons(port);    //转换成网络字节序addr.sin_addr.s_addr = INADDR_ANY;if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)return -2;if (listen(sockfd, 5) < 0)return -3;//epollint epfd = epoll_create(1); //创建epoll,相当于红黑树的根节点struct epoll_event ev, events[1024] = {0};  //events相当于就绪队列,一次性可以处理的集合ev.events = EPOLLIN;ev.data.fd = sockfd;epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);    //将ev节点加入到epoll,此处的sockfd参数随便添加没有意义,需要操作系统索引和它有对应的句柄while (1){int nready = epoll_wait(epfd, events, 1024, -1);    //第四个参数-1表示一直等待,有事件才返回if (nready < 1) //没有事件触发,nready代表触发事件的个数break;int i = 0;for (i = 0; i < nready; i++)    //epoll_wait带出的就绪fd包括两大类:1、处理连接的listen_fd,2、处理数据的send和recv{if (events[i].data.fd == sockfd) //如果是listenfd,就将它加入到epoll{struct sockaddr_in client_addr;memset(&client_addr, 0, sizeof(struct sockaddr_in));socklen_t client_len = sizeof(client_addr);int client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);if (client_fd <= 0)continue;char str[INET_ADDRSTRLEN] = {0};printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));ev.events = EPOLLIN | EPOLLET;  //epoll默认是LT模式ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);}else    //fd进行读写操作{//对fd的读写操作没有分开int client_fd = events[i].data.fd;char buf[1024] = {0};int ret = recv(client_fd, buf, 1024, 0);if (ret < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//}else{//}printf("ret < 0,断开连接:%d\n", client_fd);close(client_fd);ev.events = EPOLLIN;ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);}else if (ret == 0)  //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态{printf("ret = 0,断开连接:%d\n", client_fd);close(client_fd);ev.events = EPOLLIN;ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close关闭连接后要将它既是从epoll中删除}else{printf("Recv: %s, %d Bytes\n", buf, ret);}//区分fd的读写操作,即recv和sendif (events[i].events & EPOLLIN){int client_fd = events[i].data.fd;char buf[1024] = {0};int ret = recv(client_fd, buf, 1024, 0);if (ret < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//...}else{//...}printf("ret < 0,断开连接:%d\n", client_fd);close(client_fd);ev.events = EPOLLIN;ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);}else if (ret == 0)  //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态{printf("ret = 0,断开连接:%d\n", client_fd);close(client_fd);ev.events = EPOLLIN;ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close关闭连接后要将它既是从epoll中删除}else{printf("Recv: %s, %d Bytes\n", buf, ret);}}if (events[i].events & EPOLLOUT)    //为什么需要判断EPOLLOUT,而不是直接else?因为一个fd有可能同时存在可读和可写事件的{int client_fd = events[i].data.fd;char buf[1024] = {0};send(client_fd, buf, sizeof(buf), 0);}}}}return 0;
}

 

2、epoll的优化版本,根据事件类型(读和写)分为两大类处理。

         代码实现:

        for (i = 0; i < nready; i++)    //epoll_wait带出的就绪fd包括两大类:1、处理连接的listen_fd,2、处理数据的send和recv{//区分fd的读写操作if (events[i].events & EPOLLIN){if (events[i].data.fd == sockfd) //如果是listenfd,就将它加入到epoll{struct sockaddr_in client_addr;memset(&client_addr, 0, sizeof(struct sockaddr_in));socklen_t client_len = sizeof(client_addr);int client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);if (client_fd <= 0)continue;char str[INET_ADDRSTRLEN] = {0};printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));ev.events = EPOLLIN | EPOLLET;  //epoll默认是LT模式ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);}else {int client_fd = events[i].data.fd;char buf[1024] = {0};int ret = recv(client_fd, buf, 1024, 0);if (ret < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//...}else{//...}printf("ret < 0,断开连接:%d\n", client_fd);close(client_fd);ev.events = EPOLLIN;ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);}else if (ret == 0)  //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态{printf("ret = 0,断开连接:%d\n", client_fd);close(client_fd);ev.events = EPOLLIN;ev.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close关闭连接后要将它既是从epoll中删除}else{printf("Recv: %s, %d Bytes\n", buf, ret);}}}//为什么需要判断EPOLLOUT,而不是直接else?因为一个fd有可能同时存在可读和可写事件的if (events[i].events & EPOLLOUT)    {int client_fd = events[i].data.fd;char buf[1024] = {0};send(client_fd, buf, sizeof(buf), 0);}}

 

3、epoll的Reactor模式, epoll由以前的对网络io(fd)进行管理,转变成对events事件进行管理。

         代码实现:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>#include <errno.h>//每个fd所对应的信息
struct sockitem
{int sockfd;int (*callback)(int fd, int events, void*arg);char sendbuf[1024];char recvbuf[1024];
};//每个epoll所对应的信息
struct epollitem
{int epfd;struct epoll_event events[1024];    //events相当于就绪队列,一次性可以处理的集合
};struct epollitem *eventloop = NULL;int recv_cb(int fd, int events, void*arg);
int send_cb(int fd, int events, void*arg);int accept_cb(int fd, int events, void*arg)
{printf("---accept_cb(int fd, int events, void*arg)---\n");struct sockaddr_in client_addr;memset(&client_addr, 0, sizeof(struct sockaddr_in));socklen_t client_len = sizeof(client_addr);int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);if (client_fd <= 0)return -1;char str[INET_ADDRSTRLEN] = {0};printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));struct epoll_event ev;ev.events = EPOLLIN | EPOLLET;  //epoll默认是LT模式struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));si->sockfd = client_fd;si->callback = recv_cb;ev.data.ptr = si;epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, client_fd, &ev);return client_fd;
}int recv_cb(int fd, int events, void*arg)
{printf("---recv_cb(int fd, int events, void*arg)---\n");struct epoll_event ev;struct sockitem *sit = (struct sockitem*)arg;int ret = recv(fd, sit->recvbuf, 1024, 0);if (ret < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//...}else{//...}printf("ret < 0,断开连接:%d\n", fd);ev.events = EPOLLIN;epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);    //close关闭连接后要将它既是从epoll中删除close(fd);free(sit);  //连接关闭后释放内存}else if (ret == 0)  //接收到了客户端发来的断开连接请求FIN后,没有及时调用close函数,进入了CLOSE _WAIT状态{printf("ret = 0,断开连接:%d\n", fd);ev.events = EPOLLIN;epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev); close(fd);free(sit);}else{printf("Recv from recvbuf:  %s, %d Bytes\n", sit->recvbuf, ret);ev.events = EPOLLIN | EPOLLOUT;  //sit->sockfd = fd;sit->callback = send_cb;ev.data.ptr = sit;epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);}return ret;
}int send_cb(int fd, int events, void*arg)
{struct epoll_event ev;struct sockitem *sit = (struct sockitem*)arg;strncpy(sit->sendbuf, sit->recvbuf, sizeof(sit->recvbuf) + 1);send(fd, sit->sendbuf, sizeof(sit->recvbuf) + 1, 0);ev.events = EPOLLIN | EPOLLET;  //sit->sockfd = fd;sit->callback = recv_cb;ev.data.ptr = sit;epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);return fd;
}int main(int argc, char* argv[])
{if (argc < 2)return -1;int port = atoi(argv[1]);   //字符串转换为整型int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0)return -1;struct sockaddr_in addr;memset(&addr, 0, sizeof(struct sockaddr_in));   //新申请的空间一定要置零addr.sin_family = AF_INET;addr.sin_port = htons(port);    //转换成网络字节序addr.sin_addr.s_addr = INADDR_ANY;if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)return -2;if (listen(sockfd, 5) < 0)return -3;//epolleventloop = (struct epollitem *)malloc(sizeof(struct epollitem));eventloop->epfd = epoll_create(1); //创建epoll,相当于红黑树的根节点struct epoll_event ev;ev.events = EPOLLIN | EPOLLET;struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));si->sockfd = sockfd;si->callback = accept_cb;ev.data.ptr = si;   //将fd和对应的回调函数绑定一起带进epollepoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev);    //将ev节点加入到epoll,此处的sockfd参数随便添加没有意义,需要操作系统索引和它有对应的句柄while (1){int nready = epoll_wait(eventloop->epfd, eventloop->events, 1024, -1);    //第四个参数-1表示一直等待,有事件才返回if (nready < 1) //没有事件触发,nready代表触发事件的个数break;int i = 0;for (i = 0; i < nready; i++){//区分fd的读写操作if (eventloop->events[i].events & EPOLLIN){struct sockitem *sit = (struct sockitem*)eventloop->events[i].data.ptr;sit->callback(sit->sockfd, eventloop->events[i].events, sit);    //不用区分listen_fd和recv_fd,相应的fd都会调用他们所对应的callback}//为什么需要判断EPOLLOUT,而不是直接else?因为一个fd有可能同时存在可读和可写事件的if (eventloop->events[i].events & EPOLLOUT)    {struct sockitem *sit = (struct sockitem*)eventloop->events[i].data.ptr;sit->callback(sit->sockfd, eventloop->events[i].events, sit);}}}return 0;
}

4、Reactor模型的应用 

        1、单线程模式的Reactor,参考libevent、redis;

        2、多线程模式的Reactor,参考memcached;

        3、多进程模式的Reactor,参考nginx。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/55944.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云机器学习PAI全新推出特征平台 (Feature Store),助力AI建模场景特征数据高效利用

推荐算法与系统在全球范围内已得到广泛应用&#xff0c;为用户提供了更个性化和智能化的产品推荐体验。在推荐系统领域&#xff0c;AI建模中特征数据的复用、一致性等问题严重影响了建模效率。阿里云机器学习平台 PAI 推出特征平台&#xff08;PAI-FeatureStore&#xff09; 。…

Python 密码破解指南:15~19

协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 本文来自【OpenDocCN 饱和式翻译计划】&#xff0c;采用译后编辑&#xff08;MTPE&#xff09;流程来尽可能提升效率。 收割 SB 的人会被 SB 们封神&#xff0c;试图唤醒 SB 的人是 SB 眼中的 SB。——SB 第三定律 十五、…

前端基础(Element、vxe-table组件库的使用)

前言&#xff1a;在前端项目中&#xff0c;实际上&#xff0c;会用到组件库里的很多组件&#xff0c;本博客主要介绍Element、vxe-table这两个组件如何使用。 目录 Element 引入element 使用组件的步骤 使用对话框的示例代码 效果展示 vxe-table 引入vxe-table 成果展…

深入理解Android消息机制的原理

Handler & Looper & MessageQueue关系简述 一个线程至多有一个looper&#xff1b;一个looper有一个mq&#xff1b;一个mq对应多个message&#xff1b;一个message对应多个handler。消息类型&#xff1a;同步、异步、同步屏障消息。无限循环&#xff1a;在队列中没有消…

stm32串口通信(PC--stm32;中断接收方式;附proteus电路图;开发方式:cubeMX)

单片机型号STM32F103R6: 最后实现的效果是&#xff0c;开机后PC内要求输入1或0&#xff0c;输入1则打开灯泡&#xff0c;输入0则关闭灯泡&#xff0c;输入其他内容则显示错误&#xff0c;值得注意的是这个模拟的东西只能输入英文 之所以用2个LED灯是因为LED电阻粗略一算就是1…

fat32 文件系统 误删除文件数据恢复 SDK 介绍

fat32 文件系统 误删除文件数据恢复 SDK 介绍 fat32_analyze.dll 是一个专门用于恢复 fat32 文件系统误删除文件的标准的动态链接库(DLL)&#xff0c; 可被任何其他程序直接加载调用。 下载地址&#xff1a; https://gitee.com/tankaishuai/powerful_sdks/tree/master/fat32_a…

【Tkinter系列02/5】界面初步和布局

本文是系列文章第二部分。前文见&#xff1a;【Tkinter系列01/5】界面初步和布局_无水先生的博客-CSDN博客 说明 一般来说&#xff0c;界面开发中&#xff0c;如果不是大型的软件&#xff0c;就不必用QT之类的实现&#xff0c;用Tkinter已经足够&#xff0c;然而即便是Tkinter规…

spring boot 项目整合 websocket

1.业务背景 负责的项目有一个搜索功能&#xff0c;搜索的范围几乎是全表扫&#xff0c;且数据源类型贼多。目前对搜索的数据量量级未知&#xff0c;但肯定不会太少&#xff0c;不仅需要搜索还得点击下载文件。 关于搜索这块类型 众多&#xff0c;未了避免有个别极大数据源影响整…

linux操作系统的权限的深入学习(未完)

1.Linux权限的概念 Linux下有两种用户&#xff1a;超级用户&#xff08;root&#xff09;、普通用户。 超级用户&#xff1a;可以再linux系统下做任何事情&#xff0c;不受限制 普通用户&#xff1a;在linux下做有限的事情。 超级用户的命令提示符是“#”&#xff0c;普通用户…

Spring Authorization Server入门 (十六) Spring Cloud Gateway对接认证服务

前言 之前虽然单独讲过Security Client和Resource Server的对接&#xff0c;但是都是基于Spring webmvc的&#xff0c;Gateway这种非阻塞式的网关是基于webflux的&#xff0c;对于集成Security相关内容略有不同&#xff0c;且涉及到代理其它微服务&#xff0c;所以会稍微比较麻…

Prometheus 监控系统

常用的监控系统有哪些&#xff1f; 老牌传统 Zabbix Nagios Cacti 新一代的 Prometheus 夜莺 Zabbix 和 Prometheus 的区别&#xff1f;如何选择&#xff1f;【重中之重】 Zabbix 更适用于传统业务架构的物理机、虚拟机环境的监控&#xff0c;对容器环境的支持较差&#xf…

战略形成是权力妥协的过程,江湖,政治是常态

战略权力派&#xff1a;战略形成是各种权力妥协的过程【安志强趣讲270期】 趣讲大白话&#xff1a;有人的地方就有政治 **************************** 有人的地方就有江湖 有组织的地方就有政治 公司的战略是各种人的权力博弈的产物 围观权力&#xff1a;就是组织内部 宏观权力…

MyCAT命令行监控

9066端口 &#xff0c;用mysql命令行连接 Mysql –utest –ptest –P9066 show help 可显示所有相关管理命令 显示后端物理库连接信息&#xff0c;包括当前连接数&#xff0c;端口 Show backend Show connection 显示当前前端客户端连接情况&#xff0c;已经网络流量信息、…

Tomcat 部署时 war 和 war exploded区别

在 Tomcat 调试部署的时候&#xff0c;我们通常会看到有下面 2 个选项。 是选择war还是war exploded 这里首先看一下他们两个的区别&#xff1a; war 模式&#xff1a;将WEB工程以包的形式上传到服务器 &#xff1b;war exploded 模式&#xff1a;将WEB工程以当前文件夹的位置…

【Go 基础篇】Go语言数组遍历:探索多种遍历数组的方式

数组作为一种基本的数据结构&#xff0c;在Go语言中扮演着重要角色。而数组的遍历是使用数组的基础&#xff0c;它涉及到如何按顺序访问数组中的每个元素。在本文中&#xff0c;我们将深入探讨Go语言中多种数组遍历的方式&#xff0c;为你展示如何高效地处理数组数据。 前言 …

2023年高教社杯 国赛数学建模思路 - 复盘:光照强度计算的优化模型

文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米&#xff0c;宽为12米&…

YOLO目标检测——肺炎分类数据集下载分享

肺炎分类数据集总共21000图片&#xff0c;可应用于&#xff1a;肺炎检测、疾病诊断、疾病预测和预警等等。 数据集点击下载&#xff1a;YOLO肺炎分类数据集21000图片.rar

如何深入理解 Node.js 中的流(Streams)

Node.js是一个强大的允许开发人员构建可扩展和高效的应用程序。Node.js的一个关键特性是其内置对流的支持。流是Node.js中的一个基本概念&#xff0c;它能够实现高效的数据处理&#xff0c;特别是在处理大量信息或实时处理数据时。 在本文中&#xff0c;我们将探讨Node.js中的流…

腾讯云服务器地域和可用区详细介绍_选择攻略

腾讯云服务器地域有什么区别&#xff1f;怎么选择比较好&#xff1f;地域选择就近原则&#xff0c;距离地域越近网络延迟越低&#xff0c;速度越快。关于地域的选择还有很多因素&#xff0c;地域节点选择还要考虑到网络延迟速度方面、内网连接、是否需要备案、不同地域价格因素…

微服务dubbo

微服务是一种软件开发架构风格&#xff0c;它将一个应用程序拆分成一组小型、独立的服务&#xff0c;每个服务都可以独立部署、管理和扩展。每个服务都可以通过轻量级的通信机制&#xff08;通常是 HTTP/REST 或消息队列&#xff09;相互通信。微服务架构追求高内聚、低耦合&am…