C++ 实现线程池,下面给出测试用例。
main.cpp
#include <chrono>
#include <cstdio>
#include <functional>
#include <iostream>
#include <thread>

#include "callback_proxy.h"
#include "threadpool.h"

using namespace std;
using namespace Demo;

// Blocking demo worker.
// Prints the id of the calling thread, sleeps for five seconds, prints the id
// again, then stores the fixed demo value 5201314 into `time`.
// Always returns true.
bool GetTimeImpl(int& time)
{
    const std::chrono::seconds delay(5);

    std::cout << std::endl;
    std::cout << std::this_thread::get_id() << ": 开始延时..." << std::endl;

    std::this_thread::sleep_for(delay);

    std::cout << std::this_thread::get_id() << ": 延时结束..." << std::endl;

    time = 5201314;
    return true;
}

// C-style completion callback: receives the computed time and the opaque
// user_data pointer that was handed to the asynchronous call.
using msg_func = void (*)(int time, const void* user_data);
// Runs GetTimeImpl and then reports the result through a C-style callback.
//
// callback   Invoked once with the computed time and `user_data`; may be null,
//            in which case nothing is reported.
// user_data  Opaque pointer forwarded untouched to `callback`.
//
// Returns whatever GetTimeImpl returned.
bool GetTimeImplAsync(msg_func callback, const void* user_data)
{
    int time = -1;  // placeholder value, overwritten by GetTimeImpl
    const bool result = GetTimeImpl(time);

    if (callback) {
        callback(time, user_data);  // fire the completion callback
    }
    return result;
}

// C++ callback type exposed to users of GetTimeAsync.
typedef std::function<void(int)> FunCallback;
static void CallbackNotify(int time, const void* callback)
{CallbackProxy::DoSafeCallback<FunCallback>(callback, [=](const FunCallback& cb){CallbackProxy::Invoke(cb, time);});
}// 异步获取时间
bool GetTimeAsync(const FunCallback& callback)
{FunCallback* callback_impl = nullptr;if (callback) {callback_impl = new FunCallback(callback);}Threadpool* thread_pool = Threadpool::GetInstance();if (thread_pool != nullptr) {thread_pool->Commit(GetTimeImplAsync, &CallbackNotify, callback_impl);return true;}false;
}// 同步获取时间
bool GetTime(int &time)
{bool result = GetTimeImpl(time);return result;
}int main() {std::cout << "main start" << std::endl;int time;GetTime(time); // 同步获取时间std::cout << "tongbu time:" <<time << std::endl;std::cout << "*********" << std::endl;GetTimeAsync([](int time) { // 异步获取时间std::cout << "yibu time:" << time << std::endl;});std::cout << "main end" << std::endl << std::endl;getchar();return 0;
}
threadpool.h
/*************************************************
** Copyright: xxx公司
** Author: xxx
** Date: xxx
** Description: 线程池
**************************************************/
#ifndef THREADPOOL_H_
#define THREADPOOL_H_

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Fixed-size thread pool.
//
// Worker threads are created in the constructor and block on a condition
// variable until a task is queued or the pool is shut down. The destructor
// stops the pool, lets the workers finish every task that is already queued,
// and joins them.
class Threadpool {
public:
    // Creates `size` worker threads; values below 1 are clamped to 1.
    Threadpool(int size = 4)
        : stopped_(false),
          idl_thread_number_(size < 1 ? 1 : size) {
        // Use a local copy for the loop bound: idl_thread_number_ is a live
        // counter that workers modify, so it must not steer the loop.
        const int thread_count = size < 1 ? 1 : size;
        pool_.reserve(static_cast<std::size_t>(thread_count));
        for (int i = 0; i < thread_count; ++i) {
            pool_.emplace_back(&Threadpool::Run, this);
        }
    }

    // Stops accepting work, drains the queue and joins all workers.
    ~Threadpool() {
        {
            // The flag is flipped under the mutex so a worker cannot evaluate
            // the wait predicate, miss the change, and then sleep forever
            // after the notification has already been sent.
            std::lock_guard<std::mutex> lock(lock_);
            stopped_.store(true);
        }
        cv_task_.notify_all();  // wake every worker so it can observe stopped_
        for (auto& worker : pool_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    // Returns the process-wide pool (function-local static, default size).
    static Threadpool* GetInstance() {
        static Threadpool threadpool;
        return &threadpool;
    }

public:
    // Queues `f(args...)` for execution on a worker thread.
    //
    // Returns a future for the call's result; call .get() on it to wait for
    // completion and obtain the return value.
    //
    // If the pool has already been stopped the task is not queued. The
    // returned future is then never fulfilled, and .get() on it reports the
    // abandoned task as std::future_error (broken_promise).
    template <class F, class... Args>
    auto Commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using RetType = decltype(f(args...));

        // packaged_task is move-only while std::function requires a copyable
        // callable, hence the shared_ptr wrapper captured by the queued lambda.
        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<RetType> future = task->get_future();

        {
            std::lock_guard<std::mutex> lock(lock_);
            // Checked under the lock so a task cannot slip into the queue
            // after the workers have been told to stop.
            if (stopped_.load()) {
                return future;
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        cv_task_.notify_one();  // wake one worker to pick the task up
        return future;
    }

    // Number of worker threads that are currently not executing a task.
    int idl_thread_number() const { return idl_thread_number_.load(); }

protected:
    // Worker loop: waits for a task, runs it, repeats. Exits once the pool
    // has been stopped and the queue is empty.
    void Run() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(lock_);
                cv_task_.wait(lock, [this] {
                    return stopped_.load() || !tasks_.empty();
                });
                if (tasks_.empty()) {
                    // The predicate held with nothing queued, so the pool
                    // was stopped and there is no work left.
                    return;
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            // Run the task outside the lock so other workers can dequeue.
            --idl_thread_number_;
            task();
            ++idl_thread_number_;
        }
    }

private:
    using Task = std::function<void()>;

    std::vector<std::thread> pool_;       // worker threads
    std::queue<Task> tasks_;              // pending tasks, guarded by lock_
    std::mutex lock_;                     // protects tasks_ and the stop handshake
    std::condition_variable cv_task_;     // signalled on new task / shutdown
    std::atomic<bool> stopped_;           // true once the pool rejects new work
    std::atomic<int> idl_thread_number_;  // idle worker count
};

#endif  // THREADPOOL_H_
callback_proxy.h
/*************************************************
** Copyright: xxx信息
** Author: xxx
** Date: xxx
** Description: xxx
**************************************************/
#ifndef CALLBACK_PROXY_H_
#define CALLBACK_PROXY_H_

#include <functional>
#include <memory>
#include <type_traits>
#include <utility>

namespace Demo
{
// Helpers for bridging C-style `const void*` user-data callbacks back to typed
// C++ callables and for invoking them.
class CallbackProxy
{
    // Nullary task type produced by binding a callback to its arguments.
    template<typename TR = void>
    using CallbackProxyClosure = std::function<TR(void)>;

public:
    // Recovers a TCallback from an opaque pointer and hands it to `closure`.
    //
    // callback         Pointer to a TCallback object, or null.
    // closure          Callable that receives the recovered callback.
    // delete_callback  When true, `*callback` is deleted once `closure` has
    //                  returned (or immediately, if the callback is empty).
    //
    // Returns the result of `closure`, or a value-initialised result when
    // `callback` is null or the pointed-to callback compares equal to nullptr.
    template<typename TCallback, typename TDoCall>
    static auto DoSafeCallback(const void* callback, const TDoCall& closure, bool delete_callback = false)
        -> decltype(closure((*(TCallback*)(callback))))
    {
        using TCallbackPtr = TCallback*;
        using TReturn = decltype(closure((*(TCallbackPtr)(callback))));

        // Scope guard that deletes the callback (if any) on scope exit.
        struct Deleter {
            explicit Deleter(TCallbackPtr cb_ptr) : cb_ptr_(cb_ptr) {}
            Deleter(const Deleter&) = delete;
            Deleter& operator=(const Deleter&) = delete;
            ~Deleter() {
                delete cb_ptr_;  // deleting a null pointer is a no-op
                cb_ptr_ = nullptr;
            }
        private:
            TCallbackPtr cb_ptr_;
        };

        if (callback != nullptr) {
            // The C-style interface only carries `const void*`; spell out both
            // the const removal and the type recovery instead of hiding them
            // in a C-style cast.
            TCallbackPtr real_type_cb_ptr =
                static_cast<TCallbackPtr>(const_cast<void*>(callback));
            Deleter deleter(delete_callback ? real_type_cb_ptr : nullptr);

            if (*real_type_cb_ptr == nullptr) {
                return TReturn();  // empty callback: nothing to invoke
            }
            // std::forward with a non-reference TCallback yields an rvalue,
            // so the closure receives the callback as TCallback&&.
            return closure(std::forward<TCallback>(*real_type_cb_ptr));
        }
        return TReturn();
    }

    // Invokes a non-member callable `f` with `args` and returns its result.
    template<class F, class... Args,
             class = typename std::enable_if<!std::is_member_function_pointer<F>::value>::type>
    static auto Invoke(F && f, Args && ... args) -> decltype(f(std::forward<Args>(args)...))
    {
        using TReturn = decltype(f(std::forward<Args>(args)...));
        return Run<TReturn>(std::bind(f, std::forward<Args>(args)...));
    }

    // Invokes a const member function `f` on object/pointer `p` with `args`.
    template<class R, class C, class... DArgs, class P, class... Args>
    static R Invoke(R(C::*f)(DArgs...) const, P && p, Args && ... args)
    {
        using TReturn = R;
        return Run<TReturn>(std::bind(f, p, std::forward<Args>(args)...));
    }

    // Invokes a non-const member function `f` on object/pointer `p` with `args`.
    template<class R, class C, class... DArgs, class P, class... Args>
    static R Invoke(R(C::*f)(DArgs...), P && p, Args && ... args)
    {
        using TReturn = R;
        return Run<TReturn>(std::bind(f, p, std::forward<Args>(args)...));
    }

protected:
    // Runs `task` on the calling thread and returns its result.
    template<typename TR>
    static TR Run(const CallbackProxyClosure<TR>& task)
    {
        return task();
    }

    // Non-template overload for void tasks: forwards the task to
    // docallback_async_ when one is installed, otherwise runs it inline.
    // NOTE(review): Invoke always calls Run<TReturn>(...) with an explicit
    // template argument, which only considers the template above, so this
    // overload is not reached from Invoke as written.
    //template<>
    static void Run(const CallbackProxyClosure<void>& task)
    {
        if (docallback_async_ != nullptr) {
            docallback_async_(task);
        } else {
            task();
        }
    }

private:
    // Optional dispatcher used by the void Run overload.
    // NOTE(review): only declared here; no out-of-class definition is visible
    // in this header, so one must exist elsewhere before it is used.
    static std::function<void(const std::function< void()>&)> docallback_async_;
};
}  // namespace Demo

#endif // CALLBACK_PROXY_H_