Reactor
点赞👍👍收藏🌟🌟关注💖💖
你的支持是对我最大的鼓励,我们一起努力吧!😃😃
基于上一篇epoll的学习,现在我们也知道epoll的工作模式有两种,一种默认LT工作模式,另一种是ET模式。关于epoll的LT工作模式我们已经写过了。接下来我们写一份基于ET模式下的Reator,处理所有的IO。
Reactor = 如何正确的处理IO+协议定制+业务逻辑处理
下面我们写一个简洁版的Reactor,它是一个半同步半异步IO,具体它什么原理,怎么做的,有什么特征。我们在代码层面上解开它的面纱。代码写完总结就理解了。其实Reactor是在Liunx网络中,最常用,最频繁的一种网络IO设计模式!
我们是这样打算的,对错误码,日志函数,套接字,epoll做封装然后在写服务器的时候用到的时候调用即可。错误码,日志函数,套接字以前我们封装过今天直接用就行了。
错误码封装
#pragma onceenum
{USAGG_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};
日志函数封装
#pragma once#include<iostream>
#include<string>
#include<stdio.h>
#include <cstdarg>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include<fstream>#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4#define LOG_NORMAL "log.txt"
#define LOG_ERR "log.error"const char* level_to_string(int level)
{switch(level){case DEBUG: return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";}
}//时间戳变成时间
char* timeChange()
{time_t now=time(nullptr);struct tm* local_time;local_time=localtime(&now);static char time_str[1024];snprintf(time_str,sizeof time_str,"%d-%d-%d %d-%d-%d",local_time->tm_year + 1900,\local_time->tm_mon + 1, local_time->tm_mday,local_time->tm_hour, \local_time->tm_min, local_time->tm_sec);return time_str;
}void logMessage(int level,const char* format,...)
{//[日志等级] [时间戳/时间] [pid] [message]//[WARNING] [2024-3-21 10-46-03] [123] [创建sock失败]
#define NUM 1024//获取时间char* nowtime=timeChange();char logprefix[NUM];snprintf(logprefix,sizeof logprefix,"[%s][%s][pid: %d]",level_to_string(level),nowtime,getpid());//char logconten[NUM];va_list arg;va_start(arg,format);vsnprintf(logconten,sizeof logconten,format,arg);std::cout<<logprefix<<logconten<<std::endl;
};
今天这里我们把套接字封装改一下,以前把它相关接口都写成静态的了,不需要对象直接调用了。今天呢,都改成非静态的,未来这个类提供的方法都要以对象调用访问。
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Err.hpp"
#include "Log.hpp"using namespace std;class Sock
{const static int backlog = 32;const static int defaultsock = -1;public:Sock(int sock = defaultsock) : _listensock(sock){}~Sock(){if (_listensock != defaultsock)close(_listensock);}public:int sock(){// 1. 创建socket文件套接字对象_listensock= socket(AF_INET, SOCK_STREAM, 0);if (_listensock < 0){logMessage(FATAL, "create socket error");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success: %d", _listensock);int opt = 1;setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));}//方便外面获取_listensockint Fd(){return _listensock;}void Bind(int port){// 2. bind绑定自己的网络信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind socket error");exit(BIND_ERR);}logMessage(NORMAL, "bind socket success");}void Listen(){// 3. 设置socket 为监听状态if (listen(_listensock, backlog) < 0){logMessage(FATAL, "listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}int Accept(string *clientip, uint16_t *clientport,int* err){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sock = accept(_listensock, (struct sockaddr *)&peer, &len);*err=errno;if (sock < 0){}//logMessage(ERROR, "accept error, next");else{//logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?*clientip = inet_ntoa(peer.sin_addr);*clientport = ntohs(peer.sin_port);}return sock;}private:int _listensock;
};
对于一个epoll来说,首先要先给epoll创建出来,然后epoll的接口都要用到epoll创建成功的返回值。所以直接在这个epoll类中定义个成员变量直接就把创建epoll成功的返回值拿到。然后服务器不需要直接拿着这个返回值,而直接调用这个类对象使用里面的提供的接口就行了。epoll这里先写一个大体框架,后面需要什么了再加
#pragma once#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include "Err.hpp"
#include "Log.hpp"using namespace std;const int defaultepfd = -1;
const int size = 128;class Epoller
{public:Epoller(int epfd = defaultepfd) : _epfd(epfd){}~Epoller(){if (_epfd != defaultepfd)close(_epfd);}public:private:int _epfd;//创建epoll返回值
};
目前调用逻辑,后面在加内容
#include "TcpServer.hpp"
#include "Err.hpp"
#include <memory>static void usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(USAGG_ERR);}uint16_t port=atoi(argv[1]);std::unique_ptr<TcpServer> uls(new TcpServer(port));uls->initServer();uls->Dispatch();return 0;
}
服务器大体框架
#pragma once#include <iostream>
#include "Sock.hpp"
#include "Err.hpp"
#include "Log.hpp"
#include "Epoller.hpp"const int defaultport = 8080;class TcpServer
{public:TcpServer(int port = defaultport) : _port(port){}~TcpServer(){}void initServer(){ }void Dispatch()//事件派发{}
public:private:uint16_t _port;Sock _sock;Epoller _epoll;
};
接下来就是编写服务器了
初始化服务器
void initServer()
{ // 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型
}
创建套接字之后,我们要在创建一个epoll模型,因此我们在Epoller写一个创建epoll模型的接口,然后服务器直接调用就行了
```cpp
void Create()
{_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));exit(EPOLL_CREATE_ERR);}logMessage(NORMAL, "epoll create success, epfd: %d", _epfd);
}
void initServer()
{ // 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoll.Create();// 3.将目前唯一的一个sock,添加到epoll中
}
创建好套接字,epoll模型接下来我们先把_listensock套接字添加到epoll里,看看这样写有没有什么问题
不过先在Epoller类中补充一个用户告诉内核你要帮我关心对应fd的什么事件。
// user -> kernel
bool AddEvents(int sock, uint32_t event)
{struct epoll_event ev;ev.events = event;ev.data.fd = sock;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);if (n < 0){logMessage(ERROR, "sock join epoll fail");return false;}return true;
}
写好后我们直接调用就好了
void initServer()
{ // 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoll.Create();// 3.将目前唯一的一个sock,添加到epoll中_epoll.AddEvents(_sock.Fd(),EPOLLIN | EPOLLET);
}
但是现在有小一个问题,前面说过的,如果未来你的套接字工作模式是ET模式,那么该套接字必须处于非阻塞,_listensock套接字创建处理默认就是阻塞的,因此我们需要将文件描述符设为非阻塞!
我们在高级IO的非阻塞IO哪里就已经写过一个将一个fd变成非阻塞了,现在拿过来用就行了,不过我们也还是把它封装起来。然后调用。
#include <iostream>
#include <unistd.h>
#include <fcntl.h>using namespace std;class Util
{
public:static bool SetNonBlock(int fd){int fl = fcntl(fd, F_GETFL);if (fl < 0)return false;fcntl(fd, F_SETFL, fl | O_NONBLOCK);return true;}
};
到目前为止好像貌似没问题了,但是真的吗?
void initServer()
{ // 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoll.Create();// 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞Util::SetNonBlock(_sock.Fd());_epoll.AddEvents(_sock.Fd(),EPOLLIN | EPOLLET);
}
我们回过头看看之前写过的epoll服务器的代码,这里处理普通sock就绪事件的问题,你怎么保证你把本轮收到的数据都读完?即使把本轮数据都读完了,就一定能够读到一个完整的请求吗?即使未来在这里循环读取,可以反正我们非阻塞也都写过。那就能保证读到一个完整请求吗?
不一定!
那没读到完整的请求我们是不是只能把本轮读到的数据暂时保存到buffer里,可是暂时保存到buffer里,你保存了,那别人怎么办? 你这个sock和其他sock说你们别着急先别覆盖我的数据,buffer里面保存的是我的数据,你们先不要写。你想多了!
你读完之后整个这个代码区间就全部释放掉了,因为这是栈上面的空间。所以即便你没读完,或者你把这轮读完了。那么下轮在读,buffer早就释放了。你可能本轮读到后半部分但前半部分已经没有了,可能不到下次,下一个循环进来就给你清空了。
所以怎么样进行正确读写,光有一个套接字和定义一个栈上的缓存区远远不够!
所以我们需要给每一个文件描述符,都要把输入,输出用户层缓存区都要给它带上!
现在我们清楚知道历史代码的问题,我们要正确写整个服务,现在已经不够了。我们需要将每一个套接字进行封装,每一个套接字都要包含自己对应的输入,输出缓存区空间,只有每一个套接字都有自己对应的缓存空间,读取的时候把数据读取赞存在自己对应的缓存区中,没读完下次在读,这时候在添加到我自己的缓存区里,不就行了吗,这个时候谁和谁都不揉在一起。所以我们在封装一个类! 也就是说我认为未来每一个套接字都看类对象。
class Connection
{
public:Connection(int sock = defaultport) : sock_(sock){}~Connection(){if (sock_ == defaultsock)close(sock_);}public:int sock_;//这个类对应的套接字是谁string inbuffer_;//输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了 string outbuffer_;//输出缓存区 ,你并不能保证你的写事件就绪};
针对于上面未来每一个Connection对象,因为每一个套接字未来面对的都有自己的读方法、写方法、异常方法,所以在Connection类中,对一个套接字来讲要提供三个回调方法,分别对应读方法、写方法,异常方法。
const int defaultport = 8080;
const int defaultsock = -1;using func_t = function<void(Connection *)>;class Connection
{
public:Connection(int sock = defaultport) : sock_(sock){}~Connection(){if (sock_ == defaultsock)close(sock_);}public:int sock_; //这个类对应的套接字是谁string inbuffer_; //输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了 string outbuffer_; //输出缓存区 ,你并不能保证你的写事件就绪func_t recver_; //从sock_读func_t sender_; //向sock_写func_t excepter_; //处理sock_,IO的时候上面的异常事件
};
所以在初始服务器写的,创建套接字,创建epoll模型,然后将_listensock套接字添加到epoll中,那未来是不是更多的套接字会被accept上来,我们把每一个套接字都看成Connection对象,所以未来整个服务器是不是存在非常多的Conncetion对象。同时_listensock 也是一个sock啊,也要看作成为一个 Connection对象,虽然_listensock不用所谓的输入输出缓存区,包括回调方法最多也是用一个 recver _,但是在我看来_listensock也是一个Connection。 所以刚才服初始服务器哪里写_listensock就有不合适了。并且未来每一个sock都是Connection对象,所以服务器要不要把这些多的Connection对象管理起来呢? 当然要了! 那服务器怎么管理?
先描述,在组织! ,我们管理起来了吗,管理起来了,Connection就是。怎么组织呢?既然每一个Connection对象都有一个对应的套接字,那我们就是哈希表管理。
class TcpServer
{public://...private:uint16_t _port;Sock _sock;Epoller _epoll;unordered_map<int,Connection*> _connections;//所有链接集合
};
而且这里的代码仅仅是简单的将套接字添加到epoll中,未来我们不仅想将套接字添加到epoll中,并且还想将每一个Connectoin对象添加unordered_map里。
为什么unordered_map我们使用的是套接字做key,因为将来在epoll中一旦有fd就绪了,我们知道是什么事件就绪,也知道是那个fd就绪。知道之后我们就可以在unordered_map根据文件描述符快速的找到对应套接字的Connection对象,然后输入,输出缓存区也就有了。
所以在写一个添加链接的函数,作为一个服务器收到一个套接字,它就需要添加一个链接。
void AddConnection(int sock, uint32_t events)
{// 1. 首先要为该sock创建Connection,并初始化,并添加到connections_// 2. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心bool r = _epoll.AddEvents(sock, events);assert(r);(void)r;}
剩下的等会再说,所以当你有了新的套接字的时候,不应该是把套接字直接添加到epoll里,而应该是进行AddConnection。_listensock也是如此
void initServer()
{// 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoll.Create();// 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞// Util::SetNonBlock(_sock.Fd());// _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);AddConnection(_sock.Fd(),EPOLLIN|EPOLLET);
}void AddConnection(int sock, uint32_t events)
{// 1.设置非阻塞,ET模式fd要非阻塞if (events & EPOLLET)Util::SetNonBlock(sock);// 2. 该sock创建Connection,并初始化,并添加到connections_Connection *conn = new Connection(sock);// 3.// 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心bool r = _epoll.AddEvents(sock, events);assert(r);(void)r;// 5. 将kv添加到connections__connections.insert(make_pair(sock, conn));logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
}
这样写还没完,未来一旦有fd就绪了,然后就可以在unordered_map中根据fd找到对应的Connection,找到之后我们是不是要执行对应的读,写,异常方法,所以任何一个Connection内方法能够被调用,因此Connection类在提供一个给每一个Connection对象注册读,写,异常方法,
这样写的意思是,未来这个AddConnection接口不仅可以被用来注册_listensock套接字,也可以用来被注册一般文件描述符,一般文件描述符也可能关心读,也可以关心写,也可能关心异常。或者只关心读,只关心写等等情况,所以当我们每个套接字注册一个Connection对象,你怎么知道这个套接字未来要执行什么方法呢 ,所以我们需要Connection提供一个注册方法。
class Connection
{
public:Connection(int sock = defaultport) : sock_(sock){}~Connection(){if (sock_ == defaultsock)close(sock_);}void Rigster(func_t r, func_t s, func_t e){recver_ = r;sender_ = s;excepter_ = e;}public:int sock_; // 这个类对应的套接字是谁string inbuffer_; // 输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了string outbuffer_; // 输出缓存区 ,你并不能保证你的写事件就绪func_t recver_; // 从sock_读func_t sender_; // 向sock_写func_t excepter_; // 处理sock_,IO的时候上面的异常事件
};
所以当使用AddConnection的时候,注册文件描述符到epoll的时候,既要有要关心的fd,还要有关心fd上的什么事件,这两个字段是你要告诉epoll,当fd就绪时你想怎么处理这个fd,所以还要再给三个参数,告诉这个fd就绪时处理什么方法。
void AddConnection(int sock, uint32_t events,func_t recver, func_t sender, func_t excepter)
{// 1.设置非阻塞,ET模式fd要非阻塞if (events & EPOLLET)Util::SetNonBlock(sock);// 2. 该sock创建Connection,并初始化,并添加到connections_Connection *conn = new Connection(sock);// 3. 给对应的sock设置对应的回调方法conn->Rigster(recver, sender, excepter);// 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心bool r = _epoll.AddEvents(sock, events);assert(r);(void)r;// 5. 将kv添加到connections__connections.insert(make_pair(sock, conn));logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
}
所以当我们重新审视初始化服务器的代码,创建套接字,创建epoll模型,把_listensock套接字添加到epoll里,对于_listensock套接字我们也要设置对应的方法,它就绪时想怎么读,怎么写,怎么处理异常。不过对于_listensock套接字它只关心读,所以这里我们先给它一个未来要写的从_listensock获取链接的方法,不关心写和处理异常,直接设置为nullptr就行了。
不过这里还有一个细节,我们因为读、写、异常是我们使用的是包装器function,如何让它的对象调用一个类内的非静态成员函数会比较麻烦,因为类内的非静态成员函数内隐藏了一个this指针。这样的话function的参数还要添加一个,然后调用的话还需要再传一个这个类对象过去。但是我们在C++学过bing绑定,可以固定参数,这样以后调用这个成员函数直接就帮我自动传这个参数了,对这里不懂得可以看这里【C++】C++11中
void Accepter(Connection* conn)
{
}void initServer()
{// 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoll.Create();// 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞// Util::SetNonBlock(_sock.Fd());// _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);
}
未来一旦设置到epoll里fd上有事件就绪了,然后就根据就绪的fd在unordered_map找到对应的Conncetion,然后调用Conncetion曾经注册的对应的方法来进行事件派发,所以接下来我们要写事件派发的接口,让epoll_wait帮我们获取就绪事件,这里有很多写法,不过我们还是不写在一块,多写一个Loop函数然后进行一次事件派发,Dispatch就是循环派发
// 事件派发器
void Dispatch()
{int timeout = -1000;while (true){Loop(timeout);}
}void Loop(int &timeout)
{_epoll.Wait()// 获取已经就绪的事件
}
所以在Epoller再来一个接口,我们直接调用就行了,这里我们需要一个拷贝数组,数组大小,和等待的方式
// kernel -> user
int Wait(struct epoll_event *ets, int num, int timeout)
{int n = epoll_wait(_epfd, ets, num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");break;}return n;
}
epoll_wait这里注定了我们一次可能会捞取许多就绪事件,所以我们要在服务器中在定义一个保存所有就绪事件的数组,和数组大小然后调用_epoll.Wait()的时候传过去
const int defaultnum = 128;class TcpServer
{public:TcpServer(int port = defaultport,int num = defaultnum) : _port(port),_revents(nullptr),_num(num){}private:int _port;Sock _sock;Epoller _epoll;unordered_map<int, Connection *> _con;struct epoll_event *_revents;int _num;
};
初始服务器的时候,我们就把数组申请好
void initServer()
{// 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoll.Create();// 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞// Util::SetNonBlock(_sock.Fd());// _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);// 4.拷贝数组_revents = new struct epoll_event[_num];
}
然后接下来就可以进行事件派发了
void Loop(int &timeout)
{int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件for (int i = 0; i < n; ++i) // 下面一定是就绪的事件{uint32_t event = _revents[i].events; // 就绪的事件int sock = _revents[i].data.fd; // 就绪事件的fd// 事件派发if ((event & EPOLLIN) && sock == _sock.Fd()) // listensock读就绪_connections[sock]->recver_(_connections[sock]);if ((event & EPOLLIN) && sock != _sock.Fd()) // 普通sock读就绪_connections[sock]->recver_(_connections[sock]);
}
我们注意到不管是_listensock套接字还是普通sock套接字读就绪,它们的处理方法都是一样的,因此我们把它们写在一块。只要是读事件事件,我们都根据文件描述去执行对应的读方法,未来设怎么读都由自己曾经设置对应recver_来定
void Loop(int &timeout)
{int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件for (int i = 0; i < n; ++i) // 下面一定是就绪的事件{uint32_t event = _revents[i].events; // 就绪的事件int sock = _revents[i].data.fd; // 就绪事件的fd// 事件派发// if ((event & EPOLLIN) && sock == _sock.Fd()) // listensock读就绪// _connections[sock]->recver_(_connections[sock]);// if ((event & EPOLLIN) && sock != _sock.Fd()) // 普通fd读就绪// _connections[sock]->recver_(_connections[sock]);if ((event & EPOLLIN))//读事件就绪_connections[sock]->recver_(_connections[sock]);if ((event & EPOLLOUT))//写事件就绪_connections[sock]->sender_(_connections[sock]);
}
这样写还有一些问题,首先我们还要判断一下这个套接字是否在unordered_map真的存在,其次在判断对应套接字的方法是否曾经被设置,不然也可能会存在调用的问题
bool ExitHashSock(int sock)
{auto pos = _con.find(sock);return pos != _con.end();
}void Loop(int &timeout)
{int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件for (int i = 0; i < n; ++i) // 下面一定是就绪的事件{uint32_t event = _revents[i].events; // 就绪的事件int sock = _revents[i].data.fd; // 就绪事件的fd// 事件派发// _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))_connections[sock]->recver_(_connections[sock]);if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))_connections[sock]->sender_(_connections[sock]);
}
这里还有一丢丢小问题,对我们来说我们从来没说过异常的事件,但并不排除在通信的时候存在其他epoll对应的事件
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
我们可以这样写,如果当前就绪的事件出现异常了,我们设置进该就绪事件中的读和写去处理。这样的好处是将所有的异常问题,全部转化,成为读写问题 ,本来读写本来就要做异常处理,只要把读写异常处理了,这个异常也就被自动处理了!
void Loop(int &timeout)
{int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件for (int i = 0; i < n; ++i) // 下面一定是就绪的事件{uint32_t event = _revents[i].events; // 就绪的事件int sock = _revents[i].data.fd; // 就绪事件的fd//将所有的异常问题,全部转化,成为读写问题if (event & EPOLLERR)event | EPOLLIN | EPOLLOUT;if (event & EPOLLHUP)event | EPOLLIN | EPOLLOUT;// _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))_connections[sock]->recver_(_connections[sock]);if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))_connections[sock]->sender_(_connections[sock]);
}
因为我们目前就一个_listensock套接字,所以到目前为止有事件就绪就是_listensock套接字就绪,也就是这里只会执行_listensock套接字设置的读事件也就是我们曾经给_listensock套接字绑定的Accepter函数来获取新链接。
void Accepter(Connection *conn)
{string clientip;uint16_t clientport;int err = 0;//获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了int sock = _sock.Accept(&clientip, &clientport, &err);if (sock > 0){// 新的sock套接字添加到AddConnetionsAddConnetions(sock, EPOLLIN | EPOLLET,);}
}
对于未来获取到的普通文件描述符,它还要有对应的事件就绪时我们需要怎么处理这个套接字,所以我们需要设置一个对普通文件描述符就绪时它的处理读,写,异常的方法
void recver(Connection *conn)
{
}void sender(Connection *conn)
{
}void excepter(Connection *conn)
{
}void Accepter(Connection *conn)
{string clientip;uint16_t clientport;int err = 0;//获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了int sock = _sock.Accept(&clientip, &clientport, &err);if (sock > 0){// 新的sock套接字添加到AddConnetionsAddConnetions(sock, EPOLLIN | EPOLLET,bind(&TcpServer::recver, this, placeholders::_1),bind(&TcpServer::sender, this, placeholders::_1),bind(&TcpServer::excepter, this, placeholders::_1));}
}
现在有一个问题时,当时_listesock套接字注册的时候也是ET模式的,并且也设置成非阻塞的了,所以通知的时候只会通知_listesock套接字读就绪一次,Accepter调用也就一次。
也就是说你能保证底层到来的链接就一个吗?不能!你并不能保证此时正在处理_listesock套接字获取新连接时有几个到来了,有可能同时到来了非常多的连接,所以倒逼程序员必须把所有到达的连接全部都读上来,所有上面Accepet的写法是不对的,必须要循环读取全部都读上来!而且我们早把_listesock设置为非阻塞了,底层没有连接了也不会被阻塞,虽然返回值是-1,但是错误码被设置!可能是底层没错误了错误码被设为EAGAIN 或EWOULDBLOCK,或者读取时被信号中断了错误码被设为EINTR,或者真错误了!
void Accepter(Connection *conn)
{while (true){string clientip;uint16_t clientport;int err = 0; // 获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了int sock = _sock.Accept(&clientip, &clientport, &err);if (sock > 0){// 新的sock套接字添加到AddConnetionsAddConnection(sock, EPOLLIN | EPOLLET,bind(&TcpServer::recver, this, placeholders::_1),bind(&TcpServer::sender, this, placeholders::_1),bind(&TcpServer::excepter, this, placeholders::_1));logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);}else{if (err == EAGAIN || err == EWOULDBLOCK)break;else if (err == EINTR)continue;elsebreak;}}
}
上面处理好了,接下来就是处理普通套接字就绪事件了,假设现在读就绪了,我们先处理读事件。怎么读?注定了也要循环读取!
其次你即使把本来数据全部读完就能保证是一个完整报文吗?不能!不是就没有办法向上层交,也就意味着没有办法对它进行处理,此时我们就先把读到的数据放到对应fd的Connection对象的输入缓存区里
void recver(Connection *conn)
{while (true){char buffer[1024];ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);if (s > 0)//读取成功{buffer[s] = 0;conn->inbuffer_ += buffer;}}
}
如果s==0,就说明对方把连接关掉了,那我们是不是应该把对应的文件描述符从epoll中删除,然后关闭文件描述符,然后从unordered_map中移除等等工作。如果写出问题呢?我们的代码是不是会充满大量的异常调用,所以只要我们设置了对异常的处理,然后有问题的执行一次异常方法就行了。如果s == -1,我们内部再判断是非阻塞返回、是被信号中断、还是着出错误了。
void recver(Connection *conn)
{while (true){char buffer[1024];ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);if (s > 0)//读取成功{buffer[s] = 0;conn->inbuffer_ += buffer;} else if (s == 0){if (conn->excepter_){conn->excepter_(conn);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK)//只是没数据了break;else if (errno == EIDRM)//读取时被信号中断,继续读continue;else//真错误了{if (conn->excepter_){conn->excepter_(conn);return;}}}}
}
接下来我们处理一下读取到的报文,如果是完整报文就向上交付,不是就继续读取直到是一个完整报文。
那问题来了你怎么知道是一个完整的报文?
还记得以前在写网络版本计数器是怎么处理的吗?没错定制一个协议!!!
我们以前学过,今天拿过来用就行了 协议封装
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <jsoncpp/json/json.h>#define SEP " "
#define SEP_LEN strlen(SEP)
#define LINE_SEP "\r\n"
#define LINE_SEP_LEN strlen(LINE_SEP)using namespace std;// "x op y" -> "content_len"\r\n"x op y"\r\n
string Enlenth(const string &text)//加报头
{string send_string = to_string(text.size());send_string += LINE_SEP;send_string += text;send_string += LINE_SEP;return send_string;
}//"content_len"\r\n"x op y"\r\n -> "x op y"
bool Delenth(const string &packge, string *text)//删除报头
{auto pos = packge.find(LINE_SEP);if (pos == string::npos)return false;string text_len_string = packge.substr(0, pos);int text_len = stoi(text_len_string);*text = packge.substr(pos + LINE_SEP_LEN, text_len);return true;
}class Request//请求
{
public:Request() : _x(0), _y(0), _op(0){}Request(int x, int y, char op) : _x(x), _y(y), _op(op){}bool serialize(string *out)//序列化{
#ifdef MYSELF// 结构化 -> "x op y"*out = "";string x_string = to_string(_x);string y_string = to_string(_y);*out += x_string;*out += SEP;*out += _op;*out += SEP;*out += y_string;
#elseJson::Value root;root["first"] = _x;root["second"] = _y;root["oper"] = _op;Json::FastWriter write;*out = write.write(root);
#endifreturn true;}bool deserialize(const string &in)//反序列{
#ifdef MYSELF// "x op y" -> 结构化auto left = in.find(SEP);auto right = in.rfind(SEP);if (left == string::npos || right == string::npos)return false;if (left == right)return false;if (right - (left + SEP_LEN) != 1)return false;string x_string = in.substr(0, left); // [0, 2) [start, end) , start, end - startstring y_string = in.substr(right + SEP_LEN);if (x_string.empty())return false;if (y_string.empty())return false;_x = stoi(x_string);_y = stoi(y_string);_op = in[left + SEP_LEN];
#elseJson::Value root;Json::Reader reader;reader.parse(in, root);_x = root["first"].asInt();_y = root["second"].asInt();_op = root["oper"].asInt();#endifreturn true;}public:int _x;int _y;char _op;
};class Response//响应
{
public:Response() : _exitcode(0), _result(0){}Response(int exitcode, int result) : _exitcode(exitcode), _result(result){}bool serialize(string *out)//序列化{
#ifdef MYSELF// 结构化 -> "_exitcode _result"*out = "";*out = to_string(_exitcode);*out += SEP;*out += to_string(_result);
#elseJson::Value root;root["first"] = _exitcode;root["second"] = _result;Json::FastWriter write;*out = write.write(root);
#endifreturn true;}bool deserialize(const string &in)//反序列化{
#ifdef MYSELF//"_exitcode _result" ->结构化auto pos = in.find(SEP);if (pos == string::npos)return false;string ec_string = in.substr(0, pos);string res_string = in.substr(pos + SEP_LEN);if (ec_string.empty())return false;if (res_string.empty())return false;_exitcode = stoi(ec_string);_result = stoi(res_string);
#elseJson::Value root;Json::Reader reader;reader.parse(in, root);_exitcode = root["first"].asInt();_result = root["second"].asInt();
#endifreturn true;}public:int _exitcode; // 0:计算成功,!0表示计算失败,具体是多少,定好标准int _result; // 计算结果
};//这里是读取一个完整报文!读到后放到text
bool PartOnepackge(string &inbuffer, string *text)
{//"content_len"/r/n"x op y"/r/nauto pos = inbuffer.find(LINE_SEP);if (pos == string::npos) // 没读到一个完整报文return false;//"content_len"/r/n"x op y"/r/n"content_len" >= 一个完整报文长度string text_len_string = inbuffer.substr(0, pos);int text_len = stoi(text_len_string);int total_len = text_len_string.size() + 2 * LINE_SEP_LEN + text_len;cout << "处理前#inbuffer: \n"<< inbuffer << std::endl;if (inbuffer.size() < total_len) // 也没有读到一个完整报文{cout << "你输入的消息,没有严格遵守我们的协议,正在等待后续的内容, continue" << endl;return false;}// 至少有一个完整的报文*text = inbuffer.substr(0, total_len); // 读到一个完整报文inbuffer.erase(0, total_len); // inbuffer内部减去这次读到的一个完整的报文cout << "处理后#inbuffer:\n " << inbuffer << endl;return true;}
现在协议定义好了,我们也可以开始对读到的数据进行处理了!这里我们只想让服务器就处理IO事件,因此设置一个回调函数专门处理里面是否读取到一个完整报文,读到一个完整报文就序列化反序列业务处理等等。
using func_t = function<void(Connection *)>;class TcpServer
{public:TcpServer(func_t f,int port = defaultport, int num = defaultnum) : _service(f),_port(port), _revents(nullptr), _num(num){}//。。。private:uint16_t _port;Sock _sock;Epoller _epoll;unordered_map<int, Connection *> _connections; // 所有链接集合struct epoll_event *_revents;int _num;func_t _service;//业务处理
};
然后读取一次就交给上层一次,让上层进行处理,而recver专心读取数据就行了。
void recver(Connection *conn)
{while (true){char buffer[1024];ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);if (s > 0){buffer[s] = 0;conn->inbuffer_ += buffer;_service(conn);//处理业务}else if (s == 0){if (conn->excepter_){conn->excepter_(conn);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EIDRM)continue;else{if (conn->excepter_){conn->excepter_(conn);return;}}}}
}
如果读取到一个完整报文后就对请求就行处理,没有就返回继续读取,直到读取到完整的报文
void calculate(Connection *conn)
{string onePackage;while (PartOnepackge(conn->inbuffer_, &onePackage)){string req_str;// 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\nif (!Delenth(onePackage, &req_str))return;cout << "去掉报头的正文:\n"<< req_str << endl;// 2. 对请求Request,反序列化// 2.1 得到一个结构化的请求对象Request req;if (!req.deserialize(req_str))return;// 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑// 3.1 得到一个结构化的响应Response resp;cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!}
业务处理我们也写过直接用
void cal(const Request &req, Response &resp)
{ req已经有结构化完成的数据啦,你可以直接使用resp._exitcode = OK;resp._result = OK;switch (req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':{if (req._y == 0)resp._exitcode = DIV_ERR;elseresp._result = req._x / req._y;}break;case '%':{if (req._y == 0)resp._exitcode = MOD_ERR;elseresp._result = req._x % req._y;}break;default:resp._exitcode = OPER_ERR;break;}
}
计算处理好了我们对响应序列化,得到一个响应的报文,接下来就是给对方发回去
void calculate(Connection *conn)
{string onePackage;while (PartOnepackge(conn->inbuffer_, &onePackage)){string req_str;// 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\nif (!Delenth(onePackage, &req_str))return;cout << "去掉报头的正文:\n"<< req_str << endl;// 2. 对请求Request,反序列化// 2.1 得到一个结构化的请求对象Request req;if (!req.deserialize(req_str))return;// 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑// 3.1 得到一个结构化的响应Response resp;cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!// 4.对响应Response,进行序列化// 4.1 得到了一个"字符串"string resp_str;if (!resp.serialize(&resp_str))return;// 5 构建成为一个完整的报文string send_string= Enlenth(resp_str);cout << "--------------result: " << send_string << endl;//发回去}}
现在问题是,在epoll中能不能直接发送呢??
不能!用户无法保证发送条件是就绪的!谁最清楚??epoll!!
也就是说不光读事件要添加到epoll中,写事件是否就绪也要添加到epoll里。
什么叫做写发送事件就绪呢?
发送缓存区有空间!
服务器刚开始启动,或者很多情况下,发送事件是一直就绪的!可以直接发送!
只不过,如果我们没有发完怎么办?那就需要下一次发送,这里也就要求每一个sock都要有自己的发送缓存区!
所以根据上面的分析,大部分情况发送缓存区都是有空间的,所以其实可以直接发,有人说直接发那我就循环发,是的可以循环发,如果没发完非阻塞就直接返回错了,下次发就行了,可是下次从哪里发呢,所以就需要sock有自己的发送缓存区。
因为大部分情况下发送缓存区都是就绪的,所以直接发,没发完没关系下次在发,可是下次是什么时候,这次没发完不就是用户从缓存区数据量太大把发送缓冲区打满了,那下次发是不是就要等底层发送缓存区事件就绪我再发。所以注定你要将sock的EPOLLOUT事件也要有时候注册进epoll ,让epoll通知我,如果底层发事件就绪了我再发。现在问题是是什么时候注册?在新sock注册到epoll里的时候就开始关心写了可不可以?
未来事件派发一旦写事件就绪了自动调用sender方法
看起来很完美,但是一般读事件对于epoll我们要常设,写事件的关心对于epoll我们要按需设置!
也就是说当你需要发的时候,确实发送没发完,你在想办法在epoll中设置一下,让epoll关心对于fd的写事件! 刚开始设置sock如果你真的需要关心写事件你才需要设置写事件,不要把写事件常设到epoll中,因为你把它常设了,epoll就会一直在返回,因为每一个文件描述符的发送缓存区大概率都是就绪的,就一直的返回,你的处理逻辑就一直在消耗cpu资源,这样没必要,所以写事件我们按序设置。
所以我们的结论就是,因为是ET模式可以直接发送,发完就ok,没发完就让epoll关心一下对应的写事件的关心,所以我们需要常设EPOLLIN的关心,对EPOLLOUT按需设置!
然后这里呢,可能一次发不完,为了下次继续发送,所以我们把response序列化的之后的结果放到该fd的输出缓存区,以供下次发送。然后调用对应fd的读方法把数据发出去。
void calculate(Connection *conn)
{string onePackage;while (PartOnepackge(conn->inbuffer_, &onePackage)){string req_str;// 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\nif (!Delenth(onePackage, &req_str))return;cout << "去掉报头的正文:\n"<< req_str << endl;// 2. 对请求Request,反序列化// 2.1 得到一个结构化的请求对象Request req;if (!req.deserialize(req_str))return;// 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑// 3.1 得到一个结构化的响应Response resp;cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!// 4.对响应Response,进行序列化// 4.1 得到了一个"字符串"string resp_str;if (!resp.serialize(&resp_str))return;// 5 构建成为一个完整的报文conn->outbuffer_ += Enlenth(resp_str);std::cout << "--------------result: " << conn->outbuffer_ << std::endl;// 直接发if (conn->sender_)conn->sender_(conn);}}
调用sender并不能保证数据一次就发完了,因此如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
void sender(Connection *conn)
{while (true){ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);if (s > 0){conn->outbuffer_.erase(0, s);if (conn->outbuffer_.empty())//发完就退出break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK)//没发完非阻塞退出break;else if (errno == EINTR)continue;else{if (conn->excepter_){conn->excepter_(conn);return;}}}}// 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
}
这里我们在设计一个接口,可以打开对于一个文件描述符,是否对它写事件关心,是否对它读事件关心的情况
bool EnableReadWrite(int sock, bool reader, bool writer)
{uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;//_epoll.Control();
}
Epoller类在设计一个函数,在epoll对对应的fd关心事件进行修改
bool Control(int sock, uint32_t event, int action)
{int n = 0;if (action == EPOLL_CTL_MOD){struct epoll_event ev;ev.events = event;ev.data.fd = sock;n = epoll_ctl(_epfd, action, sock, &ev);}else if (action == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, action, sock, nullptr);//删除就不关心任何事件了}elsen = -1;return n == 0;
}
bool EnableReadWrite(int sock, bool reader, bool writer)
{uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;_epoll.Control(sock, event, EPOLL_CTL_MOD);
}
void sender(Connection *conn)
{while (true){ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);if (s > 0){conn->outbuffer_.erase(0, s);if (conn->outbuffer_.empty())break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{if (conn->excepter_){conn->excepter_(conn);return;}}}}// 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!if (!conn->outbuffer_.empty()) // 设置关心该fd读EnableReadWrite(conn->sock_, true, true);elseEnableReadWrite(conn->sock_, true, false);
}
只要我开启了对写事件的关心,epoll就会帮我关心对应fd的写事件,如果就绪了,就会调用对应的fd的sender,以上就是发送的逻辑。
如果对方连接出异常了怎么办呢?不管是读出现异常,还是写出现异常!并且在前面我们是所有异常情况全部转化成读写问题,然后所有读写异常我们都是调用excepter进行处理,换句话说所有异常处理都在这里。
我们处理异常逻辑很简单,我们也关闭对应文件描述符。
void excepter(Connection *conn)
{logMessage(DEBUG, "Excepter begin");//1,先在epoll中删除_epoll.Control(conn->sock_, 0, EPOLL_CTL_DEL);//2. 然后在unordered_map中删除_connections.erase(conn->sock_);//3.关闭fdclose(conn->sock_);logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);delete conn;
}
到目前为止所有代码细节我们都写完了,对IO的处理+协议定制+业务逻辑都有,下面验证一下,我们看到能够正常通信,并且还有对异常的处理
Epoller封装完整代码
#pragma once#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include "Err.hpp"
#include "Log.hpp"using namespace std;const int defaultepfd = -1;
const int size = 128;class Epoller
{public:Epoller(int epfd = defaultepfd) : _epfd(epfd){}~Epoller(){if (_epfd != defaultepfd)close(_epfd);}public:void Create(){_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));exit(EPOLL_CREATE_ERR);}logMessage(NORMAL, "epoll create success, epfd: %d", _epfd);}// user -> kernelbool AddEvents(int sock, uint32_t event){struct epoll_event ev;ev.events = event;ev.data.fd = sock;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);if (n < 0){logMessage(ERROR, "sock join epoll fail");return false;}// logMessage(NORMAL, "sock join epoll success");return true;}// kernel -> userint Wait(struct epoll_event *ets, int num, int timeout){int n = epoll_wait(_epfd, ets, num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");break;}return n;}bool Control(int sock, uint32_t event, int action){int n = 0;if (action == EPOLL_CTL_MOD){struct epoll_event ev;ev.events = event;ev.data.fd = sock;n = epoll_ctl(_epfd, action, sock, &ev);}else if (action == EPOLL_CTL_DEL){n = epoll_ctl(_epfd, action, sock, nullptr);}elsen = -1;return n == 0;}private:int _epfd;
};
服务器完整代码
#pragma once#include <iostream>
#include <unordered_map>
#include <functional>
#include <cassert>
#include "Sock.hpp"
#include "Err.hpp"
#include "Log.hpp"
#include "Epoller.hpp"
#include "Util.hpp"const int defaultport = 8080;
const int defaultsock = -1;
const int defaultnum = 128;class Connection;using func_t = function<void(Connection *)>;class Connection
{
public:Connection(int sock = defaultport) : sock_(sock){}~Connection(){if (sock_ == defaultsock)close(sock_);}void Rigster(func_t r, func_t s, func_t e){recver_ = r;sender_ = s;excepter_ = e;}public:int sock_; // 这个类对应的套接字是谁string inbuffer_; // 输入缓存区,这里我们暂时没有考虑处理图片,视频等二进制信息,不然stirng就不太合适了string outbuffer_; // 输出缓存区 ,你并不能保证你的写事件就绪func_t recver_; // 从sock_读func_t sender_; // 向sock_写func_t excepter_; // 处理sock_,IO的时候上面的异常事件
};class TcpServer
{public:TcpServer(func_t f, int port = defaultport, int num = defaultnum) : _service(f), _port(port), _revents(nullptr), _num(num){}~TcpServer(){if (_revents)delete[] _revents;}void initServer(){// 1.创建套接字_sock.sock();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoll.Create();// 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞// Util::SetNonBlock(_sock.Fd());// _epoll.AddEvents(_sock.Fd(), EPOLLIN | EPOLLET);AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);// 4.拷贝数组_revents = new struct epoll_event[_num];}// 事件派发void Dispatch(){int timeout = -1000;while (true){Loop(timeout);}}private:bool IsConnectionExists(int sock){auto pos = _connections.find(sock);return pos != _connections.end();}void Loop(int &timeout){int n = _epoll.Wait(_revents, _num, timeout); // 获取已经就绪的事件for (int i = 0; i < n; ++i) // 下面一定是就绪的事件{uint32_t event = _revents[i].events; // 就绪的事件int sock = _revents[i].data.fd; // 就绪事件的fd// 将所有的异常问题,全部转化,成为读写问题if (event & EPOLLERR)event | EPOLLIN | EPOLLOUT;if (event & EPOLLHUP)event | EPOLLIN | EPOLLOUT;// _listensock和普通sock都有自己对应的回调方法,因此对_listensock和普通sock处是一样的,不用区分if ((event & EPOLLIN) && IsConnectionExists(sock) && (_connections[sock]->recver_))_connections[sock]->recver_(_connections[sock]);if ((event & EPOLLOUT) && IsConnectionExists(sock) && (_connections[sock]->sender_))_connections[sock]->sender_(_connections[sock]);}}void recver(Connection *conn){while (true){char buffer[1024];ssize_t s = read(conn->sock_, buffer, sizeof(buffer) - 1);if (s > 0){buffer[s] = 0;conn->inbuffer_ += buffer;_service(conn);}else if (s == 0){if (conn->excepter_){conn->excepter_(conn);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EIDRM)continue;else{if (conn->excepter_){conn->excepter_(conn);return;}}}}}bool EnableReadWrite(int sock, bool reader, bool writer){uint32_t event = (reader ? EPOLLIN : 0) | (writer ? EPOLLOUT : 0) | EPOLLET;_epoll.Control(sock, event, EPOLL_CTL_MOD);}void sender(Connection *conn){while (true){ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);if (s > 0){conn->outbuffer_.erase(0, s);if (conn->outbuffer_.empty())break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{if (conn->excepter_){conn->excepter_(conn);return;}}}}// 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!if (!conn->outbuffer_.empty()) // 设置关心该fd读EnableReadWrite(conn->sock_, true, true);elseEnableReadWrite(conn->sock_, true, false);}void excepter(Connection *conn){logMessage(DEBUG, "Excepter begin");_epoll.Control(conn->sock_, 0, EPOLL_CTL_DEL);_connections.erase(conn->sock_);close(conn->sock_);logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);delete conn;}void Accepter(Connection *conn){while (true){string clientip;uint16_t clientport;int err = 0; // 获取错误码,用来判断是非阻塞了/读取被打断了/还是真错误了int sock = _sock.Accept(&clientip, &clientport, &err);if (sock > 0){// 新的sock套接字添加到AddConnetionsAddConnection(sock, EPOLLIN | EPOLLET,bind(&TcpServer::recver, this, placeholders::_1),bind(&TcpServer::sender, this, placeholders::_1),bind(&TcpServer::excepter, this, placeholders::_1));logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);}else{if (err == EAGAIN || err == EWOULDBLOCK)break;else if (err == EINTR)continue;elsebreak;}}}void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter){// 1.设置非阻塞,ET模式fd要非阻塞if (events & EPOLLET)Util::SetNonBlock(sock);// 2. 该sock创建Connection,并初始化,并添加到connections_Connection *conn = new Connection(sock);// 3. 给对应的sock设置对应的回调方法conn->Rigster(recver, sender, excepter);// 4. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心bool r = _epoll.AddEvents(sock, events);assert(r);(void)r;// 5. 将kv添加到connections__connections.insert(make_pair(sock, conn));logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);}public:
private:uint16_t _port;Sock _sock;Epoller _epoll;unordered_map<int, Connection *> _connections; // 所有链接集合struct epoll_event *_revents;int _num;func_t _service;
};
调用逻辑完整代码
#include "TcpServer.hpp"
#include "Err.hpp"
#include <memory>
#include "Protocol.hpp"enum
{OK,DIV_ERR,MOD_ERR,OPER_ERR
};void cal(const Request &req, Response &resp)
{ req已经有结构化完成的数据啦,你可以直接使用resp._exitcode = OK;resp._result = OK;switch (req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':{if (req._y == 0)resp._exitcode = DIV_ERR;elseresp._result = req._x / req._y;}break;case '%':{if (req._y == 0)resp._exitcode = MOD_ERR;elseresp._result = req._x % req._y;}break;default:resp._exitcode = OPER_ERR;break;}
}void calculate(Connection *conn)
{string onePackage;while (PartOnepackge(conn->inbuffer_, &onePackage)){string req_str;// 1.2 我们保证,我们req_text里面一定是一个完整的请求:"content_len"\r\n"x op y"\r\nif (!Delenth(onePackage, &req_str))return;cout << "去掉报头的正文:\n"<< req_str << endl;// 2. 对请求Request,反序列化// 2.1 得到一个结构化的请求对象Request req;if (!req.deserialize(req_str))return;// 3. 计算机处理,req.x, req.op, req.y --- 业务逻辑// 3.1 得到一个结构化的响应Response resp;cal(req, resp); // req的处理结果,全部放入到了resp, 回调是不是不回来了?不是!// 4.对响应Response,进行序列化// 4.1 得到了一个"字符串"string resp_str;if (!resp.serialize(&resp_str))return;// 5 构建成为一个完整的报文conn->outbuffer_ += Enlenth(resp_str);std::cout << "--------------result: " << conn->outbuffer_ << std::endl;}// 直接发if (conn->sender_)conn->sender_(conn);
}static void usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(USAGG_ERR);}uint16_t port=atoi(argv[1]);std::unique_ptr<TcpServer> uls(new TcpServer(calculate,port));uls->initServer();uls->Dispatch();return 0;
}
总结一下: 这个TcpServer服务器就是传说中的Reactor,对应上服务器上面有一个一个的Connection,未来有哪一个fd就绪了,它就把某一个Connection激活告诉我们事件,Reactor就会进行事件派发。然后去执行对应Connection的读,写,异常方法。这种模式就是Reactor反应堆模式
Reactor,很明显当我们进行事件就绪了它会回调曾经注册的绑定的方法,今天我们的服务器既保证了就绪事件通知,还负责了IO,其实还负责了业务处理。如果今天我们把一个报文构建成一个任务,扔到后端任务队列,让后端线程池帮我们处理后面的任务。此时对应的Reactor只负责读事件就绪+负责IO,不负责业务处理。
只负责读事件就绪+负责IO 这就是就是半同步
只负责业务处理 这就半异步
还有一种服务器少用的Proactor前摄器模式,可以自己了解了解。
如果想把这个服务器改成多多进程多线程,其实可以直接创建多进程,每个进程里都搞一个TcpServer,或者说创建多线程,每一个线程里都搞一个Tcpserver,把_listensock套接字添加到其中一个线程的Reactor里,一旦有连接就绪的时候,不是要执行Accepter吗,执行Accepter的时候就不仅仅是 AddConnection了,而是尝试把这个连接添加到哪一个线程的epoll中, 然后就在这个线程里把这个文件描述符处理完。
最后说一点,我们还可以在Connectoin中设置一个lasttime记录最近访问时间,每一次读或写的时候我们都更新一个对应时间戳,所以只要读或写就绪了就可以更新一下对应Connection最近时间,换句话说此时我们就可以在派发事件后当所有事件处理完了,就可以在unordered_map遍历所有的连接,计算每一个连接已经有多长时间没有动了,因为每一个连接都有自己的最近访问时间,每一次访问都会更新,不更新就是最开始的,所以我们可以获取当前时间在减去Connectoin里保存的历史最近访问时间,计算出时间差,然后就可以所以连接进行连接管理。时间超过5分钟都没有访问过的,服务器就直接把你关掉。
如上就是Reactor全部内容。如上也是Linux的全部内容。总结了40多篇Linux从系统编程到网络编程的文章,内容很丰富!!想说什么也不知道该怎么说,就这样把!
大家下篇文章再见 🙌 🙌 🙌 !