线程池
什么是线程池?
一次预先申请一批线程,让这批线程有任务,就处理任务;没任务,就处于等待状态。
为什么要有线程池?
以空间换时间,预先申请一批线程,当有任务到来,可以直接指派给线程执行。
// task.hpp
#pragma once#include <functional>using namespace std;typedef function<int(int, int)> calc_func_t;class Task
{
public:Task() {}Task(int x, int y, calc_func_t func): _x(x), _y(y), _calc_func(func){}// 加法计算的任务int operator()() { return _calc_func(_x, _y); }int get_x() { return _x; }int get_y() { return _y; }
private:int _x;int _y;calc_func_t _calc_func;
};
// log.hpp
#pragma once#include <string>
#include <stdarg.h>
#include <unordered_map>using namespace std;#define LOG_FILE "./threadpool.log"// 日志是有日志级别的
enum LogLevel
{DEBUG,NORMAL,WARNING,ERROR,FATAL
};// 针对枚举类型的哈希函数
template <typename T>
class EnumHash
{
public:size_t operator()(const T& t) const { return static_cast<size_t>(t); }
};
unordered_map<LogLevel, string, EnumHash<LogLevel>> logLevelMap = {{DEBUG, "DEBUG"},{NORMAL, "NORMAL"},{WARNING, "WARNING"},{ERROR , "ERROR"},{FATAL, "FATAL"}
};// 完整的日志功能,至少有:日志等级 时间 支持用户自定义
void logMessage(LogLevel log_level, const char* format, ...)
{
#ifndef DEBUG_SHOWif(log_level == DEBUG) return; // DEBUG_SHOW没有定义,不展示DEBUG信息
#endif char stdBuffer[1024]; // 标准部分char logBuffer[1024]; // 自定义部分time_t timestamp = time(nullptr);struct tm* ploct = localtime(×tamp);snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%04d-%02d-%02d %02d:%02d:%02d]", logLevelMap[log_level].c_str(),\1900 + ploct->tm_year, 1 + ploct->tm_mon, ploct->tm_mday, ploct->tm_hour, ploct->tm_min, ploct->tm_sec);va_list args;va_start(args, format);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);FILE* log_file = fopen(LOG_FILE, "a");fprintf(log_file, "%s %s\n", stdBuffer, logBuffer);fclose(log_file);
}
va_*
系列函数与vprintf
系列函数配合使用可以格式化打印传入的可变参数的内容。
// thread.hpp
#pragma once#include <string>
#include <cstdio>
#include <pthread.h>using namespace std;// 对应创建线程时的routine函数的类型
typedef void*(*func_t)(void*);class ThreadData
{
public:void* _ptpool; // 指向线程池对象string _name;
};class Thread
{
public:Thread(int num, func_t callBack, void* _ptpool): _func(callBack){char nameBuffer[64];snprintf(nameBuffer, sizeof(nameBuffer), "Thread_%d", num);_tdata._name = nameBuffer;_tdata._ptpool = _ptpool;}void start() { pthread_create(&_tid, nullptr, _func, (void*)&_tdata); }void join() { pthread_join(_tid, nullptr); }const string& name() { return _tdata._name; }
private:pthread_t _tid; // 线程IDfunc_t _func; // 线程routineThreadData _tdata; // 线程数据
};
// threadPool.hpp
#pragma once#include <vector>
#include <queue>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"const int g_thread_num = 3;// 线程池:本质是生产消费模型
template<class T>
class threadPool
{
private:threadPool(int thread_num = g_thread_num): _thread_num(thread_num){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_cond, nullptr);for(int i = 0; i < _thread_num; ++i){_threads.push_back(new Thread(i + 1/*线程编号*/, routine, this/*可以传this指针*/));}}threadPool(const threadPool<T>&) = delete;const threadPool<T>& operator=(const threadPool<T>&) = delete;
public:// 考虑多个线程使用单例的情况static threadPool<T>* getThreadPool(int thread_num = g_thread_num){if(nullptr == _pthread_pool){lockGuard lock_guard(&_pool_lock);// 在单例创建好后,锁也就没有意义了// 将来任何一个线程要获取单例,仍必须调用getThreadPool接口// 这样一定会存在大量的申请和释放锁的行为// 所以外层if判断,用于在单例创建的情况下,拦截大量的线程因请求单例而访问锁的行为if(nullptr == _pthread_pool){_pthread_pool = new threadPool<T>(thread_num);}}return _pthread_pool;}void run(){for(auto& pthread : _threads){pthread->start();logMessage(NORMAL, "%s %s", (pthread->name()).c_str(), "启动成功");}}void pushTask(const T& task){lockGuard lock_guard(&_lock);_task_queue.push(task);pthread_cond_signal(&_cond);}~threadPool(){for(auto& pthread : _threads){pthread->join();delete pthread;}pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);}
public:pthread_mutex_t* getMutex(){return &_lock;}bool isEmpty(){return _task_queue.empty();}void waitCond(){pthread_cond_wait(&_cond, &_lock);}T& getTask(){T& task = _task_queue.front();_task_queue.pop();return task;}
private:// 消费过程static void* routine(void* args){ThreadData* tdata = (ThreadData*)args;threadPool<T>* tpool = (threadPool<T>*)tdata->_ptpool;while(true){T task;{lockGuard lock_guard(tpool->getMutex());while (tpool->isEmpty()) tpool->waitCond();task = tpool->getTask();}logMessage(WARNING, "%s 处理完成: %d + %d = %d", (tdata->_name).c_str(), task.get_x(), task.get_y(), task());}}
private:vector<Thread*> _threads; // 数组存放创建的线程的地址int _thread_num; // 创建的线程个数queue<T> _task_queue; // 阻塞式任务队列pthread_mutex_t _lock; // 针对任务队列的锁pthread_cond_t _cond; // 队列空满情况的条件变量static threadPool<T>* _pthread_pool; // 饿汉式线程池static pthread_mutex_t _pool_lock; // 针对线程池的锁
};template<class T>
threadPool<T>* threadPool<T>::_pthread_pool = nullptr;
template<class T>
pthread_mutex_t threadPool<T>::_pool_lock = PTHREAD_MUTEX_INITIALIZER;
// test.cc
#include "task.hpp"
#include "threadPool.hpp"
#include <unistd.h>
#include <ctime>void test1()
{srand((unsigned int)time(nullptr) ^ getpid());threadPool<Task>::getThreadPool()->run();while(true){// 生产的过程 - 制作任务的时候要花时间的int x = rand() % 100 + 1;usleep(2023);int y = rand() % 50 + 1;Task task(x, y, [](int x, int y){ return x + y; });logMessage(DEBUG, "制作任务完成: %d + %d = ?", x, y);// 推送任务到线程池threadPool<Task>::getThreadPool()->pushTask(task);sleep(1);}
}
# Makefile
test:test.ccg++ -o $@ $^ -std=c++11 -lpthread -DDEBUG_SHOW
.PHONY:clean
clean:rm -f test
运行结果:
自旋锁
自旋锁:本质是通过不断检测锁的状态,来确定资源是否就绪的方案。
什么时候使用自旋锁?这个由临界资源就绪的时间长短决定。
自旋锁的初始化 & 销毁:
自旋锁的加锁:
自旋锁的解锁:
读者写者问题
写者与写者:互斥关系
读者与写者:互斥 & 同步关系
读者与读者:共享关系
读者写者问题和生产消费模型的本质区别在于,消费者会拿走数据(做修改),而读者不会。
读写锁的初始化 & 销毁:
读写锁之读者加锁:
读写锁之写者加锁:
读写锁的解锁:
关于是读者还是写者优先的问题,抛开应用场景去谈技术细节就是耍流氓。
而pthread库中的读写锁默认采用读者优先,这类的应用场景主要是:数据被读取的频率非常高,被修改的频率非常低。