Reactor网络模式

文章目录

  • 1. 关于Reactor模式的了解
  • 2. 基于Reactor模式实现epoll ET服务器
    • 2.1 EventItem类的实现
    • 2.2 Reactor类的实现
      • Dispatcher函数
      • AddEvent函数
      • DelEvent函数
      • EnableReadWrite函数
    • 2.3 四个回调函数的实现
      • acceptor回调函数
      • recver回调函数
      • sender回调函数
      • errorer回调函数
  • 3. epoll ET服务器的运行
  • 4. 线程池的使用

1. 关于Reactor模式的了解

Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的设计模式。
在这里插入图片描述

在这里插入图片描述

Reactor模式的五个角色构成

  1. 句柄:文件描述符
  2. 同步事件分离器:本质就是一个系统调用,用于等待事件的发生。对于Linux来说,这个角色就是IO多路复用,select、poll、epoll等。
  3. 事件处理器:由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈。
  4. 具体事件处理器:事件处理器中各个回调方法的具体实现。
  5. 初识分发器:它本质上就是Reactor角色,初始分发器会通过同步事件分离器来等待事件的就绪,当对应事件就绪时就调用事件处理器,最后调用对应的回调方法来处理这个事件。

Reactor模式的工作流程

  1. 当应用向初识分发器注册具体事件处理器时,应用会标识出该事件处理器希望初识分发器在某个事件发生时向其通知,该事件与Handle关联。
  2. 初识分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
  3. 当所有的事件处理器注册完毕后,应用会启动初识分发器的事件循环,这时初识分发器会将每个事件处理器的Handle葛冰起来,并使用同步事件分离器来等待这些事件的发生。
  4. 初识分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
  5. 初识分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
  6. 初识分发器会调用其对应事件处理器当中对应的回调方法来响应该事件。

2. 基于Reactor模式实现epoll ET服务器

下面我们根据Reactor的五个角色构成以及其工作流程,实现一个Reactor模式下的epoll服务器,从而更直观地感受一下Reactor模式。

2.1 EventItem类的实现

EventItem类的介绍

  • 在Reactor的工作流程中说到,在注册事件处理器时需要将其与Handle关联,本质上就是需要将读回调、写回调和异常回调等与某个文件描述符关联起来。
  • 这样做的目的就是为了当某个文件描述符上的事件就绪时可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。

EventItem类的设计

所以我们可以设计一个EventItem类。,该类中有以下成员:

  1. 文件描述符Handle。
  2. 对应的读回调函数指针、写回调函数指针、异常回调函数指针。
  3. 输入缓冲区inbuffer和输出缓冲区outbuffer。
  4. 回指指针R,指向我们定义的Reactor对象。

对于前两种成员文章上面已经介绍过了,下面介绍一下后两种成员的作用。

  • 为什么要有inbuffer?当某个文件描述符的读事件就绪时,我们会调用recv函数读取客户端发送过来的数据,但是我们其实并不能保证我们读取到的是一个完整的报文,因此需要将读取到的数据暂时保存到该文件描述符对应的inbuffer中,当inbuffer当中可用分离出一个完整的报文后再将其分离出来进行数据处理,这里inbuffer的本质就是用来解决粘包问题的。
  • 为什么要有outbuffer?当处理完一个请求报文后,需要将响应数据发送给客户端,但我们并不能保证底层TCP的发送缓冲区中足够的空间功,因此需要将发送的数据暂时保存在文件描述符对应对应outbuffer中,当底层TCP的发送缓冲区中有足够的空间时,即当写事件就绪时,再一次发送outbuffer当中的数据。
  • 为什么要有回指指针R?后续我们需要根据EventItem对象去寻找Reactor对象,比如当连接事件就绪时,需要调用Reactor类中的AddEventt函数将其添加到Dispatcher当中。有了回指指针R,可以让我们快速地找到对应的Reactor对象。

并且,EventItem类当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。

EventItem类代码如下:

class EventItem
{
public:int _sock; // 文件描述符Reactor *_R; // 回指指针callback_t _recv_handler; // 读回调callback_t _send_handler; // 写回调callback_t _error_handler; // 异常回调std::string _inbuffer; // 输入缓冲区std::string _outbuffer; // 输出缓冲区public:EventItem(): _sock(-1), _R(nullptr), _recv_handler(nullptr), _send_handler(nullptr), _error_handler(nullptr){}// 管理回调void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler){_recv_handler = recv_handler;_send_handler = send_handler;_error_handler = error_handler;}~EventItem() {}
};

2.2 Reactor类的实现

Reactor类的介绍

  • 在Reactor的工作流程中提到,当所有的时间处理器注册完毕之后,会使用同步事件分离器等待这些事件发生,当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初识分发器,然后初识分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器的回调方法来响应该事件。
  • 本质就是当事件注册完毕之后,会调用epoll_wait函数来等待这些事件的就绪,当某个事件就绪时epoll_wait函数会告知调用方,然后调用方就会根据就绪的文件描述符来找到其对应的各种回调函数,并调用其对应的回调函数进行事件处理。

Reactor类的设计

  • 该类当中有一个成员函数叫做Dispatcher,这个函数其实就是所谓的初识分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
  • 当事件就绪之后根据就绪的文件描述符来找到其对应的各种回调函数,由于我们会将每个文件描述符及其对应的各种回调函数都封装到一个EventItme结构当中,所以实际我们就是要根据文件描述符找到其对应的EventItem结构。
  • 我们可以使用哈希表,建立各个文件描述符与其对应的EventItem结构之间的映射,这个哈希表可以作为Reactor类中的一个成员变量,当需要找某个文件描述符对应的EventItem结构时就可以根据该成员变量找到。
  • 当然,Reactor类当中还需要提供成员函数AddEvent和DelEvent,用于向Dispatcher当中注册和删除事件。

Reactor类的基本成员变量以及epoll模型创建的代码

#define SIZE 256
#define MAX_NUM 64class Reactor
{
private:int _epfd; // epoll模型std::unordered_map<int, EventItem> _event_items; // 建立sock与EventItem结构的映射public:Reactor() : _epfd(-1) {}void InitReactor(){// 创建epoll模型_epfd = epoll_create(SIZE);if (_epfd < 0){std::cerr << "epoll_create error" << std::endl;exit(5);} }~Reactor(){if (_epfd >= 0) close(_epfd);}
};

Dispatcher函数

Reactor类当中的Dispatcher函数就是之前所说的初识分发器,这里我们可以将其更形象地称为事件分派器。

  • 事件分派器要做的就是调用epoll_wait函数等待事件发生。
  • 当某个文件描述上的事件发生后,使用哈希表根据文件描述符找到对应的EventItem结构,然后调用EventItem结构当对应的回到函数对该事件进行处理即可。
    // 事件分派器void Dispatcher(int timeout){struct epoll_event revs[MAX_NUM];int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);for (int i = 0; i < num; ++i){int sock = revs[i].data.fd;if ((revs[i].events & EPOLLIN) || (revs[i].events & EPOLLHUP)){// 优先处理异常事件就绪if (_event_items[sock]._error_handler)_event_items[sock]._error_handler(&_event_items[sock]);}if (revs[i].events & EPOLLIN){if (_event_items[sock]._recv_handler)_event_items[sock]._recv_handler(_event_items[sock]);}if (revs[i].events & EPOLLOUT){if (_event_items[sock]._send_handler)_event_items[sock]._send_handler(&_event_items[sock]);}}}
  • 这里没有用switch语句或者if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断。
  • 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for内部。
  • 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不好进入for循环内部。
  • 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数进行事件处理。
  • 事件处理时最好先对异常事件进行处理,因此代码中将异常事件的判断放在了最前面。

AddEvent函数

Reactor类当中的AddEvent函数是用于进行事件注册的。

  • 在注册事件时需要传入一个文件描述符和一个事件集合,表示需要监视哪个文件描述符上的哪些事件。
  • 还需要传入该文件描述符对应的EventItem结构,表示该文件描述符上的事件就绪后一个执行的回调方法。
  • 在AddEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符及其对应的事件集合注册到epoll模型当中,然后建立该文件描述符与其对应的EventItem结构的映射关系。
    void AddEvent(int sock, uint32_t event, const EventItem &item){struct epoll_event ev;ev.data.fd = sock;ev.events = event;if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0){std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;return;}// 建立sock与EventItem直接的结构映射关系_event_items.insert({sock, item});std::cout << "添加:" << sock << " 到epoll模型中,成功" << std::endl;}

DelEvent函数

  • Reactor类当中的DelEvent函数是用于事件删除的。
  • 在删除事件时只需要传入一个文件描述符即可。
  • 在DelEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符从epoll模型中删除,并取消该文件描述符与其对应的EventItem结构的映射关系。
    void DelEvent(int sock){if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0){std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;return;}// 取消sock与EventItem之间的映射关系_event_items.erase(sock);std::cout << "从epoll模型中删除:" << sock << " 成功" << std::endl;}

EnableReadWrite函数

  • Reactor类当中的EnableReadWrite函数,用于使能某个文件描述符的读写事件。
  • 调用EnableReadWrite函数需要传入一个文件描述符,表示需要设置的是哪个文件描述符的事件。
  • 还需要传入两个bool值,分别表示是否需要使能读和写事件。
  • EnableReadWrite函数内部会调用epoll_ctl函数修改该文件描述符的监听事件。
    void EnableReadWrite(int sock, bool read, bool write){struct epoll_event ev;ev.data.fd = sock;// EPOLLET表示当前epoll服务器为边缘触发模式ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0){std::cerr << "epoll mod error, fd: " << sock << std::endl;}}

2.3 四个回调函数的实现

下面介绍并实现四个回调函数

  • acceptor:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
  • recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
  • sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
  • errorer:当异常事件就绪时可以调用该函数将对应的文件描述符关闭。

当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数添加到EventItem结构当中。

  • 我们会将监听套接字对应的EventItem结构当中的recv_handler设置为acceptor,因为监听套接字的读事件就绪就意味中有连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的send_handler和error_handler可以设置为nullptr。
  • 当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的EventItem结构当中的recv_handler回调,此时就会调用acceptor回调获取建立好的连接。
  • 而对于客户端建立连接的套接字,我们会将其对应的EventItem结构当中的recv_handler、send_handler和error_handler分别设置为这里的recver、sender和error。
  • 当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的EventItem结构当中对应的回调函数,也就是这里的recver、sender和error。

acceptor回调函数

acceptor回调用于处理连接事件,其工作流程如下:

  1. 调用accept函数获取底层建立好的连接。
  2. 将获取到的套接字设置为非阻塞,并未其创建EventItem结构,填充EventItem结构当中的各个字段,并注册该套接字相关的回调方法。
  3. 将该套接字及其对应需要关心的事件注册到epoll当中。
int acceptor(EventItem *item)
{while (1){struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));socklen_t len = sizeof(peer);int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);if (sock < 0){if (errno == EAGAIN  || errno == EWOULDBLOCK){// 并没有读取出错,只是底层没有链接了return 0;}else if (errno == EINTR){continue;}else{std::cerr << " accept error" << std::endl;return -1;}}// 将该套接字设置为非阻塞SetNonBlock(sock);// 构建EventItem结构EventItem sock_item;sock_item._sock = sock;sock_item._R = item->_R;// 注册回调方法sock_item.ManageCallbacks(recver, sender, errorer); Reactor *R = item->_R;// 将该套接字以及其对应的事件注册到epoll中R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item);}
}

需要注意的是,因为这里实现的是ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。

  • 因为ET模式下只有当底层建立的连接从无到有或是从右到多时才会通知上层,如果没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有获取完的连接就相当于丢失了,所以需要循环多次调用accept函数获取底层建立好的连接。
  • 循环调用accept函数也就意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住。因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就会返回,而不会被阻塞住。

accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。

设置文件描述符为非阻塞

设置文件描述符为非阻塞时,需要先调用fcntl函数获取获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。

代码如下:

// 设置文件描述符为非阻塞
bool SetNonBlock(int sock)
{int fl = fcntl(sock, F_GETFL);if (fl < 0){std::cerr << "fcntl error" << std::endl;return false;}fcntl(sock, F_SETFL, fl | O_NONBLOCK);return true;
}

监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。

  • 如果错误码为EAGAINEWOULDBLOCK,说明本次出错是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次acceptor调用成功。
  • 错误码为EINTR,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。
  • 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次acceptor调用失败。

accept、recv和send等IO系统调用为什么会被信号中断?

IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前被信号中断了,也就是说IO系统调用在陷入内核,但是却在没有返回用户态的时候内核就去处理其他信号了。

  • 一般来说,在从内核态返回用户态之前会检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,内核就会对该信号进行处理。
  • 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上是一个特例,因为IO过程分为 等 和 拷贝 两个步骤,而一般等的时间是比较长的,而在这个过程中我们的执行流其实是处于闲置的状态的,因此在等的过程中如果有信号产生,内核就会立即去进行信号的处理。

写事件是按需打开的

这里调用accept获取上来的套接字添加到epoll模型中时,只添加了EPOLLINEPOLLET事件,也就是说只让epoll关心套接字的读事件。

  • 这里之所以没有添加写事件,是因为当前我们并没有要发送的数据,因此密钥必要让epoll帮我们关心写事件。
  • 一般读事件是经常被设置的,而写事件是按需打开的,只有当我们有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭。

recver回调函数

recver回调函数用于处理读事件,其工作流程如下:

  1. 循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应的EventItem结构的inbuffer中。
  2. 对inbuffer中的数据进行切割,将完整的报文切割出来,剩下的留在inbuffer中。
  3. 对切割出来的完整报文进行反序列化。
  4. 业务处理。
  5. 业务处理后形成响应报文。
  6. 将响应报文添加到对应EventItem结构的outbuffer中,并打开写事件。

下一次Dispatcher在进行事件派发的时候就会帮我们关注该套接字的写事件,当写事件就绪就会执行该套接字对应的EventItem结构中的写回调方法,进而将outbuffer中的响应数据发送给客户端。

int recver(EventItem *item)
{if (item->_sock < 0) return -1; // 说明该文件描述符已经被关闭// 1. 数据读取if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){// 读取失败item->_error_handler(item);return -1;}// 2. 报文切割,以 X 作为分隔符std::vector<std::string> datagrams;StringUtil::Split(item->_inbuffer, &datagrams, "X");for (auto s : datagrams){// 3. 反序列化struct data d;StringUtil::Deserialize(s, &d._x, &d._y, &d._op);// 4. 业务处理int result = 0;switch(d._op){case '+' : result = d._x + d._y; break;case '-' : result = d._x - d._y; break;case '*' : result = d._x * d._y; break;case '/' : if (d._y == 0){std::cerr << "Error: div zero!" << std::endl;continue; // 继续处理下一个报文}else{result = d._x / d._y;}break;case '%' :if (d._y == 0){std::cerr << "Error: mod zero! " << std::endl;continue;}else{result = d._x % d._y;}break;default:std::cerr << "operation error!" << std::endl;continue; // 继续处理下一个报文}// 5. 形成响应报文std::string response;response += std::to_string(d._x);response += d._op;response += std::to_string(d._y);response += "=";response += std::to_string(result);response += "X"; // 报文与报文之间的分隔符// 6. 将响应报文添加到outbuffer中item->_outbuffer += response;// 打开写事件if (!item->_outbuffer.empty()) item->_R->EnableReadWrite(item->_sock, true, true);}
}

数据读取函数recver_helper

我们可以将循环调用recv函数读取数据的过程封装成一个recv_helper函数。

  • recver_helper函数要做的就是循环调用recv函数将读取到的数据添加到inbuffer中。
  • 当recv函数的返回值小于0时同一需要进一步判断错误码,如果错误码为EAGAINEWOULDBLOCK说明底层数据读取完毕了,如果错误码为EINTR则说明读取说错被信号中断了,此时还需要继续调用recv函数进行读取,否则就是读取出错了。
  • 当读取出错时直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
int recver_helper(int sock, std::string *out)
{while (true){char buffer[128];ssize_t size = recv(sock, buffer, sizeof(buffer) - 1, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){// 数据读取完毕return 0;}else if (errno == EINTR){// 被信号中断,继续尝试读取continue;}else{// 读取出错return -1;}}else if (size == 0){// 对端连接关闭return -1;}// 读取成功buffer[size] = '\0';// 这里这个out不是输出缓冲区,是输出型参数的意思*out += buffer; // 将读取到的数据添加到该套接字对应EventItem结构对应的inbuffer中}
}

报文切割函数Split

报文切割本质就是为了防止粘包问题,而粘包问题实际是设计到协议定制的。

  • 因为我们需要根据协议知道如何将各个报文进行分离,比如UDP分离报文采用的就是定长报头+自描述字段。
  • 我们的目的是演示整个数据处理的过程,为了简单起见就不进行过于复杂的协议定制了,这里我们就以"X"作为各个报文之间的分隔符,每个报文的最后队徽以一个"X"作为报文结束的标志。
  • 因此现在要做的就是以"X"作为分隔符对inbuffer当中的字符串进行切割,这里将这个过程封装成一个Split函数并放到一个StringUtil工具类中。
  • Split函数要做就是对inbuffer中的报文进行分割,将切割出来的报文放到vector中,对于最后无法切割的报文的数据就留在inbuffer即可。
    static void Split(std::string &in, std::vector<std::string> *out, const std::string sep){int start = 0;size_t pos = in.find(sep, start);while (pos != std::string::npos){out->push_back(in.substr(start, pos - start));start = pos + sep.size();pos = in.find(sep, start);}in = in.substr(start);}

反序列化函数Deserialize

在数据发送之前需要进行序列化encode,接收数据之后需要对数据进行反序列化decode。

  • 序列化就是将对象的状态信息转换为可以存储或传输的形式(字节序列)的过程。
  • 反序列化就是把字节序列恢复为原对象的过程。

实际反序列化也是与协议定制相关的,假设这里的epoll服务器向客户端提供的就是计算服务,客户端向服务器发送的都是需要计算的算术表达式。可以用结构体来描述这样一个算术表达式。
在这里插入图片描述

    static void Deserialize(std::string &in, int *x, int *y, char *op){size_t pos = 0;for (pos = 0; pos < in.size(); pos++){if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')break;              }if (pos < in.size()){std::string left = in.substr(0, pos);std::string right = in.substr(pos + 1);*x = atoi(left.c_str());*y = atoi(right.c_str());*op = in[pos];}else{*op = -1;}}

业务处理

业务处理就是服务器拿到客户端发来的数据后,对数据进行数据分析,最终拿到客户端想要的数据。

我们这里要做的业务处理非常简单,就是用反序列化后的数据计算,此时得到的计算结果就是客户端想要的。

形成响应报文

在业务处理后我们已经拿到了客户端想要的数据,现在我们要的就是形成响应报文,由于我们这里规定每个报文都以"X"作为报文结束的标志,因此在形成响应报文的时候,就需要在每一个计算结果后面加上一个"X",表示这是之前某一个请求报文的响应报文,因此协议定制后就需要双方遵守。

将响应报文添加到outbuffer中

响应报文构建完后需要将其添加到该套接字对应的outbuffer中,并打开该套接字的写事件,此后当写事件就绪时就会将outbuffer当中的数据发送出去。

sender回调函数

sender回调函数用于处理写事件,其工作流程如下:

  1. 循环调用send函数发送数据,并将发送出去的数据从该套接字对应EventItem结构删除。
  2. 如果循环调用send函数后该套接字对应的outbuffer当中的数据被全部发送,此时就要将该套接字对应的写事件关闭,因为已经没有要发送的数据,如果outbuffer中还有数据,那么该套接字对应的写事件就应该继续打开。
int sender(EventItem *item)
{if (item->_sock < 0) return -1;int ret = sender_helper(item->_sock, item->_outbuffer);if (ret == 0) // 全部发送成功,不再关心写事件item->_R->EnableReadWrite(item->_sock, true, false);else if (ret == 1) // 没有发送完毕,还需要继续关心写事件item->_R->EnableReadWrite(item->_sock, true, true);else // 写入出错item->_error_handler(item);return 0;
}

我们可以将循环调用send函数发送数据的过程封装成一个sender_helper函数。

  • sender_helper函数要做的就是循环调用send函数将outbuffer中的数据发送出去。
  • 当send函数的返回值小于0时需进一步判断错误码,如果错误码为EAGAINEWOULDBLOCK说明底层TCP发送缓冲区已经被写满了,这时需要将已经发送的数据从outbuffer中移除。
  • 如果错误码为EINTR则说明发生过程中被信号中断了,此时还需要继续调用send函数进行发送,否则就是发送出错了。
  • 当发送出错时也直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
  • 如果最终outbuffer当中的数据全部发送成功,则将outbuffer清空即可。
// in为输入型参数的意思
int sender_helper(int sock, std::string &in)
{size_t total = 0; // 累计已经发送的字节数while (true){ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){// 底层缓冲区已经没有空间了in.erase(0, total); // 将已经发送的数据移除outbufferreturn 1; // 缓冲区写满,没写完}else if (errno == EINTR){// 被信号中断continue;}else{// 写入出错return -1;}}total += size;if (total >= in.size()){in.clear(); // 清空outbufferreturn 0; // 全部写入完毕}}
}

errorer回调函数

errorer回调用于处理异常事件

  • 对于异常事件就绪的套接字这里不作其他处理,直接调用close函数关闭该套接字即可。
  • 但是在关闭套接字之前,需要先调用DelEvent函数将该套接字从epoll模型删除,并取消套接字与其对应的EventItem结构的映射关系。
  • 由于在Dispatcher当中是先处理的异常事件,为了避免该套接字被关闭后继续进行读写操作,然后读写操作失败再次调用errorer回调函数重复关闭该文件描述符,因此在关闭该套接字后将其EventItem当中的文件描述符设置为-1。
  • 在调用recver和sender回调执行读写操作之前,都会判断该EventItem结构当中的文件描述符是否有效,如果无效则不会进行后续操作。
int errorer(EventItem *item)
{item->_R->DelEvent(item->_sock); // 将该文件描述符从epoll模型删除close(item->_sock); // 关闭该文件描述符item->_sock = -1; // 防止关闭后继续执行读写回调return 0;
}

3. epoll ET服务器的运行

服务器的运行步骤如下:

  1. 套接字的创建、绑定和监听,因为是ET模式下的服务器,因此监听套接字创建出来后需要将其设置为非阻塞。
  2. 实例化一个Reactor对象,并将其进行初始化,也就是创建epoll模型。
  3. 为监听套接字定义一个EventItem结构,提案重复EventItem结构中的各个字段,并将acceptor设置为监听套接字的回调方法。
  4. 调用AddEvent函数将监听套接字及其需要监视的事件添加到Dispatcher当中,该过程包括将监听套接字注册到epoll模型中,1以及监听套接字与其对应EventItem结构的映射。
  5. 最后循环调用Reactor类中的Dispatcher函数进行事件派发。

将sockt套接字进行封装

这里编写一个Socket类,对套接字相关的接口进行一定程度的封装,并且为了让外部能够直接调用Socket类当中封装的函数,将这些函数定义为了静态成员函数。

class Socket
{
public:// 创建套接字static int SocketeCreate(){int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){std::cerr << "socket error" << std::endl;exit(2);}// 设置端口复用int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}// 绑定端口号static void Bind(int sock, int port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_port = htons(port);local.sin_family = AF_INET;local.sin_addr.s_addr = INADDR_ANY;socklen_t len = sizeof(local);if (bind(sock, (struct sockaddr*)&local, len) < 0){std::cerr << "bind error" << std::endl;exit(3);}}// 监听static void Listen(int sock, int backlog){if (listen(sock, backlog) < 0){std::cerr << "listen error" << std::endl;exit(4);}}
};

主函数代码

#include "Reactor.hpp"
#include "Util.hpp"
#include "Socket.hpp"#define BACKLOG 5static void Usage(const std::string s)
{std::cout << "Usage: " << s << " port" << std::endl; 
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(1);}int port = 8081;int listen_sock = Socket::SocketeCreate();Socket::Bind(listen_sock, port);SetNonBlock(listen_sock);Socket::Listen(listen_sock, BACKLOG);// 创建Reactor,并初始化Reactor R;R.InitReactor();// 创建套接字对应的EventItem结构EventItem item;item._sock = listen_sock;item._R = &R;item.ManageCallbacks(acceptor, nullptr, nullptr); // 套接字只关心读事件// 将监听套接字托管给DispatcherR.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);// 循环进行事件派发int timeout = 1000;while (1) R.Dispatcher(timeout);return 0;
}

至此,一个简单的单Reactor单线程服务器就编写完毕了。

运行这个服务器:
在这里插入图片描述
这就可以看到,服务器可以接收客户端发来的请求并且进行业务处理后响应给客户端。

4. 线程池的使用

因为当前的epoll服务器的业务处理比较简单,所以但进程的epoll服务器看起来没有什么压力,但是如果服务器的业务处理逻辑比较复杂,那么某些客户端发来的数据请求就可能长时间得不到响应,因为这时epoll服务器需要花费大量时间进行业务处理,而在这个过程中服务器无法为其他客户端提供服务。

解决方法

可以在当前服务器的基础上接入线程池,当recver回调读取完数据并完成报文的切割和反序列化之后,就可以将其构建成一个任务然后放到线程池的任务队列中,然后服务器就可以继续进行事件派发,而不需要将事件耗费到业务处理上面,而放到任务队列当中的任务,则由线程池当中的若干个线程进行处理。

接入线程池

线程池的代码如下:

#pragma once#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>#define NUM 5//线程池
template<class T>
class ThreadPool
{
public://提供一个全局访问点static ThreadPool* GetInstance(){return &_sInst;}
private:bool IsEmpty(){return _task_queue.size() == 0;}void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void Wait(){pthread_cond_wait(&_cond, &_mutex);}void WakeUp(){pthread_cond_signal(&_cond);}
public:~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}//线程池中线程的执行例程static void* Routine(void* arg){pthread_detach(pthread_self());ThreadPool* self = (ThreadPool*)arg;//不断从任务队列获取任务进行处理while (true){self->LockQueue();while (self->IsEmpty()){self->Wait();}T task;self->Pop(task);self->UnLockQueue();task.Run(); //处理任务}}void ThreadPoolInit(){pthread_t tid;for (int i = 0; i < _thread_num; i++){pthread_create(&tid, nullptr, Routine, this); //注意参数传入this指针}}//往任务队列塞任务(主线程调用)void Push(const T& task){LockQueue();_task_queue.push(task);UnLockQueue();WakeUp();}//从任务队列获取任务(线程池中的线程调用)void Pop(T& task){task = _task_queue.front();_task_queue.pop();}
private:ThreadPool(int num = NUM) //构造函数私有: _thread_num(num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool&) = delete; //防拷贝std::queue<T> _task_queue; //任务队列int _thread_num; //线程池中线程的数量pthread_mutex_t _mutex;pthread_cond_t _cond;static ThreadPool<T> _sInst;
};template<class T>
ThreadPool<T> ThreadPool<T>::_sInst;

设计一个任务类

我们需要定义出来一个任务类,该任务类当中需要提供一个Run方法,这个Run方法就是将线程池中的若干线程池从任务队列当中拿到任务后执行的方法。

  • 在任务类中包含两个成员变量,成员bianld就是反序列化后用于进行业务处理的数据,成员变量item就是该套接字的EventItem结构,因为数据处理完之后需要将形成的响应报文添加到该套接字对应的outbuffer中。
  • Run方法中处理数据的逻辑与之前的意义,只是将那部分方法放到了Run方法中。

此时recver回调函数中在读取数据、报文切割、反序列化后就可以构建出一个任务对象,然后将该任务放到任务队列当中就行了。

int recver(EventItem* item)
{if (item->_sock < 0) //该文件描述符已经被关闭return -1;//1、数据读取if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败item->_error_handler(item);return -1;}//2、报文切割std::vector<std::string> datagrams;StringUtil::Split(item->_inbuffer, &datagrams, "X");for (auto s : datagrams){//3、反序列化struct data d;StringUtil::Deserialize(s, &d._x, &d._y, &d._op);Task t(d, item); //构建任务ThreadPool<Task>::GetInstance()->Push(t); //将任务push到线程池的任务队列中}return 0;
}

这样线程池就是接入完毕了,下面再次尝试运行这个服务器。需要注意的是在运行之前需要对线程池进行初始化。

    //初始化线程池ThreadPool<Task>::GetInstance()->ThreadPoolInit();

运行结果如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/100447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

突破封锁|华为芯片10年进化史:从K3V1到麒麟9000S

华为海思麒麟芯片过去10年研发历程回顾如下&#xff1a; 2009年&#xff1a;华为推出第一款手机芯片K3V1&#xff0c;采用65nm工艺制程&#xff0c;基于ARM11架构&#xff0c;主频600MHz&#xff0c;支持WCDMA/GSM双模网络。这款芯片搭载在华为U8800手机上&#xff0c;标志着华…

在SOLIDWORKS搭建一个简易的履带式机器人

文章目录 前言一、构建模型基本单元二、搭建车体模块三.插入轮子4.构建履带 前言 趁着十一假期&#xff0c;在solidworks中搭建了一个履带式机器人小车&#xff0c;计划将其应用在gazebo中完成多机器人编队的仿真。 一、构建模型基本单元 构建底板&#xff08;a面&#xff09…

面试总结-Redis篇章(十二)——Redis是单线程的,为什么还那么快

Redis是单线程的&#xff0c;为什么还那么快 Redis是单线程的&#xff0c;为什么还那么快什么是IO多路复用 阻塞IO非阻塞IOIO多路复用 Redis是单线程的&#xff0c;为什么还那么快 Redis是纯内存操作&#xff0c;执行速度非常快采用单线程&#xff0c;避免不必要的上下文切换可…

Python中套接字实现服务端和客户端3-3

3 创建客户端的步骤 创建客户端的步骤如图5所示。 图5 创建客户端的步骤 从图5可以看出&#xff0c;对于客户端来说&#xff0c;首先创建套接字&#xff0c;之后通过创建的套接字去连接服务端&#xff0c;如果连接成功&#xff0c;则继续通过该套接字向服务端发送数据&#x…

【Mybatis源码】IDEA中Mybatis源码环境搭建

一、Mybatis源码源 在github中找到Mybatis源码地址&#xff1a;https://github.com/mybatis/mybatis-3 找到Mybatis git地址 二、IDEA导入Mybatis源码 点击Clone下载Mybatis源码 三、选择Mybatis分支 选择Mybatis分支&#xff0c;这里我选择的是3.4.x分支

【ChatGPT】无需代理使用ChatGPT

推荐一个无需代理、可以直接使用的、免费的、无需客户端的、稳定的ChatGPT终端 支持GPT-3.5和CPT-4 无需境外手机号 该工具比较稳定&#xff0c;断流情况很少 GPTDOS 注册地址&#xff1a;GPTDOS &#xff08;使用我的邀请链接进行注册&#xff0c;双方都可以得到50000个toke…

Vue3 编译原理

文章目录 一、编译流程1. 解读入口文件 packgages/vue/index.ts2. compile函数的运行流程 二、AST 解析器1. ast 的生成2. 创建ast的根节点3. 解析子节点 parseChildren&#xff08;关键&#xff09;4. 解析模版元素 Element模版元素解析-举例分析 一、编译流程 1. 解读入口文…

嵌入式养成计划-33--数据库-sqlite3

七十一、 数据库 71.1 数据库基本概念 数据&#xff08;Data&#xff09; 能够输入计算机并能被计算机程序识别和处理的信息集合数据库 &#xff08;Database&#xff09;数据库是在数据库管理系统管理和控制之下&#xff0c;存放在存储介质上的数据集合 常用的数据库 大型数…

第二证券:5.5G时代将至 算力基建迎政策助力

昨日&#xff0c;A股全线低开&#xff0c;三大股指盘中均跌超1%&#xff0c;盘中冲高回落&#xff0c;午后逐渐止跌。到收盘&#xff0c;沪指跌0.44%报3096.92点&#xff0c;深成指微跌0.03%报10106.96点&#xff0c;创业板指跌0.26%报1998.61点&#xff0c;两市算计成交7700元…

【unity】制作一个角色的初始状态(左右跳二段跳)【2D横板动作游戏】

前言 hi~ 大家好&#xff01;欢迎大家来到我的全新unity学习记录系列。现在我想在2d横板游戏中&#xff0c;实现一个角色的初始状态-闲置状态、移动状态、空中状态。并且是利用状态机进行实现的。 本系列是跟着视频教程走的&#xff0c;所写也是作者个人的学习记录笔记。如有错…

linux centos出现No space left on device解决方案

问题是因为系统磁盘空间不足 解决方法: 找到那个磁盘不足问题 df -lh 发现/dev/mapper/cl-root磁盘已用50G,有如下 解决方案&#xff1a; 1、如果是虚拟机可以通过分配空间使其空间增加 2、将其他不常用磁盘空间分配给cl-root如&#xff08; /dev/mapper/cl-home &#…

unity 使用模拟器进行Profiler性能调试

这篇文章主要记录如何实现通过模拟器对打包的app游戏进行Profiler调试。主要记录一些比较重要的点。 准备工作 首先你要能够打包unity的安卓包&#xff0c;如果没有安装安卓组件&#xff0c;请先安装组件。 安装完成以后&#xff0c;会在unity的安装目录找到相应的SDK 这个…

嵌入式Linux裸机开发(五)中断管理

系列文章目录 文章目录 系列文章目录前言STM32 中断系统IMX6U中断控制8个中断GIC中断控制器GIC介绍中断IDGIC逻辑分块GIC协处理器 中断使能中断优先级 重点代码分析官方SDK函数start.S文件自行编写中断驱动文件 前言 最近在学习中发现&#xff0c;学Linux嵌入式不仅是对Linux的…

为Yolov7环境安装Cuba匹配的Pytorch

1. 查看Cuba版本 方法一 nvidia-smi 找到CUDA Version 方法二 Nvidia Control Panel > 系统信息 > 组件 > 2. 安装Cuba匹配版本的PyTorch https://pytorch.org/get-started/locally/这里使用conda安装 conda install pytorch torchvision torchaudio pytorch-cu…

JDK、JRE、JVM三者之间的关系

1.JDK 基本介绍 1) JDK 的全称 (Java Development Kit Java 开发工具包 ) JDK JRE java 的开发工具 [java, javac,javadoc,javap 等 ] 2) JDK 是提供给 Java 开发人员使用的&#xff0c;其中包含了 java 的开发工具&#xff0c;也包括了 JRE 。所以安装了 JDK &#xff0c;就…

论文研读|Protecting Intellectual Property of Deep Neural Networks with Watermarking

目录 论文信息文章简介研究动机研究方法水印生成水印嵌入版权验证 实验结果有效性&#xff08;Effectiveness&#xff09;高效性&#xff08;Converge Speed&#xff09;保真度&#xff08;Functionality&#xff09;鲁棒性&#xff08;Robustness&#xff09;Anti-剪枝攻击&am…

SpringBoot源码分析-自动装配-实现原理

文章目录 SpringBoot自动装配前言介绍实现原理SpringBootApplicationEnableAutoConfigurationselectImports方法没有走&#xff1f;DeferredImportSelector源码分析设计目的 总结 SpringBoot自动装配 前言 什么是自动装配&#xff1f;用过Spring的应该都知道&#xff0c;虽然…

MySQL总结练习题

目录 1.准备数据表 2.表之间的关系 3.题目 3.1 取得每个部门最高薪水的人员名称 3.2 哪些人的薪水在部门的平均薪水之上 3.3 取得部门中&#xff08;所有人的&#xff09;平均的薪水等级 3.4 不准用组函数&#xff08;Max &#xff09;&#xff0c;取得最高薪水 3.5 取…

【数据结构】归并排序和计数排序(排序的总结)

目录 一&#xff0c;归并排序的递归 二&#xff0c;归并排序的非递归 三&#xff0c;计数排序 四&#xff0c;排序算法的综合分析 一&#xff0c;归并排序的递归 基本思想&#xff1a; 归并采用的是分治思想&#xff0c;是分治法的一个经典的运用。该算法先将原数据进行拆…

BUUCTF SimpleRev

分析 该文件为64位的ELF文件&#xff0c;运行在linux平台 使用IDA64打开 进入Decry函数 输入flag和成功的提示 看看如何才能成功拿到flag 这里比较text和str2&#xff0c;text是源代码就有的 那么str2应该就是我们输入的内容 先分析text的内容是什么 进入join函数 该函数…