程序随笔——C++实现的一个线程池

1.线程池简介

我们知道在线程池是一种多线程处理形式,处理过程中我们将相应的任务提交给线程池,线程池会分配对应的工作线程执行任务或存放在任务队列中,等待执行。

面向对象编程中,创建和销毁对象是需要消耗一定时间的,因为创建一个对象要获取内存资源或者其它更多资源。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。当然线程池也同样适用这种思想。

因为线程的创建和销毁时需要消耗一定的时间的。假设,线程的创建消耗T1,线程执行任务的时间T2,线程的销毁销毁T3。当T1 + T3 > T2时候,使用线程池技术,通过线程的复用,就能提高程序的性能。

2.线程池的作用

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。
3.线程池的实现
我们这里实现的线程池是通过类CThreadPool来实现的。这个线程池类的构造函数如下所示:
[cpp] view plain copy
  1. CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue);  

构造函数参数的含义:

  1. corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,默认情况下,在创建了线程池之后,线程池中的线程数为0,,当有任务到来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把达到的任务放到缓存对队列当中。
  2. maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程。
  3. keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。
  4. CBlockingQueue<Task>:任务队列,用来存放提交到线程池的任务。

我们实现的线程池的大概原理是:在创建线程池之后,线程池中并没有任何工作线程的,当使用线程池提供的execute方法,向线程池提交任务的时候,如果线程池中存在空闲的工作线程,那么就会复用该工作线程,并不会去创建新的工作线程;但是如果没有工作线程或空闲的工作线程,并且当然的工作线程数量小于核心池的大小时候,会创建一个工作线程去执行任务,若当前的工作线程数量达到了核心池的数量,那么就会将任务放入到队列中去;若队列满的情况下,如果没有达到线程池的最大线程数,那么就会将创建新的工作线程去执行任务;若线程数达到了最大的线程数,那么我们是抛出异常,这里没有提供一个拒绝的策略,后续有时间的会处理,目前就向采用抛出异常;并且在当前的工作线程数大于核心池数的时候,会有超时机制,关闭指定某时间的空闲工作线程,直到等于核心池的大小。当然,目前的实现还是有缺陷的,线程是结束了,当时并没释放到资源,目前没想到好的方法。

为了简化线程池的配置,我们提供了一个工厂类来进行线程池的创建。我们的工厂类支持创建三种线程池:

  1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

整个实现的原理就是这样,下面 直接上实现,我们的实现只适用Linux上,并且需要依赖boost库。一些实现是之前写的库。这里只贴出新实现的代码。

1.FactoryThreadPool.h

[cpp] view plain copy
  1. #ifndef __FACTORY_THREAD_POOL_H__  
  2. #define __FACTORY_THREAD_POOL_H__  
  3.   
  4. #include "ThreadPool.h"  
  5. #include "LinkedBlockingQueue.h"  
  6.   
  7. //创建线程池的工厂类,一般直接使用工厂类创建所需的线程池  
  8. //共提供三种线程池:固定大小线程池、可缓存的线程池、单个后台线程  
  9. class CFactoryThreadPool  
  10. {  
  11. public:  
  12.     CFactoryThreadPool() { }  
  13.     ~CFactoryThreadPool() { }  
  14.   
  15.     //固定大小线程池。每次提交一个任务就创建一个线程池,直到线程池达到  
  16.     //线程池的最大大小。线程池的大小一旦达到最大值就会保持不变。  
  17.     CAbstractThreadPool *newFixedThreadPool(int iThreads)  
  18.     {  
  19.         CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>();  
  20.   
  21.         return new CThreadPool(iThreads, iThreads, 0, pBlockingQueue);  
  22.     }  
  23.     //可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么  
  24.     //就会回收部分空闲(60s不执行任务)的线程,当任务数增加是,此线程池  
  25.     //又可以智能的添加新线程来处理任务,线程池的大小依赖于操作系统能创建  
  26.     //的最大线程的大小  
  27.     CAbstractThreadPool *newCachedThreadPool()  
  28.     {  
  29.         CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>(1);  
  30.       
  31.         return new CThreadPool(0, INT_MAX, 60, pBlockingQueue);  
  32.     }  
  33.     //单个后台线程。这个线程池只有一个线程在工作,也就是相当于单线程串行  
  34.     //执行所有任务。保证所有任务的执行按照提交的顺序。  
  35.     CAbstractThreadPool * newSingleThreadExecutor()  
  36.     {  
  37.         CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>();  
  38.       
  39.         return new CThreadPool(1, 1, 0, pBlockingQueue);  
  40.     }  
  41. };  
  42.   
  43. #endif  //#ifndef __FACTORY_THREAD_POOL_H__  
2.AbstractThreadPool.h

[cpp] view plain copy
  1. #ifndef __ABSTRACT_THREAD_POOL_H__  
  2. #define __ABSTRACT_THREAD_POOL_H__  
  3.   
  4. #include "TaskFuncExecute.h"  
  5.   
  6. //抽象基类  
  7. class CAbstractThreadPool  
  8. {  
  9. public:  
  10.     CAbstractThreadPool() { }  
  11.   
  12.     virtual ~CAbstractThreadPool() { }  
  13.   
  14.     virtual void execute(const Task &task) = 0;  
  15.     virtual void shutdown() = 0;  
  16. };  
  17.   
  18.   
  19. #endif  //#ifndef __ABSTRACT_THREAD_POOL_H__  
3.TaskFuncExecute.h

[cpp] view plain copy
  1. #ifndef __TASK_FUNC_EXECUTE_H__  
  2. #define __TASK_FUNC_EXECUTE_H__  
  3.   
  4. #include <boost/function.hpp>  
  5.   
  6. typedef boost::function<void(void)>   Task;  
  7.   
  8. //类CTaskFuncExecute,用来执行boost::function函数对象,里面实现了execute  
  9. class CTaskFuncExecute  
  10. {  
  11. public:  
  12.     CTaskFuncExecute(const Task &task = nullTask)  
  13.         :m_TaskFunction(task)  
  14.     {  
  15.   
  16.     }  
  17.   
  18.     ~CTaskFuncExecute()  
  19.     {  
  20.           
  21.     }  
  22.   
  23.     CTaskFuncExecute(const CTaskFuncExecute &Excute)  
  24.     {  
  25.         m_TaskFunction = Excute.m_TaskFunction;  
  26.     }  
  27.   
  28.     CTaskFuncExecute &operator=(const CTaskFuncExecute &Excute)  
  29.     {  
  30.         m_TaskFunction = Excute.m_TaskFunction;  
  31.         return *this;  
  32.     }  
  33.   
  34.     void execute()  
  35.     {  
  36.         m_TaskFunction();  
  37.     }  
  38.   
  39.     //定义一个空任务,实际上不做什么  
  40.     static void nullTask(void)  
  41.     {  
  42.         //nothing  
  43.     }  
  44.       
  45. private:  
  46.     Task m_TaskFunction;  
  47. };  
  48.   
  49. #endif  //#ifndef __TASK_FUNC_EXECUTE_H__  
4.Thread.h

[cpp] view plain copy
  1. #ifndef __THREAD_H__  
  2. #define __THREAD_H__  
  3.   
  4. #include <pthread.h>  
  5. #include <string>  
  6. #include <boost/function.hpp>  
  7. #include "TaskFuncExecute.h"  
  8. #include "Condition.h"  
  9.   
  10. //CThread实现了对Linux中的一些线程操作的封装  
  11. class CThread  
  12. {  
  13.     friend void *startThread(void *pObj);  
  14.   
  15. public:  
  16.     typedef boost::function<void(void)> ThreadFunc;  
  17.     enum ThreadStatus {RUNNING, EXIT, JOIN};  
  18.     CThread(const ThreadFunc &Func, const std::string &strName);  
  19.     virtual ~CThread();  
  20.   
  21.     void start(const Task &InitTask);  
  22.     int join();  
  23.     void exit(void *rval_ptr);  
  24.   
  25.     void setname(std::string &strName);  
  26.     const std::string &name(void);  
  27.   
  28.     pthread_t getSelfId();  
  29.   
  30.     ThreadStatus getThreadStatus();  
  31.       
  32. private:  
  33.     ThreadStatus m_Status;          //线程的状态  
  34.     pthread_t m_tId;                //线程标识  
  35.     ThreadFunc m_Func;  
  36.     CMutexLock m_Mutex;  
  37.     std::string m_strThreadName;    //线程名  
  38. };  
  39.   
  40. #endif  //#ifndef __THREAD_H__  
5.thread.cpp

[cpp] view plain copy
  1. #include "Thread.h"  
  2. #include "Logger.h"  
  3. #include "Exception.h"  
  4.   
  5. using namespace Log;  
  6. using namespace Exception;  
  7.   
  8. //一个辅助类,实现运行boost::function函数对象  
  9. struct threadData  
  10. {  
  11.     typedef CThread::ThreadFunc ThreadFunc;  
  12.     ThreadFunc m_Func;  
  13.     CThread *m_pThis;  
  14.     Task m_Task;  
  15.   
  16.     threadData(const ThreadFunc &Func, CThread *pThis, const Task &InitTask)  
  17.         :m_Func(Func)  
  18.         ,m_pThis(pThis)  
  19.         ,m_Task(InitTask)  
  20.     {  
  21.   
  22.     }  
  23.   
  24.     void runThreadFunc()  
  25.     {  
  26.         try  
  27.         {  
  28.             m_Func();  
  29.         }  
  30.         catch (std::exception &ex)  
  31.         {  
  32.             LOG_FATAL << "runThreadFunc exception : " << ex.what();  
  33.         }  
  34.         catch (...)  
  35.         {  
  36.             LOG_FATAL << "runThreadFunc unknow exception";  
  37.         }  
  38.     }  
  39. };  
  40.   
  41. void *startThread(void *pObj)  
  42. {  
  43.     try  
  44.     {  
  45.         if (pObj == NULL)  
  46.             throw CException("startThread parament obj is null");  
  47.   
  48.         threadData *pData = static_cast<threadData *>(pObj);  
  49.         pData->m_Task();  
  50.         pData->runThreadFunc();  
  51.         pData->m_pThis->m_Mutex.lock();  
  52.         pData->m_pThis->m_Status = CThread::EXIT;  
  53.         delete pData;  
  54.         pData->m_pThis->m_Mutex.unlock();  
  55.     }  
  56.     catch (const CException &ex)  
  57.     {  
  58.         LOG_FATAL << "throw exception : " << ex.what();  
  59.     }  
  60.     return NULL;  
  61. }  
  62.   
  63.   
  64. CThread::CThread(const ThreadFunc &Func, const std::string &strName)  
  65.     :m_Status(RUNNING)  
  66.     ,m_tId(0)  
  67.     ,m_Func(Func)  
  68.     ,m_Mutex()  
  69.     ,m_strThreadName(strName)  
  70. {  
  71.   
  72. }  
  73.   
  74. CThread::~CThread()  
  75. {  
  76.     if (m_Status != JOIN)  
  77.     {  
  78.         ::pthread_detach(m_tId);  
  79.     }  
  80. }  
  81.   
  82. void CThread::start(const Task &InitTask)  
  83. {  
  84.     threadData *pData = new threadData(m_Func, this, InitTask);  
  85.     int iRet = ::pthread_create(&m_tId, NULL, startThread, pData);  
  86.     if (iRet != 0)  
  87.     {  
  88.         //创建线程失败?认为这是个致命错误,会终止程序的运行  
  89.         LOG_FATAL << "pthread_create false return err " << iRet;  
  90.     }  
  91.     LOG_INFO << "create thread : " << m_strThreadName << ",tid = " << m_tId;  
  92. }  
  93.   
  94. int CThread::join()  
  95. {  
  96.     m_Mutex.lock();  
  97.     if (m_Status == JOIN)  
  98.     {  
  99.         m_Mutex.unlock();  
  100.         //重复的调用join,这里需要向日志系统输出错误信息。  
  101.         LOG_ERROR << "repeat call pthread_join";  
  102.         return -1;  
  103.     }  
  104.     m_Mutex.unlock();  
  105.     LOG_INFO << "join thread, tid = " << m_tId;  
  106.     int iRet = ::pthread_join(m_tId, NULL);  
  107.     m_Mutex.lock();  
  108.     m_Status = JOIN;  
  109.     m_Mutex.unlock();  
  110.   
  111.     return iRet;  
  112. }  
  113.   
  114. void CThread::exit(void *rval_ptr)  
  115. {  
  116.     ::pthread_exit(rval_ptr);  
  117. }  
  118.   
  119.   
  120. const std::string &CThread::name(void)  
  121. {  
  122.     CMutexLockPart lock(m_Mutex);  
  123.     return m_strThreadName;  
  124. }  
  125.   
  126. void CThread::setname(std::string &strName)  
  127. {  
  128.     CMutexLockPart lock(m_Mutex);  
  129.     m_strThreadName = strName;  
  130. }  
  131.   
  132. pthread_t CThread::getSelfId()  
  133. {  
  134.     return ::pthread_self();  
  135. }  
  136.   
  137. CThread::ThreadStatus CThread::getThreadStatus()  
  138. {  
  139.     CMutexLockPart lock(m_Mutex);  
  140.     return m_Status;  
  141. }  
6.ThreadPool.h

[cpp] view plain copy
  1. #ifndef __THREAD_POOL_H__  
  2. #define __THREAD_POOL_H__  
  3.   
  4. #include <boost/ptr_container/ptr_vector.hpp>  
  5. #include "AbstractThreadPool.h"  
  6. #include "BlockingQueue.h"  
  7. #include "Thread.h"  
  8. #include "TaskFuncExecute.h"  
  9.   
  10. //BlockingQueue由使用者申请,在析构程池的时候释放  
  11. class CThreadPool   : public CAbstractThreadPool  
  12. {  
  13. public:  
  14.   
  15.     CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue);  
  16.     ~CThreadPool();  
  17.   
  18.     void execute(const Task &task);  
  19.     void shutdown();  
  20.   
  21. private:  
  22.     void allWorker(const Task &task);  
  23.     bool addBlockingQueue(const Task &task);  
  24.     void runInWorkerThread();  
  25.     Task getTask();  
  26.   
  27.     enum ThreadPoolStatus {RUNNING, SHUTDOWN};  
  28.     ThreadPoolStatus m_eStatue;  
  29.     int m_iCorePoolSize;        //核心池的大小  
  30.     int m_iMaximumPoolSize;     //最大的大小  
  31.     int m_iKeepAliveTime;       //空闲线程等待新任务的最长时间.0表示不使用  
  32.     int m_iCurrentThreadSum;    //当前的工作线程  
  33.     int m_iIdleThreadSum;       //空闲的工作线程  
  34.     CMutexLock m_MutexLock;  
  35.     CCondition m_Condition;  
  36.     CBlockingQueue<Task> *m_pQueue;  
  37.     boost::ptr_vector<CThread> m_vecWorker;   //所有的工作线程的集合  
  38. };  
  39.   
  40.   
  41. #endif  //#ifndef __THREAD_POOL_H__  
7.ThreadPool.cpp

[cpp] view plain copy
  1. #include <boost/bind.hpp>  
  2. #include "ThreadPool.h"  
  3. #include "Exception.h"  
  4. #include "Logger.h"  
  5.   
  6. using namespace Exception;  
  7. using namespace Log;  
  8.   
  9. CThreadPool::CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue)  
  10.     :m_eStatue(RUNNING)  
  11.     ,m_iCorePoolSize(corePoolSize)  
  12.     ,m_iMaximumPoolSize(maximumPoolSize)  
  13.     ,m_iKeepAliveTime(keepAliveTime)  
  14.     ,m_iCurrentThreadSum(0)  
  15.     ,m_iIdleThreadSum(0)  
  16.     ,m_MutexLock()  
  17.     ,m_Condition(m_MutexLock)  
  18.     ,m_pQueue(pQueue)  
  19.     ,m_vecWorker()  
  20. {  
  21.     m_vecWorker.clear();  
  22. }  
  23.   
  24. CThreadPool::~CThreadPool()  
  25. {  
  26.     m_vecWorker.erase(m_vecWorker.begin(), m_vecWorker.end());  
  27.     //队列由工厂类创建,这里释放掉  
  28.     if (m_pQueue != NULL)  
  29.         delete m_pQueue;    //这里需要释放队列  
  30. }  
  31.   
  32. void CThreadPool::execute(const Task &task)  
  33. {  
  34.     //为了简化实现,这里没有提供可选的策略。后续有时间可完善  
  35.     try  
  36.     {  
  37.         if (m_eStatue == RUNNING)  
  38.         {  
  39.             m_MutexLock.lock();  
  40.             if (m_iCorePoolSize == 0 || m_iCurrentThreadSum < m_iCorePoolSize)  
  41.             {  
  42.                 if (m_iIdleThreadSum != 0)  
  43.                 {  
  44.                     m_MutexLock.unlock();  
  45.                     //有空闲的工作线程,直接放到队列去  
  46.                     (void)addBlockingQueue(task);     
  47.                 }  
  48.                 else  
  49.                 {  
  50.                     m_MutexLock.unlock();  
  51.                     //继续分配工作线程  
  52.                     allWorker(task);  
  53.                 }  
  54.             }  
  55.             else  
  56.             {  
  57.                 if (m_pQueue && m_pQueue->full())  
  58.                 {  
  59.                     m_MutexLock.unlock();  
  60.                     allWorker(task);  
  61.                 }  
  62.                 else  
  63.                 {  
  64.                     m_MutexLock.unlock();  
  65.                     (void)addBlockingQueue(task);  
  66.                 }  
  67.             }  
  68.         }  
  69.         else  
  70.         {  
  71.             //线程池处于SHUTDOWN状态,此时如果在提交任务,  
  72.             //我们会抛出异常,这就是我们使用的默认策略  
  73.             m_MutexLock.unlock();  
  74.             throw CException("ThreadPool status SHUTDOWN!");  
  75.         }  
  76.     }  
  77.     catch (const CException &ex)    
  78.     {  
  79.         LOG_ERROR << "Throw exception : " << ex.what();  
  80.     }  
  81. }  
  82.   
  83. void CThreadPool::shutdown()  
  84. {  
  85.     m_MutexLock.lock();  
  86.     m_eStatue = SHUTDOWN;  
  87.     m_Condition.broadcast();  
  88.     m_MutexLock.unlock();  
  89.   
  90.     for (boost::ptr_vector<CThread>::iterator iter = m_vecWorker.begin();   
  91.             iter != m_vecWorker.end(); ++iter)  
  92.     {  
  93.         iter->join();  
  94.     }  
  95. }  
  96.   
  97. void CThreadPool::runInWorkerThread()  
  98. {  
  99.     try  
  100.     {  
  101.         for(;;)  
  102.         {  
  103.             m_MutexLock .lock();  
  104.             if (m_eStatue == SHUTDOWN && m_pQueue && m_pQueue->empty())  
  105.             {  
  106.                 m_MutexLock.unlock();  
  107.                 return;  
  108.             }  
  109.             m_MutexLock.unlock();  
  110.             Task task = getTask();  
  111.             if (task == NULL)  
  112.                 return;  
  113.   
  114.             task();             //执行任务  
  115.         }  
  116.     }  
  117.     catch (const CException &ex)  
  118.     {  
  119.         LOG_ERROR << "runInWorkerThread throw exeception " << ex.what();  
  120.     }  
  121.     catch (...)  
  122.     {  
  123.         LOG_ERROR << "runInWorkerThread unknow error";  
  124.     }  
  125. }  
  126.   
  127. Task CThreadPool::getTask()  
  128. {  
  129.     CMutexLockPart lock(m_MutexLock);  
  130.     while (m_pQueue && m_pQueue->empty() && m_eStatue == RUNNING)  
  131.     {  
  132.         ++m_iIdleThreadSum;  
  133.         if (m_iKeepAliveTime == 0 || (m_iCorePoolSize && m_iCurrentThreadSum < m_iCorePoolSize))  
  134.         {  
  135.             m_Condition.wait();  
  136.         }  
  137.         else  
  138.         {  
  139.             bool bRet = m_Condition.waitForSeconds(m_iKeepAliveTime);  
  140.             if (bRet == true)  
  141.             {  
  142.                 --m_iIdleThreadSum;  
  143.                 return NULL;  
  144.             }  
  145.         }  
  146.         --m_iIdleThreadSum;  
  147.     }  
  148.   
  149.     if (m_pQueue->empty() && m_eStatue == SHUTDOWN)  
  150.         return NULL;  
  151.   
  152.     Task task = m_pQueue->poll();  
  153.   
  154.     if (m_pQueue && (m_pQueue->size() > 0))  
  155.         m_Condition.signal();  
  156.   
  157.     return task;  
  158. }  
  159.   
  160. void CThreadPool::allWorker(const Task &task)  
  161. {  
  162.     try  
  163.     {  
  164.         CMutexLockPart lock(m_MutexLock);  
  165.   
  166.         if (m_iCurrentThreadSum >= m_iMaximumPoolSize)  
  167.             throw CException("Current Threads out of Max Threads");  
  168.           
  169.         CThread *pWorkerThread = new CThread(boost::bind(&CThreadPool::runInWorkerThread, this), "Worker Thread");  
  170.         pWorkerThread->start(task);  
  171.         m_vecWorker.push_back(pWorkerThread);  
  172.         ++m_iCurrentThreadSum;  
  173.     }  
  174.     catch (const CException &ex)  
  175.     {  
  176.         LOG_ERROR << "ThreadPool allWorker throw exception " << ex.what();  
  177.     }  
  178. }  
  179.   
  180. bool CThreadPool::addBlockingQueue(const Task &task)  
  181. {  
  182.     try  
  183.     {  
  184.         LOG_INFO << "addBlockingQueue";  
  185.         CMutexLockPart lock(m_MutexLock);  
  186.         if (m_pQueue && m_pQueue->full())  
  187.             return false;  
  188.           
  189.         if (m_pQueue && !m_pQueue->offer(task))  
  190.                 throw CException("ThreadPool add BlockingQueue false");  
  191.   
  192.         m_Condition.signal();  
  193.         return true;  
  194.     }  
  195.     catch (const CException &ex)  
  196.     {  
  197.         //这种情况应该是不会出现的,如果出现了,进行日志记录。继续运行  
  198.         LOG_ERROR << "ThreadPool addBlockingQueue throw exception " << ex.what();  
  199.         return true;  
  200.     }  
  201. }  
8.mainFixedThreadPool.cpp
[cpp] view plain copy
  1. #include "FactoryThreadPool.h"  
  2. #include "Logger.h"   
  3. #include <stdio.h>  
  4. #include <unistd.h>  
  5. #include <boost/bind.hpp>  
  6.   
  7. using namespace Log;  
  8.   
  9. //一个计算两个数之和的任务  
  10. void Add(int iValue1, int iValue2)  
  11. {  
  12.     LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;  
  13.     //sleep(1);  
  14. }  
  15.   
  16. //这是固定大小线程池的测试代码,这里我们创建了一个固定大小为10的  
  17. //线程池,该线程池用来执行计算两数之和。  
  18. int main(void)  
  19. {  
  20.     //日志的报警等级为INFO    
  21.     CLogger::setLogLevel(CLogger::DEBUG);    
  22.     //CLogger::setLogLevel(CLogger::INFO);    
  23.     //我们设置日志输出到文件     
  24.     CLogger::setOutputMode(LOGGER_MODE_LOGFILE);   
  25.   
  26.     LOG_DEBUG << "---------------------start-------------------";   
  27.   
  28.     LOG_DEBUG << "FixedThreadPool test start!";  
  29.   
  30.     CFactoryThreadPool *pFactory = new CFactoryThreadPool();  
  31.     CAbstractThreadPool *pThreadPool = pFactory->newFixedThreadPool(10);  
  32.   
  33.     //向线程池提交200个加法运算的任务  
  34.     for (int i = 0; i < 200; i++)  
  35.     {  
  36.         pThreadPool->execute(boost::bind(Add, i, i));  
  37.     }  
  38.       
  39.     pThreadPool->shutdown();  
  40.   
  41.     delete pThreadPool;  
  42.     delete pFactory;  
  43.   
  44.     LOG_DEBUG << "----------------------end--------------------";   
  45.   
  46.     return 0;  
  47. }  
9.mainCachedThreadPool.cpp

[cpp] view plain copy
  1. #include "FactoryThreadPool.h"  
  2. #include "Logger.h"   
  3. #include <stdio.h>  
  4. #include <unistd.h>  
  5. #include <boost/bind.hpp>  
  6.   
  7. using namespace Log;  
  8.   
  9. //一个计算两个数之和的任务  
  10. void Add(int iValue1, int iValue2)  
  11. {  
  12.     LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;  
  13.     sleep(1);  
  14. }  
  15.   
  16. //这是可缓存线程池的测试代码,这里我们创建了一个固定大小为10的  
  17. //线程池,该线程池用来执行计算两数之和。  
  18. int main(void)  
  19. {  
  20.     //日志的报警等级为INFO    
  21.     CLogger::setLogLevel(CLogger::DEBUG);    
  22.     //CLogger::setLogLevel(CLogger::INFO);    
  23.     //我们设置日志输出到文件     
  24.     CLogger::setOutputMode(LOGGER_MODE_LOGFILE);   
  25.   
  26.     LOG_DEBUG << "---------------------start-------------------";   
  27.   
  28.     LOG_DEBUG << "newCachedThreadPool test start!";  
  29.   
  30.     CFactoryThreadPool *pFactory = new CFactoryThreadPool();  
  31.     CAbstractThreadPool *pThreadPool = pFactory->newCachedThreadPool();  
  32.   
  33.     //向线程池提交200个加法运算的任务  
  34.     for (int i = 0; i < 200; i++)  
  35.     {  
  36.         pThreadPool->execute(boost::bind(Add, i, i));  
  37.     }  
  38.       
  39.     pThreadPool->shutdown();  
  40.   
  41.     delete pThreadPool;  
  42.     delete pFactory;  
  43.   
  44.     LOG_DEBUG << "----------------------end--------------------";   
  45.   
  46.     return 0;  
  47. }  
10.mainSingleThreadPool.cpp

[cpp] view plain copy
  1. #include "FactoryThreadPool.h"  
  2. #include "Logger.h"   
  3. #include <stdio.h>  
  4. #include <unistd.h>  
  5. #include <boost/bind.hpp>  
  6.   
  7. using namespace Log;  
  8.   
  9. //一个计算两个数之和的任务  
  10. void Add(int iValue1, int iValue2)  
  11. {  
  12.     LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;  
  13.     sleep(1);  
  14. }  
  15.   
  16. //这是固定大小线程池的测试代码,这里我们创建了一个固定大小为10的  
  17. //线程池,该线程池用来执行计算两数之和。  
  18. int main(void)  
  19. {  
  20.     //日志的报警等级为INFO    
  21.     CLogger::setLogLevel(CLogger::DEBUG);    
  22.     //CLogger::setLogLevel(CLogger::INFO);    
  23.     //我们设置日志输出到文件     
  24.     CLogger::setOutputMode(LOGGER_MODE_LOGFILE);   
  25.   
  26.     LOG_DEBUG << "---------------------start-------------------";   
  27.   
  28.     LOG_DEBUG << "SingleThreadPool test start!";  
  29.   
  30.     CFactoryThreadPool *pFactory = new CFactoryThreadPool();  
  31.     CAbstractThreadPool *pThreadPool = pFactory->newSingleThreadExecutor();  
  32.   
  33.     //向线程池提交200个加法运算的任务  
  34.     for (int i = 0; i < 200; i++)  
  35.     {  
  36.         pThreadPool->execute(boost::bind(Add, i, i));  
  37.     }  
  38.       
  39.     pThreadPool->shutdown();  
  40.   
  41.     delete pThreadPool;  
  42.     delete pFactory;  
  43.   
  44.     LOG_DEBUG << "----------------------end--------------------";   
  45.   
  46.     return 0;  
  47. }  
11.mainThreadPool.cpp

[cpp] view plain copy
  1. #include "ThreadPool.h"  
  2. #include "Logger.h"   
  3. #include "LinkedBlockingQueue.h"  
  4. #include <stdio.h>  
  5. #include <unistd.h>  
  6. #include <boost/bind.hpp>  
  7.   
  8. using namespace Log;  
  9.   
  10. //一个计算两个数之和的任务  
  11. void Add(int iValue1, int iValue2)  
  12. {  
  13.     LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;  
  14.     sleep(2);  
  15. }  
  16.   
  17. //这是线程池的测试代码,这里我们创建了一个大小为2的  
  18. //线程池,该线程池最大允许的线程数为4,  
  19. //该线程池用来执行计算两数之和。  
  20. int main(void)  
  21. {  
  22.     //日志的报警等级为INFO    
  23.     CLogger::setLogLevel(CLogger::DEBUG);    
  24.     //CLogger::setLogLevel(CLogger::INFO);    
  25.     //我们设置日志输出到文件     
  26.     CLogger::setOutputMode(LOGGER_MODE_LOGFILE);   
  27.   
  28.     LOG_DEBUG << "---------------------start-------------------";   
  29.   
  30.     LOG_DEBUG << "CommonThreadPool test start!";  
  31.   
  32.     CThreadPool *pThreadPool = new CThreadPool(2, 4, 30, new CLinkedBlockingQueue<Task>(10));  
  33.   
  34.     //向线程池提交200个加法运算的任务  
  35.     for (int i = 0; i < 15; i++)  
  36.     {  
  37.         pThreadPool->execute(boost::bind(Add, i, i));  
  38.     }  
  39.       
  40.     pThreadPool->shutdown();  
  41.   
  42.     delete pThreadPool;  
  43.   
  44.     LOG_DEBUG << "----------------------end--------------------";   
  45.   
  46.     return 0;  
  47. }  

我们共给出了4个测试代码,包括三个工厂类创建 的线程池和直接只有CThreadPool创建的线程池的测试。这里没有贴出具体的测试结果。因为测试结果输出是比较多的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/385006.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线程池原理及创建并C++实现

本文给出了一个通用的线程池框架&#xff0c;该框架将与线程执行相关的任务进行了高层次的抽象&#xff0c;使之与具体的执行任务无关。另外该线程池具有动态伸缩性&#xff0c;它能根据执行任务的轻重自动调整线程池中线程的数量。文章的最后&#xff0c;我们给出一个简单示例…

Linux 打印简单日志(一)

简单日志输出&#xff1a; #include<stdio.h> #include<string.h> #include<stdlib.h>void write(char* filename,char* szStr){FILE* fp;fp fopen(filename,"at");if(fp ! NULL){fwrite(szStr,256,1,fp); //fclose(fp);fp NULL;} }int main(int…

c++简单线程池实现

线程池&#xff0c;简单来说就是有一堆已经创建好的线程&#xff08;最大数目一定&#xff09;&#xff0c;初始时他们都处于空闲状态&#xff0c;当有新的任务进来&#xff0c;从线程池中取出一个空闲的线程处理任务&#xff0c;然后当任务处理完成之后&#xff0c;该线程被重…

Linux 打印可变参数日志

实现了传输进去的字符串所在的文档&#xff0c;函数和行数显示功能。 实现了将传入的可变参数打印到日志功能。 #include<stdio.h> #include<stdarg.h> #include<string.h>const char * g_path "/home/exbot/wangqinghe/log.txt"; #define LOG(fm…

C++强化之路之线程池开发整体框架(二)

一.线程池开发框架 我所开发的线程池由以下几部分组成&#xff1a; 1.工作中的线程。也就是线程池中的线程&#xff0c;主要是执行分发来的task。 2.管理线程池的监督线程。这个线程的创建独立于线程池的创建&#xff0c;按照既定的管理方法进行管理线程池中的所有线程&#xf…

vfprintf()函数

函数声明&#xff1a;int vfprintf(FILE *stream, const char *format, va_list arg) 函数参数&#xff1a; stream—这是指向了FILE对象的指针&#xff0c;该FILE对象标识了流。 format—c语言字符串&#xff0c;包含了要被写入到流stream中的文本。它可以包含嵌入的format标签…

Makefile(二)

将生产的.o文件放进指定的文件中&#xff08;先创建该文件夹&#xff09; src $(wildcard ./*.cpp) obj $(patsubst %.cpp,./output/%.o,$(src))target test$(target) : $(obj)g $(obj) -o $(target) %.o: %.cppg -c $< -o output/$.PHONY:clean clean:rm -f $(target) $…

TCP粘包问题分析和解决(全)

TCP通信粘包问题分析和解决&#xff08;全&#xff09;在socket网络程序中&#xff0c;TCP和UDP分别是面向连接和非面向连接的。因此TCP的socket编程&#xff0c;收发两端&#xff08;客户端和服务器端&#xff09;都要有成对的socket&#xff0c;因此&#xff0c;发送端为了将…

UML类图符号 各种关系说明以及举例

UML中描述对象和类之间相互关系的方式包括&#xff1a;依赖&#xff0c;关联&#xff0c;聚合&#xff0c;组合&#xff0c;泛化&#xff0c;实现等。表示关系的强弱&#xff1a;组合>聚合>关联>依赖 相互间关系 聚合是表明对象之间的整体与部分关系的关联&#xff0c…

寻找数组中第二大数

设置两个数值来表示最大数和第二大数&#xff0c;在循环比较赋值即可 //找给定数组中第二大的数int get_smax(int *arr,int length) {int max;int smax;if(arr[0] > arr[1]){max arr[0];smax arr[1];}else{max arr[1];smax arr[0];}for(int i 2; i < length; i){if(…

timerfd API使用总结

timerfd 介绍 timerfd 是在Linux内核2.6.25版本中添加的接口&#xff0c;其是Linux为用户提供的一个定时器接口。这个接口基于文件描述符&#xff0c;所以可以被用于select/poll/epoll的场景。当使用timerfd API创建多个定时器任务并置于poll中进行事件监听&#xff0c;当没有可…

#if/#else/#endif

在linux环境下写c代码时会尝试各种方法或调整路径&#xff0c;需要用到#if #include<stdio.h>int main(){int i; #if 0i 1; #elsei 2; #endifprintf("i %d",i);return 0; } 有时候会调整代码&#xff0c;但是又不是最终版本的更换某些值&#xff0c;就需要注…

内存分配调用

通过函数给实参分配内存&#xff0c;可以通过二级指针实现 #include<stdio.h> #incldue<stdlib.h>void getheap(int *p) //错误的模型 {p malloc(100); }void getheap(int **p) //正确的模型 {*p malloc(100); } int main() {int *p NULL;getheap(&p);free(p…

ESP传输模式拆解包流程

一、 ESP简介ESP&#xff0c;封装安全载荷协议(Encapsulating SecurityPayloads)&#xff0c;是一种Ipsec协议&#xff0c;用于对IP协议在传输过程中进行数据完整性度量、来源认证、加密以及防回放攻击。可以单独使用&#xff0c;也可以和AH一起使用。在ESP头部之前的IPV4…

结构体成员内存对齐

#include<stdio.h> struct A {int A; };int main() {struct A a;printf("%d\n",sizeof(a));return 0; } 运行结果&#xff1a;4 #include<stdio.h> struct A {int a;int b&#xff1b; };int main() {struct A a;printf("%d\n",sizeof(a))…

C库函数-fgets()

函数声明&#xff1a;char *fgets(char *str,int n,FILE *stream) 函数介绍&#xff1a;从指定的stream流中读取一行&#xff0c;并把它存储在str所指向的字符串中。当读取到&#xff08;n-1&#xff09;个字符时&#xff0c;获取读取到换行符时&#xff0c;或者到达文件末尾时…

linux内核netfilter模块分析之:HOOKs点的注册及调用

1: 为什么要写这个东西?最近在找工作,之前netfilter 这一块的代码也认真地研究过&#xff0c;应该每个人都是这样的你懂 不一定你能很准确的表达出来。 故一定要化些时间把这相关的东西总结一下。 0&#xff1a;相关文档linux 下 nf_conntrack_tuple 跟踪记录 其中可以根据内…

指定结构体元素的位字段

struct B {char a:4; //a这个成员值占了4bitchar b:2;char c:2; } 占了1个字节 struct B {int a:4; //a这个成员值占了4bitchar b:2;char c:2; } 占了8个字节 控制LED灯的结构体&#xff1a; struct E {char a1:1;char a2:1;char a3:1;char a4:1;char a5:1;char a6:1;char a7:1…

网络抓包工具 wireshark 入门教程

Wireshark&#xff08;前称Ethereal&#xff09;是一个网络数据包分析软件。网络数据包分析软件的功能是截取网络数据包&#xff0c;并尽可能显示出最为详细的网络数据包数据。Wireshark使用WinPCAP作为接口&#xff0c;直接与网卡进行数据报文交换。网络管理员使用Wireshark来…

结构体中指针

结构体中带有指针的情况 #include<stdio.h>struct man {char *name;int age; };int main() {struct man m {"tom",20};printf("name %s, age %d\n",m.name,m.age);return 0; } 运行结果&#xff1a; exbotubuntu:~/wangqinghe/C/20190714$ gcc st…