目录
boost的线程基本用法
boost:condition
thread_group 线程组
thread_pool
boost的线程基本用法
boost::thread Thread_GenerateUuid;boost::thread Thread_ShowUuid;boost::mutex mutex;std::queue<std::string>UuidQueue;void procGenerateUuid();void showUuid();void GetPveDataDemo::procGenerateUuid()
{try{for (;;){boost::this_thread::sleep_for(boost::chrono::seconds(2));boost::this_thread::interruption_point(); // 手动在线程中加入中断点,中断点不影响其他语句执行 当其他地方触发该断点(thread.interrupt())再次执行到此处,抛出boost::thread_interrupted异常,若未捕捉则线程终止。 if (UuidQueue.size() <= 5){boost::lock_guard<boost::mutex> lck_guard(mutex);std::string uuid = QUuid::createUuid().toString().toStdString();UuidQueue.push(uuid);}}}catch (...){}
}void GetPveDataDemo::showUuid()
{try{int cnt = 1;for (;;){boost::this_thread::sleep_for(boost::chrono::seconds(2));boost::this_thread::interruption_point();// 手动在线程中加入中断点,中断点不影响其他语句执行 当其他地方触发该断点可从该循环中退出if (!UuidQueue.empty()){boost::lock_guard<boost::mutex> lck_guard(mutex);std::string cuuid = UuidQueue.front();UuidQueue.pop();QString UuidDisplay = "#:" + QString::number(cnt) + ": " + QString(cuuid.c_str()) + "\n";ui.textBrowser_UUID->append(UuidDisplay);//界面和线程写在一起了不太好 ++cnt;}}}catch (...){}
}void GetPveDataDemo::on_pushButton_2_clicked()
{//Uuid线程if (!Thread_GenerateUuid.joinable() && !Thread_ShowUuid.joinable()) //joinable判断线程是否结束{Thread_GenerateUuid = boost::thread(boost::bind(&GetPveDataDemo::procGenerateUuid, this));Thread_ShowUuid = boost::move(boost::thread(boost::bind(&GetPveDataDemo::showUuid, this)));ui.pushButton_2->setText(QString::fromLocal8Bit("停止生成UUID"));}else{Thread_GenerateUuid.interrupt(); //向线程发送中断请求Thread_ShowUuid.interrupt();Thread_GenerateUuid.join();//join连接 detach是脱离线程 不会阻塞Thread_ShowUuid.join(); //join函数,作用是等待直到线程执行结束; 阻塞当前线程ui.pushButton_2->setText(QString::fromLocal8Bit("开始生成UUID"));ui.textBrowser_UUID->clear();ui.textBrowser_UUID->append(QString::fromLocal8Bit("当前生产线程停止"));ui.textBrowser_UUID->append(QString::fromLocal8Bit("当前消费线程停止"));}//move 移动语义 将临时对象的所有权给予GenerateUuid,避免临时变量不必要的拷贝和析构
}
boost:condition
与std的条件变量一样会有虚假唤醒问题,同样需要谓词处理。
#define LANENUM 32boost::mutex mtx_etcSignal[LANENUM];
boost::condition m_cond_etcSignal[LANENUM];map<string,int> plateNo2flag;bool waitForEtcSignal(int laneNo,const string&plate,int connTime)
{boost::mutex::scoped_lock lk(mutex mtx_etcSignal[laneNo]);m_cond_etcSignal[laneNo].wait_for(lk,bost::chrono::milliseconds(connTime),[&](){if(plateNo2flag.count(plate)&&plateNo2flag[plate] == 1){ plateNo2flag.erase(plate);return true;}else{return false;}});//添加谓词避免虚假唤醒return false;
}//扣费线程发起扣费请求...bool status = waitForEtcSignal(laneNo,plate,connTime);
if(status)
{//...
}//server线程收到扣费结果
//...
if(paySuccess)
{boost::mutex::scoped_lock lk(mutex mtx_etcSignal[laneNo]);plateNo2flag[plate] = 1;m_cond_etcSignal[laneNo].notify_all();
}
thread_group 线程组
boost::thread_group 实际利用list<thread>管理一组线程,方便批量管理;如批量join;不是线程池,没有线程池的自动伸缩等功能。
bool TestServerClient::startServer()
{if (!m_bStarted){ if (!group){group = new boost::thread_group();}try{group->create_thread(boost::bind(&TestServerClient::func_1, this));group->add_thread(new boost::thread(boost::bind(&TestServerClient::func_2, this, "hello client")));//创建线程会自动加入list中(无法接受参数,需要bind) add添加线程m_bStarted = true;}catch (std::runtime_error e){std::string errorstr(e.what());}}return m_bStarted;
}void TestServerClient::stopServer()
{if (m_bStarted){group->interrupt_all();group->join_all();group = nullptr;}
}
thread_pool
boost的线程池概述:
https://threadpool.sourceforge.net/index.html
threadpool是基于[boost]库实现的一个线程池子库 .
/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/#ifndef THREADPOOL_POOL_HPP_INCLUDED
#define THREADPOOL_POOL_HPP_INCLUDED#include <boost/ref.hpp>#include "./detail/pool_core.hpp"#include "task_adaptors.hpp"#include "./detail/locking_ptr.hpp"#include "scheduling_policies.hpp"
#include "size_policies.hpp"
#include "shutdown_policies.hpp"/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{/*! \brief Thread pool. ** Thread pools are a mechanism for asynchronous and parallel processing * within the same process. The pool class provides a convenient way * for dispatching asynchronous tasks as functions objects. The scheduling* of these tasks can be easily controlled by using customized schedulers. * A task must not throw an exception.** A pool is DefaultConstructible, CopyConstructible and Assignable.* It has reference semantics; all copies of the same pool are equivalent and interchangeable. * All operations on a pool except assignment are strongly thread safe or sequentially consistent; * that is, the behavior of concurrent calls is as if the calls have been issued sequentially in an unspecified order.** \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.* \param SchedulingPolicy A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.** \remarks The pool class is thread-safe.* * \see Tasks: task_func, prio_task_func* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler*/ template <typename Task = task_func,template <typename> class SchedulingPolicy = fifo_scheduler,template <typename> class SizePolicy = static_size,template <typename> class SizePolicyController = resize_controller,template <typename> class ShutdownPolicy = wait_for_all_tasks> class thread_pool {typedef detail::pool_core<Task, SchedulingPolicy,SizePolicy,SizePolicyController,ShutdownPolicy> pool_core_type;shared_ptr<pool_core_type> m_core; // pimpl idiomshared_ptr<void> m_shutdown_controller; // If the last pool holding a pointer to the core is deleted the controller shuts the pool down.public: // Type definitionstypedef Task task_type; //!< Indicates the task's type.typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type./* typedef thread_pool<Task, SchedulingPolicy,SizePolicy,ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.*/typedef SizePolicy<pool_core_type> size_policy_type; typedef SizePolicyController<pool_core_type> size_controller_type;public:/*! Constructor.* \param initial_threads The pool is immediately resized to set the specified number of threads. The pool's actual number threads depends on the SizePolicy.*/thread_pool(size_t initial_threads = 0): m_core(new pool_core_type), m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core)){size_policy_type::init(*m_core, initial_threads);}/*! Gets the size controller which manages the number of threads in the pool. * \return The size controller.* \see SizePolicy*/size_controller_type size_controller(){return m_core->size_controller();}/*! Gets the number of threads in the pool.* \return The number of threads.*/size_t size() const{return m_core->size();}/*! Schedules a task for asynchronous execution. The task will be executed once only.* \param task The task function object. It should not throw execeptions.* \return true, if the task could be scheduled and false otherwise. */ bool schedule(task_type const & task){ return m_core->schedule(task);}/*! Returns the number of tasks which are currently executed.* \return The number of active tasks. */ size_t active() const{return m_core->active();}/*! Returns the number of tasks which are ready for execution. * \return The number of pending tasks. */ size_t pending() const{return m_core->pending();}/*! Removes all pending tasks from the pool's scheduler.*/ void clear(){ m_core->clear();} /*! Indicates that there are no tasks pending. * \return true if there are no tasks ready for execution. * \remarks This function is more efficient that the check 'pending() == 0'.*/ bool empty() const{return m_core->empty();} /*! The current thread of execution is blocked until the sum of all active* and pending tasks is equal or less than a given threshold. * \param task_threshold The maximum number of tasks in pool and scheduler.*/ void wait(size_t task_threshold = 0) const{m_core->wait(task_threshold);} /*! The current thread of execution is blocked until the timestamp is met* or the sum of all active and pending tasks is equal or less * than a given threshold. * \param timestamp The time when function returns at the latest.* \param task_threshold The maximum number of tasks in pool and scheduler.* \return true if the task sum is equal or less than the threshold, false otherwise.*/ bool wait(xtime const & timestamp, size_t task_threshold = 0) const{return m_core->wait(timestamp, task_threshold);}};/*! \brief Fifo pool.** The pool's tasks are fifo scheduled task_func functors.**/ typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;/*! \brief Lifo pool.** The pool's tasks are lifo scheduled task_func functors.**/ typedef thread_pool<task_func, lifo_scheduler, static_size, resize_controller, wait_for_all_tasks> lifo_pool;/*! \brief Pool for prioritized task.** The pool's tasks are prioritized prio_task_func functors.**/ typedef thread_pool<prio_task_func, prio_scheduler, static_size, resize_controller, wait_for_all_tasks> prio_pool;/*! \brief A standard pool.** The pool's tasks are fifo scheduled task_func functors.**/ typedef fifo_pool pool;} } // namespace boost::threadpool#endif // THREADPOOL_POOL_HPP_INCLUDED
使用示例:
// fifo策略的poolboost::threadpool::pool _thread_pool;
_thread_pool.size_controller().resize(4);
_thread_pool.schedule(boost::bind(&LedDevice::SendParkingLots, this, nBelongedPark));
//同步等待结束
_thread_pool.wait();// priority优先级线程池
#include <boost/thread.hpp>
#include "threadpool/pool.hpp"class HaiNanEtcInterface final:public EtcInterface
{
public://...enum Priority_Level{LowPriority = 100,MidPriority = 200,HighPriority = 300};//枚举声明 不定义不占内存 实际存在常量区(static 也在常量区)均可类名:访问
private:atomic_bool m_bStarted;boost::threadpool::prio_pool m_EtcThreadPool;
};bool HaiNanEtcInterface::startServer()
{if (!m_bStarted){
m_EtcThreadPool.size_controller().resize(4);//初始化线程池大小}m_bStarted = true;return m_bStarted;
}//使用优先级策略的线程池
void HaiNanEtcInterface::queryFee(const PassVehicle& pve)
{if (m_EtcThreadPool.size() > 1){ m_EtcThreadPool.schedule(boost::threadpool::prio_task_func(HighPriority, boost::bind(&HaiNanEtcInterface::queryFeeFunc, this,pve)));}
}void HaiNanEtcInterface::checkTime()
{if (m_EtcThreadPool.size() > 1){ m_EtcThreadPool.schedule(boost::threadpool::prio_task_func(LowPriority, boost::bind(&HaiNanEtcInterface::checkTimeFunc, this)));}
}void HaiNanEtcInterface::stopServer()
{if (m_bStarted){//清除pending的task 在等待所有active和pending的task结束m_EtcThreadPool.clear();m_EtcThreadPool.wait();}
}