【项目】Reactor模式的服务器

目录

Reactor完整代码连接

前置知识:

 1.普通的epoll读写有什么问题?

2.Connection内的回调函数是什么

3.服务器的初始化(Connection只是使用的一个结构体)

4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;

5._listenSock读取 :Accepter函数获取新链接怎么处理的

6.普通套接字读取:Recver

7.对写事件的关心是按需关系的:

8.执行效果:

9.Reactor的优势:


Reactor完整代码连接

前置知识:

Reactor叫做反应堆模式;反应:对已经就绪的事件(读、写、异常)进行处理;

我们使用epoll来实现,select、poll、epoll是多路转接的发展史,epoll完善了select、poll的缺点;

  • 需要程序员维护数组                                      select/poll都有这个缺点    
  • 有大量的遍历                                                 select/poll都有这个缺点 
  • 大量参数为输入输出型参数,需要重新设置  select有
  • 管理的fd有上限                                              select有

 1.普通的epoll读写有什么问题?

  • 使用的是一个静态数组读取;报文长,读到就是不完整报文,报文短,一次读取可能有多个报文,最后一个报文可能不是完整的;
  • 这样的错误报文没法分析和处理,就不能构造响应报文;

综上所述:问题为没法保证读取到的是完整报文,导致没法分析和处理、构建响应报文;

    void Read(int fd){char buff[1024];ssize_t s = read(fd, buff, 1023);if(s > 0){buff[s] = 0;LOG2(INFO, buff, fd);}

解决办法:将文件描述符封装,并且有接受发送缓冲区,使用string就可以;

  1. 使用静态数组读取,然后添加到接受缓冲区保存;
  2. 读取完毕,对接受缓冲区分析是否有完整报文;
  3. 处理完整请求报文,构建响应报文添加到发送缓冲区;
using func_t = std::function<void(Connection*)>;
using cals_t = std::function<void(std::string &, Connection*)>;class Connection{
public:Connection(int fd = -1 ):_fd(fd), _ts(nullptr){}~Connection(){if(_fd >= 0)close(_fd);}
public:int _fd;//读写异常回调方法func_t _recver;func_t _sender;func_t _exception;//接受缓冲区std::string _outbuff;//发送缓冲区std::string _inbuff;//TcpServer的回指指针,对写事件的关心是按需打开TcpServer *_ts;//连接最近活跃活动的时间time_t _times;
};

2.Connection内的回调函数是什么

一个包装器;返回值为void,参数为Connection*,的函数指针、仿函数、lamada表达,它都可以接受;

using func_t = std::function<void(Connection*)>;

优势:

  • _listenSock是读方法是接受新连接,普通是读取请求报文
  • 初始化时设置读写异常回调方法(回调:使用函数指针执行的函数),不需要判断是_listenSock还是普通套接字,统一使用con->_recver;

3.服务器的初始化(Connection只是使用的一个结构体)

  1. 套接字创建,bind,监听;
  2. 构建epoll模型,epoll函数也封装了,_epfd封装在Epoll类内;
  3. 初始化_listenSock    Connection结构体;读取回调方法Accept是类函数,多一个this指针,需要使用std::bind来改变参数个数,进行传递给包装器
  4. epoll_wait的事件就绪队列初始化;
class TcpServer{const static int gport = 8080;const static int gnum = 128;TcpServer(int port = gport, int num = gnum):_port(gport), _evts_num(num){//套接字,创建bind监听_listenSock = Sock::Socket();Sock::Bind(_listenSock,_port);Sock::Listen(_listenSock);//构建epoll模型_epoll.CreateEpoll();//listensock添加epoll模型和_connections管理起来AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);//epoll_wait就绪队列,获取已就绪的事件_evts = new epoll_event[_evts_num];}~TcpServer(){if(_listenSock >= 0)close(_listenSock);if(_evts != nullptr)delete[] _evts;for(auto &pr : _connections){_connections.erase(pr.first);delete pr.second;}}
private:int _listenSock;//epollEpoll _epoll;//就绪队列epoll_event* _evts;int _evts_num;//管理connection对象std::unordered_map<int, Connection*> _connections;int _port;//业务处理的回调指针cals_t _cb;
};

4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;

    void LoopOnce(){int n = _epoll.WaitEpoll(_evts, _evts_num);//有事件就绪if(n > 0){//LOG2(INFO, "epoll wait success",fd);for(int i=0; i<n; i++){int fd = _evts[i].data.fd;int events = _evts->events;//连接关闭或者错误,改为读写统一处理,读写失败调异常处理;if( events & EPOLLHUP){LOG2(INFO,"连接关闭",fd);events |= (EPOLLIN | EPOLLOUT);}if( events & EPOLLERR){LOG2(INFO,"错误",fd);events |= (EPOLLIN | EPOLLOUT);} if(_connections.count(fd) && events & EPOLLIN){if(IsConnectionExits(fd) && _connections[fd]->_recver != nullptr){_connections[fd]->_recver(_connections[fd]);}}if(_connections.count(fd) && events & EPOLLOUT){if(IsConnectionExits(fd) && _connections[fd]->_sender != nullptr)_connections[fd]->_sender(_connections[fd]);}}}else if(n == 0){LOG(INFO, "timeout");}else{LOG(FATAL,"epoll wait fail");exit(4);}}void Dispatcher(cals_t cb){_cb = cb;while(true){//去除不活跃的连接DeleteInactivity();LoopOnce();}}

5._listenSock读取 :Accepter函数获取新链接怎么处理的

  • 得到新连接,如果新的fd是合法的,设置对应的读写异常回调方法,读:读取请求报文,写:发送响应报文,异常:出现错误进行处理;
  • 所有的套接字都是使用ET模式(通知效率高,只支持非阻塞读写),EPOLLET因该被设置;
void Accepter(Connection * con){while(true){con->_times = time(nullptr);struct sockaddr_in tmp;socklen_t tlen = sizeof(tmp);int new_sock = accept(con->_fd, (struct sockaddr *)&tmp, &tlen);if(new_sock < 0){//所以事件处理完毕if(errno == EAGAIN || errno == EWOULDBLOCK)break;else if(errno == EINTR)//可能被信号中断,概率极小continue;else{std::cout << "accept fail , errno :" << errno << strerror(errno) << std::endl;break;}} else//添加到epoll模型和_connections中管理;{if(AddConnection(new_sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1), std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Exception, this, std::placeholders::_1)))LOG2(INFO, "add connection success",new_sock);elseLOG2(RISK, "add connection fail", new_sock);}}}

6.普通套接字读取:Recver

  1. 一直读取,直到错误或者读取完毕;每次读取的结果都放到接受缓冲区;
  2. 读取完毕,对接受缓冲区处理,拿出一个个完整的报文,对请求报文进行业务处理;
    void Recver(Connection *con){con->_times = time(nullptr);const int buff_size = 1024;char buff[buff_size];while(true){ssize_t s = recv(con->_fd, buff, buff_size - 1, 0);if(s > 0){buff[s] = 0;con->_outbuff += buff;}else if(s == 0){LOG2(INFO, "写端关闭", con->_fd);con->_exception(con);return;}else{//读取完毕if(errno == EAGAIN || errno == EWOULDBLOCK ){LOG2(INFO, "处理完毕", con->_fd);break;}else if(errno == EINTR)continue;else{LOG2(ERROR, "recv fail ,fd: ", con->_fd);con->_exception(con);return;}}}std::cout << "fd: " << con->_fd << "outbuff: " << con->_outbuff <<std::endl;//对outbuff内的完整报文,进行处理std::vector<std::string> out;//分隔报文,函数在protocol.hppSplitMessage(out, con->_outbuff);for(auto &s : out)_cb(s, con);//业务逻辑回调指针,在主函数}

7.对写事件的关心是按需关系的:

  • 如果开启关心,还没有数据发送,写事件会一直就绪;所以按需关心;
  • 请求报文业务处理完毕,构建好响应报文,一定有响应,打开对写事件的关心;
  • 也是Connection为什么封装一个Tcperver指针的原因,这里开启写事件的关心;
//业务处理
void CalArguments(std::string &str, Connection *con)
{//请求报文反序列化Request req;//std::cout<<str <<std::endl;if(!req.Deserialize(str)){LOG2(ERROR, "deseroalize fail" ,con->_fd);return;}//对数据处理Response res;calculator(req, res);//响应报文序列化std::string s = res.Serialize();//添加到inbuffcon->_inbuff += s;//一定有响应报文,打开写事件的关系con->_ts->EnableReadWrite(con->_fd, true, true);
}

8.执行效果:

  • 我写的协议是对任意两个数加减乘除;
  • 每个请求或者响应都是用 x  做为分割符的;

9.Reactor的优势:

和进程/线程做比较:

  • 它是一个单进程的就可以并发处理请求的服务器,比进程/线程减少了创建、销毁、调度的时间;
  • 它的等待一批fd,减少了单位等待时间;一个线程等待对应一个fd;
  • 有很高的复用性,替换业务逻辑就行了;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66024.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java核心知识】ThreadLocal相关知识

ThreadLocal 什么是ThreadLocal ThreadLoacal类可以为每个线程保存一份独有的变量&#xff0c;该变量对于每个线程都是独占的。实现原理为每个Thread类中包含一个ThreadHashMap&#xff0c;key为变量的对应的ThreadLocal对象&#xff0c;value为变量的值。 在日常使用中&…

python编写MQTT订阅程序

Download | Eclipse Mosquitto 1、下载&#xff1a; https://mosquitto.org/files/binary/win64/mosquitto-2.0.17-install-windows-x64.exe 2、安装&#xff1a; 3、conf配置 1)使用notepad打开“C:\Program Files\mosquitto\mosquitto.conf”另存为c:\myapp\msquitto\mo…

RT-Thread 线程间同步

线程间同步 在多线程实时系统中&#xff0c;一项工作的完成往往可以通过多个线程协调的方式共同来完成&#xff0c;那么多个线程之间如何 “默契” 协作才能使这项工作无差错执行&#xff1f;下面举个例子说明。 例如一项工作中的两个线程&#xff1a;一个线程从传感器中接收…

C#基础知识点记录

目录 课程一、C#基础1.C#编译环境、基础语法2.Winform-后续未学完 课程二、Timothy C#底层讲解一、类成员0常量1字段2属性3索引器5方法5.1值参数&#xff08;创建副本&#xff0c;方法内对值的操作&#xff0c;不会影响原来变量的值&#xff09;5.2引用参数&#xff08;传的是地…

ELK日志收集系统(四十九)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 二、组件 1. elasticsearch 2. logstash 2.1 工作过程 2.2 INPUT 2.3 FILETER 2.4 OUTPUTS 3. kibana 三、架构类型 3.1 ELK 3.2 ELKK 3.3 ELFK 3.5 EF…

go语言-协程

mOS结构体 每一种操作系统不同的线程信息 g给g0栈给g0协程内存中分配的地址&#xff0c;记录函数跳转信息&#xff0c; 单线程循环 0.x版本 1.0版本 多线程循环 操作系统并不知道Goroutine的存在 操作系统线程执行一个调度循环&#xff0c;顺序执行Goroutine 调度循环非常…

vue Cesium接入在线地图

Cesium接入在线地图只需在创建时将imageryProvider属性换为在线地图的地址即可。 目录 天地图 OSM地图 ArcGIS 地图 谷歌影像地图 天地图 //矢量服务let imageryProvider new Cesium.WebMapTileServiceImageryProvider({url: "http://t0.tianditu.com/vec_w/wmts?s…

大数据组件-Flume集群环境的启动与验证

&#x1f947;&#x1f947;【大数据学习记录篇】-持续更新中~&#x1f947;&#x1f947; 个人主页&#xff1a;beixi 本文章收录于专栏&#xff08;点击传送&#xff09;&#xff1a;【大数据学习】 &#x1f493;&#x1f493;持续更新中&#xff0c;感谢各位前辈朋友们支持…

【python爬虫】中央气象局预报—静态网页图像爬取练习

静态网页爬取练习 中央气象局预报简介前期准备步骤Python爬取每日预报结果—以降水为例 中央气象局预报简介 中央气象台是中国气象局&#xff08;中央气象台&#xff09;发布的七天降水预报页面。这个页面提供了未来一周内各地区的降水预报情况&#xff0c;帮助人们了解即将到来…

如何高效地设计测试用例并评审

编写出好的测试用例是每一个测试工程师的职责&#xff0c;但在实际工作中大家写的测试用例往往需要不断地修改才能使用&#xff0c;这不仅浪费了时间&#xff0c;还容易让测试工程师产生自我否定的情绪&#xff0c;甚至在团队中产生各种矛盾。 那如何高效地设计测试用例呢&…

【pyqt5界面化工具开发-14】初始牛刀-登录工具

目录 0x00 前言&#xff1a; 一、准备好ui的加载 二、获取对应的触发事件 三、触发事件绑定 三、输入内容的调用 三、完善登录逻辑 0x00 前言&#xff1a; 在逻辑代码的处理添加数据包的请求&#xff0c;返回数据包的判断&#xff0c;就可以完整实现登录检测的一个界面化…

Android.mk开发模板

今天简单写了一个 Android.mk 的示例模板&#xff0c;供初学者参考。 本模板主要给大家示例 Android NDK 开发中的如下几个问题&#xff1a; 如何自动添加需要编译的源文件列表如何添加第三方静态库、动态库的依赖如何构造一个完整的NDK工程框架 假设我们的项目依赖 libmath.…

JavaScript原型链污染

前言 在浏览某个论坛的时候&#xff0c;第一次看到了JavaScript原型链污染漏洞。当时非常的好奇&#xff0c;当时我一直以为js作为一种前端语言&#xff0c;就算存在漏洞也是针对前端&#xff0c;不会危害到后端&#xff0c;因此我以为这种漏洞危害应该不大。可当我看到他的漏…

简明易懂:Python中的分支与循环

文章目录 前言分支结构if 语句&#xff1a;单一条件判断else语句&#xff1a;提供备选方案elif 语句&#xff1a;多条件判断嵌套的分支结构&#xff1a;复杂条件逻辑 循环结构for循环&#xff1a;遍历序列range()函数与for循环while循环&#xff1a;条件重复循环控制&#xff1…

GA遗传算法

储备知识 GA算法主要解决数学模型中最优化的搜索算法&#xff0c;是进化算法中的一种&#xff0c;基因算法借鉴了自然界基因的遗传的主要现象&#xff0c;分别为遗传&#xff0c;变异&#xff0c;自然选择&#xff0c;杂交等。 GA算法参数 GA算法的参数如下所示。 种群规模…

剑指 Offer 62. 圆圈中最后剩下的数字(简单)

题目&#xff1a; class Solution { public:int lastRemaining(int n, int m) {int pos 0;for(int i2;i<n;i){pos (posm)%i;}return pos;} };作者&#xff1a;想吃火锅的木易 链接&#xff1a;详细题解 来源&#xff1a;力扣&#xff08;LeetCode&#xff09;

ssh常用操作

ssh常用操作 SSH是一种安全协议&#xff0c;ssh是该协议的客户端程序&#xff0c;openssh-server则是该协议的服务端程序 常用系统都自带了ssh客户端程序&#xff0c;服务端程序则可能要安装 密码远程登陆 前提&#xff1a;服务器安装了openssh-server&#xff0c;未安装时…

安装bpftrace和bcc的踩坑记录

最后在Ubuntu22.04使用Ubuntu提供的安装命令完成了安装。这里是记录尝试在Ubuntu18.04和Ubuntu22.04使用源码安装未果的过程。 文章目录 22版本安装bcc准备工具安装命令使用报错&#xff1a;iovisor封装的安装方式ubuntu的安装方式 For Bionic (18.04 LTS)官方提供的源码安装准…

Tutorial: Mathmatical Derivation of Backpropagation

目录 1. 概要 2. Gradient Descent 3. Chain rule 3.1 单变量基本链式法则 3.2 单变量全微分链式法则 3.3 小贴士&#xff1a;微分、导数、导函数是什么关系&#xff1f; 4. What and why backpropagation? 5. Backpropagation for a simple neural network 5.1 基于…

哪些存储设备的数据需要注意,防止误删除或者格式化丢失?

以下是一些存储设备的数据要注意&#xff0c;防止误删除或者格式化丢失&#xff1a; 1.硬盘&#xff1a;存储重要数据时要备份&#xff0c;避免硬盘故障、误格式化等情况导致数据丢失。 2.USB闪存驱动器&#xff1a;在拔出USB闪存驱动器前&#xff0c;应该先进行“安全删除”…