最近看到一个线程池,写的实在太好,于是想深入理解一下。原始代码出处:GitHub - Ahajha/CTPL: Modern and efficient C++ Thread Pool Library
由于平时的工程一般只支持到C++11,而拿到的代码应该是在C++20下才能编译通过,因此也做了一些修改,需要原始码的可去github上自行下载。
先出测试代码:
#include "ThreadPool.h"void func4444()
{printf("[%s]thread_id[%s]:func4444begin--------\n", current_time().c_str(),get_curr_thread_id().c_str());::Sleep(2000);printf("[%s]thread_id[%s]:func4444end..--------\n", current_time().c_str(), get_curr_thread_id().c_str());}void func5555(int v)
{printf("[%s]thread_id[%s]:func5555begin----参数: %d----\n", current_time().c_str(), get_curr_thread_id().c_str(), v);::Sleep(3000);printf("[%s]thread_id[%s]:func5555end..----参数: %d----\n", current_time().c_str(), get_curr_thread_id().c_str(), v);}int main()
{printf("[%s]main_thread_id:[%s]\n", current_time().c_str(), get_curr_thread_id().c_str());ctpl::thread_pool p;int intValue = 42;p.add_func("func4444", func4444);p.add_func("func5555", func5555, intValue);while (true){::Sleep(500);}
}
测试结果如下:
可以看到,加入到线程池的过程是在主线程中进行的,实际运行的都是在工作线程中完成。
关键代码
1、加入到任务队列
template<typename F, typename... Args>void add_func(const char* name, F && f, Args&&... args){auto pck = std::make_shared<std::packaged_task<decltype(f(args...))()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));printf("[%s]thread_id[%s]:add_func: %s\n", current_time().c_str(), get_curr_thread_id().c_str(), name);this->push_func(std::make_unique<std::function<void()>>([pck, name] {if (strlen(name) > 0)printf("[%s]thread_id[%s]:sche_trace_func %s enter ...\n", current_time().c_str(), get_curr_thread_id().c_str(), name);(*pck)();if (strlen(name) > 0)printf("[%s]thraed_id[%s]:sche_trace_func %s exit .\n", current_time().c_str(), get_curr_thread_id().c_str(), name);}));}
pck是一个包装指针,(*pck)()
会执行被包装任务中的函数 f
,并传入参数 args...,因此(*pck)();实际就是执行f(args)
由于(*pck)()是在匿名函数,当执行this->push_func时,相当于把这个匿名函数存于func_tasks中,func_tasks是一个线程安全的队列。此时并未真正执行函数,只是暂存起来。注意,detail::atomic_queue<std::unique_ptr<std::function<void()>>> func_tasks;看到队列存储的函数类型是固定的,即无输入且无输出的函数类型,即我们这儿的匿名函数。
void thread_pool::push_func(std::unique_ptr<std::function<void()>> &task){this->func_tasks.push(std::move(task));this->signal_cv.notify_one();}
当通过push_func将匿名函数加入进队列func_tasks后,会给工作线程发送通知,激活空闲的工作线程以执行此匿名函数,并从此匿名函数中执行对应的函数f(args)
2、从任务队列中取任务并执行
void thread_pool::emplace_thread(){this->stop_flags.emplace_back(std::make_shared<std::atomic<bool>>(false));// The main loop for the thread. Grabs a copy of the pointer// to the stop flag.const auto stop_flag = this->stop_flags.back();this->threads.emplace_back([this, stop_flag]{printf("[%s]thread_id:[%s]\n", current_time().c_str(), get_curr_thread_id().c_str());std::atomic<bool>& stop = *stop_flag;// Used to store new tasks.std::unique_ptr<std::function<void()>> task;// True if 'task' currently has a runnable task in it.auto has_new_task = this->func_tasks.pop(task);while (true){// If there is a task to runwhile (has_new_task){// Run the task(*task)();// Delete the tasktask.reset();// The thread is wanted to stop, return even// if the queue is not empty yetif (stop){return;}// Get a new taskhas_new_task = this->func_tasks.pop(task);}// At this point the queue has run out of tasks, wait here for more.// Thread is now idle.// If all threads are idle, notify any waiting in wait().if (++this->_n_idle == this->size()){std::lock_guard<std::mutex> lock(this->waiter_mut);this->waiter.notify_all();}std::unique_lock<std::mutex> lock(this->signal_mut);// While the following evaluates to true, wait for a signal.this->signal_cv.wait(lock, [this, &task, &has_new_task, &stop](){// Try to get a new task. This will fail if the thread was// woken up for another reason (stopping or resizing), or// if another thread happened to grab the task before this// one got to it.has_new_task = this->func_tasks.pop(task);// If there is a new task or the thread is being told to stop,// stop waiting.return has_new_task || this->done || stop;});// Thread is no longer idle.--this->_n_idle;// if the queue is empty and it was able to stop waiting, then// that means the thread was told to stop, so stop.if (!has_new_task){return;}}});}
一般emplace_thread在主线程中被调用,通常调用一次就会多一个工作线程,this->threads.emplace_back([this, stop_flag]{}这一行代码完成了一个工作线程的开启,工作的内容即是大括号内的匿名函数。
这个函数内部有两层循环,最内部的循环总是判断是否有新的任务,有的话通过(*task)()执行之前包装的匿名函数,再由那个被包装的匿名函数调用真正需要执行的函数。执行完后取下一个新线程,队列中若没有新线程则跳出内层的while循环,在外层等待新任务的信号。
三、小结
前面两个是线程池最关键的代码段,有了这个基础,再顺着代码调试一下,基本上就清晰了。当然,这里面还有关于C++11的用法,模板相关的同样比较绕人,不过那些小知识点可以通过小的练习完成,不会对理解这个程序造成太大的困扰。
后面再找时间做一个扩展,看如何在这个基础上进行定时器的逻辑推演,其实原先拿到的程序是定时器相关的东西,只不过觉得那个太复杂,于是花了很大的精力将其去除,保留核心。