在之前的文章中,我们学习了IO复用模型之select原理及例子,但是select
有监听描述符个数的限制,而且select
的效率并不高,所以这篇文章就来学习效率更高的poll
和Linux特有的epoll
方法。
文章目录
- 1 select/poll/epoll对比
- 2 poll
- 2.1 poll函数
- 2.2 poll实战:实现多个套接字监听
- 2.2.1 客户端
- 2.2.2 服务端
- 2.2.3 实验结果
- 2.2.4 完整代码
- 3 epoll
- 3.1 相关函数
- 3.2 epoll实战:实现多个套接字监听
- 3.2.1 客户端
- 3.2.2 服务端
- 3.2.3 实验结果
- 3.3.4 完整代码
1 select/poll/epoll对比
这三者都用于I/O多路复用来监视多个文件描述符。epoll
的目的是取代较旧的POSIX中的select
和poll
系统调用。
复杂性与可扩展性
select
或poll
的时间复杂度为O(n),每次调用内核都需要遍历整个文件描述符epoll
的时间复杂度为O(1),它使用红黑树来跟踪当前被监视的所有文件描述符。epoll
在文件描述符很多的情况下表现良好,且具有良好的可扩展性
可用性与可移植性
-
select
和poll
在任何Unix系统上都可用 -
epoll
是Linux特有的(在2.5.44版本之后可用) -
poll
是POSIX标准接口,因此在需要代码可移植时可以使用
poll和select
给定一组文件描述符,它们告诉你哪些文件描述符有可读/可写的数据。select
和poll
从根本上基本使用相同的代码。poll
对文件描述符返回一组可能的结果,如POLLRDNORM
、POLLRDBAND
、POLLIN
、POLLHUP
、POLLERR
,而select
只告诉你有输入/输出/错误。
如果你有一个稀疏的文件描述符集(如设备长时间运行,在文件描述符回收和创建的过程中,可能一个描述符为1,一个描述符为1000),poll
可以比select
执行得更好。poll
使用pollfd
参数指定要监视的文件描述符;select
使用位集并需要遍历整个范围。
select
函数在某些系统上有文件描述符数量的限制,通常由文件描述符集的大小限制,例如 FD_SETSIZE
。这个宏定义了文件描述符集的最大大小,通常是1024。而poll
使用一个动态分配的数组来存储文件描述符集,因此理论上没有硬性的文件描述符数量限制。但在实际使用中,系统可能对单个进程所能打开的文件描述符总数有一定的限制,这是由操作系统的配置和资源限制决定的(可使用ulimit -n
查看)。
2 poll
2.1 poll函数
poll
允许程序监视多个文件描述符以确定是否可以进行I/O操作。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds
:指向pollfd
结构体数组的指针,每个结构体表示一个要监视的文件描述符以及感兴趣的事件nfds
:数组中结构体的数量。timeout
:超时时间,单位是毫秒。传递负值表示无限超时,传递0表示立即返回。
pollfd
结构体:
struct pollfd {int fd; // 文件描述符short events; // 要监视的事件short revents; // 实际发生的事件
};
其中events
/revents
的取值可以为如下几个选项:
POLLIN
:有数据可读。对于套接字来说,表示连接被对端关闭。POLLPRI
:有紧急数据可读。对于套接字来说,表示有带外数据。POLLOUT
:对端可写。POLLRDHUP
:对端挂起(连接关闭或半关闭)。POLLERR
:有错误发生。POLLHUP
:挂起事件。对于套接字来说,表示连接被挂起。POLLNVAL
:无效的请求,文件描述符未打开。
例如,如果你想监视可读和错误事件,可以将events
设置为 POLLIN | POLLERR
。
注意
- 如果
revents
中包含POLLNVAL
,说明文件描述符无效或未打开,此时poll
结果可能不可靠 revents
中可能同时包含多个标志,需要使用位运算和上述常量进行判断POLLRDHUP
和POLLHUP
标志在不同系统上可能有不同的行为,具体情况可以查看文档或相关头文件定义
2.2 poll实战:实现多个套接字监听
和之前select
一样,这里就来实现一个服务端和客户端的模型,从代码中来深入理解poll
函数的使用。
2.2.1 客户端
客户端需要能够监听标准输入stdin
的消息,然后转发个服务端;还需要监听服务端的套接字,以接收服务端发来的消息。代码如下:
struct pollfd fds[2];
char buffer[1024];fds[0].fd = 0; // stdin
fds[0].events = POLLIN;
fds[1].fd = sock;
fds[1].events = POLLIN;while(1)
{int ret = poll(fds, 2, -1);if (ret > 0){if (fds[0].revents & POLLIN){fgets(buffer, sizeof(buffer), stdin);send(sock, buffer, strlen(buffer), 0);}if (fds[1].revents & POLLIN){int valread = read(sock, buffer, sizeof(buffer));if (valread > 0){buffer[valread] = '\0';printf("Server says: %s", buffer);}else{// Server disconnectedprintf("Server disconnected\n");break;}}}
}
这里声明了一个 pollfd
结构体变量fds
,监听stdin
和服务端的套接字。poll
第三个超时参数为-1,表示无限等待。在poll
返回之后,我们只需要判断对应fds
中revents
对应的事件有没有置位就行了。
2.2.2 服务端
服务端则是一边要accept
新的客户端连接请求,一边接收来自客户端的消息并回显回去。代码如下:
int client_sockets[MAX_CLIENTS] = {0};
struct pollfd fds[MAX_CLIENTS + 1]; // +1 for the listening socket// Initialize the pollfd structure for the listening socket
fds[0].fd = server_fd;
fds[0].events = POLLIN;while (1)
{activity = poll(fds, max_clients + 1, -1);if ((activity < 0) && (errno != EINTR)){perror("Poll error");exit(EXIT_FAILURE);}// Check for incoming connections on the listening socketif (fds[0].revents & POLLIN){if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0){perror("Accept failed");exit(EXIT_FAILURE);}printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));// Add the new socket to the array of client socketsfor (i = 1; i < max_clients + 1; i++){if (client_sockets[i] == 0){client_sockets[i] = new_socket;fds[i].fd = new_socket;fds[i].events = POLLIN;printf("Added new client to the list of sockets at index %d\n", i);break;}}}// Check for data from clientsfor (i = 1; i < max_clients + 1; i++){sd = client_sockets[i];if (fds[i].revents & POLLIN){if ((valread = read(sd, buffer, 1024)) == 0){close(sd);client_sockets[i] = 0;printf("Client at index %d disconnected\n", i);}else{buffer[valread] = '\0';printf("Client at index %d says: %s\n", i, buffer);}}}
}
和select
一样,这里可以判断一下poll
的返回值,小于0表示系统异常,但是如果errno
为EINTR
则表示进程被信号中断,继续下一次poll
即可。
2.2.3 实验结果
首先打开服务端,再打开客户端,连接上后,客户端向服务端发送nohao
,然后按Ctrl+C
退出客户端,如下图所示:
2.2.4 完整代码
客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <poll.h>#define PORT 8080int main() {int sock = 0;struct sockaddr_in serv_addr;if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {perror("Socket creation error");exit(EXIT_FAILURE);}serv_addr.sin_family = AF_INET;serv_addr.sin_port = htons(PORT);// Convert IPv4 and IPv6 addresses from text to binary formif (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {perror("Invalid address/ Address not supported");exit(EXIT_FAILURE);}if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {perror("Connection Failed");exit(EXIT_FAILURE);}struct pollfd fds[2];char buffer[1024];fds[0].fd = 0; // stdinfds[0].events = POLLIN;fds[1].fd = sock;fds[1].events = POLLIN;while (1) {int ret = poll(fds, 2, -1);if (ret > 0) {if (fds[0].revents & POLLIN) {fgets(buffer, sizeof(buffer), stdin);send(sock, buffer, strlen(buffer), 0);}if (fds[1].revents & POLLIN) {int valread = read(sock, buffer, sizeof(buffer));if (valread > 0) {buffer[valread] = '\0';printf("Server says: %s", buffer);} else {// Server disconnectedprintf("Server disconnected\n");break;}}}}close(sock);return 0;
}
服务端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <poll.h>
#include <errno.h>#define PORT 8080
#define MAX_CLIENTS 10int main() {int server_fd, new_socket, max_clients = MAX_CLIENTS;int activity, i, valread;int sd, max_sd;struct sockaddr_in address;int addrlen = sizeof(address);char buffer[1024];// Create a socketif ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {perror("Socket creation failed");exit(EXIT_FAILURE);}// Set up the server address structaddress.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(PORT);// Bind the socket to the addressif (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {perror("Bind failed");exit(EXIT_FAILURE);}// Listen for incoming connectionsif (listen(server_fd, 3) < 0) {perror("Listen failed");exit(EXIT_FAILURE);}int client_sockets[MAX_CLIENTS] = {0};struct pollfd fds[MAX_CLIENTS + 1]; // +1 for the listening socket// Initialize the pollfd structure for the listening socketfds[0].fd = server_fd;fds[0].events = POLLIN;printf("Waiting for connections...\n");while (1) {// Use poll to wait for eventsactivity = poll(fds, max_clients + 1, -1);if ((activity < 0) && (errno != EINTR)) {perror("Poll error");exit(EXIT_FAILURE);}// Check for incoming connections on the listening socketif (fds[0].revents & POLLIN) {if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {perror("Accept failed");exit(EXIT_FAILURE);}printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));// Add the new socket to the array of client socketsfor (i = 1; i < max_clients + 1; i++) {if (client_sockets[i] == 0) {client_sockets[i] = new_socket;fds[i].fd = new_socket;fds[i].events = POLLIN;printf("Added new client to the list of sockets at index %d\n", i);break;}}}// Check for data from clientsfor (i = 1; i < max_clients + 1; i++) {sd = client_sockets[i];if (fds[i].revents & (POLLIN | POLLHUP | POLLERR)) {if ((valread = read(sd, buffer, 1024)) == 0) {// Client disconnectedclose(sd);client_sockets[i] = 0;printf("Client at index %d disconnected\n", i);} else {// Process client message (in this example, just print it)buffer[valread] = '\0';printf("Client at index %d says: %s\n", i, buffer);}}}}return 0;
}
3 epoll
epoll
比select
和poll
更为灵活和高效,特别是在大量连接上的场景。
3.1 相关函数
来看一下与epoll
相关的函数原型:
1、epoll_create和epoll_create1:创建epoll
实例
int epoll_create(int size);
int epoll_create1(int flags);
epoll_create
:创建一个epoll
实例。size
参数在大多数情况下会被忽略,可以设置为大于0的任何值。epoll_create1
:与epoll_create
类似,但它支持flag
设置为EPOLL_CLOEXEC
,表示在调用exec
进程时,epoll
实例的文件描述符将会被关闭,以防止它在新程序中继续存在。这可以增强程序的安全性和可预测性。- 如果
flags
为 0,那么EPOLL_CLOEXEC
标志将不会被设置。
- 如果
2、epoll_ctl:在epoll
实例中注册、修改或删除文件描述符的监听事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd
:是一个由epoll_create
或epoll_create1
返回的epoll
实例的文件描述符。op
:是一个操作符,指定对epoll
实例的操作类型。可以取以下值:EPOLL_CTL_ADD
:添加一个新的文件描述符到epoll
实例中进行监听。EPOLL_CTL_MOD
:修改一个已经在epoll
实例中的文件描述符的监听事件。EPOLL_CTL_DEL
:从epoll
实例中删除一个文件描述符。
fd
:是要进行操作的文件描述符,即要添加、修改或删除的文件描述符。event
:是一个指向struct epoll_event
结构的指针,用于指定要监听的事件类型以及关联的数据。
3、epoll_wait:等待事件发生。返回发生的事件的数量,并将事件信息填充到提供的数组中。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epfd
:是一个由epoll_create
或epoll_create1
返回的epoll
实例的文件描述符。events
:是一个指向struct epoll_event
结构的数组,用于存储发生事件的文件描述符和相关信息。maxevents
:events
数组的大小,即最多能存储多少个事件。timeout
:是等待的超时时间,以毫秒为单位。传递负值表示epoll_wait
将一直阻塞,直到有事件发生。传递0表示立即返回,不管是否有事件发生。
其中 **struct epoll_event
**结构体定义如下:
struct epoll_event {__uint32_t events; // 要监视的事件epoll_data_t data; // 用户数据
};typedef union epoll_data {void *ptr;int fd;__uint32_t u32;__uint64_t u64;
} epoll_data_t;
events
字段:表示要监视的事件,可以是EPOLLIN
(可读)、EPOLLOUT
(可写)等。具体的事件常量可以查看<sys/epoll.h>
头文件。data
字段:用于保存用户数据。可以是文件描述符(fd
)、指针(ptr
)等,取决于epoll_data
的类型。
3.2 epoll实战:实现多个套接字监听
这里用epoll
来实现一个服务端和客户端的模型,通过代码来理解epoll
的使用方法。
3.2.1 客户端
1、创建epoll实例
int epoll_fd = epoll_create1(0);
2、添加待监听的文件描述符
struct epoll_event event;event.events = EPOLLIN;
event.data.fd = STDIN_FILENO;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event);event.events = EPOLLIN;
event.data.fd = sock;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &event);
这里epoll_event
的data
(用户数据)就保存文件描述符,用于后面判断是哪里来的消息。同时这里两个epoll_ctl
的最后一个参数用了同一个变量event
的地址传入,这是因为传入后函数内部会对数据进行拷贝。
3、等待和处理事件
struct epoll_event events[MAX_EVENTS];
while (1)
{int event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < event_count; i++){if (events[i].data.fd == STDIN_FILENO){fgets(buffer, sizeof(buffer), stdin);send(sock, buffer, strlen(buffer), 0);}else if (events[i].data.fd == sock){ssize_t bytes_received = recv(sock, buffer, sizeof(buffer), 0);if (bytes_received <= 0){printf("Server disconnected\n");close(sock);exit(EXIT_SUCCESS);}else{buffer[bytes_received] = '\0';printf("Server says: %s\n", buffer);}}}
}
epoll_wait
会返回事件的个数,并将结果保存在events
中,我们只需要遍历它即可。
3.2.2 服务端
1、创建epoll实例和添加文件描述符
epoll_fd = epoll_create1(0);struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = server_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event);
2、等待和处理事件
while (1)
{event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < event_count; i++){if (events[i].data.fd == server_fd){if ((new_socket = accept(server_fd, (struct sockaddr *)&address, &addrlen)) < 0){perror("Accept failed");exit(EXIT_FAILURE);}event.events = EPOLLIN;event.data.fd = new_socket;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1){perror("Failed to add new client socket to epoll");exit(EXIT_FAILURE);}printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));}else{int client_socket = events[i].data.fd;ssize_t bytes_received = recv(client_socket, buffer, sizeof(buffer), 0);if (bytes_received <= 0){printf("Client at socket fd %d disconnected\n", client_socket);close(client_socket);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_socket, NULL);}else{buffer[bytes_received] = '\0';printf("Client at socket fd %d says: %s\n", client_socket, buffer);}}}
}
3.2.3 实验结果
首先打开服务端,再打开客户端,连接上后,客户端向服务端发送123
,然后按Ctrl+C
退出客户端,如下图所示:
3.3.4 完整代码
客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>#define PORT 8080
#define MAX_EVENTS 10int main() {int sock = 0;struct sockaddr_in serv_addr;char buffer[1024];// Create a socketif ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {perror("Socket creation error");exit(EXIT_FAILURE);}// Set up the server address structserv_addr.sin_family = AF_INET;serv_addr.sin_port = htons(PORT);// Convert IPv4 and IPv6 addresses from text to binary formif (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {perror("Invalid address/ Address not supported");exit(EXIT_FAILURE);}// Connect to the serverif (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {perror("Connection Failed");exit(EXIT_FAILURE);}// Create epoll instanceint epoll_fd = epoll_create1(0);if (epoll_fd == -1) {perror("Failed to create epoll instance");exit(EXIT_FAILURE);}// Add stdin and socket to epollstruct epoll_event events[MAX_EVENTS];struct epoll_event event;event.events = EPOLLIN;event.data.fd = STDIN_FILENO;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event) == -1) {perror("Failed to add stdin to epoll");exit(EXIT_FAILURE);}event.events = EPOLLIN;event.data.fd = sock;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &event) == -1) {perror("Failed to add socket to epoll");exit(EXIT_FAILURE);}while (1) {int event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < event_count; i++) {if (events[i].data.fd == STDIN_FILENO) {// Data from stdinfgets(buffer, sizeof(buffer), stdin);send(sock, buffer, strlen(buffer), 0);} else if (events[i].data.fd == sock) {// Data from serverssize_t bytes_received = recv(sock, buffer, sizeof(buffer), 0);if (bytes_received <= 0) {// Server disconnectedprintf("Server disconnected\n");close(sock);exit(EXIT_SUCCESS);} else {// Process server message (in this example, just print it)buffer[bytes_received] = '\0';printf("Server says: %s\n", buffer);}}}}close(sock);return 0;
}
服务端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>#define PORT 8080
#define MAX_EVENTS 10int main() {int server_fd, new_socket;int epoll_fd, event_count;struct sockaddr_in address;socklen_t addrlen = sizeof(address);char buffer[1024];// Create a socketif ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {perror("Socket creation failed");exit(EXIT_FAILURE);}// Set up the server address structaddress.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(PORT);// Bind the socket to the addressif (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {perror("Bind failed");exit(EXIT_FAILURE);}// Listen for incoming connectionsif (listen(server_fd, 3) < 0) {perror("Listen failed");exit(EXIT_FAILURE);}// Create epoll instanceepoll_fd = epoll_create1(0);if (epoll_fd == -1) {perror("Failed to create epoll instance");exit(EXIT_FAILURE);}// Add the server socket to epollstruct epoll_event event;event.events = EPOLLIN;event.data.fd = server_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {perror("Failed to add server socket to epoll");exit(EXIT_FAILURE);}struct epoll_event events[MAX_EVENTS];printf("Waiting for connections...\n");while (1) {event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < event_count; i++) {if (events[i].data.fd == server_fd) {// New client connectionif ((new_socket = accept(server_fd, (struct sockaddr *)&address, &addrlen)) < 0) {perror("Accept failed");exit(EXIT_FAILURE);}// Add new client socket to epollevent.events = EPOLLIN;event.data.fd = new_socket;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {perror("Failed to add new client socket to epoll");exit(EXIT_FAILURE);}printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));} else {// Data from clientint client_socket = events[i].data.fd;ssize_t bytes_received = recv(client_socket, buffer, sizeof(buffer), 0);if (bytes_received <= 0) {// Client disconnectedprintf("Client at socket fd %d disconnected\n", client_socket);close(client_socket);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_socket, NULL);} else {// Process client message (in this example, just print it)buffer[bytes_received] = '\0';printf("Client at socket fd %d says: %s\n", client_socket, buffer);}}}}close(server_fd);return 0;
}