目录
信号量
初始化编辑
销毁
等待
发布
基于环形队列的生产消费模型
问题解答:
代码:
线程池
线程池的实现
(1)初始化,构造大致框架
(2)创建线程
(3)创建任务
(4)完善线程要执行的任务
(5)更新初始化Init()函数
代码:
日志
信号量
POSIX 信号量和 SystemV 信号量作用相同, 都是用于同步操作, 达到无冲突的访问共
享资源目的。 但 POSIX 可以用于线程间同步。
信号量本质就是一个计数器,对公共资源的预定机制;
初始化
sem_t _data_sem;
sem_init(&_data_sem,0,0);
//pshared:0 表示线程间共享, 非零表示进程间共享
//value: 信号量初始值
销毁
sem_destroy(&_data_sem);
等待
功能: 等待信号量, 会将信号量的值减 1;
void P(sem_t &s)//申请信号量,--{sem_wait(&s);}
发布
功能: 发布信号量, 表示资源使用完毕, 可以归还资源了。 将信号量值加 1。
void V(sem_t &s)//释放资源,++{sem_post(&s);}
基于环形队列的生产消费模型
之前我们写的生产消费模型是基于 queue 的,其空间可以动态分配,现在基于固定大小
的环形队列重写这个程序(POSIX 信号量) ;
环形队列采用数组模拟, 用模运算来模拟环状特性;
(1)初始化
(2)完善生产消费代码
(3)完善main
当然任务不止是参数,也可以是类:
问题解答:
(1)上面的是单生产单消费的例子,那多生产多消费呢?
两个锁:多个生产者竞争一个锁,多个消费者竞争一个锁;其实本质还是单生产单消费;但是由于处理数据和构造数据都需要时间,所以多生产多消费效率更改高;
(2)在多生产多消费时,是先加锁还是先申请信号量?
先申请信号量;这个问题就好比你去电影院看电影,是先排队(此时你并没有买票)还是先买票的问题,肯定是先买票效率更高,要不然排到你,你没有电影票还是进不去;
(3)为什么信号量对资源进行使用、申请时,不判断一下条件是否满足?
因为信号量本身就是一个判断条件;
代码:
#pragma once#include<iostream>
#include<pthread.h>
#include<semaphore.h>
#include<vector>
#include<string>
#include<unistd.h>
#include<sys/types.h>using namespace std;
const int defaultcp =5;
template<typename T>
class Ringqueue
{
private:void P(sem_t &s)//申请信号量,--{sem_wait(&s);}void V(sem_t &s)//释放资源,++{sem_post(&s);}
public:Ringqueue(int max_cp = defaultcp):_max_cp(max_cp),_ringqueue(max_cp),_c_step(0),_p_step(0){sem_init(&_data_sem,0,0);sem_init(&_space_sem,0,max_cp);}~Ringqueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);}void Push(const T &in)//生产{P(_space_sem);_ringqueue[_p_step]=in;_p_step++;_p_step%=_max_cp;V(_data_sem);}void Pop(T *out)//消费{P(_data_sem);*out=_ringqueue[_c_step];_c_step++;_c_step%=_max_cp;V(_space_sem);}private:vector<T>_ringqueue;int _p_step;int _c_step;int _max_cp;sem_t _data_sem;sem_t _space_sem;
};
线程池
线程池其实就是一种线程的使用模式;
线程过多会带来调度开销, 进而影响缓存局部性和整体性能。 而线程池维护着多个线程, 等待着监督管理者分配可并发执行的任务。 这避免了在处理短时间任务时创建与销毁线程的代价。 线程池不仅能够保证内核的充分利用, 还能防止过分调度。
线程池的实现
(1)初始化,构造大致框架
大致框架要有多个线程(用vector维护),要有任务队列(task_queue)能生产任务;
(2)创建线程
完成线程的创建,这里我直接用的上一篇文章自己封装的线程;
(3)创建任务
(4)完善线程要执行的任务
(5)更新初始化Init()函数
这样就完成了线程池的主要内容了,剩下的就是修改一下细节部分即可;
线程池的实现:构成出大致框架,在任务的函数中,注意如果任务列表中没有任务,那么线程就要处于等待状态,如果创建出一个任务后,就可以唤醒一个线程去执行即可;
代码:
#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include "Thread.hpp"using namespace std;
const int defaultnum = 5;
template <typename T>
class ThreadPool
{
private:void WakeUpAll(){pthread_cond_broadcast(&_cond);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}void WakeUp(){pthread_cond_signal(&_cond);}bool isEmpty(){return _task_queue.empty();}void HandlerTask(const string &name){while (true){// 取任务Lock();while (isEmpty() && _isrunning){// 休眠_sleep_num++;pthread_cond_wait(&_cond, &_mutex);_sleep_num--;}if (isEmpty() && !_isrunning){cout << name << "quit..." << endl;Unlock();break;}// 有任务T t = _task_queue.front();_task_queue.pop();Unlock();// 处理任务t();cout << name << t.Excute() << "任务处理完" << endl;}}public:ThreadPool(int thread_num = defaultnum) : _thread_num(thread_num), _isrunning(false), _sleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}void Init(){func_t func = bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);// 创建线程for (int i = 0; i < _thread_num; i++){string name = "thread-" + to_string(i + 1);_threads.emplace_back(name, func);}}void Start(){_isrunning = true;for (auto &thread : _threads){thread.start();}}void Stop(){Lock();_isrunning = false;WakeUpAll();Unlock();}void Equeue(const T &in){Lock();if (_isrunning){// 生产任务_task_queue.push(in);// 唤醒线程if (_sleep_num > 0){WakeUp();}}Unlock();}private:int _thread_num;vector<Thread> _threads; // 线程queue<T> _task_queue; // 任务,共享资源bool _isrunning;int _sleep_num; // 休眠的个数pthread_mutex_t _mutex;pthread_cond_t _cond;
};
日志
日志:软件运行的记录信息、向显示器打印、向文件打印、特定的格式;
【日志等级】【pid】【filename】【filenumber】【time】日志内容(支持可变参数)
日志等级:DEBUG、INFO、WARNING、ERROR、FATAL(致命的);
在初始化的时候,主要就是可变参数的初始化
其实写到上面这一步就以及完成的日志的实现;
我们来运行一下代码来看看:
main函数:
cout<<gettime()<<endl;Log lg;lg.logMessage("main.cc",10,DEBUG,"hello %d,world%c,hello %f\n",10,'A',3.14);
下面我们只需要完善一下使该日志信息可以向显示器中打印,也可以向文件中打印;
设置一个类型_type;默认是向显示器打印;在执行打印代码时,只需要判断一下_type即可;
如果是向显示器打印,直接printf即可;如果是文件中打印,需要先打开对应的文件,在将日志信息写入;
完整代码:
#pragma once#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include <stdarg.h>
// #include<stdio.h>
#include<ostream>
#include <fstream>using namespace std;#define SCREEN_TYPE 1
#define FILE_TYPE 2
const string glogfile ="./log.txt";// 日志等级
enum
{DEBUG = 1,INFO,WARNING,ERROR,FATAL
};string levelTo_string(int level)
{switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN";}
}string gettime()
{time_t now = time(nullptr); // now就是时间戳struct tm *curr_time = localtime(&now);char buff[128];snprintf(buff, sizeof(buff), "%d-%02d-%02d : %02d-%02d-%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buff;
}class Logmessage
{
public:string _level;pid_t _pid;string _filename;int _filenumber;string _curr_time;string _message_info;
};class Log
{
private:void FlushLogScreen(Logmessage &lg){printf("[%s][%d][%s][%d] %s",lg._level.c_str(),lg._pid,lg._filename.c_str(),lg._filenumber,lg._message_info.c_str());}void FlushLogFile(Logmessage &lg){ofstream out(_logfile.c_str());if(!out.is_open())return;char buff[2048];snprintf(buff,sizeof(buff),"[%s][%d][%s][%d] %s",lg._level.c_str(),lg._pid,lg._filename.c_str(),lg._filenumber,lg._message_info.c_str());out.write(buff,strlen(buff));out.close();}public:Log(const string &logfile = glogfile) : _type(SCREEN_TYPE), _logfile(logfile){}~Log(){}void Enable(int type){_type = type;}void FlushLog(Logmessage &lg){switch (_type){case SCREEN_TYPE:FlushLogScreen(lg);break;case FILE_TYPE:FlushLogFile(lg);break;}}void logMessage(string filename, int filenumber, int level, const char *format, ...){Logmessage lg;lg._level = levelTo_string(level);lg._pid = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._curr_time = gettime();va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;FlushLog(lg);}private:int _type;string _logfile;
};