【线程系列之五】线程池介绍C语言

一、基本概念

1.1 概念

线程池(Thread Pool)是一种基于池化技术管理线程的机制,旨在减少线程创建和销毁的开销,提高系统资源的利用率,以及更好地控制系统中同时运行的线程数量。线程池通过预先创建一定数量的线程,并将这些线程放入一个“池”中,当有新任务到来时,不是立即创建新线程来执行,而是从线程池中取出一个空闲的线程来执行该任务。如果所有线程都在忙碌,则新任务会等待直到有线程变得空闲。

在C语言中,由于标准库(如C89/C99/C11等)不支持线程或线程池,因此通常需要使用第三方库如POSIX线程(pthread)来实现线程池。

1.2 应用场景【C语言】

C语言中的线程池应用场景与在其他编程语言中类似,主要包括以下几类:

  • 高并发服务器: 在网络服务器中处理大量客户端请求。每个请求可以分配给一个线程池中的线程进行处理,以提高响应速度和吞吐量。

  • 数据处理和计算密集型任务: 当需要处理大量数据或执行复杂的计算时,可以将任务分配到线程池中并行执行,以缩短总体执行时间。

  • 资源密集型任务: 对于需要频繁访问共享资源(如数据库、文件系统等)的任务,使用线程池可以减少线程创建和销毁的开销,并可能通过更高效的资源管理来提高性能。

  • 异步操作: 在需要执行非阻塞操作(如异步I/O、异步网络请求等)时,线程池可以用来处理这些异步操作的结果或回调。

  • 定时任务和周期性任务: C语言本身不直接支持定时器和周期性任务,但你可以使用线程池中的线程来模拟这些功能,通过轮询或睡眠机制来检查时间并执行相应的任务。

1.3 实现线程池步骤
  • 定义线程池结构: 包括线程数量、任务队列、互斥锁、条件变量等;

  • 实现任务队列: 用于存储待执行的任务;

  • 编写线程工作函数: 该函数将被多个线程同时执行,并从任务队列中取出任务进行处理;

  • 初始化和管理线程池: 包括创建线程、启动线程、销毁线程等操作;

  • 添加任务到线程池: 提供接口将新任务添加到任务队列中,并通知等待的线程。

由于C语言相对底层,因此实现这些功能需要更多的手动编码和对系统资源的管理。此外,你还需要考虑线程安全和性能优化等问题,第三方库可在C++、python中查找,避免从头实现线程池。

下文将具体说明C语言实现线程池的代码。

二、实现

2.1 定义线程池结构

首先,定义一个包含线程池所需所有信息的结构体,如线程数组、任务队列、互斥锁、条件变量等。

#include <pthread.h>  
#include <stdbool.h>  
#include <stdlib.h> #define MAX_THREADS 4 // 任务队列节点  
typedef struct task {  void (*function)(void*);  void* arg;  struct task* next;  
} task_t;  // 线程池结构体  
typedef struct {  pthread_t threads[MAX_THREADS]; // 线程数组  pthread_mutex_t queue_mutex;    // 保护任务队列的互斥锁  pthread_cond_t queue_cond;      // 任务队列的条件变量  bool stop;                      // 线程池停止标志  task_t* head;                   // 任务队列头指针  task_t* tail;                   // 任务队列尾指针  int thread_count;               // 当前活跃线程数  int active_threads;             // 最大活跃线程数(可配置)  
} threadpool_t;
2.2 初始化线程池

实现一个函数来初始化线程池,包括创建线程、初始化同步等。

void* worker(void* arg) {  threadpool_t* pool = (threadpool_t*)arg;  while (true) {  pthread_mutex_lock(&pool->queue_mutex);  // 如果线程池已停止且没有任务,则退出循环  if (pool->stop && pool->head == NULL) {  pthread_mutex_unlock(&pool->queue_mutex);  break;  }  // 等待任务或线程池停止信号  while (!pool->stop && pool->head == NULL && pool->thread_count >= pool->active_threads) {  pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);  }  // 取出任务(如果线程池未停止且队列不为空)  task_t* task = NULL;  if (!pool->stop && pool->head != NULL) {  task = pool->head;  pool->head = task->next;  if (pool->head == NULL) {  pool->tail = NULL;  }  pool->thread_count--;  }  pthread_mutex_unlock(&pool->queue_mutex);  // 执行任务(如果任务不为空)  if (task != NULL) {  (*(task->function))(task->arg);  free(task); // 释放任务节点内存(如果任务节点是动态分配的)  }  }  return NULL;  
}  void threadpool_init(threadpool_t* pool, int active_threads) {  pool->stop = false;  pool->head = pool->tail = NULL;  pool->thread_count = 0;  pool->active_threads = active_threads;  pthread_mutex_init(&pool->queue_mutex, NULL);  pthread_cond_init(&pool->queue_cond, NULL);  for (int i = 0; i < active_threads; i++) {  pthread_create(&pool->threads[i], NULL, worker, pool);  }  
}
2.3 添加任务到线程池

实现一个函数来向线程池的任务队列中添加任务,并唤醒一个等待的线程(如果有的话)。

void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {  task_t* new_task = (task_t*)malloc(sizeof(task_t));  new_task->function = function;  new_task->arg = arg;  new_task->next = NULL;  pthread_mutex_lock(&pool->queue_mutex);  if (pool->tail == NULL) {  pool->head = pool->tail = new_task;  } else {  pool->tail->next = new_task;  pool->tail = new_task;  }  pthread_cond_signal(&pool->queue_cond);  pthread_mutex_unlock(&pool->queue_mutex);  
}
2.4 销毁线程池

实现一个函数来停止所有线程并销毁线程池。

void threadpool_destroy(threadpool_t* pool) {  pool->stop = true;  pthread_cond_broadcast(&pool->queue_cond);  for (int i = 0; i < MAX_THREADS; i++) {  pthread_join(pool->threads[i], NULL);  }  pthread_mutex_destroy(&pool->queue_mutex);  pthread_cond_destroy(&pool->queue_cond);  
}

三、完整简单示例

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <unistd.h>#define MAX_THREADS 5typedef struct task {void (*function)(void*);void* arg;struct task* next;
} task_t;typedef struct {pthread_t threads[MAX_THREADS];pthread_mutex_t queue_mutex;pthread_cond_t queue_cond;bool stop;task_t* head;task_t* tail;int thread_count;int active_threads;
} threadpool_t;void* worker(void* arg) {threadpool_t* pool = (threadpool_t*)arg;while (true) {pthread_mutex_lock(&pool->queue_mutex);while (pool->head == NULL && !pool->stop) {pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);}if (pool->stop && pool->head == NULL) {pthread_mutex_unlock(&pool->queue_mutex);break;}task_t* task = pool->head;if (task != NULL) {pool->head = task->next;if (pool->head == NULL) {pool->tail = NULL;}}pthread_mutex_unlock(&pool->queue_mutex);if (task != NULL) {(*(task->function))(task->arg);free(task); // 释放任务节点内存}sleep(1);}return NULL;
}void threadpool_init(threadpool_t* pool, int active_threads) {pool->stop = false;pool->head = pool->tail = NULL;pool->thread_count = 0;pool->active_threads = active_threads;pthread_mutex_init(&pool->queue_mutex, NULL);pthread_cond_init(&pool->queue_cond, NULL);for (int i = 0; i < active_threads; ++i) {pthread_create(&pool->threads[i], NULL, worker, pool);}
}void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {task_t* new_task = (task_t*)malloc(sizeof(task_t));new_task->function = function;new_task->arg = arg;new_task->next = NULL;pthread_mutex_lock(&pool->queue_mutex);if (pool->tail == NULL) {pool->head = pool->tail = new_task;} else {pool->tail->next = new_task;pool->tail = new_task;}pthread_cond_signal(&pool->queue_cond);pthread_mutex_unlock(&pool->queue_mutex);
}void threadpool_destroy(threadpool_t* pool) {pool->stop = true;pthread_mutex_lock(&pool->queue_mutex);pthread_cond_broadcast(&pool->queue_cond);pthread_mutex_unlock(&pool->queue_mutex);for (int i = 0; i < pool->active_threads; ++i) {pthread_join(pool->threads[i], NULL);printf("%d\r\n", i);}pthread_mutex_destroy(&pool->queue_mutex);pthread_cond_destroy(&pool->queue_cond);
}// 示例任务函数
void example_task(void* arg) {int task_id = *(int*)arg;printf("Task %d is executing\n", task_id);free(arg); // 释放参数内存sleep(1);  // 模拟任务执行时间
}int main() {threadpool_t pool;threadpool_init(&pool, 2); // 初始化线程池,最大活跃线程数为2// 添加示例任务for (int i = 0; i < MAX_THREADS; ++i) {int* arg = (int*)malloc(sizeof(int));*arg = i;threadpool_add_task(&pool, example_task, arg);}// 等待一段时间,观察任务执行情况sleep(5);// 销毁线程池threadpool_destroy(&pool);return 0;
}

运行结果为:

在这里插入图片描述

这段代码演示了一个简单的线程池的使用方式。在主函数中,我们初始化了一个线程池(最大活跃线程数为2),然后添加了5个示例任务(每个任务执行时间模拟为1秒)。在任务执行完毕后,通过调用 threadpool_destroy 函数来销毁线程池,确保所有任务都被执行完毕。

注意:这里销毁的时候保证知道数组中有多少个有效的线程ID,进而销毁。

可在结构体中添加pool->num_threads ,即用来保存线程池中线程数量的成员变量,应该在初始化线程池时进行设置,并在后续操作中根据需要使用,则不会出现在销毁时越界或死锁。

这个示例展示了如何使用线程池来管理并发执行的任务,以及如何通过互斥锁和条件变量来实现线程安全的任务队列操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47392.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3 C 语言运算符深度解析:从基础到实战

目录 1 运算符分类 2 算术运算符与算术表达式 2.1 算术运算符的用法 2.2 左操作数和右操作数 3 关系运算符与关系表达式 3.1 关系运算符的用法 3.2 常量左置防错 3.3 三数相等判断误区 4 逻辑运算符与逻辑表达式 4.1 逻辑运算符的用法 4.2 闰年的判断 4.3 短路运算…

golang单元测试性能测试常见用法

关于go test的一些说明 golang安装后可以使用go test工具进行单元测试 代码片段对比的性能测试,使用起来还是比较方便,下面是一些应用场景 平时自己想做一些简单函数的单元测试&#xff0c;不用每次都新建一个main.go 然后go run main.go相对某个功能做下性能测试 看下cpu/内存…

k8s集群 安装配置 Prometheus+grafana+alertmanager

k8s集群 安装配置 Prometheusgrafanaalertmanager k8s环境如下&#xff1a;机器规划&#xff1a; node-exporter组件安装和配置安装node-exporter通过node-exporter采集数据显示192.168.40.180主机cpu的使用情况显示192.168.40.180主机负载使用情况 Prometheus server安装和配置…

自动驾驶AVM环视算法–全景和标定全功能算法实现和exe测试demo

参考&#xff1a;全景和标定全功能算法实现和exe测试demo-金书世界 1、测试环境 opencv310vs2022 2、使用的编程语言 c和c 3、测试的demo的获取 更新&#xff1a;测试的exe程序&#xff0c;无需解压码就可以体验算法测试效果 百度网盘&#xff1a; 链接&#xff1a;http…

代理IP服务中的代理池大小有何影响?

在当今数字化时代&#xff0c;网络爬虫已经成为获取各类信息必不可少的工具。在大规模数据抓取中&#xff0c;使用单一 IP 地址或同一 IP 代理往往会面临抓取可靠性降低、地理位置受限、请求次数受限等一系列问题。为了克服这些问题&#xff0c;构建代理池成为一种有效的解决方…

基于若依的ruoyi-nbcio流程管理系统修正自定义业务表单的回写bug

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://218.75.87.38:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; h…

VUE3 播放RTSP实时、回放(NVR录像机)视频流(使用WebRTC)

1、下载webrtc-streamer&#xff0c;下载的最新window版本 Releases mpromonet/webrtc-streamer GitHub 2、解压下载包 3、webrtc-streamer.exe启动服务 &#xff08;注意&#xff1a;这里可以通过当前文件夹下用cmd命令webrtc-streamer.exe -o这样占用cpu会很少&#xff0c…

idea Apipost 插件导出接口文档字段类型全部是string

idea版本&#xff1a;2023.2.1 Apipost-Helper-2.0插件版本&#xff1a; 联系官方客服后&#xff0c;更换插件版本&#xff0c;问题解决。更换后的插件版本为&#xff1a; 插件链接放在文章首部了&#xff0c;可直接下载&#xff0c;使用idea直接安装这个zip包&#xff0c;无需…

深度学习pytorch学到哪种程度就算入门了?

在开始前分享一些pytorch的资料需要的同学评论888即可拿走 是我根据网友给的问题精心整理的PyTorch这个框架&#xff0c;可以读一些入门书。 PyTorch本身是一个极其庞大的框架&#xff0c;里面有数据读取、高性能计算、自动微分、模型导出、分布式训练等等。 我觉得能用这个框…

ELK日志管理与应用

目录 一.ELK收集nginx日志 二.收集tomcat日志 三.Filebeat 一.ELK收集nginx日志 1.搭建好ELKlogstashkibana架构 2.关闭防火墙和selinux systemctl stop firewalld setenforce 0 3.安装nginx [rootlocalhost ~]# yum install epel-release.noarch -y [rootlocalhost …

使用Django框架实现音频上传功能

数据库设计&#xff08;models.py&#xff09; class Music(models.Model):""" 音乐 """name models.CharField(verbose_name"音乐名字", max_length32)singer models.CharField(verbose_name"歌手", max_length32)# 本质…

Hadoop-34 HBase 安装部署 单节点配置 hbase-env hbase-site 超详细图文 附带配置文件

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; HadoopHDFSMapReduceHiveFlumeSqoopZookeeperHBase 正在 章节内容 上节我们完成了&#xff1a; HBase的由…

Apache Paimon 在蚂蚁的应用

摘要 &#xff1a;本文整理自 Apache Paimon Committer 闵文俊老师在5月16日 Streaming Lakehouse Meetup Online 上的分享。内容主要分为以下四个部分&#xff1a; 什么是 Paimon蚂蚁 Paimon 应用场景蚂蚁 Paimon 功能改进未来规划 一、什么是 Paimon 1. 实时更新 Paimon 是…

Hadoop3:HDFS存储优化之小文件归档

一、情景说明 我们知道&#xff0c;NameNode存储一个文件元数据&#xff0c;默认是150byte大小的内存空间。 那么&#xff0c;如果出现很多的小文件&#xff0c;就会导致NameNode的内存占用。 但注意&#xff0c;存储小文件所需要的磁盘容量和数据块的大小无关。 例如&#x…

用户注册业务逻辑、接口设计和实现、前端逻辑

一、用户注册业务逻辑分析 二、用户注册接口设计和定义 2.1. 设计接口基本思路 对于接口的设计&#xff0c;我们要根据具体的业务逻辑&#xff0c;设计出适合业务逻辑的接口。设计接口的思路&#xff1a; 分析要实现的业务逻辑&#xff1a; 明确在这个业务中涉及到几个相关子…

如何通过企业微信会话存档保护企业利益?

赵总: 张经理&#xff0c;最近行业内频发数据泄露事件&#xff0c;我们的客户资料和内部沟通记录安全吗&#xff1f; 张经理: 赵总&#xff0c;我们已经采取了一系列措施来加强数据安全。特别是针对企业微信的沟通记录&#xff0c;我们最近引入了安企神软件&#xff0c;它能很…

打印室预约小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;附近打印店管理&#xff0c;文件打印管理&#xff0c;当前预约管理&#xff0c;预约历史管理&#xff0c;打印记录管理 开发系统&#xff1a;Windows 架构模式&#xff1a;SSM JD…

神经网络构造

目录 一、神经网络骨架&#xff1a;二、卷积操作&#xff1a;三、卷积层&#xff1a;四、池化层&#xff1a;五、激活函数&#xff08;以ReLU为例&#xff09;&#xff1a; 一、神经网络骨架&#xff1a; import torch from torch import nn#神经网络 class CLH(nn.Module):de…

华为的热机备份和流量限制

要求&#xff1a; 12&#xff0c;对现有网络进行改造升级&#xff0c;将当个防火墙组网改成双机热备的组网形式&#xff0c;做负载分担模式&#xff0c;游客区和DMZ区走FW4&#xff0c;生产区和办公区的流量走FW5 13&#xff0c;办公区上网用户限制流量不超过100M&#xff0c;…

Redis实战—附近商铺、用户签到、UV统计

本博客为个人学习笔记&#xff0c;学习网站与详细见&#xff1a;黑马程序员Redis入门到实战 P88 - P95 目录 附近商铺 数据导入 功能实现 用户签到 签到功能 连续签到统计 UV统计 附近商铺 利用Redis中的GEO数据结构实现附近商铺功能&#xff0c;常见命令如下图所示。…