C++11 线程池:轻量级高并发解决方案
线程池(Thread Pool)是一种线程管理的机制,它包含了多个预先创建的线程,用于执行多个任务,这些任务被放入任务队列中等待执行。
满足我们的生产者和消费者模型。
线程池的核心组成部分。
- 任务队列 -----按顺序等待要处理的任务。
- 线程数组----- 多个已启动的线程,从任务队列拿取任务处理。
- 互斥锁。
- 条件变量。
- 任务。
线程池的好处:
- 减少线程的创建和销毁次数,提高系统的性能和效率。因为我们每次创建和销毁线程都是有开销的。
- 活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或切换过度而导致系统资源不足。
- 通过重复利用已创建的线程 ,避免了频繁创建和销毁线程的性能开销。
基于C++11实现的线程池:感受C++11的魅力
当然实际项目中的线程池可能会更复杂。但是对于初学者 ,你能够了解到这里已经足够了。
ThreadPool.h 线程池必要的接口 ,和属性。
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
#include <functional>class ThreadPool {
public:// 创建线程池单例static ThreadPool* getIntance(int num) {static ThreadPool* threadPool = nullptr;// call_once() ,保证函数只能执行一次 ,只能在多线程里面使用std::call_once(flag_, init_threadPool, threadPool, num);return threadPool;}// 往线程池中添任务template <typename F , typename... Agrs>void push_task(F &&f , Agrs&&... agrs);private:static void init_threadPool(ThreadPool*& p, int num) {p = new ThreadPool(num);}ThreadPool(int num = 4); // 默认线程数是 4 ~ThreadPool();std::queue<std::function<void()>>task_queue_; // 任务队列std::vector<std::thread>threads; // 线程数组std::mutex mutex_; // 互斥锁std::condition_variable c_variable_; // 条件变量bool isStop_; // 线程池是否终止static std::once_flag flag_; // call_once() 需要的flag
};// 往任务队列添加任务
template <typename F, typename... Agrs>
void ThreadPool::push_task(F&& f, Agrs&&... agrs) {// 将函数和参数进行绑定// forward 实现完美转换std::function<void()>task = std::bind(std::forward<F>(f), std::forward<Agrs>(agrs)...);{std::unique_lock<std::mutex>lock(mutex_);// 加入任务队列task_queue_.emplace(task);}c_variable_.notify_one(); // 唤醒一个线程来执行
}#endif // !_THREAD_POOL_H
ThreadPool.cpp 相关接口的具体实现。
#include "ThreadPool.h"std::once_flag ThreadPool::flag_;ThreadPool::ThreadPool(int num):isStop_ ( false ) {for (int i = 0; i < num ; i++) {threads.emplace_back([this]( ) {while ( true ) {std::unique_lock<std::mutex>lock(mutex_);c_variable_.wait(lock, [ = ]() {return (!task_queue_.empty() || isStop_);});if ( isStop_ && task_queue_.empty() ) { // 如果被线程池终止了return;}std::cout << "线程池处理" << std::endl;// 从任务队列,拿出任务执行std::function<void( )>task(std::move(task_queue_.front()));task_queue_.pop();task();}});}
}ThreadPool::~ThreadPool( ) {{std::unique_lock<std::mutex>lock(mutex_);isStop_ = true;if (!task_queue_.empty()) c_variable_.notify_all(); // 唤醒所有线程去执行任务}for ( auto& a : threads ) {a.join();}
}
测试代码:
#include "ThreadPool.h"
#include <iostream>
#include <algorithm>
#include <chrono>void print(const char *name) {std::cout << "name :" << name << std::endl;
}int getMax(int a, int b) {return std::max(a, b);
}void sort(std::vector<int>*&value) {// 默认是升序std::sort(value->begin(), value->end()); //
}int main() {// 创建一个线程池 ,线程数为4ThreadPool *pool = ThreadPool::getIntance( 4 );std::vector<int>value;//for (int i = 1; i < 10; i++) {// pool->push_task( [i]() {// std::cout << "task runing " << i << std::endl;// std::this_thread::sleep_for(std::chrono::seconds(1)); // 休眠1秒// std::cout << "task stop : " << i << std::endl;// });//}for (int i = 10; i > 0; i--) {value.emplace_back(i);}// 将任务投入线程池,让线程来处理pool->push_task(print, "小美"); // 输出小美pool->push_task(sort, &value); // 升序排序std::this_thread::sleep_for(std::chrono::seconds(10)); // 休眠10秒for (int i = 0; i < value.size(); i++) {std::cout << value[i] << std::endl;}std::cout << "\n";return 0;
}
测试结果:
效果达到预期 ,当然你也可以多写几个测试案例。