目录
学习目标
1线程池
2.UDP通信
3本地socket通信
学习目标
- 了解线程池模型的设计思想
- 能看懂线程池实现源码
- 掌握tcp和udp的优缺点和使用场景
- 说出udp服务器通信流程
- 说出udp客户端通信流程
- 独立实现udp服务器代码
- 独立实现udp客户端代码
- 熟练掌握本地套接字进行本地进程通信
1线程池
什么是线程池?
是一个抽象的概念, 若干个线程组合到一起, 形成线程池.
为什么需要线程池?
多线程版服务器一个客户端就需要创建一个线程! 若客户端太多, 显然不太合适.
什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。
实现的时候类似于生产者和消费者.
线程池和任务池:
任务池相当于共享资源, 所以需要使用互斥锁, 当任务池中没有任务的时候需要让线程阻塞, 所以需要使用条件变量.
如何让线程执行不同的任务?
使用回到函数, 在任务中设置任务执行函数, 这样可以起到不同的任务执行不同的函数.
通过阅读线程池代码思考如下问题?
- 熟悉结构体 threadpool_t
- 线程池如何创建起来? 各种初始化,malloc,pthread_create,pthread_cond_init pthread_mutex_init
- 线程池内都有几类线程? 2类:管理线程+工作线程
- 管理者线程的任务是什么?任务如何实现? 任务是添加线程或者删除线程,通过2个算法,删除线程 wait_exit_thr_num = 10
- 工作线程如何工作? 等待有任务,抢到任务,修改busy_thr_num ++ 执行任务 修改 busy_thr_num --
- 线程池是如何销毁的? 自爆shutdown 诱杀!
-
讲解代码threadpoolsimple.c
讲解代码 pthreadpool.c
threadsimplepool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>typedef struct _PoolTask
{int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;typedef struct _ThreadPool
{int max_job_num;//最大任务个数int job_num;//实际任务个数PoolTask *tasks;//任务队列数组int job_push;//入队位置int job_pop;// 出队位置int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件pthread_cond_t not_empty_task;//任务队列不为空的条件}ThreadPool;void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum 代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数#endif
threadsimplepool.c
//简易版线程池
#include "threadpoolsimple.h"ThreadPool *thrPool = NULL;int beginnum = 1000;void *thrRun(void *arg)
{//printf("begin call %s-----\n",__FUNCTION__);ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&thrPool->pool_lock);//无任务并且线程池不是要摧毁while(thrPool->job_num <= 0 && !thrPool->shutdown ){//如果没有任务,线程会阻塞pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);}if(thrPool->job_num){//有任务需要处理taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));task->arg = task;thrPool->job_num--;//task = &thrPool->tasks[taskpos];pthread_cond_signal(&thrPool->empty_task);//通知生产者}if(thrPool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&thrPool->pool_lock);free(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(&thrPool->pool_lock);task->task_func(task->arg);//执行回调函数}//printf("end call %s-----\n",__FUNCTION__);
}//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{printf("begin call %s-----\n",__FUNCTION__);thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));thrPool->thr_num = thrnum;thrPool->max_job_num = maxtasknum;thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁thrPool->job_push = 0;//任务队列添加的位置thrPool->job_pop = 0;//任务队列出队的位置thrPool->job_num = 0;//初始化的任务个数为0thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列//初始化锁和条件变量pthread_mutex_init(&thrPool->pool_lock,NULL);pthread_cond_init(&thrPool->empty_task,NULL);pthread_cond_init(&thrPool->not_empty_task,NULL);int i = 0;thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for(i = 0;i < thrnum;i++){pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程}//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{pool->shutdown = 1;//开始自爆pthread_cond_broadcast(&pool->not_empty_task);//诱杀 int i = 0;for(i = 0; i < pool->thr_num ; i++){pthread_join(pool->threads[i],NULL);}pthread_cond_destroy(&pool->not_empty_task);pthread_cond_destroy(&pool->empty_task);pthread_mutex_destroy(&pool->pool_lock);free(pool->tasks);free(pool->threads);free(pool);
}//添加任务到线程池
void addtask(ThreadPool *pool)
{//printf("begin call %s-----\n",__FUNCTION__);pthread_mutex_lock(&pool->pool_lock);//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(pool->max_job_num <= pool->job_num){pthread_cond_wait(&pool->empty_task,&pool->pool_lock);}int taskpos = (pool->job_push++)%pool->max_job_num;//printf("add task %d tasknum===%d\n",taskpos,beginnum);pool->tasks[taskpos].tasknum = beginnum++;pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];pool->tasks[taskpos].task_func = taskRun;pool->job_num++;pthread_mutex_unlock(&pool->pool_lock);pthread_cond_signal(&pool->not_empty_task);//通知包身工//printf("end call %s-----\n",__FUNCTION__);
}//任务回调函数
void taskRun(void *arg)
{PoolTask *task = (PoolTask*)arg;int num = task->tasknum;printf("task %d is runing %lu\n",num,pthread_self());sleep(1);printf("task %d is done %lu\n",num,pthread_self());
}int main()
{create_threadpool(3,20);int i = 0;for(i = 0;i < 50 ; i++){addtask(thrPool);//模拟添加任务}sleep(20);destroy_threadpool(thrPool);return 0;
}
2.UDP通信
TCP:传输控制协议, 面向连接的,稳定的,可靠的,安全的数据流传递
稳定和可靠: 丢包重传
数据有序: 序号和确认序号
流量控制: 滑动窗口
UDP:用户数据报协议
面向无连接的,不稳定,不可靠,不安全的数据报传递---更像是收发短信
UDP传输不需要建立连接,传输效率更高,在稳定的局域网内环境相对可靠
UDP通信相关函数介绍:
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,struct sockaddr *src_addr, socklen_t *addrlen);
函数说明: 接收消息
参数说明:
- sockfd 套接字
- buf 要接受的缓冲区
- len 缓冲区的长度
- flags 标志位 一般填0
- src_addr 原地址 传出参数
- addrlen 发送方地址长度
- 返回值
成功: 返回读到的字节数
失败: 返回 -1 设置errno
调用该函数相当于TCP通信的recv+accept函数
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,const struct sockaddr *dest_addr, socklen_t addrlen);
函数说明: 发送数据
参数说明:
- sockfd 套接字
- dest_addr 目的地址
- addrlen 目的地址长度
- 返回值
成功: 返回写入的字节数
失败: 返回-1,设置errno
通过man 2 bind, 可以查看bind函数的相关信息, 后面还有示例代码, 可以参考.
本地套接字服务器的流程:
- 可以使用TCP的方式, 必须按照tcp的流程
- 也可以使用UDP的方式, 必须按照udp的流程
tcp的本地套接字服务器流程:
- 创建套接字 socket(AF_UNIX,SOCK_STREAM,0)
- 绑定 struct sockaddr_un &强转
- 侦听 listen
- 获得新连接 accept
- 循环通信 read-write
- 关闭文件描述符 close
tcp本地套接字客户端流程:
- 调用socket创建套接字
- 调用bind函数将socket文件描述和socket文件进行绑定.
不是必须的, 若无显示绑定会进行隐式绑定,但服务器不知道谁连接了.
- 调用connect函数连接服务端
- 循环通信read-write
- 关闭文件描述符 close
编写代码并进行测试
编写udp代码并进行测试
测试:
多开器几个客户端经过测试表明:, udp天然支持多客户端, 这点和TCP不同, TCP需要维护连接.
//udp服务端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>int main()
{//创建socketint cfd = socket(AF_INET, SOCK_DGRAM, 0);if(cfd<0){perror("socket error");return -1;}//绑定struct sockaddr_in serv;struct sockaddr_in client;bzero(&serv, sizeof(serv));serv.sin_family = AF_INET;serv.sin_port = htons(8888);serv.sin_addr.s_addr = htonl(INADDR_ANY);bind(cfd, (struct sockaddr *)&serv, sizeof(serv));int i;int n;socklen_t len;char buf[1024];while(1){//读取数据memset(buf, 0x00, sizeof(buf));len = sizeof(client);n = recvfrom(cfd, buf, sizeof(buf), 0, (struct sockaddr *)&client, &len);//将大写转换为小写for(i=0; i<n; i++){buf[i] = toupper(buf[i]);}printf("[%d]:n==[%d], buf==[%s]\n", ntohs(client.sin_port), n, buf);//发送数据sendto(cfd, buf, n, 0, (struct sockaddr *)&client, len);}//关闭套接字close(cfd);return 0;
}
使用nc命令进行测试: nc -u 127.1 8888
//udp客户端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>int main()
{//创建socketint cfd = socket(AF_INET, SOCK_DGRAM, 0);if(cfd<0){perror("socket error");return -1;}int n;char buf[1024];struct sockaddr_in serv;serv.sin_family = AF_INET;serv.sin_port = htons(8888);inet_pton(AF_INET, "127.0.0.1", &serv.sin_addr.s_addr);while(1){//读标准输入数据memset(buf, 0x00, sizeof(buf));n = read(STDIN_FILENO, buf, sizeof(buf));//发送数据sendto(cfd, buf, n, 0, (struct sockaddr *)&serv, sizeof(serv));//读取数据memset(buf, 0x00, sizeof(buf));n = recvfrom(cfd, buf, sizeof(buf), 0, NULL, NULL);printf("n==[%d], buf==[%s]\n", n, buf);}//关闭套接字close(cfd);return 0;
}
3本地socket通信
回顾一些linux系统有哪些文件类型?
回顾一些linux系统下有哪些常见的IPC机制?
通过查询: man 7 unix 可以查到unix本地域socket通信相关信息:
#include <sys/socket.h>
#include <sys/un.h>
int socket(int domain, int type, int protocol);
函数说明: 创建本地域socket
函数参数:
domain: AF_UNIX or AF_LOCAL
type: SOCK_STREAM或者SOCK_DGRAM
protocol: 0 表示使用默认协议
函数返回值:
成功: 返回文件描述符.
失败: 返回-1, 并设置errno值.
创建socket成功以后, 会在内核创建缓冲区, 下图是客户端和服务端内核缓冲区示意图.
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
函数说明: 绑定套接字
函数参数:
socket: 由socket函数返回的文件描述符
addr: 本地地址
addlen: 本地地址长度
函数返回值:
成功: 返回文件描述符.
失败: 返回-1, 并设置errno值.
需要注意的是: bind函数会自动创建socket文件, 若在调用bind函数之前socket文件已经存在, 则调用bind会报错, 可以使用unlink函数在bind之前先删除文件.
struct sockaddr_un {
sa_family_t sun_family; /* AF_UNIX or AF_LOCAL*/
char sun_path[108]; /* pathname */
};
通过man 2 bind, 可以查看bind函数的相关信息, 后面还有示例代码, 可以参考.
本地套接字服务器的流程:
- 可以使用TCP的方式, 必须按照tcp的流程
- 也可以使用UDP的方式, 必须按照udp的流程
tcp的本地套接字服务器流程:
- 创建套接字 socket(AF_UNIX,SOCK_STREAM,0)
- 绑定 struct sockaddr_un &强转
- 侦听 listen
- 获得新连接 accept
- 循环通信 read-write
- 关闭文件描述符 close
tcp本地套接字客户端流程:
- 调用socket创建套接字
- 调用bind函数将socket文件描述和socket文件进行绑定.
不是必须的, 若无显示绑定会进行隐式绑定,但服务器不知道谁连接了.
- 调用connect函数连接服务端
- 循环通信read-write
- 关闭文件描述符 close
编写代码并进行测试
测试客户端工具:
man nc
-U Specifies to use UNIX-domain sockets.
例如: nc -U sock.s
size = offsetof(struct sockaddr_un, sun_path) +strlen(un.sun_path);
#define offsetof(type, member) ((int)&((type *)0)->member)
//本地socket通信服务端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/un.h>int main()
{//创建socketint lfd = socket(AF_UNIX, SOCK_STREAM, 0);if(lfd<0){perror("socket error");return -1;}//删除socket文件,避免bind失败unlink("./server.sock");//绑定bindstruct sockaddr_un serv;bzero(&serv, sizeof(serv));serv.sun_family = AF_UNIX;strcpy(serv.sun_path, "./server.sock"); int ret = bind(lfd, (struct sockaddr *)&serv, sizeof(serv));if(ret<0){perror("bind error");return -1;}//监听listenlisten(lfd, 10);//接收新的连接-acceptstruct sockaddr_un client;bzero(&client, sizeof(client));int len = sizeof(client);int cfd = accept(lfd, (struct sockaddr *)&client, &len);if(cfd<0){perror("accept error"); return -1;}printf("client->[%s]\n", client.sun_path);int n;char buf[1024];while(1){//读数据memset(buf, 0x00, sizeof(buf)); n = read(cfd, buf, sizeof(buf));if(n<=0){printf("read error or client close, n==[%d]\n", n);break;}printf("n==[%d], buf==[%s]\n", n, buf);//发送数据write(cfd, buf, n);}close(lfd);return 0;
}
//本地socket通信客户端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/un.h>int main()
{//创建socketint cfd = socket(AF_UNIX, SOCK_STREAM, 0);if(cfd<0){perror("socket error");return -1;}//删除socket文件,避免bind失败unlink("./client.sock");//绑定bindstruct sockaddr_un client;bzero(&client, sizeof(client));client.sun_family = AF_UNIX;strcpy(client.sun_path, "./client.sock"); int ret = bind(cfd, (struct sockaddr *)&client, sizeof(client));if(ret<0){perror("bind error");return -1;}struct sockaddr_un serv;bzero(&serv, sizeof(serv));serv.sun_family = AF_UNIX;strcpy(serv.sun_path, "./server.sock");ret = connect(cfd, (struct sockaddr *)&serv, sizeof(serv));if(ret<0){perror("connect error"); return -1;}int n;char buf[1024];while(1){memset(buf, 0x00, sizeof(buf));n = read(STDIN_FILENO, buf, sizeof(buf));//发送数据write(cfd, buf, n);//读数据memset(buf, 0x00, sizeof(buf)); n = read(cfd, buf, sizeof(buf));if(n<=0){printf("read error or client close, n==[%d]\n", n);break;}printf("n==[%d], buf==[%s]\n", n, buf);}close(cfd);return 0;
}
测试客户端工具:
man nc -U Specifies to use UNIX-domain sockets.
例如: nc -U sock.s
size = offsetof(struct sockaddr_un, sun_path) +strlen(un.sun_path);
#define offsetof(type, member) ((int)&((type *)0)->member)
//线程池完整版本
#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** @function threadpool_create* @descCreates a threadpool_t object.* @param thr_num thread num* @param max_thr_num max thread size* @param queue_max_size size of the queue.* @return a newly created thread pool or NULL*/
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** @function threadpool_add* @desc add a new task in the queue of a thread pool* @param pool Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @return 0 if all goes well,else -1*/
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** @function threadpool_destroy* @desc Stops and destroys a thread pool.* @param pool Thread pool to destroy.* @return 0 if destory success else -1*/
int threadpool_destroy(threadpool_t *pool);/*** @desc get the thread num* @pool pool threadpool* @return # of the thread*/
int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* @param pool threadpool* return # of the busy thread*/
int threadpool_busy_threadnum(threadpool_t *pool);#endif
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"#define DEFAULT_TIME 10 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
#define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
#define true 1
#define false 0typedef struct
{void *(*function)(void *); /* 函数指针,回调函数 */void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 *//* 描述线程池相关信息 */
struct threadpool_t
{pthread_mutex_t lock; /* 用于锁住本结构体 */ pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */pthread_t adjust_tid; /* 存管理线程tid */threadpool_task_t *task_queue; /* 任务队列(数组首地址) */int min_thr_num; /* 线程池最小线程数 */int max_thr_num; /* 线程池最大线程数 */int live_thr_num; /* 当前存活线程个数 */int busy_thr_num; /* 忙状态线程个数 */int wait_exit_thr_num; /* 要销毁的线程个数 */int queue_front; /* task_queue队头下标 */int queue_rear; /* task_queue队尾下标 */int queue_size; /* task_queue队中实际任务数 */int queue_max_size; /* task_queue队列可容纳任务数上限 */int shutdown; /* 标志位,线程池使用状态,true或false */
};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{int i;threadpool_t *pool = NULL;do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail");break; /*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0; /* 有0个产品 */pool->queue_max_size = queue_max_size;pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; /* 不关闭线程池 *//* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 队列开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail\n");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail\n");break;}//启动工作线程pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for (i = 0; i < min_thr_num; i++) {pthread_create(&(pool->threads[i]), &attr, threadpool_thread, (void *)pool);/*pool指向当前线程池*/printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);}//创建管理者线程pthread_create(&(pool->adjust_tid), &attr, adjust_thread, (void *)pool);return pool;} while (0);/* 前面代码调用失败时,释放poll存储空间 */threadpool_free(pool);return NULL;
}/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->lock));/* ==为真,队列已经满, 调wait阻塞 */while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}if (pool->shutdown) {pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}/* 清空 工作线程 调用的回调函数 的参数arg */if (pool->task_queue[pool->queue_rear].arg != NULL) {pool->task_queue[pool->queue_rear].arg = NULL;}/*添加任务到任务队列里*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */pool->queue_size++;/*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (true) {/* Lock must be taken to wait on conditional variable *//*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/while ((pool->queue_size == 0) && (!pool->shutdown)) { printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//暂停到这/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/if (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--;/*如果线程池里线程个数大于最小值时可以结束当前线程*/if (pool->live_thr_num > pool->min_thr_num) {printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));//pthread_detach(pthread_self());pthread_exit(NULL);}}}/*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/if (pool->shutdown) {pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());//pthread_detach(pthread_self());pthread_exit(NULL); /* 线程自行结束 */}/*从任务队列里获取任务, 是一个出队操作*/task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */pool->queue_size--;/*通知可以有新的任务添加进来*/pthread_cond_broadcast(&(pool->queue_not_full));/*任务取出后,立即将 线程池琐 释放*/pthread_mutex_unlock(&(pool->lock));/*执行任务*/ printf("thread 0x%x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/pool->busy_thr_num++; /*忙状态线程数+1*/pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg); /*执行回调函数任务*///task.function(task.arg); /*执行回调函数任务*//*任务结束处理*/ printf("thread 0x%x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}/* 管理线程 */
void *adjust_thread(void *threadpool)
{int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown) {sleep(DEFAULT_TIME); /*定时 对线程池管理*/pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size; /* 关注 任务数 */int live_thr_num = pool->live_thr_num; /* 存活 线程数 */pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */pthread_mutex_unlock(&(pool->thread_counter));/* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {pthread_mutex_lock(&(pool->lock)); int add = 0;/*一次增加 DEFAULT_THREAD 个线程*/for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool->max_thr_num; i++) {if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;}}pthread_mutex_unlock(&(pool->lock));}/* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {/* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */pthread_mutex_unlock(&(pool->lock));for (i = 0; i < DEFAULT_THREAD_VARY; i++) {/* 通知处在空闲状态的线程, 他们会自行终止*/pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}int threadpool_destroy(threadpool_t *pool)
{int i;if (pool == NULL) {return -1;}pool->shutdown = true;/*先销毁管理线程*///pthread_join(pool->adjust_tid, NULL);for (i = 0; i < pool->live_thr_num; i++) {/*通知所有的空闲线程*/pthread_cond_broadcast(&(pool->queue_not_empty));}/*for (i = 0; i < pool->live_thr_num; i++) {pthread_join(pool->threads[i], NULL);}*/threadpool_free(pool);return 0;
}int threadpool_free(threadpool_t *pool)
{if (pool == NULL) {return -1;}if (pool->task_queue) {free(pool->task_queue);}if (pool->threads) {free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;return 0;
}int threadpool_all_threadnum(threadpool_t *pool)
{int all_threadnum = -1;pthread_mutex_lock(&(pool->lock));all_threadnum = pool->live_thr_num;pthread_mutex_unlock(&(pool->lock));return all_threadnum;
}int threadpool_busy_threadnum(threadpool_t *pool)
{int busy_threadnum = -1;pthread_mutex_lock(&(pool->thread_counter));busy_threadnum = pool->busy_thr_num;pthread_mutex_unlock(&(pool->thread_counter));return busy_threadnum;
}int is_thread_alive(pthread_t tid)
{int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活if (kill_rc == ESRCH) {return false;}return true;
}/*测试*/ #if 1
/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),*(int *)arg);sleep(1);printf("task %d is end\n", *(int *)arg);return NULL;
}int main(void)
{/*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/threadpool_t *thp = threadpool_create(3,100,100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/printf("pool inited");//int *num = (int *)malloc(sizeof(int)*20);int num[20], i;for (i = 0; i < 20; i++) {num[i]=i;printf("add task %d\n",i);threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */}sleep(10); /* 等子线程完成任务 */threadpool_destroy(thp);return 0;
}#endif