线程系列:
Linux–线程的认识(一)
Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)
线程的互斥:临界资源只能在同一时间被一个线程使用
生产消费模型
信号量
线程池
线程池(Thread Pool)是一种基于池化技术设计用于管理和复用线程的机制。
在多线程编程中,频繁地创建和销毁线程会消耗大量的系统资源,并且由于线程的创建和销毁需要时间,这也会降低程序的执行效率。线程池通过预先创建一定数量的线程并放入池中,需要时从池中取出线程执行任务,执行完毕后线程并不销毁而是重新放回池中等待下一次使用,从而避免了线程的频繁创建和销毁所带来的开销。
主要优点
- 降低资源消耗:通过复用已存在的线程,减少线程创建和销毁的开销。
- 提高响应速度:当任务到达时,可以立即分配线程进行处理,减少了等待时间。
- 提高线程的可管理性:线程池可以统一管理线程,包括线程的创建、调度、执行和销毁等。
关键参数
- 核心线程数:线程池维护线程的最少数量,即使这些线程处于空闲状态,线程池也不会回收它们(一般为主线程)。
- 最大线程数:线程池中允许的最大线程数。
- 阻塞队列:用于存放待执行的任务的队列。当线程池中的所有线程都忙时,新任务会被添加到这个队列中等待处理。
- 非核心线程空闲存活时间:当线程池中的线程数量超过核心线程数时,如果线程空闲时间超过这个时间,多余的线程将被终止。
- 线程工厂:用于创建新线程的工厂。
- 拒绝策略:当线程池和队列都满了时,对于新来的任务将采取的拒绝策略。
实现原理
线程池的实现原理通常包括以下几个关键部分:
- 线程池管理器:负责创建并管理线程池,包括初始化线程池、创建工作线程、销毁线程池等。
- 工作线程:线程池中的线程,负责执行具体的任务。工作线程通常会不断从任务队列中取出任务并执行,直- 到线程池被销毁或所有任务都执行完毕。
- 任务接口:每个任务必须实现的接口,用于定义任务的执行逻辑。在Java中,这通常是通过实现Runnable或Callable接口来实现的。
- 任务队列:用于存放待处理的任务。当线程池中的工作线程数量达到最大值时,新到达的任务会被放入任务队列中等待处理。任务队列的实现通常依赖于Java的BlockingQueue接口。
实现流程
- 当有新任务提交给线程池时,线程池会首先判断当前正在运行的线程数量是否小于核心线程数。如果是,则直接创建新的线程来执行任务;否则,将任务加入任务队列等待处理。
- 如果任务队列已满,且当前正在运行的线程数量小于最大线程数(maximumPoolSize),则创建新的线程来处理任务;如果线程数量已经达到最大线程数,则根据配置的拒绝策略来处理新任务(如抛出异常、直接丢弃等)。
- 当一个线程完成任务后,它会从任务队列中取下一个任务来执行;如果没有任务可供执行,并且线程池中的线程数量超过了核心线程数,且这些线程空闲时间超过了设定的存活时间,则这些线程会被销毁,直到线程池中的线程数量减少到核心线程数为止。
实例
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include<iostream>
#include<string>
#include<pthread.h>
#include<functional>
#include<unistd.h>using namespace std;namespace ThreadMdule
{using func_t = std::function<void(string)>;class Thread{public:void Excute(){_func(_threadname);}Thread(func_t func, const std::string &name="none-name"): _func(func), _threadname(name), _stop(true){}static void* threadroutine(void* args){Thread* self=static_cast<Thread*>(args);self->Excute();return nullptr;}bool start(){int n=pthread_create(&_tid,nullptr,threadroutine,this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;func_t _func;bool _stop;};
}#endif
ThreadPool.hpp
#pragma once#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"
using namespace ThreadMdule;
using namespace std;
const static int gdefaultthreadnum=3;//默认线程池的线程数template <class T>
class ThreadPool
{
public:ThreadPool(int threadnum=gdefaultthreadnum) :_threadnum(threadnum),_waitnum(0),_isrunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);LOG(INFO,"ThreadPool COnstruct.");}//各个线程独立的任务函数void HandlerTask(string name){LOG(INFO,"%s is running...",name.c_str());while(true){LockQueue();//开启保护//等到有任务时才退出循环执行下列语句while(_task_queue.empty()&&_isrunning){_waitnum++;ThreadSleep();_waitnum--;}//当任务队列空并且线程池停止时线程退出if(_task_queue.empty()&&!_isrunning){UnlockQueue();cout<<name<<" quit "<<endl;sleep(1);break;}//1.任务队列不为空&&线程池开启//2.任务队列不为空&&线程池关闭,直到任务队列为空//所以,只要有任务,就要处理任务T t=_task_queue.front();//取出对应任务_task_queue.pop();UnlockQueue();LOG(DEBUG,"%s get a task",name.c_str());//处理任务t();LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str());}}//线程池中线程的构建void InitThreadPool(){for(int i=0;i<_threadnum;i++){string name="thread-"+to_string(i+1);_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);LOG(INFO,"init thread %s done",name.c_str());}_isrunning=true;}//线程池的启动void Start(){for(auto& thread:_threads){thread.start();}}//线程池停止void Stop(){LockQueue();_isrunning=false;ThreadWakeupAll();UnlockQueue();}void Wait(){for(auto& thread:_threads){thread.Join();LOG(INFO,"%s is quit...",thread.name().c_str());}}//将任务入队列bool Enqueue(const T& t){bool ret=false;LockQueue();if(_isrunning){_task_queue.push(t);//如果有空闲的线程,那么唤醒线程让其执行任务if(_waitnum>0){ThreadWakeup();}LOG(DEBUG,"enqueue task success");ret=true;}UnlockQueue();return ret;}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeupAll(){pthread_cond_broadcast(&_cond);}int _threadnum;//线程数vector<Thread> _threads;//存储线程的vectorqueue<T> _task_queue;//输入的任务队列pthread_mutex_t _mutex;//互斥锁pthread_cond_t _cond;//条件变量int _waitnum;//空闲的线程数bool _isrunning;//表示线程池是否启动};
Task.hpp
#include<iostream>
#include<string>
#include<functional>
using namespace std;class Task
{
public:Task(){}Task(int a,int b): _a(a),_b(b),_result(0){}void Excute(){_result=_a+_b;}string ResultToString(){return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result);}string DebugToString(){return to_string(_a) + "+" + to_string(_b) + "= ?";}void operator()(){Excute();}
private:int _a;int _b;int _result;
};
main.cc
int main()
{srand(time(nullptr)^getpid()^pthread_self());//EnableScreen();EnableFile();unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5));tp->InitThreadPool();tp->Start();int tasknum=10;while(tasknum){sleep(1);int a=rand()%10+1;usleep(1024);int b=rand()%20+1;Task t(a,b);LOG(INFO,"main thread push task: %s",t.DebugToString().c_str());tp->Enqueue(t);tasknum--;}tp->Stop();tp->Wait();
}
Loh.hpp
#pragma once#include<iostream>
#include<fstream>
#include<ctime>
#include<cstdarg>
#include<string>
#include<sys/types.h>
#include<unistd.h>
#include<cstdio>
#include"LockGuard.hpp"using namespace std;bool gIsSave=false;//默认输出到屏幕
const string logname="log.txt";
//1.日志是有等级的
enum Level
{DEBUG=0,INFO,WARNING,ERROR,FATAL
};void SaveFile(const string& filename,const string& messages)
{ofstream out(filename,ios::app);if(!out.is_open()){return;}out<<messages;out.close();
}
//等级转化为字符串
string LevelToString(int level)
{switch (level){case DEBUG:return "Debug";case INFO:return "Info";case WARNING:return "Warning";case ERROR:return "Error";case FATAL:return "Fatal";default:return "Unkonwn";}
}//获取当前时间
string GetTimeString()
{time_t curr_time=time(nullptr);//时间戳struct tm* format_time=localtime(&curr_time);//转化为时间结构if(format_time==nullptr)return "None";char time_buffer[1024];snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",format_time->tm_year + 1900,format_time->tm_mon + 1,format_time->tm_mday,format_time->tm_hour,format_time->tm_min,format_time->tm_sec);return time_buffer;}pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//获取日志信息
void LogMessage(string filename,int line,bool issave,int level,char* format,...)
{string levelstr=LevelToString(level);string timestr=GetTimeString();pid_t selfid=getpid();char buffer[1024];va_list arg;va_start(arg,format);vsnprintf(buffer,sizeof(buffer),format,arg);va_end(arg);string message= "[" + timestr + "]" + "[" + levelstr + "]" +"[" + std::to_string(selfid) + "]" +"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";LockGuard lockguard(&lock);if(!issave){cout<<message;}else{SaveFile(logname,message);}}#define LOG(level,format,...) \do \{ \LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \} while (0)#define EnableFile() \do \{ \gIsSave=true; \ } while (0)#define EnableScreen() \do \{ \gIsSave=false; \ } while (0)
线程池解释(代码)
日志解释(代码)
日志是系统或程序记录所有发生事件的文件: