1、使用C构造线程类
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <queue>
#include <cstring>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <thread>
#include <chrono>
using namespace std;const int NUMBER = 2;// Task是任务类,里边有两个成员,分别是两个指针void(*)(void*)和void*。
// TaskQueue是任务队列,提供了添加任务、取出任务、存储任务、获取任务个数、线程同步的功能。
//
//
using callback = void(*)(void*);
// 任务 结构体
struct Task
{// 初始化方式一Task(){func = nullptr;arg = nullptr;}// 初始化方式二Task(callback f, void* arg){func = f;arg = arg;}callback func;void* arg;
};// 任务队列 类
class TaskQueue
{
public:TaskQueue();~TaskQueue();// 添加任务void TaskQueue_add(Task& t);void TaskQueue_add(callback f, void* arg);// 取出任务Task TaskQueue_pop();// 获取队列中任务个数int TaskQueue_size(){return _queue.size();}private:pthread_mutex_t _mutex; // 互斥锁std::queue<Task> _queue; // 任务队列
};TaskQueue::TaskQueue()
{// 初始化锁变量pthread_mutex_init(&_mutex, nullptr);
}TaskQueue::~TaskQueue()
{// 销毁锁变量pthread_mutex_destroy(&_mutex);
}void TaskQueue::TaskQueue_add(Task& t)
{pthread_mutex_lock(&_mutex);_queue.push(t);pthread_mutex_unlock(&_mutex);
}void TaskQueue::TaskQueue_add(callback f, void* arg)
{pthread_mutex_lock(&_mutex);Task t;t.func = f;t.arg = arg;_queue.push(t);pthread_mutex_unlock(&_mutex);
}Task TaskQueue::TaskQueue_pop()
{pthread_mutex_lock(&_mutex);Task t;if (_queue.size() > 0){t = _queue.front();_queue.pop();}pthread_mutex_unlock(&_mutex);return t;
}// 线程池 类
class ThreadPool
{
public:ThreadPool(int max, int min);~ThreadPool();// 添加任务void task_add(Task t);// 获取工作的线程个数int get_busy_num();// 或者启动的线程个数int get_live_num();private:
// 函数// 工作线程的任务函数static void* thread_work(void* arg);// 管理者线程的任务函数static void* thread_manage(void* arg);void thread_exit();private:
// 变量// 线程池中线程队列pthread_t* _threads_id;// 线程池中线程的最大数量int _threads_max;// 线程池中线程的最小数量int _threads_min;// 工作的线程个数int _threads_busy_num;// 存活的线程个数int _threads_live_num;// 要销毁的线程个数int _threads_exit_num;// 管理者线程pthread_t _thread_manage;// 锁 锁住整个线程池pthread_mutex_t _lock_pool;// 条件变量 task_queue是不是为空pthread_cond_t _task_queue_empty;// 任务队列TaskQueue* _task_que;// 是否销毁线程,销毁=1;不销毁=0int _shut_down;
};// 构造函数
ThreadPool::ThreadPool(int max, int min)
{cout << "======== 构造 ========" << endl;// 构造函数,进行初始化_task_que = new TaskQueue;_threads_max = max;_threads_min = min;_threads_busy_num = 0;_threads_live_num = min;// 申请存放线程的数组_threads_id = new pthread_t[_threads_max];if (_threads_id == nullptr){cout << "malloc threadIDs fail..." << endl;return;}memset(_threads_id, 0, sizeof(pthread_t) * _threads_max);for (int i = 0; i < _threads_min; i++){pthread_create(&(_threads_id[i]), NULL, thread_work, this);cout << "创建子线程, ID: " << to_string(_threads_id[i]) << endl;}// 创建管理者线程, 1个pthread_create(&_thread_manage, NULL, thread_manage, this);// 初始化锁 条件变量if (pthread_mutex_init(&_lock_pool, NULL) != 0 || pthread_cond_init(&_task_queue_empty, NULL) != 0){cout << "init mutex or condition fail..." << endl;}_shut_down = 0;return;
}// 析构函数
ThreadPool::~ThreadPool()
{cout << "======== 析构 ========" << endl;_shut_down = 1;// 销毁管理者线程pthread_join(_thread_manage, NULL);// 唤醒所有消费者线程for (int i = 0; i < _threads_live_num; i++){pthread_cond_signal(&_task_queue_empty);}if (_task_que)delete _task_que;if (_threads_id)delete[] _threads_id;pthread_mutex_destroy(&_lock_pool);pthread_cond_destroy(&_task_queue_empty);return;
}void ThreadPool::task_add(Task t)
{if (_shut_down == 1){return;}cout << "======== 添加任务 ========" << endl;// 添加任务,不需要加锁,任务队列中有锁_task_que->TaskQueue_add(t);// 唤醒工作的线程pthread_cond_signal(&_task_queue_empty);
}int ThreadPool::get_live_num()
{int threadNum = 0;pthread_mutex_lock(&_lock_pool);threadNum = _threads_live_num;pthread_mutex_unlock(&_lock_pool);return threadNum;
}int ThreadPool::get_busy_num()
{int busyNum = 0;pthread_mutex_lock(&_lock_pool);busyNum = _threads_busy_num;pthread_mutex_unlock(&_lock_pool);return busyNum;
}// 线程退出
void ThreadPool::thread_exit()
{pthread_t tid = pthread_self();for (int i = 0; i < _threads_max; i++){if (_threads_id[i] == tid){cout << "thread_exit() function: thread " << to_string(pthread_self()) << " exiting..." << endl;_threads_id[i] = 0;break;}}pthread_exit(NULL);
}// 线程工作
void* ThreadPool::thread_work(void* arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);while (1){cout << "======== 工作 ========" << endl;sleep(2);// 访问任务队列加锁pthread_mutex_lock(&(pool->_lock_pool));// 如果任务列表为空,添加进程阻塞while (pool->_task_que->TaskQueue_size() == 0 && pool->_shut_down == 0){pthread_cond_wait(&(pool->_task_queue_empty), &(pool->_lock_pool));if (pool->_threads_exit_num > 0){pool->_threads_exit_num--;if (pool->_threads_live_num > pool->_threads_min){pool->_threads_live_num--;pthread_mutex_unlock(&(pool->_lock_pool));// 销毁该线程pool->thread_exit();}}}// 如果线程池要关闭了,直接返回if (pool->_shut_down == 1){cout << "======== shut down ========" << endl;pool->_threads_live_num--;pthread_mutex_unlock(&(pool->_lock_pool));pool->thread_exit();}if (pool->_task_que->TaskQueue_size() > 0){// 取任务Task t = pool->_task_que->TaskQueue_pop();pool->_threads_busy_num++;pthread_mutex_unlock(&(pool->_lock_pool));// 处理任务cout << "thread " << pthread_self() << " start working..." << endl;t.func(t.arg);delete t.arg;t.arg = NULL;cout << "thread " << pthread_self() << " end working..." << endl;// 工作的线程数量减-pthread_mutex_lock(&(pool->_lock_pool));pool->_threads_busy_num--;pthread_mutex_unlock(&(pool->_lock_pool));}}return nullptr;
}// 线程管理
void* ThreadPool::thread_manage(void* arg)
{cout << "======== 管理 ========" << endl;ThreadPool* pool = static_cast<ThreadPool*>(arg);// 如果线程池没有关闭, 就一直检测while (pool->_shut_down == 0){// 每隔5s检测一次sleep(5);// 取出线程池中的任务数和线程数量// 取出工作的线程池数量pthread_mutex_lock(&pool->_lock_pool);int queueSize = pool->_task_que->TaskQueue_size();int liveNum = pool->_threads_live_num;int busyNum = pool->_threads_busy_num;cout << "管理 任务个数: " << queueSize << " 存活线程个数: " << liveNum << " 工作的线程个数: " << busyNum << endl;pthread_mutex_unlock(&pool->_lock_pool);// 创建线程// 当前任务个数>存活的线程数 && 存活的线程数<最大线程个数if (queueSize > liveNum && liveNum < pool->_threads_max){// 线程池加锁cout << "======== 添加线程 ========" << endl;pthread_mutex_lock(&pool->_lock_pool);int num = 0;for (int i = 0; i < pool->_threads_max && num < NUMBER && pool->_threads_live_num < pool->_threads_max; i++){if (pool->_threads_id[i] == 0){pthread_create(&pool->_threads_id[i], NULL, thread_work, pool);num++;pool->_threads_live_num++;}}pthread_mutex_unlock(&pool->_lock_pool);}// 销毁多余的线程// 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量if (busyNum * 2 < liveNum && liveNum > pool->_threads_min){cout << "======== 销毁线程 ========" << endl;pthread_mutex_lock(&pool->_lock_pool);pool->_threads_exit_num = NUMBER;pthread_mutex_unlock(&pool->_lock_pool);for (int i = 0; i < NUMBER; ++i){pthread_cond_signal(&pool->_task_queue_empty);}}if (pool->_shut_down == 1){break;}}return nullptr;
}// 实际的处理函数
void taskFunc(void* arg)
{int num = *(int*)arg;cout << "thread " << pthread_self() << " is working, number = " << num;
}int main()
{ThreadPool* pool = new ThreadPool(10, 3);for (int i = 0; i < 30; i++){int* num = (int*)malloc(sizeof(int));*num = i + 100;Task t;t.func = taskFunc;t.arg = num;pool->task_add(t);sleep(0.5);}//sleep(40);std::this_thread::sleep_for(std::chrono::seconds(40));delete pool;pool = nullptr;return 0;
}
结果为:
[======$]g++ thread-poo-c++.cpp -lpthread
[======$]./a.out
======== 构造 ========
创建子线程, ID: ======== 工作 ========
140173728401152
创建子线程, ID: 140173720008448
======== 工作 ========
创建子线程, ID: 140173711615744
======== 工作 ========
======== 添加任务 ========
======== 管理 ================ 添加任务 ========
======== 添加任务 ================ 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
======== 添加任务 ========
thread thread 140173720008448 start working...140173728401152 start working...
thread 140173720008448 is working, number =
thread 140173728401152 is working, number = 101thread 140173728401152 end working...
100thread 140173720008448 end working...
======== 工作 ========
thread 140173711615744 start working...
thread 140173711615744 is working, number = 102thread 140173711615744 end working...
======== 工作 ========
======== 工作 ========
thread 140173720008448 start working...
thread 140173720008448 is working, number = thread 103thread 140173720008448 end working...
thread ======== 工作 ========
140173711615744140173728401152 start working... start working...
thread 140173711615744 is working, number = 104
thread 140173728401152 is working, number = 105thread 140173728401152 end working...
======== 工作 ========
thread 140173711615744 end working...
======== 工作 ========
管理 任务个数: 24 存活线程个数: 3 工作的线程个数: 0
======== 添加线程 ========
======== 工作 ========
======== 工作 ========
thread 140173720008448 start working...
thread 140173720008448 is working, number = 106thread 140173720008448 end working...
======== 工作 ========
thread 140173728401152 start working...
thread 140173728401152 is working, number = 107thread 140173728401152 end working...
======== 工作 ========
thread 140173711615744 start working...
thread 140173711615744 is working, number = 108thread 140173711615744 end working...
======== 工作 ========
thread 140173694830336 start working...
thread 140173694830336thread is working, number = 109140173686437632 start working...thread 140173694830336 end working...
======== 工作 ========thread 140173686437632 is working, number = 110thread 140173686437632 end working...
======== 工作 ========
thread 140173720008448 start working...
thread 140173720008448 is working, number = 111thread 140173720008448 end working...
======== 工作 ========
thread 140173728401152 start working...
thread 140173728401152 is working, number = 112thread 140173728401152 end working...
======== 工作 ========
thread 140173711615744 start working...
thread 140173711615744 is working, number = 113thread 140173711615744 end working...
======== 工作 ========
thread thread 140173694830336140173686437632 start working... start working...
thread 140173694830336 is working, number =
thread 140173686437632 is working, number = 115thread 140173686437632 end working...
======== 工作 ========
114thread 140173694830336 end working...
======== 工作 ========
管理 任务个数: 14 存活线程个数: 5 工作的线程个数: 0
======== 添加线程 ========
======== 工作 ========
======== 销毁线程 ========
======== 工作 ========
thread 140173720008448 start working...
thread 140173720008448 is working, number = 116thread 140173720008448 end working...
======== 工作 ========
thread 140173728401152 start working...
thread 140173728401152 is working, number = 117thread 140173728401152 end working...
======== 工作 ========
thread 140173711615744 start working...
thread 140173711615744 is working, number = 118thread 140173711615744 end working...
======== 工作 ========
thread 140173686437632 start working...
thread 140173686437632thread is working, number = 119140173694830336 start working...
thread 140173694830336 is working, number = 120thread 140173694830336 end working...
thread 140173686437632 end working...
======== 工作 ========
======== 工作 ========
thread 140173678044928 start working...
thread thread 140173678044928 is working, number = 121140173669652224 start working...thread
thread 140173669652224 is working, number = 140173678044928122 end working...
======== 工作 ========
thread 140173669652224 end working...
======== 工作 ========
thread 140173720008448 start working...
thread 140173720008448 is working, number = 123thread 140173720008448 end working...
======== 工作 ========
thread 140173728401152 start working...
thread 140173728401152 is working, number = 124thread 140173728401152 end working...
======== 工作 ========
thread 140173711615744 start working...
thread 140173711615744 is working, number = 125thread 140173711615744 end working...
======== 工作 ========
thread thread 140173686437632140173694830336 start working... start working...
thread 140173686437632 is working, number = 126
thread 140173694830336 is working, number = 127thread thread 140173686437632140173694830336 end working... end working...
======== 工作 ================ 工作 ========
thread 140173678044928 start working...
thread thread 140173678044928140173669652224 is working, number = start working...
thread 140173669652224 is working, number = 129128thread 140173669652224 end working...
======== 工作 ========
thread 140173678044928 end working...
======== 工作 ========
管理 任务个数: 0 存活线程个数: 7 工作的线程个数: 0
======== 销毁线程 ========
thread_exit() function: thread thread_exit() function: thread 140173728401152 exiting...140173720008448 exiting...管理 任务个数: 0 存活线程个数: 5 工作的线程个数: 0
======== 销毁线程 ========
thread_exit() function: thread thread_exit() function: thread 140173694830336 exiting...140173711615744exiting...
管理 任务个数: 0 存活线程个数: 3 工作的线程个数: 0
管理 任务个数: 0 存活线程个数: 3 工作的线程个数: 0
管理 任务个数: 0 存活线程个数: 3 工作的线程个数: 0
======== 析构 ========
管理 任务个数: 0 存活线程个数: 3 工作的线程个数: 0
======== shut down ========
======== shut down ========
======== shut down ========
======== shut down ========
2、使用C++的线程类
#pragma once#ifndef THREADPOOL_H
#define THREADPOOL_H#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <thread>
#include <functional>
#include <assert.h>class ThreadPool
{
public:explicit ThreadPool(size_t threadCount = 8);~ThreadPool();ThreadPool(const ThreadPool& other) = delete;ThreadPool& operator = (const ThreadPool& other) = delete;template<class T>void addTask(T&& task);private:void runInThread();// 存放线程std::vector<std::thread> workers;// 存放任务std::queue<std::function<void()>> tasks;// 锁std::mutex mtx;// 条件变量std::condition_variable cond;bool isClosed;
};inline ThreadPool::ThreadPool(size_t threadCount)
{assert(threadCount > 0);isClosed = false;// 初始化 threadCount 个线程for (size_t i = 0; i < threadCount; i++){workers.emplace_back(std::thread(std::bind(&ThreadPool::runInThread, this)));}
}inline ThreadPool::~ThreadPool()
{{std::lock_guard<std::mutex> lock(mtx);isClosed = true;cond.notify_all();}for (auto& thr : workers){thr.join();}
}// 工作函数
inline void ThreadPool::runInThread()
{while (!isClosed){std::unique_lock<std::mutex> lock(mtx);while (tasks.empty() && !isClosed){cond.wait(lock);}if (!tasks.empty()){auto task = std::move(tasks.front());tasks.pop();lock.unlock();task();}}
}// 添加任务
template<class T>
inline void ThreadPool::addTask(T&& task)
{if (workers.size() == 0){task();}if (!isClosed){std::lock_guard<std::mutex> lock(mtx);tasks.emplace(std::forward<T>(task));cond.notify_one();}
}#endif //THREADPOOL_H
#include "thread-1.h"
#include<iostream>
#include<random>
#include<algorithm>
#include<windows.h>
using namespace std;void taskFunc(void* arg)
{int num = *static_cast<int*>(arg);thread::id this_thread_id = this_thread::get_id();cout << "threadId=" << this_thread_id << " num=" << num << endl;Sleep(10);
}int main()
{ThreadPool pool(4);for (int i = 0; i < 100; ++i){int* num = new int(i);pool.addTask(function<void()>(bind(&taskFunc, num)));}Sleep(1000);return 0;
}
结果为:
threadId=threadId=threadId=threadId=13724 num=3
13692 num=0
13748 num=1
13716 num=2
threadId=13716 num=4
threadId=13692 num=6
threadId=13724 num=7
threadId=13748 num=5
threadId=13748threadId=13716 num=10
threadId=13692 num=9num=8
threadId=13724 num=11
threadId=13748 num=12
threadId=13692 num=14
threadId=13724 num=15
threadId=13716 num=13
threadId=13716 num=16
threadId=13748 num=18threadId=13692 num=17
threadId=13724 num=19threadId=13724 num=20
threadId=13692 num=22
threadId=13716 num=23
threadId=13748 num=21
threadId=13692 num=24
threadId=13724 num=26
threadId=13748 num=27
threadId=13716 num=25
threadId=13692 num=29
threadId=13748 num=30
threadId=13724 num=31
threadId=13716 num=28
threadId=13716 num=32
threadId=13724 num=34
threadId=13748 num=35
threadId=13692 num=33
threadId=13692 num=36
threadId=13724 num=38
threadId=13716 num=39
threadId=13748 num=37
threadId=13748 num=40
threadId=13724 num=42
threadId=13692 num=43
threadId=13716 num=41
threadId=13716 num=44
threadId=13692 num=46
threadId=13748 num=47
threadId=13724 num=45
threadId=13724 num=threadId=13748 num=50
threadId=13692 num=49
48
threadId=13716 num=51
threadId=threadId=13724 num=53
threadId=13748 num=55
13716 num=52
threadId=13692 num=54
threadId=13748 num=56
threadId=13724 num=58
threadId=13692 num=59
threadId=13716 num=57
threadId=13716 num=60
threadId=13724 num=62
threadId=13748 num=63
threadId=13692 num=61
threadId=13748 num=64
threadId=13724 num=66
threadId=13716 num=67
threadId=13692 num=65
threadId=13724 num=68
threadId=13748 num=70
threadId=13692 num=69
threadId=13716 num=71
threadId=13748 num=72
threadId=13724 num=74
threadId=13716 num=75
threadId=13692 num=73
threadId=13716 num=76
threadId=13692 num=78
threadId=13748 num=79
threadId=13724 num=77
threadId=13748 num=80
threadId=13716 num=82
threadId=13724 num=83
threadId=13692 num=81
threadId=13716 num=85
threadId=13692 num=86
threadId=13748 num=87
threadId=13724 num=84
threadId=threadId=13724 num=89
threadId=13716 num=91
13692 num=88
threadId=13748 num=90
threadId=13748 num=92
threadId=13692 num=94
threadId=13724threadId=13716 num=93num=95
threadId=13724 num=96
threadId=13748 num=98
threadId=13692 num=99
threadId=13716 num=97
3、参考
手写线程池 - C改C++版 | 爱编程的大丙
基于C++11的线程池实现(ThreadPool)_c++ threadpool-CSDN博客