Linux系统编程系列之线程池

 Linux系统编程系列(16篇管饱,吃货都投降了!)

        1、Linux系统编程系列之进程基础

        2、Linux系统编程系列之进程间通信(IPC)-信号

        3、Linux系统编程系列之进程间通信(IPC)-管道

        4、Linux系统编程系列之进程间通信-IPC对象

        5、Linux系统编程系列之进程间通信-消息队列

        6、Linux系统编程系列之进程间通信-共享内存

        7、Linux系统编程系列之进程间通信-信号量组

        8、Linux系统编程系列之守护进程

        9、Linux系统编程系列之线程

        10、Linux系统编程系列之线程属性 

        11、Linux系统编程系列之互斥锁和读写锁

        12、Linux系统编程系列之线程的信号处理

        13、Linux系统编程系列之POSIX信号量

        14、Linux系统编程系列之条件变量

        15、Linux系统编程系列之死锁

        16、 Linux系统编程系列之线程池

一、什么是线程池

        线程池就是将许多线程,放置在一个池子中(实际就是一个结构体)只要有任务,就将任务投入池中,这些线程们通过某些机制,及时处理这些任务,一旦处理完后又重新回到池子中重新接收新任务。为了便于管理,线程池还应当提供诸如初始化线程池,增删线程数量,检测未完成任务的数据,检测正在执行任务的线程的数量、销毁线程池等等基本操作。

二、特性

        1、更好的满足并发性需求

        2、更加节省系统资源,降低负载

        3、避免每次需要执行任务时都创建新的线程

        4、可以限制并发线程数量,帮助控制应用程序的性能和稳定性

        核心思想:通过精巧的设计使得池子中的线程数量可以动态地发生变化,让线程既可以应对并发性需求,又不会浪费系统资源

三、使用场景

        用来应对某种场景,要在程序中创建大量线程,并且这些线程的数量和生命周期均不确定,可能方生方死,也可能常驻内存,请看下面举例。

        1、处理大量密集型任务

        线程池可以处理多个任务,从而减少了创建和销毁线程的开销,提高了系统性能

        2、服务端应用程序

        线程池可以帮助服务端应用程序同时处理多个客户端请求

        3、I/O 密集型应用程序

        线程池可以处理 I/O 操作,允许系统使用空闲时间进行其他任务处理

        4、Web 应用程序

        线程池可以处理来自 Web 客户端的请求,从而提高了 Web 应用程序的性能

        总之,任何需要处理大量任务或需要同时处理多个请求的应用程序都可以使用线程池来提高性能和效率。

四、设计思想

        设计某个东西一定要有个目标,这里是要求设计一个线程池,首先要明白线程池的作用,然后根据其作用来展开设计。线程池是通过让线程的数量进行动态的变化来及时处理接受的任务,所以核心就是线程和任务。可以将问题转换为如何组织线程和组织任务?借鉴别人的一幅图

         1、如何组织线程

                从上图可以看出,线程被创建出来之后,都处于睡眠态,它们实际上是进入了条件变量的等待队列中,而任务都被放入一个链表,被互斥锁保护起来

                线程的生命周期如下:(这里省略程序执行的流程图)

                (1)、线程被创建

                (2)、预防死锁,准备好退出处理函数,防止在持有一把锁的状态中死去

                (3)、尝试持有互斥锁(等待任务)

                (4)、判断是否有任务,若无则进入条件变量等待队列睡眠,若有则进入第(5)步

                (5)、从任务链表中取得一个任务

                (6)、释放互斥锁

                (7)、弹出退出处理函数,避免占用内存

                (8)、执行任务

                (9)、执行任务完成后重新回到第(2)步

                线程的生命周期其实就是线程要做的事情,把上面的第(2)步到第(8)步写成一个线程的例程函数。

         2、如何组织任务

                所谓的任务就是一个函数,回想一下创建一条线程时,是不是要指定一个线程例程函数。这里的线程池做法是将函数(准确的说是函数指针)及其参数存入一个任务节点,并将节点链接成一个链表。所以现在的问题变成如何设计一个任务链表?

                一个链表的操作有:创建节点,增加节点,删除节点,查询节点这几步。由于这里是任务节点,就不需要查询了,当然也可以查询。所以现在的问题转换为任务节点如何设计?

                任务节点的数据域主要有函数指针和函数参数,指针域就是一个简单任务结点指针,也可以是双向循环链表。

                设计分析到这里,基本上有了框架了,前面买了这么多关子,下面就来个重头戏。其实前面主要是想描述一种设计思想,遇到新的东西如何设计它?首先要明白设计的目的和作用,然后找出核心点,最后就是把核心点进行不断的分析推理,一步步转换为最基本的问题。学过项目管理的同学应该听过在范围管理里有个叫工作分解结构的东西,这里跟那个差不多。

五、案例

        实现一个线程池,并完成线程池的基本操作演示

        thread_pool.h(线程池头文件)

#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>#define MAX_WAITING_TASKS	1000    // 最大的等待任务数量
#define MAX_ACTIVE_THREADS	20      // 最多的活跃线程数量// 任务节点
typedef struct task 
{void *(*do_task) (void *arg);   // 函数指针void *arg;      // 函数参数struct task *next;  // 指向下一个任务节点
}task;// 线程池的管理节点
typedef struct thread_pool
{pthread_mutex_t task_list_lock;   // 任务列表互斥锁,保证同一个时间只有一条线程进行访问pthread_cond_t  task_list_cond;   // 任务列表条件变量,保证没有任务时,线程进入睡眠task *task_list;    // 任务列表pthread_t *tids;     // 存放线程号的数组int shutdown;  // 线程池销毁的开关unsigned int max_waiting_tasks;  // 最大等待的任务数量unsigned int waiting_tasks;      // 正在等待被处理的任务数量unsigned int active_threads;     // 正在活跃的线程数}thread_pool;// 初始化线程池
int init_pool(thread_pool *pool, unsigned int threads_number);// 往线程池中添加任务
int add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg);// 往线程池中添加线程
int add_thread(thread_pool *pool, unsigned int additional_threads);// 从线程池中删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads);// 查询线程池中线程数量
int total_thread(thread_pool *pool);// 销毁线程池
int destroy_pool(thread_pool *pool);// 线程的例程函数
void *routine(void *arg);#endif // _THREAD_POOL_H

        thread_pool.c(线程池实现文件) 

#include "thread_pool.h"// 初始化线程池
int init_pool(thread_pool *pool, unsigned int threads_number)
{if(threads_number < 1){printf("warning: threads_number must bigger than 0\n");return -1;}if(threads_number > MAX_ACTIVE_THREADS){printf("warning: threads number is bigger than MAX_ACTIVE_THREADS(%u)\n", MAX_ACTIVE_THREADS);return -1;}pthread_mutex_init(&pool->task_list_lock, NULL); // 初始化互斥锁pthread_cond_init(&pool->task_list_cond, NULL);  // 初始化条件变量pool->task_list = calloc(1, sizeof(task));  // 申请一个任务头节点if(!pool->task_list){printf("error: task_list calloc fail\n");return -1;}pool->task_list->next = NULL;   // 让任务头节点指向NULLpool->tids = calloc(MAX_ACTIVE_THREADS, sizeof(pthread_t)); // 申请堆数组用来存放线程号if(!pool->tids){printf("error: tids calloc fail\n");return -1;}pool->shutdown = 0; // 关闭线程池销毁开发pool->waiting_tasks = 0;    // 当前等待任务为0pool->max_waiting_tasks = MAX_WAITING_TASKS;    // 初始化最大等待任务数量pool->active_threads = threads_number;  // 初始化活跃的线程数// 根据活跃的线程数,创建对应数量的线程for(int i = 0; i < pool->active_threads; i++){// 需要判断线程是否创建失败// 线程号用线程号数组,线程属性设置为默认的,例程函数是routine,函数参数是线程池errno = pthread_create(&pool->tids[i], NULL, routine,(void*)pool);if(errno != 0){perror("error: pthread_create fail");return -1;}}return 1;
}// 往线程池中添加任务
int add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}// 如果当前等待执行的任务数超过最大等待的任务数量,则退出if(pool->waiting_tasks >= pool->max_waiting_tasks){printf("warning: task_list is full, too many list\n");return -1;}// 尝试给新的任务节点申请空间task *new_task = (task*)malloc(sizeof(task));if(!new_task){printf("error: task malloc fail\n");return -1;}// 初始化任务节点new_task->do_task = do_task;new_task->arg = arg;new_task->next = NULL;// 把任务节点添加到任务列表最后面// 1、先找到任务列表末尾task *tmp = NULL;for(tmp = pool->task_list; tmp->next; tmp = tmp->next);// 2、上锁pthread_mutex_lock(&pool->task_list_lock);// 3、添加新任务到任务列表末尾,同时当前等待任务数+1tmp->next = new_task;pool->waiting_tasks++;// 4、解锁pthread_mutex_unlock(&pool->task_list_lock);// 唤醒一个正在条件变量中睡眠等待的线程,取执行任务pthread_cond_signal(&pool->task_list_cond);return 1;
}// 往线程池中添加线程,返回实际添加的线程数量
int add_thread(thread_pool *pool, unsigned int additional_threads)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}if(additional_threads == 0){return 0;}// 期望活跃的线程数 = 目前活跃的线程数 + 期望添加的线程数unsigned int total_threads = pool->active_threads + additional_threads;// 如果超过最大的活跃线程数就打印警告if(total_threads > MAX_ACTIVE_THREADS){printf("warning: add too many threads\n");}int actual_add_threads = 0;for(int i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++){// 需要判断线程是否创建失败// 线程号用线程号数组,线程属性设置为默认的,例程函数是routine,函数参数是线程池errno = pthread_create(&pool->tids[i], NULL, routine,(void*)pool);if(errno != 0){perror("error: pthread_create fail");// 如果一个都没有创建成功,就直接返回if(actual_add_threads == 0){return -1;}// 如果成功创建了多个线程,但是本次创建失败,就跳出循环break;}else{actual_add_threads++;   // 线程创建成功就+1}}// 更新线程池中活跃的线程数pool->active_threads += actual_add_threads;return actual_add_threads;  // 返回实际添加的线程
}// 从线程池中删除线程,返回实际删除线程的数量
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}if(removing_threads == 0){return pool->active_threads;}// 如果要删除的线程数大于线程池中活跃的线程数,则打印警告if(removing_threads >= pool->active_threads){printf("warning: remove too many threads\n");}// 剩余数量 = 活跃数量 - 删除的目标数  int remaining_threads = pool->active_threads - removing_threads;//  目的是为了让线程池中最少保留有一个线程可以用于执行任务remaining_threads = remaining_threads > 0 ? remaining_threads : 1;int actual_remove_threads = 0;;// 循环取消线程直到等于期望线程数for(int i = pool->active_threads-1; i > remaining_threads-1; i--){errno = pthread_cancel(pool->tids[i]);if(errno != 0){printf("[%ld] cancel error: %s\n", pool->tids[i], strerror(errno));break;}else{actual_remove_threads++;}}// 更新线程池中活跃的线程数量pool->active_threads -= actual_remove_threads;return actual_remove_threads;   // 返回实际删除的线程
}// 查询线程池中线程数量
int total_thread(thread_pool *pool)
{return pool->active_threads;
}// 销毁线程池
int destroy_pool(thread_pool *pool)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}pool->shutdown = 1; // 启动线程池销毁开关pthread_cond_broadcast(&pool->task_list_cond);    // 唤醒所有在线程池中的线程// 循环等待所有线程退出for(int i = 0; i < pool->active_threads; i++){errno = pthread_join(pool->tids[i], NULL);if(errno != 0){printf("join tids[%d] error: %s\n", i, strerror(errno));}else{printf("[%ld] is joined\n", pool->tids[i]);}}// 头删法,从头一个个的删除任务节点for(task *p = pool->task_list->next; p; p = pool->task_list){pool->task_list->next = p->next;    // 修改首元节点free(p);}free(pool->task_list);free(pool->tids);free(pool);return 1;
}// 线程的取消函数
void pthread_cancel_handler(void *arg)
{printf("[%ld] is cancel\n", pthread_self());pthread_mutex_unlock((pthread_mutex_t*)arg);    // 解锁
}// 线程的例程函数
void *routine(void *arg)
{task *task_p;   // 任务指针,用来执行将要执行的任务thread_pool *pool = (thread_pool*)arg;while(1){pthread_cleanup_push(pthread_cancel_handler, (void*)&pool->task_list_lock);// 尝试持有互斥锁pthread_mutex_lock(&pool->task_list_lock);// 判断是否有任务,没有则进入睡眠// 1、没有任务,线程池销毁开关断开,则进入条件变量等待队列while(!pool->waiting_tasks && !pool->shutdown){// 当条件不满足时,会先自动解锁pool->lock,然后等待到条件满足后,会自动上锁pool->lockpthread_cond_wait(&pool->task_list_cond, &pool->task_list_lock);}// 2、没有任务,线程池销毁开发闭合,则先解锁,然后退出if(!pool->waiting_tasks && pool->shutdown){pthread_mutex_unlock(&pool->task_list_lock);  // 需要解锁pthread_exit(NULL);}// 3、有任务,线程池销毁开关断开,则取一个任务task_p = pool->task_list->next;pool->task_list->next = task_p->next; // 弹出第一任务节点 pool->waiting_tasks--;  // 当前等待的任务数量-1,pthread_mutex_unlock(&pool->task_list_lock);  // 解锁// 弹出压入的线程取消函数,运行到这里不执行,但是当线程在前面被意外取消或中断会执行pthread_cleanup_pop(0); // 为了防止死锁,执行任务期间不接受线程取消请求pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);(task_p->do_task)(task_p->arg); // 通过函数指针的方式执行任务// 执行完任务后,接受线程取消请求pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);free(task_p);   // 删除任务节点}pthread_exit(NULL);
}

        main.c(线程池操作演示文件) 

// 线程池的测试案例#include <stdio.h>
#include <unistd.h>
#include "thread_pool.h"void *mytask(void *arg)
{	int n = rand()%10;printf("[%ld][%s] ==> job will be done in %d sec...\n", pthread_self(), __FUNCTION__, n);sleep(n);printf("[%ld][%s] ==> job done!\n", pthread_self(), __FUNCTION__);
}void *func(void *arg )
{printf("this is a test by [%s]\n", (char*)arg);sleep(1);printf("test finish...\n");
}void *count_time(void *arg)
{int i = 0;while(1){sleep(1);printf("sec: %d\n", ++i);}
}int main(void)
{pthread_t a;pthread_create(&a, NULL, count_time, NULL);// 1、初始化线程池thread_pool *pool = malloc(sizeof(thread_pool));init_pool(pool, 2);// 2、投放任务printf("throwing 3 tasks...\n");add_task(pool, mytask, NULL);add_task(pool, mytask, NULL);add_task(pool, mytask, NULL);// 3、查看当前线程池中的线程数量printf("current thread number: %d\n", total_thread(pool));sleep(9);// 4、再次投放任务printf("throwing another 2 tasks...\n");add_task(pool, mytask, NULL);add_task(pool, mytask, NULL);add_task(pool, func, (void *)"Great Macro");// 5、添加2条线程printf("try add 2 threads, actual add %d\n", add_thread(pool, 2));printf("current thread number: %d\n", total_thread(pool));sleep(5);// 6、删除3条线程printf("try remove 3 threads, actual remove: %d\n", remove_thread(pool, 3));printf("current thread number: %d\n", total_thread(pool));sleep(5);// 7、 销毁线程池printf("destroy thread pool\n");destroy_pool(pool);return 0;
}

        注:编译时,把线程池文件和测试文件放在同一个工作目录下。

        实际使用时,只需要把thread_pool.h和thread_pool.c拷贝到自己的工程目录下,然后根据规则操作线程池。

六、总结

        线程池是许多线程的集合,它不是线程组。线程池适用于任何需要处理大量任务或需要同时处理多个请求的应用程序的场景,可以提供性能和效率。

        至此,Linux系统编程系列,16篇完结撒花,历时5天,这年中秋国庆没有假放!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/95271.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PyTorch入门之【tensor】

目录 tensor的创建tensor的相关信息tensor的运算 tensor的创建 1.手动创建 import torch test1torch.tensor([1,2,3])#一维时为向量 test2torch.tensor([[1,2,3]])#二维时为矩阵 test3torch.tensor([[[1,2,3]]])#三维及以上统称为tensor print(test1) print(test2) print(tes…

【RP-RV1126】烧录固件使用记录

文章目录 烧录完整固件进入MASKROM模式固件烧录升级中&#xff1a;升级完成&#xff1a; 烧录部分进入Loader模式选择文件切换loader模式 烧录完整固件 完整固件就是update.img包含了所有的部件&#xff0c;烧录后可以直接运行。 全局编译&#xff1a;./build.sh all生成固件…

TCP端口崩溃,msg:socket(): Too many open files

一、现象 linux系统中运行了一个TCP服务器&#xff0c;该服务器监听的TCP端口为10000。但是长时间运行时发现该端口会崩溃&#xff0c;TCP客户端连接该端口会失败&#xff1a; 可以看到进行三次握手时&#xff0c;TCP客户端向该TCP服务器的10000端口发送了SYN报文&#xff0c;…

(二)正点原子STM32MP135移植——TF-A移植

目录 一、TF-A概述 二、编译官方代码 2.1 解压源码 2.2 打补丁 2.3 编译准备 &#xff08;1&#xff09;修改Makfile.sdk &#xff08;2&#xff09;设置环境变量 &#xff08;3&#xff09;编译 三、移植 3.1 复制官方文件 3.2 修改电源 3.3 修改TF卡和emmc 3.4 添…

【面试HOT100】哈希双指针滑动窗口

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于LeetCodeHot100进行的&#xff0c;每个知识点的修正和深入主要参考…

【数据结构与算法】树、二叉树的概念及结构(详解)

前言: &#x1f4a5;&#x1f388;个人主页:​​​​​​Dream_Chaser&#xff5e; &#x1f388;&#x1f4a5; ✨✨专栏:http://t.csdn.cn/oXkBa ⛳⛳本篇内容:c语言数据结构--树以及二叉树的概念与结构 目录 一.树概念及结构 1.树的概念 1.1树与非树 树的特点&#xff1…

XXL-JOB源码梳理——一文理清XXL-JOB实现方案

分布式定时任务调度系统 流程分析 一个分布式定时任务&#xff0c;需要具备有以下几点功能&#xff1a; 核心功能&#xff1a;定时调度、任务管理、可观测日志高可用&#xff1a;集群、分片、失败处理高性能&#xff1a;分布式锁扩展功能&#xff1a;可视化运维、多语言、任…

【计算机网络笔记八】应用层(五)HTTPS

什么是 HTTPS HTTPS 解决了 HTTP 不安全的问题 HTTP 整个传输过程数据都是明文的&#xff0c;任何人都能够在链路中截获、修改或者伪造请求&#xff0f;响应报文&#xff0c;数据不具有可信性。 ① HTTPS 使用加密算法对报文进行加密&#xff0c;黑客截获了也看不懂 ② HTTP…

Play Beyond:Sui让优秀的游戏变得更好

自问世以来&#xff0c;视频游戏就紧随着文化产业发展。从Pong和Space Invaders的时代到Animal Crossing和Among Us&#xff0c;伟大的游戏总有能力吸引玩家&#xff0c;并推动娱乐产业发展。根据Grand View Research的数据&#xff0c;全球视频游戏市场在2022年估计为2170.6亿…

CUDA C编程权威指南:1.1-CUDA基础知识点梳理

主要整理了N多年前&#xff08;2013年&#xff09;学习CUDA的时候开始总结的知识点&#xff0c;好长时间不写CUDA代码了&#xff0c;现在LLM推理需要重新学习CUDA编程&#xff0c;看来出来混迟早要还的。 1.CUDA 解析&#xff1a;2007年&#xff0c;NVIDIA推出CUDA&#xff08…

Docker 日志管理 - ELK

Author&#xff1a;rab 目录 前言一、Docker 日志驱动二、ELK 套件部署三、Docker 容器日志采集3.1 部署 Filebeat3.2 配置 Filebeat3.3 验证采集数据3.4 Kibana 数据展示3.4.1 创建索引模式3.4.2 Kibana 查看日志 总结 前言 如何查看/管理 Docker 运行容器的日志&#xff1f;…

图像拼接后丢失数据,转tiff报错rasterfile failed: an unknown

图像拼接后丢失数据 不仅是数据丢失了&#xff0c;还有个未知原因报错 部分数据存在值不存在的情况 原因 处理遥感数据很容易&#xff0c;磁盘爆满了 解决方案 清理一些无用数据&#xff0c;准备买个2T的外接硬盘用着了。 然后重新做处理

redis高可用(主从复制,哨兵,集群)

目录 一、主从复制&#xff1a; 1.主从复制介绍&#xff1a; 2.主从复制的作用&#xff1a; 3.主从复制流程&#xff1a; 4.搭建Redis 主从复制&#xff1a; 4.1 环境准备&#xff1a; 4.2 安装redis&#xff1a; 4.3 master节点修改 Redis 配置文件&#xff1a; 4.4 slave节点…

Linux学习之悟空派上实现OLED的无线网IP及CPU温度显示【守护进程】

起因 最近各种网购平台似乎都在推送99元的悟空派全志H3的开发板&#xff0c;出于好奇就买了一块来试试水&#xff0c;由于这块板子基本上和orangepi-Zero的硬件结构一模一样&#xff0c;所以设备树、boot这些就用orangepi现成的部件了。 因为本人比较喜欢使用SSH操作&#xff…

C++——list(2)

作者&#xff1a;几冬雪来 时间&#xff1a;2023年9月28日 内容&#xff1a;C——list内容讲解 目录 前言&#xff1a; list的const迭代器&#xff1a; const的iterator&#xff1a; const迭代器&#xff1a; operator->: 拷贝构造&#xff1a; 迭代器接口补充&…

【LittleXi】【MIT6.S081-2020Fall】Lab: locks

【MIT6.S081-2020Fall】Lab: locks 【MIT6.S081-2020Fall】Lab: locks内存分配实验内存分配实验准备实验目的1. 举一个例子说明修改前的**kernel/kalloc.c**中如果没有锁会导致哪些进程间竞争(races)问题2. 说明修改前的kernel/kalloc.c中锁竞争contention问题及其后果3. 解释a…

Elasticsearch安装访问

Elasticsearch 是一个开源的、基于 Lucene 的分布式搜索和分析引擎&#xff0c;设计用于云计算环境中&#xff0c;能够实现实时的、可扩展的搜索、分析和探索全文和结构化数据。它具有高度的可扩展性&#xff0c;可以在短时间内搜索和分析大量数据。 Elasticsearch 不仅仅是一个…

【云笔记篇】Microsoft OneNote笔记插件推荐OneMore

【云笔记篇】Microsoft OneNote笔记插件推荐OneMore OneMore插件是一款非常强大&#xff0c;多达一百多个扩展功能的OneNote笔记插件&#xff0c;而且免费开源&#xff0c;不断更新的优秀插件—【蘇小沐】 1、实验 【OneMore官网&#xff1a;OneMore - a OneNote add-in (on…