epoll
是一种高效的 I/O 多路复用机制,广泛用于 Linux 系统中,用于处理大量并发的文件描述符。它比传统的 select
和 poll
方法具有更好的性能,特别是在处理大量并发连接时。
1.epoll的设计思路
epoll是在select 出现 N 多年后才被发明的,是select 和 poll(poll 和 select 基本一样,有少量改进)的增强版本。epoll通过以下一些措施来改进效率:
-
措施一:功能分离
-
select 低效的原因之一是将“维护等待队列”和“阻塞进程”两个步骤合二为一。
如上图所示,每次调用select都需要这两步操作,然而大多数应用场景中,需要监视的socket相对固定,并不需要每次都修改。
epoll将这两个操作分开,先用epoll_ctl 维护等待队列,再调用epoll_wait 阻塞进程。显而易见,效率就能得到提升。
为方便理解后续的内容,我们先了解一下epoll的用法。如下的代码中,先用epoll_create 创建一个epoll对象 epfd,再通过epoll_ctl 将需要监视的socket添加到 epfd 中,最后调用epoll_wait 等待数据:
int s =socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...)
listen(s, ...) int epfd =epoll_create(...);
epoll_ctl(epfd, ...); //将所有需要监听的socket添加到epfd中 while(1){ int n =epoll_wait(...) for(接收到数据的socket){ //处理 }
}
功能分离,使得epoll有了优化的可能。
措施二:就绪列表
select低效的另一个原因在于程序不知道哪些socket收到数据,只能一个个遍历。如果内核维护一个“就绪列表”,引用收到数据的socket,就能避免遍历。
如上图所示,计算机共有三个socket,收到数据的sock2和sock3 被就绪列表rdlist 所引用。当进程被唤醒后,只要获取rdlist 的内容,就能够知道哪些socket收到数据。
2.epoll底层使用的数据结构
2.1索引的数据结构
既然epoll将“维护监视队列”和“进程阻塞”分离,也意味着需要有个数据结构来保存监视的socket,至少要方便地添加和移除,还要便于搜索,以避免重复添加。
epoll 在内核里使用红黑树来跟踪进程所有待检测的文件描述字,把需要监控的 socket 通过epoll_ctl() 函数加入内核中的红黑树里,红黑树是个高效的数据结构,增删改一般时间复杂度是0(logn)。而 select/poll 内核里没有类似 epoll 红黑树这种保存所有待检测的 socket 的数据结构,所以select/poll 每次操作时都传入整个 socket 集合给内核,而 epoll 因为在内核维护了红黑树,可以保存所有待检测的 socket ,所以只需要传入一个待检测的 socket,减少了内核和用户空间大量的数据拷贝和内存分配。
2.2就绪列表的数据结构
就绪列表引用着就绪的socket,所以它应能够快速的插入数据。程序可能随时调用epoll_ctl 添加监视socket,也可能随时删除。当删除时,若该socket已经存放在就绪列表中,它也应该被移除。所以就绪列表应是一种能够快速插入和删除的数据结构。双向链表就是这样一种数据结构,epoll使用双向链表来实现就绪队列(对应上图的rdlist)。
第二点, epoll使用事件驱动的机制,内核里维护了一个双向链表来记录就绪事件,当某个socket 有事件发生时,通过回调函数内核会将其加入到这个就绪事件列表中,当用户调用 epoll_wait()函数时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个socket 集合,大大提高了检测的效率。
epoll 的方式即使监听的 Socket 数量越多的时候,效率不会大幅度降低,能够同时监听的 Socket 的数目也非常的多了,上限就为系统定义的进程打开的最大文件描述符个数。因而,epoll 被称为解决 C10K 问题的利器。插个题外话,网上文章不少说, epoll_wait 返回时,对于就绪的事件,epoll 使用的是共享内存的方式,即用户态和内核态都指向了就绪链表,所以就避免了内存拷贝消耗。
这是错的!看过 epoll 内核源码的都知道,压根就没有使用共享内存这个玩意。你可以从下面这份代码看到,epoll_wait 实现的内核代码中调用了put_user 函数,这个函数就是将数据从内核拷贝到用户空间。
3.epoll的工作流程
3.1.创建epoll对象
如下图所示,当某个进程调用epoll_create 方法时,内核会创建一个 eventpoll 对象(也就是程序中 epfd 所代表的对象)。
eventpoll 对象也是文件系统中的一员,和socket一样,它也会有等待队列。创建一个代表该epoll的 eventpoll 对象是必须的,因为内核要维护“就绪列表”等数据,“就绪列表”可以作为 eventpoll 的成员。
3.2.维护监视列表
创建epoll对象后,可以用epoll_ctl 添加或删除所要监听的socket。以添加socket为例。
如上图,如果通过epoll_ctl 添加sock1、sock2 和sock3 的监视,内核会将 eventpoll 添加到这三个socket的等待队列中。当socket收到数据后,中断程序会操作 eventpoll 对象,而不是直接操作进程。
3.3.接收数据
当socket收到数据后,中断程序会给 eventpoll 的“就绪列表”添加socket引用。
如上图展示的是sock2 和sock3 收到数据后,中断程序让rdlist 引用这两个socket。
eventpoll 对象相当于socket和进程之间的中介,socket的数据接收并不直接影响进程,而是通过改变 eventpoll 的就绪列表来改变进程状态。
当程序执行到epoll_wait 时,如果rdlist 已经引用了socket,那么epoll_wait 直接返回,如果 rdlist 为空,阻塞进程。
3.4.阻塞和唤醒进程
假设计算机中正在运行进程 A 和进程 B,在某时刻进程 A 运行到了epoll_wait 语句。
如上图所示,内核会将进程 A 放入 eventpoll 的等待队列中,阻塞进程。
当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒 eventpoll 等待队列中的进程,进程 A 再次进入运行状态(如下图)。
也因为rdlist 的存在,进程 A 可以知道哪些socket发生了变化。
4.实例代码
下面是一个使用 epoll
的示例代码,演示了如何创建 epoll
实例、注册文件描述符、等待事件和处理事件。此示例是一个简单的 TCP 服务器,能够接受客户端连接并处理数据。
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>// 设置文件描述符为非阻塞
void set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);if (flags == -1) {perror("fcntl F_GETFL 错误");exit(1);}if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {perror("fcntl F_SETFL 错误");exit(1);}
}// 服务器主函数
int main(int argc, const char* argv[])
{// 创建监听套接字int lfd = socket(AF_INET, SOCK_STREAM, 0);if(lfd == -1){perror("socket 错误");exit(1);}// 设置监听套接字为非阻塞set_nonblocking(lfd);// 绑定服务器地址和端口struct sockaddr_in serv_addr;memset(&serv_addr, 0, sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_port = htons(9999); // 监听端口9999serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定所有网络接口的IP地址// 设置端口复用int opt = 1;setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));// 将套接字绑定到指定地址int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));if(ret == -1){perror("绑定错误");exit(1);}// 开始监听连接请求ret = listen(lfd, 64);if(ret == -1){perror("监听错误");exit(1);}// 创建一个 epoll 实例int epfd = epoll_create(100);if(epfd == -1){perror("epoll_create 错误");exit(1);}// 将监听套接字 lfd 加入 epoll 实例,监听读事件,使用ET模式struct epoll_event ev;ev.events = EPOLLIN | EPOLLET; // 监听读事件,ET模式ev.data.fd = lfd; // 数据是监听套接字 lfdret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);if(ret == -1){perror("epoll_ctl 错误");exit(1);}// 用于存放触发事件的数组struct epoll_event evs[1024];int size = sizeof(evs) / sizeof(struct epoll_event);// 进入事件处理循环while(1){// 等待事件触发int num = epoll_wait(epfd, evs, size, -1);if(num == -1){perror("epoll_wait 错误");exit(1);}// 处理所有触发的事件for(int i = 0; i < num; ++i){int curfd = evs[i].data.fd; // 获取当前事件对应的文件描述符// 如果是监听套接字 lfd 有事件发生,表示有新连接if(curfd == lfd){// 接受所有新连接while (1) {int cfd = accept(lfd, NULL, NULL);if(cfd == -1){if (errno == EAGAIN || errno == EWOULDBLOCK) {// 所有连接都已处理break;} else {perror("accept 错误");continue;}}// 设置新连接为非阻塞set_nonblocking(cfd);// 将新连接 cfd 添加到 epoll 实例中监听其读事件,使用ET模式ev.events = EPOLLIN | EPOLLET;ev.data.fd = cfd;ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);if(ret == -1){perror("epoll_ctl-accept 错误");exit(1);}printf("新连接 %d 加入\n", cfd);}}else{// 处理已连接套接字的数据收发char buf[1024];int len;// 使用循环确保将缓冲区中所有数据读取完毕while ((len = recv(curfd, buf, sizeof(buf), 0)) > 0) {printf("客户端 %d 说: %s", curfd, buf);send(curfd, buf, len, 0);memset(buf, 0, sizeof(buf));}if(len == -1 && (errno != EAGAIN && errno != EWOULDBLOCK)){perror("recv 错误");// 出错时关闭连接,并从 epoll 实例中删除epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);close(curfd);}else if(len == 0){// 客户端断开连接printf("客户端 %d 已断开连接\n", curfd);epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);close(curfd);}}}}close(lfd);return 0;
}