最近在学习音视频的时候,解封装和解码的时候用到了多线程。于是把线程池的知识补了一下。
线程池的这个知识点有会涉及到锁,生产者消费者设计模式,纯虚函数继承等知识。在学习的时候可以根据知识点扩展延伸。
楼主在学习线程池这部分的时候没有更进一步深入。比如负载均衡,贪心算法,退火算法等这些都没有去研究。一来是楼主目前的数学基础还很孱弱,二来还用不到高并发的场景。
代码部分也为楼主网上学习,欢迎大家在评论区交流讨论。
这里的思路是先初始化线程池内线程,在Start函数中启动线程池的Run函数,在线程池的run函数内通过while循环让线程不是在工作就是在等待工作的路上。
因为在Start函数中new了很多个线程,所以这里引入了deque容器对其维护,因为使用了容器,所以使用锁对这个容器维护,因为使用了锁,不妨借用栈的特性加类的构造函数和析构函数的特性。所以这里使用了 unique_lock<mutex> unique(mux);
本着测试先行的原则,完成这部分就可以测试Start函数了。
线程部分解决了,接下来是任务部分。留出接口AddTask() 给外部,GetTask()给工作的线程获取线程。这里考虑到FIFO的调度,选用了deque容器来维护外部添加的Task。添加任务和获取任务通过信号量 condition_variable来实现。
代码如下
ThreadPool.h部分
#pragma once#include <atomic> //原子变量,不用设置互斥锁
#include <thread>
#include <deque> //考虑到后面的FIFO调度,这里使用这个容器
#include <mutex>
#include <condition_variable>class Task {
public:virtual void Run() = 0; //纯虚函数,自定义函数继承,将需要多线程的操作放在这个Run函数中实现
};class ThreadPool
{
public:void initThread(int _nums);void Start();void Stop();void AddTask(Task* _task);Task* GetTask();private:void Run();std::atomic<int> threadNums; //头文件中不使用命名空间std::deque<std::thread*> threadVec; std::deque<Task*> taskVec; std::mutex mux;std::atomic<bool> is_exit = false; //退出标志位std::condition_variable cv; //信号量};
ThreadPool.cpp部分
#include "ThreadPool.h"
#include <iostream>using namespace std;void ThreadPool::initThread(int _nums)
{threadNums = _nums;
}void ThreadPool::Start()
{if (threadNums <= 0){cerr << "Please Init threadNums" << endl; //相对于cout ,cerr不占用缓存return;}//因为对容器进行了操作,防止串台,加一个锁unique_lock<mutex> unique(mux); //相当于实例化一个对象,通过栈里面对象会自动销毁,//在构造和析构函数中完成了加锁出作用域后自动解锁 if (!threadVec.empty()) //因为有容器维护,所以这里判断是否为空来判断是否多次start{cerr << "has been Inited it" << endl;return;}for (int i = 0; i < threadNums; i++){thread* th = new thread(&ThreadPool::Run, this); //库函数thread的构造函数,这里传入的是类成员函数,有细节//创建好线程,使用容器维护threadVec.push_back(th);}
}//退出线程池函数
void ThreadPool::Stop()
{//首先暂停while循环条件is_exit = true;//通知正在等待任务的线程cv.notify_all();//等待还在运行的线程结束for (auto th : threadVec){th->join(); //等待线程运行结束delete th;}//清理维护线程池的容器unique_lock<mutex> unique(mux);threadVec.clear();}void ThreadPool::AddTask(Task* _task)
{unique_lock<mutex> unique(mux);taskVec.push_back(_task);unique.unlock();cv.notify_one();
}Task* ThreadPool::GetTask()
{unique_lock<mutex> unique(mux);if (taskVec.empty()){cv.wait(unique);}if (is_exit){return nullptr;}Task* _task = taskVec.front();taskVec.pop_front(); //FIFOreturn _task;
}void ThreadPool::Run()
{//线程池中每个线程在退出前都一直在等待这个循环while (!is_exit){Task* task = GetTask();task->Run();}}
main.cpp (测试代码部分)
#include <iostream>
#include "ThreadPool.h"using namespace std;class myTask :public Task
{
public:void Run() override{cout << "hello world" << endl;this_thread::sleep_for(1s); //模拟任务耗时}};int main()
{ThreadPool pool;pool.initThread(1);pool.Start();myTask a_task;myTask b_task;pool.AddTask(&a_task);pool.AddTask(&b_task);this_thread::sleep_for(3s); //这里不等待不会执行任务,直接退出。还请评论区大佬指点一二pool.Stop();getchar(); //阻塞观察线程运行现象return 0;
}