线程池的特点:
-
空间换时间,浪费服务器的硬件资源,换取运行效率.
-
池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源.
-
当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配.
-
当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源.
工作流程:
采用Proactor并发模型,主线程负责监听文件描述符,接受socket连接,若当前监听的socket发生了读写事件,就把任务插入到请求队列中,工作线程从请求队列中取出任务,完成读写数据的处理。
线程池的定义如下:
template <typename T>
class threadpool
{
public:/*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);~threadpool();bool append(T *request, int state);bool append_p(T *request);private:/*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/static void *worker(void *arg);void run();private:int m_thread_number; //线程池中的线程数int m_max_requests; //请求队列中允许的最大请求数pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_numberstd::list<T *> m_workqueue; //请求队列locker m_queuelocker; //保护请求队列的互斥锁sem m_queuestat; //是否有任务需要处理connection_pool *m_connPool; //数据库int m_actor_model; //模型切换
};
template <typename T>
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
{if (thread_number <= 0 || max_requests <= 0)throw std::exception();m_threads = new pthread_t[m_thread_number];if (!m_threads)throw std::exception();for (int i = 0; i < thread_number; ++i){if (pthread_create(m_threads + i, NULL, worker, this) != 0){delete[] m_threads;throw std::exception();}if (pthread_detach(m_threads[i])){delete[] m_threads;throw std::exception();}}
}
template <typename T>
threadpool<T>::~threadpool()
{delete[] m_threads;
}
template <typename T>
bool threadpool<T>::append(T *request, int state)
{m_queuelocker.lock();if (m_workqueue.size() >= m_max_requests){m_queuelocker.unlock();return false;}request->m_state = state;m_workqueue.push_back(request);m_queuelocker.unlock();m_queuestat.post();return true;
}
template <typename T>
bool threadpool<T>::append_p(T *request)
{m_queuelocker.lock();if (m_workqueue.size() >= m_max_requests){m_queuelocker.unlock();return false;}m_workqueue.push_back(request);m_queuelocker.unlock();m_queuestat.post();return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg)
{threadpool *pool = (threadpool *)arg;pool->run();return pool;
}
template <typename T>
void threadpool<T>::run()
{while (true){m_queuestat.wait();m_queuelocker.lock();if (m_workqueue.empty()){m_queuelocker.unlock();continue;}T *request = m_workqueue.front();m_workqueue.pop_front();m_queuelocker.unlock();if (!request)continue;if (1 == m_actor_model){if (0 == request->m_state){if (request->read_once()){request->improv = 1;connectionRAII mysqlcon(&request->mysql, m_connPool);request->process();}else{request->improv = 1;request->timer_flag = 1;}}else{if (request->write()){request->improv = 1;}else{request->improv = 1;request->timer_flag = 1;}}}else{connectionRAII mysqlcon(&request->mysql, m_connPool);request->process();}}
}
详细解释下work函数,这个函数不断的检查队列是否为空,如果不为空会唤醒线程,在处理任务前获取锁。从任务队列头部取出任务,释放锁,如果取出的任务对象是有效指针,如果是Proactor并发模型代表有数据要读,调用函数读取客户端数据,标记请求已被处理,获得数据库连接执行数据库操作。
如果读取失败,表示需要进行定时器的处理,如果m_stat=1,那么表示要写入数据,写入成功标记请求已经被处理,否则设置定时器。
如果不是Proactro模型,那么直接处理请求。