一步一步写线程之九线程池任务的窃取

一、介绍

在实际的工作安排中,如果有一个比较大的工作,又可以细分的,诸如有一天一万个萝卜要洗这样的工作。假如做为一个工作的分配者,怎么处理这种需求?可能每个人都会想,先看看一个人一天洗多少萝卜,然后一除大概就可以算出来一天完成这个工作量需要多少人。
可以把一个人当成一个线程,假设十个人一天可以完成这项工作,那么就可以使用十个线程来完成类似的工作。但事情总有特殊情况,比如有的人天生麻利,可能半天就完成了。那么此时怎么安排这个人来工作呢?对工作分配者来说,很简单,余下的人哪个剩余工作量最多,就让完成的这个人去帮他做。依次类推,滚雪球一样就把工作完成了。当然,这里不考虑人的情绪问题,只是单纯的从工作完成角度来考虑。
其实把洗萝卜改成多种,即面对的清洗工作内容有洗萝卜、洗白菜、洗土豆等等,出现有人先完成的可能性会急剧增加。
而计算机的世界里,其实和上面的说明类似。只不过,计算机世界里可能更平等,不用考虑诸如为啥我干完了本职工作还要帮别人干这种感情的问题。

二、并行环境

早期的PC是单核的,也就是说,其本质是不提供并发和并行操作的。所谓的上层并发,就是一种虚拟的现象。可随着芯片技术的发展,CPU从单核到多核,从一个到多个,甚至从另外一层角度看,分布式也算是一种并行技术。
那任务的分配就必然会产生冷暖不均。怎么解决这类问题呢?在现实世界中,一般这种可并行量化的情况,都是按件计量,也就是说干得越多收入越多。但计算机世界没有感情,就得需要设计者从数学的角度来设计相关的算法来实现。并行的算法有很多种,有兴趣可以查找相关资料。此处仍然针对不同任务分配的情况,最基础的方式就是使用任务再分配的方式,也就是任务偷窃。

三、任务偷窃

习惯上,人们是愿意通过一个管理者去重新分配剩余的工作量。但在计算机世界中,如果使用这种方式,在大的环境中(分布式)也是常见的,但在一般的多线程开发中,无形中会增加工作量和引入不必要的代码。其实可以逆向思维,如果一个线程把自己的任务都执行完成后,是不是可以去看看别的线程是不是还有任务,如果其它线程剩余的任务数量超过一个阈值,就去它的任务队列拿一个过来执行,这个过程就是任务的偷窃。
一般来说,如果任务量不大,做任务偷窃几乎没有意义,反而增加复杂度。但在一些反复执行任务的线程池中,特别是会动态增加或随机插入任务的的场景下,任务的窃取就非常重要了。

四、例程

下面看一个例子:

//ThreadWorker.cpp
#include "ThreadWorker.h"
#include "ThreadPool.h"
#include <iostream>thread_local int ThreadWorker::testValue = 10;
ThreadWorker::ThreadWorker() {}void ThreadWorker::InitThread(bool initStatus, CallBackMsg cb) {this->cbm_ = cb;this->pWorkerThread_ = std::make_shared<std::thread>(&ThreadWorker::Run, this);if (nullptr != this->pWorkerThread_) {this->curThreadId_ = this->pWorkerThread_->get_id();}this->pThreadCon_ = std::make_shared<ThreadCondition>();
}
void ThreadWorker::Start() {}
void ThreadWorker::SetSignal() { this->pThreadCon_->Signal(); }
void ThreadWorker::Run() {int data[10] = {0};auto tPool = ThreadPool::Get();while (!status_) {this->runStatus_ = false;this->pThreadCon_->Wait(); // Handling false wake-upbool bRet = true;while (bRet) {std::cerr << "cur thread id is:" << this->curThreadId_ << std::endl;// ThreadPool::Get()->Wait();this->runStatus_ = true;std::cerr << "cur run thread id is:" << this->curThreadId_ << std::endl;Task task;bool ret = tPool->GetPrivateTask(this->curThreadId_, task);if (!ret) {ret = tPool->GetPubTask(task);std::cerr << "GetPubTask:" << this->curThreadId_ << std::endl;if (!ret) {auto [r, t] = tPool->GetStealTask();std::cerr << "GetStealTask:" << this->curThreadId_ << std::endl;if (r) {std::cerr << "Run--------------GetStealTask:" << this->curThreadId_ << std::endl;cbm_(data, 3);} else {bRet = false;continue;}}}if (ret) {task(++testValue);data[0] = testValue++;data[1] = testValue++;data[2] = testValue++;cbm_(data, 3);} else {std::this_thread::sleep_for(std::chrono::milliseconds(100));if (this->cbm_ != nullptr) {cbm_(data, 0);}}}std::this_thread::yield();}
}void ThreadWorker::Join() {if (this->pWorkerThread_ != nullptr && this->pWorkerThread_->joinable()) {this->pWorkerThread_->join();}
}void ThreadWorker::Quit() {this->status_ = true;// trigger conditionalthis->pThreadCon_->Signal();
}
bool ThreadWorker::getCurRunStatus() { return this->runStatus_; }//ThreadPool.cpp
#include "ThreadPool.h"
#include "ThreadWorker.h"
#include <iostream>ThreadPool::ThreadPool() {}
ThreadPool::~ThreadPool() { this->Destory(); }
void ThreadPool::InitThreadPool(int threadCount, bool initThreadStatus, CallBackMsg cb) {this->funcCallBack_ = cb;if (threadCount > this->maxThreadCount_) {threadCount = this->maxThreadCount_;}this->threadCount_ = threadCount;this->initThreadStatus_ = initThreadStatus;for (int num = 0; num < threadCount; num++) {auto workerThread = std::make_shared<ThreadWorker>();workerThread->InitThread(initThreadStatus, cb);this->pVecThreadWorker_.emplace_back(workerThread);// create task dequethis->taskDeques_.emplace(workerThread->GetCurThreadID().value(), std::make_unique<TaskDeque<Task>>());}this->pThreadCon_ = std::make_shared<ThreadCondition>();this->GetThreadID();
}void ThreadPool::AddTask(Task t) {this->taskQueue_.Push(t);this->SetSignal();std::cerr << "add task and signal" << std::endl;
}
void ThreadPool::Destory() {for (auto &au : this->pVecThreadWorker_) {au->Join();}
}
std::tuple<bool, Task> ThreadPool::GetTask() { return this->taskQueue_.PopFront(); }
std::shared_ptr<ThreadPool> ThreadPool::Get() {static auto threadPool = std::make_shared<ThreadPool>();return threadPool;
}void ThreadPool::Wait() { this->pThreadCon_->Wait(); }
void ThreadPool::SetSignal() { this->pThreadCon_->Signal(); }
// Wake up specified thread
void ThreadPool::SetSignal(std::thread::id threadid) {for (auto &pWorker : this->pVecThreadWorker_) {if (pWorker->GetCurThreadID() == threadid) {pWorker->Start();}}
}
void ThreadPool::SetMaxThreadCount(int maxCount) { this->maxThreadCount_ = maxCount; }
bool ThreadPool::getThreadRunStatus(std::thread::id id) {for (auto &worker : this->pVecThreadWorker_) {if (id == worker->GetCurThreadID()) {return worker->getCurRunStatus();}}return false;
}
std::vector<std::thread::id> ThreadPool::GetThreadID() {for (auto &worker : this->pVecThreadWorker_) {this->idVec_.emplace_back(worker->GetCurThreadID().value());}return this->idVec_;
}// deque
void ThreadPool::AddTaskDeque(Task t) {static int count = 0;std::cerr << "AddTaskDeque,thread count:" << this->idVec_.size() << std::endl;if (this->idVec_.size() < 1) {return;}std::cerr << "AddTaskDeque,push task!count is:" << count << std::endl;if (count < 37) {this->taskDeques_[this->idVec_[this->curThreadIndex++]]->pushFront(t);this->curThreadIndex = this->curThreadIndex % this->idVec_.size();count++;} else {this->taskQueue_.Push(t);count++;}// this->SetSignal();for (auto &th : this->pVecThreadWorker_) {th->SetSignal();}std::cerr << "set signal" << std::endl;
}
bool ThreadPool::GetPrivateTask(std::thread::id tid, Task &t) {if (this->taskDeques_.count(tid) > 0) {return this->taskDeques_[tid]->popFront(t);}return false;
}bool ThreadPool::GetPubTask(Task &t) {auto [ret, d] = this->taskQueue_.PopFront();if (ret) {t = d;}return ret;
}
std::tuple<bool, Task> ThreadPool::GetStealTask(/*std::thread::id tid*/) {for (std::thread::id &id : this->idVec_) {std::cerr << "--------------------other thread task count:" << this->taskDeques_[id]->size() << std::endl;if (this->taskDeques_[id]->size() > 0) {return this->taskDeques_[id]->popBack();}}return {false, Task{}};
}//main.cpp
#include <iostream>#include "TaskBucket.h"
#include "ThreadPool.h"
#include "common.h"
#include <chrono>
#include <iostream>
#include <list>
#include <random>unsigned long getRand() {static std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count() /std::chrono::system_clock::period::den);return dre();
}
int getRangeRand() {static std::default_random_engine d;static std::uniform_int_distribution<unsigned> u(1, 9);return u(d);
}
void DisplayResult(int *buf, int size) {if (size < 1) {std::cerr << "not task run!" << std::endl;return;}std::cout << "run result:" << std::endl;for (int num = 0; num < size; num++) {std::cout << "first data:" << buf[num] << std::endl;}
}void AppTaskFast(int d) {std::cout << "enter thread function AppTaskFast,d is:" << d << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));if (d > 0) {std::cout << "cur value is:" << d << std::endl;return;}std::cout << "err:nothing to do!" << std::endl;
}
void AppTask(int d) {std::cout << "enter thread function AppTask,d is:" << d << std::endl;std::cout << "random value:" << getRangeRand() << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(getRangeRand() * 1000));if (d > 0) {std::cout << "cur value is:" << d << std::endl;return;}std::cout << "err:nothing to do!" << std::endl;
}void AppTaskGlobal(int dd) {std::cout << "enter thread function AppTaskGlobal,d is:" << dd << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(1000));if (dd > 0) {std::cout << "AppTaskGlobal:cur value is:" << dd << std::endl;return;}std::cout << "AppTaskGlobal:err:nothing to do!" << std::endl;
}int main() {auto pTp = ThreadPool::Get();pTp->InitThreadPool(6, false, DisplayResult);std::this_thread::sleep_for(std::chrono::milliseconds(1000));std::cerr << "start assign task!" << std::endl;for (int num = 0; num < 48; num++) {if (num < 43) {pTp->AddTaskDeque(AppTask);} else {pTp->AddTaskDeque(AppTaskGlobal);}}char a;std::cin >> a;
}

执行后可能的结果:

在这里插入图片描述

需要说明的是,不同的环境执行的结果可能是不同的,在测试中发现,即使是同样的机器,反复执行的结果也不尽相同。大家可以适当的修改一下各个任务的延时情况,来模拟执行任务的长短,可能会有更好的体会。
程序主要是设计了三类任务(普通任务、快速任务和全局任务)和两种任务队列(线程私有任务队列和公共任务队列),在线程执行完成自己的任务后去公共队列取任务,如果公共队列没有了则去偷窃别的线程的任务。三类任务通过设置不同的延时来模拟实际的执行任务的情况。实际只选取了两个,在AddTaskDeque中,动态的修改全局和私有队列的数量(可以根据自己的机器来动态修改那些写死的数字),从而达到模拟任务完成参差不齐的情况来实现任务的偷窃。
程序中还有很多需要优化的地方,比如可以监听一个信号,然后使用全部触发模拟会更简单。此处只是为了使用原来的代码,所以就使用了循环唤醒。任务队列写成三层判断也是为了让大家明白任务执行的流程,其实实际开发时可以优化成一个语句,重点是明白如何进行线程窃取即可。

五、总结

对绝大多数软件人员而言,其实很多技术拼到最后不是拼设计者的聪明才智,拼的是经验。特别是对于工程类的开发更是如此。而软件开发领域,基本以现有的成熟的技术为主,这点就更突出。随着软件的规模越大,经验的要求越高。很多开发者可能终其一生都遇不到所谓的千万并发,那希望他设计一个支持千万并发的服务,就是一个不可能的任务,其它情况亦是如此。
所以开发者们勿需气馁,把基础打好,认真学习,找机会就上。不断总结经验得失,技术就会越来越磨炼得精粹。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/980.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据真题讲解系列——拼多多数据分析面试题

拼多多数据分析面试题&#xff1a;连续3次为球队得分的球员名单 问题&#xff1a; 两支篮球队进行了激烈的比赛&#xff0c;比分交替上升。比赛结束后&#xff0c;你有一个两队分数的明细表&#xff08;名称为“分数表”&#xff09;。表中记录了球队、球员号码、球员姓名、得…

CSS基础:盒子模型详解

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 云桃桃&#xff0c;大专生&#xff0c;一枚程序媛&#xff0c;感谢关注。回复 “前端基础题”&#xff0c;可免费获得前端基础 100 题汇总&#xff0c;回复 “前端工具”&#xff0c;可获取 We…

Liunx挂载硬件设备

一、mount命令&#xff08;用于挂载文件系统&#xff09; &#xff08;一&#xff09;语法格式&#xff1a;mount 参数 源设备路径 目的路径 &#xff08;二&#xff09;参数 1、-t&#xff1a;指定挂载的文件系统 &#xff08;1&#xff09;iso9660&#xff1a;光盘或光盘…

微服务架构中的业务可监控设计

目录 监控指标的定义与收集 监控指标的定义 监控数据的收集 业务的可监控设计技术解决方案 建立统一的监控平台 实施智能告警系统 分布式追踪与可视化 自动化测试和性能基准 安全监控与防御 可视化与告警系统 可视化系统的应用 实时数据展示 历史数据分析 多维度…

【nvm最新解决方案】Node.js v16.20.2 is not yet released or available

【nvm最新解决方案】Node.js v16.20.2 is not yet released or available 解决办法&#xff1a;下载想安装的node压缩包&#xff0c;放入nvm对应目录。 2024年最新node压缩包地址&#xff1a;https://nodejs.org/dist/ 1、选择对应的node版本&#xff1a;例如&#xff0c;我选的…

乡政府管理系统|基于Springboot的乡政府管理系统设计与实现(源码+数据库+文档)

乡政府管理系统目录 目录 基于Springboot的乡政府管理系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、用户信息管理 2、活动信息管理 3、新闻类型管理 4、新闻动态管理 四、数据库设计 1、实体ER图 五、核心代码 六、论文参考 七、最新计算机毕设选题推…

FPGA中按键程序设计示例

本文中使用Zynq 7000系列中的xc7z035ffg676-2器件的100MHz PL侧的外部差分时钟来检测外部按键是否按下&#xff0c;当按键被按下时&#xff0c;对应的灯会被点亮。当松开按键时&#xff0c;对应的灯会熄灭。 1、编写代码 新建工程&#xff0c;选用xc7z035ffg676-2器件。 点击…

ansible执行mysql脚本

目录 概述实践环境要求ansible yml脚本命令离线包 概述 ansible执行mysql脚本 实践 官网文档 环境要求 环境需要安装以下内容: 1.mysql客户端(安装了mysql即会有)2.安装MySQL-python (Python 2.X) 详细插件安装链接 ansible yml脚本 关键代码如下&#xff1a; # 剧本…

vscode设置conda默认python环境,简单有效

本地conda 可能安装了各种环境&#xff0c;默认的vscode总是base环境&#xff0c;这时你想要在vscode调试python代码&#xff0c;使用默认的环境没有安装对应的包就会遇到报错解决这个问题的方法很简单ctrlshiftp 调出命令面板 再输入 select interpreter , 选择 python 选择解…

设计模式——2_9 模版方法(Template Method)

人们往往把任性也叫做自由&#xff0c;但是任性只是非理性的自由&#xff0c;人性的选择和自决都不是出于意志的理性&#xff0c;而是出于偶然的动机以及这种动机对感性外在世界的依赖 ——黑格尔 文章目录 定义图纸一个例子&#xff1a;从文件中获取信息分几步&#xff1f;Rea…

为什么用CubeMX配置STM32H7主频只能配到200,但实际配到400没报错,超过400报错,其他深色也要把前边的分频器向小调?

原因&#xff1a; STM32CUBEMX配置STM32H750时钟480M时失败_stm32h750 时钟配置_小李干净又卫生的博客-CSDN博客 STM32CUBEMX默认设置的是VOS1&#xff0c;是不能支持480M运行的&#xff0c;只能400 但还不清楚为什么这里没有更多选项Scale &#xff1f;

BRC20铭文铭刻解析

BRC20铭文铭刻的出现对于智能制造无疑是一个重要的里程碑。随着科技的飞速发展&#xff0c;智能制造已经成为制造业发展的必然趋势&#xff01;智能制造是指通过运用人工智能、物联网、大数据等先进技术&#xff0c;实现生产过程的自动化、智能化和高效化。 1. BRC20铭文的概念…

栈的应用--括号匹配问题

括号匹配问题:给一个字符串,其中包括小括号、中括号、大括号.求该字符串中的括号是否匹配? 例如: ()()[]{} --> 匹配 ([{()}]) --> 匹配 []{ --> 不匹配 [(]] --> 不匹配 这里可以借助栈的思想来做: 1.遍历字符串:从左边开始…

阿里巴巴Java规约p3c-pmd与maven集成,实现maven package打包前校验

前言 网上复制来&#xff0c;复制去&#xff0c;都不知道原因&#xff0c;瞎搞。故写了此篇文档&#xff1b; 以下代码&#xff0c;直接复制到pom.xml中就好使。 需求 阿里巴巴Java规约p3c-pmd与maven集成&#xff0c;实现maven package打包前校验&#xff0c;如果校验不通过…

9. Vue Router4 过渡动效

Vue Router 4 提供了强大的动态过渡动效功能&#xff0c;可以让你的页面在路由切换时有流畅的过渡效果。这主要通过 Vue 的 <transition> 组件实现。 基本使用 首先&#xff0c;我们需要在路由组件外部包裹一个 <transition> 组件。然后&#xff0c;给 <trans…

【Git】从零开始的 Git 基本操作

文章目录 1. 创建 Git 本地仓库2. 配置 Git3. 认识工作区、暂存区、版本库3.1 添加文件 | 场景一3.2 查看 .git 文件3.3 添加文件 | 场景二 4. 修改文件5. 版本回退6. 撤销修改6.1 情况一&#xff1a;对于工作区的代码&#xff0c;还没有 add6.2 情况二&#xff1a;已经 add&am…

实在IDP文档审阅产品导引

实在IDP文档审阅&#xff1a;智能文档处理的革新者 一、引言 在数字化转型的浪潮中&#xff0c;文档处理的智能化成为企业提效的关键。实在智能科技有限公司推出的实在IDP文档审阅&#xff0c;是一款利用AI技术快速理解、处理文档的智能平台&#xff0c;旨在为企业打造专属的…

MyBatis-知识点详解

本文将详细地介绍MyBatis框架&#xff0c;从其优缺点、适用场景到工作原理&#xff0c;全面解析MyBatis的懒加载、延迟加载机制&#xff0c;以及如何管理实体与数据库字段的映射差异。同时&#xff0c;探讨动态SQL、缓存策略、事务处理&#xff0c;并比较MyBatis与Hibernate的不…

Qt 6子窗口全屏显示

一、全屏显示效果 二、全屏相关函数 1,全屏显示函数 QWidget::showFullScreen(); // 此方法只对顶级窗口有效&#xff0c;对子窗口无效 2&#xff0c;恢复显示函数 QWidget::showNormal(); // 此方法也只对顶级窗口有效&#xff0c;对子窗口无效 3&#xff0c;最小化显示函…

go语言并发实战——日志收集系统(五) 基于go-ini包读取日志收集服务的配置文件

实现日志收集服务的客户端 前言 从这篇文章开始我们就正式进入了日志收集系统的编写&#xff0c;后面几篇文章我们将学习到如何编写日志收集服务的客户端,话不多说,让我们进入今天的内容吧&#xff01; 需要实现的功能 我们要收集指定目录下的日志文件&#xff0c;将它们发…