创建一个线程池是提高多线程应用程序性能的有效方法。一个线程池中包含一定数量的工作线程,这些线程可以复用来处理多个任务,避免了频繁创建和销毁线程所带来的开销。
下面是一个基础的线程池实现的框架,使用C语言和POSIX线程库(pthread)。这个简单的线程池会轮询并执行放入队列的任务。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>#define MAX_THREADS 4
#define MAX_QUEUE 8// 任务结构体
typedef struct {void (*function)(void *); // 任务的回调函数void *argument; // 回调函数的参数
} threadpool_task_t;// 线程池结构体
typedef struct {pthread_mutex_t lock; // 互斥锁pthread_cond_t notify; // 条件变量pthread_t *threads; // 线程数组threadpool_task_t *queue; // 任务队列int thread_count; int queue_size; int head; int tail; int count; // 当前任务数量int shutdown; // 是否关闭线程池int started; // 已启动的线程数量
} threadpool_t;// 线程工作函数
void *threadpool_thread(void *threadpool);// 创建线程池
threadpool_t *threadpool_create(int thread_count, int queue_size);// 向线程池中添加任务
int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument);// 销毁线程池
int threadpool_destroy(threadpool_t *pool, int flags);int main() {threadpool_t *pool = threadpool_create(MAX_THREADS, MAX_QUEUE);// 添加一些示例任务for (int i = 0; i < 10; i++) {threadpool_add(pool, &my_task, (void *)(long)i);}sleep(3); // 等待任务完成threadpool_destroy(pool, 0);return 0;
}// 任务示例
void my_task(void *arg) {printf("Thread %ld working on task %ld\n", pthread_self(), (long)arg);
}threadpool_t *threadpool_create(int thread_count, int queue_size) {threadpool_t *pool = (threadpool_t *)malloc(sizeof(threadpool_t));if (pool == NULL) {return NULL;}pool->thread_count = 0;pool->queue_size = queue_size;pool->head = pool->tail = pool->count = 0;pool->shutdown = pool->started = 0;pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);pool->queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_size);pthread_mutex_init(&(pool->lock), NULL);pthread_cond_init(&(pool->notify), NULL);for (int i = 0; i < thread_count; i++) {if (pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool) != 0) {threadpool_destroy(pool, 0);return NULL;}pool->thread_count++;pool->started++;}return pool;
}int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument) {int next, err = 0;if (pthread_mutex_lock(&(pool->lock)) != 0) {return -1;}next = (pool->tail + 1) % pool->queue_size;do {if (pool->count == pool->queue_size) {err = 1;break;}pool->queue[pool->tail].function = function;pool->queue[pool->tail].argument = argument;pool->tail = next;pool->count += 1;if (pthread_cond_signal(&(pool->notify)) != 0) {err = 1;break;}} while (0);if (pthread_mutex_unlock(&(pool->lock)) != 0) {err = 1;}return err;
}int threadpool_destroy(threadpool_t *pool, int flags) {if (pool == NULL) {return -1;}if (pthread_mutex_lock(&(pool->lock)) != 0) {return -1;}pool->shutdown = 1;if ((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) {return -1;}for (int i = 0; i < pool->thread_count; i++) {if (pthread_join(pool->threads[i], NULL) != 0) {return -1;}}if (pthread_mutex_destroy(&(pool->lock)) != 0 || pthread_cond_destroy(&(pool->notify)) != 0) {return -1;}free(pool->threads);free(pool->queue);free(pool);return 0;
}void *threadpool_thread(void *threadpool) {threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while (1) {pthread_mutex_lock(&(pool->lock));while ((pool->count == 0) && (!pool->shutdown)) {pthread_cond_wait(&(pool->notify), &(pool->lock));}if ((pool->shutdown) && (pool->count == 0)) {break;}task.function = pool->queue[pool->head].function;task.argument = pool->queue[pool->head].argument;pool->head = (pool->head + 1) % pool->queue_size;pool->count -= 1;pthread_mutex_unlock(&(pool->lock));(*(task.function))(task.argument);}pool->started--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);return NULL;
}
代码说明
-
结构体定义:
threadpool_task_t
: 表示一个任务,包括任务的回调函数和参数。threadpool_t
: 表示线程池,包括线程数组、任务队列、队列头尾索引、互斥锁、条件变量等。
-
线程池工作函数
threadpool_thread
:- 线程从任务队列中获取任务并执行。
- 在任务队列为空时,该线程将会阻塞等待新的任务。
-
线程池创建函数
threadpool_create
:- 初始化线程池的结构体,各种初始值和资源分配。
- 创建指定数量的线程,并将它们与
threadpool_thread
函数关联起来。
-
添加任务函数
threadpool_add
:- 将任务放入队列,并通知一个阻塞在条件变量上的线程去执行这个任务。
-
销毁线程池函数
threadpool_destroy
:- 关闭线程池,等待所有线程终止,并销毁分配的资源。
-
示例任务
my_task
:- 简单输出线程ID和任务ID示例。
这个基础代码可以满足大部分简单的线程池需求,但在实际应用中,你可能还会需要考虑:
- 线程池中的异常处理。
- 动态调整线程池大小。
- 更复杂的任务调度和优先级。
- 更高效的任务队列实现。
这个基础代码是一个很好的起点,你可以在其基础上根据具体需求做进一步优化和扩展。