目录
前言
可伸缩线程池原理
可伸缩线程池实现
完整程序
前言
本篇可伸缩线程池的实现是在静态线程池上拓展而来,对于静态线程池的实现,请参考:
线程池的实现全过程v1.0版本(手把手创建,看完必掌握!!!)_竹烟淮雨的博客-CSDN博客
可伸缩线程池原理
对于静态线程池,我们额外引入一个管理线程专门来管理线程池。当线程池中的线程数量不足够匹配任务队列里的任务量,就增加线程;如果线程数量超出了任务队列的任务量,就减少线程。
具体可伸缩的条件由我们自己来规定,比如说待处理的任务数 > 正在工作的线程数*2,就增加线程;当前任务数*2 < 正在工作的线程数,就关闭一些线程。
可伸缩线程池实现
对之前V1.0版本的静态线程池进行修改,添加了一个名为management_thread的管理线程,该线程会根据任务队列的任务数量动态管理线程池中的线程数量。管理线程会在每次循环中根据一定的条件增加或减少线程数量,然后休眠一段时间后继续检查。这样就能根据任务负载自动调整线程池中的线程数量。
修改部分如下:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>struct job {void *(*func)(void *arg);void *arg;struct job *next;
};struct threadpool {int thread_num;pthread_t *pthread_ids;struct job *head;struct job *tail;int queue_max_num;int queue_cur_num;pthread_mutex_t mutex;pthread_cond_t queue_empty;pthread_cond_t queue_not_empty;pthread_cond_t queue_not_full;pthread_t management_thread; // 新增的管理线程
};void *threadpool_function(void *arg) {// 线程池工作线程的代码不变
}void *management_thread_function(void *arg) {struct threadpool *pool = (struct threadpool *)arg;while (1) {pthread_mutex_lock(&(pool->mutex));int active_threads = pool->queue_cur_num;int task_queue_size = pool->queue_cur_num;// 根据条件动态管理线程数量if (task_queue_size > 2 * active_threads && pool->thread_num < pool->queue_max_num) {int threads_to_add = (task_queue_size - active_threads + 1) / 2;for (int i = 0; i < threads_to_add; i++) {if (pool->thread_num < pool->queue_max_num) {pthread_create(&(pool->pthread_ids[pool->thread_num]), NULL, threadpool_function, (void *)pool);pool->thread_num++;}}} else if (2 * task_queue_size < active_threads && pool->thread_num > 1) {int threads_to_remove = (active_threads - 2 * task_queue_size + 1) / 2;for (int i = 0; i < threads_to_remove; i++) {if (pool->thread_num > 1) {pool->thread_num--;pthread_cancel(pool->pthread_ids[pool->thread_num]);pthread_join(pool->pthread_ids[pool->thread_num], NULL);}}}pthread_mutex_unlock(&(pool->mutex));sleep(1); // 等待一段时间后重新检查线程数量}
}struct threadpool *threadpool_init(int thread_num, int queue_max_num) {struct threadpool *pool = (struct threadpool *)malloc(sizeof(struct threadpool));// 初始化部分不变pthread_create(&(pool->management_thread), NULL, management_thread_function, (void *)pool);return pool;
}// 其他函数不变int main(int argc, char const *argv[]) {// 主函数部分不变
}
完整程序
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
struct job
{void *(*func)(void *arg);void *arg;struct job *next;
};struct threadpool
{int thread_num; //已开启的线程数量pthread_t *pthread_ids; //保存线程池中线程的idstruct job *head; //任务队列的头struct job *tail; //任务队列的尾int queue_max_num; //任务队列的最大数int queue_cur_num; //任务队列已有多少个任务pthread_mutex_t mutex;pthread_cond_t queue_empty; //控制任务队列为空的条件pthread_cond_t queue_not_empty; //控制任务队列不为空的条件pthread_cond_t queue_not_full; //控制任务队列不为满的条件pthread_t management_thread; // 管理线程
};void *threadpool_function(void *arg)
{struct threadpool *pool = (struct threadpool *)arg;struct job *pjob = NULL;while (1){pthread_mutex_lock(&(pool->mutex));while (pool->queue_cur_num == 0){pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));}pjob = pool->head;pool->queue_cur_num--;//对任务队列满队条件变量解阻塞if (pool->queue_cur_num != pool->queue_max_num){pthread_cond_broadcast(&(pool->queue_not_full));}if (pool->queue_cur_num == 0){pool->head = pool->tail = NULL;pthread_cond_broadcast(&(pool->queue_empty));}else{pool->head = pool->head->next;}pthread_mutex_unlock(&(pool->mutex));(*(pjob->func))(pjob->arg);free(pjob);pjob = NULL;}
}void *management_thread_function(void *arg)
{struct threadpool *pool = (struct threadpool *)arg;while (1){pthread_mutex_lock(&(pool->mutex));int active_threads = pool->queue_cur_num; // 当前正在工作的线程数量int task_queue_size = pool->queue_cur_num; // 当前任务队列中的任务数量// 根据条件动态管理线程数量if (task_queue_size > 2 * active_threads && pool->thread_num < pool->queue_max_num){int threads_to_add = (task_queue_size - active_threads + 1) / 2;for (int i = 0; i < threads_to_add; i++){if (pool->thread_num < pool->queue_max_num){pthread_create(&(pool->pthread_ids[pool->thread_num]), NULL, threadpool_function, (void *)pool);pool->thread_num++;}}}else if (2 * task_queue_size < active_threads && pool->thread_num > 1){int threads_to_remove = (active_threads - 2 * task_queue_size + 1) / 2;for (int i = 0; i < threads_to_remove; i++){if (pool->thread_num > 1){pool->thread_num--;pthread_cancel(pool->pthread_ids[pool->thread_num]);pthread_join(pool->pthread_ids[pool->thread_num], NULL);}}}pthread_mutex_unlock(&(pool->mutex));sleep(1); // 等待一段时间后重新检查线程数量}
}struct threadpool *threadpool_init(int thread_num, int queue_max_num)
{struct threadpool *pool = (struct threadpool *)malloc(sizeof(struct threadpool));// mallocpool->queue_max_num = queue_max_num;pool->queue_cur_num = 0;pool->head = NULL;pool->tail = NULL;pthread_mutex_init(&(pool->mutex), NULL);pthread_cond_init(&(pool->queue_empty), NULL);pthread_cond_init(&(pool->queue_not_empty), NULL);pthread_cond_init(&(pool->queue_not_full), NULL);pool->thread_num = thread_num;pool->pthread_ids = malloc(sizeof(pthread_t) * thread_num);// mallocfor (int i = 0; i < pool->thread_num; i++){pthread_create(&(pool->pthread_ids[i]), NULL, threadpool_function, (void *)pool);}pthread_create(&(pool->management_thread), NULL, management_thread_function, (void *)pool);return pool;
}void threadpool_add_job(struct threadpool *pool, void *(*func)(void *arg), void *arg)
{pthread_mutex_lock(&(pool->mutex));//如果任务队列已满while (pool->queue_cur_num == pool->queue_max_num){pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex));}struct job *pjob = (struct job *)malloc(sizeof(struct job));pjob->func = func;pjob->arg = arg;if (pool->head == NULL){pool->head = pool->tail = pjob;pthread_cond_broadcast(&(pool->queue_not_empty));}else{pool->tail->next = pjob;pool->tail = pjob;}pool->queue_cur_num++;pthread_mutex_unlock(&(pool->mutex));
}void thread_destroy(struct threadpool *pool)
{pthread_mutex_lock(&(pool->mutex));while (pool->queue_cur_num != 0){pthread_cond_wait(&(pool->queue_empty), &(pool->mutex));}pthread_mutex_unlock(&(pool->mutex));//通知所有阻塞的线程pthread_cond_broadcast(&(pool->queue_not_empty));pthread_cond_broadcast(&(pool->queue_not_full)); //可不要for (int i = 0; i < pool->thread_num; i++){printf("thread exit!\n");pthread_cancel(pool->pthread_ids[i]);pthread_join(pool->pthread_ids[i], NULL);}pthread_mutex_destroy(&(pool->mutex));pthread_cond_destroy(&(pool->queue_empty));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));free(pool->pthread_ids);//为了以防万一,任务队列不为空,要对所有任务进行销毁struct job *temp;while (pool->head != NULL){temp = pool->head;pool->head = temp->next;free(temp);}free(pool);
}void *work(void *arg)
{char *p = (char *)arg;printf("hello world! %s\n", p);printf("welcome to Nanjing! %s\n", p);sleep(1);
}int main(int argc, char const *argv[])
{struct threadpool *pool = threadpool_init(10, 100);threadpool_add_job(pool, work, "1");threadpool_add_job(pool, work, "2");threadpool_add_job(pool, work, "3");threadpool_add_job(pool, work, "4");threadpool_add_job(pool, work, "5");threadpool_add_job(pool, work, "6");threadpool_add_job(pool, work, "7");threadpool_add_job(pool, work, "8");threadpool_add_job(pool, work, "9");threadpool_add_job(pool, work, "10");threadpool_add_job(pool, work, "11");threadpool_add_job(pool, work, "12");threadpool_add_job(pool, work, "13");threadpool_add_job(pool, work, "14");threadpool_add_job(pool, work, "15");threadpool_add_job(pool, work, "16");threadpool_add_job(pool, work, "17");threadpool_add_job(pool, work, "18");threadpool_add_job(pool, work, "19");threadpool_add_job(pool, work, "20");thread_destroy(pool);sleep(5);return 0;
}