//// Created by LENOVO on 2021/10/25.//#include"TaskPool.h"#include<functional>std::mutex printMutex;TaskPool::TaskPool():m_bRunning(false){}TaskPool::~TaskPool(){removeAllTasks();}voidTaskPool::init(int threadNum/*= 5*/){if(threadNum <=0)threadNum =5;m_bRunning =true;for(int i =0; i < threadNum; i++){std::shared_ptr<std::thread> spThread;// 这个bind含义是啥spThread.reset(new std::thread(std::bind(&TaskPool::threadFunc,this)));{std::lock_guard<std::mutex>guard(printMutex);std::cout <<"Init a thread, id: "<< spThread->get_id()<< std::endl;}m_threads.push_back(spThread);}}voidTaskPool::stop(){m_bRunning =false;// 通知所有线程,不运行了m_cv.notify_all();// 等待所有线程退出for(auto& it : m_threads){if(it->joinable())it->join();}}voidTaskPool::addTask(Task *task){std::shared_ptr<Task> spTask;spTask.reset(task);{// 将task加入队列是互斥的std::lock_guard<std::mutex>guard(m_mutexList);m_taskList.push_back(spTask);}{std::lock_guard<std::mutex>guard(printMutex);std::cout <<"add a Task, id: "<< spTask->getID()<<", thread id is: "<< std::this_thread::get_id()<< std::endl;}// 通知一个线程的threadFuncm_cv.notify_one();}voidTaskPool::removeAllTasks(){{// 需要先互斥地将指向taskList的ptr重置,然后将整个taskList资源cleadstd::lock_guard<std::mutex>guard(m_mutexList);for(auto& it : m_taskList){it.reset();}m_taskList.clear();}}voidTaskPool::threadFunc(){std::shared_ptr<Task> spTask;while(true){// 获取task的过程是互斥的{// 减少锁的粒度std::unique_lock<std::mutex>guard(m_mutexList);// 如果任务队列为空,那么就一直wait,等待线程被唤醒加入到队列中while(m_taskList.empty()){if(!m_bRunning)break;// 如果获得了互斥锁,但是条件不满足// wait()会释放锁,挂起当前线程// 条件变量发生变化的时候,wait()将环型挂起的线程并获得锁m_cv.wait(guard);}if(!m_bRunning)break;// 获取队头taskspTask = m_taskList.front();m_taskList.pop_front();}// 如果队列为空,则重新进行尝试获取if(spTask ==NULL)continue;// 否则,执行task任务spTask->doIt();// 完成之后,将指针重置spTask.reset();}std::lock_guard<std::mutex>guard(printMutex);std::cout <<"exit thread , threadID:"<< std::this_thread::get_id()<< std::endl;}
TaskPool.h
//
// Created by LENOVO on 2021/10/25.
//
#ifndef UNTITLED_TASKPOOL_H
#define UNTITLED_TASKPOOL_H

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Guards std::cout so that log lines from different threads do not interleave.
extern std::mutex printMutex;

/// A unit of work identified by a numeric id. Derive from it and override
/// doIt() to supply real work; the default implementation only logs.
class Task {
private:
    unsigned int id;

public:
    /// @param ID identifier reported in the log output.
    Task(unsigned int ID) : id(ID) {}

    /// Runs the task. The default implementation prints the task id and the
    /// id of the thread executing it.
    virtual void doIt() {
        std::lock_guard<std::mutex> guard(printMutex);
        std::cout << "handle a task ,TaskID is: " << id
                  << ", thradID is:" << std::this_thread::get_id() << std::endl;
    }

    /// Logs the destruction together with the id of the destroying thread.
    virtual ~Task() {
        std::lock_guard<std::mutex> guard(printMutex);
        std::cout << "a task destructed , TaskID is: " << id
                  << ", thradID is:" << std::this_thread::get_id() << std::endl;
    }

    /// @return the id passed to the constructor.
    unsigned int getID() const { return id; }
};

/// A fixed set of worker threads that execute queued Task objects in FIFO
/// order. Not copyable.
class TaskPool final {
public:
    TaskPool();
    ~TaskPool();
    TaskPool(const TaskPool& rhs) = delete;
    TaskPool& operator=(const TaskPool& rhs) = delete;

public:
    /// Starts the worker threads.
    /// @param threadNum number of workers; non-positive values fall back to 5.
    void init(int threadNum = 5);

    /// Tells all workers to stop and waits until every one of them has exited.
    /// Call this before the pool is destroyed.
    void stop();

    /// Queues a task for execution.
    /// @param task a valid heap-allocated task; the pool takes ownership.
    void addTask(Task* task);

    /// Discards all tasks that are still waiting in the queue.
    void removeAllTasks();

private:
    /// Body of each worker thread.
    void threadFunc();

private:
    std::list<std::shared_ptr<Task>> m_taskList;  // pending tasks, guarded by m_mutexList
    std::mutex m_mutexList;                       // protects m_taskList
    std::condition_variable m_cv;                 // signals new work or shutdown
    // Written by stop() and read by the worker threads, hence atomic.
    std::atomic<bool> m_bRunning;
    std::vector<std::shared_ptr<std::thread>> m_threads;
};

#endif // UNTITLED_TASKPOOL_H
main.cpp
#include<chrono>#include"TaskPool.h"intmain(){TaskPool threadPool;threadPool.init();Task* task =NULL;for(int i =0; i <10; i++){task =newTask(i);threadPool.addTask(task);}std::this_thread::sleep_for(std::chrono::seconds(5));threadPool.stop();return0;}
https://blog.csdn.net/weibo1230123/article/details/79978745
https://blog.csdn.net/weixin_42157432/article/details/115560824
在 Linux socket 网络编程中,处理大规模并发 TCP 或 UDP 连接时,经常会用到端口复用:
int opt = 1;
if (setsockopt…