在处理大量请求时,各个引擎都会采用线程池的方法,并发处理这些请求,但当一万个请求来的时候,我们要创建一万个线程来处理吗,很显然不会,那假如我创建一千个线程,那一线程该如何处理这个十个请求呢?IO多路复用技术就是来解决一个线程处理多个请求的问题的。
首先,IO多路复用技术是由各个引擎通过C++代码调用操作系统提供的特定api来实现,而特定的api大致有三个,分别为select,poll,epoll,通过这三个api,也实现了三种不同的IO多路复用技术,其中select发明最早,性能最差,而发明最晚,性能最好的是epoll。
系统调用知识前提
多路复用技术涉及大量的系统调用,其中三者都需要使用的系统调用,具体使用和作用如下
// socket()系统调用
// 参数:
// domain:协议族(如AF_INET表示IPv4)。
// type:socket类型(如SOCK_STREAM表示TCP)。
// protocol:协议(通常为0,由系统选择适当的协议)。
// 返回值:成功时返回该socket的文件描述符,失败时返回-1
// 作用:创建一个socket服务来监听端口
int socket(int domain, int type, int protocol);// bind()系统调用
// 参数:
// sockfd:socket的文件描述符。
// addr:本地地址。
// addrlen:地址长度。
// 返回值:成功时返回0,失败时返回-1
// 作用:将socket绑定到一个本地地址(IP地址和端口)
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);// listen()系统调用
// 参数:
// sockfd:socket的文件描述符。
// backlog:挂起连接队列的最大长度。
// 返回值:成功时返回0,失败时返回-1
// 作用:将socket设置为被动模式,准备接受连接请求
int listen(int sockfd, int backlog);// accept()系统调用
// 参数:
// sockfd:监听socket的文件描述符。
// addr:指向客户端地址结构的指针。
// addrlen:地址结构的长度指针。
// 返回值:成功时返回新的socket文件描述符,失败时返回-1
// 作用:当主socket监听到一个请求时,创建一个新的socket,并通过当前
// 系统调用,将新的socket和客户端建立链接
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);// connect()系统调用
// 参数:
// sockfd:socket文件描述符。
// addr:服务器地址。
// addrlen:地址长度。
// 返回值:成功时返回0,失败时返回-1
// 作用:作用了accept类似,不过是发送网络请求时
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);// read()系统调用
// 参数:
// fd:文件描述符。
// buf:接收数据的缓冲区指针。
// count:缓冲区的长度。
// 返回值:成功时返回读取的字节数,失败时返回-1
// 作用:从文件描述符读取数据
ssize_t read(int fd, void *buf, size_t count);// write()系统调用
// 参数:
// fd:文件描述符。
// buf:发送数据的缓冲区指针。
// count:缓冲区的长度。
// 返回值:成功时返回写入的字节数,失败时返回-1
// 作用:向文件描述符写入数据
ssize_t write(int fd, const void *buf, size_t count);
// recv()系统调用
// 参数:
// sockfd:socket文件描述符。
// buf:接收数据的缓冲区指针。
// len:缓冲区的长度。
// flags:接收标志。
// 返回值:成功时返回读取的字节数,失败时返回-1
// 作用:从socket接收数据
ssize_t recv(int sockfd, void *buf, size_t len, int flags);// send()系统调用
// 参数:
// sockfd:socket文件描述符。
// buf:发送数据的缓冲区指针。
// len:缓冲区的长度。
// flags:发送标志。
// 返回值:成功时返回发送的字节数,失败时返回-1
// 作用:向socket发送数据
ssize_t send(int sockfd, const void *buf, size_t len, int flags);// close() 系统调用
// 参数:
// fd:文件描述符。
// 返回值:成功时返回 0,失败时返回 -1
int close(int fd);
而三者分别特有的系统调用如下
//===================selectIO多路复用技术特有的系统调用===================
// select()系统调用
// 参数:
// nfds:监视的文件描述符的范围(最大文件描述符加一)。
// readfds:指向一组文件描述符集合,这些描述符将被监视是否可读。
// writefds:指向一组文件描述符集合,这些描述符将被监视是否可写。
// exceptfds:指向一组文件描述符集合,这些描述符将被监视是否有异常。
// timeout:指定select等待的最大时间。如果为NULL,select将无限等待。
// 返回值:成功时返回就绪文件描述符的数量,超时时返回0,失败时返回-1
// 作用:监视多个文件描述符,等待它们变为可读、可写或发生异常
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);//===================selectIO多路复用技术特有的宏操作===================
// FD_ZERO() 宏
// 参数:
// set:指向要清空的文件描述符集合。
// 作用:清空文件描述符集合
void FD_ZERO(fd_set *set);// FD_SET() 宏
// 参数:
// fd:要添加的文件描述符。
// set:指向文件描述符集合。
// 作用:将文件描述符添加到集合中
void FD_SET(int fd, fd_set *set);// FD_CLR() 宏
// 参数:
// fd:要从集合中删除的文件描述符。
// set:指向文件描述符集合。
// 作用:从集合中删除文件描述符
void FD_CLR(int fd, fd_set *set);// FD_ISSET() 宏
// 参数:
// fd:要检查的文件描述符。
// set:指向文件描述符集合。
// 返回值:如果文件描述符在集合中则返回非零值,否则返回零。
// 作用:检查文件描述符是否在集合中
int FD_ISSET(int fd, fd_set *set);//===================pollIO多路复用技术特有的系统调用===================
// poll()系统调用
// 参数:
// fds:指向一个pollfd结构数组。
// nfds:数组中文件描述符的数量。
// timeout:等待的最大时间(毫秒)。负值表示无限等待。
// 返回值:成功时返回就绪文件描述符的数量,超时时返回0,失败时返回-1
// 作用:类似于select,但使用不同的数据结构,扩展性更好
int poll(struct pollfd *fds, nfds_t nfds, int timeout);//===================epollIO多路复用技术特有的系统调用===================
// epoll_create()系统调用
// 参数:
// size:建议的监听的文件描述符数量(通常被忽略)。
// 返回值:成功时返回新的epoll实例的文件描述符,失败时返回-1
// 作用:创建一个新的epoll实例
int epoll_create(int size);// epoll_create1() 系统调用
// 参数:
// flags:epoll 实例的创建标志(如 EPOLL_CLOEXEC)。
// 返回值:成功时返回新的 epoll 实例的文件描述符,失败时返回 -1
// 作用:创建一个新的 epoll 实例,可以指定标志
int epoll_create1(int flags);// epoll_ctl()系统调用
// 参数:
// epfd:epoll实例的文件描述符。
// op:操作类型(如EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL)。
// fd:要监视的文件描述符。
// event:指向epoll事件结构的指针。
// 返回值:成功时返回0,失败时返回-1
// 作用:控制epoll实例,注册、修改或删除监视的文件描述符
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// epoll_wait()系统调用
// 参数:
// epfd:epoll实例的文件描述符。
// events:指向epoll事件结构数组的指针。
// maxevents:数组中事件的最大数量。
// timeout:等待的最大时间(毫秒)。负值表示无限等待。
// 返回值:成功时返回发生事件的文件描述符数量,失败时返回-1
// 作用:等待epoll实例中的文件描述符发生事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
了解了系统调用后,接下来便可以具体的学习三者的原理和代码实现了
selectIO多路复用技术:
这里我们只演示一个简单的select的用例,只接收客户端传来的数据,不进行返回。
首先,我们需要创建一个socket来监听请求,当客户端发送请求被我们接受到时,我们不能用当前的socket来进行连接,因为我们不止处理这一个连接,所以我们要调用socket系统调用创建一个新的socket来和客户端的socket来建立连接,如下图
由于我们并不是一个线程处理一个请求,所以我们并不能对于请求资源的准备进行等待,所以我们要想办法来监听哪个socket的资源已经接收到到了,我们就把线程资源给谁,select的解决办法是定义了一个1024位的结构,用来保存socket的缓冲区(所谓的缓冲区实际上就是socket在他的源码中定义的一些变量,分配的内存,用户态的缓冲区也一样,就是我们自己写程序时定义的变量分配的内存)的文件描述符信息,其中文件描述符是非负整数,如果是几,就将第几位变成1,这是缺陷之一,由于设计较早,最高只能保存1024,这在如今是完全不够用的,那么什么是文件描述符信息呢?
文件描述符信息是一个进程当前使用的资源(计算机的内存和硬盘都叫资源)的标识符,是一个非负整数。
客户端传入的资源通过网卡复制进socket的缓冲区,而这个结构就是保存这个缓存的的文件描述符,通过select的系统调用,监听文件表示符变为可读(就是缓冲区内已经有数据了),而文件表示符变为可读的过程就是内核去监控这块缓存区,等待其中存在内容(不一定是所有内容,有一点就传一点),就将文件表示符变为可读,select返回大于0的值(0表示超时,小于0表示错误)。然后就调用recv系统调用,来将客户端传来的数据,存入我们的缓冲区(变量)。
从图中我们可以看到,本地socket的缓存区也被保存的fd_set中,这是因为当有新的连接被socket监听到时,他会保存到他的缓存区中,所以我们也需要监控他的缓冲区以处理新的连接请求到来。
以下是简单的代码实现,逻辑不够严谨,理解过程就好:
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>#define PORT 8080
#define MAX_CLIENTS 10
#define BUFFER_SIZE 1024int main() {int server_fd, new_socket, client_socket[MAX_CLIENTS], max_sd, sd, activity, valread;struct sockaddr_in address;fd_set readfds;int addrlen = sizeof(address);char buffer[BUFFER_SIZE];// 初始化所有客户端socket为 0 (表示空闲)for (int i = 0; i < MAX_CLIENTS; i++) {client_socket[i] = 0;}// 创建服务器socketif ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {perror("socket failed");exit(EXIT_FAILURE);}// 设置服务器地址address.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(PORT);// 绑定服务器socketif (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {perror("bind failed");close(server_fd);exit(EXIT_FAILURE);}// 监听服务器socketif (listen(server_fd, 3) < 0) {perror("listen");close(server_fd);exit(EXIT_FAILURE);}std::cout << "Listening on port " << PORT << std::endl;while (true) {// 清空文件描述符集合FD_ZERO(&readfds);// 将服务器socket添加到集合中FD_SET(server_fd, &readfds);max_sd = server_fd;// 添加客户端socket到集合中for (int i = 0; i < MAX_CLIENTS; i++) {sd = client_socket[i];if (sd > 0) FD_SET(sd, &readfds);if (sd > max_sd) max_sd = sd;}// 使用 select 监视文件描述符集合activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);if ((activity < 0) && (errno != EINTR)) {std::cerr << "select error" << std::endl;}// 处理新的连接if (FD_ISSET(server_fd, &readfds)) {if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {perror("accept");exit(EXIT_FAILURE);}std::cout << "New connection, socket fd is " << new_socket << std::endl;// 检查客户端数组是否有空位bool added = false;for (int i = 0; i < MAX_CLIENTS; i++) {if (client_socket[i] == 0) {client_socket[i] = new_socket;std::cout << "Adding to list of sockets as " << i << std::endl;added = true;break;}}// 如果没有空位,拒绝新的连接if (!added) {std::cerr << "Too many connections, rejecting new connection from "<< inet_ntoa(address.sin_addr) << ":" << ntohs(address.sin_port) << std::endl;close(new_socket);}}// 处理现有连接的数据for (int i = 0; i < MAX_CLIENTS; i++) {sd = client_socket[i];if (FD_ISSET(sd, &readfds)) {// 确保接收到完整的数据bool connection_closed = false;std::string total_data;do {valread = recv(sd, buffer, BUFFER_SIZE, 0);if (valread > 0) {total_data.append(buffer, valread);} else if (valread == 0) {// 对端关闭连接connection_closed = true;} else {perror("recv");close(sd);client_socket[i] = 0;break;}} while (valread > 0);}}}close(server_fd);return 0;
}
select缺陷很多,比如他的保存上线是1024,这在如今是远远不够的,他每次都要将结构清零,重新保存,这点也是性能损耗之一
pollIO多路复用技术:
他和select大致类似,但细节不同,他并不是简单保存文件描述符,而是保存了一个结构体如下:
struct pollfd {int fd; // 文件描述符short events; // 要监视的事件short revents; // 返回的事件
};
poll系统调用也不再检测文件是否可读,而是检查revents,当其为POLLIN时,则证明可读
其中events的是指监听的是读还是写操作等,这说明poll不仅可以监听是否可读,还能监听是否可写
而poll也不是用1024位的结构记录文件描述符,而使用pollfd数组的方式,这样也没有了最大限制,其他的流程大致和select一样,这里就不画图了,实现代码如下:
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <poll.h>
#include <vector>#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 10int main() {int server_fd, new_socket, valread;struct sockaddr_in address;int addrlen = sizeof(address);char buffer[BUFFER_SIZE];// 创建服务器套接字if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {perror("socket failed");exit(EXIT_FAILURE);}// 设置服务器地址address.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(PORT);// 绑定服务器套接字if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {perror("bind failed");close(server_fd);exit(EXIT_FAILURE);}// 监听服务器套接字if (listen(server_fd, 3) < 0) {perror("listen");close(server_fd);exit(EXIT_FAILURE);}std::cout << "Listening on port " << PORT << std::endl;// 创建 pollfd 结构数组std::vector<pollfd> fds;pollfd server_pollfd = {server_fd, POLLIN, 0};fds.push_back(server_pollfd);while (true) {// 调用 poll 系统调用int activity = poll(fds.data(), fds.size(), -1);if (activity < 0) {perror("poll error");break;}// 检查服务器套接字是否有新连接if (fds[0].revents == POLLIN) {if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {perror("accept");exit(EXIT_FAILURE);}std::cout << "New connection, socket fd is " << new_socket << std::endl;// 添加新连接到 pollfd 结构数组pollfd client_pollfd = {new_socket, POLLIN, 0};fds.push_back(client_pollfd);}// 检查现有连接是否有数据for (size_t i = 1; i < fds.size(); i++) {if (fds[i].revents == POLLIN) {valread = read(fds[i].fd, buffer, BUFFER_SIZE);if (valread == 0) {// 客户端断开连接std::cout << "Client disconnected, socket fd is " << fds[i].fd << std::endl;close(fds[i].fd);fds.erase(fds.begin() + i);i--; // 修正索引} else {buffer[valread] = '\0';std::cout << "Received: " << buffer << std::endl;}}}}close(server_fd);return 0;
}
epollIO多路复用技术:
epoll 是 Linux 2.6 版本内核提供的一种 I/O 事件通知机制,相比 select 和 poll,epoll 更加高效,特别适用于处理大量文件描述符的场景。epoll 提供了较高的性能,因为它在内核中使用了更为复杂的数据结构和算法,以减少在处理大量文件描述符时的开销。
epoll通过epoll_create系统调用在内核中创建一个eventpoll的结构,其中有三部分,分别是edyList(已就绪事件),rbr(未就绪事件的红黑树,通过eqoll_ctl系统调用推入信息),wq(保存等待已就绪事件的进程)。当rbr内保存的文件描述符有数据时,该结构体会从红黑树转移至已就绪队列,并且唤醒wq中的进程。如下图
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <vector>#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10int main() {int server_fd, new_socket, valread;struct sockaddr_in address;int addrlen = sizeof(address);char buffer[BUFFER_SIZE];// 创建服务器socketif ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {perror("socket failed");exit(EXIT_FAILURE);}// 设置服务器地址address.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(PORT);// 绑定服务器socketif (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {perror("bind failed");close(server_fd);exit(EXIT_FAILURE);}// 监听服务器socketif (listen(server_fd, 3) < 0) {perror("listen");close(server_fd);exit(EXIT_FAILURE);}std::cout << "Listening on port " << PORT << std::endl;// 创建 epoll 实例int epoll_fd = epoll_create(MAX_EVENTS);if (epoll_fd == -1) {perror("epoll_create");exit(EXIT_FAILURE);}// 添加服务器socket到 epoll 实例struct epoll_event event;event.events = EPOLLIN;event.data.fd = server_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {perror("epoll_ctl");exit(EXIT_FAILURE);}std::vector<epoll_event> events(MAX_EVENTS);while (true) {int num_fds = epoll_wait(epoll_fd, events.data(), MAX_EVENTS, -1);if (num_fds == -1) {perror("epoll_wait");break;}for (int i = 0; i < num_fds; ++i) {if (events[i].data.fd == server_fd) {// 处理新的连接new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);if (new_socket == -1) {perror("accept");continue;}std::cout << "New connection, socket fd is " << new_socket << std::endl;// 添加新连接到 epoll 实例event.events = EPOLLIN;event.data.fd = new_socket;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {perror("epoll_ctl");close(new_socket);}} else {// 处理现有连接的数据valread = read(events[i].data.fd, buffer, BUFFER_SIZE);if (valread == 0) {// 客户端断开连接std::cout << "Client disconnected, socket fd is " << events[i].data.fd << std::endl;close(events[i].data.fd);} else {buffer[valread] = '\0';std::cout << "Received: " << buffer << std::endl;send(events[i].data.fd, buffer, valread, 0);}}}}close(server_fd);close(epoll_fd);return 0;
}