文章目录
- 九、多线程
- 10. 线程池
- 未完待续
九、多线程
10. 线程池
这里我们实现一个 懒汉单例模式 的线程池,并且该线程池包含 日志打印 功能:
Makefile:
# Build the thread pool demo from Main.cc (the headers are included by it).
threadpool:Main.cc
	g++ -o $@ $^ -std=c++11 -lpthread

# Remove the built executable.
.PHONY:clean
clean:
	rm -f threadpool
Thread.hpp:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    /// Signature of the work a Thread runs; it is called with the thread's name.
    using func_t = std::function<void(std::string)>;

    /**
     * Thin wrapper around one POSIX thread that runs a named callback.
     *
     * The object passes `this` to pthread_create(), so it must stay at the
     * same address (not be copied or moved elsewhere) from Start() until the
     * callback has finished.
     */
    class Thread
    {
    public:
        /// Invoke the stored callback with this thread's name.
        void Excute()
        {
            _func(_threadname);
        }

    public:
        /**
         * @param func  Work to run on the new thread.
         * @param name  Human-readable thread name, handed to `func`.
         *
         * Members are initialized in declaration order; `_stop` starts as
         * true because no joinable thread exists until Start() succeeds.
         */
        Thread(func_t func, std::string name = "none-name")
            : _tid(), _threadname(name), _func(func), _stop(true)
        {
        }

        /// pthread entry point: `args` is the Thread whose callback is run.
        static void *threadroutine(void *args)
        {
            Thread *self = static_cast<Thread *>(args);
            self->Excute();
            return nullptr;
        }

        /**
         * Create the underlying thread.
         * @return true if pthread_create() succeeded, false otherwise.
         */
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if (n != 0)
            {
                return false;
            }
            _stop = false;
            return true;
        }

        /// Detach the thread if it is still joinable; afterwards Join() is a no-op.
        void Detach()
        {
            if (!_stop)
            {
                pthread_detach(_tid);
                // The id must not be joined or detached again.
                _stop = true;
            }
        }

        /// Wait for the thread to finish; does nothing if there is no joinable thread.
        void Join()
        {
            if (!_stop)
            {
                pthread_join(_tid, nullptr);
                // Joining the same id twice is undefined, so remember it is consumed.
                _stop = true;
            }
        }

        /// @return the thread's name.
        std::string name()
        {
            return _threadname;
        }

        /// Mark the thread as stopped. This only flips the flag (so Join()/Detach()
        /// become no-ops); it does not interrupt the running callback.
        void Stop()
        {
            _stop = true;
        }

        ~Thread()
        {
        }

    private:
        pthread_t _tid;          // id filled in by pthread_create()
        std::string _threadname; // name passed to the callback
        func_t _func;            // work executed on the thread
        bool _stop;              // true when there is no joinable thread
    };
}

#endif
LockGuard.hpp:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__

#include <iostream>
#include <pthread.h>

/**
 * Scope guard for a pthread mutex: locks it on construction and unlocks it
 * on destruction.
 *
 * The guard stores only the pointer, so the mutex must outlive the guard.
 */
class LockGuard
{
public:
    /// Lock `mutex` for the lifetime of this guard.
    explicit LockGuard(pthread_mutex_t *mutex) : _mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }

    /// Unlock the mutex taken in the constructor.
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }

    // A copy would unlock the same mutex a second time, so copying is disabled.
    LockGuard(const LockGuard &) = delete;
    LockGuard &operator=(const LockGuard &) = delete;

private:
    pthread_mutex_t *_mutex; // guarded mutex, not owned
};

#endif
Log.hpp:
#pragma once

#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"

// Log a printf-style message at `level`, tagged with the calling file and line.
// The destination is whatever gIsSave selects at the time of the call.
#define LOG(level, format, ...) do{LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__);}while (0)
// Send subsequent log output to the log file.
#define EnableFile() do{gIsSave = true;}while (0)
// Send subsequent log output to the screen (standard output).
#define EnableScreen() do{gIsSave = false;}while (0)

// Destination switch read by LOG: true = log file, false = screen.
// It is a plain bool written by EnableFile()/EnableScreen() without locking.
bool gIsSave = false;

// Name of the file that receives log output when gIsSave is true.
const std::string logname = "log.txt";

// Log severity levels, in increasing order of severity.
enum Level
{
    DEBUG = 0,
    INFO,
    WARNING,
    ERROR,
    FATAL
};
/**
 * Append `message` to the file called `filename`.
 *
 * The file is opened in append mode; if it cannot be opened the message is
 * silently dropped.
 */
void SaveFile(const std::string &filename, const std::string &message)
{
    std::ofstream sink(filename, std::ios::app);
    if (sink.is_open())
    {
        sink << message;
        sink.close();
    }
}
std::string LevelToString(int level)
{switch (level){case DEBUG:return "Debug";case INFO:return "Info";case WARNING:return "Warning";case ERROR:return "Error";case FATAL:return "Fatal";default:return "Unknown";}
}// 获取当前时间字符串
/**
 * Format the current local time as "year-month-day hour:minute:second"
 * (fields are not zero-padded).
 *
 * Uses localtime_r() with a caller-owned struct tm: localtime() returns a
 * pointer to shared static storage, and this function is called by
 * LogMessage() from several threads before the log lock is taken.
 *
 * @return the formatted time, or "None" if the time cannot be converted.
 */
std::string GetTimeString()
{
    const time_t curr_time = time(nullptr);
    struct tm format_time;
    if (localtime_r(&curr_time, &format_time) == nullptr)
        return "None";

    char time_buffer[1024];
    snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
             format_time.tm_year + 1900, // tm_year counts from 1900
             format_time.tm_mon + 1,     // tm_mon is zero-based
             format_time.tm_mday,
             format_time.tm_hour,
             format_time.tm_min,
             format_time.tm_sec);
    return time_buffer;
}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;// 日志信息
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{// 日志级别std::string levelstr = LevelToString(level);// 时间std::string timestr = GetTimeString();// 进程idpid_t selfid = getpid();// 日志内容char buffer[1024];va_list arg;va_start(arg, format);vsnprintf(buffer, sizeof(buffer), format, arg);va_end(arg);// 日志格式化std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +"[" + std::to_string(selfid) + "]" +"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";LockGuard lockguard(&lock);// 输出日志if (!issave){std::cout << message;}else{SaveFile(logname, message);}
}
Task.hpp:
#pragma once

#include <iostream>
#include <string>
#include <functional>

/**
 * A small unit of work for the thread pool: adds two integers.
 *
 * Calling the object (or Excute()) computes the sum; ResultToString()
 * reports it afterwards.
 */
class Task
{
public:
    /// Default task: 0 + 0, so every member has a defined value.
    Task() : _a(0), _b(0), _result(0)
    {
    }

    /// Task that will add `a` and `b`.
    Task(int a, int b) : _a(a), _b(b), _result(0)
    {
    }

    /// Perform the addition and store the result.
    void Excute()
    {
        _result = _a + _b;
    }

    /// @return "a+b=result"; the result is 0 until the task has been executed.
    std::string ResultToString() const
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
    }

    /// @return the question form "a+b= ?".
    std::string DebugToString() const
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "= ?";
    }

    /// Function-call operator: same as Excute().
    void operator()()
    {
        Excute();
    }

private:
    int _a;      // left operand
    int _b;      // right operand
    int _result; // sum, valid after Excute()
};
ThreadPool.hpp:
#pragma once

#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <pthread.h>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"

// Kept so that code including this header and relying on the directive
// continues to compile; the class itself names ThreadModule::Thread explicitly.
using namespace ThreadModule;

// Default number of worker threads in the pool.
const static int gdefaultthreadnum = 10;

/**
 * Lazily created singleton thread pool.
 *
 * T is the task type. It must be copyable, callable with no arguments
 * (`t()`), and provide `ResultToString()` returning a std::string, because
 * the workers log the result after running a task.
 *
 * The single instance is created on the first GetInstance() call and is
 * never deleted.
 */
template <typename T>
class ThreadPool
{
private:
    // Lock the task-queue mutex.
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    // Unlock the task-queue mutex.
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    // Block the calling worker on the condition variable; _mutex must be held.
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    // Wake one waiting worker.
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    // Wake every waiting worker.
    void ThreadWakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    // Private constructor: instances are only created through GetInstance().
    ThreadPool(int threadnum = gdefaultthreadnum)
        : _threadnum(threadnum), _waitnum(0), _isrunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        LOG(INFO, "ThreadPool Construct()");
    }

    // Create (but do not start) the worker Thread objects and mark the pool running.
    void InitThreadPool()
    {
        // Reserve up front so the vector does not reallocate while it is filled.
        if (_threadnum > 0)
        {
            _threads.reserve(static_cast<std::size_t>(_threadnum));
        }
        for (int num = 0; num < _threadnum; num++)
        {
            std::string name = "thread-" + std::to_string(num + 1);
            _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
            LOG(INFO, "init thread %s done", name.c_str());
        }
        _isrunning = true;
    }

    // Start every worker thread; a worker that cannot be created is reported in the log.
    void Start()
    {
        for (auto &thread : _threads)
        {
            if (!thread.Start())
            {
                LOG(ERROR, "start thread %s failed", thread.name().c_str());
            }
        }
    }

    // Worker loop. A member function of this class is bound as the callback of
    // another class (Thread), which makes class-level cooperation convenient.
    void HandlerTask(std::string name)
    {
        LOG(INFO, "%s is running...", name.c_str());
        while (true)
        {
            LockQueue();
            // Sleep while there is nothing to do and the pool is still running;
            // the loop re-checks the condition after every wakeup.
            while (_task_queue.empty() && _isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            // Exit only when the pool was stopped AND the queue is drained.
            if (_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                break;
            }
            // Take one task, then release the lock before running it.
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            LOG(DEBUG, "%s get a task", name.c_str());
            t();
            LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
        }
    }

    // Copying and assignment are disabled.
    ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
    ThreadPool(const ThreadPool<T> &) = delete;

public:
    /**
     * Return the pool for task type T, creating and starting it on first use.
     *
     * The instance pointer is atomic and is published only after the pool
     * has been fully initialized and its workers started, so a caller that
     * sees a non-null pointer without taking the lock sees a ready pool.
     */
    static ThreadPool<T> *GetInstance()
    {
        ThreadPool<T> *instance = _instance.load(std::memory_order_acquire);
        if (nullptr == instance)
        {
            // Serialize creation between threads.
            LockGuard lockguard(&_lock);
            instance = _instance.load(std::memory_order_relaxed);
            if (nullptr == instance)
            {
                instance = new ThreadPool<T>();
                instance->InitThreadPool();
                instance->Start();
                _instance.store(instance, std::memory_order_release);
                LOG(DEBUG, "创建线程池单例");
                return instance;
            }
        }
        // The singleton already exists.
        LOG(DEBUG, "获取线程池单例");
        return instance;
    }

    // Stop the pool: no new tasks are accepted, and all waiting workers are
    // woken so they can drain the queue and exit.
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        ThreadWakeupAll();
        UnlockQueue();
    }

    // Join every worker thread. Workers only exit after Stop() has been called.
    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
            LOG(INFO, "%s is quit...", thread.name().c_str());
        }
    }

    /**
     * Add a task to the queue.
     * @return true if the task was queued, false if the pool is not running.
     */
    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();
        if (_isrunning)
        {
            _task_queue.push(t);
            if (_waitnum > 0)
            {
                ThreadWakeup();
            }
            LOG(DEBUG, "enqueue task success");
            ret = true;
        }
        UnlockQueue();
        return ret;
    }

    // Release the mutex and condition variable.
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _threadnum;                            // number of worker threads
    std::vector<ThreadModule::Thread> _threads; // the workers
    std::queue<T> _task_queue;                 // pending tasks
    pthread_mutex_t _mutex;                    // protects the queue, _waitnum and _isrunning
    pthread_cond_t _cond;                      // signalled on new task or stop
    int _waitnum;                              // workers currently waiting on _cond
    bool _isrunning;                           // false once Stop() was called

    static std::atomic<ThreadPool<T> *> _instance; // the singleton
    static pthread_mutex_t _lock;                  // serializes singleton creation
};

// Static member definitions.
template <typename T>
std::atomic<ThreadPool<T> *> ThreadPool<T>::_instance(nullptr);

template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
Main.cc:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include <iostream>
#include <string>
#include <memory>
#include <ctime>int main()
{// 日志LOG(DEBUG, "程序已经加载");sleep(2);// 创建线程池单例ThreadPool<Task>::GetInstance();sleep(2);// 获取单例ThreadPool<Task>::GetInstance();sleep(2);ThreadPool<Task>::GetInstance();sleep(2);ThreadPool<Task>::GetInstance();sleep(2);// 等待线程结束ThreadPool<Task>::GetInstance()->Wait();sleep(2);return 0;
}
结果演示: