【计算机网络】epoll

IO多路转接 - epoll

  • 一、I/O多路转接之 epoll
    • 1. epoll 接口
      • (1)epoll_create()
      • (2)epoll_wait()
      • (3)epoll_ctl()
    • 2. epoll 原理
    • 3. epoll 的优点
    • 4. epoll 的使用
    • 5. epoll 的工作模式
      • (1)水平触发 Level Triggered 工作模式(LT 模式)
      • (2)边缘触发 Edge Triggered 工作模式(ET 模式)
      • (3)LT 和 ET 的对比
  • 二、Reactor
    • 1. 概念
    • 2. 实现
      • (1)Epoller.hpp
      • (2)TcpServer.hpp
      • (3)Calculator.hpp
      • (4)main.cpp
      • (5)CMakeLists.txt
    • 3. 总结

一、I/O多路转接之 epoll

1. epoll 接口

(1)epoll_create()

首先 epoll_create() 这个接口就是帮我们创建一个 epoll 模型,这个模型是什么我们后面介绍原理的时候再讲。

其中 epoll_create() 的接口如下:

在这里插入图片描述

其中 epoll_create1() 是新标准,我们不介绍。而 epoll_create() 的参数 size 也已经废弃了,这个参数传什么也无所谓了,只要大于 0 就可以了。它的返回值也是一个文件描述符,成功则返回一个新的文件描述符,失败则返回 -1.

(2)epoll_wait()

epoll 模型创建好之后,我们想往这个 epoll 模型中新增一个要关心的 fd 及其事件;修改一个或者删除一个文件描述符及其事件;就需要用到 epoll_wait() 这个接口。该接口如下:

在这里插入图片描述

epoll_wait() 本质就是获取已经就绪的文件描述符。第一个参数 epfd 就是 epoll_create() 的返回值;第二个和第三个参数就是我们将来定义的一个用户级缓冲区,返回已经就绪的 fd 和 事件;最后一个参数的含义和 polltimeout 一模一样,单位为毫秒。而返回值表示已经就绪的文件描述符的个数。

其中我们看到第二个参数中带有 struct epoll_event 这个类型的结构体,这个结构体是什么呢?我们来看一下:

在这里插入图片描述

如上图,epoll_event 中的 events 表示哪些事件,它的类型是 uint32_t,也就是一个位图,和 poll 中的 events 一样,以位图的形式传递标记位事件;而第二个字段 data 的类型 epoll_data_t 是一个联合体,就是可以选择该联合体字段中的任意一个,通常用来保存的是用户级的数据,有关这个字段我们后面再说。

其中 events 可以是以下几个宏的集合:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
  • EPOLLOUT : 表示对应的文件描述符可以写;
  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
  • EPOLLERR : 表示对应的文件描述符发生错误;
  • EPOLLHUP : 表示对应的文件描述符被挂断;
  • EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。

(3)epoll_ctl()

为了使用这个 epoll,首先我们也需要将 listen 套接字添加到 epoll 模型里,所以就需要用到 epoll_ctl() 接口。该接口如下:

在这里插入图片描述

它的作用主要是我们想向系统里新增一个文件描述符,及其要关心的事件,想要修改一个特定的文件描述符关心的事件。所以 epoll_ctl() 支持 epoll 来进行相关的管理工作。

其中第一个参数就是 epoll_create() 的返回值。第二个参数 op 就是以下三个选项的其中一个,分别代表增加,修改,删除:

在这里插入图片描述

第三个参数和第四个参数代表哪一个文件描述符上的哪些事件。

所以我们需要使用 epoll 的话,需要使用以上三个系统调用,而 selectpoll 都只有一个系统调用。

2. epoll 原理

无论是 selectpoll 都是用数组来管理文件描述符和对应的事件,更重要的是该数组这个数据结构是由用户来维护的!接下来我们解释一下 epoll 的原理,就能明白为什么需要三个系统调用来使用 epoll 了。

操作系统在硬件层面,通过硬件中断的方式知道网卡上有数据了,然后通过网卡驱动上的方法将数据拷贝到网卡驱动上的数据链路层。同时,操作系统为了支持 epoll,它为我们提供了三种机制:

  • 操作系统在内部会帮我们维护一颗红黑树

其中红黑树中的节点包含的最重要的字段是:int fduint32_t event,分别代表内核要关心的文件描述符和要关心的事件。

  • 操作系统会为我们维护一个就绪队列

一旦红黑树中有特定的一个节点,比如某个节点上的文件描述符的某个事件就绪了,就可以把该节点添加到就绪队列中;其中该就绪队列中每个节点中的字段包含 int fduint32_t event,分别代表已经就绪的文件描述符和已经就绪的事件。

  • 操作系统的底层网卡驱动是允许操作系统去注册一些回调机制的

操作系统内部会提供一个回调函数,这个回调函数是用来干什么的呢?首先网卡通过硬件中断的方式将数据搬到了网卡驱动层。当网卡驱动层中的数据链路层有数据就绪了,主动会调用该回调函数。然后该回调方法会做如下几个操作:

  1. 向上交付
  2. 交付给 TCP 的接收队列
  3. 根据文件描述符为键值查找红黑树,确认这个接受队列和哪一个文件描述符是关联的,再判断该 fd 是否关心了 EPOLLIN 或者 EPOLLOUT 读写事件,如果有,由于数据已经就绪,所以接下来第四步
  4. 构建就绪节点,插入到就绪队列中

实际上我们用 epoll 的时候,操作系统就会把该回调函数注册到底层,然后底层数据一旦就绪就会自动回调执行上面的四个方法。所以对于用户来说,只需要在就绪队列中获取就绪节点即可!整套机制都是由操作系统完成的!

我们把这三套机制叫做 epoll 模型,如下图:

请添加图片描述

其中 eventpollepoll 对象;epitem 为红黑树的节点。

所以接口 epoll_create() 创建 epoll 模型本质就是创建红黑树,创建就绪队列以及注册底层的回调机制。所以该 epoll 模型怎么让进程找到呢?其实给 epoll 模型放入到 struct file 对象即可,把它也当作文件!因为在 Linux 中一切皆文件!struct file 中也有指针指向 epoll 模型!所以再把该 struct file 对象添加到进程的文件描述符表中即可!

所以 epoll_create() 实质上就是在操作系统中创建 struct file,其中的指针指向整个 epoll 对象,对应的文件描述符就能挂接到进程的文件描述符表中,最后把该文件描述符返回给用户,所以我们就可以通过该文件描述符找到 struct file 并找到 epoll 模型了。epoll_ctl() 实质上的增加、修改、删除都是在对红黑树进行操作。其中 epoll_wait() 的第二个参数是输出型参数,它会将就绪队列中所有就绪的节点一个一个地放进 struct epoll_event 里。

在这里插入图片描述

3. epoll 的优点

基于 epoll 的原理,我们可以得到 epoll 的优势:

  • 检测就绪的时间复杂度为 O(1),因为只需要看队列是否为空就可以了。而获取就绪的时间复杂度为 O(n),因为需要将就绪队列中的节点一个一个拷贝到应用层。
  • fd 和 event 没有上限,因为该红黑树有多大由操作系统说了算
  • 由于该红黑树是操作系统帮我们维护的,所以不需要在用户层由用户维护一个数组这样的数据结构,来管理所有的文件描述符及其要关心的事件了
  • epoll_wait() 的返回值 n,表示有 n 个 fd 就绪了,那么该接口还会将已经就绪的节点放入到它的输出型参数 events 中,所以就绪事件是连续的,有 n 个!这意味着,上层用户处理已经就绪的事件,就不再需要像以前一样检测有哪些 fd 是非法的,哪些是没有就绪的了;只需要根据返回值 n,遍历 events 即可!

4. epoll 的使用

我们对 epoll 的相关接口进行一下简单的封装成为 Epoller.hpp,如下:

				#pragma once#include "NoCopy.hpp"#include "log.hpp"#include <sys/epoll.h>#include <cstring>#include <cerrno> class Epoller : public NoCopy{static const int size = 128;public:Epoller(){_epfd = epoll_create(size);if(_epfd == -1){lg(Fatal, "epoll_create error: %s", strerror(errno));}else{lg(Info, "epoll_create success: %d", _epfd);}}int EpollerWait(struct epoll_event revents[], int num){int n = epoll_wait(_epfd, revents, num, _timeout);return n;}int EpollerCtl(int oper, int sockfd, uint32_t event){   int n = 0;if(oper == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, oper, sockfd, nullptr);if(n != 0){lg(Error, "epoll_ctl delete error!");}}else{// EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD// 设置进内核的红黑树中struct epoll_event ev;ev.events = event;ev.data.fd = sockfd;    // 方便我们后面处理的时候知道是哪一个 fd 就绪了n = epoll_ctl(_epfd, oper, sockfd, &ev);if(n != 0){lg(Error, "epoll_ctl error!");}}return n;}~Epoller(){if(_epfd >= 0){close(_epfd);}}private:int _epfd;int _timeout = 1000;};

接下来编写 epollSever.hpp,如下:

				#pragma once#include <iostream>#include <memory>#include <sys/epoll.h>#include "Socket.hpp"#include "log.hpp"#include "Epoller.hpp"#include "NoCopy.hpp"uint32_t EVENT_IN = (EPOLLIN);uint32_t EVENT_OUT = (EPOLLOUT);class EpollServer : public NoCopy{static const int maxevents = 64;public:EpollServer(uint16_t port):_port(port),_listenSocket_ptr(new Sock()),_epoller_ptr(new Epoller()){}void Init(){_listenSocket_ptr->Socket();_listenSocket_ptr->Bind(_port);_listenSocket_ptr->Listen();lg(Info, "create listen socket success: %d\n", _listenSocket_ptr->GetFd());}void Accepter(){std::string client_ip;uint16_t client_port;int sockfd = _listenSocket_ptr->Accept(&client_ip, &client_port);if(sockfd > 0){// 不能直接读取,而是将它添加到内核的红黑树中,让 epoll 关心即可_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, EVENT_IN);lg(Info, "get a new link, client info@ %s:%d", client_ip.c_str(), client_port);}else{return;}}// for testvoid Recver(int fd){char buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << "get a message: " << buffer << std::endl;// writestd::string echo_str = "sever echo $ ";echo_str += buffer;write(fd, echo_str.c_str(), echo_str.size());}else if (n == 0){lg(Info, "client quit, me too, close fd is: %d", fd);// 先在内核红黑树中移除 fd,再关闭 fd_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);close(fd);}else{lg(Warning, "recv error, fd is: %d", fd);_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);close(fd);}}void Dispatcher(struct epoll_event revs[], int num){for(int i = 0; i < num; ++i){uint32_t event = revs[i].events;int fd = revs[i].data.fd;if(event & EVENT_IN){// 读事件就绪if(fd == _listenSocket_ptr->GetFd()){// 获取到一个新连接,连接管理器Accepter();}else{// 其它 fd 上的普通读取事件就绪Recver(fd);}}else if(event & EVENT_OUT){// 写事件就绪// ...}else{// ...}}}void Start(){// 将 listenSocket 添加到 epoll 中// 也就是将 listenSocket 和它所关心的事件添加到内核 epoll 模型中的红黑树中!_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, _listenSocket_ptr->GetFd(), EVENT_IN);struct epoll_event revs[maxevents];while(true){// 其中 n 最大是 maxeventsint n = _epoller_ptr->EpollerWait(revs, maxevents);if(n > 0){// 有事件就绪,分派事件lg(Debug, "event happend, fd is: %d", revs[0].data.fd);Dispatcher(revs, n); }else if(n == 0){lg(Info, "time out...");}else {lg(Error, "epoll wait error");}}}~EpollServer(){_listenSocket_ptr->Close(); }private:std::shared_ptr<Sock> _listenSocket_ptr;std::shared_ptr<Epoller> _epoller_ptr;uint16_t _port;};

我们上面两个模块都用到了 NoCopy 这个类,也就是禁止拷贝,代码如下:

				#pragma onceclass NoCopy{public:NoCopy(){}NoCopy(const NoCopy&) = delete;const NoCopy& operator=(const NoCopy&) = delete;};

5. epoll 的工作模式

(1)水平触发 Level Triggered 工作模式(LT 模式)

epoll 默认所处的工作模式就是 LT 模式。例如我们上面所写的简单的 epoll 服务器,每次有新的连接到来时,如果我们不处理它,epoll 会每次都通知我们有连接到来了。这种一旦有新的连接到来,或者有新的数据到来,上层如果不取走,底层就会一直通知上层,让上层把数据尽快取走,这种模式就叫做 LT 模式。就像示波器中的高电平,一直有效。

(2)边缘触发 Edge Triggered 工作模式(ET 模式)

ET 模式指的是,数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。正是因为 ET 模式有这种特点,才会倒逼程序员每次通知都必须把本轮数据全部取走,怎么保证数据全部取走呢?所以就需要循环读取,直到读取出错!但是我们使用 read() 或者 recv() 在缓冲区中读取数据的时候,当缓冲区的数据没有了,因为它们的读取方式默认是阻塞的,所以此时就会阻塞,服务器就会被挂起!所以我们在 ET 模式下,所有的 fd 必须是要设置为非阻塞的!

(3)LT 和 ET 的对比

  • selectpoll 其实也是工作在 LT 模式下;epoll 既可以支持 LT,也可以支持 ET

  • 普遍地我们认为,ET 的工作模式比 LT 的工作模式通知效率更高,因为通知一次就可以倒逼上层把全部数据读取走。同时也看得出来 ET 模式的 IO 效率也更高,这也就意味着,TCP 会向对方通告一个更大的窗口,从而从概率上让对方一次给自己发送更多的数据!

  • 所谓的 LT 模式和 ET 模式,本质就是向就绪队列中放入多个或者一个就绪的事件

  • 但是 ET 模式就一定比 LT 模式的效率高吗?不一定!因为 LT 也可以将所有的 fd 设置为非阻塞,然后循环读取,也就是当通知一次的时候,就把数据全部取走了,就和 ET 一样了!所以谁的效率高不一定,要看具体的实现。

二、Reactor

1. 概念

我们在上面编写的 epoll 服务器的代码中,在其他普通的 fd 读取事件就绪时,也就是在 Recver() 中,读取是有问题的,因为我们不能区分每次读取上来的数据是一个完整的报文。另外还有其它各种问题,所以我们要对上面的代码使用 Reactor 的设计模式作修改。

所谓的 Reactor 是一种设计模式,翻译过来称为反应堆模式。用于处理事件驱动的系统中的并发操作。它提供了一种结构化的方式来处理输入事件,并将其分发给相应的处理程序。Reactor 模式通常用于网络编程中,特别是在服务器端应用程序中。

要进行正确的 IO 处理,就应该有如下的理解:在应用层一定存在大量的连接,每一个连接在应用层都叫做文件描述符。而在读取每一个文件描述符上的数据的时候,可能根本就没有读取完,此时我们就需要把该文件描述符上的数据临时保存起来。所以我们在写服务器的时候,我们要保证每一个文件描述符及其连接及其缓冲区,都是独立的!

2. 实现

(1)Epoller.hpp

Epoller.hpp 是对 epoll 的系统调用的封装,代码如下:

			#pragma once#include "NoCopy.hpp"#include "log.hpp"#include <sys/epoll.h>#include <cstring>#include <cerrno> class Epoller : public NoCopy{static const int size = 128;public:Epoller(){_epfd = epoll_create(size);if(_epfd == -1){lg(Fatal, "epoll_create error: %s", strerror(errno));}else{lg(Info, "epoll_create success: %d", _epfd);}}int EpollerWait(struct epoll_event revents[], int num, int timeout){int n = epoll_wait(_epfd, revents, num, timeout);return n;}int EpollerCtl(int oper, int sockfd, uint32_t event){   int n = 0;if(oper == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, oper, sockfd, nullptr);if(n != 0){lg(Error, "epoll_ctl delete error! sockfd: %d", sockfd);}}else{// EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD// 设置进内核的红黑树中struct epoll_event ev;ev.events = event;ev.data.fd = sockfd;    // 方便我们后面处理的时候知道是哪一个 fd 就绪了n = epoll_ctl(_epfd, oper, sockfd, &ev);if(n != 0){lg(Error, "epoll_ctl error!");}}return n;}~Epoller(){if(_epfd >= 0){close(_epfd);}}private:int _epfd;int _timeout = 1000;};

(2)TcpServer.hpp

TcpServer.hpp 是处理 IO 的服务器,代码如下:

			#pragma once#include <iostream>#include <string>#include <unordered_map>#include <memory>#include <functional>#include <cerrno>  #include "log.hpp"#include "NoCopy.hpp"#include "Epoller.hpp"#include "Socket.hpp"#include "Comm.hpp"// 设置 ET 模式uint32_t EVENT_IN = (EPOLLIN | EPOLLET);uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);const static int g_buffer_size = 128;class Connection;class TcpServer;using func_t = std::function<void(std::weak_ptr<Connection>)>;using except_t = std::function<void(std::weak_ptr<Connection>)>;// 管理每一个连接class Connection {public:Connection(int sockfd):_sockfd(sockfd){}void SetHandler(func_t recv_cb, func_t send_cb, except_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}~Connection(){}private:int _sockfd;// 充当缓冲区std::string _inbuffer; std::string _outbuffer;public:// 回指指针// std::shared_ptr<TcpServer> _tcpServer_ptr;std::weak_ptr<TcpServer> _tcpServer_ptr;// 回调方法func_t _recv_cb;func_t _send_cb;except_t _except_cb;std::string _ip;uint16_t _port;int Sockfd(){return _sockfd;}void AppendInBuffer(const std::string& info){_inbuffer += info;}void AppendOutBuffer(const std::string& info){_outbuffer += info;}std::string& Inbuffer(){return _inbuffer;}std::string& Outbuffer(){return _outbuffer;}void SetWeakPtr(std::weak_ptr<TcpServer> tcpServer_ptr){_tcpServer_ptr = tcpServer_ptr;}};// enable_shared_from_this 可以提供返回当前对象的 this 对应的 shared_ptrclass TcpServer : public std::enable_shared_from_this<TcpServer>, public NoCopy{static const int num = 64;public:TcpServer(uint16_t port, func_t OnMessage):_port(port),_quit(true),_OnMessage(OnMessage),_epoller_ptr(new Epoller()),_listenSock_ptr(new Sock()){}void Init(){_listenSock_ptr->Socket();// 将 fd 设置为非阻塞SetNonBlockOrDie(_listenSock_ptr->GetFd());_listenSock_ptr->Bind(_port);_listenSock_ptr->Listen();lg(Info, "create listen socket success: %d\n", _listenSock_ptr->GetFd());AddConnection(_listenSock_ptr->GetFd(), EVENT_IN, \std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, except_t except_cb,\const std::string& ip = "0.0.0.0", uint16_t port = 0){// 1. 给所有的套接字建立一个 connection 对象std::shared_ptr<Connection> new_connection(new Connection(sockfd));// shared_from_this(): 返回当前对象的 shared_ptrnew_connection->SetWeakPtr(shared_from_this()); new_connection->SetHandler(recv_cb, send_cb, except_cb);new_connection->_ip = ip;new_connection->_port = port;// 2. 将套接字和 Connection 添加到 unordered_map 中_connections.insert(std::make_pair(sockfd, new_connection));// 3. 将 listen 套接字或其它事件添加到 epoll 模型中_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sockfd, event);lg(Debug, "add a new connection success, sockfd is: %d", sockfd);}// listen 套接字的连接管理器,即有事件就绪的时候,就是有连接到来,就需要处理新连接void Accepter(std::weak_ptr<Connection> conn){auto connection = conn.lock();// 不断检测是否还有新连接,直到读取出错while(true){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = ::accept(connection->Sockfd(), (sockaddr*)&peer, &len);// 获取到新连接设置为非阻塞,然后构建 Connection 对象放入哈希表中和内核红黑树中if(sockfd > 0){uint16_t peer_port = ntohs(peer.sin_port);char ipbuffer[128];inet_ntop(AF_INET, &peer.sin_addr, ipbuffer, sizeof(ipbuffer));lg(Debug, "get a new client, get info-> [%s: %d], sockfd: %d", ipbuffer, peer_port, sockfd);SetNonBlockOrDie(sockfd);AddConnection(sockfd, EVENT_IN, \std::bind(&TcpServer::Recver, this, std::placeholders::_1),\std::bind(&TcpServer::Sender, this, std::placeholders::_1),\std::bind(&TcpServer::Excepter, this, std::placeholders::_1),\ipbuffer, peer_port);}else{if(errno == EWOULDBLOCK){   // 读取完毕break;}else if(errno == EINTR){ // 信号原因中断continue;}else{break;}}}}// 普通事件的事件管理器// 对于服务器而言只需要进行IO,不需要关心是否读完和报文的格式void Recver(std::weak_ptr<Connection> conn){if(conn.expired()) return;auto connection = conn.lock();int sockfd = connection->Sockfd();while(true){char buffer[g_buffer_size];memset(buffer, 0, sizeof(buffer));ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);   // 非阻塞读取if(n > 0){connection->AppendInBuffer(buffer);}else if(n == 0){lg(Info, "sockfd: %d, client info %s: %d quit...", sockfd, connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);}else{if(errno == EWOULDBLOCK){break;}else if(errno == EINTR){continue;}else{lg(Warning, "sockfd: %d, client info %s: %d recv error...", sockfd, connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);return;}}}// 交给上层处理,读取到的数据都在 connection 中// 1. 检测// 2. 如果有完整报文,就处理_OnMessage(connection);}void Sender(std::weak_ptr<Connection> conn){if(conn.expired()) return;auto connection = conn.lock();auto& outbuffer = connection->Outbuffer();while(true){ssize_t n = send(connection->Sockfd(), outbuffer.c_str(), outbuffer.size(), 0);if(n > 0){outbuffer.erase(0, n);if(outbuffer.empty()){break;}}else if(n == 0){return;}else{if(errno == EWOULDBLOCK){break;}else if(errno == EINTR){continue;}else{lg(Warning, "sockfd: %d, client info %s: %d send error...", connection->Sockfd(), connection->_ip.c_str(), connection->_port);connection->_except_cb(connection);return;}}}// 没发完if(!outbuffer.empty()){// 开始对写事件关心EnableEvent(connection->Sockfd(), true, true);}else{// 关闭对写事件关心EnableEvent(connection->Sockfd(), true, false);}}void Excepter(std::weak_ptr<Connection> connection){if(connection.expired()) return;auto conn = connection.lock();lg(Debug, "Excepter hander sockfd: %d, client info %s: %d excepter handler", \conn->Sockfd(), conn->_ip.c_str(), conn->_port);// 1. 移除对特定 fd 的关心// EnableEvent(connection->Sockfd(), false, false);_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, conn->Sockfd(), 0);// 2. 关闭异常的 fdlg(Debug, "close %d done...\n", conn->Sockfd());close(conn->Sockfd());// 3. 从 _connections 中移除 fd 和 Connection 的映射关系lg(Debug, "remove %d from _connections...\n", conn->Sockfd());_connections.erase(conn->Sockfd());}void EnableEvent(int sockfd, bool readAble, bool writeAble){uint32_t events = 0;events |= ((readAble ? EPOLLIN : 0) | (writeAble ? EPOLLOUT : 0) | EPOLLET);_epoller_ptr->EpollerCtl(EPOLL_CTL_MOD, sockfd, events);}bool IsConnectionExist(int fd){auto iter = _connections.find(fd);if(iter == _connections.end()){return false;}else{return true;}}void Dispatcher(int timeout){int n = _epoller_ptr->EpollerWait(revs, num, timeout);for(int i = 0; i < n; i++){uint32_t event = revs[i].events;int sockfd = revs[i].data.fd;// 一旦事件异常,统一把异常转换为读写问题if(event & EPOLLERR){event |= (EPOLLIN | EPOLLOUT);}if(event & EPOLLHUP){event |= (EPOLLIN | EPOLLOUT);}// 只需要处理读写if((event & EPOLLIN) && IsConnectionExist(sockfd)){if(_connections[sockfd]->_recv_cb){_connections[sockfd]->_recv_cb(_connections[sockfd]);}}if((event & EPOLLOUT) && IsConnectionExist(sockfd)){if(_connections[sockfd]->_send_cb){_connections[sockfd]->_send_cb(_connections[sockfd]);}}}}void Loop(){_quit = false;// AddConnection();while(!_quit){// 事件派发// Dispatcher(3000);Dispatcher(-1);PrintConnection();}_quit = true;}void PrintConnection(){std::cout << "_connections fd list: ";for(auto& connection : _connections){std::cout << connection.second->Sockfd() << ", ";std::cout << "inbuffer: " << connection.second->Inbuffer() << " ";}std::cout << std::endl;}~TcpServer(){}private:std::shared_ptr<Epoller> _epoller_ptr;std::shared_ptr<Sock> _listenSock_ptr;   uint16_t _port;bool _quit;struct epoll_event revs[num];// fd 到对应连接到映射,_connections 就是当前服务器管理的所有连接std::unordered_map<int, std::shared_ptr<Connection>> _connections;// 让上层处理信息func_t _OnMessage;};

(3)Calculator.hpp

Calculator.hpp 是上层处理业务的具体处理方法,代码如下:

			#pragma once#include <string>#include <iostream>#include "Protocol.hpp"enum{DIV_ERR = 1,MOD_ERR = 2,OP_ERR = 3};// 上层业务class Calculator{public:Calculator(){}Response CalculatorHelper(const Request &req){Response resp(0, 0);switch (req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '%':{if (req._y == 0)resp._code = MOD_ERR;elseresp._result = req._x % req._y;}break;case '/':{if (req._y == 0)resp._code = DIV_ERR;elseresp._result = req._x / req._y;}break;default:resp._code = OP_ERR;break;}return resp;}// "len"\n"10 + 20"\nstd::string Handler(std::string &package){std::string content;bool ret = Decode(package, &content); // content = "10 + 20"if (!ret)return "";Request req;ret = req.Deserialize(content); // x = 10, y = 20, op = '+'if (!ret)return "";content = "";Response resp = CalculatorHelper(req); // result = 30, code = 0resp.Serialize(&content);              // content = "30 0"content = Encode(content);             // content = "len"\n"30 0\n"return content;}~Calculator(){}};

(4)main.cpp

下面是主函数的调用:

			#include <iostream>#include <memory>#include <functional>#include "TcpServer.hpp"    // 处理IO#include "Calculator.hpp"   // 处理业务#include "log.hpp"Calculator calculator;void DefaultOnMessage(std::weak_ptr<Connection> conn){if(conn.expired()) return;auto connection_ptr = conn.lock();std::cout << "Application layerget a message: " << connection_ptr->Inbuffer() << std::endl;// 对报文进行处理std::string response_str = calculator.Handler(connection_ptr->Inbuffer());if(response_str.empty()){return;}lg(Debug, "%s", response_str.c_str());// response_str 发送出去connection_ptr->AppendOutBuffer(response_str);// 因为写事件(发送缓冲区是否有空间,经常是ok的),经常是就绪的// 所以如果我们设置对 EPOLLOUT 关心,那么 EPOLLOUT 几乎每次都是就绪的// 就导致 epollserver 经常返回,浪费 CPU 资源// 所以,对于读取,我们设置为常关心;对于写,我们设置为按需设置// 处理写事件:直接写入,如果写入完成,就结束。// 如果写入完成,但是数据还没有写完,_outbuffer 里还有内容,我们就需要设置对写事件进行关心了,如果写完了,就去掉写事件的关心// connection_ptr->_send_cb(connection_ptr);auto tcpserver = connection_ptr->_tcpServer_ptr.lock();tcpserver->Sender(connection_ptr);}int main(){ std::shared_ptr<TcpServer> tcp_svr(new TcpServer(8888, DefaultOnMessage));tcp_svr->Init();tcp_svr->Loop();return 0;}

其中有一些头文件例如 Socket.hpp 和 log.hpp 我们以前已经用过,这里就不再放出来了。

(5)CMakeLists.txt

			cmake_minimum_required(VERSION 2.8)project(ReactorServer)add_executable(reactorServer main.cc)target_link_libraries(reactorServer jsoncpp)add_executable(clientCal ClientCal.cc)target_link_libraries(clientCal jsoncpp)

3. 总结

Reactor 其实是一个半同步半异步模型,那么 IO 等于等待+数据拷贝,所以 Reactor 的半同步半异步体现在,等待是由 epoll 完成,这是体现同步;异步体现在 Reactor 可以进行回调处理。

Reactor 模式中,有一个事件循环(Event Loop)负责监听和分发事件。当有新的事件到达时,事件循环会将其分发给相应的处理程序进行处理。这种方式可以实现高效的并发处理,避免了线程创建和销毁的开销。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/793868.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实验四 Java图形界面与事件处理(头歌)

实验四 Java图形界面与事件处理(头歌) 制作不易&#xff01;点个关注&#xff01;给大家带来更多的价值&#xff01; 目录 实验四 Java图形界面与事件处理(头歌) 制作不易&#xff01;点个关注&#xff01;给大家带来更多的价值&#xff01;代码如下&#xff1a; 代码如下&…

case语句

Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 CASE 语句的执行方式与 IF...THEN...ELSIF 语句的执行方式类似&#xff0c;但是它是通过一个表达式的值来决定执行哪个分支 CASE 选择器表达式 WHEN 条件 1 THEN 语句序列 …

Linux: linux常见操作指令

目录 01.ls 指令 02. pwd命令 03. cd 指令 04. touch指令 05.mkdir指令&#xff08;重要&#xff09; 06.rmdir指令 && rm 指令&#xff08;重要&#xff09; 07.man指令&#xff08;重要&#xff09; 07.cp指令&#xff08;重要&#xff09; 08.mv指令&#…

H.264 压缩与编解码原理

H.264 压缩与编解码原理 H.264 压缩与编解码原理H.264 简介视频编码的总体思路H.264 压缩技术帧内预测压缩什么是空间冗余&#xff1f;具体预测方法 帧间预测压缩什么是时间冗余&#xff1f;具体预测方法&#xff1a;运动估计 概念&#xff1a;Group of Pictures&#xff08;GO…

java-网络编程socket-聊天室-先导

这边我会简单介绍一下聊天室的组成部分,和思路的引导 涉及知识点 java 中异常处理机制 和 io流和网络编程socket 简单回顾异常机制 Java中的异常机制是一种用于处理程序运行期间出现的错误或异常情况的机制。这种机制允许程序员定义在特定情况下可能发生的错误&#xff0c;并…

mysql慢sql排查与分析

当MySQL遇到慢查询&#xff08;慢SQL&#xff09;时&#xff0c;我们可以通过以下步骤进行排查和优化&#xff1a; 标题开启慢查询日志&#xff1a; 确保MySQL的慢查询日志已经开启。通过查看slow_query_log和slow_query_log_file变量来确认。 如果没有开启&#xff0c;可以…

闻风丧胆的算法(二)

&#x1f308;个人主页&#xff1a;Rookie Maker &#x1f525; 系列专栏&#xff1a;算法 &#x1f3c6;&#x1f3c6;关注博主&#xff0c;随时获取更多关于IT的优质内容&#xff01;&#x1f3c6;&#x1f3c6; &#x1f600;欢迎来到我的代码世界~ &#x1f601; 喜欢的小…

小林coding图解计算机网络|基础篇01|TCP/IP网络模型有哪几层?

小林coding网站通道&#xff1a;入口 本篇文章摘抄应付面试的重点内容&#xff0c;详细内容还请移步&#xff1a; 文章目录 应用层(Application Layer)传输层(Transport Layer)TCP段(TCP Segment) 网络层(Internet Layer)IP协议的寻址能力IP协议的路由能力 数据链路层(Link Lay…

一文介绍回归和分类的本质区别 !!

文章目录 前言 1、回归和分类的本质 &#xff08;1&#xff09;回归&#xff08;Regression&#xff09;的本质 &#xff08;2&#xff09;分类&#xff08;Classification&#xff09;的本质 2、回归和分类的原理 &#xff08;1&#xff09;回归&#xff08;Regression&#x…

Vue3(学自尚硅谷)

一、基础准备工作 &#xff08;一&#xff09;过程 环境要求&#xff1a;有node.js环境、npm。执行命令&#xff1a; npm create vuelatest 而后选择&#xff1a; ✔ 请输入项目名称&#xff1a; … me_vue3 ✔ 是否使用 TypeScript 语法&#xff1f; … 否 / 是 ✔ 是否启用…

干货| 这篇电商数据分析案例一定要看!

主流电商商品数据采集API接口 直播带货行业在经历了高端玩家的“春秋争霸”之后&#xff0c;逐渐进入到了一种“网红化”的阶段。人们正在将注意力从原来凤毛麟角的直播巨头逐渐转移到一些小主播身上。但近短时间却出现了网红带货营销额放缓的现象。因此商家必须要调整直播策略…

【数据库】数据库的介绍、分类、作用和特点,AI人工智能数据如何存储

欢迎来到《小5讲堂》&#xff0c;大家好&#xff0c;我是全栈小5。 这是《数据库》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识…

互联网轻量级框架整合之JavaEE基础II

编写本篇代码并实际执行之前请仔细阅读前一篇互联网轻量级框架整合之JavaEE基础I Servlet 在Servlet容器中&#xff0c;Servlet是最基础的组件&#xff0c;也可以把JSP当做Servlet&#xff0c;JSP的存在意义只在于方便编写动态页面&#xff0c;使Java语言能和HTML相互结合&…

产品推荐 | 中科亿海微推出亿迅®A8000金融FPGA加速卡

01、产品概述 亿迅A8000金融加速卡&#xff0c;是中科亿海微联合金融证券领域的战略合作伙伴北京睿智融科&#xff0c;将可编程逻辑芯片与金融行业深度结合&#xff0c;通过可编程逻辑芯片对交易行情加速解码&#xff0c;实现低至纳秒级的解码引擎&#xff0c;端到端的处理时延…

Linux gcc day3

find命令&#xff08;importance&#xff09;&#xff1a; 语法&#xff1a;find pathname -options find /root -name test.c which命令&#xff1a; which [指令] 只搜索指令&#xff0c;在什么位置下 为什么文件夹带有颜色呢&#xff1f; 科普补充alias命令&#xff1a; ali…

C++:赋值运算符(17)

赋值也就是将后面的值赋值给变量&#xff0c;这里最常用的就是 &#xff0c;a1那么a就是1&#xff0c;此外还包含以下的赋值运算 等于int a 1; a10 a10加等于int a 1; a1;a2-减等于int a 1; a-1;a0*乘等于int a 2; a*5;a10/除等于int a 10; a/2;a5%模等于int a 10; a%…

kafka集群介绍+部署Filebeat+Kafka+ELK

一、消息队列 1、为什么需要消息队列&#xff08;MQ&#xff09; 主要原因是由于在高并发环境下&#xff0c;同步请求来不及处理&#xff0c;请求往往会发生阻塞。比如大量的请求并发访问数据库&#xff0c;导致行锁表锁&#xff0c;最后请求线程会堆积过多&#xff0c;从而触…

Mac电脑清理垃圾软件 Mac电脑清理垃圾的文件在哪 cleanMyMac X 4.8.0激活号码

Mac用户经常会有这样一些烦恼&#xff0c;比如软件之间的管理&#xff0c;应用生成的缓冲文件怎样删除&#xff0c;还有软件的卸载等等... 如何有效清理Mac中的垃圾文件&#xff0c;删除多余的软件成为Mac用户迫切的需求。本文就为大家介绍几款好用的Mac电脑清理垃圾软件&#…

在线考试|基于Springboot的在线考试管理系统设计与实现(源码+数据库+文档)

在线考试管理系统目录 目录 基于Springboot的在线考试管理系统设计与实现 一、前言 二、系统设计 三、系统功能设计 1、前台&#xff1a; 2、后台 管理员功能 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主…

Redis中的Sentinel(二)

Sentinel 初始化Sentinel状态。 在应用了Sentinel的专用代码之后&#xff0c;接下来&#xff0c;服务器会初始化一个sentinel.c/sentinelState结构(简称Sentinel状态),这个结构 保存了服务器中所有和Sentinel功能有关的状态(服务器的一般状态仍然由redis.h/redisServer保存);…