文章目录
- 1.多路I/O转接服务器
- 2.select
- 3.select代码
- 4.poll
- 5.epoll
- 5.1 基础API
- 5.3 epoll代码
- 5.4 边沿触发和水平触发
- 5.4.1 水平出发LT
- 5.4.2 边缘触发
- 5.4.3 服务器的边缘触发和水平触发
- 5.4 边缘触发但是能一次读完
- 6.epoll反应堆模型
- 6.1 反应堆模型
- 6.2 epoll反应堆代码
- 7.心跳包
- 8.线程池
- 8.1 线程池代码
- 8.2 请问怎么实现线程池
1.多路I/O转接服务器
多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
2.select
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态readfds: 监控有读数据到达文件描述符集合,传入传出参数writefds: 监控写数据到达文件描述符集合,传入传出参数exceptfds: 监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数timeout: 定时阻塞监控时间,3种情况1.NULL,永远等下去2.设置timeval,等待固定时间3.设置timeval里时间均为0,检查描述字后立即返回,轮询struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0返回值:成功:所监听的所有监听集合中满足条件的总数;失败:返回-1
3.select代码
- 服务器
/* server.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"#define MAXLINE 80
#define SERV_PORT 6666int main(int argc, char *argv[])
{int i, maxi, maxfd, listenfd, connfd, sockfd;int nready, client[FD_SETSIZE]; /* FD_SETSIZE 默认为 1024 */ssize_t n;fd_set rset, allset;char buf[MAXLINE];char str[INET_ADDRSTRLEN]; /* #define INET_ADDRSTRLEN 16 */socklen_t cliaddr_len;struct sockaddr_in cliaddr, servaddr;listenfd = Socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));Listen(listenfd, 20); /* 默认最大128 */maxfd = listenfd; /* 初始化 */maxi = -1; /* client[]的下标 */for (i = 0; i < FD_SETSIZE; i++)client[i] = -1; /* 用-1初始化client[] */FD_ZERO(&allset);FD_SET(listenfd, &allset); /* 构造select监控文件描述符集 */for ( ; ; ) {rset = allset; /* 每次循环时都从新设置select监控信号集 */nready = select(maxfd+1, &rset, NULL, NULL, NULL);if (nready < 0)perr_exit("select error");if (FD_ISSET(listenfd, &rset)) { /* new client connection */cliaddr_len = sizeof(cliaddr);connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));for (i = 0; i < FD_SETSIZE; i++) {if (client[i] < 0) {client[i] = connfd; /* 保存accept返回的文件描述符到client[]里 */break;}}/* 达到select能监控的文件个数上限 1024 */if (i == FD_SETSIZE) {fputs("too many clients\n", stderr);exit(1);}FD_SET(connfd, &allset); /* 添加一个新的文件描述符到监控信号集里 */if (connfd > maxfd)maxfd = connfd; /* select第一个参数需要 */if (i > maxi)maxi = i; /* 更新client[]最大下标值 */if (--nready == 0)continue; /* 如果没有更多的就绪文件描述符继续回到上面select阻塞监听,负责处理未处理完的就绪文件描述符 */}for (i = 0; i <= maxi; i++) { /* 检测哪个clients 有数据就绪 */if ( (sockfd = client[i]) < 0)continue;if (FD_ISSET(sockfd, &rset)) {if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {Close(sockfd); /* 当client关闭链接时,服务器端也关闭对应链接 */FD_CLR(sockfd, &allset); /* 解除select监控此文件描述符 */client[i] = -1;} else {int j;for (j = 0; j < n; j++)buf[j] = toupper(buf[j]);Write(sockfd, buf, n);}if (--nready == 0)break;}}}close(listenfd);return 0;
}
- 客户端
/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include "wrap.h"#define MAXLINE 80
#define SERV_PORT 6666int main(int argc, char *argv[])
{struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, n;sockfd = Socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);servaddr.sin_port = htons(SERV_PORT);Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));while (fgets(buf, MAXLINE, stdin) != NULL) {Write(sockfd, buf, strlen(buf));n = Read(sockfd, buf, MAXLINE);if (n == 0)printf("the other side has been closed.\n");elseWrite(STDOUT_FILENO, buf, n);}Close(sockfd);return 0;
}
4.poll
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);struct pollfd {int fd; /* 文件描述符 */short events; /* 监控的事件 */short revents; /* 监控事件中满足条件返回的事件 */};POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBANDPOLLRDNORM 数据可读POLLRDBAND 优先级带数据可读POLLPRI 高优先级可读数据POLLOUT 普通或带外数据可写POLLWRNORM 数据可写POLLWRBAND 优先级带数据可写POLLERR 发生错误POLLHUP 发生挂起POLLNVAL 描述字不是一个打开的文件nfds 监控数组中有多少文件描述符需要被监控timeout 毫秒级等待-1:阻塞等,#define INFTIM -1 Linux中没有定义此宏0:立即返回,不阻塞进程>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,
下次返回时,把revents设置为0。
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<poll.h>
#include<errno.h>
#include<ctype.h>
#include<unistd.h>#define MAXLINE 80
#define SERV_PORT 8000
#define OPEN_MAX 1024int main()
{int i,j,maxi,listenfd,connfd,sockfd;int nready;ssize_t n;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct pollfd client[OPEN_MAX];struct sockaddr_in cliaddr,servaddr;listenfd=socket(AF_INET,SOCK_STREAM,0);int opt=1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htonl(INADDR_ANY);servaddr.sin_port=htons(SERV_PORT);bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));listen(listenfd,120);client[0].fd=listenfd;client[0].events=POLLIN;for(int i=1;i<OPEN_MAX;i++){client[i].fd=-1;}maxi=0;for( ; ;){nready=poll(client,maxi+1,-1);/*这个if语句监听listenfd是否有读事件,也就是是否有客户端连接请求,有的话accept连接,并将客户端的文件描述符添加到监听队列中,进行监听*/if(client[0].revents & POLLIN){clilen=sizeof(cliaddr);connfd=accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);printf("received from %s at PORT %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));for(i=1;i<OPEN_MAX;i++){if(client[i].fd<0){client[i].fd=connfd;break;}}if(i==OPEN_MAX){perror("too many clients");}client[i].events=POLLIN;if(i>maxi){maxi=i;}if(--nready<=0){continue;}}for(int i=1;i<=maxi;i++){if((sockfd=client[i].fd)<0){continue;}if(client[i].revents & POLLIN){if((n=read(sockfd,buf,MAXLINE))<0){if(errno==ECONNRESET){close(sockfd);client[i].fd=-1;}else{perror("read error");}}else if(n==0){printf("client[%d] closed connection\n",i);close(sockfd);client[i].fd=-1;}else{for(j=0;j<n;j++){buf[j]=toupper(buf[j]);}printf("read 执行\n");write(sockfd,buf,n);}if(--nready<=0){break;}}}}return 0;
}
5.epoll
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
目前epell是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
可以使用cat命令查看一个进程可以打开的socket描述符上限。
cat /proc/sys/fs/file-max
如有需要,可以通过修改配置文件的方式修改该上限值
sudo vi /etc/security/limits.conf在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。* soft nofile 65536* hard nofile 100000
5.1 基础API
- 创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
#include <sys/epoll.h>int epoll_create(int size) size:监听数目
- 控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)epfd: 为epoll_creat的句柄op: 表示动作,用3个宏来表示:EPOLL_CTL_ADD (注册新的fd到epfd),EPOLL_CTL_MOD (修改已经注册的fd的监听事件),EPOLL_CTL_DEL (从epfd删除一个fd);event: 告诉内核需要监听的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭)EPOLLOUT: 表示对应的文件描述符可以写EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)EPOLLERR: 表示对应的文件描述符发生错误EPOLLHUP: 表示对应的文件描述符被挂断;EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
- 等待所监控文件描述符上有事件的产生,类似于select()调用
#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)events: 用来存内核得到事件的集合,maxevents: 告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,timeout: 是超时时间-1: 阻塞0: 立即返回,非阻塞>0: 指定毫秒返回值: 成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
5.3 epoll代码
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<errno.h>
#include<ctype.h>#define MAXLINE 8192
#define SERV_PORT 8000
#define OPEN_MAX 5000int main()
{int i,listenfd,connfd,sockfd;int n,num=0;ssize_t nready,efd,res;char buf[MAXLINE],str[INET_ADDRSTRLEN];socklen_t clilen;struct sockaddr_in cliaddr,servaddr;struct epoll_event tep,ep[OPEN_MAX]; //tep:epoll_ctl参数;ep[]:epoll_wait参数listenfd=socket(AF_INET,SOCK_STREAM,0);int opt=1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htonl(INADDR_ANY);servaddr.sin_port=htons(SERV_PORT);bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr));listen(listenfd,20);efd=epoll_create(OPEN_MAX); //创建epoll模型,efd指向红黑树根节点if(efd==-1){perror("epoll_create error");}tep.events=EPOLLIN;tep.data.fd=listenfd; //指定lfd的监听事件为读res=epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep); //将lfd及对应的结构体添加到红黑树上,efd可以找到该树if(res==-1){perror("epoll_ctl error");}while(1){/*对红黑树上文件描述符监听,ep为struct epoll_event类型数组,OPEN_MAX为数组容量-1永久阻塞*/nready=epoll_wait(efd,ep,OPEN_MAX,-1);if(nready==-1){perror("epoll_wait error");}for(int i=0;i<nready;i++){if(!(ep[i].events&EPOLLIN))continue;if(ep[i].data.fd==listenfd){clilen=sizeof(cliaddr);connfd=accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);//listenfd可读,说明有客户端连接printf("received from %s at PORT %d\n",inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port));printf("cfd %d---client %d\n",connfd,++num);tep.events=EPOLLIN;tep.data.fd=connfd;res=epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); //将客户端文件描述符添加到红黑树上if(res==-1){perror("epoll_ctl error");}}else{sockfd=ep[i].data.fd;n=read(sockfd,buf,MAXLINE);if(n==0){ //读到0,说明客户端关闭连接res=epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); //将该文件描述符从红黑树摘除if(res==-1){perror("epoll_ctl error");}close(sockfd);printf("client[%d] closed connection\n",sockfd);}else if(n<0){ //出错perror("read n<0 error:");res=epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL);close(sockfd);}else{for(int i=0;i<n;i++){buf[i]=toupper(buf[i]);}write(sockfd,buf,n);}}}}close(listenfd);close(efd);return 0;
}
5.4 边沿触发和水平触发
5.4.1 水平出发LT
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/epoll.h>
#include<errno.h>#define MAXLINE 10int main()
{int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],ch='a';pipe(pfd);pid=fork();if(pid==0){close(pfd[0]);while(1){for(i=0;i<MAXLINE/2;i++){buf[i]=ch;}buf[i-1]='\n';ch++;for(;i<MAXLINE;i++){buf[i]=ch;}buf[i-1]='\n';ch++;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid>0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efd=epoll_create(10);//event.events=EPOLLIN|EPOLLET; //边缘触发event.events=EPOLLIN; //水平触发,默认水平触发event.data.fd=pfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],&event);while(1){res=epoll_wait(efd,resevent,10,-1);printf("res %d\n",res);if(resevent[0].data.fd==pfd[0]){len=read(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}}
zhaoxr@zhaoxr-ThinkPad-E450:~/select$ ./epoll_trigger
res 1
aaaa
res 1
bbbb
res 1
cccc
res 1
dddd
res 1
eeee
res 1
ffff
^C
5.4.2 边缘触发
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/epoll.h>
#include<errno.h>#define MAXLINE 10int main()
{int efd,i;int pfd[2];pid_t pid;char buf[MAXLINE],ch='a';pipe(pfd);pid=fork();if(pid==0){close(pfd[0]);while(1){for(i=0;i<MAXLINE/2;i++){buf[i]=ch;}buf[i-1]='\n';ch++;for(;i<MAXLINE;i++){buf[i]=ch;}buf[i-1]='\n';ch++;write(pfd[1],buf,sizeof(buf));sleep(5);}close(pfd[1]);}else if(pid>0){struct epoll_event event;struct epoll_event resevent[10];int res,len;close(pfd[1]);efd=epoll_create(10);event.events=EPOLLIN|EPOLLET; //ET 边缘触发//event.events=EPOLLIN; //LT 水平触发(默认)event.data.fd=pfd[0];epoll_ctl(efd,EPOLL_CTL_ADD,pfd[0],&event);while(1){res=epoll_wait(efd,resevent,10,-1);printf("res %d\n",res);if(resevent[0].data.fd==pfd[0]){len=read(pfd[0],buf,MAXLINE/2);write(STDOUT_FILENO,buf,len);}}close(pfd[0]);}}
5.4.3 服务器的边缘触发和水平触发
边缘触发(非阻塞模式):
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>#define MAXLINE 10
#define SERV_PORT 8080int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd = epoll_create(10);event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 *///event.events=EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == connfd) {len = read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0;
}
水平触发:
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>#define MAXLINE 10
#define SERV_PORT 8080int main(void)
{struct sockaddr_in servaddr, cliaddr;socklen_t cliaddr_len;int listenfd, connfd;char buf[MAXLINE];char str[INET_ADDRSTRLEN];int i, efd;listenfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(SERV_PORT);bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));listen(listenfd, 20);struct epoll_event event;struct epoll_event resevent[10];int res, len;efd = epoll_create(10);//event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */event.events=EPOLLIN;printf("Accepting connections ...\n");cliaddr_len = sizeof(cliaddr);connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);printf("received from %s at PORT %d\n",inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),ntohs(cliaddr.sin_port));event.data.fd = connfd;epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);while (1) {res = epoll_wait(efd, resevent, 10, -1);printf("res %d\n", res);if (resevent[0].data.fd == connfd) {len = read(connfd, buf, MAXLINE/2);write(STDOUT_FILENO, buf, len);}}return 0;
}
客户端程序:
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include<arpa/inet.h>
#define MAXLINE 10
#define SERV_PORT 8080int main(int argc, char *argv[])
{struct sockaddr_in servaddr;char buf[MAXLINE];int sockfd, i;char ch = 'a';sockfd = socket(AF_INET, SOCK_STREAM, 0);bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);servaddr.sin_port = htons(SERV_PORT);connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));while (1) {for (i = 0; i < MAXLINE/2; i++)buf[i] = ch;buf[i-1] = '\n';ch++;for (; i < MAXLINE; i++)buf[i] = ch;buf[i-1] = '\n';ch++;write(sockfd, buf, sizeof(buf));sleep(5);}close(sockfd);return 0;
}
5.4 边缘触发但是能一次读完
边缘触发但是能一次读完,不通过使用epoll_wait的触发,可以一次读完所有的数据,这其中的原理是:将文件描述符设置为非阻塞,通过while ((len = read(connfd, buf, MAXLINE/2)) > 0) write(STDOUT_FILENO, buf, len);
,可以一次将connfd中的所有数据读完所有数据。
ET的非阻塞模式比LT模式效率要高,因为ET减少了epoll_wait()的使用。
结论:epoll 的 ET模式, 高效模式,但是只支持 非阻塞模式。 --- 忙轮询。struct epoll_event event;event.events = EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event); int flg = fcntl(cfd, F_GETFL); flg |= O_NONBLOCK;fcntl(cfd, F_SETFL, flg);优点:高效。突破1024文件描述符。缺点:不能跨平台。 Linux。
6.epoll反应堆模型
6.1 反应堆模型
epoll 反应堆模型:epoll ET模式 + 非阻塞、轮询 + void *ptr。原来: socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- write回去。反应堆:不但要监听 cfd 的读事件、还要监听cfd的写事件。socket、bind、listen -- epoll_create 创建监听 红黑树 -- 返回 epfd -- epoll_ctl() 向树上添加一个监听fd -- while(1)---- epoll_wait 监听 -- 对应监听fd有事件产生 -- 返回 监听满足数组。 -- 判断返回数组元素 -- lfd满足 -- Accept -- cfd 满足 -- read() --- 小->大 -- cfd从监听红黑树上摘下 -- EPOLLOUT -- 回调函数 -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听写事件-- 等待 epoll_wait 返回 -- 说明 cfd 可写 -- write回去 -- cfd从监听红黑树上摘下 -- EPOLLIN -- epoll_ctl() -- EPOLL_CTL_ADD 重新放到红黑上监听读事件 -- epoll_wait 监听
反应堆的理解:加入IO转接之后,有了事件,server才去处理,这里反应堆也是这样,由于网络环境复杂,服务器处理数据之后,可能并不能直接写回去,比如遇到网络繁忙或者对方缓冲区已经满了这种情况,就不能直接写回给客户端。反应堆就是在处理数据之后,监听写事件,能写会客户端了,才去做写回操作。写回之后,再改为监听读事件。如此循环。
6.2 epoll反应堆代码
/*
*epoll基于非阻塞I/O事件驱动
*/
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h> #define MAX_EVENTS 1024 //监听上限数
#define BUFLEN 4096
#define SERV_PORT 8080 void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg); /* 描述就绪文件描述符相关信息 */ struct myevent_s { int fd; //要监听的文件描述符 int events; //对应的监听事件 void *arg; //泛型参数 void (*call_back)(int fd, int events, void *arg); //回调函数 int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听) char buf[BUFLEN]; int len; long last_active; //记录每次加入红黑树 g_efd 的时间值
}; int g_efd; //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组. +1-->listen fd /*将结构体 myevent_s 成员变量 初始化*/ void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{ ev->fd = fd; ev->call_back = call_back; ev->events = 0; ev->arg = arg; ev->status = 0; memset(ev->buf, 0, sizeof(ev->buf)); ev->len = 0; ev->last_active = time(NULL); //调用eventset函数的时间 return;
} /* 向 epoll监听的红黑树 添加一个 文件描述符 */ //eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
void eventadd(int efd, int events, struct myevent_s *ev)
{ struct epoll_event epv = {0, {0}}; int op; epv.data.ptr = ev; epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUT if (ev->status == 0) { //已经在红黑树 g_efd 里 op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1 ev->status = 1; } if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改 printf("event add failed [fd=%d], events[%d]\n", ev->fd, events); else printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events); return ;
} /* 从epoll 监听的 红黑树中删除一个 文件描述符*/ void eventdel(int efd, struct myevent_s *ev)
{ struct epoll_event epv = {0, {0}}; if (ev->status != 1) //不在红黑树上 return ; //epv.data.ptr = ev; epv.data.ptr = NULL; ev->status = 0; //修改状态 epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将 ev->fd 摘除 return ;
} /* 当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */ void acceptconn(int lfd, int events, void *arg)
{ struct sockaddr_in cin; socklen_t len = sizeof(cin); int cfd, i; if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) { if (errno != EAGAIN && errno != EINTR) { /* 暂时不做出错处理 */ } printf("%s: accept, %s\n", __func__, strerror(errno)); return ; } do { for (i = 0; i < MAX_EVENTS; i++) //从全局数组g_events中找一个空闲元素 if (g_events[i].status == 0) //类似于select中找值为-1的元素 break; //跳出 for if (i == MAX_EVENTS) { printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS); break; //跳出do while(0) 不执行后续代码 } int flag = 0; if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //将cfd也设置为非阻塞 printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno)); break; } /* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */ eventset(&g_events[i], cfd, recvdata, &g_events[i]); eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件 } while(0); printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i); return ;
} void recvdata(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s *)arg; int len; len = recv(fd, ev->buf, sizeof(ev->buf), 0); //读文件描述符, 数据存入myevent_s成员buf中 eventdel(g_efd, ev); //将该节点从红黑树上摘除 if (len > 0) { ev->len = len; ev->buf[len] = '\0'; //手动添加字符串结束标记 printf("C[%d]:%s\n", fd, ev->buf); eventset(ev, fd, senddata, ev); //设置该 fd 对应的回调函数为 senddata eventadd(g_efd, EPOLLOUT, ev); //将fd加入红黑树g_efd中,监听其写事件 } else if (len == 0) { close(ev->fd); /* ev-g_events 地址相减得到偏移元素位置 */ printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events); } else { close(ev->fd); printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } return; } void senddata(int fd, int events, void *arg) { struct myevent_s *ev = (struct myevent_s *)arg; int len; len = send(fd, ev->buf, ev->len, 0); //直接将数据 回写给客户端。未作处理 eventdel(g_efd, ev); //从红黑树g_efd中移除 if (len > 0) { printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf); eventset(ev, fd, recvdata, ev); //将该fd的 回调函数改为 recvdata eventadd(g_efd, EPOLLIN, ev); //从新添加到红黑树上, 设为监听读事件 } else { close(ev->fd); //关闭链接 printf("send[fd=%d] error %s\n", fd, strerror(errno)); } return ; } /*创建 socket, 初始化lfd */ void initlistensocket(int efd, short port) { struct sockaddr_in sin; int lfd = socket(AF_INET, SOCK_STREAM, 0); fcntl(lfd, F_SETFL, O_NONBLOCK); //将socket设为非阻塞 memset(&sin, 0, sizeof(sin)); //bzero(&sin, sizeof(sin)) sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons(port); bind(lfd, (struct sockaddr *)&sin, sizeof(sin)); listen(lfd, 20); /* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */ eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); /* void eventadd(int efd, int events, struct myevent_s *ev) */ eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); return ; } int main(int argc, char *argv[]) { unsigned short port = SERV_PORT; if (argc == 2) port = atoi(argv[1]); //使用用户指定端口.如未指定,用默认端口 g_efd = epoll_create(MAX_EVENTS+1); //创建红黑树,返回给全局 g_efd if (g_efd <= 0) printf("create efd in %s err %s\n", __func__, strerror(errno)); initlistensocket(g_efd, port); //初始化监听socket struct epoll_event events[MAX_EVENTS+1]; //保存已经满足就绪事件的文件描述符数组 printf("server running:port[%d]\n", port); int checkpos = 0, i; while (1) { /* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */ long now = time(NULL); //当前时间 for (i = 0; i < 100; i++, checkpos++) { //一次循环检测100个。 使用checkpos控制检测对象 if (checkpos == MAX_EVENTS) checkpos = 0; if (g_events[checkpos].status != 1) //不在红黑树 g_efd 上 continue; long duration = now - g_events[checkpos].last_active; //客户端不活跃的世间 if (duration >= 60) { close(g_events[checkpos].fd); //关闭与该客户端链接 printf("[fd=%d] timeout\n", g_events[checkpos].fd); eventdel(g_efd, &g_events[checkpos]); //将该客户端 从红黑树 g_efd移除 } } /*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中, 1秒没有事件满足, 返回 0*/ int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000); if (nfd < 0) { printf("epoll_wait error, exit\n"); break; } for (i = 0; i < nfd; i++) { /*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/ struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr; if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //读就绪事件 ev->call_back(ev->fd, events[i].events, ev->arg); //lfd EPOLLIN } if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //写就绪事件 ev->call_back(ev->fd, events[i].events, ev->arg); } } } /* 退出前释放所有资源 */ return 0; }
7.心跳包
TCP保活机制
- 心跳包
由应用程序自己发送心跳包来检测连接是否正常,大致的方法是:服务器在一个 Timer事件中定时向客户端发送一个短小精悍的数据包,然后启动一个低级别的线程,在该线程中不断检测客户端的回应, 如果在一定时间内没有收到客户端的回应,即认为客户端已经掉线;同样,如果客户端在一定时间内没有收到服务器的心跳包,则认为连接不可用。
心跳检测机制
在TCP网络通信中,经常会出现客户端和服务器之间的非正常断开,需要实时检测查询链接状态。常用的解决方法就是在程序中加入心跳机制。
Heart-Beat线程
这个是最常用的简单方法。在接收和发送数据时个人设计一个守护进程(线程),定时发送Heart-Beat包,客户端/服务器收到该小包后,立刻返回相应的包即可检测对方是否实时在线。
该方法的好处是通用,但缺点就是会改变现有的通讯协议!大家一般都是使用业务层心跳来处理,主要是灵活可控。
UNIX网络编程不推荐使用SO_KEEPALIVE来做心跳检测,还是在业务层以心跳包做检测比较好,也方便控制。
- 乒乓包
举例:微信朋友圈有人评论,客户端怎么知道有人评论?服务器怎么将评论发给客户端的?
微信客户端每隔一段时间就向服务器询问,是否有人评论?
当服务器检查到有人给评论时,服务器发送一个乒乓包给客户端,该乒乓包中携带的数据是[此时有人评论的标志位]
注:步骤1和2,服务器和客户端不需要建立连接,只是发送简单的乒乓包。
当客户端接收到服务器回复的带有评论标志位的乒乓包后,才真正的去和服务器通过三次握手建立连接;建立连接后,服务器将评论的数据发送给客户端。
注意:乒乓包是携带很简单的数据的包
- 设置TCP属性: SO_KEEPALIVE
1.因为要考虑到一个服务器通常会连接多个客户端,因此由用户在应用层自己实现心跳包,代码较多 且稍显复杂,而利用TCP/IP协议层为内置的KeepAlive功能来实现心跳功能则简单得多。
2.不论是服务端还是客户端,一方开启KeepAlive功能后,就会自动在规定时间内向对方发送心跳包, 而另一方在收到心跳包后就会自动回复,以告诉对方我仍然在线。
3.因为开启KeepAlive功能需要消耗额外的宽带和流量,所以TCP协议层默认并不开启KeepAlive功 能,尽管这微不足道,但在按流量计费的环境下增加了费用,另一方面,KeepAlive设置不合理时可能会 因为短暂的网络波动而断开健康的TCP连接。并且,默认的KeepAlive超时需要7,200,000 MilliSeconds, 即2小时,探测次数为5次。对于很多服务端应用程序来说,2小时的空闲时间太长。
4.因此,我们需要手工开启KeepAlive功能并设置合理的KeepAlive参数。
在《UNIX网络编程第1卷》中也有详细的阐述:
SO_KEEPALIVE:保持连接,检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP连接的输入。设置该选项后,如果2小时内在此套接口的任一方向都没有数据交换,TCP就自动给对方 发一个保持存活探测分节(keepalive
probe)。这是一个对方必须响应的TCP分节.它会导致以下三种情况:
对方接收一切正常:以期望的ACK响应。2小时后,TCP将发出另一个探测分节。
对方已崩溃且已重新启动:以RST响应。套接口的待处理错误被置为ECONNRESET,套接口本身则被关闭。
对方无任何响应:源自berkeley的TCP发送另外8个探测分节,相隔75秒一个,试图得到一个响应。在发出第一个探测分节11分钟15秒后若仍无响应就放弃。套接口的待处理错误被置为ETIMEOUT,套接口本身则被关闭。如ICMP错误是“host unreachable(主机不可达)”,说明对方主机并没有崩溃,但是不可达,这种情况下待处理错误被置为EHOSTUNREACH。
根据上面的介绍可以知道对端以一种非优雅的方式断开连接的时候,可以设置SO_KEEPALIVE属性使得在2小时以后发现对方的TCP连接是否依然存在。如果不能接受如此之长的等待时间,从TCP-Keepalive-HOWTO上可以知道一共有两种方式可以设置:
修改内核关于网络方面的配置参数
SOL_TCP字段的TCP_KEEPIDLE,TCP_KEEPINTVL,TCP_KEEPCNT三个选项
int keepIdle = 6; /*开始首次KeepAlive探测前的TCP空闭时间 */
int keepInterval = 5; /* 两次KeepAlive探测间的时间间隔 */
int keepCount = 3; /* 判定断开前的KeepAlive探测次数 */
Setsockopt(listenfd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle));
Setsockopt(listenfd, SOL_TCP,TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval));
Setsockopt(listenfd,SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount));
8.线程池
8.1 线程池代码
#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** @function threadpool_create* @descCreates a threadpool_t object.* @param thr_num thread num* @param max_thr_num max thread size* @param queue_max_size size of the queue.* @return a newly created thread pool or NULL*/
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** @function threadpool_add* @desc add a new task in the queue of a thread pool* @param pool Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @return 0 if all goes well,else -1*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** @function threadpool_destroy* @desc Stops and destroys a thread pool.* @param pool Thread pool to destroy.* @return 0 if destory success else -1*/
int threadpool_destroy(threadpool_t *pool);/*** @desc get the thread num* @pool pool threadpool* @return # of the thread*/
int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* @param pool threadpool* return # of the busy thread*/
int threadpool_busy_threadnum(threadpool_t *pool);#endif
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct {void *(*function)(void *); /* 函数指针,回调函数 */void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 *//* 描述线程池相关信息 */struct threadpool_t {pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL; /* 线程池 结构体 */do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail");break; /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0; /* 有0个产品 */pool->queue_max_size = queue_max_size; /* 最大任务队列数 */pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 给 任务队列 开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail");break;}/* 启动 min_thr_num 个 work thread */for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); /*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); /* 创建管理者线程 */return pool;} while (0);threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) { printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/pool->busy_thr_num++; /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg); /*执行回调函数任务*///task.function(task.arg); /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME); /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size; /* 关注 任务数 */int live_thr_num = pool->live_thr_num; /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock)); int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*/pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1; // 总线程数pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num; // 存活线程数pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1; // 忙线程数pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);sleep(1); //模拟 小---大写printf("task %d is end\n",(int)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i] = i;printf("add task %d\n",i);/*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */}sleep(10); /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif
8.2 请问怎么实现线程池
参考回答:
- 设置一个生产者消费者队列,作为临界资源
- 初始化n个线程,并让其运行起来,加锁去队列取任务运行
- 当任务队列为空的时候,所有线程阻塞
- 当生产者队列来了一个任务后,先对队列加锁,把任务挂在到队列上,然后使用条件变量去通知阻塞中的一个线程