目录
- 理解五种IO模型
- 非阻塞IO的设置
- 多路转接之select
- 实现一个简易的select服务器
- select服务器的优缺点
- 多路转接之poll
- 实现一个简易的poll服务器
- poll服务器的优缺点
- 多路转接之epoll
- epoll原理
- epoll的优势
- 用epoll实现一个简易的echo服务器
- epoll的LT和ET工作模式
- 什么是LT和ET
- 实现一个简易的reactor服务器
五种IO模型
如何理解五种IO模型:
应用层调用read/recv和write/send的时候,本质就是数据在用户层和操作系统的缓冲区的拷贝交互。要进行拷贝,必须先判断条件成立,该条件就是读写事件就绪。如果条件不成立,这些函数就会被阻塞住,等待资源就绪。
所以输入输出IO可以分为两个步骤:等待 + 拷贝
给大家举个例子帮助大家理解五种IO模型:
钓鱼 = 等待 + 钓(动作)
河边的张三在钓鱼,张三的钓鱼动作:一直盯着鱼漂,直到鱼漂抖动就说明有鱼来了。
李四也在钓鱼,李四的钓鱼动作:一边做自己的事,一边时不时观察鱼漂。
王五有个高级鱼竿,鱼漂上带着铃铛,王五的钓鱼动作:一直做自己的事,铃铛鱼漂响了,就去钓鱼
赵六买了20个鱼竿钓鱼,赵六的钓鱼动作:一直检查每个鱼漂,任何一个鱼漂动了就钓鱼。
小王开车载田七去公司的时候,田七路过的时候看见别人钓鱼,突然自己想吃野生鱼了,就叫小王去钓鱼,钓到了就给田七送过去。
人:进程/线程。鱼:数据。河:内核空间。鱼漂:数据就绪的事件。鱼竿:文件描述符(都是通过文件描述符进行操作)。钓鱼:recv/read
张三对应的是阻塞式IO。李四对应的是非阻塞IO,王五对应的是信号阻塞式IO(其中王五提前是知道铃铛响了该怎么办,该钓鱼),赵六对应的是多路转接/多路复用IO。田七对应的是异步IO(小王(操作系统)有鱼了告诉就告诉田七,田七直接用即可)
在这几个人中,你会觉得哪个人的钓鱼效率是最高的(钓的鱼最多的)?很明显是赵六,因为河中的鱼对每个钩子的咬钩概率是相同的,而赵六的钩子占比是最大的,所以必定单位时间内钓鱼的数量是最多的。
阻塞式IO:在内核将数据准备好之前,系统调用会一直等待,所有的文件描述符,默认都是阻塞方式。阻塞IO是最常见的IO模型,因为简单易上手。
非阻塞IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。非阻塞IO往往需要程序员循环的方式反复尝试读写该文件描述符,这个过程称为轮询,这对CPU来说是较大的浪费,一般只有特定场景下才使用。
信号驱动IO:内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作
IO多路转接:虽然从流程上看起来和阻塞IO类似,实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
异步IO:由内核在数据拷贝完成时,通知应用程序(而信号驱动是告诉进程何时可以开始拷贝数据,需要进程自己拷贝)
什么叫做高效IO呢?
任何IO过程,都包含两个步骤,等待 + 拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。让IO更高效,最核心的办法就是让等待的时间尽量少。在单位时间内,IO过程中,等待对于拷贝的比重越小,IO效率越高。
阻塞式IO和非阻塞IO有什么区别?
区别:等待时的状态不同。
阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在到得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
同步IO和异步IO有什么区别?
同步,就是在发出一个调用时,在没有得到该调用的结果之前,该调用就不会返回。但是一旦调用返回,就得到返回值了。换句话说,就是有调用者主动等待这个调用结果。
异步,调用在发出之后,这个调用就直接返回,所以没有返回结果。直到被动的通过状态/信号来通知来处理
非阻塞IO的设置
如何设置非阻塞IO?
以前我们在写网络编程的时候,recv的flag选项填的是0
如果我们想非阻塞等待,则可以设flags为MSG_DONTWAIT。也可以用open非阻塞的方式打开,方法有很多。
但提供一种更通用的做法,文件描述符本质就是下标,每一个下标指向的就是内核里的文件对象,文件对象中是有文件描述符标志的。
传入的cmd值不同,后面追加的参数也不相同
fcntl函数有5种功能:
复制一个现有的描述符(cmd = F_DUPFD)
获得/设置文件描述符标记(cmd = F_GETFD或F_SETFD)
获得/设置文件状态标记(cmd = F_GETFL或F_SETFL)
获得/设置异步I/O所有权(cmd = F_GETOWN或F_SETOWN)
获得/设置记录锁(cmd = F_GETLK, F_SETLK或F_SETLKW)
实现非阻塞轮询方式读取标准输入
#include <iostream>
#include <unistd.h>
#include <fcntl.h>void SetNoBlock(int fd)
{int flag = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}int main()
{SetNoBlock(0);char buffer[4096];while (1){std::cout << "Enter#";fflush(stdout);int n = read(0, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << buffer << std::endl;}else if (n == 0){std::cout << "read done" << std::endl;break;}else{std::cout << "read error: " << strerror(errno) <<std::endl;break;}}return 0;
}
为什么还没有输入就直接读出错了?错误信息是资源暂时还没有就绪。
结论:
设置成为非阻塞,如果底层fd数据没有就绪,recv/read/write/send,返回值会以出错的形式返回。这样会有两种情况:a、真的出错 b、底层资源没有就绪
我们怎么区分呢? – 通过errno区分
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>void SetNoBlock(int fd)
{int flag = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}int main()
{SetNoBlock(0);char buffer[4096];while (1){int n = read(0, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << buffer << std::endl;}else if (n == 0){std::cout << "read done" << std::endl;break;}else{if (errno == EWOULDBLOCK){//只是底层没有数据就绪,这种错误是可以容忍的//do_other_thing(); //也可以做其他事情,设置一些方法,就不演示了continue;}std::cout << "read error: " << strerror(errno) <<std::endl;break;}}return 0;
}
多路转接之select
select是干什么的?select负责多路转接IO中的等待,一次可以等待多个文件描述符,并不负责拷贝。
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态变化。
nfds:填写要监视的所有要监视的fd中的最大值 + 1。这与底层内核有关系,内核会从0一直遍历到nfds。
timeout:为输入输出型参数
如果设置为nullptr,则为阻塞式。
若设置timeout = {0, 0};为非阻塞,会一直函数返回。
若设置timeout = {5, 0};如果一直收不到消息:阻塞5s,每过5s就超时(函数返回)一次,这样反复循环。如果过2s收到了消息,则该参数会返回{3, 0},表示还有3s才超时。
return返回值:大于0,有几个fd就绪了。等于0,超时/非阻塞返回了。小于0,select调用失败了
readfds:为输入输出参数
做输入参数时:表示用户告诉内核,你要帮我关心的文件描述符的读,fd_set表示位图,可以通过位图设置多个文件描述符。
做输出参数时:表示内核告诉用户,你要关心的文件描述中有哪些已经就绪了。
writefds和exceptfds同readfds参数
该位图既然是一种类型,必然有大小,所以能够想关心的fd的个数一定是有上限的。
int main()
{
std::cout << sizeof(fd_set) << std::endl;
return 0;
}
128 * 8 = 1024bit,最多只能关心1024个fd。bit位的位置代表fd是多少,为0则代表没设置该fd,为1则代表设置了该fd。
实现一个简易的select服务器
为了节省篇幅,后面再贴完整代码
#include "Socket.hpp"class SelectServer
{
public:SelectServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt(); //设置端口复用_listensock.Bind(_port);_listensock.Listen();}void Start(){while (1){//能否直接调用accept?不能直接accept,因为会阻塞!我们的目标就是写多路转接IO,所以不能出现阻塞}}~SelectServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;
};
更新代码
#include "Socket.hpp"class SelectServer
{
public:SelectServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();//设置地址复用_listensock.Bind(_port);_listensock.Listen();}void HandleEvent(){}void Start(){while (1){//能否直接调用accept?不能直接accept,因为会阻塞!fd_set readfd;FD_SET(_listensock.Fd(), &readfd);struct timeval timeout = {3, 0};int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, &timeout);//暂时这样设置if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);}else{std::cout << "Get a New Link" << std::endl;sleep(2);HandleEvent();}}}~SelectServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;
};
运行结果如下:
timeout设置为{3, 0}可以看到select每隔3s就超时一次,同理如果timeout设置为{0, 0},则会一直超时,如果设置为null,则会阻塞。
客户端连接上后,我们也看到了,他一直在打印Get a New Link,为什么一直打印?因为上层没有把底层数据拿上去处理,所以select就会一直提醒事件就绪了。
怎么处理?
更新代码
#include "Socket.hpp"class SelectServer
{
public:SelectServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void HandleEvent(fd_set* rfd){//代码走到这里就说明我们的连接事件就绪了if (FD_ISSET(_listensock.Fd(), rfd) == true){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);}}void Start(){while (1){//能否直接调用accept?不能直接accept,因为会阻塞!fd_set readfd;FD_SET(_listensock.Fd(), &readfd);struct timeval timeout = {3, 0};int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, /*&timeout*/0);if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);}else{std::cout << "Get a New Link" << std::endl;sleep(1);HandleEvent(&readfd);}}}~SelectServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;
};
代码运行结果:
更新HandleEvent函数的代码
void HandleEvent(fd_set* rfd)
{//代码走到这里就说明我们的连接事件就绪了if (FD_ISSET(_listensock.Fd(), rfd) == true){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);char buffer[4096];ssize_t n = read(newSockfd, buffer, sizeof(buffer) - 1);/*我们已经得到了新的sock,可以直接用read接口吗?不可以,因为我们这里是单线程/单进程,直接read可能会被阻塞住,以前我们的代码是直接托管给了多线程/多进程,他们的阻塞不会影响主进程。我们这个单进程应该将新到来的newSocketfd交给select,让select帮我们来管理*/}
}
所以此时,我们需要将HandleEvent函数里面的newSocketfd传递给Start函数。如何做呢?可以使用数组作为该类的成员 – 这个数组也叫做辅助数组,这也是select最大的特点之一:使用辅助数组,让文件描述符在函数之间互相传递。
#include "Socket.hpp"class SelectServer
{static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fdstatic const int DefaultNum = -1;
public:SelectServer(uint16_t port):_port(port){//初始化辅助数组for (int i = 0; i < MaxFdNum; ++i){read_fd_array[i] = DefaultNum;}}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void HandleEvent(fd_set* rfd){for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET{int socket = read_fd_array[i];if (socket == DefaultNum)continue;if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket){//代码走到这里就说明我们的连接事件就绪了string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到辅助数组for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] == DefaultNum){if (_maxfd < newSockfd)_maxfd = newSockfd;read_fd_array[i] = newSockfd;break;}if (i == MaxFdNum - 1){lg(Warning, "server is full, close sock: %d", newSockfd);close(newSockfd);}}}else if (FD_ISSET(socket, rfd) == true){//说明是其他的套接字读事件就绪了char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了read_fd_array[i] = DefaultNum;close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了read_fd_array[i] = DefaultNum;close(socket);}}}}void Start(){//将lisntensockfd设置进辅助数组read_fd_array[0] = _listensock.Fd();_maxfd = read_fd_array[0];while (1){//因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里fd_set readfd;FD_ZERO(&readfd); //一定要设置为0,否则可能会出现select失败的问题for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] != DefaultNum){if (_maxfd < read_fd_array[i]) _maxfd = read_fd_array[i];//将辅助数组中所要关心的文件描述符全部都设置进去FD_SET(read_fd_array[i], &readfd);}}int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}else{HandleEvent(&readfd);}}}~SelectServer(){_listensock.Close();}
private:int _maxfd; //最大的文件描述符是什么Socket _listensock;uint16_t _port;int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么// int write_fd_array[MaxFdNum]; //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件// int except_fd_array[MaxFdNum];
};
代码运行结果:
注意,我们的可是单进程哦。实现了并发处理多个请求。
整理代码,完整代码(允许结果和上面演示效果相同):
Log.hpp文件 – 往期文章实现过
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>using namespace std;#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4#define Screen 1
#define Onefile 2
#define Classfile 3#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:Log(){printMethod = Screen;path = "./log/";}void Enable(int method){printMethod = method;}void printOneFile(string logname, const string& logtxt){logname = path + logname;int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录if (fd < 0){perror("open failed");return;}write(fd, logtxt.c_str(), logtxt.size());close(fd);}void printClassFile(int level, const string& logtxt){string filename = fileName;filename += ".";filename += leveltoString(level);printOneFile(filename, logtxt);}void printLog(int level, const string& logtxt){if (printMethod == Screen){cout << logtxt << endl;return;}else if (printMethod == Onefile){printOneFile(fileName, logtxt);return;}else if (printMethod == Classfile){printClassFile(level, logtxt);return;}}const char* leveltoString(int level){if (level == Info) return "Info";else if (level == Debug) return "Debug";else if (level == Error) return "Error";else if (level == Fatal) return "Fatal";else return "default";}void operator()(int level, const char* format, ...){time_t t = time(nullptr);struct tm* st = localtime(&t);char leftbuffer[4096];snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\[%s]:",st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));char rightbuffer[4096];va_list start;va_start(start, format);vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);va_end(start);char logtxt[4096 * 2];snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);printLog(level, logtxt);}
private:int printMethod;string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};
Sock.hpp文件 – 往期文章实现过
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"Log lg;
class Socket
{
public:Socket(){}~Socket(){}void CreateSocket(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){lg(Fatal, "create socket failed:%s", strerror(errno));exit(1);}}void Bind(uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_port = htons(serverPort);server.sin_addr.s_addr = INADDR_ANY;int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){lg(Fatal, "bind socket failed:%s", strerror(errno));exit(2);}}void Listen(){int n = listen(_sockfd, 5);if (n < 0){lg(Fatal, "listen socket failed:%s", strerror(errno));exit(3);}}int Accept(string* clientIp, uint16_t* clinetPort){struct sockaddr_in peer;socklen_t len = sizeof(peer);int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);if (newsocket < 0){lg(Error, "accept error:%s", strerror(errno));return -1;}*clientIp = inet_ntoa(peer.sin_addr);*clinetPort = ntohs(peer.sin_port);return newsocket;}int Connect(const string& serverIp, uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_addr.s_addr = inet_addr(serverIp.c_str());server.sin_port = htons(serverPort);int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){lg(Error, "connect error:%s", strerror(errno));return -1;}return 0;}void Setsockopt(){int opt = 1;if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)lg(Error, "%s\n", strerror(errno));}void Close(){close(_sockfd);}int Fd(){return _sockfd;}
private:int _sockfd;
};
SelectServer.hpp文件
#include "Socket.hpp"class SelectServer
{static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fdstatic const int DefaultNum = -1;
public:SelectServer(uint16_t port):_port(port){//初始化辅助数组for (int i = 0; i < MaxFdNum; ++i){read_fd_array[i] = DefaultNum;}}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void Recver(int socket, int pos){char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了read_fd_array[pos] = DefaultNum;close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了read_fd_array[pos] = DefaultNum;close(socket);}}void Acceptor(){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到辅助数组for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] == DefaultNum){if (_maxfd < newSockfd)_maxfd = newSockfd;read_fd_array[i] = newSockfd;break;}if (i == MaxFdNum - 1){lg(Warning, "server is full, close sock: %d", newSockfd);close(newSockfd);}}}void Dispatcher(fd_set* rfd) //事件派发器{for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET{int socket = read_fd_array[i];if (socket == DefaultNum)continue;if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket){//代码走到这里就说明我们的连接事件就绪了Acceptor();}else if (FD_ISSET(socket, rfd) == true){//说明是其他的套接字读事件就绪了Recver(socket, i);}}}void EventLoop() //事件循环{//将lisntensockfd设置进辅助数组read_fd_array[0] = _listensock.Fd();_maxfd = read_fd_array[0];while (1){//因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里fd_set readfd;FD_ZERO(&readfd); //一定要设置为0,否则可能会出现select失败的问题for (int i = 0; i < MaxFdNum; ++i){if (read_fd_array[i] != DefaultNum){if (_maxfd < read_fd_array[i]) _maxfd = read_fd_array[i];//将辅助数组中所要关心的文件描述符全部都设置进去FD_SET(read_fd_array[i], &readfd);}}int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0if (n < 0){std::cout << "select error: " << strerror(errno) << std::endl;break;}if (n == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}else{Dispatcher(&readfd); //事件派发器}}}~SelectServer(){_listensock.Close();}
private:int _maxfd; //最大的文件描述符是什么Socket _listensock;uint16_t _port;int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么// int write_fd_array[MaxFdNum]; //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件// int except_fd_array[MaxFdNum];
};
main.cc文件
#include "SelectServer.hpp"int main()
{SelectServer svr(8080);svr.Init();svr.EventLoop();return 0;
}
select服务器的优缺点
优点:能实现成多路转接的服务器,即单进程也能处理多用户的请求
缺点:
1.同时等待的fd是有上限的,因为位图fd_set是有大小的。
2.用户必须借助第三方数组来维护合法的fd
3.使用select接口设置参数时,每次都要对关心的fd进行事件重置。
4.数据拷贝的频率比较高,select函数需要每次都需要重新设置参数,每传一次位图fd_set内核就需要拷贝一次。
5.如果只有一个fd就绪,用户层也需要全部遍历(虽然可以改进但没必要)
6.内核中检测位图fd_set的事件就绪也要遍历(这也就是为什么select的第一个参数传的是文件描述符的最大值,因为内核要以这个范围来遍历)
多路转接之poll
为了解决上面一些的问题,poll就出现了
fds:struct pollfd数组的首地址
nfds:数组元素的个数
timeout:设置超时时间,单位是ms
填0:非阻塞等待
小于0:表示阻塞等待
大于0:每过timeout就超时一次。(理解同select)
fd:你要让内核关心的某一个fd
events:输入参数,用户让内核关心fd的事件
revents:输出参数,内核告诉用户,你要关心的fd哪些事件就绪了
从这里看出来了,poll将输入和输出参数分离,所以poll不需要像select一样每次都需要将参数重新设定。
short是一个16bit的位图,events和revents的取值如下:
事件 | 描 述 | 是否可作为输入 | 是否可作为输出 |
---|---|---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | POLLRDHUPTCP连接被对方关闭,或者对方关闭了写操作。它GNU 引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
select只分为了读、写、异常事件,而poll将fd的事件分的更细了。
实现一个简易的poll服务器
参考代码:
Socket.hpp、Log.hpp文件同上
PollerServer.hpp文件
#include "Socket.hpp"
#include <poll.h>
#include <vector>class PollServer
{static const int DefaultNum = -1;
public:PollServer(uint16_t port):_port(port){}void Init(){_listensock.CreateSocket();_listensock.Setsockopt();_listensock.Bind(_port);_listensock.Listen();}void Recver(int socket, int pos){char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了_event_fds.erase(_event_fds.begin() + pos);close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了_event_fds.erase(_event_fds.begin() + pos);close(socket);}}void Acceptor(){string clientIp; uint16_t clientPort;int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到辅助数组struct pollfd event;event.fd = newSockfd;event.events |= POLLIN;_event_fds.push_back(move(event));}void Dispatcher() //事件派发器{for (int i = 0; i < _event_fds.size(); ++i)//遍历所有的_event_fds中是否有revents事件就绪的{int socket = _event_fds[i].fd;if ((_event_fds[i].revents & POLLIN) == true){if (_listensock.Fd() == socket){//代码走到这里就说明我们的连接事件就绪了Acceptor();}else{//说明是其他的套接字读事件就绪了Recver(socket, i);}}else if ((_event_fds[i].revents & POLLOUT) == true){//写事件就绪}else{//...}}}void EventLoop() //事件循环{//将lisntensockfd设置进辅助数组struct pollfd event;event.fd = _listensock.Fd();event.events |= POLLIN;_event_fds.push_back(move(event)); //将listensock添加到_event_fds数组里while (1){int n = poll(_event_fds.data(), _event_fds.size(), -1); //易于观察就设置为-1if (n < 0){std::cout << "poll error: " << strerror(errno) << std::endl;break;}if (n == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}else{Dispatcher(); //事件派发器}}}~PollServer(){_listensock.Close();}
private:Socket _listensock;uint16_t _port;std::vector<struct pollfd> _event_fds;
};
main.cc文件
#include "PollServer.hpp"int main()
{PollServer svr(8080);svr.Init();svr.EventLoop();return 0;
}
运行结果
poll服务器的优缺点
优点:
pollfd结构包含了要监视的event事件和就绪的revent事件,输入与输出分离了,接口使用比select方便。
poll解除了fd有上限的问题,数组为vector了,vector最大能有多大取决于操作系统了,不关poll函数的事了。
缺点:
用户层和内核层都需要遍历该数组,都是o(n)的效率,如果用户要关心的fd有上万个,频繁的遍历会影响效率问题。
每次调用poll,都需要把大量的pollfd结果从用户态拷贝到内核中。
有大量fd,若只有少量的fd处于就绪状态,也需要全部线性遍历一遍,随着监视描述符数量的增长,其效率也会线性下降。
总结:解决了select的1、3问题
多路转接之epoll
按照man手册的说法:是为处理大批量的socketfd而作了改进的poll。它是在2.5.44内核中被引进的,linux2.6下,它几乎具备了之前所说的一切优点,被公认为性能最好的多路转接IO。现在只要涉及服务器组件的,都用的是epoll。
size:该参数已经被废弃,但是要填大于0的值
该函数会创建一个epoll模型,返回epoll模型的文件描述符,什么是epoll模型,后面会讲。
epfd:epoll模型的描述符
events:输出型参数,struct epoll_event数组的首地址
maxevents:该数组的个数
timeout:意义同epoll参数的timeout
返回值和select、poll一样
fd:当事件就绪时,能知道该事件是哪个fd就绪了events可以是以下几个宏的集合
事件 | 描 述 |
---|---|
EPOLLIN | 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) |
EPOLLOUT | 表示对应的文件描述符可以写 |
POLLPRI | 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) |
POLLERR | 表示示对应的文件描述符发生错误 |
POLLPRI | 表示对应的文件描述符被挂断 |
POLLPRI | 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的 |
EPOLLONESHOT | 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里 |
epfd:epoll模型
op、fd、event:用户告诉内核,要对fd的event事件进行op处理
op:
EPOLL_CTL_ADD添加事件
EPOLL_CTL_MOD修改事件
EPOLL_CTL_DEL删除事件
epoll解决了select的所有缺点,epoll是怎样设计的呢?它的原理是什么?
epoll原理
如图所示,内核中的三个部分(红黑树,回调函数,就绪队列)组成了epoll模型。epoll模型并且由struct_file结构体所指向,想管理epoll模型,就能通过fd找到epoll模型。
红黑树管理已注册的文件描述符:epoll_ctl对红黑树的增删改操作时间复杂度为O(logN)
就绪事件的通知机制:每当有fd就绪了,网卡驱动就会通知(调用回调函数)epoll,并执行内核曾经注册好的回调函数。有了这个就绪事件通知机制,内核就不用再O(N)的遍历所有的文件描述符是否已经就绪
就绪列表的管理:epoll_wait所捞取的都是已经就绪的
epoll的优势
1.用户层不再需要每次调用接口时都将关心fd拷贝给内核了
2.管理fd个数没有上限
3.不再需要辅助数组了,因为内核中的红黑树就代替了曾经用户要管理的数组
4.内核中通过事件通知机制O(1)就能将就绪fd放入就绪队列,不再需要遍历所有的fd,来判断是否就绪
5.对事件的管理更方便,只需要对epoll_ctl进行操作
select和poll的底层如图所示
用epoll实现一个简易的echo服务器
参考代码:
Log.hpp文件 和 Socket.hpp文件和之前一样
Epoller.hpp文件
#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>class Epoller
{
public:Epoller(){}void EpollerCreate(){_epollfd = epoll_create(64);if (_epollfd < 0){lg(Fatal, "Create epoll failed: %s", strerror(errno));abort();}}void EpollerUpate(int oper, int fd, uint32_t event){int n = 0;if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD){struct epoll_event epEvent;epEvent.data.fd = fd;epEvent.events = 0;epEvent.events |= event;n = epoll_ctl(_epollfd, oper, fd, &epEvent);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}else if (oper == EPOLL_CTL_DEL){n = epoll_ctl(_epollfd, oper, fd, 0);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}}int Wait(struct epoll_event* events, int n, int timeout){int m = epoll_wait(_epollfd, events, n, timeout);if (m < 0){lg(Error, "epoll wait failed: %s", strerror(errno));}else if (m == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}return m;}int Fd(){return _epollfd;}
private:int _epollfd;
};
nocpy.hpp文件
//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:nocpy(){}~nocpy(){}
private:nocpy(const nocpy&) = delete;nocpy& operator=(const nocpy&) = delete;
};
EpollServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>class EpollServer : public nocpy
{static const int num = 64; //一次性最多捞取上来多少个就绪fd, 一次捞取不完下一次可以继续捞取
public:EpollServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->Listen();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());_epoller_ptr->EpollerCreate();lg(Info, "create epoller success");}void Recver(int socket, int pos){char buffer[4096];int n = read(socket, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;lg(Info, "client say:%s", buffer);string s = "server echo:";s += buffer;int m = write(socket, s.c_str(), s.size());if (m < 0){lg(Error, "write failed: %s", strerror(errno));}}else if (n == 0){//说明对端把连接关闭了lg(Info, "client closed...");//把该socket就可以移除关心状态了_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);close(socket);}else{//读出错了lg(Error, "read error");//把该socket就可以移除关心状态了_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);close(socket);}}void Acceptor(){string clientIp; uint16_t clientPort;int newSockfd = _listensock_ptr->Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了if (newSockfd < 0){return;}lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);//添加新的文件描述符到Epoller里面_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, newSockfd, EPOLLIN);}void Dispatcher(struct epoll_event* recvs, int n) //事件派发器{for (int i = 0; i < n; ++i)//recvs数组里面全部都是就绪的fd{int socket = recvs[i].data.fd;if ((recvs[i].events & EPOLLIN) == true){if (_listensock_ptr->Fd() == socket){//代码走到这里就说明我们的连接事件就绪了Acceptor();}else{//说明是其他的套接字读事件就绪了Recver(socket, i);}}else if ((recvs[i].events & EPOLLOUT) == true){//写事件就绪}else{//...}}}void EventLoop() //事件循环{//添加_listensock到Epoller里面_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EPOLLIN);struct epoll_event recvs[num];while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //易于观察就设置为-1Dispatcher(recvs, n); //捞取上来了n个fd}}~EpollServer(){_listensock_ptr->Close();}
private:std::shared_ptr<Socket> _listensock_ptr; //智能指针优势:1.可共享拷贝资源 - 不怕浅拷贝 2.灵活的生命周期,可随时释放 3.可延迟初始化 4.异常安全std::shared_ptr<Epoller> _epoller_ptr;uint16_t _port;
};
main.cc文件
#include "EpollServer.hpp"int main()
{std::unique_ptr<EpollServer> svr(new EpollServer(8080 ));svr->Init();svr->EventLoop();return 0;
}
代码运行结果:
epoll的LT和ET工作模式
什么是LT和ET
一个现象:
我们将这行代码注释掉,上层不处理新连接到来的这个事件
代码运行结果:
发现底层会一直通知上层,有新事件到来,请上层处理。
一旦有新的连接到来或者有新的数据到来,上层如果你不取走,底层会一直通知你去取走哦,这种模式就叫做LT
LT:水平触发Level Triggered ,ET:边缘触发Edge Triggered
LT、ET,和示波器很像
LT一直处于高电平,表示为真。ET只有从低点到高点变化的时候,才为真
epoll默认模式:LT模式。事件到来,但是上层不处理,高电平,一直有效
ET:底层数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。
给大家举一个例子
张三是个快递员,是一个比较负责的快递员。假如你买的快递到了,有6个快递,下楼取一下,你可能在打游戏没时间取,过一会儿,张三又给你打电话:你有6个快递,你下来取一下。你游戏没打完就是不取,但是呢,张三期间也一直打电话。张三的同事李四,李四也有你的快递,路过,张三就给李四说:我帮你派发吧。张三现在一共有十个快递,让你下来取,你不下来取张三就一直会给你打电话,张三送快递的模式就叫做LT模式。
过了一周,又要派发你的快递。但是是李四来派发你的快递,李四给你打电话说:你不来取,我就不给你打电话了,我只通知你一次。你一听:那我还是下去取吧,要是走了我快递里就找不到了,我的快递在一堆快递里,除了快递员不知道哪个是我的快递。你取的时候,发现你只能拿走6个当中的4个,一次拿不完,你把快递拿上去之后剩下的快递你不知道在哪了,索性就不要了,你又去打你的游戏了。李四看到张三也有你的快递,还人情就帮张三派发,又给你打电话:你有新的快递到了,你不下来取,我就走了。你一听,这人只通知一次,你就又下去取了,这次就把上次没取完的一并给取走了。
总结:
张三是有快递就一直给你打电话。
李四是从无到有,从有到多,只给你打一次电话,你没取干净,我也不再通知你。
你认为两个快递员谁的工作效率更高呢?
ET,因为这个快递员在单位时间内通知的人数是更多的。ET一小时内可以通知50人,而LT可能只通知了10人。主要是ET的通知效率高。
ET:因为只会通知一次,所以会倒逼程序员使用ET工作模式的时候,每次通知,都必须要把本轮数据全部取走。你怎么知道你把本次就绪底层的数据读取完毕了呢?循环读取,知道读取不到数据了。一般的fd,是阻塞式的fd,如果没有数据了会阻塞,所以在ET模式下,我们的fd必须是非阻塞的。
ET的通知效率高,不仅仅如此,ET的IO效率也更高,原因在于,每通知一次就要求程序员把本轮数据全部取走,这意味着tcp会向对方通告一个更大的窗口,从而概率上让对方一次能给我发送更多的数据,提高了网络的吞吐量,则提高IO效率。
ET的效率也不是一定比LT高,LT也可以将所有的fd设置成为非阻塞,然后循环读取,通知第一次的时候就全部取走,不就和ET一样了嘛。LT是epoll的默认行为,使用ET能够减少epoll触发的次数,但是代价就是强逼着程序员一次就绪响应就把所有的数据都处理完。所以说,如果LT情况下如果也能做到每次就绪的文件描述符都立即处理,不让这个就绪被重复提示的话,其实性能也是一样的。那为什么LT不代替ET呢?因为程序员不能保证完全的替代,会可能写出bug。
实现一个简易的reactor服务器
我们之前read函数只读一次,因为tcp是面向字节流的,所以不能保证读上来的是一个完整的报文。
如果读一次没有读完,那需要读第二次,也就是循环读,所以读到的数据需要存到一个缓冲区里。
下面我们就用epoll来实现一个ET模式的reactor服务器
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
private:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());}void Dispatcher(int n){for (int i = 0; i < n; ++i){}}void Start(){_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EVENT_IN);while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;
};
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, nullptr, nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将listensock添加到Connection中,同时,将listen和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(_listensock_ptr->Fd(), this));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(_listensock_ptr->Fd(), conn));//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;
};
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd, this));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, nullptr, nullptr, nullptr);}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;
};
用三个客户端连接,代码运行结果
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd, TcpServer* tcp_server_ptr):_sockfd(fd), _tcp_server_ptr(tcp_server_ptr){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd, this));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"void DefaultOnMessage(std::shared_ptr<Connection> conn)
{std::cout << "sock: " << conn->_sockfd <<"的缓冲区里的数据为:" << conn->_inbuffer << std::endl;
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));svr->Init();svr->Start();return 0;
}
代码运行结果如下
但是发现了一个问题:类似于循环引用,异常事件关闭连接时,导致程序出错。
更新代码
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd):_sockfd(fd){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));if (fd == _listensock_ptr->Fd())conn->_tcp_server_ptr = shared_ptr<TcpServer>(this); //如果是listensock连接,就构造智能指针来管理TcpServerelseconn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
运行结果如下:
每一个socket都配套有一个读写缓冲区,为什么要有读缓冲区?因为你不能保证一次read上来的数据就是一个完整的数据。为什么要有写缓冲区?因为你不能保证内核缓冲区是有空间的,也就是你无法保证发送条件是否就绪。
异常和读事件我们都处理了,如何处理写事件呢?
通常情况下,发送缓冲区一般都是有空间的,写事件一般都是就绪的,如果我们设置对EPOLLOUT关心,那EPOLLOUT几乎每次都是就绪,会导致epoll_wait经常返回,浪费CPU资源。
结论:对于读事件,设置常关心,对于写事件,按需设置。什么是按需设置?直接写入,如果写入完成就结束。如果没有将这一轮的数据写完,outbuffer里还有数据,我们就需要对写事件进行关心了,如果写完了,就去掉对写事件的关心。
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd):_sockfd(fd){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));if (fd == _listensock_ptr->Fd()) //如果是listensock连接,就构造智能指针来管理TcpServerconn->_tcp_server_ptr = shared_ptr<TcpServer>(this);elseconn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Sender(std::shared_ptr<Connection> conn){while (1){int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());if (n > 0){conn->_outbuffer.erase(0, n);if (conn->_outbuffer.empty() == true){return;}}else if (n == 0){return;}else{if (errno == EWOULDBLOCK || errno == EAGAIN){//说明底层数据不就绪,即内核写缓冲区没有空间了break;}if (errno == EINTR){continue;}lg(Error, "write failed, socket: %d", conn->_sockfd);conn->_except_cb(conn);return;}}if (!conn->_outbuffer.empty()){//说明没有将数据发完,则需要设置写事件关心了uint32_t event = EVENT_OUT | EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}else {//说明将数据发完了,则取消对写事件关心了uint32_t event = EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\std::bind(&TcpServer::Sender, this, std::placeholders::_1),\std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"std::string Handle(std::string& inbuffer) //业务处理
{std::string tmp = inbuffer;inbuffer.clear();return tmp;
}
void DefaultOnMessage(std::shared_ptr<Connection> conn)
{string response_str = Handle(conn->_inbuffer);if (response_str.empty()) return;conn->_outbuffer += response_str;if (conn->_send_cb)conn->_send_cb(conn);
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));svr->Init();svr->Start();return 0;
}
代码运行结果如下:
套用之前写的网络版本计算器业务逻辑 - 文末附完整代码
代码运行结果
reactor模型是半同步半异步的模型
半同步体现的是等:由epoll来做的:保证了就绪事件的通知和进行IO
半异步的体现:负责了业务处理,如果把业务处理放到线程池中去处理,就是半异步了。因为我们的业务处理并不耗时,所以就没有开多线程
还有一种模式叫做Proactor,纯异步的编写方式,在linux服务器设计里面没有,我们就不涉及了。
reactor也叫做反应堆,是什么意思呢?如同打地鼠的游戏
玩游戏的人就相当于是一个多路转接,我们要检测每一个洞口有没有地鼠出来,虽然没出来,但是我们知道一旦出来了我就要执行我的回调方法(来砸它) – 回调函数是提前设置好的。
游戏的面板就相当于我们的reactor,每一个洞就相当于Connection,老鼠上来了就叫做事件就绪,执行砸方法就是执行回调函数。这种就叫做反应堆
redis底层用的就是单reactor,处理用的是reactor的LT模式
完整版代码如下:
文件目录
Epoller.hpp文件
#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>class Epoller
{
public:Epoller(){}void EpollerCreate(){_epollfd = epoll_create(64);if (_epollfd < 0){lg(Fatal, "Create epoll failed: %s", strerror(errno));abort();}}void EpollerUpate(int oper, int fd, uint32_t event){int n = 0;if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD){struct epoll_event epEvent;epEvent.data.fd = fd;epEvent.events = 0;epEvent.events |= event;n = epoll_ctl(_epollfd, oper, fd, &epEvent);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}else if (oper == EPOLL_CTL_DEL){n = epoll_ctl(_epollfd, oper, fd, 0);if (n < 0){lg(Error, "epoll ctl failed: %s", strerror(errno));}}}int Wait(struct epoll_event* events, int n, int timeout){int m = epoll_wait(_epollfd, events, n, timeout);if (m < 0){lg(Error, "epoll wait failed: %s", strerror(errno));}else if (m == 0){//超时返回(非阻塞返回)lg(Info, "timeout...");}return m;}int Fd(){return _epollfd;}
private:int _epollfd;
};
Log.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>using namespace std;#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4#define Screen 1
#define Onefile 2
#define Classfile 3#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:Log(){printMethod = Screen;path = "./log/";}void Enable(int method){printMethod = method;}void printOneFile(string logname, const string& logtxt){logname = path + logname;int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录if (fd < 0){perror("open failed");return;}write(fd, logtxt.c_str(), logtxt.size());close(fd);}void printClassFile(int level, const string& logtxt){string filename = fileName;filename += ".";filename += leveltoString(level);printOneFile(filename, logtxt);}void printLog(int level, const string& logtxt){if (printMethod == Screen){cout << logtxt << endl;return;}else if (printMethod == Onefile){printOneFile(fileName, logtxt);return;}else if (printMethod == Classfile){printClassFile(level, logtxt);return;}}const char* leveltoString(int level){if (level == Info) return "Info";else if (level == Debug) return "Debug";else if (level == Error) return "Error";else if (level == Fatal) return "Fatal";else return "default";}void operator()(int level, const char* format, ...){time_t t = time(nullptr);struct tm* st = localtime(&t);char leftbuffer[4096];snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\[%s]:",st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));char rightbuffer[4096];va_list start;va_start(start, format);vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);va_end(start);char logtxt[4096 * 2];snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);printLog(level, logtxt);}
private:int printMethod;string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};
nocpy.hpp文件
#pragma once
//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:nocpy(){}~nocpy(){}
private:nocpy(const nocpy&) = delete;nocpy& operator=(const nocpy&) = delete;
};
Socket.hpp文件
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"Log lg;
class Socket
{
public:Socket(int fd = -1):_sockfd(fd){}~Socket(){}void CreateSocket(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){lg(Fatal, "create socket failed:%s", strerror(errno));exit(1);}}void Bind(uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_port = htons(serverPort);server.sin_addr.s_addr = INADDR_ANY;int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){lg(Fatal, "bind socket failed:%s", strerror(errno));exit(2);}}void Listen(){int n = listen(_sockfd, 5);if (n < 0){lg(Fatal, "listen socket failed:%s", strerror(errno));exit(3);}}int Accept(string* clientIp, uint16_t* clinetPort){struct sockaddr_in peer;socklen_t len = sizeof(peer);int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);if (newsocket < 0){return -1;}*clientIp = inet_ntoa(peer.sin_addr);*clinetPort = ntohs(peer.sin_port);return newsocket;}int Connect(const string& serverIp, uint16_t serverPort){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_addr.s_addr = inet_addr(serverIp.c_str());server.sin_port = htons(serverPort);int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));if (n < 0){return -1;}return 0;}void Setsockopt(){int opt = 1;if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)lg(Error, "%s\n", strerror(errno));}void SetNonBlock(){int flag = fcntl(_sockfd, F_GETFL);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}void Close(){close(_sockfd);}int Fd(){return _sockfd;}
private:int _sockfd;
};
TcpServer.hpp文件如下
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>const int num = 64; //一次性最多捞取上来多少个fdclass Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:Connection(int fd):_sockfd(fd){}void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}
public:int _sockfd;std::string _inbuffer;std::string _outbuffer;func_t _recv_cb; //读回调函数func_t _send_cb; //写回调函数func_t _except_cb; //异常回调函数std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针std::string _ip;uint16_t _port;
};class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:TcpServer(uint16_t port, func_t OnMessage):_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage){}void Init(){_listensock_ptr->CreateSocket();_listensock_ptr->Setsockopt();_listensock_ptr->Bind(_port);_listensock_ptr->SetNonBlock(); //设置非阻塞_listensock_ptr->Listen();_epoller_ptr->EpollerCreate();lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);}void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb){//建立连接的同时,挂到epoller中//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中std::shared_ptr<Connection> conn(new Connection(fd));conn->SetHandler(recv_cb, send_cb, except_cb);_connections.insert(make_pair(fd, conn));if (fd == _listensock_ptr->Fd()) //如果是listensock连接,就构造智能指针来管理TcpServerconn->_tcp_server_ptr = shared_ptr<TcpServer>(this);elseconn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理//2.添加对应的事件,放入到epoller中_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);}void Sender(std::shared_ptr<Connection> conn){while (1){int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());if (n > 0){conn->_outbuffer.erase(0, n);if (conn->_outbuffer.empty() == true){return;}}else if (n == 0){return;}else{if (errno == EWOULDBLOCK || errno == EAGAIN){//说明底层数据不就绪,即内核写缓冲区没有空间了break;}if (errno == EINTR){continue;}lg(Error, "write failed, socket: %d", conn->_sockfd);conn->_except_cb(conn);return;}}if (!conn->_outbuffer.empty()){//说明没有将数据发完,则需要设置写事件关心了uint32_t event = EVENT_OUT | EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}else {//说明将数据发完了,则取消对写事件关心了uint32_t event = EVENT_IN;_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);}}void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可{lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);auto it = _connections.find(conn->_sockfd);assert (it != _connections.end());_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);close(conn->_sockfd);}void Recver(std::shared_ptr<Connection> conn){int fd = conn->_sockfd;char buffer[4096];while (1){int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取if (n < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "read failed: %s", strerror(errno));conn->_except_cb(conn);return;}else if (n == 0){lg(Info, "server closed...");conn->_except_cb(conn);return;}buffer[n] = 0;conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理}_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理}void Acceptor(std::shared_ptr<Connection> conn){while (1){std::string clientIp;uint16_t clientPort;Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);if (newSocket.Fd() < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){//如果底层没有数据了break;}else if (errno == EINTR){//被信号打断了continue;}lg(Error, "listensock accept failed: %s", strerror(errno));break;}lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);newSocket.SetNonBlock();AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\std::bind(&TcpServer::Sender, this, std::placeholders::_1),\std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}}void Dispatcher(int n){//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦for (int i = 0; i < n; ++i){uint32_t event = recvs[i].events;int fd = recvs[i].data.fd;auto pos = _connections.find(fd);assert(pos != _connections.end());//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可if (event & EPOLLERR)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLHUP)event |= (EPOLLIN | EPOLLOUT);if (event & EPOLLIN){if (pos->second->_recv_cb)pos->second->_recv_cb(pos->second);}if (event & EPOLLOUT){if (pos->second->_send_cb)pos->second->_send_cb(pos->second);}}}void Start(){while (1){int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1if (n < 0){lg(Error, "epoll wait failed: %s", strerror(errno));break;}else if (n == 0){lg(Info, "timeout...");}else {Dispatcher(n);}}}~TcpServer(){}
private:std::shared_ptr<Socket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;struct epoll_event recvs[num]; //捞取就绪事件的数组std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接int _port;func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"
#include "ServerCal.hpp"void DefaultOnMessage(std::shared_ptr<Connection> conn)
{ServerCal calculator;std::string response_str = calculator.Calculator(conn->_inbuffer);if (response_str.empty()) return;conn->_outbuffer += response_str;if (conn->_send_cb)conn->_send_cb(conn);
}int main()
{std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));svr->Init();svr->Start();return 0;
}
Protocol.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>using namespace std;const string blank_space_sep = " ";
const string protocol_sep = "\n";
//添加报头:"内容" 转为 "长度\n内容\n"
bool Encode(string *package, const string& content)
{package->clear();*package += to_string(content.size());*package += protocol_sep;*package += content;*package += protocol_sep;return true;
}
//去除报头:"长度\n内容\n" 转为 "内容" -- 可能存在本来是5\n1 + 1\n 但接受到的是5\n1 +
bool Decode(string &package, string *content)
{content->clear();int pos = package.find(protocol_sep);if (pos == string::npos) return false;int len = stoi(package.substr(0, pos));int totalLen = pos + len + 2;if (package.size() < totalLen) return false;//如果报文不完整,说明*content = package.substr(pos + protocol_sep.size(), len);package.erase(0, totalLen);return true;
}//提取字符串
bool Extract(const string& in, int& x, int& y, char& oper)
{int pos = in.find(blank_space_sep);if (pos == string::npos) return false;x = stoi(in.substr(0, pos));oper = *in.substr(pos + blank_space_sep.size(), 1).c_str();int pos2 = in.rfind(blank_space_sep);if (pos2 == string::npos) return false;y = stoi(in.substr(pos2 + blank_space_sep.size()));return true;
}//请求协议
class Request
{
public:Request(int x = 0, int y = 0, char oper = '+'):_x(x), _y(y), _oper(oper){}//序列化 -- 将结构体转为"_x + _y"bool Serialize(string* out){
#ifdef MySelfout->clear();*out += to_string(_x);*out += blank_space_sep;*out += _oper;*out += blank_space_sep;*out += to_string(_y);return true;
#elseJson::Value root;root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::StyledWriter w;*out = w.write(root);return true;
#endif}//反序列化 -- 将"_x + _y"转为结构体bool Deserialize(const string& in){
#ifdef MySelfreturn Extract(in, _x, _y, _oper);
#elseJson::Reader r;Json::Value root;r.parse(in, root);_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();return true;
#endif}
public:int _x;int _y;char _oper;
};//响应协议
class Response
{const string blank_space_sep = " ";
public:Response(int result = 0, int code = 0):_result(), _code(code){}//序列化 -- 将结构体转为"_result _code"bool Serialize(string* out){
#ifdef MySelf*out = to_string(_result) + blank_space_sep + to_string(_code);return true;
#else Json::Value root;root["code"] = _code;root["result"] = _result;Json::StyledWriter w;*out = w.write(root);return true;
#endif}//反序列化 -- 将"_result _code"转为结构体bool Deserialize(const string& in){
#ifdef MySelfint pos = in.find(blank_space_sep);if (pos == string::npos) return false;_result = stoi(in.substr(0, pos));_code = stoi(in.substr(pos + blank_space_sep.size()));return true;
#elseJson::Reader r;Json::Value root;r.parse(in, root);_result = root["result"].asInt();_code = root["code"].asInt();return true;
#endif}
public:int _result;int _code; // 0,可信,否则!0具体是几,表明对应的错误原因
};
ServerCal.hpp文件
#pragma once
#include "Protocol.hpp"//服务器的计算服务
class ServerCal
{
public:ServerCal(){}Response CalculatorHelper(const Request &req){int x = req._x;char oper = req._oper;int y = req._y;Response rsp(0, 0);switch (oper){case '+':{rsp._result = x + y;rsp._code = 0;break;}case '-':{rsp._result = x - y;rsp._code = 0;break;}case '*':{rsp._result = x * y;rsp._code = 0;break;}case '/':{if (y == 0){rsp._result = 0;rsp._code = -1;break;}rsp._result = x / y;rsp._code = 0;break;}case '%':{if (y == 0){rsp._result = 0;rsp._code = -1;break;}rsp._result = x % y;rsp._code = 0;break;}default:break;}return rsp;}string Calculator(string& s){string content;if(!Decode(s, &content))//将"长度/n内容/n" -> "内容"return "";Request rq;rq.Deserialize(content);//将"内容"反序列化Response rsp = CalculatorHelper(rq);//得到答案内容content.clear();rsp.Serialize(&content);//序列化答案内容string ret;Encode(&ret, content);//将"内容" -> "长度/n内容/n"return ret;}
};
TcpClient.cc文件
#include "Protocol.hpp"
#include "Socket.hpp"static void Usage(const std::string &proc)
{std::cout << "\nUsage: " << proc << "\tserverIp\tport\n" << std::endl;
}
int main(int argc, char* argv[])
{if (argc != 3){Usage(argv[0]);exit(0);}string serverIp = argv[1];uint16_t serverPort = atoi(argv[2]);Socket skt;skt.CreateSocket();skt.Connect(serverIp, serverPort);string streamBuffer;while (1){cout << "Enter#";fflush(stdout);string content;getline(cin, content);//提取字符串,得到”请求反序列化“Request rq;Extract(content, rq._x, rq._y, rq._oper);string s;rq.Serialize(&s);string ret;Encode(&ret, s);write(skt.Fd(), ret.c_str(), ret.size());char readBuffer[4096];int n = read(skt.Fd(), readBuffer, sizeof(readBuffer) - 1);if (n > 0){readBuffer[n] = 0;streamBuffer += readBuffer;string ret;Decode(streamBuffer, &ret);Response rsp;rsp.Deserialize(ret);cout << "code: " << rsp._code << " result: " << rsp._result << endl;}else if (n == 0){cerr << "Server closed..." << endl;break;}else {cerr << "read failed:" << strerror(errno) << endl;exit(3);}}return 0;
}
Makefile文件
1=main
.PHONY:all
all:$1 TcpClient
$1:$1.ccg++ -o $@ $^ -std=c++11 -ljsoncpp
TcpClient:TcpClient.ccg++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY:clean
clean:rm -rf $1 TcpClient