IO多路转接 - epoll
- 一、I/O多路转接之 epoll
- 1. epoll 接口
- (1)epoll_create()
- (2)epoll_wait()
- (3)epoll_ctl()
- 2. epoll 原理
- 3. epoll 的优点
- 4. epoll 的使用
- 5. epoll 的工作模式
- (1)水平触发 Level Triggered 工作模式(LT 模式)
- (2)边缘触发 Edge Triggered 工作模式(ET 模式)
- (3)LT 和 ET 的对比
- 二、Reactor
- 1. 概念
- 2. 实现
- (1)Epoller.hpp
- (2)TcpServer.hpp
- (3)Calculator.hpp
- (4)main.cpp
- (5)CMakeLists.txt
- 3. 总结
一、I/O多路转接之 epoll
1. epoll 接口
(1)epoll_create()
首先 epoll_create()
这个接口就是帮我们创建一个 epoll 模型,这个模型是什么我们后面介绍原理的时候再讲。
其中 epoll_create()
的接口如下:
其中 epoll_create1()
是新标准,我们不介绍。而 epoll_create()
的参数 size 也已经废弃了,这个参数传什么也无所谓了,只要大于 0 就可以了。它的返回值也是一个文件描述符,成功则返回一个新的文件描述符,失败则返回 -1.
(2)epoll_wait()
当 epoll 模型创建好之后,我们想往这个 epoll 模型中新增一个要关心的 fd 及其事件;修改一个或者删除一个文件描述符及其事件;就需要用到 epoll_wait()
这个接口。该接口如下:
epoll_wait() 本质就是获取已经就绪的文件描述符。第一个参数 epfd 就是 epoll_create() 的返回值;第二个和第三个参数就是我们将来定义的一个用户级缓冲区,返回已经就绪的 fd 和 事件;最后一个参数的含义和 poll 的 timeout 一模一样,单位为毫秒。而返回值表示已经就绪的文件描述符的个数。
其中我们看到第二个参数中带有 struct epoll_event
这个类型的结构体,这个结构体是什么呢?我们来看一下:
如上图,epoll_event 中的 events 表示哪些事件,它的类型是 uint32_t,也就是一个位图,和 poll 中的 events 一样,以位图的形式传递标记位事件;而第二个字段 data 的类型 epoll_data_t 是一个联合体,就是可以选择该联合体字段中的任意一个,通常用来保存的是用户级的数据,有关这个字段我们后面再说。
其中 events 可以是以下几个宏的集合:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。
(3)epoll_ctl()
为了使用这个 epoll,首先我们也需要将 listen 套接字添加到 epoll 模型里,所以就需要用到 epoll_ctl()
接口。该接口如下:
它的作用主要是我们想向系统里新增一个文件描述符,及其要关心的事件,想要修改一个特定的文件描述符关心的事件。所以 epoll_ctl()
支持 epoll 来进行相关的管理工作。
其中第一个参数就是 epoll_create() 的返回值。第二个参数 op 就是以下三个选项的其中一个,分别代表增加,修改,删除:
第三个参数和第四个参数代表哪一个文件描述符上的哪些事件。
所以我们需要使用 epoll 的话,需要使用以上三个系统调用,而 select 和 poll 都只有一个系统调用。
2. epoll 原理
无论是 select 和 poll 都是用数组来管理文件描述符和对应的事件,更重要的是该数组这个数据结构是由用户来维护的!接下来我们解释一下 epoll 的原理,就能明白为什么需要三个系统调用来使用 epoll 了。
操作系统在硬件层面,通过硬件中断的方式知道网卡上有数据了,然后通过网卡驱动上的方法将数据拷贝到网卡驱动上的数据链路层。同时,操作系统为了支持 epoll,它为我们提供了三种机制:
- 操作系统在内部会帮我们维护一颗红黑树
其中红黑树中的节点包含的最重要的字段是:int fd
和 uint32_t event
,分别代表内核要关心的文件描述符和要关心的事件。
- 操作系统会为我们维护一个就绪队列
一旦红黑树中有特定的一个节点,比如某个节点上的文件描述符的某个事件就绪了,就可以把该节点添加到就绪队列中;其中该就绪队列中每个节点中的字段包含 int fd
和 uint32_t event
,分别代表已经就绪的文件描述符和已经就绪的事件。
- 操作系统的底层网卡驱动是允许操作系统去注册一些回调机制的
操作系统内部会提供一个回调函数,这个回调函数是用来干什么的呢?首先网卡通过硬件中断的方式将数据搬到了网卡驱动层。当网卡驱动层中的数据链路层有数据就绪了,主动会调用该回调函数。然后该回调方法会做如下几个操作:
- 向上交付
- 交付给 TCP 的接收队列
- 根据文件描述符为键值查找红黑树,确认这个接受队列和哪一个文件描述符是关联的,再判断该 fd 是否关心了 EPOLLIN 或者 EPOLLOUT 读写事件,如果有,由于数据已经就绪,所以接下来第四步
- 构建就绪节点,插入到就绪队列中
实际上我们用 epoll 的时候,操作系统就会把该回调函数注册到底层,然后底层数据一旦就绪就会自动回调执行上面的四个方法。所以对于用户来说,只需要在就绪队列中获取就绪节点即可!整套机制都是由操作系统完成的!
我们把这三套机制叫做 epoll 模型,如下图:
其中 eventpoll 为 epoll 对象;epitem 为红黑树的节点。
所以接口 epoll_create()
创建 epoll 模型本质就是创建红黑树,创建就绪队列以及注册底层的回调机制。所以该 epoll 模型怎么让进程找到呢?其实给 epoll 模型放入到 struct file 对象即可,把它也当作文件!因为在 Linux 中一切皆文件!struct file 中也有指针指向 epoll 模型!所以再把该 struct file 对象添加到进程的文件描述符表中即可!
所以 epoll_create()
实质上就是在操作系统中创建 struct file,其中的指针指向整个 epoll 对象,对应的文件描述符就能挂接到进程的文件描述符表中,最后把该文件描述符返回给用户,所以我们就可以通过该文件描述符找到 struct file 并找到 epoll 模型了。epoll_ctl()
实质上的增加、修改、删除都是在对红黑树进行操作。其中 epoll_wait()
的第二个参数是输出型参数,它会将就绪队列中所有就绪的节点一个一个地放进 struct epoll_event
里。
3. epoll 的优点
基于 epoll 的原理,我们可以得到 epoll 的优势:
- 检测就绪的时间复杂度为 O(1),因为只需要看队列是否为空就可以了。而获取就绪的时间复杂度为 O(n),因为需要将就绪队列中的节点一个一个拷贝到应用层。
- fd 和 event 没有上限,因为该红黑树有多大由操作系统说了算
- 由于该红黑树是操作系统帮我们维护的,所以不需要在用户层由用户维护一个数组这样的数据结构,来管理所有的文件描述符及其要关心的事件了
- epoll_wait() 的返回值 n,表示有 n 个 fd 就绪了,那么该接口还会将已经就绪的节点放入到它的输出型参数 events 中,所以就绪事件是连续的,有 n 个!这意味着,上层用户处理已经就绪的事件,就不再需要像以前一样检测有哪些 fd 是非法的,哪些是没有就绪的了;只需要根据返回值 n,遍历 events 即可!
4. epoll 的使用
我们对 epoll 的相关接口进行一下简单的封装成为 Epoller.hpp,如下:
#pragma once#include "NoCopy.hpp"#include "log.hpp"#include <sys/epoll.h>#include <cstring>#include <cerrno> class Epoller : public NoCopy{static const int size = 128;public:Epoller(){_epfd = epoll_create(size);if(_epfd == -1){lg(Fatal, "epoll_create error: %s", strerror(errno));}else{lg(Info, "epoll_create success: %d", _epfd);}}int EpollerWait(struct epoll_event revents[], int num){int n = epoll_wait(_epfd, revents, num, _timeout);return n;}int EpollerCtl(int oper, int sockfd, uint32_t event){ int n = 0;if(oper == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, oper, sockfd, nullptr);if(n != 0){lg(Error, "epoll_ctl delete error!");}}else{// EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD// 设置进内核的红黑树中struct epoll_event ev;ev.events = event;ev.data.fd = sockfd; // 方便我们后面处理的时候知道是哪一个 fd 就绪了n = epoll_ctl(_epfd, oper, sockfd, &ev);if(n != 0){lg(Error, "epoll_ctl error!");}}return n;}~Epoller(){if(_epfd >= 0){close(_epfd);}}private:int _epfd;int _timeout = 1000;};
接下来编写 epollSever.hpp,如下:
#pragma once#include <iostream>#include <memory>#include <sys/epoll.h>#include "Socket.hpp"#include "log.hpp"#include "Epoller.hpp"#include "NoCopy.hpp"uint32_t EVENT_IN = (EPOLLIN);uint32_t EVENT_OUT = (EPOLLOUT);class EpollServer : public NoCopy{static const int maxevents = 64;public:EpollServer(uint16_t port):_port(port),_listenSocket_ptr(new Sock()),_epoller_ptr(new Epoller()){}void Init(){_listenSocket_ptr->Socket();_listenSocket_ptr->Bind(_port);_listenSocket_ptr->Listen();lg(Info, "create listen socket success: %d\n", _listenSocket_ptr->GetFd());}void Accepter(){std::string client_ip;uint16_t client_port;int sockfd = _listenSocket_ptr->Accept(&client_ip, &client_port);if(sockfd > 0){// 不能直接读取,而是将它添加到内核的红黑树中,让 epoll 关心即可_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, EVENT_IN);lg(Info, "get a new link, client info@ %s:%d", client_ip.c_str(), client_port);}else{return;}}// for testvoid Recver(int fd){char buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << "get a message: " << buffer << std::endl;// writestd::string echo_str = "sever echo $ ";echo_str += buffer;write(fd, echo_str.c_str(), echo_str.size());}else if (n == 0){lg(Info, "client quit, me too, close fd is: %d", fd);// 先在内核红黑树中移除 fd,再关闭 fd_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);close(fd);}else{lg(Warning, "recv error, fd is: %d", fd);_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);close(fd);}}void Dispatcher(struct epoll_event revs[], int num){for(int i = 0; i < num; ++i){uint32_t event = revs[i].events;int fd = revs[i].data.fd;if(event & EVENT_IN){// 读事件就绪if(fd == _listenSocket_ptr->GetFd()){// 获取到一个新连接,连接管理器Accepter();}else{// 其它 fd 上的普通读取事件就绪Recver(fd);}}else if(event & EVENT_OUT){// 写事件就绪// ...}else{// ...}}}void Start(){// 将 listenSocket 添加到 epoll 中// 也就是将 listenSocket 和它所关心的事件添加到内核 epoll 模型中的红黑树中!_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, _listenSocket_ptr->GetFd(), EVENT_IN);struct epoll_event revs[maxevents];while(true){// 其中 n 最大是 maxeventsint n = _epoller_ptr->EpollerWait(revs, maxevents);if(n > 0){// 有事件就绪,分派事件lg(Debug, "event happend, fd is: %d", revs[0].data.fd);Dispatcher(revs, n); }else if(n == 0){lg(Info, "time out...");}else {lg(Error, "epoll wait error");}}}~EpollServer(){_listenSocket_ptr->Close(); }private:std::shared_ptr<Sock> _listenSocket_ptr;std::shared_ptr<Epoller> _epoller_ptr;uint16_t _port;};
我们上面两个模块都用到了 NoCopy 这个类,也就是禁止拷贝,代码如下:
#pragma onceclass NoCopy{public:NoCopy(){}NoCopy(const NoCopy&) = delete;const NoCopy& operator=(const NoCopy&) = delete;};
5. epoll 的工作模式
(1)水平触发 Level Triggered 工作模式(LT 模式)
epoll 默认所处的工作模式就是 LT 模式。例如我们上面所写的简单的 epoll 服务器,每次有新的连接到来时,如果我们不处理它,epoll 会每次都通知我们有连接到来了。这种一旦有新的连接到来,或者有新的数据到来,上层如果不取走,底层就会一直通知上层,让上层把数据尽快取走,这种模式就叫做 LT 模式。就像示波器中的高电平,一直有效。
(2)边缘触发 Edge Triggered 工作模式(ET 模式)
而 ET 模式指的是,数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。正是因为 ET 模式有这种特点,才会倒逼程序员每次通知都必须把本轮数据全部取走,怎么保证数据全部取走呢?所以就需要循环读取,直到读取出错!但是我们使用 read() 或者 recv() 在缓冲区中读取数据的时候,当缓冲区的数据没有了,因为它们的读取方式默认是阻塞的,所以此时就会阻塞,服务器就会被挂起!所以我们在 ET 模式下,所有的 fd 必须是要设置为非阻塞的!
(3)LT 和 ET 的对比
-
select 和 poll 其实也是工作在 LT 模式下;epoll 既可以支持 LT,也可以支持 ET;
-
普遍地我们认为,ET 的工作模式比 LT 的工作模式通知效率更高,因为通知一次就可以倒逼上层把全部数据读取走。同时也看得出来 ET 模式的 IO 效率也更高,这也就意味着,TCP 会向对方通告一个更大的窗口,从而从概率上让对方一次给自己发送更多的数据!
-
所谓的 LT 模式和 ET 模式,本质就是向就绪队列中放入多个或者一个就绪的事件
-
但是 ET 模式就一定比 LT 模式的效率高吗?不一定!因为 LT 也可以将所有的 fd 设置为非阻塞,然后循环读取,也就是当通知一次的时候,就把数据全部取走了,就和 ET 一样了!所以谁的效率高不一定,要看具体的实现。
二、Reactor
1. 概念
我们在上面编写的 epoll 服务器的代码中,在其他普通的 fd 读取事件就绪时,也就是在 Recver() 中,读取是有问题的,因为我们不能区分每次读取上来的数据是一个完整的报文。另外还有其它各种问题,所以我们要对上面的代码使用 Reactor 的设计模式作修改。
所谓的 Reactor 是一种设计模式,翻译过来称为反应堆模式。用于处理事件驱动的系统中的并发操作。它提供了一种结构化的方式来处理输入事件,并将其分发给相应的处理程序。Reactor 模式通常用于网络编程中,特别是在服务器端应用程序中。
要进行正确的 IO 处理,就应该有如下的理解:在应用层一定存在大量的连接,每一个连接在应用层都叫做文件描述符。而在读取每一个文件描述符上的数据的时候,可能根本就没有读取完,此时我们就需要把该文件描述符上的数据临时保存起来。所以我们在写服务器的时候,我们要保证每一个文件描述符及其连接及其缓冲区,都是独立的!
2. 实现
(1)Epoller.hpp
Epoller.hpp 是对 epoll 的系统调用的封装,代码如下:
#pragma once#include "NoCopy.hpp"#include "log.hpp"#include <sys/epoll.h>#include <cstring>#include <cerrno> class Epoller : public NoCopy{static const int size = 128;public:Epoller(){_epfd = epoll_create(size);if(_epfd == -1){lg(Fatal, "epoll_create error: %s", strerror(errno));}else{lg(Info, "epoll_create success: %d", _epfd);}}int EpollerWait(struct epoll_event revents[], int num, int timeout){int n = epoll_wait(_epfd, revents, num, timeout);return n;}int EpollerCtl(int oper, int sockfd, uint32_t event){ int n = 0;if(oper == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, oper, sockfd, nullptr);if(n != 0){lg(Error, "epoll_ctl delete error! sockfd: %d", sockfd);}}else{// EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD// 设置进内核的红黑树中struct epoll_event ev;ev.events = event;ev.data.fd = sockfd; // 方便我们后面处理的时候知道是哪一个 fd 就绪了n = epoll_ctl(_epfd, oper, sockfd, &ev);if(n != 0){lg(Error, "epoll_ctl error!");}}return n;}~Epoller(){if(_epfd >= 0){close(_epfd);}}private:int _epfd;int _timeout = 1000;};
(2)TcpServer.hpp
TcpServer.hpp 是处理 IO 的服务器,代码如下:
#pragma once#include <iostream>#include <string>#include <unordered_map>#include <memory>#include <functional>#include <cerrno> #include "log.hpp"#include "NoCopy.hpp"#include "Epoller.hpp"#include "Socket.hpp"#include "Comm.hpp"// 设置 ET 模式uint32_t EVENT_IN = (EPOLLIN | EPOLLET);uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);const static int g_buffer_size = 128;class Connection;class TcpServer;using func_t = std::function<void(std::weak_ptr<Connection>)>;using except_t = std::function<void(std::weak_ptr<Connection>)>;// 管理每一个连接class Connection {public:Connection(int sockfd):_sockfd(sockfd){}void SetHandler(func_t recv_cb, func_t send_cb, except_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}~Connection(){}private:int _sockfd;// 充当缓冲区std::string _inbuffer; std::string _outbuffer;public:// 回指指针// std::shared_ptr<TcpServer> _tcpServer_ptr;std::weak_ptr<TcpServer> _tcpServer_ptr;// 回调方法func_t _recv_cb;func_t _send_cb;except_t _except_cb;std::string _ip;uint16_t _port;int Sockfd(){return _sockfd;}void AppendInBuffer(const std::string& info){_inbuffer += info;}void AppendOutBuffer(const std::string& info){_outbuffer += info;}std::string& Inbuffer(){return _inbuffer;}std::string& Outbuffer(){return _outbuffer;}void SetWeakPtr(std::weak_ptr<TcpServer> tcpServer_ptr){_tcpServer_ptr = tcpServer_ptr;}};// enable_shared_from_this 可以提供返回当前对象的 this 对应的 shared_ptrclass TcpServer : public std::enable_shared_from_this<TcpServer>, public NoCopy{static const int num = 64;public:TcpServer(uint16_t port, func_t OnMessage):_port(port),_quit(true),_OnMessage(OnMessage),_epoller_ptr(new Epoller()),_listenSock_ptr(new Sock()){}void Init(){_listenSock_ptr->Socket();// 将 fd 设置为非阻塞SetNonBlockOrDie(_listenSock_ptr->GetFd());_listenSock_ptr->Bind(_port);_listenSock_ptr->Listen();lg(Info, "create listen socket success: %d\n", _listenSock_ptr->GetFd());AddConnection(_listenSock_ptr->GetFd(), EVENT_IN, \std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, except_t except_cb,\const std::string& ip = "0.0.0.0", uint16_t port = 0){// 1. 给所有的套接字建立一个 connection 对象std::shared_ptr<Connection> new_connection(new Connection(sockfd));// shared_from_this(): 返回当前对象的 shared_ptrnew_connection->SetWeakPtr(shared_from_this()); new_connection->SetHandler(recv_cb, send_cb, except_cb);new_connection->_ip = ip;new_connection->_port = port;// 2. 将套接字和 Connection 添加到 unordered_map 中_connections.insert(std::make_pair(sockfd, new_connection));// 3. 将 listen 套接字或其它事件添加到 epoll 模型中_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, event);lg(Debug, "add a new connection success, sockfd is: %d", sockfd);}// listen 套接字的连接管理器,即有事件就绪的时候,就是有连接到来,就需要处理新连接void Accepter(std::weak_ptr<Connection> conn){auto connection = conn.lock();// 不断检测是否还有新连接,直到读取出错while(true){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = ::accept(connection->Sockfd(), (sockaddr*)&peer, &len);// 获取到新连接设置为非阻塞,然后构建 Connection 对象放入哈希表中和内核红黑树中if(sockfd > 0){uint16_t peer_port = ntohs(peer.sin_port);char ipbuffer[128];inet_ntop(AF_INET, &peer.sin_addr, ipbuffer, sizeof(ipbuffer));lg(Debug, "get a new client, get info-> [%s: %d], sockfd: %d", ipbuffer, peer_port, sockfd);SetNonBlockOrDie(sockfd);AddConnection(sockfd, EVENT_IN, \std::bind(&TcpServer::Recver, this, std::placeholders::_1),\std::bind(&TcpServer::Sender, this, std::placeholders::_1),\std::bind(&TcpServer::Excepter, this, std::placeholders::_1),\ipbuffer, peer_port);}else{if(errno == EWOULDBLOCK){ // 读取完毕break;}else if(errno == EINTR){ // 信号原因中断continue;}else{break;}}}}// 普通事件的事件管理器// 对于服务器而言只需要进行IO,不需要关心是否读完和报文的格式void Recver(std::weak_ptr<Connection> conn){if(conn.expired()) return;auto connection = conn.lock();int sockfd = connection->Sockfd();while(true){char buffer[g_buffer_size];memset(buffer, 0, sizeof(buffer));ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取if(n > 0){connection->AppendInBuffer(buffer);}else if(n == 0){lg(Info, "sockfd: %d, client info %s: %d quit...", sockfd, connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);}else{if(errno == EWOULDBLOCK){break;}else if(errno == EINTR){continue;}else{lg(Warning, "sockfd: %d, client info %s: %d recv error...", sockfd, connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);return;}}}// 交给上层处理,读取到的数据都在 connection 中// 1. 检测// 2. 如果有完整报文,就处理_OnMessage(connection);}void Sender(std::weak_ptr<Connection> conn){if(conn.expired()) return;auto connection = conn.lock();auto& outbuffer = connection->Outbuffer();while(true){ssize_t n = send(connection->Sockfd(), outbuffer.c_str(), outbuffer.size(), 0);if(n > 0){outbuffer.erase(0, n);if(outbuffer.empty()){break;}}else if(n == 0){return;}else{if(errno == EWOULDBLOCK){break;}else if(errno == EINTR){continue;}else{lg(Warning, "sockfd: %d, client info %s: %d send error...", connection->Sockfd(), connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);return;}}}// 没发完if(!outbuffer.empty()){// 开始对写事件关心EnableEvent(connection->Sockfd(), true, true);}else{// 关闭对写事件关心EnableEvent(connection->Sockfd(), true, false);}}void Excepter(std::weak_ptr<Connection> connection){if(connection.expired()) return;auto conn = connection.lock();lg(Debug, "Excepter hander sockfd: %d, client info %s: %d excepter handler", \conn->Sockfd(), conn->_ip.c_str(), conn->_port);// 1. 移除对特定 fd 的关心// EnableEvent(connection->Sockfd(), false, false);_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, conn->Sockfd(), 0);// 2. 关闭异常的 fdlg(Debug, "close %d done...\n", conn->Sockfd());close(conn->Sockfd());// 3. 从 _connections 中移除 fd 和 Connection 的映射关系lg(Debug, "remove %d from _connections...\n", conn->Sockfd());_connections.erase(conn->Sockfd());}void EnableEvent(int sockfd, bool readAble, bool writeAble){uint32_t events = 0;events |= ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0) | EPOLLET);_epoller_ptr->EpollerCtl(EPOLL_CTL_MOD, sockfd, events);}bool IsConnectionExist(int fd){auto iter = _connections.find(fd);if(iter == _connections.end()){return false;}else{return true;}}void Dispatcher(int timeout){int n = _epoller_ptr->EpollerWait(revs, num, timeout);for(int i = 0; i < n; i++){uint32_t event = revs[i].events;int sockfd = revs[i].data.fd;// 一旦事件异常,统一把异常转换为读写问题if(event & EPOLLERR){event |= (EPOLLIN | EPOLLOUT);}if(event & EPOLLHUP){event |= (EPOLLIN | EPOLLOUT);}// 只需要处理读写if((event & EPOLLIN) && IsConnectionExist(sockfd)){if(_connections[sockfd]->_recv_cb){_connections[sockfd]->_recv_cb(_connections[sockfd]);}}if((event & EPOLLOUT) && IsConnectionExist(sockfd)){if(_connections[sockfd]->_send_cb){_connections[sockfd]->_send_cb(_connections[sockfd]);}}}}void Loop(){_quit = false;// AddConnection();while(!_quit){// 事件派发// Dispatcher(3000);Dispatcher(-1);PrintConnection();}_quit = true;}void PrintConnection(){std::cout << "_connections fd list: ";for(auto& connection : _connections){std::cout << connection.second->Sockfd() << ", ";std::cout << "inbuffer: " << connection.second->Inbuffer() << " ";}std::cout << std::endl;}~TcpServer(){}private:std::shared_ptr<Epoller> _epoller_ptr;std::shared_ptr<Sock> _listenSock_ptr; uint16_t _port;bool _quit;struct epoll_event revs[num];// fd 到对应连接到映射,_connections 就是当前服务器管理的所有连接std::unordered_map<int, std::shared_ptr<Connection>> _connections;// 让上层处理信息func_t _OnMessage;};
(3)Calculator.hpp
Calculator.hpp 是上层处理业务的具体处理方法,代码如下:
#pragma once#include <string>#include <iostream>#include "Protocol.hpp"enum{DIV_ERR = 1,MOD_ERR = 2,OP_ERR = 3};// 上层业务class Calculator{public:Calculator(){}Response CalculatorHelper(const Request &req){Response resp(0, 0);switch (req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '%':{if (req._y == 0)resp._code = MOD_ERR;elseresp._result = req._x % req._y;}break;case '/':{if (req._y == 0)resp._code = DIV_ERR;elseresp._result = req._x / req._y;}break;default:resp._code = OP_ERR;break;}return resp;}// "len"\n"10 + 20"\nstd::string Handler(std::string &package){std::string content;bool ret = Decode(package, &content); // content = "10 + 20"if (!ret)return "";Request req;ret = req.Deserialize(content); // x = 10, y = 20, op = '+'if (!ret)return "";content = "";Response resp = CalculatorHelper(req); // result = 30, code = 0resp.Serialize(&content); // content = "30 0"content = Encode(content); // content = "len"\n"30 0\n"return content;}~Calculator(){}};
(4)main.cpp
下面是主函数的调用:
#include <iostream>#include <memory>#include <functional>#include "TcpServer.hpp" // 处理IO#include "Calculator.hpp" // 处理业务#include "log.hpp"Calculator calculator;void DefaultOnMessage(std::weak_ptr<Connection> conn){if(conn.expired()) return;auto connection_ptr = conn.lock();std::cout << "Application layerget a message: " << connection_ptr->Inbuffer() << std::endl;// 对报文进行处理std::string response_str = calculator.Handler(connection_ptr->Inbuffer());if(response_str.empty()){return;}lg(Debug, "%s", response_str.c_str());// response_str 发送出去connection_ptr->AppendOutBuffer(response_str);// 因为写事件(发送缓冲区是否有空间,经常是ok的),经常是就绪的// 所以如果我们设置对 EPOLLOUT 关心,那么 EPOLLOUT 几乎每次都是就绪的// 就导致 epollserver 经常返回,浪费 CPU 资源// 所以,对于读取,我们设置为常关心;对于写,我们设置为按需设置// 处理写事件:直接写入,如果写入完成,就结束。// 如果写入完成,但是数据还没有写完,_outbuffer 里还有内容,我们就需要设置对写事件进行关心了,如果写完了,就去掉写事件的关心// connection_ptr->_send_cb(connection_ptr);auto tcpserver = connection_ptr->_tcpServer_ptr.lock();tcpserver->Sender(connection_ptr);}int main(){ std::shared_ptr<TcpServer> tcp_svr(new TcpServer(8888, DefaultOnMessage));tcp_svr->Init();tcp_svr->Loop();return 0;}
其中有一些头文件例如 Socket.hpp 和 log.hpp 我们以前已经用过,这里就不再放出来了。
(5)CMakeLists.txt
cmake_minimum_required(VERSION 2.8)project(ReactorServer)add_executable(reactorServer main.cc)target_link_libraries(reactorServer jsoncpp)add_executable(clientCal ClientCal.cc)target_link_libraries(clientCal jsoncpp)
3. 总结
Reactor 其实是一个半同步半异步模型,那么 IO 等于等待+数据拷贝,所以 Reactor 的半同步半异步体现在,等待是由 epoll 完成,这是体现同步;异步体现在 Reactor 可以进行回调处理。
在 Reactor 模式中,有一个事件循环(Event Loop)负责监听和分发事件。当有新的事件到达时,事件循环会将其分发给相应的处理程序进行处理。这种方式可以实现高效的并发处理,避免了线程创建和销毁的开销。