网络异步编程
一、tcp连接的状态
- LISTEN:服务端状态,等待客户端发起连接请求
- SYN_SENT:客户端已发送同步连接请求,等待服务端相应
- SYN_RECEIVED:服务器收到客户端的SYN请请求,并发送自己的SYN响应,并等待客户端对这个SYN+ACK的确认(等待客户端连接确认);
- ESTABLISHED:双方完成三次握手,连接成功,可以进行数据传输;
- FIN_WAIT_1:主动关闭连接的一方(通常是客户端)已经发送FIN报文,但是还未收到对方的确认。此时仍可以进行数据接收;
- FIN_WAIT_2:主动关闭一方收到了对方的FIN确认,但是没收到对方的FIN,进入半连接状态,仅能接收数据;
- CLOSE_WAIT:被动关闭连接的一方已经收到FIN,并发送了确认,但尚关闭连接,等待应用层释放资源;
- CLOSING:双方都发送了关闭请求,都在等待对方确认;
- LAST_ACK:被动关闭的一方发送了FIN,等待最后的ACK来关闭连接;
- TIME_WAIT:主动关闭方发送完FIN,并收到对方的FIN+ACK后进入该状态,等待足够长的时间确保对方能够收到确认后再关闭连接;
- 所有连接终止程序完成后,套接字回到CLOSE状态
二、客户端的状态过程:
CLOSED->SYN_SENT->ESTABLISHED->FIN_WAIT_1->FIN_WAIT_2->TIME_WAIT->CLOSED
三、服务端的状态过程
CLOSED->LISTEN->SYN收到->ESTABLISHED->CLOSE_WAIT->LAST_ACK->CLOSED
四、本地网络IP
-
eth0:192.168.0.45 是实际网络设备接口及对应公网或内网IP地址。
-
lo:127.0.0.1 是本地回环接口,仅用于主机内部通信
-
any:0.0.0.0 在服务配置中通常用作通配符,指示服务监听所有网络接口上的流量。
五、文件描述符
-
stdin:0 表示标准写入的文件描述符
-
stdout:1 表示标准输出的文件描述符
-
stderr:2 表示标错误输出的文件描述符
每个线程的前三个文件描述符都是这三个
-
客户端操作过程 socket()->bind()->connect()->send()->recv()->close()
-
服务端操作过程 socket()->bins()->listen()->accept()->recv()->send()->close()
六、多线程网络I/O
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/select.h>
#include <pthread.h>void *client_thread(void *arg)
{int client_socket = *(int *)arg;while (true){char buffer[1024];memset(buffer, 0, sizeof(buffer));int count = recv(client_socket, buffer, sizeof(buffer), 0); // 读取客户端发送的数据if (count == 0)break;printf("receive data: %s\n", buffer);send(client_socket, buffer, strlen(buffer), 0); // 发送数据给客户端}close(client_socket); // 关闭连接
}int main()
{int socketfd = socket(AF_INET, SOCK_STREAM, 0); // 创建一个TCP socketstruct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(struct sockaddr_in));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = inet_addr("0.0.0.0"); // 设置服务器的IP地址//servaddr.sin_addr.s_addr = ntohl(INADDR_ANY); servaddr.sin_port = htons(1234); // 设置服务器的端口号if (-1 == bind(socketfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) // 将socket绑定到指定的地址和端口{perror("bind");return -1;}listen(socketfd, 10); // 开始监听,允许最多10个连接请求在等待队列中排队while (true){struct sockaddr_in clientaddr;socklen_t clientaddrlen = sizeof(clientaddr);int connfd = accept(socketfd, (struct sockaddr *)&clientaddr, &clientaddrlen); // 接受一个连接请求,printf("accept a new connection\n");pthread_t tid;pthread_create(&tid, NULL, client_thread, (void *)&connfd); // 创建一个线程来处理连接} close(socketfd); // 关闭socketreturn 0;
}
-
TIME_WAIT状态是为了确保TCP状态的可靠终止,防止旧数据包在网络中滞留
-
服务频繁的创建和关闭会导致出现大量的TIME_WAIT状态的进程;一般是服务器主动去断开连接
-
代码中出现大量的TIME_WAIT,可以使用netstat -anop 查看网络连接状态
select、pool、epoll是Linux中用于实现I/O多路复用的三种机制。主要是在单个线程中同时监控多个文件描述符的状态变化,包括可读、可写等从而实现提高服务并发处理的能力;
七、SELECT
- 通过一个集合监控文件描述符列表,默认FD_SETSIZE大小位1024
- 每次调用时都需要将整个文件描述符集合从用户空间拷贝到内核空间,每次需要遍历每个文件描述符的状态,返回就绪集合,效率回随着文件描述符的增加而降低
//select//select(maxfd, &rset, &wset, &error, timeout);fd_set rfds, rset;FD_ZERO(&rfds);FD_SET(socketfd, &rfds);struct timeval timeout;// timeout.tv_sec = 5;// timeout.tv_usec = 0;int maxfd = socketfd;while (true){rset = rfds;int nready = select(maxfd + 1, &rfds, NULL, NULL, NULL);if (nready > 0){if (FD_ISSET(socketfd, &rfds)){struct sockaddr_in clientaddr;socklen_t clientaddrlen = sizeof(clientaddr);int connfd = accept(socketfd, (struct sockaddr *)&clientaddr, &clientaddrlen);printf("accept a new connection\n");FD_SET(connfd, &rfds);maxfd = connfd;}for (int i = socketfd + 1; i <= maxfd; i++){if (FD_ISSET(i, &rfds)){char buf[1024];memset(buf, 0, sizeof(buf));int count = recv(i, buf, sizeof(buf), 0);if (count == 0){close(i);FD_CLR(i, &rfds);break;}//else{send(i, buf, count, 0);printf("send %d bytes data to client, send data:%s\n", count, buf);}}}}}
八、POLL
- 与select一样每次需要将监控的分拣描述符拷贝到内核,轮询所有文件描述符
- poll将文件描述符和事件类型以及检测事件的文件描述符封装到一个结构体里面
struct pollfd fds[1024];int maxfd = socketfd;fds[socketfd].fd = socketfd;fds[socketfd].events = POLLIN;while (true){int nready = poll(fds, maxfd + 1, -1);if (nready == 0)continue;if (fds[socketfd].revents & POLLIN){struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_fd = accept(socketfd, (struct sockaddr*)&client_addr, &client_addr_len);printf("accept a new client, client ip:%s, client port:%d\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port));fds[client_fd].fd = client_fd;fds[client_fd].events = POLLIN;maxfd = (client_fd > maxfd) ? client_fd : maxfd;}for (int i = socketfd + 1; i <= maxfd; i++){if (fds[i].revents & POLLIN){char buf[1024] = {0};int ret = recv(fds[i].fd, buf, sizeof(buf), 0);if (ret <= 0){close(fds[i].fd);fds[i].fd = -1;fds[i].events = 0;continue;}else{printf("recv buf: %s\n", buf);send(fds[i].fd, buf, strlen(buf), 0);}}}}
九、EPOLL
- epool采用事件通知的方式,避免轮询,提高io的性能
- 使用红黑树结构维护文件描述符,并引入水平触发(LT)和边缘触发(ET)两种模式
- 不需要多次拷贝文件描述符集合
- 水平触发一搬在传输小文件中使用,边缘触发一般在传输发文件中使用
- 主要使用三个函数
- epoll_create()
- epoll_ctl()
- epoll_wait()
int epfd = epoll_create(1);struct epoll_event event, events[1024];event.events = EPOLLIN;event.data.fd = socketfd;epoll_ctl(epfd, EPOLL_CTL_ADD, socketfd, &event);while (1){int number = epoll_wait(epfd, events, 1024, -1);for (int i = 0; i < number; i++){int sockfd = events[i].data.fd;if (sockfd == socketfd) // socketfd RPOLLIN事件{struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_fd = accept(socketfd, (struct sockaddr*)&client_addr, &client_addr_len);event.events = EPOLLIN; //水平触发//event.events = EPOLLIN | EPOLLET; //边缘触发event.data.fd = client_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &event);printf("new client: %d\n", client_fd);}else if (events[i].events & EPOLLIN) // client_fd RPOLLIN事件 {char buf[1024] = {0};int len = read(sockfd, buf, sizeof(buf));if (len == 0){perror("read error");close(sockfd);epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);continue;}printf("client00000 say: %s\n", buf);send(sockfd, buf, strlen(buf), 0);}}}
reactor模式
- listenfd触发EPOLLIN事件,执行accept_cb
- clientfd触发EPOLLIN事件,执行recv_cb
- clientfd触发EPOLLOUT事件,执行send_cb
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/poll.h>
#include <pthread.h>
#include <sys/epoll.h>typedef int (*RCALLBACK)(int fd);int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);struct connet_item
{int fd;char rbuffer[1024];char wbuffer[1024];int rlen;int wlen;union{RCALLBACK recv_callback;RCALLBACK accept_callback;} recv_t;RCALLBACK sned_callback;;
};int epfd = 0;
connet_item conncet_list[1024] = {0};void set_event(int fd, int events, int flag)
{struct epoll_event event;event.events = events;event.data.fd = fd;if (flag)epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);elseepoll_ctl(epfd, EPOLL_CTL_MOD, fd, &event);
}int accept_cb(int fd)
{struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_fd = accept(fd, (struct sockaddr*)&client_addr, &client_addr_len);if (client_fd < 0){perror("accept error:--");return -1;}//event.events = EPOLLIN; //水平触发//event.events = EPOLLIN | EPOLLET; //边缘触发set_event(client_fd, EPOLLIN, 1);conncet_list[client_fd].fd = client_fd;memset(conncet_list[client_fd].rbuffer, 0, 1024);conncet_list[client_fd].rlen = 0;memset(conncet_list[client_fd].wbuffer, 0, 1024);conncet_list[client_fd].wlen = 0;conncet_list[client_fd].recv_t.recv_callback = recv_cb;conncet_list[client_fd].sned_callback = send_cb;printf("new client: %d\n", client_fd);return client_fd;
}
int recv_cb(int fd)
{char *buf = conncet_list[fd].rbuffer;int idx = conncet_list[fd].rlen;int len = recv(fd, buf + idx, 1024 - idx, 0);if (len == 0){perror("read error");close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return -1;}conncet_list[fd].rlen += len;printf("client00000 say: %s\n", buf);memcpy(conncet_list[fd].wbuffer, buf, conncet_list[fd].rlen);conncet_list[fd].wlen = conncet_list[fd].rlen;conncet_list[fd].rlen = 0;set_event(fd, EPOLLOUT, 0);return len;
}
int send_cb(int fd)
{char *buf = conncet_list[fd].wbuffer;int idx = conncet_list[fd].wlen;int len = send(fd, buf, idx, 0);if (len == 0){perror("write error");close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return -1;} printf("send data:%s\n", buf);set_event(fd, EPOLLIN, 0);conncet_list[fd].wlen += len;return len;
}int main()
{int socketfd = socket(AF_INET, SOCK_STREAM, 0); // 创建一个TCP socketstruct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(struct sockaddr_in));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = inet_addr("0.0.0.0"); // 设置服务器的IP地址//servaddr.sin_addr.s_addr = ntohl(INADDR_ANY); servaddr.sin_port = htons(1234); // 设置服务器的端口号if (-1 == bind(socketfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) // 将socket绑定到指定的地址和端口{perror("bind");return -1;}conncet_list[socketfd].fd = socketfd;;conncet_list[socketfd].recv_t.accept_callback = accept_cb;listen(socketfd, 10); // 开始监听,允许最多10个连接请求在等待队列中排队epfd = epoll_create(1);set_event(socketfd, EPOLLIN, 1); // 设置socketfd的EPOLLIN事件epoll_event events[1024];while (1){int number = epoll_wait(epfd, events, 1024, -1);for (int i = 0; i < number; i++){int sockfd = events[i].data.fd;if (events[i].events & EPOLLIN) // client_fd RPOLLIN事件 {conncet_list[sockfd].recv_t.accept_callback(sockfd); }else if (events[i].events & EPOLLOUT){// 客户端发送数据conncet_list[sockfd].sned_callback(sockfd);}}}return 0;
}