muduo异步日志实现
陈硕老师的 muduo 网络库中异步日志的实现。今晚有点晚了,我明晚再把这个异步日志抽出来,作为一个独立的日志库。
所在文件
-
AsyncLogging.cc
-
AsyncLogging.h
-
LogFile.h
-
LogFile.cc
-
CountDownLatch.h
-
CountDownLatch.cc
这个CountDownLatch有点像信号量,但是又只有down操作,上网查了一下类似的机制,作用有点像屏障(barrier):start()在latch_上等待,直到后台线程真正跑起来并countDown之后才返回。
class AsyncLogging : noncopyable
{public:AsyncLogging(const string& basename,off_t rollSize,int flushInterval = 3);~AsyncLogging(){if (running_){stop();}}void append(const char* logline, int len);void start(){running_ = true;thread_.start(); // 启动线程latch_.wait(); // 这里的wait调用其实并不会阻塞// 主线程}void stop() NO_THREAD_SAFETY_ANALYSIS{running_ = false;cond_.notify();thread_.join();}private:// 默认的守护进程执行的函数void threadFunc();/*Buffer的数据结构private:std::vector<char> buffer_;size_t readerIndex_;size_t writerIndex_;static const char kCRLF[];*/typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;typedef std::vector<std::unique_ptr<Buffer>> BufferVector;typedef BufferVector::value_type BufferPtr; // 表示容器元素的类型const int flushInterval_; // 刷盘的时间间隔std::atomic<bool> running_; // 是否运行const string basename_; // 文件名const off_t rollSize_; // 日志回滚的大小muduo::Thread thread_; // 日志类自带一个守护线程来写muduo::CountDownLatch latch_; muduo::MutexLock mutex_;muduo::Condition cond_ GUARDED_BY(mutex_);BufferPtr currentBuffer_ GUARDED_BY(mutex_); // 当前缓冲BufferPtr nextBuffer_ GUARDED_BY(mutex_); // 预备缓冲BufferVector buffers_ GUARDED_BY(mutex_); // 已经写满并等待落盘的缓冲
};
#include <stdio.h>using namespace muduo;AsyncLogging::AsyncLogging(const string& basename,off_t rollSize,int flushInterval): flushInterval_(flushInterval),running_(false),basename_(basename),rollSize_(rollSize),thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),latch_(1),mutex_(),cond_(mutex_),currentBuffer_(new Buffer),nextBuffer_(new Buffer),buffers_()
{currentBuffer_->bzero();nextBuffer_->bzero();buffers_.reserve(16);
}void AsyncLogging::append(const char* logline, int len)
{muduo::MutexLockGuard lock(mutex_);if (currentBuffer_->avail() > len){currentBuffer_->append(logline, len);}else{buffers_.push_back(std::move(currentBuffer_));if (nextBuffer_){currentBuffer_ = std::move(nextBuffer_);}else{// 四个缓冲区都写满了currentBuffer_.reset(new Buffer); // Rarely happens}currentBuffer_->append(logline, len);// 这里的notify不一定什么时候都有效// 如果此时守护线程正在工作// 那么这个信后就会丢失,但是没有造成影响// 但是有可能守护线程正在条件变量上睡眠cond_.notify();}
}void AsyncLogging::threadFunc()
{assert(running_ == true);latch_.countDown(); // 计数器减一,latch_ == 0,并唤醒主线程LogFile output(basename_, rollSize_, false); // 初始化一个输出流BufferPtr newBuffer1(new Buffer);BufferPtr newBuffer2(new Buffer);newBuffer1->bzero();newBuffer2->bzero();BufferVector buffersToWrite; // 相当于一个前后端交互的单元// 将写满的buffer装到vector中// 在传输到后端buffersToWrite.reserve(16);while (running_){assert(newBuffer1 && newBuffer1->length() == 0);assert(newBuffer2 && newBuffer2->length() == 0);assert(buffersToWrite.empty());{muduo::MutexLockGuard lock(mutex_);// 使用条件变量完成定时任务if (buffers_.empty()) // unusual usage!{cond_.waitForSeconds(flushInterval_);}buffers_.push_back(std::move(currentBuffer_));currentBuffer_ = std::move(newBuffer1);buffersToWrite.swap(buffers_); // buffers_变成一个空的buffers_if (!nextBuffer_){nextBuffer_ = std::move(newBuffer2);}}assert(!buffersToWrite.empty());// 如果日志太多了if (buffersToWrite.size() > 25){char buf[256];snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",Timestamp::now().toFormattedString().c_str(),buffersToWrite.size()-2);fputs(buf, stderr);// 先输出一个报警的日志output.append(buf, static_cast<int>(strlen(buf)));// 清除多余的日志buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());}for (const auto& buffer : buffersToWrite){// FIXME: use unbuffered stdio FILE ? or use ::writev ?output.append(buffer->data(), buffer->length());}if (buffersToWrite.size() > 2){// drop non-bzero-ed buffers, avoid trashing// vector底层是智能指针不用担心内存泄露buffersToWrite.resize(2);}if (!newBuffer1){assert(!buffersToWrite.empty());newBuffer1 = std::move(buffersToWrite.back());buffersToWrite.pop_back();newBuffer1->reset();}if (!newBuffer2){assert(!buffersToWrite.empty());newBuffer2 = std::move(buffersToWrite.back());buffersToWrite.pop_back();newBuffer2->reset();}buffersToWrite.clear();output.flush();}output.flush();
}