一、Motivation
通常,操作系统会为每个进程划分一个时间片的,在这个时间片内进程可以合法占有 cpu 进行一些计算任务。并当时间片结束后自动退回至就绪状态待命,等待下一次的调度
但是,有一种情况会使进程提前(时间片还未用完)进入等待状态,即是进程发生了阻塞(多半是因为 I/O 请求)。进程一旦发生了阻塞,它就要让出 cpu 给其他进程,这个让位的动作就是进程之间切换的操作,这种操作非常蠢(在开发者眼里是无用功),也很耗时。可以说是时间和 cpu 资源没用在正儿八经的计算任务上
select 和 epoll 的提出就是来解决这个愚蠢的问题,有一种设想:在分配给该进程时间片还未结束之前,如果进程的某个 socket 连接发生阻塞,先不急着逼该进程退位,而是通过某种手段去查询一下进程的其他 socket 连接是否有已就绪的。如果其他 socket 连接有活动可以处理,不如充分利用 cpu 先进行计算,在处理完成 OR 时间片到期后再让位也不迟。这样不就可以提高计算机资源的利用率了嘛
但是,在 Linux 老的版本中,有关事件触发的问题,一直是采用 select 轮询手段来解决的,所谓的轮询就是 cpu 不停地去查询任务队列是否有已经就绪的任务。这种方法在任务较少的情况下还能勉强应付,当任务数量增加至千级数量级之后,效率就会出现断崖式地降低。因为每次需要轮询上千个任务,自然非常耗时
为此,Linux 提出了新的解决方法 epoll,不再采用轮询的方法来感知新事件的发生,而是通过 epoll 结构体内部的红黑树来自动将等待的任务和就绪的任务分开,从而使 kernel 能够快速感知新事件的发生
再说直白一点,只要活儿足够多,epoll_wait 根本就不会让用户进程阻塞,用户进程会一直干活,直到属于该进程的时间片结束。这样就大大减少了进程切换次数,提高了效率
epoll 相比于 select 和 poll 厉害的地方,即是它可以快速感知已连接的 socket 动静,而 select 和 poll 需要用户层将描述符集合 sockets 传入 kernel,接着进行一次遍历之后才能知道 socket 的变动情况
其实,socket 有什么动静我们是能够快速感知的(通过 TCP/IP 协议),但是我们要准确识别到底是哪个 socket 发生了变化,这件事确实有很多种不同的做法,不同的做法也带来了不同的效率
epoll 是在用户层第一次注册 socket 的时候就将其传入 kernel 了,具体表现为在 epoll 对象的红黑树 rbr 中创建该 socket 的 epitem,这种行为就是用户层数据拷贝进 kernel 的动作。与 select 和 poll 每次都要将集合 sockets 传入 kernel 的事实相比,无疑是 epoll 在初始化时就将 socket 传入 kernel 的做法效率要高很快
epoll 效率高的背后其实是牺牲 kernel 空间(创建红黑树)来换取时间的妥协,select 和 poll 因为只是临时(遍历完会释放)占用 kernel 空间,所以它们的效率比较低。归根结底,就是空间换时间
二、Solutions
S1 - epoll_create
创建一个 epoll 句柄,size 用来告诉 kernel 共能监听多少个事件,
int epoll_create(int size)
这个参数在现在的版本中没有意义,kernel 会根据实际情况自行决定的,意思就是说这个 size 只是我们规定的事件的大致数量,而不是能够处理的最大事件数
epoll 结构体中定义的等待队列 wq 存放阻塞在 epoll 对象上的用户进程,当软中断数据就绪时会前来寻找进程;epoll 对象用红黑树 rbr 来管理用户进程 accept 添加进来的所有 socket 连接,选用红黑树的原因是因为红黑树能够更好地支持海量连接的查找、插入和删除;就绪链表 rdllist 存放着一些已就绪的任务,这样一来,应用进程只需要查询 rdllist 就能判断是否有就绪任务可供处理,而不必去遍历整棵红黑树
S2 - epoll_ctl
该方法向 epoll 对象中添加、修改和删除特定的事件,返回 0 表示成功,-1 表示失败,
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
添加意味着对这件事感兴趣,应用进程想收来处理;删除则表示对这件事没了兴趣。其中,epfd 是 epoll 对象的 id,epoll_create()
的返回值;op 有三种操作类型,EPOLL_CTL_ADD、EPOLL_CTL_MOD 和 EPOLL_CTL_DEL;fd 是需要监听的文件描述符,通常是连接至服务端的 socket;最后一个参数 event 可以是以下几种宏的集合,
- EPOLLIN:文件描述符可读
- EPOLLOUT:文件描述符可写
- EPOLLPRI:文件描述符有紧急数据可读
- EPOLLERR:文件描述符发生错误
- EPOLLHUP:文件描述符被挂断
- EPOLLET:边缘触发(后面会讲到)
- EPOLLONESHOT:只监听一次,意味着触发来事件之后就被踢出 epoll 对象中了
它是一个传入的指针,这就要求我们需要在进入函数之前分配好空间并初始化,以便 epoll_create()
可以在方法内获取内容,但 epoll_create()
并不会替我们释放 events 空间
再进一步解释,当有新的 socket 连接加入 epoll 对象时,epoll 对象会创建一个 epitem 用来关联该 socket 连接,然后将 epitem 挂到红黑树 rbr 中。之后,会设置该 epitem 的回调函数(如果该连接有数据写入,请将其存入 epoll 对象的就绪链表 rdllist 中),以及其他的回调函数
在这我只列举了 “增” 的一个例子,其他关于 “删” 和 “改” 的操作,它们的本质是一样的,都是 socket 连接有什么动作就会去调用对应的回调函数。关于能够快速实现 “增删改查” 最主要的原因是因为选用了红黑树
补充一个 demo,假设处于阻塞状态的 socket 有数据写入了,第一步会去通知红黑树 rbr 找到(很快)该 socket 的 epitem;然后,调用 epitem 的回调函数将 epitem 加入就绪链表 rdllist 中。这一步主要是为了 epoll_wait 能够快速获取已经就绪的 socket 信息
S3 - epoll_wait
等待处于监听范围的事件发生,
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout)
epoll 对象会将已经发生的事件复制到数组 events 中,maxevents 是数组的长度;timeout 如果为 0,则意味着就绪链表 rdllist 若为空则立刻返回,不会等待;-1 表示阻塞,会一直陷入 epoll_wait 状态中
关于 ET 和 LT 模式,我想用简短的语言去描述,不要深究细节。ET(边缘触发)模式仅当状态发生变化时才会感知事件的发生,即使这个事件对应的缓冲区内还有未读取的数据;而 LT(水平触发)模式是只要有数据没处理就会一直通知下去
三、Result
我想透过一个简单的 demo 来介绍 epoll 的经典用法。说到用法,最常用的就是连接 socket,监听 socket 的动静并读/写数据进行处理,之后返回给 client 结果。我写了一个小写转大写的程式来说明 epoll 的用法,请看代码,
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <ctype.h>
#include <unistd.h>#define EPOLL_MAXSIZE 16
#define SRV_PORT_ID 1980 /* 端口号 */
#define SOCKET_QUEUE_LEN 20
#define BUFSIZE 256struct myepoll_data {int fd;char data[BUFSIZE];
};int main()
{int i,j;int epfd, sockfd, nfds, clntfd;struct sockaddr_in srvaddr, clntaddr;struct epoll_event ev, evs[EPOLL_MAXSIZE];socklen_t clntlen = sizeof(clntaddr);char buf[BUFSIZE];/* 创建epoll结构体(就绪链表、等待队列和红黑树) */epfd = epoll_create(EPOLL_MAXSIZE);if(epfd == -1) {printf("epoll_create err\n");goto over;}printf("epoll_create ok\n");/* 创建socket结构体 */sockfd = socket(AF_INET, SOCK_STREAM, 0);if(sockfd == -1) {printf("socket_create err\n");goto over;}printf("socket_create ok\n");/* 初始化socket绑定监听 */bzero(&srvaddr, sizeof(srvaddr));srvaddr.sin_family = AF_INET;srvaddr.sin_port = htons(SRV_PORT_ID);srvaddr.sin_addr.s_addr = htonl(INADDR_ANY);if(bind(sockfd, (struct sockaddr*)&srvaddr, sizeof(struct sockaddr)) == -1) {printf("socket_bind err\n");goto over;}printf("socket_bind ok\n");if(listen(sockfd, SOCKET_QUEUE_LEN) == -1) {printf("socket_listen err\n");goto over;}printf("socket_listen ok\n");/* 向epoll结构体中注册socket,实现监听功能 */ev.data.fd = sockfd;ev.events = EPOLLIN | EPOLLET;if(epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {printf("epoll_ctl_add err\n");goto over;}printf("epoll_ctl_add ok\n");/* 不停地处理外来事件 */while(1) {/* 阻塞地等待事件发生,其中0为没有就绪事件就立刻返回,-1为阻塞 */nfds = epoll_wait(epfd, evs, EPOLL_MAXSIZE, -1);/* 处理每个收上来的事件 */for(i=0; i<nfds; i++) {if(evs[i].data.fd == sockfd) { /* 有人敲sockfd的门了(收到新的连接)*/clntfd = accept(sockfd, (struct sockaddr*)&clntaddr, &clntlen);ev.events = EPOLLIN | EPOLLET;ev.data.fd = clntfd;if(epoll_ctl(epfd, EPOLL_CTL_ADD, clntfd, &ev) == -1)printf("epoll_ctl_add %d err\n", clntfd);elseprintf("epoll_ctL_add %d clnt ok\n", clntfd);} else if(evs[i].events & EPOLLIN) { /* 读取数据但先不处理 */clntfd = evs[i].data.fd;memset(buf, 0, BUFSIZE);if(read(clntfd, buf, BUFSIZE) == 0) { /* 客户端关闭连接 */if(epoll_ctl(epfd, EPOLL_CTL_DEL, clntfd, NULL) == -1) {printf("epoll_ctl_del %d err\n", clntfd);} else {printf("epoll_ctl_del %d ok\n", clntfd);close(clntfd);}continue;}/* 先接收client的请求 */struct myepoll_data fddata;fddata.fd = clntfd;strcpy(fddata.data, buf);ev.data.ptr = &fddata;memset(buf, 0, BUFSIZE);strcpy(buf, "i'm keep u's data, deal with it later, please check u can be written...\n");send(clntfd, buf, strlen(buf), 0);ev.events = EPOLLOUT | EPOLLET;/* 下一次epoll时再处理client的请求 */if(epoll_ctl(epfd, EPOLL_CTL_MOD, clntfd, &ev) == -1) printf("epoll_ctl_mod clnt %d EPOLLIN -> EPOLLOUT err\n", clntfd);else printf("epoll_ctl_mod clnt %d EPOLLIN -> EPOLLOUT ok\n", clntfd);} else if(evs[i].events & EPOLLOUT) { /* 对之前读取的数据予以处理并将处理结果返回给client */struct myepoll_data* fddata = (struct myepoll_data*)evs[i].data.ptr;clntfd = fddata->fd;char* data = fddata->data;memset(buf, 0, BUFSIZE);strcpy(buf, "i'm processing u's data, please waiting...\n");send(clntfd, buf, strlen(buf), 0);/* 将小写转为大写的业务逻辑 */for(j=0; j<strlen(data); j++)data[j] = toupper(data[j]);send(clntfd, data, strlen(data), 0);ev.events = EPOLLIN | EPOLLET;/* 准备接收client的下一次计算请求 */if(epoll_ctl(epfd, EPOLL_CTL_MOD, clntfd, &ev) == -1)printf("epoll_ctl_mod clnt %d EPOLLOUT -> EPOLLIN err\n", clntfd);else printf("epoll_ctl_mod clnt %d EPOLLOUT -> EPOLLIN ok\n", clntfd);} else {printf("unknown event\n");}}}over:return 0;
}
整个流程,我认为较为清晰,首先创建 socket,然后将 socket 添加进 epoll 对象中,这就意味着让 epoll 对象监听 socket 的一举一动。如果有数据写入 socket 中,那么就读出来,等待下一轮再进行处理(为什么下一轮再进行处理?而不是接收了请求就处理,其中的道理我暂时还没有悟透,但有人告诉我,先接收后处理的手法是 epoll 的精髓,说实话我并不认同,因为我不能说服自己要相信这种脱裤子放屁的说法)
按照流程走下去,在下一轮中进行处理(小写转大写),然后将结果返回给 client。这就是 epoll demo。在另一个终端中透过 nc 命令尝试连接 server 进程,
nc 127.0.0.1 1980
作为 client,输入小写的字符串,server 就会返回大写的结果,