C++项目实战——基于多设计模式下的同步异步日志系统-⑩-异步缓冲区类与异步工作器类设计

文章目录

  • 专栏导读
  • 异步缓冲区设计思想
  • 异步缓冲区类设计
  • 异步工作器类设计
  • 异步日志器设计
  • 异步缓冲区类整理
  • 异步工作器类整理

专栏导读

🌸作者简介:花想云 ,在读本科生一枚,C/C++领域新星创作者,新星计划导师,阿里云专家博主,CSDN内容合伙人…致力于 C/C++、Linux 学习。

🌸专栏简介:本文收录于 C++项目——基于多设计模式下的同步与异步日志系统

🌸相关专栏推荐:C语言初阶系列C语言进阶系列C++系列数据结构与算法Linux

在这里插入图片描述
为了避免因为写日志的过程阻塞,导致业务线程在写日志的时候影响其效率(例如由于网络原因导致日志写入阻塞,进而导致业务线程阻塞),因此我们需要设计一个异步日志器

异步的思想就是不让业务线程进行日志的实际落地操作,而是将日志消息放到缓冲区(一块指定内存)当中,接下来有一个专门的异步线程,去针对缓冲区中的数据进行处理(实际落地操作)

所以,异步日志器的实现思想:

  • 设计一个线程安全的缓冲区
  • 创建一个异步工作线程,专门负责缓冲区中日志消息落地操作。

异步缓冲区设计思想

在任务池的设计中,有很多备选方案,比如队列、循环队列等,但是不管哪一种都会涉及到锁冲突的情况,因为在生产者与消费者模型中,任何两个角色之间具有互斥关系,因此每一次任务的添加与取出都有可能涉及锁的冲突

所以我们采用双缓冲区的的设计思想,优势在于:

  • 避免了空间的频繁申请与释放,且尽可能的减少了生产者与消费则之间锁冲突的概率,提高了任务处理效率

双缓冲区的设计思想是:采用两个缓冲区,一个用来进行任务写入(push pool),一个进行任务处理(pop pool)。当异步工作线程(消费者)将缓冲区中的数据全部处理完毕之后,然后交换两个缓冲区,重新对新的缓冲区中的任务进行处理,虽然同时多线程写入也会产生冲突,但是冲突并不会像每次只处理一条的时候频繁(减少了消费者与生产者之间的锁冲突),且不涉及到空间的频繁申请释放所带来的的消耗。

在这里插入图片描述

异步缓冲区类设计

类中包含的成员:

  • 一个存放字符串数据的缓冲区(使用vector进行空间管理);
  • 当前写入数据位置的指针(指向可写区域的起始位置,避免数据的写入覆盖);
  • 当前读取数据位置的指针(指向可读区域的起始位置,当读取指针与写入指针指向相同的位置表示数据读取完了);

类中提供的操作:

  • 向缓冲区中写入数据
  • 获取可读数据起始地址的接口
  • 获取可读数据长度的接口
  • 移动读写位置的接口
  • 初始化缓冲区的操作(将读写位置初始化–在一个缓冲区所有数据处理完毕之后);
  • 提供交换缓冲区的操作(交换空间地址,并不交换空间数据)。

注意,缓冲区中直接存放格式化后的日志消息字符串,而不是LogMsg对象,这样做有两个好处:

  • 减少了LogMsg对象频繁的构造的消耗;
  • 可以针对缓冲区中的日志消息,一次性进行IO操作,减少IO次数,提高效率。
#ifndef __M_BUFFER_H__
#define __M_BUFFER_H__
#include "util.hpp"
#include <vector>
#include <cassert>namespace LOG
{#define DEFAULT_BUFFER_SIZE (1 * 1024 * 1024)#define THRESHOLD_BUFFER_SIZE (8 * 1024 * 1024)#define INCREMENT_BUFFER_SIZE (1 * 1024 * 1024)class Buffer {public:Buffer() : _buffer(DEFAULT_BUFFER_SIZE), _writer_idx(0), _reader_idx(0) {}// 向缓冲区中写入数据void push(const char* data, size_t len){// 1.考虑空间不够则扩容ensureEnoughSize(len);// 2.将数据拷贝到缓冲区std::copy(data, data + len, &_buffer[_writer_idx]);// 3.将当前写入位置向后偏移moveWriter(len);}// 返回可读数据的起始地址const char* begin(){return &_buffer[_reader_idx];}// 返回可写数据的长度size_t writeAbleSize(){// 对于扩容思路并没有用, 仅针对固定大小缓冲区return (_buffer.size() - _writer_idx);}// 返回可读数据的长度size_t readAbleSize(){ return (_writer_idx - _reader_idx);}// 重置读写位置void reset(){_writer_idx = 0;_reader_idx = 0;}// 对buffer实现交换操作void swap(Buffer & buffer){_buffer.swap(buffer._buffer);std::swap(_reader_idx, buffer._reader_idx);std::swap(_writer_idx, buffer._writer_idx);}// 判断缓冲区是否为空bool empty(){return (_reader_idx == _writer_idx);}private:void ensureEnoughSize(size_t len){if(len < writeAbleSize()) return;size_t new_size = 0;if(_buffer.size() < THRESHOLD_BUFFER_SIZE){new_size = _buffer.size() * 2 + len; // 小于阈值则翻倍增长}else{new_size = _buffer.size() + INCREMENT_BUFFER_SIZE + len;}_buffer.resize(new_size);}// 对读指针进行向后偏移操作void moveReader(size_t len){assert(len <= readAbleSize());_reader_idx += len;}// 对写指针进行向后偏移操作void moveWriter(size_t len){assert(len + _writer_idx<= _buffer.size());_writer_idx += len;}private:std::vector<char> _buffer;size_t _reader_idx; // 当前可读数据的指针size_t _writer_idx; // 当前可写数据的指针};
}
#endif

异步工作器类设计

异步工作器的主要任务是,对缓冲区中的数据进行处理,若处理缓冲区中没有数据了则交换缓冲区

异步工作器类管理的成员有:

  • 双缓冲区(生产,消费);
  • 互斥锁:保证线程安全;
  • 条件变量-生产&消费:生产缓冲区中没有数据,处理完消费缓冲区数据后就休眠;
  • 回调函数:针对缓冲区中数据的处理接口——外界传入一个函数,告诉异步日志器该如何处理。

异步工作器类提供的操作有:

  • 停止异步工作器
  • 添加数据到缓冲区

私有操作:

  • 创建线程
  • 线程入口函数:在线程入口函数中交换缓冲区,对消费缓冲区数据使用回调函数进行处理,处理完后再次交换;
#ifndef __M_LOOPER_H__
#define __M_LOOPER_H__
#include "buffer.hpp"
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>namespace LOG
{using Functor = std::function<void(Buffer &)>;enum class AsyncType{ASYNC_SAFE, // 安全状态, 表示缓冲区满了则阻塞,避免资源耗尽的风险ASYNC_UNSAFE // 不考虑资源耗尽的问题};class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;AsyncLooper(const Functor& cb, AsyncType looper_type = AsyncType::ASYNC_SAFE):_looper_type(looper_type),_stop(false),_call_back(cb),_thread(std::thread(&AsyncLooper::threadEntry, this)){}~AsyncLooper() { stop(); }void stop() {_stop = true; // 将退出标志设置为true_con_cond.notify_all(); // 唤醒所有工作线程_thread.join(); // 等待工作线程退出}void push(const char* data, size_t len){std::unique_lock<std::mutex> lock(_mutex);// 条件变量控制,若缓冲区剩余空间大小等于数据长度,则可以添加数据if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.wait(lock, [&](){ return _pro_buf.writeAbleSize() >= len; });// 能够走下来说明条件满足,可以向缓冲区添加数据了_pro_buf.push(data, len);// 唤醒消费者对缓冲区中的数据进行处理_con_cond.notify_one();}private:// 线程入口函数 -- 对消费者缓冲区中的数据进行处理,处理完毕后,初始化缓冲区,交换缓冲区void threadEntry(){// 为互斥锁设置一个声明周期,当缓冲区交换完毕就解锁while(1){{// 1.判断生产缓冲区有没有数据,有则交换,无则阻塞std::unique_lock<std::mutex> lock(_mutex);if(_stop && _pro_buf.empty()) break;// 若当前是退出前被唤醒或者是有数据被唤醒,则返回真,继续向下运行,否则重新进入休眠_con_cond.wait(lock, [&](){ return _stop || !_pro_buf.empty(); });_con_buf.swap(_pro_buf);// 2.唤醒生产者if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.notify_all();}// 3.被唤醒后,对消费者缓冲区进行数据处理_call_back(_con_buf);// 4.初始化消费者缓冲区_con_buf.reset();}}private:Functor _call_back;private:AsyncType _looper_type;std::atomic<bool> _stop;Buffer _pro_buf;Buffer _con_buf;std::mutex _mutex;std::condition_variable _pro_cond;std::condition_variable _con_cond;std::thread _thread; // 异步工作器对应的工作线程};
}
#endif

异步日志器设计

异步日志器继承自日志器类,并在同步日志器类上拓展了异步工作器。当我们需要异步输出日志的时候,需要创建异步日志器和消息处理器,调用异步日志器的log、debug、error、info、fatal等函数输出不同级别日志。

  • log函数为重写Logger类的函数,主要实现将日志日志数据加入异步缓冲区;
  • realLog函数主要由异步线程调用(是为异步工作器设置的回调函数),完成日志的实际落地操作。
class AsyncLogger : public Logger
{
public:AsyncLogger(const std::string &logger_name,LogLevel::value level,LOG::Formatter::ptr &formatter,std::vector<LogSink::ptr> &sinks,AsyncType looper_type): Logger(logger_name, level, formatter, sinks),_looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type)){}// 将数据写入缓冲区 void log(const char *data, size_t len){_looper->push(data, len);}// 设计一个实际落地函数void realLog(Buffer &buf){if (_sinks.empty())return;for (auto &sink : _sinks){sink->log(buf.begin(), buf.readAbleSize());}}private:AsyncLooper::ptr _looper; // 异步工作器
};

异步缓冲区类整理

#ifndef __M_BUFFER_H__
#define __M_BUFFER_H__
#include "util.hpp"
#include <vector>
#include <cassert>namespace LOG
{#define DEFAULT_BUFFER_SIZE (1 * 1024 * 1024)#define THRESHOLD_BUFFER_SIZE (8 * 1024 * 1024)#define INCREMENT_BUFFER_SIZE (1 * 1024 * 1024)class Buffer {public:Buffer() : _buffer(DEFAULT_BUFFER_SIZE), _writer_idx(0), _reader_idx(0) {}// 向缓冲区中写入数据void push(const char* data, size_t len){// 1.考虑空间不够则扩容ensureEnoughSize(len);// 2.将数据拷贝到缓冲区std::copy(data, data + len, &_buffer[_writer_idx]);// 3.将当前写入位置向后偏移moveWriter(len);}// 返回可读数据的起始地址const char* begin(){return &_buffer[_reader_idx];}// 返回可写数据的长度size_t writeAbleSize(){// 对于扩容思路并没有用, 仅针对固定大小缓冲区return (_buffer.size() - _writer_idx);}// 返回可读数据的长度size_t readAbleSize(){ return (_writer_idx - _reader_idx);}// 重置读写位置void reset(){_writer_idx = 0;_reader_idx = 0;}// 对buffer实现交换操作void swap(Buffer & buffer){_buffer.swap(buffer._buffer);std::swap(_reader_idx, buffer._reader_idx);std::swap(_writer_idx, buffer._writer_idx);}// 判断缓冲区是否为空bool empty(){return (_reader_idx == _writer_idx);}private:void ensureEnoughSize(size_t len){if(len < writeAbleSize()) return;size_t new_size = 0;if(_buffer.size() < THRESHOLD_BUFFER_SIZE){new_size = _buffer.size() * 2 + len; // 小于阈值则翻倍增长}else{new_size = _buffer.size() + INCREMENT_BUFFER_SIZE + len;}_buffer.resize(new_size);}// 对读指针进行向后偏移操作void moveReader(size_t len){assert(len <= readAbleSize());_reader_idx += len;}// 对写指针进行向后偏移操作void moveWriter(size_t len){assert(len + _writer_idx<= _buffer.size());_writer_idx += len;}private:std::vector<char> _buffer;size_t _reader_idx; // 当前可读数据的指针size_t _writer_idx; // 当前可写数据的指针};
}
#endif

异步工作器类整理

#ifndef __M_LOOPER_H__
#define __M_LOOPER_H__
#include "buffer.hpp"
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>namespace LOG
{using Functor = std::function<void(Buffer &)>;enum class AsyncType{ASYNC_SAFE, // 安全状态, 表示缓冲区满了则阻塞,避免资源耗尽的风险ASYNC_UNSAFE // 不考虑资源耗尽的问题};class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;AsyncLooper(const Functor& cb, AsyncType looper_type = AsyncType::ASYNC_SAFE):_looper_type(looper_type),_stop(false),_call_back(cb),_thread(std::thread(&AsyncLooper::threadEntry, this)){}~AsyncLooper() { stop(); }void stop() {_stop = true; // 将退出标志设置为true_con_cond.notify_all(); // 唤醒所有工作线程_thread.join(); // 等待工作线程退出}void push(const char* data, size_t len){std::unique_lock<std::mutex> lock(_mutex);// 条件变量控制,若缓冲区剩余空间大小等于数据长度,则可以添加数据if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.wait(lock, [&](){ return _pro_buf.writeAbleSize() >= len; });// 能够走下来说明条件满足,可以向缓冲区添加数据了_pro_buf.push(data, len);// 唤醒消费者对缓冲区中的数据进行处理_con_cond.notify_one();}private:// 线程入口函数 -- 对消费者缓冲区中的数据进行处理,处理完毕后,初始化缓冲区,交换缓冲区void threadEntry(){// 为互斥锁设置一个生命周期,当缓冲区交换完毕就解锁while(1){{// 1.判断生产缓冲区有没有数据,有则交换,无则阻塞std::unique_lock<std::mutex> lock(_mutex);if(_stop && _pro_buf.empty()) break;// 若当前是退出前被唤醒或者是有数据被唤醒,则返回真,继续向下运行,否则重新进入休眠_con_cond.wait(lock, [&](){ return _stop || !_pro_buf.empty(); });_con_buf.swap(_pro_buf);// 2.唤醒生产者if(_looper_type == AsyncType::ASYNC_SAFE)_pro_cond.notify_all();}// 3.被唤醒后,对消费者缓冲区进行数据处理_call_back(_con_buf);// 4.初始化消费者缓冲区_con_buf.reset();}}private:Functor _call_back;private:AsyncType _looper_type; // 选择异步工作器工作模式(安全与非安全模式)std::atomic<bool> _stop; // 工作器退出标志Buffer _pro_buf; Buffer _con_buf; std::mutex _mutex;std::condition_variable _pro_cond;std::condition_variable _con_cond;std::thread _thread; // 异步工作器对应的工作线程};
}
#endif

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/110299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[java进阶]——IO流,递归实现多级文件拷贝

&#x1f308;键盘敲烂&#xff0c;年薪30万&#x1f308; 目录 一、认识IO流 二、了解编码与解码 二、IO流体系 三、字节输入输出流 四、字符输入输出流 五、多级文件拷贝 一、认识IO流 IO流也叫输入流(intput)、输出流(onput)&#xff0c;该流就像java程序同硬盘之间的…

2024年申报国自然项目基金撰写及技巧

随着社会经济发展和科技进步&#xff0c;基金项目对创新性的要求越来越高。申请人需要提出独特且有前瞻性的研究问题&#xff0c;具备突破性的科学思路和方法。因此&#xff0c;基金项目申请往往需要进行跨学科的技术融合。申请人需要与不同领域结合&#xff0c;形成多学科交叉…

第0章:怎么入手tensorflow

近年来人工智能的火爆吸引了很多人&#xff0c;网上相关的热门课程报名的人很多&#xff0c;但是坚持下去的人却少。那些晦涩的原理没有一定知识的积累很难能理解。 如果你对人工智能感兴趣&#xff0c;且想利用人工智能去实现某项功能&#xff0c;而不是对人工智能本身感兴趣&…

Linux性能优化--性能工具:磁盘I/O

6.0 概述 本章介绍的性能工具能帮助你评估磁盘I/O子系统的使用情况。这些工具可以展示哪些磁盘或分区已被使用&#xff0c;每个磁盘处理了多少I/O,发给这些磁盘的I/O请求要等多久才被处理。 阅读本章后&#xff0c;你将能够&#xff1a; 确定系统内磁盘I/O的总量和类型(读/写…

【网络安全 --- xss-labs通关】xss-labs靶场通关,让你巩固对xss漏洞的理解及绕过技巧和方法(提供资源)

一&#xff0c;资源下载准备 1-1 VMware 16.0 安装请参考以下博客&#xff0c;若已经安装请忽略&#xff1a; 【网络安全 --- 工具安装】VMware 16.0 详细安装过程&#xff08;提供资源&#xff09;-CSDN博客【网络安全 --- 工具安装】VMware 16.0 详细安装过程&#xff08;…

序列化和反序列化指令在PLC通信上的应用

在了解本篇博客之前&#xff0c;大家可以熟悉下序列化指令的相关介绍&#xff0c;详细内容如下&#xff1a; 博途PLC 1200/1500 PLC 序列化和反序列化指令编程应用_博图序列化和反序列化-CSDN博客序列化最重要的作用&#xff1a;在传递和保存对象时.保证对象的完整性和可传递性…

3. 无重复字符的最长子串

给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 示例 1: 输入: s "abcabcbb" 输出: 3 解释: 因为无重复字符的最长子串是 "abc"&#xff0c;所以其长度为 3。示例 2: 输入: s "bbbbb" 输出: 1 解释: 因为无…

Oracle中数据迁移的工具

SQL*Plus Copy命令 问题描述 怎样在SQL*Plus中实现不同的表之间的数据复制&#xff0c;不论是本地的还是远程的&#xff1f; 问题分析 在SQL*Plus中的copy命令&#xff0c;可以完成远程数据库、本地数据库或Oracle数据库与非Oracle数据库之间的数据复制。其性能与导入/导出相同…

【API篇】四、Flink物理分区算子API

文章目录 1、 分区算子&#xff1a;随机分区2、分区算子&#xff1a;轮询分区3、分区算子&#xff1a;重缩放分区4、分区算子&#xff1a;广播5、分区算子&#xff1a;全局分区6、自定义分区 重分区&#xff0c;即数据"洗牌"&#xff0c;将数据分配到下游算子的并行子…

Apache Shiro 组件反序列化漏洞分析

概述 Apache Shiro是一个强大且易用的Java安全框架,执行身份验证、授权、密码和会话管理。使用Shiro的易于理解的API,您可以快速、轻松地获得任何应用程序,从最小的移动应用程序到最大的网络和企业应用程序。 它的原理比较简单&#xff1a;为了让浏览器或服务器重启后用户不丢失…

嵌入式实时操作系统的设计与开发 (中断管理)

中断发生及响应 硬件抽象HAL层响应 中断请求IRQ被中断控制器汇集成中断向量&#xff08;Interrupt Vector&#xff09;&#xff0c;每个中断向量对应一个中断服务程序ISR&#xff0c;中断向量存放了ISRs的入口地址或ISRs的第一条指令。 系统中通常包含多个中断向量&#xff0…

Linux-ssh

文章目录 远程登录服务器配置远程服务器相关信息创建config文件配置config文件 配置密钥登陆先创建密钥配置密钥文件 执行命令scp传文件copy文件copy文件夹配置我们的vim和tmux 远程登录服务器 ssh userhostnameuser:用户名hostname&#xff1a;IP地址或域名 第一次登陆会显示…

Linux权限基础知识

前言&#xff1a;作者也是初学Linux&#xff0c;可能总结的还不是很到位 Linux修炼功法&#xff1a;初阶功法 ♈️今日夜电波&#xff1a;修炼爱情 —林俊杰 0:30━━━━━━️&#x1f49f;──────── 4:47 …

MySQL——七、MySQL备份恢复

MySQL 一、MySQL日志管理1、MySQL日志类型2、错误日志3、通用查询日志4、慢查询日志5、二进制日志5.1 开启日志5.2 二进制日志的管理5.3 日志查看5.4 二进制日志还原数据 二、MySQL备份1、备份类型逻辑备份优缺点 2、备份内容3、备份工具3.1 MySQL自带的备份工具3.2 文件系统备…

Android SurfaceView预览相机黑屏问题解决方案

解决方案 1、使用动态添加的方式添加surfaceView 在xml文件中添加FrameLayout <FrameLayoutandroid:id"id/colorsurface"android:layout_width"match_parent"android:layout_height"match_parent" /> 2.创建SurfaceView并添加到FrameL…

大模型,重构自动驾驶

文&#xff5c;刘俊宏 编&#xff5c;王一粟 大模型如何重构自动驾驶&#xff1f;答案已经逐渐露出水面。 “在大数据、大模型为特征&#xff0c;以数据驱动为开发模式的自动驾驶3.0时代&#xff0c;自动驾驶大模型将在车端、云端上实现一个统一的端到端的平台管理。”毫末智…

【CesiumforUnreal插件】UE5 快速构建Cesium场景 快速入门!!!

目录 0 引言1 快速入门1.1 准备1.2 安装Cesium for Unreal插件并创建一个项目1.3 准备关卡并添加地形和纹理1.4 添加3D建筑到场景中1.5 探索场景 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;CesiumforUnreal专栏&#x1f4a5; 标题&#xff1a…

如何转换Corona和Vray材质?cr材质转vr材质的方法

cr材质转vr材质的方法一&#xff1a;使用CG Magic插件&#xff0c;一键转换 CG Magic是一款基于3ds Max深度开发的智能化辅助插件&#xff0c;上千项实用功能&#xff0c;降低渲染时长&#xff0c;节省时间和精力&#xff0c;大幅简化工作流程&#xff0c;助力高效完成创作。 …

【Tomcat】为Tomcat服务配置本地Apr库以提升性能

关于 apr 和 apr-util 对 Tomcat 服务的性能提升的说明&#xff1a; 要测APR给tomcat带来的好处最好的方法是在慢速网络上&#xff08;模拟Internet&#xff09;&#xff0c;将Tomcat线程数开到300以上的水平&#xff0c;然后模拟一大堆并发请求。如果不配APR&#xff0c;基本…

Qt中绘图框架的使用例子

绘图框架的使用步骤&#xff1a; 1、创建自定类继承QGraphicsView&#xff08;可以直接用QGraphicsView小部件&#xff09;。 2、创建QGraphicsScene对象&#xff0c;在创建的时候初始化场景矩形框。并使用QGraphicsView部件的函数setScene将场景设置到视图中&#xff0c;这里…