使用线程池
当进程被初始化后,主线程就被创建了。对于绝大多数的应用程序来说,通常仅要求有一个主线程,但也可以在进程内创建多个顺序执行流,这些顺序执行流就是线程,每一个线程都是独立的。
线程是进程的组成部分,一个进程可以拥有多个线程,一个线程必须有一个父进程。线程可以拥有自己的堆栈、自己的程序计数器和自己的局部变量,但不拥有系统资源,它与父进程的其他线程共享该进程所拥有的全部资源。因为多个线程共享父进程里的全部资源,因此编程更加方便;但必须更加小心,因为需要确保线程不会妨碍统一进程中的其他线程。
服务器创建和销毁工作线程的开销很大,如果服务器与很多客户端通信,并且与每个客户端通信的时间很短,那么就会在创建和销毁线程的时候造成很大的开销。
其次,活动的线程也消耗系统资源,如果线程的创建数量没有限制,当大量的客户连接服务器的时候,就会创建出大量的工作线程,他们会消耗大量的内存空间,导致系统的内存空间不足,影响服务器的使用。
最后,如果线程的数目固定,并且每个线程都有很长的生命周期,那么线程切换也就是固定的,这样就会给服务器减轻很多压力,但是如果频繁的创建和销毁线程,必将导致频繁的切换线程,使得线程之间的切换不再遵循系统的固定切换周期,线程切换的开销也会增大很多。
线程池为这些问题提供了解决方案。线程池中预先创建了一些工作线程,他们不断的从工作队列中取出任务,然后执行,当执行完之后,会继续执行工作队列中的下一个任务,减少了创建和销毁线程的次数,每个线程都可以一直被重用,避免了创建和销毁的开销。另外,可以根据系统的实际承载能力,方便的调节线程池中线程的数目,防止因为消耗过量的系统资源而导致系统崩溃的问题。
/** Copyright (c) 2013, Mathias Brossard <mathias@brossard.org>.* All rights reserved.** Redistribution and use in source and binary forms, with or without* modification, are permitted provided that the following conditions are* met:** 1. Redistributions of source code must retain the above copyright* notice, this list of conditions and the following disclaimer.** 2. Redistributions in binary form must reproduce the above copyright* notice, this list of conditions and the following disclaimer in the* documentation and/or other materials provided with the distribution.** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.*/#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_#ifdef __cplusplus
/* 对于 C++ 编译器,指定用 C 的语法编译 */
extern "C" {
#endif/*** @file threadpool.h* @brief Threadpool Header File*//*** Increase this constants at your own risk* Large values might slow down your system*/
#define MAX_THREADS 64
#define MAX_QUEUE 65536/* 简化变量定义 */
typedef struct threadpool_t threadpool_t;/* 定义错误码 */
typedef enum {threadpool_invalid = -1,threadpool_lock_failure = -2,threadpool_queue_full = -3,threadpool_shutdown = -4,threadpool_thread_failure = -5
} threadpool_error_t;typedef enum {threadpool_graceful = 1
} threadpool_destroy_flags_t;/* 以下是线程池三个对外 API *//*** @function threadpool_create* @brief Creates a threadpool_t object.* @param thread_count Number of worker threads.* @param queue_size Size of the queue.* @param flags Unused parameter.* @return a newly created thread pool or NULL*/
/*** 创建线程池,有 thread_count 个线程,容纳 queue_size 个的任务队列,flags 参数没有使用*/
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);/*** @function threadpool_add* @brief add a new task in the queue of a thread pool* @param pool Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @param flags Unused parameter.* @return 0 if all goes well, negative values in case of error (@see* threadpool_error_t for codes).*/
/*** 添加任务到线程池, pool 为线程池指针,routine 为函数指针, arg 为函数参数, flags 未使用*/
int threadpool_add(threadpool_t *pool, void (*routine)(void *),void *arg, int flags);/*** @function threadpool_destroy* @brief Stops and destroys a thread pool.* @param pool Thread pool to destroy.* @param flags Flags for shutdown** Known values for flags are 0 (default) and threadpool_graceful in* which case the thread pool doesn't accept any new tasks but* processes all pending tasks before shutdown.*/
/*** 销毁线程池,flags 可以用来指定关闭的方式*/
int threadpool_destroy(threadpool_t *pool, int flags);#ifdef __cplusplus
}
#endif#endif /* _THREADPOOL_H_ */
/** Copyright (c) 2013, Mathias Brossard <mathias@brossard.org>.* All rights reserved.** Redistribution and use in source and binary forms, with or without* modification, are permitted provided that the following conditions are* met:** 1. Redistributions of source code must retain the above copyright* notice, this list of conditions and the following disclaimer.** 2. Redistributions in binary form must reproduce the above copyright* notice, this list of conditions and the following disclaimer in the* documentation and/or other materials provided with the distribution.** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.*//*** @file threadpool.c* @brief Threadpool implementation file*/#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>#include "threadpool.h"/*** 线程池关闭的方式*/
typedef enum {immediate_shutdown = 1,graceful_shutdown = 2
} threadpool_shutdown_t;/*** @struct threadpool_task* @brief the work struct** @var function Pointer to the function that will perform the task.* @var argument Argument to be passed to the function.*/
/*** 线程池一个任务的定义*/typedef struct {void (*function)(void *);void *argument;
} threadpool_task_t;/*** @struct threadpool* @brief The threadpool struct** @var notify Condition variable to notify worker threads.* @var threads Array containing worker threads ID.* @var thread_count Number of threads* @var queue Array containing the task queue.* @var queue_size Size of the task queue.* @var head Index of the first element.* @var tail Index of the next element.* @var count Number of pending tasks* @var shutdown Flag indicating if the pool is shutting down* @var started Number of started threads*/
/*** 线程池的结构定义* @var lock 用于内部工作的互斥锁* @var notify 线程间通知的条件变量* @var threads 线程数组,这里用指针来表示,数组名 = 首元素指针* @var thread_count 线程数量* @var queue 存储任务的数组,即任务队列* @var queue_size 任务队列大小* @var head 任务队列中首个任务位置(注:任务队列中所有任务都是未开始运行的)* @var tail 任务队列中最后一个任务的下一个位置(注:队列以数组存储,head 和 tail 指示队列位置)* @var count 任务队列里的任务数量,即等待运行的任务数* @var shutdown 表示线程池是否关闭* @var started 开始的线程数*/
struct threadpool_t {pthread_mutex_t lock;pthread_cond_t notify;pthread_t *threads;threadpool_task_t *queue;int thread_count;int queue_size;int head;int tail;int count;int shutdown;int started;
};/*** @function void *threadpool_thread(void *threadpool)* @brief the worker thread* @param threadpool the pool which own the thread*/
/*** 线程池里每个线程在跑的函数* 声明 static 应该只为了使函数只在本文件内有效*/
static void *threadpool_thread(void *threadpool);int threadpool_free(threadpool_t *pool);threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) {return NULL;}threadpool_t *pool;int i;/* 申请内存创建内存池对象 */if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {goto err;}/* Initialize */pool->thread_count = 0;pool->queue_size = queue_size;pool->head = pool->tail = pool->count = 0;pool->shutdown = pool->started = 0;/* Allocate thread and task queue *//* 申请线程数组和任务队列所需的内存 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);pool->queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_size);/* Initialize mutex and conditional variable first *//* 初始化互斥锁和条件变量 */if((pthread_mutex_init(&(pool->lock), NULL) != 0) ||(pthread_cond_init(&(pool->notify), NULL) != 0) ||(pool->threads == NULL) ||(pool->queue == NULL)) {goto err;}/* Start worker threads *//* 创建指定数量的线程开始运行 */for(i = 0; i < thread_count; i++) {if(pthread_create(&(pool->threads[i]), NULL,threadpool_thread, (void*)pool) != 0) {threadpool_destroy(pool, 0);return NULL;}pool->thread_count++;pool->started++;}return pool;err:if(pool) {threadpool_free(pool);}return NULL;
}int threadpool_add(threadpool_t *pool, void (*function)(void *),void *argument, int flags)
{int err = 0;int next;if(pool == NULL || function == NULL) {return threadpool_invalid;}/* 必须先取得互斥锁所有权 */if(pthread_mutex_lock(&(pool->lock)) != 0) {return threadpool_lock_failure;}/* 计算下一个可以存储 task 的位置 */next = pool->tail + 1;next = (next == pool->queue_size) ? 0 : next;do {/* Are we full ? *//* 检查是否任务队列满 */if(pool->count == pool->queue_size) {err = threadpool_queue_full;break;}/* Are we shutting down ? *//* 检查当前线程池状态是否关闭 */if(pool->shutdown) {err = threadpool_shutdown;break;}/* Add task to queue *//* 在 tail 的位置放置函数指针和参数,添加到任务队列 */pool->queue[pool->tail].function = function;pool->queue[pool->tail].argument = argument;/* 更新 tail 和 count */pool->tail = next;pool->count += 1;/* pthread_cond_broadcast *//** 发出 signal,表示有 task 被添加进来了* 如果由因为任务队列空阻塞的线程,此时会有一个被唤醒* 如果没有则什么都不做*/if(pthread_cond_signal(&(pool->notify)) != 0) {err = threadpool_lock_failure;break;}/** 这里用的是 do { ... } while(0) 结构* 保证过程最多被执行一次,但在中间方便因为异常而跳出执行块*/} while(0);/* 释放互斥锁资源 */if(pthread_mutex_unlock(&pool->lock) != 0) {err = threadpool_lock_failure;}return err;
}int threadpool_destroy(threadpool_t *pool, int flags)
{int i, err = 0;if(pool == NULL) {return threadpool_invalid;}/* 取得互斥锁资源 */if(pthread_mutex_lock(&(pool->lock)) != 0) {return threadpool_lock_failure;}do {/* Already shutting down *//* 判断是否已在其他地方关闭 */if(pool->shutdown) {err = threadpool_shutdown;break;}/* 获取指定的关闭方式 */pool->shutdown = (flags & threadpool_graceful) ?graceful_shutdown : immediate_shutdown;/* Wake up all worker threads *//* 唤醒所有因条件变量阻塞的线程,并释放互斥锁 */if((pthread_cond_broadcast(&(pool->notify)) != 0) ||(pthread_mutex_unlock(&(pool->lock)) != 0)) {err = threadpool_lock_failure;break;}/* Join all worker thread *//* 等待所有线程结束 */for(i = 0; i < pool->thread_count; i++) {if(pthread_join(pool->threads[i], NULL) != 0) {err = threadpool_thread_failure;}}/* 同样是 do{...} while(0) 结构*/} while(0);/* Only if everything went well do we deallocate the pool */if(!err) {/* 释放内存资源 */threadpool_free(pool);}return err;
}int threadpool_free(threadpool_t *pool)
{if(pool == NULL || pool->started > 0) {return -1;}/* Did we manage to allocate ? *//* 释放线程 任务队列 互斥锁 条件变量 线程池所占内存资源 */if(pool->threads) {free(pool->threads);free(pool->queue);/* Because we allocate pool->threads after initializing themutex and condition variable, we're sure they'reinitialized. Let's lock the mutex just in case. */pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_cond_destroy(&(pool->notify));}free(pool);return 0;
}static void *threadpool_thread(void *threadpool)
{threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;for(;;) {/* Lock must be taken to wait on conditional variable *//* 取得互斥锁资源 */pthread_mutex_lock(&(pool->lock));/* Wait on condition variable, check for spurious wakeups.When returning from pthread_cond_wait(), we own the lock. *//* 用 while 是为了在唤醒时重新检查条件 */while((pool->count == 0) && (!pool->shutdown)) {/* 任务队列为空,且线程池没有关闭时阻塞在这里 */pthread_cond_wait(&(pool->notify), &(pool->lock));}/* 关闭的处理 */if((pool->shutdown == immediate_shutdown) ||((pool->shutdown == graceful_shutdown) &&(pool->count == 0))) {break;}/* Grab our task *//* 取得任务队列的第一个任务 */task.function = pool->queue[pool->head].function;task.argument = pool->queue[pool->head].argument;/* 更新 head 和 count */pool->head += 1;pool->head = (pool->head == pool->queue_size) ? 0 : pool->head;pool->count -= 1;/* Unlock *//* 释放互斥锁 */pthread_mutex_unlock(&(pool->lock));/* Get to work *//* 开始运行任务 */(*(task.function))(task.argument);/* 这里一个任务运行结束 */}/* 线程将结束,更新运行线程数 */pool->started--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);return(NULL);
}
#define THREAD 8
#define QUEUE 65535#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>#include "threadpool.h"int tasks = 0, done = 0;
pthread_mutex_t lock;void dummy_task(void *arg) {pthread_mutex_lock(&lock);/* 记录成功完成的任务数 */done++;pthread_mutex_unlock(&lock);usleep(done*1000000);printf("%d\n",done);
}int main(int argc, char **argv)
{threadpool_t *pool;/* 初始化互斥锁 */pthread_mutex_init(&lock, NULL);/* 断言线程池创建成功 */assert((pool = threadpool_create(THREAD, QUEUE, 0)) != NULL);fprintf(stderr, "Pool started with %d threads and ""queue size of %d\n", THREAD, QUEUE);/* 只要任务队列还没满,就一直添加 */while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) {pthread_mutex_lock(&lock);tasks++;pthread_mutex_unlock(&lock);}fprintf(stderr, "Added %d tasks\n", tasks);/* 不断检查任务数是否完成一半以上,没有则继续休眠 */while((tasks / 2) > done) {usleep(10000);}/* 这时候销毁线程池,0 代表 immediate_shutdown */assert(threadpool_destroy(pool, 0) == 0);fprintf(stderr, "Did %d tasks\n", done);return 0;
}
可是真是如此吗
看到一个评论
游戏MMO服务器的核心指导思想: 简单,必要的性能考虑,稳定第一。
作为主程级别的开发人员,能熟悉TCP/IP原理是必须的,遇到问题能找到本质原因。
基于指导思想考虑怎么做:
要用多线程(性能),但代码绝对不能有锁(有锁的代码太复杂了);
从客户端到服务器的接入层,逻辑层和存储层,要有一套统一的处理方案(简单);
服务器节点间必须完全异步(这是必要的性能考虑), 但异步实现机制从程序员角度看和同步类似(简单考虑)。
网络通信功能实现比较容易,要把时间花整体服务器结构简单化上面。