C++基于多设计模式下的同步&异步日志系统day5
📟作者主页:慢热的陕西人
🌴专栏链接:C++基于多设计模式下的同步&异步日志系统
📣欢迎各位大佬👍点赞🔥关注🚓收藏,🍉留言
主要内容完成双缓冲区异步任务处理器,日志器模块异步工作器设计与实现。
文章目录
- C++基于多设计模式下的同步&异步日志系统day5
- 1.双缓冲区异步任务处理器(AsyncLooper)设计
- 2.日志器模块异步工作器设计与实现
1.双缓冲区异步任务处理器(AsyncLooper)设计
设计思想:异步处理线程+数据池
使⽤者将需要完成的任务添加到任务池中,由异步线程来完成任务的实际执⾏操作
任务池的设计思想:双缓冲区阻塞数据池
优势:避免了空间的频繁申请释放,且尽可能的减少了⽣产者与消费者之间锁冲突的概率,提⾼了任务处理效率
在任务池的设计中,有很多备选⽅案,⽐如循环队列等等,但是不管是哪⼀种都会涉及到锁冲突的情况,因为在⽣产者与消费者模型中,任何两个⻆⾊之间都具有互斥关系,因此每⼀次的任务添加与取出都有可能涉及锁的冲突,⽽双缓冲区不同,双缓冲区是处理器将⼀个缓冲区中的任务全部处理完毕后,然后交换两个缓冲区,重新对新的缓冲区中的任务进⾏处理,虽然同时多线程写⼊也会冲突,但是冲突并不会像每次只处理⼀条的时候频繁(减少了⽣产者与消费者之间的锁冲突),且不涉及到空间的频繁申请释放所带来的消耗。
- 单个缓冲区的进一步设计
设计一个缓冲区:直接存放格式化后的日志消息字符串
好处:
1.减少了LogMsg对象频繁的构造的消耗
2.可以针对缓冲区中的日志消息,一次性进行IO,减少IO次数,提高效率
缓冲区类的设计
1.管理一个存放字符串数据的缓冲区(使用vector进行空间管理)
2.当前的写入数据位置的指针(指向可写区域的起始位置,避免数据的写入覆盖)
3.当前的读取数据位置的指针(指向可读数据区域的起始位置,当读取指针与写入指针指向相同位置的时候,代表没有数据了)
提供的操作
1.向缓冲区中写入数据
2.获取可读数据起始地址的接口
3.获取可读数据长度的接口
4.移动读写位置的接口
5.初始化缓冲区(将读写缓冲区的所有数据处理完毕之后)
6.提供交换缓冲区的操作(交换空间地址,并不交换空间数据)
/*实现异步日志器的缓冲区*/#ifndef __M_BUF_H__
#define __M_BUF_H__#include<iostream>
#include<vector>
#include<cassert>namespace xupt
{class Buffer{#define DEFAULT_BUFFER_SZIE (1 * 1024 * 1024) //100M#define THREASHOLD_BUFFER_SIZE (8 * 1024 * 1024)#define INCREAMENT_BUFFER_SIZE (1 * 1024 * 1024)public:Buffer() : _buffer(DEFAULT_BUFFER_SZIE), _reader_idx(0), _writer_idx(0) {}//向缓冲区写入数据void push(const char *data, size_t len){ //缓冲区空间不:1.扩容; 2.阻塞;//if(len > writeAbleSize()) return;//2.动态空间,测试性能极限ensureEnough(len);//1.将数据拷贝到缓冲区std::copy(data, data + len, &_buffer[_writer_idx]);//2.移动写指针moveWriter(len);}//返回当前可写入的大小size_t writeAbleSize(){//对于扩容的思路来说,我们不需要这个接口,因为总是可写的//因此这个接口,仅仅针对固定大小的缓冲区大小所提供的return(_buffer.size() - _writer_idx);}//返回可读数据的起始地址const char* begin(){return &_buffer[_reader_idx];}//返回可读数据的长度size_t readAbleSize(){//当前的缓冲区并不环形的//当前实现的缓冲区,是双缓冲区的思路,处理完就交换,所以不存在空间循环使用的情况return (_writer_idx - _reader_idx);}//移动读位置的接口void moveReader(size_t len){assert(len <= readAbleSize());_reader_idx += len;}//重置读写位置,初始化缓冲区void reset(){_writer_idx = 0; //缓冲区所有的空间都是空闲的_reader_idx = 0; //与_writer_idx相等代表没有可读的数据}//对Buffer实现交换操作void swap(Buffer &buffer){_buffer.swap(buffer._buffer);std::swap(_reader_idx, buffer._reader_idx);std::swap(_writer_idx, buffer._writer_idx);}//判断缓冲区是否为空,当任务写入缓冲区不为零的时候,我们才有交换的意义。bool empty(){return (_reader_idx == _writer_idx);}private://对空间进行扩容void ensureEnough(size_t len){if(len <= writeAbleSize()) return ; //如果空间足够,那么直接返回不扩容size_t newsize = 0;if(_buffer.size() < THREASHOLD_BUFFER_SIZE) {//当buffer的空间大小小于阈值的时候,二倍增长newsize = _buffer.size() * 2 + len; //+len,防止翻倍之后依旧不满足len的情况}else{//当buffer的空间大小大于阈值的时候,线性增长newsize = _buffer.size() + INCREAMENT_BUFFER_SIZE + len;}_buffer.resize(newsize);}//移动写位置的接口void moveWriter(size_t len){assert((len + _writer_idx) <= _buffer.size());_writer_idx += len;}private:std::vector<char> _buffer; //存储缓冲区数据的vectorsize_t _reader_idx; //读取数据的下标size_t _writer_idx; //写入数据的下标};}#endif
2.日志器模块异步工作器设计与实现
①异步使用双缓冲区的思想
外界将任务数据,添加到输出缓冲器中
异步线程对处理缓冲区中的数据进行处理,若处理缓冲区中没有数据了,则交换缓冲区
②实现
管理的成员:
a.双缓冲区(生产,消费)
b.互斥锁 ,保证线程安全
c.条件变量-生产&消费(生产缓冲区没有数据,处理完处理完消费缓冲区后就休眠)
d.回调函数(针对缓冲区中数据的处理接口,外界传入一个函数,告诉工作器如何处理数据)
提供的操作:
a.停止异步工作器
b.添加数据到缓冲区
私有操作:
创建线程,线程入口函数中 交换缓冲区,对消费缓区,使用回调函数进行处理
③异步日志器的设计
继承于
Logger
日志器类对于写日志操作进行函数重写(不再将数据写入文件,而是通过异步消息处理器,放到缓冲区中)
通过异步消息处理器,进行日志消息的实际落地
管理的成员
a.异步工作器(异步消息处理器)
//logger.hpp
/*完成日志器模块:1.抽象日志器基类2.派生出不同的子类(同步日志器类 & 异步日志器类)*/
#ifndef __M_LOGER_H__
#define __M_LOGER_H__#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif#include"format.hpp"
#include"level.hpp"
#include"message.hpp"
#include"sink.hpp"
#include"util.hpp"
#include"looper.hpp"#include<atomic>
#include<stdio.h>
#include<mutex>
#include<stdarg.h>
#include<cassert>namespace xupt
{class Logger{public:using ptr = std::shared_ptr<Logger>;Logger(const std::string& logger_name,LogLevel::value level,Formatter::ptr &formatter,std::vector<LogSink::ptr> sinks):_logger_name(logger_name),_limit_level(level),_formatter(formatter),_sinks(sinks.begin(), sinks.end()){}/*完成日志构造消息,并进行格式化,得到格式化后的日志消息字符串---然后进行落地输出*/void debug(const std::string &file, size_t line, const std::string &fmt, ...){//1.判断当前的日志是否到达了输出等级if(LogLevel::value::DEBUG < _limit_level) { return ; }va_list ap;va_start(ap, fmt);char* res;int ret = vasprintf(&res, fmt.c_str(), ap);if(ret == -1){std::cout << "vasprintf failed!\n";return ;}va_end(ap); //将ap置空serialize(LogLevel::value::DEBUG, file, line, res);free(res);}void info(const std::string &file, size_t line, const std::string &fmt, ...){//1.判断当前的日志是否到达了输出等级if(LogLevel::value::INFO < _limit_level) { return ; }va_list ap;va_start(ap, fmt);char* res;int ret = vasprintf(&res, fmt.c_str(), ap);if(ret == -1){std::cout << "vasprintf failed!\n";return ;}va_end(ap); //将ap置空serialize(LogLevel::value::INFO, file, line, res);free(res);} void warn(const std::string &file, size_t line, const std::string &fmt, ...){//1.判断当前的日志是否到达了输出等级if(LogLevel::value::WARN < _limit_level) { return ; }va_list ap;va_start(ap, fmt);char* res;int ret = vasprintf(&res, fmt.c_str(), ap);if(ret == -1){std::cout << "vasprintf failed!\n";return ;}va_end(ap); //将ap置空serialize(LogLevel::value::WARN, file, line, res);free(res);} void error(const std::string &file, size_t line, const std::string &fmt, ...){//1.判断当前的日志是否到达了输出等级if(LogLevel::value::ERROR < _limit_level) { return ; }va_list ap;va_start(ap, fmt);char* res;int ret = vasprintf(&res, fmt.c_str(), ap);if(ret == -1){std::cout << "vasprintf failed!\n";return ;}va_end(ap); //将ap置空serialize(LogLevel::value::ERROR, file, line, res);free(res);} void fatal(const std::string &file, size_t line, const std::string &fmt, ...){//1.判断当前的日志是否到达了输出等级if(LogLevel::value::FATAL < _limit_level) { return ; }va_list ap;va_start(ap, fmt);char* res;int ret = vasprintf(&res, fmt.c_str(), ap);if(ret == -1){std::cout << "vasprintf failed!\n";return ;}va_end(ap); //将ap置空serialize(LogLevel::value::FATAL, file, line, res);free(res);} protected:void serialize(LogLevel::value level, const std::string &file, size_t line, char* str){//2.构造LogMsg对象LogMsg msg(level, line, file, _logger_name, str);//3.通过格式化工具对LogMsg进行格式化,得到格式化后的日志字符串std::stringstream ss;_formatter->format(ss, msg);//5.进行日志落地log(ss.str().c_str(), ss.str().size());}/*抽象接口完成实际的落地输出--不同的日志器会有不同的实际落地方式*/virtual void log(const char *data, size_t len) = 0;protected:std::mutex _mutex; // 保证过程的线程安全std::string _logger_name; // 日志器名称std::atomic<LogLevel::value> _limit_level; // 日志等级Formatter::ptr _formatter; // 格式化std::vector<LogSink::ptr> _sinks; // 用一个数组来存放日志落地位置};//同步日志器class SyncLogger : public Logger{public:SyncLogger(const std::string& logger_name,LogLevel::value level,Formatter::ptr &formatter,std::vector<LogSink::ptr> sinks):Logger(logger_name, level, formatter, sinks){}protected:virtual void log(const char *data, size_t len){std::unique_lock<std::mutex> lock(_mutex);if(_sinks.empty()) return;for(auto &sink : _sinks){sink->log(data,len);}}};class AsyncLogger : public Logger{public:AsyncLogger(const std::string& logger_name,LogLevel::value level,Formatter::ptr &formatter,std::vector<LogSink::ptr> sinks,AsyncType looper_type = AsyncType::ASYNC_SAFE):Logger(logger_name, level, formatter, sinks),_looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type)){}//将数据写入缓冲区virtual void log(const char* data, size_t len){_looper->push(data, len);}//设计一个实际落地的函数(将缓冲区中的数据落地)void realLog(Buffer& buf){//这里不需要加锁,因为这里不是多线程,是串行进行的if(_sinks.empty()) return;for(auto &sink : _sinks){sink->log(buf.begin(), buf.readAbleSize());}}private:AsyncLooper::ptr _looper; //};enum class LoggerType{LOGGER_SYNC,LOGGER_ASYNC};/*使用建造者模式来建造日志器,而不用用户直接去构造日志器,减少用户的使用复杂度*///1.抽象一个日志器建造者类(完成日志器对象所需零部件的构造 & 日志器的构建)// 1.设置日志器类型// 2.将不同类型日志器的创建放到同一个日志器建造类中完成class LoggerBuilder{public:LoggerBuilder():_logger_type(LoggerType::LOGGER_SYNC),_limit_level(LogLevel::value::DEBUG),_looper_type(AsyncType::ASYNC_SAFE){}void buildLoggerType(LoggerType type) { _logger_type = type; }void buildEnableUnsafeAsync() { _looper_type = AsyncType::ASYNC_UNSAFE; }void buildLoggerName(const std::string & name) { _logger_name = name; }void buildLoggerLevel(LogLevel::value level) { _limit_level = level; }void buildFomatter(const std::string &pattern) { _formatter = std::make_shared<Formatter>(pattern); }template<typename SinkType, typename ...Args>void buildSink(Args &&... args){LogSink::ptr psink = SinkFactory::create<SinkType>(std::forward<Args>(args)...);_sinks.push_back(psink);}virtual Logger::ptr build() = 0;protected:AsyncType _looper_type;LoggerType _logger_type;std::string _logger_name; // 日志器名称std::atomic<LogLevel::value> _limit_level; // 日志等级Formatter::ptr _formatter; // 格式化std::vector<LogSink::ptr> _sinks; // 用一个数组来存放日志落地位置};//2.派生出具体的建造者类---局部的日志器建造者&全局的日志器建造者(后边添加了全局单例管理器之后,将日志器添加全局管理)class LocalLoggerBuilder : public LoggerBuilder{public:Logger::ptr build() override{assert(_logger_name.empty() == false); //必须有日志器名称if(_formatter.get() == nullptr){_formatter = std::make_shared<Formatter>();}if(_sinks.empty()){buildSink<StdoutSink>();}if(_logger_type == LoggerType::LOGGER_ASYNC){return std::make_shared<AsyncLogger>(_logger_name, _limit_level, _formatter, _sinks);}//返回同步日志器return std::make_shared<SyncLogger>(_logger_name, _limit_level, _formatter, _sinks);}}; }#endif//looper.hpp
/*实现异步工作器*/#ifndef __M_LOP_H__
#define __M_LOP_H__#include"buffer.hpp"#include<mutex>
#include<condition_variable>
#include<thread>
#include<functional>
#include<atomic>namespace xupt
{using Functor = std::function<void(Buffer&)>;enum class AsyncType{ASYNC_SAFE, // 安全模式,表示缓冲区满了则退出ASYNC_UNSAFE // 无限扩容};class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;AsyncLooper(const Functor & cb, AsyncType looper_type = AsyncType::ASYNC_SAFE):_looper_type(looper_type),_stop(false),_thread(std::thread(&AsyncLooper::threadEntry, this)),_callback(cb){}~AsyncLooper() { stop(); }void stop(){_stop = true;_cond_con.notify_all(); //唤醒所有的消费线程_thread.join();//等待工作线程的退出}void push(const char *data, size_t len) //{//1.无限扩容 -- 不安全 2.固定大小 -- 缓冲区满的时候阻塞std::unique_lock<std::mutex> lock(_mutex);if(_looper_type == AsyncType::ASYNC_SAFE)//条件变量控制,如果当前缓冲区的大小,足够放入数据,则可以添加数据_cond_pro.wait(lock, [&](){ return _pro_buf.writeAbleSize() >= len; });//将数据写入到缓冲区_pro_buf.push(data, len);//唤醒消费者对缓冲区中的数据进行处理_cond_con.notify_one();} private:// 线程入口函数void threadEntry(){while(true){// 为互斥锁设置一个声明周期,当缓冲区交换完,就解锁(并不对数据的处理过程加锁){//退出标志已设置,并且缓冲区中没有数据,则退出if(_stop && _pro_buf.empty()) break;// 1.判断缓冲区是否有数据,有数据则交换,无则阻塞std::unique_lock<std::mutex> lock(_mutex);_cond_con.wait(lock, [&]() { return _stop || !_pro_buf.empty(); });_con_buf.swap(_pro_buf);if(_looper_type == AsyncType::ASYNC_SAFE)// 唤醒生产者_cond_pro.notify_all(); }// 2.被唤醒后,对消费缓冲区进行处理_callback(_con_buf);// 3.初始化消费者缓冲区_con_buf.reset();}}private:Functor _callback; //具体对缓冲区数据处理的函数,由异步工作器的使用这传入 private:AsyncType _looper_type; std::atomic<bool> _stop; // 是否停止Buffer _pro_buf; // 生产缓冲区Buffer _con_buf; // 消费缓冲区std::mutex _mutex; // 互斥锁std::condition_variable _cond_pro; // 生产条件变量std::condition_variable _cond_con; // 消费条件变量std::thread _thread; // 异步工作器对应的工作线程};}
到这本篇博客的内容就到此结束了。
如果觉得本篇博客内容对你有所帮助的话,可以点赞,收藏,顺便关注一下!
如果文章内容有错误,欢迎在评论区指正