一、介绍
在实际的工作安排中,如果有一个比较大的工作,又可以细分的,诸如有一天一万个萝卜要洗这样的工作。假如做为一个工作的分配者,怎么处理这种需求?可能每个人都会想,先看看一个人一天洗多少萝卜,然后一除大概就可以算出来一天完成这个工作量需要多少人。
可以把一个人当成一个线程,假设十个人一天可以完成这项工作,那么就可以使用十个线程来完成类似的工作。但事情总有特殊情况,比如有的人天生麻利,可能半天就完成了。那么此时怎么安排这个人来工作呢?对工作分配者来说,很简单,余下的人哪个剩余工作量最多,就让完成的这个人去帮他做。依次类推,滚雪球一样就把工作完成了。当然,这里不考虑人的情绪问题,只是单纯的从工作完成角度来考虑。
其实把洗萝卜改成多种,即面对的清洗工作内容有洗萝卜、洗白菜、洗土豆等等,出现有人先完成的可能性会急剧增加。
而计算机的世界里,其实和上面的说明类似。只不过,计算机世界里可能更平等,不用考虑诸如为啥我干完了本职工作还要帮别人干这种感情的问题。
二、并行环境
早期的PC是单核的,也就是说,其本质是不提供并发和并行操作的。所谓的上层并发,就是一种虚拟的现象。可随着芯片技术的发展,CPU从单核到多核,从一个到多个,甚至从另外一层角度看,分布式也算是一种并行技术。
那任务的分配就必然会产生冷暖不均。怎么解决这类问题呢?在现实世界中,一般这种可并行量化的情况,都是按件计量,也就是说干得越多收入越多。但计算机世界没有感情,就得需要设计者从数学的角度来设计相关的算法来实现。并行的算法有很多种,有兴趣可以查找相关资料。此处仍然针对不同任务分配的情况,最基础的方式就是使用任务再分配的方式,也就是任务偷窃。
三、任务偷窃
习惯上,人们是愿意通过一个管理者去重新分配剩余的工作量。但在计算机世界中,如果使用这种方式,在大的环境中(分布式)也是常见的,但在一般的多线程开发中,无形中会增加工作量和引入不必要的代码。其实可以逆向思维,如果一个线程把自己的任务都执行完成后,是不是可以去看看别的线程是不是还有任务,如果其它线程剩余的任务数量超过一个阈值,就去它的任务队列拿一个过来执行,这个过程就是任务的偷窃。
一般来说,如果任务量不大,做任务偷窃几乎没有意义,反而增加复杂度。但在一些反复执行任务的线程池中,特别是会动态增加或随机插入任务的的场景下,任务的窃取就非常重要了。
四、例程
下面看一个例子:
//ThreadWorker.cpp
#include "ThreadWorker.h"
#include "ThreadPool.h"
#include <iostream>thread_local int ThreadWorker::testValue = 10;
ThreadWorker::ThreadWorker() {}void ThreadWorker::InitThread(bool initStatus, CallBackMsg cb) {this->cbm_ = cb;this->pWorkerThread_ = std::make_shared<std::thread>(&ThreadWorker::Run, this);if (nullptr != this->pWorkerThread_) {this->curThreadId_ = this->pWorkerThread_->get_id();}this->pThreadCon_ = std::make_shared<ThreadCondition>();
}
void ThreadWorker::Start() {}
void ThreadWorker::SetSignal() { this->pThreadCon_->Signal(); }
void ThreadWorker::Run() {int data[10] = {0};auto tPool = ThreadPool::Get();while (!status_) {this->runStatus_ = false;this->pThreadCon_->Wait(); // Handling false wake-upbool bRet = true;while (bRet) {std::cerr << "cur thread id is:" << this->curThreadId_ << std::endl;// ThreadPool::Get()->Wait();this->runStatus_ = true;std::cerr << "cur run thread id is:" << this->curThreadId_ << std::endl;Task task;bool ret = tPool->GetPrivateTask(this->curThreadId_, task);if (!ret) {ret = tPool->GetPubTask(task);std::cerr << "GetPubTask:" << this->curThreadId_ << std::endl;if (!ret) {auto [r, t] = tPool->GetStealTask();std::cerr << "GetStealTask:" << this->curThreadId_ << std::endl;if (r) {std::cerr << "Run--------------GetStealTask:" << this->curThreadId_ << std::endl;cbm_(data, 3);} else {bRet = false;continue;}}}if (ret) {task(++testValue);data[0] = testValue++;data[1] = testValue++;data[2] = testValue++;cbm_(data, 3);} else {std::this_thread::sleep_for(std::chrono::milliseconds(100));if (this->cbm_ != nullptr) {cbm_(data, 0);}}}std::this_thread::yield();}
}void ThreadWorker::Join() {if (this->pWorkerThread_ != nullptr && this->pWorkerThread_->joinable()) {this->pWorkerThread_->join();}
}void ThreadWorker::Quit() {this->status_ = true;// trigger conditionalthis->pThreadCon_->Signal();
}
bool ThreadWorker::getCurRunStatus() { return this->runStatus_; }//ThreadPool.cpp
#include "ThreadPool.h"
#include "ThreadWorker.h"
#include <iostream>ThreadPool::ThreadPool() {}
ThreadPool::~ThreadPool() { this->Destory(); }
void ThreadPool::InitThreadPool(int threadCount, bool initThreadStatus, CallBackMsg cb) {this->funcCallBack_ = cb;if (threadCount > this->maxThreadCount_) {threadCount = this->maxThreadCount_;}this->threadCount_ = threadCount;this->initThreadStatus_ = initThreadStatus;for (int num = 0; num < threadCount; num++) {auto workerThread = std::make_shared<ThreadWorker>();workerThread->InitThread(initThreadStatus, cb);this->pVecThreadWorker_.emplace_back(workerThread);// create task dequethis->taskDeques_.emplace(workerThread->GetCurThreadID().value(), std::make_unique<TaskDeque<Task>>());}this->pThreadCon_ = std::make_shared<ThreadCondition>();this->GetThreadID();
}void ThreadPool::AddTask(Task t) {this->taskQueue_.Push(t);this->SetSignal();std::cerr << "add task and signal" << std::endl;
}
void ThreadPool::Destory() {for (auto &au : this->pVecThreadWorker_) {au->Join();}
}
std::tuple<bool, Task> ThreadPool::GetTask() { return this->taskQueue_.PopFront(); }
std::shared_ptr<ThreadPool> ThreadPool::Get() {static auto threadPool = std::make_shared<ThreadPool>();return threadPool;
}void ThreadPool::Wait() { this->pThreadCon_->Wait(); }
void ThreadPool::SetSignal() { this->pThreadCon_->Signal(); }
// Wake up specified thread
void ThreadPool::SetSignal(std::thread::id threadid) {for (auto &pWorker : this->pVecThreadWorker_) {if (pWorker->GetCurThreadID() == threadid) {pWorker->Start();}}
}
void ThreadPool::SetMaxThreadCount(int maxCount) { this->maxThreadCount_ = maxCount; }
bool ThreadPool::getThreadRunStatus(std::thread::id id) {for (auto &worker : this->pVecThreadWorker_) {if (id == worker->GetCurThreadID()) {return worker->getCurRunStatus();}}return false;
}
std::vector<std::thread::id> ThreadPool::GetThreadID() {for (auto &worker : this->pVecThreadWorker_) {this->idVec_.emplace_back(worker->GetCurThreadID().value());}return this->idVec_;
}// deque
void ThreadPool::AddTaskDeque(Task t) {static int count = 0;std::cerr << "AddTaskDeque,thread count:" << this->idVec_.size() << std::endl;if (this->idVec_.size() < 1) {return;}std::cerr << "AddTaskDeque,push task!count is:" << count << std::endl;if (count < 37) {this->taskDeques_[this->idVec_[this->curThreadIndex++]]->pushFront(t);this->curThreadIndex = this->curThreadIndex % this->idVec_.size();count++;} else {this->taskQueue_.Push(t);count++;}// this->SetSignal();for (auto &th : this->pVecThreadWorker_) {th->SetSignal();}std::cerr << "set signal" << std::endl;
}
bool ThreadPool::GetPrivateTask(std::thread::id tid, Task &t) {if (this->taskDeques_.count(tid) > 0) {return this->taskDeques_[tid]->popFront(t);}return false;
}bool ThreadPool::GetPubTask(Task &t) {auto [ret, d] = this->taskQueue_.PopFront();if (ret) {t = d;}return ret;
}
std::tuple<bool, Task> ThreadPool::GetStealTask(/*std::thread::id tid*/) {for (std::thread::id &id : this->idVec_) {std::cerr << "--------------------other thread task count:" << this->taskDeques_[id]->size() << std::endl;if (this->taskDeques_[id]->size() > 0) {return this->taskDeques_[id]->popBack();}}return {false, Task{}};
}//main.cpp
#include <iostream>#include "TaskBucket.h"
#include "ThreadPool.h"
#include "common.h"
#include <chrono>
#include <iostream>
#include <list>
#include <random>unsigned long getRand() {static std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count() /std::chrono::system_clock::period::den);return dre();
}
int getRangeRand() {static std::default_random_engine d;static std::uniform_int_distribution<unsigned> u(1, 9);return u(d);
}
void DisplayResult(int *buf, int size) {if (size < 1) {std::cerr << "not task run!" << std::endl;return;}std::cout << "run result:" << std::endl;for (int num = 0; num < size; num++) {std::cout << "first data:" << buf[num] << std::endl;}
}void AppTaskFast(int d) {std::cout << "enter thread function AppTaskFast,d is:" << d << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));if (d > 0) {std::cout << "cur value is:" << d << std::endl;return;}std::cout << "err:nothing to do!" << std::endl;
}
void AppTask(int d) {std::cout << "enter thread function AppTask,d is:" << d << std::endl;std::cout << "random value:" << getRangeRand() << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(getRangeRand() * 1000));if (d > 0) {std::cout << "cur value is:" << d << std::endl;return;}std::cout << "err:nothing to do!" << std::endl;
}void AppTaskGlobal(int dd) {std::cout << "enter thread function AppTaskGlobal,d is:" << dd << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));if (dd > 0) {std::cout << "AppTaskGlobal:cur value is:" << dd << std::endl;return;}std::cout << "AppTaskGlobal:err:nothing to do!" << std::endl;
}int main() {auto pTp = ThreadPool::Get();pTp->InitThreadPool(6, false, DisplayResult);std::this_thread::sleep_for(std::chrono::milliseconds(1000));std::cerr << "start assign task!" << std::endl;for (int num = 0; num < 48; num++) {if (num < 43) {pTp->AddTaskDeque(AppTask);} else {pTp->AddTaskDeque(AppTaskGlobal);}}char a;std::cin >> a;
}
执行后可能的结果:
需要说明的是,不同的环境执行的结果可能是不同的,在测试中发现,即使是同样的机器,反复执行的结果也不尽相同。大家可以适当的修改一下各个任务的延时情况,来模拟执行任务的长短,可能会有更好的体会。
程序主要是设计了三类任务(普通任务、快速任务和全局任务)和两种任务队列(线程私有任务队列和公共任务队列),在线程执行完成自己的任务后去公共队列取任务,如果公共队列没有了则去偷窃别的线程的任务。三类任务通过设置不同的延时来模拟实际的执行任务的情况。实际只选取了两个,在AddTaskDeque中,动态的修改全局和私有队列的数量(可以根据自己的机器来动态修改那些写死的数字),从而达到模拟任务完成参差不齐的情况来实现任务的偷窃。
程序中还有很多需要优化的地方,比如可以监听一个信号,然后使用全部触发模拟会更简单。此处只是为了使用原来的代码,所以就使用了循环唤醒。任务队列写成三层判断也是为了让大家明白任务执行的流程,其实实际开发时可以优化成一个语句,重点是明白如何进行线程窃取即可。
五、总结
对绝大多数软件人员而言,其实很多技术拼到最后不是拼设计者的聪明才智,拼的是经验。特别是对于工程类的开发更是如此。而软件开发领域,基本以现有的成熟的技术为主,这点就更突出。随着软件的规模越大,经验的要求越高。很多开发者可能终其一生都遇不到所谓的千万并发,那希望他设计一个支持千万并发的服务,就是一个不可能的任务,其它情况亦是如此。
所以开发者们勿需气馁,把基础打好,认真学习,找机会就上。不断总结经验得失,技术就会越来越磨炼得精粹。