线程池666666

1. 作用

线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。

2. 实现原理

线程池内部由任务队列、工作线程和管理者线程组成。

任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。

工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。

管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。

注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。

3. 手撕线程池

参考来源:爱编程的大丙。

【1】threadpool.c:

#include "threadpool.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>#define NUMBER	2	//管理者线程增加或减少的工作线程数量//任务结构体
typedef struct Task {void (*func)(void* arg);void* arg;
} Task;//线程池结构体
struct ThreadPool {//任务队列,视为环形队列Task* taskQ;int queueCapacity;	//队列容量int queueSize;		//当前任务个数int queueFront;		//队头 -> 取任务int queueRear;		//队尾 -> 加任务//线程相关pthread_t managerID;	//管理者线程IDpthread_t* threadIDs;	//工作线程IDint minNum;				//工作线程最小数量int maxNum;				//工作线程最大数量int busyNum;			//工作线程忙的数量int liveNum;			//工作线程存活数量int exitNum;			//要销毁的工作线程数量pthread_mutex_t mutexPool;	//锁整个线程池pthread_mutex_t mutexBusy;	//锁busyNumpthread_cond_t notFull;		//任务队列是否满pthread_cond_t notEmpty;	//任务队列是否空//线程池是否销毁int shutdown;		//释放为1,否则为0
};/**************************************************************** 函  数: threadPoolCreate* 功  能: 创建线程池并初始化* 参  数: min---工作线程的最小数量*         max---工作线程的最大数量*		   capacity---任务队列的最大容量* 返回值: 创建的线程池的地址**************************************************************/
ThreadPool* threadPoolCreate(int min, int max, int capacity)
{//申请线程池空间ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {//此处循环只是为了便于失败释放空间,只会执行一次if (pool == NULL) {printf("pool create error!\n");break;}//申请任务队列空间,并初始化pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);if (pool->taskQ == NULL) {printf("Task create error!\n");break;}pool->queueCapacity = capacity;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;//初始化互斥锁和条件变量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0){printf("mutex or cond create error!\n");break;}//初始化shutdownpool->shutdown = 0;//初始化线程相关参数pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threadIDs == NULL) {printf("threadIDs create error!\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;pool->exitNum = 0;//创建管理者线程和工作线程pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程for (int i = 0; i < min; ++i) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程}return pool;} while (0);//申请资源失败,释放已分配的资源if (pool && pool->taskQ) free(pool->taskQ);if (pool && pool->threadIDs) free(pool->threadIDs);if (pool) free(pool);return NULL;
}/**************************************************************** 函  数: threadPoolDestroy* 功  能: 销毁线程池* 参  数: pool---要销毁的线程池* 返回值: 0表示销毁成功,-1表示销毁失败**************************************************************/
int threadPoolDestroy(ThreadPool* pool)
{if (!pool) return -1;//关闭线程池pool->shutdown = 1;//阻塞回收管理者线程pthread_join(pool->managerID, NULL);//唤醒所有工作线程,让其自杀for (int i = 0; i < pool->liveNum; ++i) {pthread_cond_signal(&pool->notEmpty);}//释放所有互斥锁和条件变量pthread_mutex_destroy(&pool->mutexBusy);pthread_mutex_destroy(&pool->mutexPool);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);//释放堆空间if (pool->taskQ) {free(pool->taskQ);pool->taskQ = NULL;}if (pool->threadIDs) {free(pool->threadIDs);pool->threadIDs = NULL;}free(pool);pool = NULL;return 0;
}/**************************************************************** 函  数: threadPoolAdd* 功  能: 生产者往线程池的任务队列中添加任务* 参  数: pool---线程池*		   func---函数指针,要执行的任务地址*		   arg---func指向的函数的实参* 返回值: 无**************************************************************/
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);//任务队列满,阻塞生产者while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {pthread_cond_wait(&pool->notFull, &pool->mutexPool);}//判断线程池是否关闭if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);return;}//添加任务进pool->taskQpool->taskQ[pool->queueRear].func = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueSize++;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pthread_cond_signal(&pool->notEmpty);//唤醒工作线程pthread_mutex_unlock(&pool->mutexPool);
}/**************************************************************** 函  数: getThreadPoolBusyNum* 功  能: 获取线程池忙的工作线程数量* 参  数: pool---线程池* 返回值: 忙的工作线程数量**************************************************************/
int getThreadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}/**************************************************************** 函  数: getThreadPoolAliveNum* 功  能: 获取线程池存活的工作线程数量* 参  数: pool---线程池* 返回值: 存活的工作线程数量**************************************************************/
int getThreadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return liveNum;
}/**************************************************************** 函  数: worker* 功  能: 工作线程的执行函数* 参  数: arg---实参传入,这里传入的是线程池* 返回值: 空指针**************************************************************/
void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1) {/* 1.取出任务队列中的队头任务 */pthread_mutex_lock(&pool->mutexPool);//无任务就阻塞线程while (pool->queueSize == 0 && !pool->shutdown) {pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);//唤醒后,判断是不是要销毁线程if (pool->exitNum > 0) {//线程自杀pool->exitNum--;//销毁指标-1if (pool->liveNum > pool->minNum) {pool->liveNum--;//活着的工作线程-1pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}//线程池关闭了就退出线程if (pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}//取出pool中taskQ的任务Task task;task.func = pool->taskQ[pool->queueFront].func;task.arg = pool->taskQ[pool->queueFront].arg;pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头pool->queueSize--;//通知生产者添加任务pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);/* 2.设置pool的busyNum+1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);/* 3.执行取出的任务 */printf("thread %ld start working ...\n", pthread_self());task.func(task.arg);free(task.arg);task.arg = NULL;printf("thread %ld end working ...\n", pthread_self());/* 4.设置pool的busyNum-1 */pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}/**************************************************************** 函  数: manager* 功  能: 管理者线程的执行函数* 参  数: arg---实参传入,这里传入的是线程池* 返回值: 空指针**************************************************************/
void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (!pool->shutdown) {/* 每隔3秒检测一次 */sleep(3);/* 获取pool中相关变量 */pthread_mutex_lock(&pool->mutexPool);int taskNum = pool->queueSize;	//任务队列中的任务数量int liveNum = pool->liveNum;	//存活的工作线程数量int busyNum = pool->busyNum;	//忙碌的工作线程数量pthread_mutex_unlock(&pool->mutexPool);/* 功能一:增加工作线程,每次增加NUMBER个 *///当任务个数大于存活工作线程数,且存活工作线程数小于最大值if (taskNum > liveNum && liveNum < pool->maxNum) {pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER&& pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}/* 功能二:销毁工作线程,每次销毁NUMBER个 *///当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数if (busyNum * 2 < liveNum && liveNum > pool->minNum) {pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;//唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀for (int i = 0; i < NUMBER; ++i) {pthread_cond_signal(&pool->notEmpty);}pthread_mutex_unlock(&pool->mutexPool);}}return NULL;
}/**************************************************************** 函  数: threadExit* 功  能: 工作线程退出函数,将工作线程的ID置为0,然后退出* 参  数: pool---线程池* 返回值: 无**************************************************************/
void threadExit(ThreadPool* pool)
{//将pool->threadIDs中的ID改为0pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; i++) {if (pool->threadIDs[i] == tid) {pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);//退出
}

【2】threadpool.h:

#ifndef _THREADPOOL_H
#define _THREADPOOL_Htypedef struct ThreadPool ThreadPool;//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int capacity);//销毁线程池
int threadPoolDestroy(ThreadPool* pool);//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);//获取当前忙碌的工作线程的数量
int getThreadPoolBusyNum(ThreadPool* pool);//获取当前存活的工作线程的数量
int getThreadPoolAliveNum(ThreadPool* pool);/*********************其它函数**********************/
void* worker(void* arg);//工作线程的执行函数
void* manager(void* arg);//管理者线程的执行函数
void threadExit(ThreadPool* pool);//线程退出函数#endif

【3】main.c:

#include <stdio.h>
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>//任务函数,所有线程都执行此任务
void testFunc(void* arg)
{int* num = (int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), *num);sleep(1);
}int main()
{//创建线程池: 最少3个工作线程,最多10个,任务队列容量为100ThreadPool* pool = threadPoolCreate(3, 10, 100);//加入100个任务于任务队列for (int i = 0; i < 100; ++i) {int* num = (int*)malloc(sizeof(int));*num = i + 100;threadPoolAdd(pool, testFunc, num);}//销毁线程池sleep(30);//保证任务全部运行完毕threadPoolDestroy(pool);return 0;
}

【4】运行结果:

......

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/39415.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu开通5005端口 记录

Ubuntu版本&#xff1a;20.04 使用systemctl status firewalld查看防火墙状态&#xff0c;报错Unit firewalld.service could not be found 报错的原因是没有安装firewall&#xff0c;安装命令为sudo apt install firewalld&#xff0c;然后进行安装 安装完成后输入systemctl…

vscode jupyter选择Python环境时找不到我安装的Python

在一些情况下&#xff0c;我们需要自己安装一个Python&#xff0c;在选择内核是可能找不到指定的Python版本&#xff0c; 再次打开内核选择页面就能看到Python环境了 注意先到指定环境下安装依赖包&#xff1a; ./python3 pip install ipykernel notebook jupyter

人工智能-NLP简单知识汇总01

人工智能-NLP简单知识汇总01 1.1自然语言处理的基本概念 自然语言处理难点&#xff1a; 语音歧义句子切分歧义词义歧义结构歧义代指歧义省略歧义语用歧义 总而言之&#xff1a;&#xff01;&#xff01;语言无处不歧义 1.2自然语言处理的基本范式 1.2.1基于规则的方法 通…

[DataWhale大模型应用开发]学习笔记1-尝试搭建向量数据库

1.词向量 1.定义 词向量&#xff08;Word Vector&#xff09;是将单词表示为向量形式的技术&#xff0c;是自然语言处理&#xff08;NLP&#xff09;中的一种常用方法。通过将单词转化为向量&#xff0c;计算机能够更好地理解和处理语言。简单来说&#xff0c;词向量就是将单…

Windows系统安装NVM,实现Node.js多版本管理

目录 一、前言 二、NVM简介 三、准备工作 1、卸载Node 2、创建文件夹 四、下载NVM 五、安装NVM 六、使用NVM 1、NVM常用操作命令 2、查看NVM版本信息 3、查看Node.js版本列表&#xff1b; 4、下载指定版本Node.js 5、使用指定版本Node.js 6、查看已安装Node.js列…

【区块链+基础设施】国家健康医疗大数据科创平台 | FISCO BCOS应用案例

在医疗领域&#xff0c;疾病数据合法合规共享是亟待解决的难题。一方面&#xff0c;当一家医院对患者实施治疗后&#xff0c;若患者转到其 他医院就医&#xff0c;该医院就无法判断诊疗手段是否有效。另一方面&#xff0c;医疗数据属于个人敏感数据&#xff0c;一旦被泄露或被恶…

一个能让渲染性能提高100倍的办法

GPU 光线追踪是当今的热门话题&#xff0c;所以让我们来谈谈它&#xff01;今天我们将光线追踪一个单个球体。 使用片段着色器。 是的&#xff0c;我知道。并不特别花哨。你可以在 Shadertoy 上搜索并获得数百个示例(https://www.shadertoy.com/results?querysphere)。甚至已…

自研直播系统-直播系统实战

文章目录 1 流媒体基础本文教程下载地址1.1 流媒体1.2 流式传输方式1.2.1 顺序流式传输1.2.2 实时流式传输 1.3 流媒体传输协议1.3.1 rtmp协议1.3.2 HLS协议1.3.3 RTSP协议1.3.4 视频流的对比 1.4 视频编码(codec)1.5 分辨率的规范分辨率簡介&#xff1a;1.5.2 分辨率單位 1.6 …

聊聊etsy平台,一个年入百万的项目

聊聊etsy平台&#xff0c;一个年入百万的项目 什么是etsy,这是怎样一个平台&#xff0c;怎样盈利的&#xff1f;相信现在大家满脑子都是这些疑问。 这个平台也是无意间一个学员提到的&#xff0c;据说他朋友靠这个平台年赚好几百万。苦于门槛太高&#xff0c;他也做不了。今天…

重磅发布|WAIC 2024最新活动日程安排完整发布!

WAIC 2024 将于 7 月在上海世博中心和世博展览馆举行&#xff0c;论坛时间为 7 月 4 日至 6 日&#xff0c;展览时间为 7 月 4 日至 7 日。会议涵盖 AI 伦理治理、大模型、具身智能、投融资、教育人才等重点话题&#xff0c;体现 AI 向善等价值导向&#xff0c;9 位大奖得主和 …

Inscription Alliance的Denim协议发行首个聚合跨链铭文BTIA,计划参与Mint注册量达15万

官方消息&#xff0c;由Inscription Alliance自主研发的创新性Denim协议发行首个聚合跨链铭文BTIA&#xff0c;并将于2024年7月19日公开Mint。Denim协议旨在解决当下铭文赛道流动性和互通性不足的痛点&#xff0c;基于该协议搭建的Denim Swap可以实现聚合各项协议和各条公链的彼…

数据结构常见图算法

深度优先搜索 时间复杂度 领接矩阵表示 O( n2) 领接表表示 O(n+e) 空间复杂度 O(e) DFS与回溯法类似,一条路径走到底后需要返回上一步,搜索第二条路径。在树的遍历中,首先一直访问到最深的节点,然后回溯到它的父节点,遍历另一条路径,直到遍历完所有节点…

荣耀大横评,睿蓝7-450荣耀版卷出来的性价比之王

手握11万左右预算,如何在市场内选出一辆合适自己的车?荣耀版车型无疑是当下的最佳答案。在众多荣耀版车型中,比亚迪宋PLUS荣耀版EV520km领先型(后统称宋PLUS荣耀版)、比亚迪元PLUS荣耀版430km领先型(后统称元PLUS荣耀版)、比亚迪海豚PLUS荣耀版420km时尚版(后统称海豚荣耀版)、…

【CSAPP】-binarybomb实验

目录 实验目的与要求 实验原理与内容 实验设备与软件环境 实验过程与结果&#xff08;可贴图&#xff09; 操作异常问题与解决方案 实验总结 实验目的与要求 1. 增强学生对于程序的机器级表示、汇编语言、调试器和逆向工程等方面原理与技能的掌握。 2. 掌握使用gdb调试器…

Python学习篇:PyCharm的基本使用教程(二)

目录 1 前言 2 创建Python项目 3 创建Python文件 4 编写 Hello World 并运行 5 PyCharm界面简介 1 前言 PyCharm的使用贯穿整个Python的学习&#xff0c;所以单独拿出来出教程不合适&#xff0c;说多了对于新手来说也还是不明白&#xff0c;这里我们先从学习开始前大家需…

【基础算法总结】分治—快排

分治—快排 1.分治2.颜色分类3.排序数组4.数组中的第K个最大元素5.库存管理 III 点赞&#x1f44d;&#x1f44d;收藏&#x1f31f;&#x1f31f;关注&#x1f496;&#x1f496; 你的支持是对我最大的鼓励&#xff0c;我们一起努力吧!&#x1f603;&#x1f603; 1.分治 分治…

搜狐新闻HarmonyOS版本 push 推送开发

背景 搜狐新闻作为HarmonyOS的合作伙伴&#xff0c;于2023年12月成功上架鸿蒙单框架应用市场&#xff0c;成为首批鸿蒙应用矩阵的一员。 新闻类推送作为应用的重要组成部分&#xff0c;在二期规划中&#xff0c;我们将推送功能列为核心功能模块。本文将推送集成过程中的步骤和…

JAVA妇产科专科电子病历系统源码,前端框架:Vue,ElementUI

JAVA妇产科专科电子病历系统源码&#xff0c;前端框架&#xff1a;Vue&#xff0c;ElementUI孕产妇健康管理信息管理系统是一种将孕产妇健康管理信息进行集中管理和存储的系统。通过建立该系统&#xff0c;有助于提高孕产妇健康管理的效率和质量&#xff0c;减少医疗事故发生的…

新华三通用大模型算力底座方案:为AI时代注入强大动力

在人工智能技术日新月异的今天&#xff0c;大模型作为推动AI进步的重要驱动力&#xff0c;是百行百业不断追逐的热点。大模型以其强大的泛化能力、卓越的模型效果和广泛的应用场景&#xff0c;正改变着人工智能的未来。作为国内领先的ICT解决方案提供商&#xff0c;新华三集团凭…

Linux kfence使用与实现原理

0 背景 为了更好的检测linux kernel中内存out-of-bounds、mem-corruption、use-after-free、invaild-free等问题&#xff0c;调研了kfence功能&#xff08;该功能在linux kernel 5.12引入&#xff09;&#xff0c;帮助研发更好的分析与定位这类内存错误的问题。 一、kfence介…