Linux Select
在Linux中,我们可以使用select函数实现I/O多路复用。传递给select函数的参数会告诉内核:
•我们所关心的文件描述符
•对每个描述符,我们所关心的状态。(我们是要想从一个文件描述符中读或者写,还是关注一个描述符中是否出现异常)
•我们要等待多长时间。(我们可以等待无限长的时间,等待固定的一段时间,或者根本就不等待)
从select函数返回后,内核告诉我们以下信息:
•对我们的要求已经做好准备的描述符的个数
•对于三种条件哪些描述符已经做好准备.(读,写,异常)
select
——用于IO多路复用
(1)函数原型
#include<sys/time.h>
#include<sys/types.h>
#include<unistd.h>
int select(int n,fd_set * readfds,fd_set * writefds,fd_set * exceptfds,struct timeval * timeout);
(2)参数
n:最大的文件描述词加1;
readfds、writefds 和exceptfds:称为描述词组,是用来回传该描述词的读,写或例外的状况;
timeout:用来设置select()的等待时间。
struct timeval
{time_t tv_sec; /* 秒 */ suseconds_t tv_usec; /* 微秒 */
};
(3)返回值
如果参数timeout设为NULL则表示select()没有timeout。
执行成功则返回文件描述词状态已改变的个数,如果返回0代表在描述词状态改变前已超过timeout时间,当有错误发生时则返回-1,错误原因存于errno,此时参数readfds,writefds,exceptfds和timeout的值变成不可预测。
EBADF 文件描述词为无效的或该文件已关闭
EINTR 此调用被信号所中断
EINVAL 参数n 为负值。
ENOMEM 核心内存不足
常见的程序片段:
fd_set readset;
FD_ZERO(&readset);
FD_SET(fd,&readset);
select(fd+1,&readset,NULL,NULL,NULL);
if(FD_ISSET(fd,&readset)){……}
/*
 * select.h -- public types and entry point of the select()-based TCP server.
 */
#ifndef _SELECT_H_
#define _SELECT_H_

#include "wrap.h"
#include "client_list.h"
#include "server_queue.h"

/* Standard headers for the types used directly in this file, so the header
 * does not rely on the project headers above to pull them in. */
#include <netinet/in.h>   /* struct sockaddr_in */
#include <pthread.h>      /* pthread_t */
#include <stdint.h>       /* uint16_t */
#include <sys/select.h>   /* fd_set, FD_SETSIZE */

#define SERVER_PORT    6780    /* TCP port the server binds to */
#define MAXLINE        100     /* used as the listen() backlog */
#define OPEN_MAX       65535   /* not referenced by the code visible here */
#define TCP_FRAME_SIZE 1200    /* payload bytes carried by one tcp_frame_t */

/* Server context shared by the send and receive threads. */
typedef struct
{
    int sockfd;                 // server socket
    int port;                   // server port
    struct sockaddr_in addr;    // server addr
    int maxi;                   // highest index in use in aggregate[], -1 when empty
    int maxfd;                  // highest descriptor passed to select()
    int aggregate[FD_SETSIZE];  // client descriptors watched by select(); -1 marks a free slot
    fd_set allset;              // master descriptor set, copied before every select() call
    server_queue_t send_queue;  // server send data queue to client
    server_queue_t recv_queue;  // server recv data queue from client
    pthread_t send_thread;
    pthread_t recv_thread;
    client_t *client;           // client list -- save all client info
} server_t;

/* recv and send queue frame */
typedef struct
{
    int sockfd;                 // client socket
    uint16_t length;            // number of valid bytes in data[]
    char data[TCP_FRAME_SIZE];
} __packed tcp_frame_t;   /* NOTE(review): __packed is expected to come from a project header -- confirm */

//==========================================================
/* Create the listening socket and start the send/receive threads.
 * Returns the server context. */
server_t *SocketInit(void);

#endif /* _SELECT_H_ */
/*
 * select.c -- select()-based TCP server: one thread accepts connections and
 * receives frames into recv_queue, a second thread is reserved for sending.
 */
#include "select.h"
#include "debug.h"

#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Allocate the server context, create/bind/listen on the server socket and
 * initialise the descriptor bookkeeping and both queues.
 *
 * Returns the new context, or NULL when the allocation fails.
 * Socket/Setsockopt/Bind/Listen are project wrappers declared elsewhere;
 * their failure behaviour is not visible here.
 */
static server_t *socket_init(void)
{
    int opt = 1;

    /* calloc so that fields not set below (e.g. client) start out zeroed */
    server_t *current = calloc(1, sizeof *current);
    if (current == NULL)
    {
        debug("socket_init: out of memory\n");
        return NULL;
    }

    current->port = SERVER_PORT;
    current->sockfd = Socket(AF_INET, SOCK_STREAM, 0);

    /* SO_REUSEADDR: allow the port to be bound again without waiting */
    Setsockopt(current->sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    current->addr.sin_family = AF_INET;
    current->addr.sin_port = htons(current->port);
    current->addr.sin_addr.s_addr = INADDR_ANY;

    Bind(current->sockfd, (struct sockaddr *)&current->addr, sizeof(current->addr));
    Listen(current->sockfd, MAXLINE);

    /* no client yet: every slot is free, only the listener is watched */
    current->maxi = -1;
    current->maxfd = current->sockfd;
    for (int i = 0; i < FD_SETSIZE; ++i)
    {
        current->aggregate[i] = -1;
    }

    /* the FD_* macros take a pointer to the set */
    FD_ZERO(&current->allset);
    FD_SET(current->sockfd, &current->allset);

    ServerQueueInit(&current->send_queue, TCP_FRAME_SIZE);
    ServerQueueInit(&current->recv_queue, TCP_FRAME_SIZE);

    return current;
}

/*
 * Accept one pending connection on the listening socket, register the new
 * descriptor in aggregate[]/allset and add a client node to the client list.
 *
 * rset is unused; it is kept so the signature stays unchanged.
 */
static void socket_accept(server_t *arg, fd_set *rset)
{
    server_t *current = arg;
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);   /* accept() takes a socklen_t *, not an int * */
    int slot;

    (void)rset;

    int new_fd = Accept(current->sockfd, (struct sockaddr *)&addr, &len);
    if (new_fd < 0)
    {
        /* only reachable if the Accept wrapper returns on failure */
        return;
    }

    debug("new connection client_fd ( %d ) %s: %d\n", new_fd, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));

    /* find the first free slot */
    for (slot = 0; slot < FD_SETSIZE; ++slot)
    {
        if (current->aggregate[slot] < 0)
        {
            break;
        }
    }

    /* FD_SET on a descriptor >= FD_SETSIZE is undefined, so refuse it too */
    if (FD_SETSIZE == slot || new_fd >= FD_SETSIZE)
    {
        printf("too many connects\n");
        Close(new_fd);
        return;
    }

    /* build the client node before touching the bookkeeping, so an
     * allocation failure leaves the server state unchanged */
    client_t *node = calloc(1, sizeof *node);
    if (node == NULL)
    {
        debug("socket_accept: out of memory, dropping client_fd ( %d )\n", new_fd);
        Close(new_fd);
        return;
    }
    node->sockfd = new_fd;
    memcpy(&node->addr, &addr, sizeof(struct sockaddr_in));

    /* add new_fd to aggregate */
    current->aggregate[slot] = new_fd;
    FD_SET(new_fd, &current->allset);
    if (new_fd > current->maxfd)
    {
        current->maxfd = new_fd;
    }
    if (slot > current->maxi)
    {
        current->maxi = slot;
    }

    /* add client node -- presumably the list takes ownership of node; verify in client_list */
    ClientAdd(node);
}

/* Remove the client stored in aggregate[slot]: delete its list node, close
 * the socket and stop watching the descriptor. */
static void socket_drop_client(server_t *current, int slot)
{
    int sockfd = current->aggregate[slot];

    ClientDel(sockfd);
    Close(sockfd);
    FD_CLR(sockfd, &current->allset);
    current->aggregate[slot] = -1;
}

/*
 * Read one frame from every client descriptor that is set in rset and push
 * it to recv_queue.
 *
 * ret is the number of ready client descriptors; the scan stops once that
 * many have been handled. A client is dropped on orderly shutdown
 * (recv() == 0) and on a non-transient recv() error.
 */
static void socket_recv(server_t *arg, fd_set *rset, int ret)
{
    server_t *current = arg;
    tcp_frame_t frame;

    for (int i = 0; i <= current->maxi; ++i)
    {
        int sockfd = current->aggregate[i];
        if (sockfd < 0 || !FD_ISSET(sockfd, rset))
        {
            continue;
        }

        ssize_t length = recv(sockfd, frame.data, TCP_FRAME_SIZE, 0);
        if (length > 0)
        {
            frame.sockfd = sockfd;
            frame.length = (uint16_t)length;   /* <= TCP_FRAME_SIZE, fits in 16 bits */
            server_debug(frame.data, frame.length);

            // add data to recv_queue, popped elsewhere
            if (ServerQueueWrite(&current->recv_queue, (uint8_t *)&frame, sizeof(tcp_frame_t)) == 0)
            {
                debug("push failure...queue full...\n");
            }
        }
        else if (0 == length)
        {
            /* delete client node, close connect socket */
            debug("client[%d] close\n", sockfd);
            socket_drop_client(current, i);
        }
        else
        {
            int err = errno;   /* save before any call can overwrite it */
            if (err != EINTR && err != EAGAIN && err != EWOULDBLOCK)
            {
                debug("client[%d] recv error: %s\n", sockfd, strerror(err));
                socket_drop_client(current, i);
            }
        }

        if (--ret <= 0)
        {
            break;
        }
    }
}

/*
 * Receive thread: waits on a copy of allset with a 200 microsecond timeout,
 * accepts new connections and reads from ready clients.
 * arg is the server_t created by socket_init(). Never returns in practice.
 */
static void *server_recv_thread(void *arg)
{
    server_t *current = (server_t *)arg;
    fd_set rset;
    struct timeval timeout;

    while (1)
    {
        /* select() modifies both the set and the timeout: reload every pass */
        rset = current->allset;
        timeout.tv_sec = 0;
        timeout.tv_usec = 200;

        int ready = Select(current->maxfd + 1, &rset, NULL, NULL, &timeout);
        if (ready <= 0)
        {
            /* timeout, or an error after which rset must not be inspected */
            continue;
        }

        if (FD_ISSET(current->sockfd, &rset))
        {
            socket_accept(current, &rset);   // a new connection came in
            --ready;                         /* the listener accounted for one ready descriptor */
        }

        if (ready > 0)
        {
            socket_recv(current, &rset, ready);   // an existing connection sent data to us
        }
    }

    /* not reached: the loop above has no exit */
    Close(current->sockfd);
    return NULL;
}

/*
 * Send thread: placeholder. The queue read is still disabled, so the loop
 * only sleeps 100 microseconds per iteration.
 */
static void *server_send_thread(void *arg)
{
    server_t *current = (server_t *)arg;
    tcp_frame_t *frame = NULL;

    (void)current;   /* only used by the disabled code below */

    while (1)
    {
        //frame = (tcp_frame_t*)ServerQueueRead(&current->send_queue, sizeof(tcp_frame_t));
        if (frame != NULL)
        {
            //server_debug(frame->data, frame->length);
        }
        usleep(100);
    }

    return NULL;
}

/*
 * Create the server context and start the send and receive threads.
 *
 * Returns the server context, or NULL when it could not be allocated.
 * A thread that fails to start is reported through debug(); the context is
 * still returned.
 */
server_t *SocketInit(void)
{
    server_t *current = socket_init();
    if (current == NULL)
    {
        return NULL;
    }

    debug("create thread...\r\n");

    int rc = pthread_create(&current->send_thread, NULL, server_send_thread, current);
    if (rc != 0)
    {
        debug("create send thread failed: %s\n", strerror(rc));
    }

    rc = pthread_create(&current->recv_thread, NULL, server_recv_thread, current);
    if (rc != 0)
    {
        debug("create recv thread failed: %s\n", strerror(rc));
    }

    return current;
}