linux——select、poll、epoll

文章目录

    • 1.多路I/O转接服务器
    • 2.select
    • 3.select代码
    • 4.poll
    • 5.epoll
      • 5.1 基础API
      • 5.3 epoll代码
      • 5.4 边沿触发和水平触发
        • 5.4.1 水平出发LT
        • 5.4.2 边缘触发
        • 5.4.3 服务器的边缘触发和水平触发
      • 5.4 边缘触发但是能一次读完
    • 6.epoll反应堆模型
      • 6.1 反应堆模型
      • 6.2 epoll反应堆代码
    • 7.心跳包
    • 8.线程池
      • 8.1 线程池代码
      • 8.2 请问怎么实现线程池

1.多路I/O转接服务器

多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。

2.select

#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);nfds: 		监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态readfds:	监控有读数据到达文件描述符集合,传入传出参数writefds:	监控写数据到达文件描述符集合,传入传出参数exceptfds:	监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数timeout:	定时阻塞监控时间,3种情况1.NULL,永远等下去2.设置timeval,等待固定时间3.设置timeval里时间均为0,检查描述字后立即返回,轮询struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};void FD_CLR(int fd, fd_set *set); 	//把文件描述符集合里fd清0int FD_ISSET(int fd, fd_set *set); 	//测试文件描述符集合里fd是否置1void FD_SET(int fd, fd_set *set); 	//把文件描述符集合里fd位置1void FD_ZERO(fd_set *set); 			//把文件描述符集合里所有位清0返回值:成功:所监听的所有监听集合中满足条件的总数;失败:返回-1

3.select代码

  1. 服务器
/* server.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"#define MAXLINE 80
#define SERV_PORT 6666int main(int argc, char *argv[])
{int i, maxi, maxfd, listenfd, connfd, sockfd;int nready, client[FD_SETSIZE]; 	/* FD_SETSIZE 默认为 1024 */ssize_t n;fd_set rset, allset;char buf[MAXLINE];char str[INET_ADDRSTRLEN]; 			/* #define INET_ADDRSTRLEN 16 */socklen_t cliaddr_len;struct sockaddr_in cliaddr, servaddr;listenfd = Socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));Listen(listenfd, 20); 		/* 默认最大128 */maxfd = listenfd; 			/* 初始化 */maxi = -1;					/* client[]的下标 */for (i = 0; i < FD_SETSIZE; i++)client[i] = -1; 		/* 用-1初始化client[] */FD_ZERO(&allset);FD_SET(listenfd, &allset); /* 构造select监控文件描述符集 */for ( ; ; ) {rset = allset; 			/* 每次循环时都从新设置select监控信号集 */nready = select(maxfd+1, &rset, NULL, NULL, NULL);if (nready < 0)perr_exit("select error");if (FD_ISSET(listenfd, &rset)) { /* new client connection */cliaddr_len = sizeof(cliaddr);connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));for (i = 0; i < FD_SETSIZE; i++) {if (client[i] < 0) {client[i] = connfd; /* 保存accept返回的文件描述符到client[]里 */break;}}/* 达到select能监控的文件个数上限 1024 */if (i == FD_SETSIZE) {fputs("too many clients\n", stderr);exit(1);}FD_SET(connfd, &allset); 	/* 添加一个新的文件描述符到监控信号集里 */if (connfd > maxfd)maxfd = connfd; 		/* select第一个参数需要 */if (i > maxi)maxi = i; 				/* 更新client[]最大下标值 */if (--nready == 0)continue; 				/* 如果没有更多的就绪文件描述符继续回到上面select阻塞监听,负责处理未处理完的就绪文件描述符 */}for (i = 0; i <= maxi; i++) { 	/* 检测哪个clients 有数据就绪 */if ( (sockfd = client[i]) < 0)continue;if (FD_ISSET(sockfd, &rset)) {if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {Close(sockfd);		/* 当client关闭链接时,服务器端也关闭对应链接 */FD_CLR(sockfd, &allset); /* 解除select监控此文件描述符 */client[i] = -1;} else {int j;for (j = 0; j < n; j++)buf[j] = toupper(buf[j]);Write(sockfd, buf, n);}if (--nready == 0)break;}}}close(listenfd);return 0;
}
  1. 客户端
/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include "wrap.h"#define MAXLINE 80
#define SERV_PORT 6666int main(int argc, char *argv[])
{struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, n;sockfd = Socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);servaddr.sin_port = htons(SERV_PORT);Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));while (fgets(buf, MAXLINE, stdin) != NULL) {Write(sockfd, buf, strlen(buf));n = Read(sockfd, buf, MAXLINE);if (n == 0)printf("the other side has been closed.\n");elseWrite(STDOUT_FILENO, buf, n);}Close(sockfd);return 0;
}

4.poll

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 监控的事件 */short revents; /* 监控事件中满足条件返回的事件 */};POLLIN			普通或带外优先数据可读,即POLLRDNORM | POLLRDBANDPOLLRDNORM		数据可读POLLRDBAND		优先级带数据可读POLLPRI 		高优先级可读数据POLLOUT		普通或带外数据可写POLLWRNORM		数据可写POLLWRBAND		优先级带数据可写POLLERR 		发生错误POLLHUP 		发生挂起POLLNVAL 		描述字不是一个打开的文件nfds 			监控数组中有多少文件描述符需要被监控timeout 		毫秒级等待-1:阻塞等,#define INFTIM -1 				Linux中没有定义此宏0:立即返回,不阻塞进程>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,
下次返回时,把revents设置为0
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<poll.h>
#include<errno.h>
#include<ctype.h>
#include<unistd.h>#define MAXLINE 80
#define SERV_PORT 8000
#define OPEN_MAX 1024int main()
{int i,j,maxi,listenfd,connfd,sockfd;int nready;ssize_t n;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct pollfd client[OPEN_MAX];struct sockaddr_in cliaddr,servaddr;listenfd=socket(AF_INET,SOCK_STREAM,0);int opt=1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htonl(INADDR_ANY);servaddr.sin_port=htons(SERV_PORT);bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));listen(listenfd,120);client[0].fd=listenfd;client[0].events=POLLIN;for(int i=1;i<OPEN_MAX;i++){client[i].fd=-1;}maxi=0;for( ; ;){nready=poll(client,maxi+1,-1);/*这个if语句监听listenfd是否有读事件,也就是是否有客户端连接请求,有的话accept连接,并将客户端的文件描述符添加到监听队列中,进行监听*/if(client[0].revents & POLLIN){clilen=sizeof(cliaddr);connfd=accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);printf("received from %s at PORT %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));for(i=1;i<OPEN_MAX;i++){if(client[i].fd<0){client[i].fd=connfd;break;}}if(i==OPEN_MAX){perror("too many clients");}client[i].events=POLLIN;if(i>maxi){maxi=i;}if(--nready<=0){continue;}}for(int i=1;i<=maxi;i++){if((sockfd=client[i].fd)<0){continue;}if(client[i].revents & POLLIN){if((n=read(sockfd,buf,MAXLINE))<0){if(errno==ECONNRESET){close(sockfd);client[i].fd=-1;}else{perror("read error");}}else if(n==0){printf("client[%d] closed connection\n",i);close(sockfd);client[i].fd=-1;}else{for(j=0;j<n;j++){buf[j]=toupper(buf[j]);}printf("read 执行\n");write(sockfd,buf,n);}if(--nready<=0){break;}}}}return 0;
}

5.epoll

epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
目前epell是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
可以使用cat命令查看一个进程可以打开的socket描述符上限。

cat /proc/sys/fs/file-max

如有需要,可以通过修改配置文件的方式修改该上限值

sudo vi /etc/security/limits.conf在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。* soft nofile 65536* hard nofile 100000

5.1 基础API

  1. 创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
#include <sys/epoll.h>int epoll_create(int size)		size:监听数目
  1. 控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epfd:	为epoll_creat的句柄op:		表示动作,用3个宏来表示:EPOLL_CTL_ADD (注册新的fd到epfd)EPOLL_CTL_MOD (修改已经注册的fd的监听事件)EPOLL_CTL_DEL (从epfd删除一个fd);event:	告诉内核需要监听的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;EPOLLIN :	表示对应的文件描述符可以读(包括对端SOCKET正常关闭)EPOLLOUT:	表示对应的文件描述符可以写EPOLLPRI:	表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)EPOLLERR:	表示对应的文件描述符发生错误EPOLLHUP:	表示对应的文件描述符被挂断;EPOLLET: 	将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
  1. 等待所监控文件描述符上有事件的产生,类似于select()调用
#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)events:		用来存内核得到事件的集合,maxevents:	告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,timeout:	是超时时间-1:	阻塞0:	立即返回,非阻塞>0:	指定毫秒返回值:	成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1

5.3 epoll代码

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<errno.h>
#include<ctype.h>#define MAXLINE 8192
#define SERV_PORT 8000
#define OPEN_MAX 5000int main()
{int i,listenfd,connfd,sockfd;int n,num=0;ssize_t nready,efd,res;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct sockaddr_in cliaddr,servaddr;struct epoll_event tep,ep[OPEN_MAX];  //tep:epoll_ctl参数;ep[]:epoll_wait参数listenfd=socket(AF_INET,SOCK_STREAM,0);int opt=1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htonl(INADDR_ANY);servaddr.sin_port=htons(SERV_PORT);bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));listen(listenfd,20);efd=epoll_create(OPEN_MAX);  //创建epoll模型,efd指向红黑树根节点if(efd==-1){perror("epoll_create error");}tep.events=EPOLLIN;tep.data.fd=listenfd;         //指定lfd的监听事件为读res=epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep);   //将lfd及对应的结构体添加到红黑树上,efd可以找到该树if(res==-1){perror("epoll_ctl error");}while(1){/*对红黑树上文件描述符监听,ep为struct epoll_event类型数组,OPEN_MAX为数组容量-1永久阻塞*/nready=epoll_wait(efd,ep,OPEN_MAX,-1);if(nready==-1){perror("epoll_wait error");}for(int i=0;i<nready;i++){if(!(ep[i].events&EPOLLIN))continue;if(ep[i].data.fd==listenfd){clilen=sizeof(cliaddr);connfd=accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);//listenfd可读,说明有客户端连接printf("received from %s at PORT %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));printf("cfd %d---client %d\n",connfd,++num);tep.events=EPOLLIN;tep.data.fd=connfd;res=epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); //将客户端文件描述符添加到红黑树上if(res==-1){perror("epoll_ctl error");}}else{sockfd=ep[i].data.fd;n=read(sockfd,buf,MAXLINE);if(n==0){   //读到0,说明客户端关闭连接res=epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); //将该文件描述符从红黑树摘除if(res==-1){perror("epoll_ctl error");}close(sockfd);printf("client[%d] closed connection\n",sockfd);}else if(n<0){       //出错perror("read n<0 error:");res=epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL);close(sockfd);}else{for(int i=0;i<n;i++){buf[i]=toupper(buf[i]);}write(sockfd,buf,n);}}}}close(listenfd);close(efd);return 0;
}

5.4 边沿触发和水平触发

5.4.1 水平出发LT

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/epoll.h>
#include<errno.h>#define MAXLINE 10int main()
{int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],ch='a';pipe(pfd);pid=fork();if(pid==0){close(pfd[0]);while(1){for(i=0;i<MAXLINE/2;i++){buf[i]=ch;}buf[i-1]='\n';ch++;for(;i<MAXLINE;i++){buf[i]=ch;}buf[i-1]='\n';ch++;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid>0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efd=epoll_create(10);//event.events=EPOLLIN|EPOLLET; //边缘触发event.events=EPOLLIN; //水平触发,默认水平触发event.data.fd=pfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],&event);while(1){res=epoll_wait(efd,resevent,10,-1);printf("res %d\n",res);if(resevent[0].data.fd==pfd[0]){len=read(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}}
zhaoxr@zhaoxr-ThinkPad-E450:~/select$ ./epoll_trigger                     
res 1
aaaa
res 1
bbbb
res 1
cccc
res 1
dddd
res 1
eeee
res 1
ffff
^C

5.4.2 边缘触发

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/epoll.h>
#include<errno.h>#define MAXLINE 10int main()
{int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],ch='a';pipe(pfd);pid=fork();if(pid==0){close(pfd[0]);while(1){for(i=0;i<MAXLINE/2;i++){buf[i]=ch;}buf[i-1]='\n';ch++;for(;i<MAXLINE;i++){buf[i]=ch;}buf[i-1]='\n';ch++;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid>0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efd=epoll_create(10);event.events=EPOLLIN|EPOLLET; //ET 边缘触发//event.events=EPOLLIN;    //LT 水平触发(默认)event.data.fd=pfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],&event);while(1){res=epoll_wait(efd,resevent,10,-1);printf("res %d\n",res);if(resevent[0].data.fd==pfd[0]){len=read(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}}

5.4.3 服务器的边缘触发和水平触发

边缘触发(非阻塞模式):

#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>#define MAXLINE 10
#define SERV_PORT 8080int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd = epoll_create(10);event.events = EPOLLIN | EPOLLET;             /* ET 边沿触发 ,默认是水平触发 *///event.events=EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == connfd) {len = read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0;
}

水平触发:

#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>#define MAXLINE 10
#define SERV_PORT 8080int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd = epoll_create(10);//event.events = EPOLLIN | EPOLLET;             /* ET 边沿触发 ,默认是水平触发 */event.events=EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == connfd) {len = read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0;
}

客户端程序:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include<arpa/inet.h>
#define MAXLINE 10
#define SERV_PORT 8080int main(int argc, char *argv[])
{struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, i;char ch = 'a';sockfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);servaddr.sin_port = htons(SERV_PORT);connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));while (1) {for (i = 0; i < MAXLINE/2; i++)buf[i] = ch;buf[i-1] = '\n';ch++;for (; i < MAXLINE; i++)buf[i] = ch;buf[i-1] = '\n';ch++;write(sockfd, buf, sizeof(buf));sleep(5);}close(sockfd);return 0;
}

5.4 边缘触发但是能一次读完

边缘触发但是能一次读完,不通过使用epoll_wait的触发,可以一次读完所有的数据,这其中的原理是:将文件描述符设置为非阻塞,通过while ((len = read(connfd, buf, MAXLINE/2)) > 0) write(STDOUT_FILENO, buf, len); ,可以一次将connfd中的所有数据读完所有数据。

ET的非阻塞模式比LT模式效率要高,因为ET减少了epoll_wait()的使用。

结论:epoll 的 ET模式, 高效模式,但是只支持 非阻塞模式。 --- 忙轮询。struct epoll_event event;event.events = EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event);	int flg = fcntl(cfd, F_GETFL);	flg |= O_NONBLOCK;fcntl(cfd, F_SETFL, flg);优点:高效。突破1024文件描述符。缺点:不能跨平台。 Linux。

6.epoll反应堆模型

6.1 反应堆模型

epoll 反应堆模型:epoll ET模式 + 非阻塞、轮询 + void *ptr。原来:	socket、bind、listen -- epoll_create 创建监听 红黑树 --  返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while1---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() ---->-- write回去。反应堆:不但要监听 cfd 的读事件、还要监听cfd的写事件。socket、bind、listen -- epoll_create 创建监听 红黑树 --  返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while1---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() ---->-- cfd从监听红黑树上摘下 -- EPOLLOUT -- 回调函数 -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听写事件-- 等待 epoll_wait 返回 -- 说明 cfd 可写 -- write回去 -- cfd从监听红黑树上摘下 -- EPOLLIN -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听读事件 -- epoll_wait 监听

反应堆的理解:加入IO转接之后,有了事件,server才去处理,这里反应堆也是这样,由于网络环境复杂,服务器处理数据之后,可能并不能直接写回去,比如遇到网络繁忙或者对方缓冲区已经满了这种情况,就不能直接写回给客户端。反应堆就是在处理数据之后,监听写事件,能写会客户端了,才去做写回操作。写回之后,再改为监听读事件。如此循环。

6.2 epoll反应堆代码

/* 
*epoll基于非阻塞I/O事件驱动 
*/  
#include <stdio.h>  
#include <sys/socket.h>  
#include <sys/epoll.h>  
#include <arpa/inet.h>  
#include <fcntl.h>  
#include <unistd.h>  
#include <errno.h>  
#include <string.h>  
#include <stdlib.h>  
#include <time.h>  #define MAX_EVENTS  1024                                    //监听上限数  
#define BUFLEN 4096  
#define SERV_PORT   8080  void recvdata(int fd, int events, void *arg);  
void senddata(int fd, int events, void *arg);  /* 描述就绪文件描述符相关信息 */  struct myevent_s {  int fd;                                                 //要监听的文件描述符  int events;                                             //对应的监听事件  void *arg;                                              //泛型参数  void (*call_back)(int fd, int events, void *arg);       //回调函数  int status;                                             //是否在监听:1->在红黑树上(监听), 0->不在(不监听)  char buf[BUFLEN];  int len;  long last_active;                                       //记录每次加入红黑树 g_efd 的时间值  
};  int g_efd;                                                  //全局变量, 保存epoll_create返回的文件描述符  
struct myevent_s g_events[MAX_EVENTS+1];                    //自定义结构体类型数组. +1-->listen fd  /*将结构体 myevent_s 成员变量 初始化*/  void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)  
{  ev->fd = fd;  ev->call_back = call_back;  ev->events = 0;  ev->arg = arg;  ev->status = 0;  memset(ev->buf, 0, sizeof(ev->buf));  ev->len = 0;  ev->last_active = time(NULL);                       //调用eventset函数的时间  return;  
}  /* 向 epoll监听的红黑树 添加一个 文件描述符 */  //eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);  
void eventadd(int efd, int events, struct myevent_s *ev)  
{  struct epoll_event epv = {0, {0}};  int op;  epv.data.ptr = ev;  epv.events = ev->events = events;       //EPOLLIN 或 EPOLLOUT  if (ev->status == 0) {                                          //已经在红黑树 g_efd 里  op = EPOLL_CTL_ADD;                 //将其加入红黑树 g_efd, 并将status置1  ev->status = 1;  }  if (epoll_ctl(efd, op, ev->fd, &epv) < 0)                       //实际添加/修改  printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);  else  printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);  return ;  
}  /* 从epoll 监听的 红黑树中删除一个 文件描述符*/  void eventdel(int efd, struct myevent_s *ev)  
{  struct epoll_event epv = {0, {0}};  if (ev->status != 1)                                        //不在红黑树上  return ;  //epv.data.ptr = ev;  epv.data.ptr = NULL;  ev->status = 0;                                             //修改状态  epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);                //从红黑树 efd 上将 ev->fd 摘除  return ;  
}  /*  当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */  void acceptconn(int lfd, int events, void *arg)  
{  struct sockaddr_in cin;  socklen_t len = sizeof(cin);  int cfd, i;  if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {  if (errno != EAGAIN && errno != EINTR) {  /* 暂时不做出错处理 */  }  printf("%s: accept, %s\n", __func__, strerror(errno));  return ;  }  do {  for (i = 0; i < MAX_EVENTS; i++)                                //从全局数组g_events中找一个空闲元素  if (g_events[i].status == 0)                                //类似于select中找值为-1的元素  break;                                                  //跳出 for  if (i == MAX_EVENTS) {  printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);  break;                                                      //跳出do while(0) 不执行后续代码  }  int flag = 0;  if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) {             //将cfd也设置为非阻塞  printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));  break;  }  /* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */  eventset(&g_events[i], cfd, recvdata, &g_events[i]);     eventadd(g_efd, EPOLLIN, &g_events[i]);                         //将cfd添加到红黑树g_efd中,监听读事件  } while(0);  printf("new connect [%s:%d][time:%ld], pos[%d]\n",   inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);  return ; 
}  void recvdata(int fd, int events, void *arg)  {  struct myevent_s *ev = (struct myevent_s *)arg;  int len;  len = recv(fd, ev->buf, sizeof(ev->buf), 0);            //读文件描述符, 数据存入myevent_s成员buf中  eventdel(g_efd, ev);        //将该节点从红黑树上摘除  if (len > 0) {  ev->len = len;  ev->buf[len] = '\0';                                //手动添加字符串结束标记  printf("C[%d]:%s\n", fd, ev->buf);  eventset(ev, fd, senddata, ev);                     //设置该 fd 对应的回调函数为 senddata  eventadd(g_efd, EPOLLOUT, ev);                      //将fd加入红黑树g_efd中,监听其写事件  } else if (len == 0) {  close(ev->fd);  /* ev-g_events 地址相减得到偏移元素位置 */  printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);  } else {  close(ev->fd);  printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));  }  return;  }  void senddata(int fd, int events, void *arg)  {  struct myevent_s *ev = (struct myevent_s *)arg;  int len;  len = send(fd, ev->buf, ev->len, 0);                    //直接将数据 回写给客户端。未作处理  eventdel(g_efd, ev);                                //从红黑树g_efd中移除  if (len > 0) {  printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);  eventset(ev, fd, recvdata, ev);                     //将该fd的 回调函数改为 recvdata  eventadd(g_efd, EPOLLIN, ev);                       //从新添加到红黑树上, 设为监听读事件  } else {  close(ev->fd);                                      //关闭链接  printf("send[fd=%d] error %s\n", fd, strerror(errno));  }  return ;  }  /*创建 socket, 初始化lfd */  void initlistensocket(int efd, short port)  {  struct sockaddr_in sin;  int lfd = socket(AF_INET, SOCK_STREAM, 0);  fcntl(lfd, F_SETFL, O_NONBLOCK);                                            //将socket设为非阻塞  memset(&sin, 0, sizeof(sin));                                               //bzero(&sin, sizeof(sin))  sin.sin_family = AF_INET;  sin.sin_addr.s_addr = INADDR_ANY;  sin.sin_port = htons(port);  bind(lfd, (struct sockaddr *)&sin, sizeof(sin));  listen(lfd, 20);  /* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg);  */  eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);  /* void eventadd(int efd, int events, struct myevent_s *ev) */  eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);  return ;  }  int main(int argc, char *argv[])  {  unsigned short port = SERV_PORT;  if (argc == 2)  port = atoi(argv[1]);                           //使用用户指定端口.如未指定,用默认端口  g_efd = epoll_create(MAX_EVENTS+1);                 //创建红黑树,返回给全局 g_efd   if (g_efd <= 0)  printf("create efd in %s err %s\n", __func__, strerror(errno));  initlistensocket(g_efd, port);                      //初始化监听socket  struct epoll_event events[MAX_EVENTS+1];            //保存已经满足就绪事件的文件描述符数组   printf("server running:port[%d]\n", port);  int checkpos = 0, i;  while (1) {  /* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */  long now = time(NULL);                          //当前时间  for (i = 0; i < 100; i++, checkpos++) {         //一次循环检测100个。 使用checkpos控制检测对象   if (checkpos == MAX_EVENTS)  checkpos = 0;  if (g_events[checkpos].status != 1)         //不在红黑树 g_efd 上  continue;  long duration = now - g_events[checkpos].last_active;       //客户端不活跃的世间  if (duration >= 60) {  close(g_events[checkpos].fd);                           //关闭与该客户端链接  printf("[fd=%d] timeout\n", g_events[checkpos].fd);  eventdel(g_efd, &g_events[checkpos]);                   //将该客户端 从红黑树 g_efd移除  }  }  /*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中, 1秒没有事件满足, 返回 0*/  int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);  if (nfd < 0) {  printf("epoll_wait error, exit\n");  break;  }  for (i = 0; i < nfd; i++) {  /*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/  struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;    if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {           //读就绪事件  ev->call_back(ev->fd, events[i].events, ev->arg);  //lfd  EPOLLIN    }  if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {         //写就绪事件  ev->call_back(ev->fd, events[i].events, ev->arg);  }  }  }  /* 退出前释放所有资源 */  return 0;  } 

7.心跳包

TCP保活机制

  1. 心跳包

由应用程序自己发送心跳包来检测连接是否正常,大致的方法是:服务器在一个 Timer事件中定时向客户端发送一个短小精悍的数据包,然后启动一个低级别的线程,在该线程中不断检测客户端的回应, 如果在一定时间内没有收到客户端的回应,即认为客户端已经掉线;同样,如果客户端在一定时间内没有收到服务器的心跳包,则认为连接不可用。

心跳检测机制
在TCP网络通信中,经常会出现客户端和服务器之间的非正常断开,需要实时检测查询链接状态。常用的解决方法就是在程序中加入心跳机制。

Heart-Beat线程
这个是最常用的简单方法。在接收和发送数据时个人设计一个守护进程(线程),定时发送Heart-Beat包,客户端/服务器收到该小包后,立刻返回相应的包即可检测对方是否实时在线。

该方法的好处是通用,但缺点就是会改变现有的通讯协议!大家一般都是使用业务层心跳来处理,主要是灵活可控。

UNIX网络编程不推荐使用SO_KEEPALIVE来做心跳检测,还是在业务层以心跳包做检测比较好,也方便控制。

  1. 乒乓包

举例:微信朋友圈有人评论,客户端怎么知道有人评论?服务器怎么将评论发给客户端的?

微信客户端每隔一段时间就向服务器询问,是否有人评论?
当服务器检查到有人给评论时,服务器发送一个乒乓包给客户端,该乒乓包中携带的数据是[此时有人评论的标志位]
注:步骤1和2,服务器和客户端不需要建立连接,只是发送简单的乒乓包。
当客户端接收到服务器回复的带有评论标志位的乒乓包后,才真正的去和服务器通过三次握手建立连接;建立连接后,服务器将评论的数据发送给客户端。

注意:乒乓包是携带很简单的数据的包

  1. 设置TCP属性: SO_KEEPALIVE

1.因为要考虑到一个服务器通常会连接多个客户端,因此由用户在应用层自己实现心跳包,代码较多 且稍显复杂,而利用TCP/IP协议层为内置的KeepAlive功能来实现心跳功能则简单得多。

2.不论是服务端还是客户端,一方开启KeepAlive功能后,就会自动在规定时间内向对方发送心跳包, 而另一方在收到心跳包后就会自动回复,以告诉对方我仍然在线。

3.因为开启KeepAlive功能需要消耗额外的宽带和流量,所以TCP协议层默认并不开启KeepAlive功 能,尽管这微不足道,但在按流量计费的环境下增加了费用,另一方面,KeepAlive设置不合理时可能会 因为短暂的网络波动而断开健康的TCP连接。并且,默认的KeepAlive超时需要7,200,000 MilliSeconds, 即2小时,探测次数为5次。对于很多服务端应用程序来说,2小时的空闲时间太长。

4.因此,我们需要手工开启KeepAlive功能并设置合理的KeepAlive参数。

在《UNIX网络编程第1卷》中也有详细的阐述:
SO_KEEPALIVE:保持连接,检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP连接的输入。设置该选项后,如果2小时内在此套接口的任一方向都没有数据交换,TCP就自动给对方 发一个保持存活探测分节(keepalive
probe)。这是一个对方必须响应的TCP分节.它会导致以下三种情况:

对方接收一切正常:以期望的ACK响应。2小时后,TCP将发出另一个探测分节。
对方已崩溃且已重新启动:以RST响应。套接口的待处理错误被置为ECONNRESET,套接口本身则被关闭。
对方无任何响应:源自berkeley的TCP发送另外8个探测分节,相隔75秒一个,试图得到一个响应。在发出第一个探测分节11分钟15秒后若仍无响应就放弃。套接口的待处理错误被置为ETIMEOUT,套接口本身则被关闭。如ICMP错误是“host unreachable(主机不可达)”,说明对方主机并没有崩溃,但是不可达,这种情况下待处理错误被置为EHOSTUNREACH。

根据上面的介绍可以知道对端以一种非优雅的方式断开连接的时候,可以设置SO_KEEPALIVE属性使得在2小时以后发现对方的TCP连接是否依然存在。如果不能接受如此之长的等待时间,从TCP-Keepalive-HOWTO上可以知道一共有两种方式可以设置:

修改内核关于网络方面的配置参数
SOL_TCP字段的TCP_KEEPIDLE,TCP_KEEPINTVL,TCP_KEEPCNT三个选项
int                 keepIdle = 6;     /*开始首次KeepAlive探测前的TCP空闭时间 */
int                 keepInterval = 5; /* 两次KeepAlive探测间的时间间隔  */
int                 keepCount = 3;    /* 判定断开前的KeepAlive探测次数 */
Setsockopt(listenfd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle));
Setsockopt(listenfd, SOL_TCP,TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval));
Setsockopt(listenfd,SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)); 

8.线程池

在这里插入图片描述

8.1 线程池代码

#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** @function threadpool_create* @descCreates a threadpool_t object.* @param thr_num  thread num* @param max_thr_num  max thread size* @param queue_max_size   size of the queue.* @return a newly created thread pool or NULL*/
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** @function threadpool_add* @desc add a new task in the queue of a thread pool* @param pool     Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @return 0 if all goes well,else -1*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** @function threadpool_destroy* @desc Stops and destroys a thread pool.* @param pool  Thread pool to destroy.* @return 0 if destory success else -1*/
int threadpool_destroy(threadpool_t *pool);/*** @desc get the thread num* @pool pool threadpool* @return # of the thread*/
int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* @param pool threadpool* return # of the busy thread*/
int threadpool_busy_threadnum(threadpool_t *pool);#endif
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10                 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
#define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct {void *(*function)(void *);          /* 函数指针,回调函数 */void *arg;                          /* 上面函数的参数 */
} threadpool_task_t;                    /* 各子线程任务结构体 *//* 描述线程池相关信息 */struct threadpool_t {pthread_mutex_t lock;               /* 用于锁住本结构体 */    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid;               /* 存管理线程tid */threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */int min_thr_num;                    /* 线程池最小线程数 */int max_thr_num;                    /* 线程池最大线程数 */int live_thr_num;                   /* 当前存活线程个数 */int busy_thr_num;                   /* 忙状态线程个数 */int wait_exit_thr_num;              /* 要销毁的线程个数 */int queue_front;                    /* task_queue队头下标 */int queue_rear;                     /* task_queue队尾下标 */int queue_size;                     /* task_queue队中实际任务数 */int queue_max_size;                 /* task_queue队列可容纳任务数上限 */int shutdown;                       /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL;          /* 线程池 结构体 */do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  printf("malloc threadpool fail");break;                                      /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0;                           /* 有0个产品 */pool->queue_max_size = queue_max_size;          /* 最大任务队列数 */pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;                         /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 给 任务队列 开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail");break;}/* 启动 min_thr_num 个 work thread */for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);   /*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);     /* 创建管理者线程 */return pool;} while (0);threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) {  printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL);     /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/pool->busy_thr_num++;                                                   /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg);                                           /*执行回调函数任务*///task.function(task.arg);                                              /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;                      /* 关注 任务数 */int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock));  int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*/pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1;                 // 总线程数pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num;     // 存活线程数pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1;                // 忙线程数pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num;    pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);sleep(1);                           //模拟 小---大写printf("task %d is end\n",(int)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i] = i;printf("add task %d\n",i);/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */}sleep(10);                                          /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif

8.2 请问怎么实现线程池

参考回答:

  1. 设置一个生产者消费者队列,作为临界资源
  2. 初始化n个线程,并让其运行起来,加锁去队列取任务运行
  3. 当任务队列为空的时候,所有线程阻塞
  4. 当生产者队列来了一个任务后,先对队列加锁,把任务挂在到队列上,然后使用条件变量去通知阻塞中的一个线程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/482988.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

年终盘点:2021年中国科技的重大突破

来源&#xff1a;科技日报2021年已经步入尾声&#xff0c;过去的一年是科技界屡创新高、收获满仓的一年。这一年&#xff0c;恰逢中国共产党百年华诞&#xff0c;我国科技界更是取得多项重要突破。量子计算获得重大进展&#xff0c;使我国成为唯一在两个物理体系中实现量子计算…

vue学习笔记-03-浅谈组件-概念,入门,如何用props给组件传值?

vue学习笔记-03-浅谈组件-概念&#xff0c;入门&#xff0c;如何用props给组件传值&#xff1f; 文章目录vue学习笔记-03-浅谈组件-概念&#xff0c;入门&#xff0c;如何用props给组件传值&#xff1f;什么是组件&#xff1f;为什么要使用组件&#xff1f;如何使用组件呢&…

盘点:2021年度物理学十大突破|《物理世界》

来源&#xff1a;物理世界作者&#xff1a;哈米什约翰斯顿&#xff08;Hamish Johnston&#xff09;译者&#xff1a;王晓涛、乔琦2021年12月14日&#xff0c;《物理世界》&#xff08;Physics World&#xff09;编辑从其网站发表的近600项研究进展中评选出了年度物理学领域十大…

Python实现二叉树的遍历

二叉树是有限个元素的集合&#xff0c;该集合或者为空、或者有一个称为根节点&#xff08;root&#xff09;的元素及两个互不相交的、分别被称为左子树和右子树的二叉树组成。 二叉树的每个结点至多只有二棵子树(不存在度大于2的结点)&#xff0c;二叉树的子树有左右之分&#…

操作系统学习笔记-02-1.2-什么是操作系统

1.2什么是操作系统 没有一个完整&#xff0c;精确&#xff0c;公认的定义从功能和特点上来介绍操作系统 用户角度上&#xff0c;操作系统是一个控制软件管理应用程序为应用程序提供服务杀死应用程序 资源管理管理外设&#xff0c;分配资源 操作系统架构层次 硬件之上应用程序之…

大脑活动与认知: 热力学与信息论的联系

来源&#xff1a;集智俱乐部作者&#xff1a;Guillem Collell、Jordi Fauquet译者:张澳审校&#xff1a;刘培源编辑&#xff1a;邓一雪导语信息和能量之间的关系已经在物理学、化学和生物学中得到了广泛的研究。然而&#xff0c;这种联系并没有在神经科学领域形式化。2015年&am…

离散数学学习笔记-01-随机试验与随机事件

文章目录1.1.1随机试验与随机事件引言随机事件1.1.2.样本空间与事件的集合表示基本概念1.1.3事件之间的关系1.包含2.并&#xff08;和&#xff09;引入概率论的三个要素&#xff1a;1.1.1随机试验与随机事件 引言 确定性&#xff08;必然&#xff09;&#xff1a;一定发生&am…

18-ESP8266 SDK开发基础入门篇--TCP 服务器 RTOS版,串口透传,TCP客户端控制LED

https://www.cnblogs.com/yangfengwu/p/11112015.html 先规定一下协议 aa 55 02 01 F1 4C 控制LED点亮 F1 4C为CRC高位和低位aa 55 02 00 30 8C 控制LED熄灭 30 8C为CRC高位和低位 aa 55 03 占空比(四字节 高位在前,低位在后) CRC校验高位,CRC校验低位 预留一个问题 我用客…

Ubuntu下的git使用指南

1.创建账号&#xff0c;绑定邮箱 在Git或者Gitee中创建一个Git账号或者Gitee账号&#xff0c;绑定邮箱&#xff0c;Ubuntu下的git命令对Git或者Gitee都有效。 2.安装git Ubuntu下下载git命令&#xff1a; sudo apt-get install git在下载完之后&#xff0c;可以通过git --v…

人类、动物和人工智能意识的新理论

来源&#xff1a;ScienceAI编译&#xff1a;萝卜皮德国波鸿鲁尔大学&#xff08;RUB&#xff09;的两名研究人员提出了一种新的意识理论。他们一直在探索意识的本质&#xff0c;大脑如何产生意识以及在何处产生意识&#xff0c;以及动物、人工智能是否也有意识等问题。新概念将…

ffmpeg——简单播放器代码

1.媒体文件播放总体过程 媒体文件——>解复用——>解码——>调用播放接口——>播放 2.解复用 2.1 什么是解复用&#xff1f; 解复用&#xff1a;将媒体文件分解为视频流和音频流 avformat_open_input() /*打开对应的文件&#xff0c;查找对应的解复用器&…

Nature公布2022年值得关注的七大科学事件, 中国一项入选!

来源&#xff1a;科技日报 记者 刘霞 文中图片来自《自然》杂志官网&#xff0c;版权属于原作者&#xff0c;仅用于学术分享尽管今年新冠疫情仍然肆虐&#xff0c;给人类带来不少悲剧和灾难&#xff0c;但“每朵乌云都镶有金边”&#xff01;新冠疫苗成为抗击疫情的有力武器、…

计算机网络学习笔记-01-概念,组成,功能,分类

计算机网络-2019 王道考研 计算机网络-1-概念&#xff0c;组成&#xff0c;功能&#xff0c;分类 文章目录1.概念&#xff0c;组成&#xff0c;功能&#xff0c;分类1.1概念1.2功能1.3组成部分1.3分类1.4思维导图总结1.概念&#xff0c;组成&#xff0c;功能&#xff0c;分类 …

深度学习如炼丹,你有哪些迷信做法?网友:Random seed=42结果好

来源&#xff1a;机器学习研究组订阅调参的苦与泪&#xff0c;还有那些「迷信的做法」。每个机器学习领域的研究者都会面临调参过程的考验&#xff0c;当往往说来容易做来难。调参的背后往往是通宵达旦的论文研究与 GitHub 查阅&#xff0c;并需要做大量的实验&#xff0c;不仅…

计算机网络学习笔记-02-标准化工作以及相关组织

计算机网络-2019 王道考研 计算机网络-02-标准化工作以及相关组织 文章目录2.标准化工作以及相关组织2.1标准化工作2.2 相关组织2.3思维导图2.标准化工作以及相关组织 2.1标准化工作 标准的分类 法定标准&#xff1a;OSI事实标准&#xff1a;TCP/IP 举例子&#xff1a;手机卡…

Go-cron定时任务

1、cron(计划任务) 按照约定的时间&#xff0c;定时的执行特定的任务&#xff08;job&#xff09;。 cron 表达式 表达了这种约定。 cron 表达式代表了一个时间集合&#xff0c;使用 6 个空格分隔的字段表示。 秒 分 时 日 月 星期 2、Linux的cron与Go的cron区别 linux 中的 cr…

2021年突破人类想象力的6大科学纪录

来源&#xff1a;《科技日报》 人类每年都在创造历史&#xff0c;科学家们也在不断创造新纪录&#xff0c;今年也不例外&#xff01;美国《科学新闻》杂志网站在12月20日的报道中&#xff0c;为我们梳理了2021年令人惊奇的6大科学纪录&#xff0c;包括发现迄今最古老的黑洞以及…

操作系统学习笔记-01-操作系统的概念(定义),功能和目标

操作系统学习笔记-2019 王道考研 操作系统-01-操作系统的概念&#xff08;定义&#xff09;&#xff0c;功能和目标 文章目录1-操作系统的概念&#xff08;定义&#xff09;&#xff0c;功能和目标1.1常见的操作系统1.2概念&#xff08;定义&#xff09;1.3-操作系统的功能和目…

从复现人类智能到挑战AI大工程,智能计算正经历什么考验?

来源&#xff1a;AI科技评论作者&#xff1a;杏花编辑&#xff1a;青暮世界顶级机器学习专家Michael I.Jordan曾提出一个观点&#xff0c;他认为人工智能正逐步由原理性研究&#xff0c;走向人工智能大工程。Michael I.Jordan认为&#xff0c;随着机器学习的蓬勃发展&#xff0…

操作系统学习笔记-02-操作系统的特征

操作系统学习笔记-2019 王道考研 操作系统-2-操作系统的特征 文章目录02-操作系统四个的特征2.1-知识概览2.2并发2.3共享2.3并发和共享的关系2.4虚拟2.5小结&#xff1a;虚拟技术2.6-异步2.7知识回顾02-操作系统四个的特征 2.1-知识概览 2.2并发 并发:指两个或多个事件在同-一…