线程池
概念:
一堆线程+任务队列
作用
- 避免大量线程频繁的创建/销毁时间成本
- 避免瞬间大量线程创建耗尽资源,程序崩溃危险
实现
创建固定数量的线程+创建一个线程安全的任务队列
一种线程使用模式。
- 线程过多会带来调度开销,进而影响缓存局部性和整体性能。
- 而线程池维护着多个 线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不 仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内 存、网络sockets等的数量
线程池应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大 多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- **接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。**突发性大量客户请求,在没 有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程 可能使内存到达极限,出现错误
线程池的分类
- FixThreadPool------固定线程池
- CachedThreadPool-----缓存线程池
- ScheduledThreadPool—调度线程池
- SingleThreadPool-----单任务线程池
特点介绍
FixedThreadPool
- 通过Exector的newFixedThreadPool静态方法来创建
- 线程数量固定的线程池
- 只有核心线程切并且不会被回收
- 当所有线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来
CachedThreadPool
- 通过Exector的newCachedThreadPool静态静态方法来创建
- 线程数量不定的线程池
- 只有非核心线程,最大线程数量为Integer.MAX_VALUE,可视为任意大
- 有超时机制,时长为60s,即超过60s的空闲线程就会被回收
- 当线程池中的线程都处于活动状态时,线程池会创建新的线程来处理新任务,否则就会利用空闲的线程来处理新任务。因此任何任务都会被立即执行
- 该线程池比较适合执行大量耗时较少的任务
ScheduledThreadPool
- 通过Exector的newScheduledThreadPool静态方法来创建
- 核心线程数量是固定的,而非核心线程数不固定的,并且非核心线程有超时机制,只要处于闲置状态就会被立即回收
- 该线程池主要用于执行定时任务和具有固定周期的重复任务
SingleThreadPool
- 通过Exector的newSingleThreadPool静态方法来创建
- 只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。因此在这些任务之间不需要处理线程同步的问题
线程池实现
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>typedef bool (*task_callback)(int data);
class Task
{public:Task(){} Task(int data, task_callback handler){_data = data;_handler = handler;} ~Task(){} public://设置任务处理的数据以及处理方法void SetTask(int data, task_callback handler){_data = data;_handler = handler;} //执行任务bool Run() {return _handler(_data);} private:int _data;task_callback _handler;
};
#define MAX_THR 5
#define MAX_QUE 10
class ThreadPool
{public:ThreadPool(int qmax = MAX_QUE, int tmax = MAX_THR):_thr_max(tmax), _capacity(qmax), _thr_cur(tmax){pthread_mutex_init(&_mutex, NULL);pthread_cond_init(&_cond_con, NULL);pthread_cond_init(&_cond_pro, NULL);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond_con);pthread_cond_destroy(&_cond_pro);}public:static void *thr_start(void *arg) {ThreadPool *pool = (ThreadPool*)arg;while(1) {pool->QueueLock();while(pool->QueueIsEmpty()){pool->ConWait();}Task tt;pool->QueuePop(&tt);pool->ProWakeUp();pool->QueueUnLock();//为了防止处理时间过长导致其它线程无法获取锁//因此解锁之后才进行处理tt.Run();}return NULL;}bool ThreadPoolInit() {pthread_t tid;int ret, i;for (i = 0; i < _thr_max; i++) {ret = pthread_create(&tid, NULL, thr_start,(void*)this);if (ret != 0) {std::cout<<"thread create error\n";return false; }pthread_detach(tid);}return true;}void AddTask(Task tt) {//向线程池添加任务QueueLock();while(QueueIsFull()) {ProWait();}QueuePush(tt);ConWakeUp();QueueUnLock();}void ThreadPoolQuit(){//退出线程池中所有的线程_quit_flag = true;while(_thr_cur > 0) {ConWakeUpAll(); usleep(1000);}return;}private:void QueuePush(Task tt){_queue.push(tt);}void QueuePop(Task *tt){*tt = _queue.front();_queue.pop();}void QueueLock(){pthread_mutex_lock(&_mutex);}void QueueUnLock(){pthread_mutex_unlock(&_mutex);}void ProWait(){pthread_cond_wait(&_cond_pro, &_mutex);} void ProWakeUp(){pthread_cond_signal(&_cond_pro);}void ConWait(){//进入这个函表示现在没有任务if (_quit_flag == true) {//若线程池要求退出_thr_cur--;std::cout<<"thread:"<<pthread_self()<<"exit\n";pthread_mutex_unlock(&_mutex);pthread_exit(NULL);}pthread_cond_wait(&_cond_con, &_mutex);}void ConWakeUp(){pthread_cond_signal(&_cond_con);}void ConWakeUpAll(){pthread_cond_broadcast(&_cond_con);}bool QueueIsFull(){return (_queue.size() == _capacity);}bool QueueIsEmpty(){return _queue.empty();}private:int _thr_max;int _thr_cur;int _quit_flag;std::queue<Task> _queue;int _capacity;pthread_mutex_t _mutex;pthread_cond_t _cond_pro;pthread_cond_t _cond_con;
};
bool task_handler(int data){//休眠一段时间srand(time(NULL));int sec = rand()%5;std::cout<<"thread:"<<pthread_self()<<" sleep "<<sec<<"second\n";sleep(sec);return true;
}
int main()
{ThreadPool pool;Task tt[10];pool.ThreadPoolInit();int i;for (i = 0; i < 10; i++) {tt[i].SetTask(i, task_handler);pool.AddTask(tt[i]);}pool.ThreadPoolQuit();return 0;
}