文章目录
- 0 包裹函数
- 1 多进程服务器
- 流程
- 代码
- 2 多线程服务器
- 3 TCP状态转移
- 半关闭
- 心跳包
- 4 端口复用
- 5 IO多路复用技术
- 高并发服务器
- 6 select
- 代码
- 总结
- 7 POLL
- API
- 代码
- poll相对select的优缺点
- 8 epoll(重点)
- API
- 监听管道
- 代码
- EPOLL 高并发服务器
- 9 Epoll的两种工作方式
- 边缘触发代码
0 包裹函数
用于创建socket,绑定端口ip和监听时,添加了错误时报错的包裹函数
warp.h
#ifndef __WRAP_H_
#define __WRAP_H_
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <strings.h>void perr_exit(const char *s);
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr);
int Bind(int fd, const struct sockaddr *sa, socklen_t salen);
int Connect(int fd, const struct sockaddr *sa, socklen_t salen);
int Listen(int fd, int backlog);
int Socket(int family, int type, int protocol);
ssize_t Read(int fd, void *ptr, size_t nbytes);
ssize_t Write(int fd, const void *ptr, size_t nbytes);
int Close(int fd);
ssize_t Readn(int fd, void *vptr, size_t n);
ssize_t Writen(int fd, const void *vptr, size_t n);
ssize_t my_read(int fd, char *ptr);
ssize_t Readline(int fd, void *vptr, size_t maxlen);
int tcp4bind(short port,const char *IP);
#endif
wrap.c
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <strings.h>void perr_exit(const char *s)
{perror(s);exit(-1);
}int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)
{int n;again:if ((n = accept(fd, sa, salenptr)) < 0) {if ((errno == ECONNABORTED) || (errno == EINTR))goto again;elseperr_exit("accept error");}return n;
}int Bind(int fd, const struct sockaddr *sa, socklen_t salen)
{int n;if ((n = bind(fd, sa, salen)) < 0)perr_exit("bind error");return n;
}int Connect(int fd, const struct sockaddr *sa, socklen_t salen)
{int n;if ((n = connect(fd, sa, salen)) < 0)perr_exit("connect error");return n;
}int Listen(int fd, int backlog)
{int n;if ((n = listen(fd, backlog)) < 0)perr_exit("listen error");return n;
}int Socket(int family, int type, int protocol)
{int n;if ((n = socket(family, type, protocol)) < 0)perr_exit("socket error");return n;
}ssize_t Read(int fd, void *ptr, size_t nbytes)
{ssize_t n;again:if ( (n = read(fd, ptr, nbytes)) == -1) {if (errno == EINTR)goto again;elsereturn -1;}return n;
}ssize_t Write(int fd, const void *ptr, size_t nbytes)
{ssize_t n;again:if ( (n = write(fd, ptr, nbytes)) == -1) {if (errno == EINTR)goto again;elsereturn -1;}return n;
}int Close(int fd)
{int n;if ((n = close(fd)) == -1)perr_exit("close error");return n;
}/*参三: 应该读取的字节数*/
ssize_t Readn(int fd, void *vptr, size_t n)
{size_t nleft; //usigned int 剩余未读取的字节数ssize_t nread; //int 实际读到的字节数char *ptr;ptr = vptr;nleft = n;while (nleft > 0) {if ((nread = read(fd, ptr, nleft)) < 0) {if (errno == EINTR)nread = 0;elsereturn -1;} else if (nread == 0)break;nleft -= nread;ptr += nread;}return n - nleft;
}ssize_t Writen(int fd, const void *vptr, size_t n)
{size_t nleft;ssize_t nwritten;const char *ptr;ptr = vptr;nleft = n;while (nleft > 0) {if ( (nwritten = write(fd, ptr, nleft)) <= 0) {if (nwritten < 0 && errno == EINTR)nwritten = 0;elsereturn -1;}nleft -= nwritten;ptr += nwritten;}return n;
}static ssize_t my_read(int fd, char *ptr)
{static int read_cnt;static char *read_ptr;static char read_buf[100];if (read_cnt <= 0) {
again:if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) {if (errno == EINTR)goto again;return -1;} else if (read_cnt == 0)return 0;read_ptr = read_buf;}read_cnt--;*ptr = *read_ptr++;return 1;
}ssize_t Readline(int fd, void *vptr, size_t maxlen)
{ssize_t n, rc;char c, *ptr;ptr = vptr;for (n = 1; n < maxlen; n++) {if ( (rc = my_read(fd, &c)) == 1) {*ptr++ = c;if (c == '\n')break;} else if (rc == 0) {*ptr = 0;return n - 1;} elsereturn -1;}*ptr = 0;return n;
}int tcp4bind(short port,const char *IP)
{struct sockaddr_in serv_addr;int lfd = Socket(AF_INET,SOCK_STREAM,0);bzero(&serv_addr,sizeof(serv_addr));if(IP == NULL){//如果这样使用 0.0.0.0,任意ip将可以连接serv_addr.sin_addr.s_addr = INADDR_ANY;}else{if(inet_pton(AF_INET,IP,&serv_addr.sin_addr.s_addr) <= 0){perror(IP);//转换失败exit(1);}}serv_addr.sin_family = AF_INET;serv_addr.sin_port = htons(port);Bind(lfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr));return lfd;
}
1 多进程服务器
流程
因为read 和 write都是阻塞的,所以如果多个客户端进行连接时,会阻塞。
理念就是连接给一个专用的进程,然后这个进程来分配其他进程进行读写的操作
流程
创建套接字
绑定
监听
while(1)
{
提取连接
fork创建子进程
子进程中 关闭lfd 服务客户端(连接)
父进程关闭 cfd(读写),回收子进程资源
}
代码
#include<sys/socket.h>
#include<stdio.h>
#include<arpa/inet.h>
#include <unistd.h>
#include "wrap.h"
#include<stdlib.h>
#include <signal.h>
#include<sys/wait.h>
#include<sys/types.h>void *free_process(int signum)
{pid_t pid;while(1){pid = waitpid(-1,NULL,WNOHANG);if(pid <= 0) // 没有要回收的子进程{break;}else{printf("child pid =%d\n",pid);}}}int main()
{// 阻塞信号集,在子进程创建之前// 在创建子进程之后添加信号sigset_t set;sigemptyset(&set);sigaddset(&set, SIGCHLD);sigprocmask(SIG_BLOCK, &set, NULL);// 创建套接字,绑定 链接socketint lfd = tcp4bind(8000,NULL);// 监听Listen(lfd, 128);// 提取// 回射struct sockaddr_in cliaddr;socklen_t len = sizeof(cliaddr);while(1){// 读取socketint cfd = Accept(lfd, (struct sockaddr*)&cliaddr,&len);char ip[16] = "";printf("new client ip= %s port = %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,ip,16),ntohs(cliaddr.sin_port));// fork 创建子进程pid_t pid;pid = fork();if(pid<0){perror("");exit(0);}else if(pid == 0) // 子进程{// 关闭lfdclose(lfd);while(1){char buf[1024];int n = read(cfd,buf,sizeof(buf));if(n < 0){perror("");close(cfd);exit(0);}else if(n == 0) // 对方关闭{printf("client close\n");close(cfd);exit(0);}else{printf("%s", buf);write(cfd,buf,n);// exit(0);}}}else{close(cfd); // 回收// 注册信号回调struct sigaction act;act.sa_flags = 0;act.sa_handler = free_process;sigemptyset(&act.sa_mask);sigaction(SIGCHLD,&act,NULL);sigprocmask(SIG_UNBLOCK, &set, NULL);}}// 回收return 0;
}
2 多线程服务器
#include<sys/socket.h>
#include<stdio.h>
#include<arpa/inet.h>
#include <unistd.h>
#include "wrap.h"
#include<stdlib.h>
#include <signal.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<pthread.h>typedef struct c_info
{int cfd;struct sockaddr_in cliaddr;}CINFO;int main(int argc, char *argv[])
{if(argc < 2){printf("argc < 2???\n ./a.out 8000\n");}// 初始化线程属性pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);short port = atoi(argv[1]);int lfd = tcp4bind(port, NULL);Listen(lfd, 128);struct sockaddr_in cliaddr;socklen_t len = sizeof(cliaddr);CINFO *info;while(1){int cfd = Accept(lfd,(struct sockaddr*)&cliaddr,&len);char ip[16] = "";printf("new client ip = %s port = %d\n", inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,ip,16),ntohs(cliaddr.sin_port));pthread_t pthid;info = malloc(sizeof(CINFO));info->cfd = cfd;info->cliaddr = cliaddr;pthread_create(&pthid,NULL,client_fun,&info);}return 0;
}void * client_fun(void *arg)
{CINFO *info = (CINFO *)arg;char ip[16] = "";printf("new client ip = %s port = %d\n", inet_ntop(AF_INET,&(info->cliaddr.sin_addr.s_addr),ip,16),ntohs(info->cliaddr.sin_port));while(1){char buf[1024] = "";int count = 0;count = read(info->cfd, buf, sizeof(buf));if(count < 0){perror("");break;}else if(count == 0){printf("client close");break;}else{ printf("%s", buf);write(info->cfd, buf, count);}}close(info->cfd);free(info);}
3 TCP状态转移
TIME_WAIT -> CLOSE 2MML
半关闭
处于FIN_WAIT2时,处于半关闭状态,此时只能读数据不能收数据
手动半关闭
心跳包
每隔一段时间服务器向客户端发送一个包,客户端需要在一定时间内返回一个规定好的包,用于测试连接是否还存在,如果对方没有回复,则断开连接
4 端口复用
端口重新启用
使用 setsockopt设置端口重新使用
放在绑定之前
5 IO多路复用技术
高并发服务器
1.阻塞等待
一个进程 服务一个客户端
消耗资源
2.非阻塞忙轮询
重复查看 进程是否有需求,是否有新连接
3.多路io
通过监听多个文件描述符,监听文件描述符是否还在读写
内核有三种方式
select:windows使用 select select跨平台
poll: 少用
epoll: linux下使用
内核监听多个文件描述符的属性(读写缓冲区)变化,如果某个文件描述符的读缓冲区变化了,这个时候就是可以读了,将这个事件告知应用层
6 select
使用select监听文件描述符
注意:变化的文件描述符,会存放在监听的集合中,未变化的文件描述符会被删除
代码
#include<sys/socket.h>
#include<stdio.h>
#include<arpa/inet.h>
#include <unistd.h>
#include<sys/time.h>
#include<sys/types.h>
#include<sys/select.h>
#include "wrap.h"#define PORT 8888
int main(int argc, char *argv[])
{// 创建套接字,绑定int lfd = tcp4bind(PORT,NULL);// 监听Listen(lfd, 128);int maxfd = lfd; // 最大的文件描述符fd_set oldset,rset;// 清空集合FD_ZERO(&oldset);FD_ZERO(&rset);// 将lfd加入到oldset集合中FD_SET(lfd, &oldset);// whilewhile(1){rset = oldset; // 将oldset赋值给需要监听的集合rsetint n = select(maxfd + 1,&rset,NULL,NULL,NULL);if(n<0){perror("");break;}else if(n==0){continue;}else // n>0 监听到了文件描述符{// lfd变化, 则进行提取if(FD_ISSET(lfd,&rset)){struct sockaddr_in cliaddr;socklen_t len = sizeof(cliaddr);char ip[16] = "";// 提取新的连接int cfd = Accept(lfd, (struct sockaddr*)&cliaddr, & len);printf("new client ip = %s, port = %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr.s_addr,ip,16),ntohs(cliaddr.sin_port));// 将cfd添加到oldset集合中,下次进行监听FD_SET(cfd,&oldset);// 更新maxfdif(cfd > maxfd)maxfd = cfd;// 如果只有lfd变化,continueif(--n == 0)continue;}// cfd 遍历cfd,看lfd之后的文件描述符是否在rset,如果在则cfd变化for(int i = lfd+1;i<=maxfd;i++){// 如果i文件描述符在rset集合中if(FD_ISSET(i,&rset)){char buf[1024] = "";int ret = Read(i,buf,sizeof(buf));if(ret < 0) //出错,将cfd关闭,从oldset删除cfd{perror("");close(i);FD_CLR(i,&oldset);}else if(ret == 0){printf("client close\n");close(i);FD_CLR(i,&oldset);}else{printf("%s\n", buf);Write(i,buf,ret);}}}}}// select监听return 0;
}
总结
优缺点
优点:跨平台
缺点:文件描述符1024的限制 由于FD_SETSIZE的限制
只是返回变化的文件描述符的个数,具体哪个变化需要遍历
每次都需要将需要监听的文件描述集合由应用层拷贝到内核
7 POLL
API
代码
//IO多路复用技术poll函数的使用
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <errno.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include "wrap.h"int main()
{int i;int n;int lfd;int cfd;int ret;int nready;int maxfd;char buf[1024];socklen_t len;int sockfd;fd_set tmpfds, rdfds;struct sockaddr_in svraddr, cliaddr;//创建socketlfd = Socket(AF_INET, SOCK_STREAM, 0);//允许端口复用int opt = 1;setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));//绑定bindsvraddr.sin_family = AF_INET;svraddr.sin_addr.s_addr = htonl(INADDR_ANY);svraddr.sin_port = htons(8888);ret = Bind(lfd, (struct sockaddr *)&svraddr, sizeof(struct sockaddr_in));//监听listenret = Listen(lfd, 128);struct pollfd client[1024];for(i=0; i<1024; i++){client[i].fd = -1;} //将监听文件描述符委托给内核监控----监控读事件client[0].fd = lfd;client[0].events = POLLIN;maxfd = 0; //maxfd表示内核监控的范围while(1){nready = poll(client, maxfd+1, -1);if(nready<0){perror("poll error");exit(1);}//有客户端连接请求if(client[0].fd==lfd && (client[0].revents & POLLIN)){cfd = Accept(lfd, NULL, NULL);//寻找client数组中的可用位置for(i=1; i<1024; i++){if(client[i].fd==-1){client[i].fd = cfd;client[i].events = POLLIN;break;}}//若没有可用位置, 则关闭连接if(i==1024){Close(cfd);continue;}if(maxfd<i){maxfd = i;}if(--nready==0){continue;}}//下面是有数据到来的情况for(i=1; i<=maxfd; i++){//若fd为-1, 表示连接已经关闭或者没有连接if(client[i].fd==-1) {continue;}sockfd = client[i].fd;memset(buf, 0x00, sizeof(buf));n = Read(sockfd, buf, sizeof(buf));if(n<=0){printf("read error or client closed,n==[%d]\n", n);Close(sockfd);client[i].fd = -1; //fd为-1,表示不再让内核监控}else{printf("read over,n==[%d],buf==[%s]\n", n, buf);write(sockfd, buf, n);}if(--nready==0){break;}}}Close(lfd);return 0;
}
poll相对select的优缺点
优点:相对于select没有最大1024文件描述符限制
请求和返回是分离
缺点:
每次都需要将需要监听的文件描述符从应用层拷贝到内核
每次都需要将数组中的元素遍历一遍才知道哪个变化
大量并发、少量活跃率低
8 epoll(重点)
1.创建红黑树
2.将监听的文件描述符上树
3.监听
特点:
没有文件描述符1024的限制
以后每次监听都不需要在此将需要监听的文件描述符拷贝在内核
返回的是已经变化的文件描述符,不需要遍历树
工作原理:
API
1.创建红黑树
2.上树 下树 修改节点
3. 监听
监听管道
代码
#include<sys/socket.h>
#include<stdio.h>
#include<arpa/inet.h>
#include <unistd.h>
#include<sys/time.h>
#include<sys/types.h>
#include<sys/select.h>
#include "wrap.h"
#include<sys/epoll.h>int main()
{int fd[2];pipe(fd);pid_t pid;pid = fork();if(pid < 0)perror("");else if(pid == 0){close(fd[0]);char buf[5];char ch = 'a';while(1){sleep(3);memset(buf,ch,sizeof(buf));write(fd[1],buf,5);}}else{close(fd[1]);// 创建树int epfd = epoll_create(1);struct epoll_event ev, evs[1];ev.data.fd = fd[0];ev.events = EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_ADD,fd[0],&ev);while(1){int n = epoll_wait(epfd, &evs[1],-1,-1);if(n == 1){ char buf[128] = "";int ret = read(fd[0],buf,sizeof(buf));if(ret <= 0){close(fd[0]);epoll_ctl(epfd,EPOLL_CTL_DEL,fd[0],&ev);break;}else{printf("%s\n",buf);}}}}return 0;}
EPOLL 高并发服务器
#include<sys/socket.h>
#include<stdio.h>
#include<arpa/inet.h>
#include <unistd.h>
#include<sys/time.h>
#include<sys/types.h>
#include<sys/select.h>
#include "wrap.h"
#include<sys/epoll.h>#define PORT 8000
int main()
{// 创建套接字int lfd = tcp4bind(PORT,NULL);// 监听Listen(lfd,128);// 创建树int epfd = epoll_create(1);// 将lfd上树struct epoll_event ev,evs[1024];ev.data.fd = lfd;ev.events = EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev);// while循环监听while(1){int nready = epoll_wait(epfd,evs,-1,-1);if (nready < 0){perror("");break;}else if(nready == 0){continue;}else // nread > 0 文件描述符有变化{for(int i =0;i<nready;i++){// 判断lfd变换,并且是读事件变换if(evs->data.fd == lfd && evs[i].events & EPOLLIN){struct sockaddr_in cliaddr;char ip[16] = "";socklen_t len = sizeof(cliaddr);int cfd = Accept(lfd,(struct sockaddr *)&cliaddr, &len);printf("new client ip = %s port =%d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,ip,16),ntohs(cliaddr.sin_port));// 将cfd上树ev.data.fd = cfd;ev.events = EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev);}else if(evs[i].events & EPOLLIN) // cfd 变换,而且是读事件变换{char buf[1024] = "";int n = read(evs[i].data.fd,buf,sizeof(buf));if(n < 0) // 出错{perror("");epoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]);}else if(n == 0) // 客户端关闭,下树{printf("client close]\n");close(evs[i].data.fd); // 关闭cfdepoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]); }else // 服务端进行处理{printf("%s\n",buf);write(evs[i].data.fd,buf,n);}}}}}return 0;
}
9 Epoll的两种工作方式
- 监听读缓存区的变化
水平触发
只要缓存区有数据,就会触发epoll_wait
边缘触发
数据来一次,epoll_wait只触发一次
2.监听写缓存区的变化
水平触发:只要可以写,就会触发
边沿触发:数据从有到无就会触发
边缘触发
触发一次的时候只读4位,但发送了10位,所以虽然只读一次,但是读不完
设置为一次读完
设置cfd为非阻塞
因为设置水平触发,只要缓存区有数据epoll_wait就会被触发,epoll_wait 是一个系统调用,尽量少调用边缘触发,边缘触发数据来一次只触发一次,这个时候要求一次性将数据读完,所以while循环读,堵到最后read默认带阻塞,不能让read阻塞,因为不能再去监听,设置cfd为非阻塞,read堵到最后一次返回值为-1,判断errno的值为eagain,则代表数据读干净、
工作中 边缘触发 + 非阻塞 = 高速模式
边缘触发代码
#include<sys/socket.h>
#include<stdio.h>
#include<arpa/inet.h>
#include <unistd.h>
#include<sys/time.h>
#include<sys/types.h>
#include<sys/select.h>
#include "wrap.h"
#include<sys/epoll.h>
#include <fcntl.h>#define PORT 8000
int main()
{// 创建套接字int lfd = tcp4bind(PORT,NULL);// 监听Listen(lfd,128);// 创建树int epfd = epoll_create(1);// 将lfd上树struct epoll_event ev,evs[1024];ev.data.fd = lfd;ev.events = EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev);// while循环监听while(1){int nready = epoll_wait(epfd,evs,-1,-1);printf("epoll wait");if (nready < 0){perror("");break;}else if(nready == 0){continue;}else // nread > 0 文件描述符有变化{for(int i =0;i<nready;i++){// 判断lfd变换,并且是读事件变换if(evs->data.fd == lfd && evs[i].events & EPOLLIN){struct sockaddr_in cliaddr;char ip[16] = "";socklen_t len = sizeof(cliaddr);int cfd = Accept(lfd,(struct sockaddr *)&cliaddr, &len);printf("new client ip = %s port =%d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,ip,16),ntohs(cliaddr.sin_port));// 获得cfd的标志位int flag = fcntl(cfd, F_GETFL); // 设置为非阻塞flag |= O_NONBLOCK;fcntl(cfd,F_SETFL,flag);// 将cfd上树ev.data.fd = cfd;ev.events = EPOLLIN | EPOLLET;epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev);}else if(evs[i].events & EPOLLIN) // cfd 变换,而且是读事件变换{while(1){char buf[4] = "";// 如果读一个缓冲区,缓冲区域没有数据,如果是阻塞,就阻塞等待// 是非阻塞,返回值等于 -1,并且会将errorno值设置为EAGAIN int n = read(evs[i].data.fd,buf,sizeof(buf));if(n < 0) // 出错{ // 如果缓冲区读干净了,这个时候应该跳出while循环,继续监听if(errno == EAGAIN){break;}// 普通错误perror("");close(evs[i].data.fd); // 关闭cfdepoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]);break;}else if(n == 0) // 客户端关闭,下树{printf("client close]\n");close(evs[i].data.fd); // 关闭cfdepoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]); break;}else // 服务端进行处理{// printf("%s\n",buf);write(STDOUT_FILENO,buf,4);write(evs[i].data.fd,buf,n);}}}}}}return 0;
}