【项目】Reactor模式的服务器

目录

Reactor完整代码连接

前置知识:

 1.普通的epoll读写有什么问题?

2.Connection内的回调函数是什么

3.服务器的初始化(Connection只是使用的一个结构体)

4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;

5._listenSock读取 :Accepter函数获取新链接怎么处理的

6.普通套接字读取:Recver

7.对写事件的关心是按需关系的:

8.执行效果:

9.Reactor的优势:


Reactor完整代码连接

前置知识:

Reactor叫做反应堆模式;反应:对已经就绪的事件(读、写、异常)进行处理;

我们使用epoll来实现,select、poll、epoll是多路转接的发展史,epoll完善了select、poll的缺点;

  • 需要程序员维护数组                                      select/poll都有这个缺点    
  • 有大量的遍历                                                 select/poll都有这个缺点 
  • 大量参数为输入输出型参数,需要重新设置  select有
  • 管理的fd有上限                                              select有

 1.普通的epoll读写有什么问题?

  • 使用的是一个静态数组读取;报文长,读到就是不完整报文,报文短,一次读取可能有多个报文,最后一个报文可能不是完整的;
  • 这样的错误报文没法分析和处理,就不能构造响应报文;

综上所述:问题为没法保证读取到的是完整报文,导致没法分析和处理、构建响应报文;

    void Read(int fd){char buff[1024];ssize_t s = read(fd, buff, 1023);if(s > 0){buff[s] = 0;LOG2(INFO, buff, fd);}

解决办法:将文件描述符封装,并且有接受发送缓冲区,使用string就可以;

  1. 使用静态数组读取,然后添加到接受缓冲区保存;
  2. 读取完毕,对接受缓冲区分析是否有完整报文;
  3. 处理完整请求报文,构建响应报文添加到发送缓冲区;
using func_t = std::function<void(Connection*)>;
using cals_t = std::function<void(std::string &, Connection*)>;class Connection{
public:Connection(int fd = -1 ):_fd(fd), _ts(nullptr){}~Connection(){if(_fd >= 0)close(_fd);}
public:int _fd;//读写异常回调方法func_t _recver;func_t _sender;func_t _exception;//接受缓冲区std::string _outbuff;//发送缓冲区std::string _inbuff;//TcpServer的回指指针,对写事件的关心是按需打开TcpServer *_ts;//连接最近活跃活动的时间time_t _times;
};

2.Connection内的回调函数是什么

一个包装器;返回值为void,参数为Connection*,的函数指针、仿函数、lamada表达,它都可以接受;

using func_t = std::function<void(Connection*)>;

优势:

  • _listenSock是读方法是接受新连接,普通是读取请求报文
  • 初始化时设置读写异常回调方法(回调:使用函数指针执行的函数),不需要判断是_listenSock还是普通套接字,统一使用con->_recver;

3.服务器的初始化(Connection只是使用的一个结构体)

  1. 套接字创建,bind,监听;
  2. 构建epoll模型,epoll函数也封装了,_epfd封装在Epoll类内;
  3. 初始化_listenSock    Connection结构体;读取回调方法Accept是类函数,多一个this指针,需要使用std::bind来改变参数个数,进行传递给包装器
  4. epoll_wait的事件就绪队列初始化;
class TcpServer{const static int gport = 8080;const static int gnum = 128;TcpServer(int port = gport, int num = gnum):_port(gport), _evts_num(num){//套接字,创建bind监听_listenSock = Sock::Socket();Sock::Bind(_listenSock,_port);Sock::Listen(_listenSock);//构建epoll模型_epoll.CreateEpoll();//listensock添加epoll模型和_connections管理起来AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);//epoll_wait就绪队列,获取已就绪的事件_evts = new epoll_event[_evts_num];}~TcpServer(){if(_listenSock >= 0)close(_listenSock);if(_evts != nullptr)delete[] _evts;for(auto &pr : _connections){_connections.erase(pr.first);delete pr.second;}}
private:int _listenSock;//epollEpoll _epoll;//就绪队列epoll_event* _evts;int _evts_num;//管理connection对象std::unordered_map<int, Connection*> _connections;int _port;//业务处理的回调指针cals_t _cb;
};

4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;

    void LoopOnce(){int n = _epoll.WaitEpoll(_evts, _evts_num);//有事件就绪if(n > 0){//LOG2(INFO, "epoll wait success",fd);for(int i=0; i<n; i++){int fd = _evts[i].data.fd;int events = _evts->events;//连接关闭或者错误,改为读写统一处理,读写失败调异常处理;if( events & EPOLLHUP){LOG2(INFO,"连接关闭",fd);events |= (EPOLLIN | EPOLLOUT);}if( events & EPOLLERR){LOG2(INFO,"错误",fd);events |= (EPOLLIN | EPOLLOUT);} if(_connections.count(fd) && events & EPOLLIN){if(IsConnectionExits(fd) && _connections[fd]->_recver != nullptr){_connections[fd]->_recver(_connections[fd]);}}if(_connections.count(fd) && events & EPOLLOUT){if(IsConnectionExits(fd) && _connections[fd]->_sender != nullptr)_connections[fd]->_sender(_connections[fd]);}}}else if(n == 0){LOG(INFO, "timeout");}else{LOG(FATAL,"epoll wait fail");exit(4);}}void Dispatcher(cals_t cb){_cb = cb;while(true){//去除不活跃的连接DeleteInactivity();LoopOnce();}}

5._listenSock读取 :Accepter函数获取新链接怎么处理的

  • 得到新连接,如果新的fd是合法的,设置对应的读写异常回调方法,读:读取请求报文,写:发送响应报文,异常:出现错误进行处理;
  • 所有的套接字都是使用ET模式(通知效率高,只支持非阻塞读写),EPOLLET因该被设置;
void Accepter(Connection * con){while(true){con->_times = time(nullptr);struct sockaddr_in tmp;socklen_t tlen = sizeof(tmp);int new_sock = accept(con->_fd, (struct sockaddr *)&tmp, &tlen);if(new_sock < 0){//所以事件处理完毕if(errno == EAGAIN || errno == EWOULDBLOCK)break;else if(errno == EINTR)//可能被信号中断,概率极小continue;else{std::cout << "accept fail , errno :" << errno << strerror(errno) << std::endl;break;}} else//添加到epoll模型和_connections中管理;{if(AddConnection(new_sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1), std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Exception, this, std::placeholders::_1)))LOG2(INFO, "add connection success",new_sock);elseLOG2(RISK, "add connection fail", new_sock);}}}

6.普通套接字读取:Recver

  1. 一直读取,直到错误或者读取完毕;每次读取的结果都放到接受缓冲区;
  2. 读取完毕,对接受缓冲区处理,拿出一个个完整的报文,对请求报文进行业务处理;
    void Recver(Connection *con){con->_times = time(nullptr);const int buff_size = 1024;char buff[buff_size];while(true){ssize_t s = recv(con->_fd, buff, buff_size - 1, 0);if(s > 0){buff[s] = 0;con->_outbuff += buff;}else if(s == 0){LOG2(INFO, "写端关闭", con->_fd);con->_exception(con);return;}else{//读取完毕if(errno == EAGAIN || errno == EWOULDBLOCK ){LOG2(INFO, "处理完毕", con->_fd);break;}else if(errno == EINTR)continue;else{LOG2(ERROR, "recv fail ,fd: ", con->_fd);con->_exception(con);return;}}}std::cout << "fd: " << con->_fd << "outbuff: " << con->_outbuff <<std::endl;//对outbuff内的完整报文,进行处理std::vector<std::string> out;//分隔报文,函数在protocol.hppSplitMessage(out, con->_outbuff);for(auto &s : out)_cb(s, con);//业务逻辑回调指针,在主函数}

7.对写事件的关心是按需关系的:

  • 如果开启关心,还没有数据发送,写事件会一直就绪;所以按需关心;
  • 请求报文业务处理完毕,构建好响应报文,一定有响应,打开对写事件的关心;
  • 也是Connection为什么封装一个Tcperver指针的原因,这里开启写事件的关心;
//业务处理
void CalArguments(std::string &str, Connection *con)
{//请求报文反序列化Request req;//std::cout<<str <<std::endl;if(!req.Deserialize(str)){LOG2(ERROR, "deseroalize fail" ,con->_fd);return;}//对数据处理Response res;calculator(req, res);//响应报文序列化std::string s = res.Serialize();//添加到inbuffcon->_inbuff += s;//一定有响应报文,打开写事件的关系con->_ts->EnableReadWrite(con->_fd, true, true);
}

8.执行效果:

  • 我写的协议是对任意两个数加减乘除;
  • 每个请求或者响应都是用 x  做为分割符的;

9.Reactor的优势:

和进程/线程做比较:

  • 它是一个单进程的就可以并发处理请求的服务器,比进程/线程减少了创建、销毁、调度的时间;
  • 它的等待一批fd,减少了单位等待时间;一个线程等待对应一个fd;
  • 有很高的复用性,替换业务逻辑就行了;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66024.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java核心知识】ThreadLocal相关知识

ThreadLocal 什么是ThreadLocal ThreadLoacal类可以为每个线程保存一份独有的变量&#xff0c;该变量对于每个线程都是独占的。实现原理为每个Thread类中包含一个ThreadHashMap&#xff0c;key为变量的对应的ThreadLocal对象&#xff0c;value为变量的值。 在日常使用中&…

python编写MQTT订阅程序

Download | Eclipse Mosquitto 1、下载&#xff1a; https://mosquitto.org/files/binary/win64/mosquitto-2.0.17-install-windows-x64.exe 2、安装&#xff1a; 3、conf配置 1)使用notepad打开“C:\Program Files\mosquitto\mosquitto.conf”另存为c:\myapp\msquitto\mo…

VueRouter使用详解(5000字通关大全)

Vue Router是一个官方的路由管理器&#xff0c;它可以让我们在Vue应用中实现单页面应用&#xff08;SPA&#xff09;的效果&#xff0c;即通过改变URL而不刷新页面来显示不同的内容。Vue Router可以让我们定义多个路由&#xff0c;每个路由对应一个组件&#xff0c;当URL匹配到…

RT-Thread 线程间同步

线程间同步 在多线程实时系统中&#xff0c;一项工作的完成往往可以通过多个线程协调的方式共同来完成&#xff0c;那么多个线程之间如何 “默契” 协作才能使这项工作无差错执行&#xff1f;下面举个例子说明。 例如一项工作中的两个线程&#xff1a;一个线程从传感器中接收…

菜鸟教程《Python 3 教程》笔记(12):推导式

菜鸟教程《Python 3 教程》笔记&#xff08;12&#xff09; 12 推导式12.1 列表推导式12.2 字典推导式12.3 集合推导式12.4 元组推导式&#xff08;生成器表达式&#xff09; 笔记带有个人侧重点&#xff0c;不追求面面俱到。 12 推导式 出处&#xff1a; 菜鸟教程 - Python3 …

nodejs中如何使用Redis

Redis介绍&#xff1a; Redis 是一个开源的内存数据结构存储器&#xff0c;一般可以用于数据库、缓存、消息代理等&#xff0c;我们常在项目中用redis解决高并发、高可用、高可扩展、大数据存储等问题&#xff1b; 它本质上是一个NoSql&#xff08;非关系型数据库&#xff09;…

Linux开机启动Tomcat

需求背景 Linux重启后要手动执行"startup.sh"启动Tomcat&#xff0c;比较麻烦&#xff0c;想要Linux开机启动Tomcat。 开机启动 #---------------------------------------------------------- sudo tee /usr/bin/tomcat.sh <<-EOF #! /bin/bash nohup /opt/to…

Compose学习 - remember、mutableStateOf的使用

一、需求 在显示界面中&#xff0c;数据变动&#xff0c;界面刷新是非常常见的操作&#xff0c;所以使用compose该如何实现呢&#xff1f; 二、remember、mutableStateOf的使用 我们可以借助标题的两个概念 remember、mutableStateOf来完成。这里先不写定义&#xff0c;定义…

C#基础知识点记录

目录 课程一、C#基础1.C#编译环境、基础语法2.Winform-后续未学完 课程二、Timothy C#底层讲解一、类成员0常量1字段2属性3索引器5方法5.1值参数&#xff08;创建副本&#xff0c;方法内对值的操作&#xff0c;不会影响原来变量的值&#xff09;5.2引用参数&#xff08;传的是地…

Unix System V BSD POSIX 究竟是什么?

学习Linux系统,很多同学对这些单词概念很模糊、一脸懵逼! 黄老师觉得,了解了历史,才会真正明白这些单词的含义,坐稳、黄老师发车了!!! 首先介绍一下什么是Unix? UNIX(非复用信息和计算机服务,英语:Uniplexed Information and Computing Service,UnICS)取“UNI…

ELK日志收集系统(四十九)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 二、组件 1. elasticsearch 2. logstash 2.1 工作过程 2.2 INPUT 2.3 FILETER 2.4 OUTPUTS 3. kibana 三、架构类型 3.1 ELK 3.2 ELKK 3.3 ELFK 3.5 EF…

HTML 与 CSS 有什么区别?

HTML&#xff08;超文本标记语言&#xff09;和 CSS&#xff08;层叠样式表&#xff09;是构建网页的两个核心技术。HTML负责定义网页的结构和内容&#xff0c;而CSS则用于控制网页的样式和布局。虽然它们在构建网页时密切相关&#xff0c;但它们在功能和用途上有明显的区别。 …

go语言-协程

mOS结构体 每一种操作系统不同的线程信息 g给g0栈给g0协程内存中分配的地址&#xff0c;记录函数跳转信息&#xff0c; 单线程循环 0.x版本 1.0版本 多线程循环 操作系统并不知道Goroutine的存在 操作系统线程执行一个调度循环&#xff0c;顺序执行Goroutine 调度循环非常…

【LeetCode周赛】LeetCode第359场周赛

LeetCode第359场周赛 判别首字母缩略词k-avoiding 数组的最小总和销售利润最大化找出最长等值子数组 判别首字母缩略词 给你一个字符串数组 words 和一个字符串 s &#xff0c;请你判断 s 是不是 words 的 首字母缩略词 。 如果可以按顺序串联 words 中每个字符串的第一个字符…

css-grammar

语法格式 选择器 {属性名称 : 属性值; 属性名称 : 属性值;...}语法特点: CSS声明总是以键值对(key\value)形式存在。CSS声明总是以分号(;)结束。声明组以大括号({})括起来。为了让CSS可读性更强&#xff0c;每行只描述一个属性。 CSS 注释 注释是用来解释你的代码&#xff…

vue Cesium接入在线地图

Cesium接入在线地图只需在创建时将imageryProvider属性换为在线地图的地址即可。 目录 天地图 OSM地图 ArcGIS 地图 谷歌影像地图 天地图 //矢量服务let imageryProvider new Cesium.WebMapTileServiceImageryProvider({url: "http://t0.tianditu.com/vec_w/wmts?s…

大数据组件-Flume集群环境的启动与验证

&#x1f947;&#x1f947;【大数据学习记录篇】-持续更新中~&#x1f947;&#x1f947; 个人主页&#xff1a;beixi 本文章收录于专栏&#xff08;点击传送&#xff09;&#xff1a;【大数据学习】 &#x1f493;&#x1f493;持续更新中&#xff0c;感谢各位前辈朋友们支持…

【python爬虫】中央气象局预报—静态网页图像爬取练习

静态网页爬取练习 中央气象局预报简介前期准备步骤Python爬取每日预报结果—以降水为例 中央气象局预报简介 中央气象台是中国气象局&#xff08;中央气象台&#xff09;发布的七天降水预报页面。这个页面提供了未来一周内各地区的降水预报情况&#xff0c;帮助人们了解即将到来…

深入学习 cnf问题 和 SAT 算法

前言 SAT问题是一个重要的计算机科学和人工智能问题&#xff0c;它涉及在给定的布尔变量集合和子句集合下&#xff0c;确定是否存在一种变量赋值使得整个合取范式成为真。这个问题在实际应用中有广泛的用途&#xff0c;包括硬件设计、安全协议验证等。 怎么看待 cnf cnf 文件本…

如何高效地设计测试用例并评审

编写出好的测试用例是每一个测试工程师的职责&#xff0c;但在实际工作中大家写的测试用例往往需要不断地修改才能使用&#xff0c;这不仅浪费了时间&#xff0c;还容易让测试工程师产生自我否定的情绪&#xff0c;甚至在团队中产生各种矛盾。 那如何高效地设计测试用例呢&…