目录
- 一、线程池
- 1、日志与策略模式
- 2、线程池设计
- 3、线程安全的单例模式
- (1)单例模式的特点
- (2)饿汉实现方式和懒汉实现方式
- (i)饿汉方式实现单例模式
- (ii)懒汉方式实现单例模式
- (iii)懒汉方式实现单例模式(线程安全版本)
- 4、单例式线程池
- 二、线程安全和重入问题
- 三、常见锁概念
- 1、死锁
- (1)多线程造成死锁
- (2)单执行流产生死锁
- (3)阻塞
- 2、死锁四个必要条件
- (1)互斥条件
- (2)请求与保持条件
- (3)不剥夺条件
- (4)循环等待条件
- 3、避免死锁
- (1)破坏死锁的四个必要条件
- (2)避免锁未释放的场景
- 4、避免死锁算法(不讲)
- 四、STL,智能指针和线程安全
- 1、STL中的容器是否是线程安全的?
- 2、智能指针是否是线程安全的?
- 五、其他常见的各种锁
一、线程池
下面开始,我们结合我们之前所做的所有封装,进行一个线程池的设计。在写之前,我们要做如下准
- 准备线程的封装
- 准备锁和条件变量的封装
- 引入日志,对线程进行封装
1、日志与策略模式
什么是设计模式
IT行业这么火,涌入的人很多,俗话说林子大了啥鸟都有,大佬和菜鸡们两极分化的越来越严重.为了让菜鸡们不太拖大佬的后腿,于是大佬们针对一些经典的常见的场景,给定了一些对应的解决方案,这个就是设计模式
日志认识
计算机中的日志是记录系统和软件运行中发生事件的文件,主要作用是监控运行状态、记录异常信息,帮助快速定位问题并支持程序员进行问题修复。它是系统维护、故障排查和安全管理的重要工具。
日志格式以下几个指标是必须得有的
- 时间戳
- 日志等级
- 日志内容
以下几个指标是可选的
- 文件名行号
- 进程,线程相关id信息等
日志有现成的解决方案,如:spdlog、glog、Boost.Log、Log4cxx等等,我们依旧采用自定义日志的方式。
这里我们采用设计模式-策略模式来进行日志的设计,具体策略模式介绍,详情看代码和课程。
我们想要的日志格式如下:
Log.hpp:
#pragma once#include <iostream>
#include <cstdio>
#include <string>
#include <fstream>
#include <sstream>
#include <memory>
#include <filesystem> //C++17
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"namespace LogMudule
{using namespace LockModule;// 获取一下当前系统的时间std::string CurrentTime(){time_t time_stamp = ::time(nullptr);struct tm curr;localtime_r(&time_stamp, &curr); // 时间戳,获取可读性较强的时间信息5char buffer[1024];// bugsnprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",curr.tm_year + 1900,curr.tm_mon + 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buffer;}// 构成: 1. 构建日志字符串 2. 刷新落盘(screen, file)// 1. 日志文件的默认路径和文件名const std::string defaultlogpath = "./log/";const std::string defaultlogname = "log.txt";// 2. 日志等级enum class LogLevel{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string Level2String(LogLevel level){switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "None";}}// 3. 刷新策略.class LogStrategy{public:virtual ~LogStrategy() = default;virtual void SyncLog(const std::string &message) = 0;};// 3.1 控制台策略class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy(){}~ConsoleLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockguard(_lock);std::cout << message << std::endl;}private:Mutex _lock;};// 3.2 文件级(磁盘)策略class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname): _logpath(logpath),_logname(logname){// 确认_logpath是存在的.LockGuard lockguard(_lock);if (std::filesystem::exists(_logpath)){return;}try{std::filesystem::create_directories(_logpath);}catch (std::filesystem::filesystem_error &e){std::cerr << e.what() << "\n";}}~FileLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockguard(_lock);std::string log = _logpath + _logname; // ./log/log.txtstd::ofstream out(log, std::ios::app); // 日志写入,一定是追加if (!out.is_open()){return;}out << message << "\n";out.close();}private:std::string _logpath;std::string _logname;// 锁Mutex _lock;};// 日志类: 构建日志字符串, 根据策略,进行刷新class Logger{public:Logger(){// 默认采用ConsoleLogStrategy策略_strategy = std::make_shared<ConsoleLogStrategy>();}void EnableConsoleLog(){_strategy = std::make_shared<ConsoleLogStrategy>();}void EnableFileLog(){_strategy = std::make_shared<FileLogStrategy>();}~Logger() {}// 一条完整的信息: [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] + 日志的可变部分(<< "hello world" << 3.14 << a << b;)class LogMessage{public:LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger): _currtime(CurrentTime()),_level(level),_pid(::getpid()),_filename(filename),_line(line),_logger(logger){std::stringstream ssbuffer;ssbuffer << "[" << _currtime << "] "<< "[" << Level2String(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "] - ";_loginfo = ssbuffer.str();}template <typename T>LogMessage &operator<<(const T &info){std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if (_logger._strategy){_logger._strategy->SyncLog(_loginfo);}}private:std::string _currtime; // 当前日志的时间LogLevel _level; // 日志等级pid_t _pid; // 进程pidstd::string _filename; // 源文件名称int _line; // 日志所在的行号Logger &_logger; // 负责根据不同的策略进行刷新std::string _loginfo; // 一条完整的日志记录};// 就是要拷贝,故意的拷贝LogMessage operator()(LogLevel level, const std::string &filename, int line){return LogMessage(level, filename, line, *this);}private:std::shared_ptr<LogStrategy> _strategy; // 日志刷新的策略方案};Logger logger;#define LOG(Level) logger(Level, __FILE__, __LINE__)
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}
Main.cc:
#include "Log.hpp"using namespace LogMudule;int main()
{ENABLE_FILE_LOG();LOG(LogLevel::DEBUG) << "hello file";LOG(LogLevel::DEBUG) << "hello file";LOG(LogLevel::DEBUG) << "hello file";LOG(LogLevel::DEBUG) << "hello file";ENABLE_CONSOLE_LOG();LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";return 0;
}
Mutex.hpp:
#pragma once
#include <iostream>
#include <pthread.h>namespace LockModule
{class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator = (const Mutex&) = delete;Mutex(){int n = ::pthread_mutex_init(&_lock, nullptr);(void)n;}~Mutex(){int n = ::pthread_mutex_destroy(&_lock);(void)n;}void Lock(){int n = ::pthread_mutex_lock(&_lock);(void)n;}pthread_mutex_t *LockPtr(){return &_lock;}void Unlock(){int n = ::pthread_mutex_unlock(&_lock);(void)n;}private:pthread_mutex_t _lock;};class LockGuard{public:LockGuard(Mutex &mtx):_mtx(mtx){_mtx.Lock();}~LockGuard(){_mtx.Unlock();}private:Mutex &_mtx;};
}
Makefile:
bin=testLog
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)
2、线程池设计
线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
-
需要大量的线程来完成任务,且完成任务的时间比较短。比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
-
对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
-
接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
线程池的种类
- 创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口
- 浮动线程池,其他同上
此处,我们选择固定线程个数的线程池。
代码见最后,有些长。
3、线程安全的单例模式
(1)单例模式的特点
某些类,只应该具有一个对象(实例),就称之为单例
例如一个男人只能有一个媳妇.
在很多服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中,此时往往要用一个单例的类来管理这些数据。
(2)饿汉实现方式和懒汉实现方式
[洗碗的例子]
- 吃完饭,立刻洗碗,这种就是饿汉方式。因为下一顿吃的时候可以立刻拿着碗就能吃饭
- 吃完饭,先把碗放下,然后下一顿饭用到这个碗了再洗碗,就是懒汉方式。
懒汉方式最核心的思想是"延时加载".从而能够优化服务器的启动速度。
(i)饿汉方式实现单例模式
template <typename T>
class Singleton {static T data;
public:static T* GetInstance() {return &data;}
};
只要通过 Singleton 这个包装类来使用T对象,则一个进程中只有一个T对象的实例.
简单来说就是把对象创建出来。
(ii)懒汉方式实现单例模式
template <typename T>
class Singleton {static T* inst;
public:static T* GetInstance(){if(inst == NULL){inst = new T();}return inst;}
};
简单来说加载的时候不创建对象,只创建指针。运行的时候再创建
存在一个严重的问题,线程不安全
第一次调用 Getlnstance 的时候,如果两个线程同时调用,可能会创建出两份T对象的实例.但是后续再次调用,就没有问题了
(iii)懒汉方式实现单例模式(线程安全版本)
// 懒汉模式, 线程安全
template <typename T>
class Singleton {volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.static std::mutex lock;
public:static T* GetInstance() {if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.lock.lock(); // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.if (inst == NULL) {inst = new T();}lock.unlock();}return inst;}
};
注意事项:
1.加锁解锁的位置
2.双重if判定,避免不必要的锁竞争
3.volatile关键字防止过度优化
4、单例式线程池
#pragma once#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"namespace ThreadPoolModule
{using namespace LogMudule;using namespace ThreadModule;using namespace LockModule;using namespace CondModule;// 用来做测试的线程方法void DefaultTest(){while (true){LOG(LogLevel::DEBUG) << "我是一个测试方法";sleep(1);}}using thread_t = std::shared_ptr<Thread>;const static int defaultnum = 5;template <typename T>class ThreadPool{private:bool IsEmpty() { return _taskq.empty(); }void HandlerTask(std::string name){LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask的逻辑";while (true){// 1. 拿任务T t;{LockGuard lockguard(_lock);while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}// 2. 任务队列为空 && 线程池退出了if (IsEmpty() && !_isrunning)break;t = _taskq.front();_taskq.pop();}// 2. 处理任务t(name); // 规定,未来所有的任务处理,全部都是必须提供()方法!}LOG(LogLevel::INFO) << "线程: " << name << " 退出";}ThreadPool(const ThreadPool<T> &) = delete;ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false){for (int i = 0; i < _num; i++){_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象 ... 成功";}}public:static ThreadPool<T> *getInstance(){if(instance == NULL){LockGuard lockguard(mutex);if(instance == NULL){LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";instance = new ThreadPool<T>();}}return instance;}void Equeue(T &&in){LockGuard lockguard(_lock);if (!_isrunning)return;_taskq.push(std::move(in));if (_wait_num > 0)_cond.Notify();}void Start(){if (_isrunning)return;_isrunning = true; // bug fix??for (auto &thread_ptr : _threads){LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << " ... 成功";thread_ptr->Start();}}void Wait(){for (auto &thread_ptr : _threads){thread_ptr->Join();LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << " ... 成功";}}void Stop(){LockGuard lockguard(_lock);if (_isrunning){// 3. 不能在入任务了_isrunning = false; // 不工作// 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了if (_wait_num > 0)_cond.NotifyAll();}}~ThreadPool(){}private:std::vector<thread_t> _threads;int _num;int _wait_num;std::queue<T> _taskq; // 临界资源Mutex _lock;Cond _cond;bool _isrunning;static ThreadPool<T> *instance;static Mutex mutex; //只用来保护单例};template<typename T>ThreadPool<T> *ThreadPool<T>::instance = NULL;template<typename T>Mutex ThreadPool<T>::mutex; //只用来保护单例
}
二、线程安全和重入问题
概念
线程安全: 就是多个线程在访问共享资源时,能够正确地执行,不会相互干扰或破坏彼此的执行结果。一般而言,多个线程并发同一段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进行操作,并且没有锁保护的情况下,容易出现该问题。
重入: 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
学到现在,其实我们已经能理解重入其实可以分为两种情况
- 多线程重入函数
- 信号导致一个执行流重复进入函数
结论
不要被上面绕口令式的话语唬住,你只要仔细观察,其实对应概念说的都是一回事。
三、常见锁概念
1、死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
(1)多线程造成死锁
为了方便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进行后续资源的访问
申请一把锁是原子的,但是申请两把锁就不一定了
造成的结果是
(2)单执行流产生死锁
单执行流也有可能产生死锁,如果某一执行流连续申请了两次锁,那么此时该执行流就会被挂起。因为该执行流第一次申请锁的时候是申请成功的,但第二次申请锁时因为该锁已经被申请过了,于是申请失败导致被挂起直到该锁被释放时才会被唤醒,但是这个锁本来就在自己手上,自己现在处于被挂起的状态根本没有机会释放锁,所以该执行流将永远不会被唤醒,此时该执行流也就处于一种死锁的状态。
#include <stdio.h>
#include <pthread.h>pthread_mutex_t mutex;
void* Routine(void* arg)
{pthread_mutex_lock(&mutex);pthread_mutex_lock(&mutex);pthread_exit((void*)0);
}
int main()
{pthread_t tid;pthread_mutex_init(&mutex, NULL);pthread_create(&tid, NULL, Routine, NULL);pthread_join(tid, NULL);pthread_mutex_destroy(&mutex);return 0;
}
运行代码,此时该程序实际就处于一种被挂起的状态。
用ps命令查看该进程时可以看到,该进程当前的状态是Sl+,其中的l实际上就是lock的意思,表示该进程当前处于一种死锁的状态。
(3)阻塞
进程运行时是被CPU调度的,换句话说进程在调度时是需要用到CPU资源的,每个CPU都有一个运行等待队列(runqueue),CPU在运行时就是从该队列中获取进程进行调度的。
在运行等待队列中的进程本质上就是在等待CPU资源,实际上不止是等待CPU资源如此,等待其他资源也是如此,比如锁的资源、磁盘的资源、网卡的资源等等,它们都有各自对应的资源等待队列。
例如,当某一个进程在被CPU调度时,该进程需要用到锁的资源,但是此时锁的资源正在被其他进程使用:
- 那么此时该进程的状态就会由R状态变为某种阻塞状态,比如S状态。并且该进程会被移出运行等待队列,被链接到等待锁的资源的资源等待队列,而CPU则继续调度运行等待队列中的下一个进程。
- 此后若还有进程需要用到这一个锁的资源,那么这些进程也都会被移出运行等待队列,依次链接到这个锁的资源等待队列当中。
- 直到使用锁的进程已经使用完毕,也就是锁的资源已经就绪,此时就会从锁的资源等待队列中唤醒一个进程,将该进程的状态由S状态改为R状态,并将其重新链接到运行等待队列,等到CPU再次调度该进程时,该进程就可以使用到锁的资源了。
总结一下:
站在操作系统的角度,进程等待某种资源,就是将当前进程的task_struct放入对应的等待队列,这种情况可以称之为当前进程被挂起等待了。
站在用户角度,当进程等待某种资源时,用户看到的就是自己的进程卡住不动了,我们一般称之为应用阻塞了。
这里所说的资源可以是硬件资源也可以是软件资源,锁本质就是一种软件资源,当我们申请锁时,锁当前可能并没有就绪,可能正在被其他线程所占用,此时当其他线程再来申请锁时,就会被放到这个锁的资源等待队列当中。
2、死锁四个必要条件
(1)互斥条件
一个资源每次只能被一个执行流使用。(好理解,不解释。)
(2)请求与保持条件
一个执行流因请求资源而阻塞时,对已获得的资源保持不放。
(3)不剥夺条件
一个执行流已获得的资源,在他未使用完之前,另一个线程不能强行剥夺。
(4)循环等待条件
若干执行流之间形成一种头尾相接的循环等待资源的关系。
3、避免死锁
(1)破坏死锁的四个必要条件
破坏循环等待条件问题:资源一次性分配,使用超时机制、加锁顺序一致。
// 下⾯的C++不写了,理解就可以
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <unistd.h>
// 定义两个共享资源(整数变量)和两个互斥锁
int shared_resource1 = 0;
int shared_resource2 = 0;
std::mutex mtx1, mtx2;
// ⼀个函数,同时访问两个共享资源
void access_shared_resources()
{// std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);// std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);// // 使⽤ std::lock 同时锁定两个互斥锁// std::lock(lock1, lock2);// 现在两个互斥锁都已锁定,可以安全地访问共享资源int cnt = 10000;while (cnt){++shared_resource1;++shared_resource2;}// 当离开 access_shared_resources 的作⽤域时,lock1 和 lock2 的析构函数会被⾃动调⽤// 这会导致它们各⾃的互斥量被⾃动解锁
}
// 模拟多线程同时访问共享资源的场景
void simulate_concurrent_access()
{std::vector<std::thread> threads;// 创建多个线程来模拟并发访问for (int i = 0; i < 10; ++i){threads.emplace_back(access_shared_resources);}// 等待所有线程完成for (auto &thread : threads){thread.join();}// 输出共享资源的最终状态std::cout << "Shared Resource 1: " << shared_resource1 << std::endl;std::cout << "Shared Resource 2: " << shared_resource2 << std::endl;
}
int main()
{simulate_concurrent_access();return 0;
}
运行结果:
(2)避免锁未释放的场景
4、避免死锁算法(不讲)
- 死锁检测算法(了解)
- 银行家算法(了解)
四、STL,智能指针和线程安全
1、STL中的容器是否是线程安全的?
不是.
原因是,STL的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的景响.
而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶).
因此 STL默认不是线程安全,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全
2、智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效,因此不涉及线程安全问题.对于 shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题,但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证 shared ptr 能够高效,原子的操作引用计数。
五、其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
- 自旋锁,读写锁,加餐课详细介绍
线程池代码如下:
ThreadPool.cc:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>using namespace ThreadPoolModule;int main()
{ENABLE_CONSOLE_LOG();// ENABLE_FILE_LOG();std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();tp->Start();int cnt = 10;char c;while (true){std::cin >> c;tp->Equeue(Push);// cnt--;// sleep(1);}tp->Stop();sleep(3);tp->Wait();return 0;
}
ThreadPool.hpp:
#pragma once#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"namespace ThreadPoolModule
{using namespace LogMudule;using namespace ThreadModule;using namespace LockModule;using namespace CondModule;// 用来做测试的线程方法void DefaultTest(){while (true){LOG(LogLevel::DEBUG) << "我是一个测试方法";sleep(1);}}using thread_t = std::shared_ptr<Thread>;const static int defaultnum = 5;template <typename T>class ThreadPool{ private:bool IsEmpty() { return _taskq.empty(); }void HandlerTask(std::string name){LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask的逻辑";while (true){// 1. 拿任务T t;{LockGuard lockguard(_lock);while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}// 2. 任务队列为空 && 线程池退出了if(IsEmpty() && !_isrunning)break;t = _taskq.front();_taskq.pop();}// 2. 处理任务t(name); // 规定,未来所有的任务处理,全部都是必须提供()方法!}LOG(LogLevel::INFO) << "线程: " << name << " 退出";}public:ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false){for (int i = 0; i < _num; i++){_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象 ... 成功";}}void Equeue(T &&in){LockGuard lockguard(_lock);if(!_isrunning) return;_taskq.push(std::move(in));if(_wait_num > 0)_cond.Notify();}void Start(){if(_isrunning) return;_isrunning = true; // bug fix??for (auto &thread_ptr : _threads){LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << " ... 成功";thread_ptr->Start();}}void Wait(){for (auto &thread_ptr : _threads){thread_ptr->Join();LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << " ... 成功";}}void Stop(){LockGuard lockguard(_lock);if(_isrunning){// 3. 不能在入任务了_isrunning = false; // 不工作// 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了if(_wait_num>0)_cond.NotifyAll();}}~ThreadPool(){}private:std::vector<thread_t> _threads;int _num;int _wait_num;std::queue<T> _taskq; // 临界资源Mutex _lock;Cond _cond;bool _isrunning;};
}
Thread.hpp:
#ifndef _THREAD_HPP__
#define _THREAD_HPP__#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <unistd.h>// v1
namespace ThreadModule
{using func_t = std::function<void(std::string name)>;static int number = 1;enum class TSTATUS{NEW,RUNNING,STOP};class Thread{private:// 成员方法!static void *Routine(void *args){Thread *t = static_cast<Thread *>(args);t->_status = TSTATUS::RUNNING;t->_func(t->Name());return nullptr;}void EnableDetach() { _joinable = false; }public:Thread(func_t func) : _func(func), _status(TSTATUS::NEW), _joinable(true){_name = "Thread-" + std::to_string(number++);_pid = getpid();}bool Start(){if (_status != TSTATUS::RUNNING){int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODOif (n != 0)return false;return true;}return false;}bool Stop(){if (_status == TSTATUS::RUNNING){int n = ::pthread_cancel(_tid);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}bool Join(){if (_joinable){int n = ::pthread_join(_tid, nullptr);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}void Detach(){EnableDetach();pthread_detach(_tid);}bool IsJoinable() { return _joinable; }std::string Name() {return _name;}~Thread(){}private:std::string _name;pthread_t _tid;pid_t _pid;bool _joinable; // 是否是分离的,默认不是func_t _func;TSTATUS _status;};
}// v2
// namespace ThreadModule
// {
// static int number = 1;
// enum class TSTATUS
// {
// NEW,
// RUNNING,
// STOP
// };// template <typename T>
// class Thread
// {
// using func_t = std::function<void(T)>;
// private:
// // 成员方法!
// static void *Routine(void *args)
// {
// Thread<T> *t = static_cast<Thread<T> *>(args);
// t->_status = TSTATUS::RUNNING;
// t->_func(t->_data);
// return nullptr;
// }
// void EnableDetach() { _joinable = false; }// public:
// Thread(func_t func, T data) : _func(func), _data(data), _status(TSTATUS::NEW), _joinable(true)
// {
// _name = "Thread-" + std::to_string(number++);
// _pid = getpid();
// }
// bool Start()
// {
// if (_status != TSTATUS::RUNNING)
// {
// int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
// if (n != 0)
// return false;
// return true;
// }
// return false;
// }
// bool Stop()
// {
// if (_status == TSTATUS::RUNNING)
// {
// int n = ::pthread_cancel(_tid);
// if (n != 0)
// return false;
// _status = TSTATUS::STOP;
// return true;
// }
// return false;
// }
// bool Join()
// {
// if (_joinable)
// {
// int n = ::pthread_join(_tid, nullptr);
// if (n != 0)
// return false;
// _status = TSTATUS::STOP;
// return true;
// }
// return false;
// }
// void Detach()
// {
// EnableDetach();
// pthread_detach(_tid);
// }
// bool IsJoinable() { return _joinable; }
// std::string Name() { return _name; }
// ~Thread()
// {
// }// private:
// std::string _name;
// pthread_t _tid;
// pid_t _pid;
// bool _joinable; // 是否是分离的,默认不是
// func_t _func;
// TSTATUS _status;
// T _data;
// };
// }#endif
Task.hpp:
#ifndef _THREAD_HPP__
#define _THREAD_HPP__#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <unistd.h>// v1
namespace ThreadModule
{using func_t = std::function<void(std::string name)>;static int number = 1;enum class TSTATUS{NEW,RUNNING,STOP};class Thread{private:// 成员方法!static void *Routine(void *args){Thread *t = static_cast<Thread *>(args);t->_status = TSTATUS::RUNNING;t->_func(t->Name());return nullptr;}void EnableDetach() { _joinable = false; }public:Thread(func_t func) : _func(func), _status(TSTATUS::NEW), _joinable(true){_name = "Thread-" + std::to_string(number++);_pid = getpid();}bool Start(){if (_status != TSTATUS::RUNNING){int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODOif (n != 0)return false;return true;}return false;}bool Stop(){if (_status == TSTATUS::RUNNING){int n = ::pthread_cancel(_tid);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}bool Join(){if (_joinable){int n = ::pthread_join(_tid, nullptr);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}void Detach(){EnableDetach();pthread_detach(_tid);}bool IsJoinable() { return _joinable; }std::string Name() {return _name;}~Thread(){}private:std::string _name;pthread_t _tid;pid_t _pid;bool _joinable; // 是否是分离的,默认不是func_t _func;TSTATUS _status;};
}// v2
// namespace ThreadModule
// {
// static int number = 1;
// enum class TSTATUS
// {
// NEW,
// RUNNING,
// STOP
// };// template <typename T>
// class Thread
// {
// using func_t = std::function<void(T)>;
// private:
// // 成员方法!
// static void *Routine(void *args)
// {
// Thread<T> *t = static_cast<Thread<T> *>(args);
// t->_status = TSTATUS::RUNNING;
// t->_func(t->_data);
// return nullptr;
// }
// void EnableDetach() { _joinable = false; }// public:
// Thread(func_t func, T data) : _func(func), _data(data), _status(TSTATUS::NEW), _joinable(true)
// {
// _name = "Thread-" + std::to_string(number++);
// _pid = getpid();
// }
// bool Start()
// {
// if (_status != TSTATUS::RUNNING)
// {
// int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
// if (n != 0)
// return false;
// return true;
// }
// return false;
// }
// bool Stop()
// {
// if (_status == TSTATUS::RUNNING)
// {
// int n = ::pthread_cancel(_tid);
// if (n != 0)
// return false;
// _status = TSTATUS::STOP;
// return true;
// }
// return false;
// }
// bool Join()
// {
// if (_joinable)
// {
// int n = ::pthread_join(_tid, nullptr);
// if (n != 0)
// return false;
// _status = TSTATUS::STOP;
// return true;
// }
// return false;
// }
// void Detach()
// {
// EnableDetach();
// pthread_detach(_tid);
// }`在这里插入代码片`
// bool IsJoinable() { return _joinable; }
// std::string Name() { return _name; }
// ~Thread()
// {
// }// private:
// std::string _name;
// pthread_t _tid;
// pid_t _pid;
// bool _joinable; // 是否是分离的,默认不是
// func_t _func;
// TSTATUS _status;
// T _data;
// };
// }#endif
Mutex.hpp:
#pragma once
#include <iostream>
#include <pthread.h>namespace LockModule
{class Mutex{public:Mutex(const Mutex &) = delete;const Mutex &operator=(const Mutex &) = delete;Mutex(){int n = ::pthread_mutex_init(&_lock, nullptr);(void)n;}~Mutex(){int n = ::pthread_mutex_destroy(&_lock);(void)n;}void Lock(){int n = ::pthread_mutex_lock(&_lock);(void)n;}void Unlock(){int n = ::pthread_mutex_unlock(&_lock);(void)n;}pthread_mutex_t *LockPtr(){return &_lock;}private:pthread_mutex_t _lock;};class LockGuard{public:LockGuard(Mutex &mtx) : _mtx(mtx){_mtx.Lock();}~LockGuard(){_mtx.Unlock();}private:Mutex &_mtx;};
}
Log.hpp:
#pragma once#include <iostream>
#include <cstdio>
#include <string>
#include <fstream>
#include <sstream>
#include <memory>
#include <filesystem> //C++17
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"namespace LogMudule
{using namespace LockModule;// 获取一下当前系统的时间std::string CurrentTime(){time_t time_stamp = ::time(nullptr);struct tm curr;localtime_r(&time_stamp, &curr); // 时间戳,获取可读性较强的时间信息5char buffer[1024];// bugsnprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",curr.tm_year + 1900,curr.tm_mon + 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buffer;}// 构成: 1. 构建日志字符串 2. 刷新落盘(screen, file)// 1. 日志文件的默认路径和文件名const std::string defaultlogpath = "./log/";const std::string defaultlogname = "log.txt";// 2. 日志等级enum class LogLevel{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string Level2String(LogLevel level){switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "None";}}// 3. 刷新策略.class LogStrategy{public:virtual ~LogStrategy() = default;virtual void SyncLog(const std::string &message) = 0;};// 3.1 控制台策略class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy(){}~ConsoleLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockguard(_lock);std::cout << message << std::endl;}private:Mutex _lock;};// 3.2 文件级(磁盘)策略class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname): _logpath(logpath),_logname(logname){// 确认_logpath是存在的.LockGuard lockguard(_lock);if (std::filesystem::exists(_logpath)){return;}try{std::filesystem::create_directories(_logpath);}catch (std::filesystem::filesystem_error &e){std::cerr << e.what() << "\n";}}~FileLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockguard(_lock);std::string log = _logpath + _logname; // ./log/log.txtstd::ofstream out(log, std::ios::app); // 日志写入,一定是追加if (!out.is_open()){return;}out << message << "\n";out.close();}private:std::string _logpath;std::string _logname;// 锁Mutex _lock;};// 日志类: 构建日志字符串, 根据策略,进行刷新class Logger{public:Logger(){// 默认采用ConsoleLogStrategy策略_strategy = std::make_shared<ConsoleLogStrategy>();}void EnableConsoleLog(){_strategy = std::make_shared<ConsoleLogStrategy>();}void EnableFileLog(){_strategy = std::make_shared<FileLogStrategy>();}~Logger() {}// 一条完整的信息: [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] + 日志的可变部分(<< "hello world" << 3.14 << a << b;)class LogMessage{public:LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger): _currtime(CurrentTime()),_level(level),_pid(::getpid()),_filename(filename),_line(line),_logger(logger){std::stringstream ssbuffer;ssbuffer << "[" << _currtime << "] "<< "[" << Level2String(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "] - ";_loginfo = ssbuffer.str();}template <typename T>LogMessage &operator<<(const T &info){std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if (_logger._strategy){_logger._strategy->SyncLog(_loginfo);}}private:std::string _currtime; // 当前日志的时间LogLevel _level; // 日志等级pid_t _pid; // 进程pidstd::string _filename; // 源文件名称int _line; // 日志所在的行号Logger &_logger; // 负责根据不同的策略进行刷新std::string _loginfo; // 一条完整的日志记录};// 就是要拷贝,故意的拷贝LogMessage operator()(LogLevel level, const std::string &filename, int line){return LogMessage(level, filename, line, *this);}private:std::shared_ptr<LogStrategy> _strategy; // 日志刷新的策略方案};Logger logger;#define LOG(Level) logger(Level, __FILE__, __LINE__)
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}
Cond.hpp:
#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{using namespace LockModule;class Cond{public:Cond(){int n = ::pthread_cond_init(&_cond, nullptr);(void)n;}void Wait(Mutex &lock) // 让我们的线程释放曾经持有的锁!{int n = ::pthread_cond_wait(&_cond, lock.LockPtr());}void Notify(){int n = ::pthread_cond_signal(&_cond);(void)n;}void NotifyAll(){int n = ::pthread_cond_broadcast(&_cond);(void)n;}~Cond(){int n = ::pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
}
Makefile:
bin=thread_pool
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)