C++项目 —— 基于多设计模式下的同步异步日志系统(4)(双缓冲区异步任务处理器(AsyncLooper)设计)

C++项目 —— 基于多设计模式下的同步&异步日志系统(4)(双缓冲区异步任务处理器(AsyncLooper)设计)

  • 异步线程
      • 什么是异步线程?
      • C++ 异步线程简单例子
      • 代码解释
      • 程序输出
      • 关键点总结
      • 扩展:使用 `std::thread` 实现异步线程
  • 分辨同步异步线程
      • 1. 看调用是否立即返回
        • 同步线程(如排队买奶茶)
        • 异步线程(如外卖下单)
      • 2. 看是否有专门的工作线程
        • 同步
        • 异步
      • 3. 看资源访问方式
        • 同步
        • 异步
      • 4. 典型场景对比
      • 代码示例对比
        • 同步日志
        • 异步日志
  • 双缓冲区异步任务处理器
      • 设计思想:异步线程 + 数据池
        • 核心思想
      • 任务池的设计:双缓冲区阻塞数据池
        • 1. 双缓冲区的工作机制
        • 2. 减少锁冲突
        • 3. 内存管理优化
      • 优势分析
        • 1. 提高任务处理效率
        • 2. 减少锁冲突
        • 3. 降低内存分配开销
      • 与其他设计方案的对比
        • 1. 循环队列
        • 2. 单缓冲区
        • 3. 双缓冲区的优势
      • 总结
  • 缓冲区实现
  • 总结
    • loop.hpp中与线程有关的操作
      • 1. 线程的创建与管理
      • 2. 线程同步与通信
      • 3. 线程入口函数
      • 4. 数据生产与消费
      • 5. 线程安全的设计
      • 总结
  • placeholders::_1
      • 示例代码
      • 说明
      • 输出结果
      • 具体作用
      • 详细解释
      • 为什么需要 `std::placeholders::_1`?
      • 示例对比
        • 不使用 `std::bind` 和占位符:
        • 使用 `std::bind` 和占位符:
      • 总结

我们上次把同步日志器的编写已经差不多完成了,如果还没有看过同步日志器编写的小伙伴可以点击这里:

https://blog.csdn.net/qq_67693066/article/details/147309275?spm=1011.2415.3001.5331

我们今天主要来编写异步任务处理器,但是在这之前我们得了解一下什么是异步线程

异步线程

什么是异步线程?

异步线程指的是程序中的一种执行模式,其中某些任务可以在主线程或其他线程中并行运行,而不会阻塞主线程的执行。换句话说,异步线程允许我们在不等待某个操作完成的情况下继续执行其他任务。

在 C++ 中,异步线程可以通过标准库中的 <thread><future> 来实现。常见的异步编程工具包括:

  • std::thread:用于创建和管理线程。
  • std::async:用于以异步方式执行任务,并返回结果。
  • std::futurestd::promise:用于在线程间传递数据。

C++ 异步线程简单例子

以下是一个使用 std::async 实现异步线程的简单例子:

#include <iostream>
#include <future>  // std::async, std::future
#include <chrono>  // std::this_thread::sleep_for
#include <thread>  // std::this_thread// 模拟一个耗时的任务
int slowTask(int x) {std::cout << "Task started with input: " << x << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作int result = x * x;std::cout << "Task completed. Result: " << result << std::endl;return result;
}int main() {// 使用 std::async 启动一个异步任务std::future<int> resultFuture = std::async(std::launch::async, slowTask, 5);// 主线程继续执行其他工作std::cout << "Main thread is doing other work..." << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟主线程的工作std::cout << "Main thread finished its work." << std::endl;// 获取异步任务的结果(如果任务未完成,会阻塞直到结果可用)int result = resultFuture.get();std::cout << "Final result from async task: " << result << std::endl;return 0;
}

代码解释

  1. slowTask 函数

    • 这是一个模拟的耗时任务,接收一个整数参数 x,经过 2 秒的延迟后返回 x * x 的结果。
  2. std::async

    • std::async 是一个函数模板,用于启动异步任务。
    • 参数 std::launch::async 表示任务将在一个新线程中异步执行。
    • 第二个参数是任务函数(slowTask),后面的参数是传递给任务函数的参数(这里是 5)。
  3. 主线程的行为

    • 在启动异步任务后,主线程继续执行自己的工作(例如打印消息或进行其他计算)。
    • 当主线程需要获取异步任务的结果时,调用 resultFuture.get()。如果异步任务尚未完成,这一步会阻塞,直到结果可用。
  4. std::future

    • std::future 是一个模板类,用于存储异步任务的结果。
    • 调用 get() 方法可以获取异步任务的返回值。

程序输出

运行上述代码,输出可能如下(顺序可能会因线程调度而略有不同):

Main thread is doing other work...
Task started with input: 5
Main thread finished its work.
Task completed. Result: 25
Final result from async task: 25

关键点总结

  1. 异步执行

    • std::async 允许我们异步地运行任务,而不会阻塞主线程。
  2. 线程管理

    • 使用 std::future 可以方便地获取异步任务的结果。
  3. 非阻塞与阻塞

    • 异步任务启动后,主线程可以继续执行其他任务。
    • 调用 future.get() 时,如果任务未完成,主线程会被阻塞,直到结果可用。

扩展:使用 std::thread 实现异步线程

如果你不想使用 std::async,也可以直接使用 std::thread 来实现类似的功能:

#include <iostream>
#include <thread>
#include <chrono>// 模拟一个耗时的任务
void slowTask(int x) {std::cout << "Task started with input: " << x << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "Task completed. Result: " << (x * x) << std::endl;
}int main() {// 创建一个线程执行异步任务std::thread t(slowTask, 5);// 主线程继续执行其他工作std::cout << "Main thread is doing other work..." << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "Main thread finished its work." << std::endl;// 等待线程完成t.join();return 0;
}

在这个版本中,std::thread 直接启动一个线程,主线程通过 t.join() 等待线程完成。


在这里插入图片描述

说直白的,就是主线程创建了一个子线程,本来是由主线程完成的,现在由这个子线程完成

分辨同步异步线程

要判断一个线程模型是同步还是异步,可以从以下几个关键特征进行区分。我用最简单的比喻和代码示例来说明:


1. 看调用是否立即返回

同步线程(如排队买奶茶)
void syncLog(const std::string& msg) {std::ofstream file("log.txt");file << msg;  // 必须等待文件写入完成// 调用者在这里阻塞等待
}
  • 特征:函数执行完所有操作(包括I/O)后才返回
  • 类比:就像在奶茶店排队,必须等到拿到奶茶才能离开
异步线程(如外卖下单)
void asyncLog(const std::string& msg) {_buffer.push(msg);  // 只是把消息放入队列// 立即返回,后台线程会处理写入
}
  • 特征:函数只把任务放入队列/缓冲区后立即返回
  • 类比:像点外卖,下单后就可以做其他事情,外卖员会异步配送

2. 看是否有专门的工作线程

同步
// 没有额外线程,直接在当前线程处理
syncLog("error");  // 直接卡在这里写文件
异步
class AsyncLogger {std::thread _worker;  // 关键:有专门的工作线程void workerThread() {while (true) { /* 处理队列中的任务 */ }}
};

3. 看资源访问方式

同步
void writeFile(const std::string& data) {// 直接访问资源(文件/网络)_file.write(data);  // 同步I/O
}
异步
void asyncWrite(const std::string& data) {_queue.push(data);  // 只是提交任务// 实际由其他线程从_queue取出数据后写文件
}

4. 典型场景对比

特征同步异步
调用阻塞
工作线程无(当前线程处理)有专门线程
性能影响受I/O速度限制几乎不影响主线程
代码复杂度简单需要线程安全控制
适合场景简单应用、低频操作高性能服务、高频日志

代码示例对比

同步日志
// 当前线程直接写文件(阻塞)
void logSync(const std::string& msg) {std::lock_guard<std::mutex> lock(_mutex);_file << msg;  // 同步写入,耗时操作!
}
异步日志
// 只是提交任务到队列
void logAsync(const std::string& msg) {std::lock_guard<std::mutex> lock(_mutex);_queue.push(msg);  // 内存操作,极快_cond.notify_one(); // 通知工作线程
}// 工作线程
void workerThread() {while (true) {std::string msg = _queue.pop();  // 从队列取任务_file << msg;  // 实际耗时操作在这里}
}

只要记住:异步=把任务丢给别人处理,自己不等结果,就能轻松区分了!

双缓冲区异步任务处理器

设计思想:异步线程 + 数据池

核心思想

通过引入异步线程双缓冲区数据池,将任务的生产和消费分离。生产者负责生成任务并将其放入任务池,而消费者(异步线程)从任务池中取出任务并执行。这种设计的目标是提高任务处理效率,减少锁冲突,并降低内存分配和释放的开销。
在这里插入图片描述


任务池的设计:双缓冲区阻塞数据池

1. 双缓冲区的工作机制
  • 两个缓冲区:任务池由两个独立的缓冲区组成,分别称为主缓冲区和备用缓冲区。
    • 主缓冲区:用于存放生产者提交的任务。
    • 备用缓冲区:在主缓冲区被消费者完全处理后,与主缓冲区交换角色。
  • 交换机制
    • 当主缓冲区中的所有任务被消费者处理完毕后,主缓冲区和备用缓冲区的角色互换。
    • 这样,消费者可以立即开始处理新的任务,而生产者可以继续向新的主缓冲区添加任务。
2. 减少锁冲突
  • 在传统的单缓冲区或循环队列中,每次任务的添加或取出都需要加锁,导致生产者和消费者之间的锁冲突频繁发生。
  • 而双缓冲区通过批量处理的方式减少了锁冲突的概率:
    • 生产者只需要在向主缓冲区添加任务时加锁,消费者只需要在处理完主缓冲区的所有任务后进行一次锁操作来交换缓冲区。
    • 这种方式大大减少了锁的使用频率,降低了线程间的竞争。
3. 内存管理优化
  • 双缓冲区的设计避免了频繁申请和释放内存空间。
    • 缓冲区的大小通常是固定的,生产者和消费者只需在缓冲区内操作,无需动态分配或释放内存。
    • 这不仅提高了性能,还减少了内存碎片化的风险。

优势分析

1. 提高任务处理效率
  • 消费者可以一次性处理主缓冲区中的所有任务,而不是逐条处理。这种方式减少了任务调度的开销,提升了整体效率。
2. 减少锁冲突
  • 由于锁操作只发生在缓冲区交换时,而非每次任务的添加或取出,锁冲突的概率显著降低。
  • 生产者和消费者之间的交互更加高效,尤其在高并发场景下表现尤为明显。
3. 降低内存分配开销
  • 固定大小的缓冲区避免了频繁的内存申请和释放操作,减少了系统资源的消耗。
  • 同时,固定大小的设计也使得内存管理更加可控,避免了因动态内存分配引发的问题。

与其他设计方案的对比

1. 循环队列
  • 循环队列是一种常见的任务池实现方式,但其每次任务的添加或取出都需要加锁,容易引发生产者和消费者之间的锁冲突。
  • 此外,循环队列通常需要动态调整大小,这会增加内存管理的复杂性。
2. 单缓冲区
  • 单缓冲区的设计简单,但在高并发场景下,锁冲突问题尤为突出。
  • 生产者和消费者需要频繁地争夺缓冲区的访问权,导致性能下降。
3. 双缓冲区的优势
  • 相比循环队列和单缓冲区,双缓冲区通过批量处理和缓冲区交换机制,减少了锁冲突和内存管理的开销。
  • 它在保证高效的同时,提供了一种简单且可靠的解决方案。

总结

双缓冲区阻塞数据池的设计思想旨在通过批量处理缓冲区交换,减少锁冲突和内存分配的开销,从而提高任务处理效率。它特别适用于高并发场景下的任务池设计,能够有效平衡生产者和消费者之间的关系,提升系统的整体性能。

不过在这之前,我们得要实现一下缓冲区

缓冲区实现

创建一个头文件,用来专门实现缓冲区:

buffer.hpp
#ifndef __M_BUFFER_H__
#define __M_BUFFER_H__
#include "utils.hpp"
#include <vector>
#include <cassert>namespace logs
{#define DEFAULT_BUFFER_SIZE (1 * 1024 * 1024)#define THRESHOLD_BUFFER_SIZE (8 * 1024 * 1024)#define INCREAMENT_BUFFER_SIZE (1 * 1024 * 1024)class Buffer{public:Buffer(): _buffer(DEFAULT_BUFFER_SIZE), _writer_idx(0), _read_idx(0){}// 1、写入操作void push(const char *data, size_t len){// 1.首先判断是否需要扩容ensureEnoughSize(len);std::copy(data, data + len, &_buffer[_writer_idx]);moveWriter(len);}//返回可读数据的起始地址const char* begin(){return &_buffer[_read_idx];}size_t writeAbleSize(){return (_buffer.size() - _writer_idx);}size_t readAbleSize(){return (_writer_idx - _read_idx);}void reset(){_read_idx = 0;   // 缓冲区所有空间都是空闲的_writer_idx = 0; // 与_writer_idx相等表示没有数据可读}// 对一个buffer进行一个交换void swap(Buffer &buffer){_buffer.swap(buffer._buffer);// 交换指针std::swap(_read_idx, buffer._read_idx);std::swap(_writer_idx, buffer._writer_idx);}void moveReader(size_t len){assert(len <= readAbleSize());_read_idx += len;}// 判断缓冲区是否为空bool empty(){return (_read_idx == _writer_idx);}private:// 扩容操作void ensureEnoughSize(size_t len){if (len <= writeAbleSize()){return;}size_t new_size = 0;if (_buffer.size() < THRESHOLD_BUFFER_SIZE){new_size = _buffer.size() * 2;}else{new_size = _buffer.size() + INCREAMENT_BUFFER_SIZE;}_buffer.resize(new_size);}// 对读写指针进行向后偏移操作void moveWriter(size_t len){assert(len + _writer_idx <= _buffer.size());_writer_idx += len;}std::vector<char> _buffer; // 缓冲区size_t _writer_idx;        // 写指针size_t _read_idx;          // 读指针};
}#endif

这是一个缓冲区的定义,我们要的是一个双缓冲区,所以我们再建一个头文件:

loop.hpp
#ifndef __M_LOOP_H__
#define __M_LOOP_H__#include "utils.hpp"
#include <condition_variable>
#include <thread>
#include <mutex>
#include <functional>
#include <atomic>
#include "message.hpp"
#include "buffer.hpp"
#include "loop.hpp"// 命名空间
namespace logs
{using Functor = std::function<void(Buffer &)>;class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;enum class AsyncType{ASYNC_SAFE,  // 安全模式,对空间索取有限制ASYNC_UNSAFE // 非安全模式,对空间索取无限制};AsyncLooper(const Functor &cb, AsyncType looper_type = AsyncType::ASYNC_SAFE): _stop(false), _thread(std::thread(&AsyncLooper::threadEntry, this)), _callback(cb), _looper_type(_looper_type){}~AsyncLooper(){stop();}void stop(){_stop = true;_cond_con.notify_all(); // 唤醒所有工作线程_thread.join();}void push(const char *data, size_t len){// 无限扩容或固定大小std::unique_lock<std::mutex> lock(_mutex);// 条件变量空值,如缓冲区空间大小大于数据长度,则可以添加数据if (_looper_type == AsyncType::ASYNC_SAFE)_cond_pro.wait(lock, [&](){ return _pron_buf.writeAbleSize() >= len; });// 如果走下来,可以向缓冲区添加数据_pron_buf.push(data, len);// 唤醒消费者对缓冲区的数据进行处理_cond_con.notify_one();}private:// 线程入口函数void threadEntry(){while (1){// 1.判断生产缓冲区有没有数据,有则交换,无则阻塞std::unique_lock<std::mutex> lock(_mutex);// 若是当前是退出前被唤醒,或者有数据被唤醒,则返回真,继续向下运行,否则重新进入休眠_cond_con.wait(lock, [&](){ return _stop || !_pron_buf.empty(); });退出标志被设置,且生产缓冲区无数据if (_stop && _pron_buf.empty()){break;}_con_buf.swap(_pron_buf);// 2.唤醒生产者_cond_pro.notify_all();}// 3.被唤醒后,对消费缓冲区进行数据处理_callback(_con_buf);// 4.初始化消费缓冲区_con_buf.reset();}Functor _callback; // 具体对缓冲区进行处理的回调函数,由异步工作器使用者传入private:AsyncType _looper_type;  // 异步日志器类型std::atomic<bool> _stop; // 停止标志位Buffer _pron_buf;        // 生产缓冲区Buffer _con_buf;         // 消费缓冲区std::mutex _mutex;std::condition_variable _cond_pro; // 生产条件变量std::condition_variable _cond_con; // 消费者条件变量std::thread _thread;               // 异步工作器对应的工作线程};
} // namespace logs#endif

这里的_pron_buf 和 _con_buf就是两个缓冲区,当_pron_buf缓冲区充满数据之后,就会和_con_buf进行交换,_pron_buf里面的数据就会到_con_buf里面去,这个时候工作线程就会从_con_buf中拿出数据进行工作。

异步任务处理器的构建是基于以上两个文件的,这个时候回到我们之前定义logger的头文件里面去:

#ifndef __M_LOGGER_H__
#define __M_LOGGER_H__
#include "utils.hpp"
#include "message.hpp"
#include "utils.hpp"
#include "level.hpp"
#include "sink.hpp"
#include <atomic>
#include <mutex>
#include <iostream>
#include <memory>
#include <ctime>
#include <vector>
#include <cassert>
#include <sstream>
#include <stdarg.h>
#include "loop.hpp"namespace logs
{class BaseLogger{public:using ptr = std::shared_ptr<BaseLogger>;BaseLogger(const std::string &logger_name,Loglevel::value limt_level,Formetter::ptr &formetter,std::vector<BaseSink::ptr> &sinks): _logger_name(logger_name), _limt_level(limt_level), _formetter(formetter), _sinks(sinks.begin(), sinks.end()){}const std::string &get_logger_name(){return _logger_name;}/*完成构造日志消息对象过程并进行格式化,得到格式化后的日志消息字符串---然后进行输出*/void debug(const std::string &file, size_t line, const std::string fmt, ...){// 如果限制输出的日志等级比debug高,则直接返回if (Loglevel::value::DEBUG < _limt_level){return;}// 声明参数列表变量va_list ap;va_start(ap, fmt); // 初始化fmt是最后一个固定的参数char *res; // 声明缓冲区int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap); // 释放参数列表变量serialize(Loglevel::value::DEBUG, file, line, res);free(res);}void info(const std::string &file, size_t line, const std::string fmt, ...){// 如果限制输出的日志等级比debug高,则直接返回if (Loglevel::value::INFO < _limt_level){return;}// 声明参数列表变量va_list ap;va_start(ap, fmt); // 初始化fmt是最后一个固定的参数char *res; // 声明缓冲区int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap); // 释放参数列表变量serialize(Loglevel::value::INFO, file, line, res);free(res);}void warn(const std::string &file, size_t line, const std::string fmt, ...){// 如果限制输出的日志等级比debug高,则直接返回if (Loglevel::value::WARN < _limt_level){return;}// 声明参数列表变量va_list ap;va_start(ap, fmt); // 初始化fmt是最后一个固定的参数char *res; // 声明缓冲区int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap); // 释放参数列表变量serialize(Loglevel::value::WARN, file, line, res);free(res);}void error(const std::string &file, size_t line, const std::string &fmt, ...){// 1.通过传入的参数构造一个日志对象,进行日志的格式化,最终落地if (Loglevel::value::ERROR < _limt_level){return;}// 2.对fmt格式化字符串和不定参进行字符串组织,得到日志消息字符串va_list ap;va_start(ap, fmt);char *res;int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap);serialize(Loglevel::value::ERROR, file, line, res);free(res);}void fatal(const std::string &file, size_t line, const std::string &fmt, ...){// 1.通过传入的参数构造一个日志对象,进行日志的格式化,最终落地if (Loglevel::value::FATAL < _limt_level){return;}// 2.对fmt格式化字符串和不定参进行字符串组织,得到日志消息字符串va_list ap;va_start(ap, fmt);char *res;int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap);serialize(Loglevel::value::FATAL, file, line, res);free(res);}protected:void serialize(Loglevel::value level, const std::string &file_name,size_t line, char *str){// 1.构造msg对象logs::logMsg msg(level, file_name, line, _logger_name, str);// 2.利用Formetter进行消息格式化std::stringstream ss;_formetter->format(ss, msg);// 3.落地方向的输出log(ss.str().c_str(), ss.str().size());}/*抽象接口完成实际的落地输出---不同的日志器会有不同的落地方式*/virtual void log(const char *data, size_t len) = 0;protected:std::mutex _mutex;                        // 锁std::string _logger_name;                 // 日志器名称std::atomic<Loglevel::value> _limt_level; // 日志等级Formetter::ptr _formetter;                // 格式化消息指针std::vector<BaseSink::ptr> _sinks;        // 落地方向};class SyncLogger : public BaseLogger{public:SyncLogger(const std::string &logger_name,Loglevel::value limt_level,Formetter::ptr &formetter,std::vector<BaseSink::ptr> &sinks): BaseLogger(logger_name, limt_level, formetter, sinks) {}protected:void log(const char *data, size_t len){// 1.上锁std::unique_lock<std::mutex> _lock(std::mutex);if (_sinks.empty())return;for (auto &sink : _sinks){sink->log(data, len);}}};enum class loggerType{LOGGER_SYNC,LOGGER_ASYNC};class AsyncLogger : public BaseLogger{public:AsyncLogger(const std::string &logger_name, Loglevel::value level,Formetter::ptr &formatter,std::vector<BaseSink::ptr> &sinks,logs::AsyncLooper::AsyncType looper_type): BaseLogger(logger_name, level, formatter, sinks), _looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type)){}void log(const char *data, size_t len){_looper->push(data,len);}void realLog(Buffer& buffer){if (_sinks.empty())return;for (auto &sink : _sinks){sink->log(buffer.begin(), buffer.readAbleSize());}}private:AsyncLooper::ptr _looper;};
}#endif

我们可以来测试一下:

#include "utils.hpp"
#include "level.hpp"
#include "message.hpp"
#include "fometter.hpp"
#include "sink.hpp"
#include "logger.hpp"int main()
{logs::Formetter formatter("abc[%d{%H:%M:%S}][%c]%T%m%n");logs::Formetter::ptr fmt_ptr = std::make_shared<logs::Formetter>(formatter);auto st1 = logs::SinkFactory::create<logs::StdoutSink>();std::vector<logs::BaseSink::ptr> sinks = {st1};std::string logger_name = "asynclogger";logs::BaseLogger::ptr logger(new logs::AsyncLogger(logger_name, logs::Loglevel::value::DEBUG, fmt_ptr, sinks,logs::AsyncLooper::AsyncType::ASYNC_SAFE));logger->debug("main.cc", 53, "%s","格式化功能测试....");// 5. 确保异步日志线程有足够时间处理std::this_thread::sleep_for(std::chrono::milliseconds(100)); }

在这里插入图片描述

总结

loop.hpp中与线程有关的操作

在这段代码中,与线程相关的操作和函数主要包括以下几个方面:


1. 线程的创建与管理

  • 线程的创建

    • AsyncLooper 构造函数中,通过 std::thread 创建一个新的线程 _thread,并将其绑定到成员函数 threadEntry
      _thread(std::thread(&AsyncLooper::threadEntry, this))
      
      这个线程是异步工作器的核心,负责处理缓冲区中的数据。
  • 线程的销毁

    • 在析构函数 ~AsyncLooper() 中调用 stop() 函数,确保线程安全退出。
      ~AsyncLooper()
      {stop();
      }
      
  • 线程的停止

    • stop() 函数用于优雅地停止线程:
      • 设置 _stop 标志位为 true,通知线程准备退出。
      • 调用 _cond_con.notify_all() 唤醒所有等待的线程。
      • 使用 _thread.join() 等待线程执行完毕后回收资源。
        void stop()
        {_stop = true;_cond_con.notify_all(); // 唤醒所有工作线程_thread.join();
        }
        

2. 线程同步与通信

为了确保线程之间的安全协作,使用了互斥锁(std::mutex)和条件变量(std::condition_variable)。

  • 互斥锁 (std::mutex)

    • _mutex 用于保护共享资源(如生产缓冲区 _pron_buf 和消费缓冲区 _con_buf),防止多个线程同时访问导致数据竞争。
    • push()threadEntry() 中使用 std::unique_lock<std::mutex>_mutex 加锁。
  • 条件变量 (std::condition_variable)

    • 生产者条件变量 _cond_pro

      • 用于控制生产者的阻塞和唤醒。
      • push() 中,当缓冲区空间不足时,生产者会调用 _cond_pro.wait() 阻塞,直到消费者释放空间。
        _cond_pro.wait(lock, [&]() { return _pron_buf.writeAbleSize() >= len; });
        
      • 消费者在处理完数据后,调用 _cond_pro.notify_all() 唤醒所有等待的生产者。
        _cond_pro.notify_all();
        
    • 消费者条件变量 _cond_con

      • 用于控制消费者的阻塞和唤醒。
      • threadEntry() 中,当生产缓冲区为空时,消费者会调用 _cond_con.wait() 阻塞,直到有新数据可用。
        _cond_con.wait(lock, [&]() { return _stop || !_pron_buf.empty(); });
        
      • 生产者在向缓冲区添加数据后,调用 _cond_con.notify_one() 唤醒一个消费者。
        _cond_con.notify_one();
        

3. 线程入口函数

  • threadEntry()
    • 这是线程的主要执行逻辑,负责从生产缓冲区 _pron_buf 中读取数据并处理。
    • 主要流程:
      1. 判断生产缓冲区是否有数据。如果有,则交换生产缓冲区和消费缓冲区;如果没有,则阻塞等待。
      2. 如果退出标志 _stop 被设置且生产缓冲区为空,则线程退出。
      3. 调用回调函数 _callback(_con_buf) 处理消费缓冲区中的数据。
      4. 初始化消费缓冲区 _con_buf,以便下一次使用。
      void threadEntry()
      {while (1){std::unique_lock<std::mutex> lock(_mutex);_cond_con.wait(lock, [&]() { return _stop || !_pron_buf.empty(); });if (_stop && _pron_buf.empty()){break;}_con_buf.swap(_pron_buf);_cond_pro.notify_all();}_callback(_con_buf);_con_buf.reset();
      }
      

4. 数据生产与消费

  • 生产数据

    • push(const char *data, size_t len) 是生产者接口,用于向生产缓冲区 _pron_buf 添加数据。
    • 在安全模式下(ASYNC_SAFE),如果缓冲区空间不足,生产者会阻塞等待。
      if (_looper_type == AsyncType::ASYNC_SAFE)_cond_pro.wait(lock, [&]() { return _pron_buf.writeAbleSize() >= len; });
      
  • 消费数据

    • 消费者线程通过 threadEntry() 不断从生产缓冲区中获取数据,并调用用户提供的回调函数 _callback 进行处理。

5. 线程安全的设计

  • 原子变量 _stop

    • 使用 std::atomic<bool> 类型的 _stop 标志位,确保多线程环境下的安全读写。
      std::atomic<bool> _stop;
      
  • 双重检查机制

    • threadEntry() 中,使用 _stop && _pron_buf.empty() 来判断是否需要退出线程,避免误退出或死锁。

总结

这段代码通过多线程技术实现了生产者-消费者模型,核心线程相关的操作包括:

  1. 线程的创建、管理和销毁。
  2. 使用互斥锁和条件变量实现线程同步与通信。
  3. 提供生产者接口 push() 和消费者逻辑 threadEntry()
  4. 通过原子变量和双重检查机制确保线程安全。

这种设计适用于异步日志记录等场景,能够高效地处理大量并发数据。

placeholders::_1

下面是一个简单的例子来展示 std::bindstd::placeholders::_1 的使用方法。这个例子定义了一个类 Calculator,其中包含一个成员函数 add,该函数接受两个参数:一个整数和另一个整数引用。我们将通过 std::bind 将这个成员函数绑定到一个对象实例,并使用占位符 _1 来预留第二个参数的位置。

示例代码

#include <iostream>
#include <functional> // 包含 std::bind 和 std::placeholdersclass Calculator {
public:// 成员函数 add 接受一个整数 n 和一个整数引用 mvoid add(int n, int& m) {m += n;std::cout << "After adding " << n << ", m = " << m << std::endl;}
};int main() {Calculator calc;  // 创建 Calculator 类的实例int value = 10;// 使用 std::bind 绑定成员函数 add 到 calc 实例,并预留第二个参数的位置auto boundFunc = std::bind(&Calculator::add, &calc, 5, std::placeholders::_1);// 调用绑定后的函数对象,传递 value 变量作为第二个参数boundFunc(value);return 0;
}

说明

  • Calculator::add

    • 这个成员函数接收两个参数:一个整数 n 和一个整数引用 m。它将 n 加到 m 上,并输出结果。
  • std::bind

    • 我们使用 std::bind 函数将 Calculator::add 成员函数绑定到 calc 对象实例上,并固定第一个参数为 5(即每次调用都会加上 5),同时使用 std::placeholders::_1 占位符表示第二个参数将在实际调用时传入。
  • boundFunc

    • boundFunc 是一个可调用对象,当调用它并传入一个整数引用时,实际上会调用 calc.add(5, /*传入的值*/)
  • boundFunc(value);

    • 在这里,我们调用了 boundFunc 并传入了 value 变量作为第二个参数。这将导致 Calculator::add 函数被调用,执行 value += 5 操作,并打印出更新后的 value 值。

输出结果

当你运行上述代码时,程序将输出:

After adding 5, m = 15

这表明原始的 value 变量从 10 变为了 15,因为 5 被加到了 value 上。通过这种方式,我们可以看到如何使用 std::bindstd::placeholders::_1 来创建一个绑定特定参数的函数对象,并在调用时动态地提供剩余的参数。

放到代码中:


具体作用

在这段代码中:

_looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type))
  • std::bind 用于将成员函数 AsyncLogger::realLog 绑定到一个可调用对象。
  • this 表示当前对象(即 AsyncLogger 的实例),作为成员函数的第一个隐式参数。
  • std::placeholders::_1 是一个占位符,表示这个绑定的函数对象在调用时需要接收一个参数,并将该参数传递给 AsyncLogger::realLog 的第一个显式参数。

换句话说,std::placeholders::_1 在这里的作用是预留一个位置,使得 AsyncLooper 的回调函数可以在运行时动态地传入一个参数(比如 Buffer & 类型的数据)。


详细解释

假设 AsyncLogger::realLog 的定义如下:

void AsyncLogger::realLog(Buffer &buffer);

通过 std::bindstd::placeholders::_1,我们创建了一个函数对象,其行为等价于:

void callback(Buffer &buffer) {this->realLog(buffer);
}

这个函数对象会被传递给 AsyncLooper 的构造函数,作为 _callback 回调函数。当 AsyncLooper 调用 _callback 时,它会将一个 Buffer 对象作为参数传递给 realLog


为什么需要 std::placeholders::_1

  1. 适配函数签名

    • AsyncLooper 的构造函数期望一个 std::function<void(Buffer &)> 类型的回调函数。
    • AsyncLogger::realLog 是一个成员函数,需要绑定到某个对象(this)才能调用。
    • 使用 std::placeholders::_1 可以确保绑定后的函数对象能够接受一个参数,并将其正确传递给 realLog
  2. 灵活性

    • 占位符允许我们灵活地控制参数的传递顺序和数量。例如,如果有多个参数,可以使用 _1, _2, _3 等分别表示不同的参数位置。

示例对比

不使用 std::bind 和占位符:

如果直接传递成员函数指针,编译器会报错,因为成员函数需要绑定到一个对象:

_looper(std::make_shared<AsyncLooper>(&AsyncLogger::realLog, looper_type));
// 错误:无法直接传递成员函数指针
使用 std::bind 和占位符:

通过 std::bind,我们可以将成员函数绑定到当前对象,并预留参数位置:

_looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type));
// 正确:绑定后的函数对象符合 std::function<void(Buffer &)> 的要求

总结

std::placeholders::_1 的作用是为绑定的函数对象预留一个参数位置,使得 AsyncLogger::realLog 可以在运行时接受一个参数(如 Buffer &)。这种机制让 std::bind 更加灵活,能够适配不同函数签名的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/76592.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 使用 BinaryFormatter 和相关类型时的反序列化风险

C# 使用 BinaryFormatter 和相关类型时的反序列化风险 由来&#xff1a;在项目使用.NET Reactor 混淆 C# 的序列化和反序列化发现存在的问题&#xff0c;读取文件时&#xff0c;转化为对应的类数据有时候为空&#xff0c;所以就在网上搜索了相关知识&#xff0c;在此做个笔记以…

OpenCv高阶(四)——角点检测

一、角点检测 在计算机视觉中&#xff0c;角点检测是识别图像中局部区域&#xff08;角点&#xff09;的关键技术&#xff0c;这些区域通常是两条或多条边缘的交点&#xff0c;具有丰富的结构信息&#xff0c;常用于图像匹配、跟踪、三维重建等任务。 Harris角点检测算法是一…

Conda 入门指令教程

Conda 入门指令教程 Conda 是一个强大的包和环境管理工具&#xff0c;广泛应用于数据科学和机器学习项目中。本文将介绍 Conda 的常用指令&#xff0c;帮助你快速上手。 1. Conda 基础操作 查看 Conda 版本 conda --version显示当前安装的 Conda 版本。 更新 Conda conda…

Unity ShaderLab引用HLSL文件找不到其中函数

在写Unity Shader的过程中&#xff0c;常常需要将方法封装到HLSL文件中&#xff0c;今天遇到一个这样的报错&#xff0c; 明明hlsl文件路径引用没问题&#xff0c;却引用不到方法 并且将分散文件中的函数复制过来一切正常&#xff0c;最终定位到HLSL的预编译指令中 这指令的…

uniapp上传图片时(可选微信头像、相册、拍照)

参考文献&#xff1a;微信小程序登录——头像_onchooseavatar-CSDN博客 <button open-type"chooseAvatar" chooseavatar"onChooseAvatar"> </button>onChooseAvatar(e) {uni.showLoading({title: 上传中...,mask: true});uni.uploadFile({url…

单元测试的一般步骤

Qt Test Qt Test 是 Qt 开发人员发布的一个单元测试框架&#xff0c;用于测试基于 Qt 框架的应用程序或库。它提供了单元测试框架中常见的所有功能以及用于测试图形用户界面的扩展。 1.自动化测试包络ui测试>接口测试>单元测试&#xff1b;现问如何使用Qt进行单元测试&…

【Matlab】中国沿岸潮滩宽度和坡度分布

【Matlab】中国沿岸潮滩宽度和坡度分布 参考文献见最后或者阅读原文&#xff01; 中国沿岸潮滩宽度和坡度分布: figure 1 a 潮滩宽度分布。b 潮滩坡度分布。 图中标注了中国沿海各省&#xff0c;分别为辽宁&#xff08;LN&#xff09;、河北&#xff08;HB&#xff09;、山东&…

理解.NET Core中的配置Configuration

什么是配置 .NET中的配置&#xff0c;本质上就是key-value键值对&#xff0c;并且key和value都是字符串类型。 在.NET中提供了多种配置提供程序来对不同的配置进行读取、写入、重载等操作&#xff0c;这里我们以为.NET 的源码项目为例&#xff0c;来看下.NET中的配置主要是有…

windows服务器及网络:论如何安装(虚拟机)

今天我要介绍的是&#xff1a;在Windows中对于安装系统&#xff08;虚拟机的步骤以及相关的安装事宜&#xff09;&#xff0c;事不宜迟&#xff0c;让我们来看看系统安装&#xff08;虚拟机&#xff09;是怎么操作的&#xff1a; 对现在来说&#xff0c;安装电脑系统已经是非常…

shardingsphere-jdbc集成Seata分布式事务

1、导入相关依赖 <!-- shardingsphere-jdbc --><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>shardingsphere-jdbc</artifactId><version>5.5.1</version></dependency><!-- shardingspher…

05-DevOps-Jenkins自动拉取构建代码

新建Gitlab仓库 先在Gitab上创建一个代码仓库&#xff0c;选择创建空白项目 安装说明进行填写&#xff0c;然后点击创建项目 创建好的仓库是空的&#xff0c;什么都没有 新建一个springboot项目&#xff0c;用于代码上传使用。 只是为了测试代码上传功能&#xff0c;所以代码…

C#核心(24)结构体和类的区别,抽象类和接口的区别(面试常问)

前言 随着上一节我们对StringBulider的讲解落下帷幕&#xff0c;c#核心的知识点我们也即将告一段落,我们讲完了面向对象要用的三大特性&#xff08;封装&#xff0c;继承&#xff0c;多态&#xff09;和七大原则。期中自然也不乏一些小的散的碎的的知识点。 今天我们要讲的也…

HTMLCSS实现异环网站,期末web作业

本网站是我在学习前端时敲得&#xff0c;仅供学习使用。 这段代码是一个完整的 HTML 网页项目&#xff0c;包含 HTML、CSS 和 JavaScript 部分&#xff0c;用于构建一个名为 “异环” 的网页。网页具备头部导航栏、主体视频展示、图片交互元素、音乐播放控制、视频弹窗播放以及…

Oracle表的别名不能用as,列的别名可以用as

在 Oracle 数据库中&#xff0c;‌表的别名‌和‌列的别名‌在使用 AS 关键字时确实有不同规则&#xff0c;以下是详细说明&#xff1a; 1. 表的别名&#xff08;Table Alias&#xff09;‌ ‌不支持 AS 关键字‌&#xff0c;直接跟在表名后即可。‌语法示例‌&#xff1a; S…

【SAP ME 44】在 HANA DB中报废SFC时的SHOP_ORDER表记录锁定

症状 SELECT…FROM SHOP_ORDER FOR UPDATE 在 SFC 报废期间持有锁,当同时调用数量较大时,可能会导致 HANA 数据库出现大量锁积压。这有时会导致因等待 HANA 数据库释放“选择更新”锁而导致报废 SFC 花费数分钟。 HANA 数据库日志中的示例: # begin PreparedStatement_ex…

Vscode开发Vue项目NodeJs启动报错处理

文章目录 背景一、npm启动报错报错信息定位原因处理方案第一步、下载安装高版本 二、node 无法识别报错信息处理方案定位原因第一步、检测环境变量第二步、重新开启界面 背景 使用Vscode开发Vue项目&#xff0c;使用到NodeJs&#xff0c;记录出现的问题及处理方案&#xff0c;…

破局遗留系统!AI自动化重构:从静态方法到Spring Bean注入实战

在当今快速发展的软件行业中,许多企业都面临着 Java 遗留系统的维护和升级难题。这些老旧系统往往采用了大量静态方法,随着业务的不断发展,其局限性日益凸显。而飞算 JavaAI 作为一款强大的 AI 工具,为 Java 遗留系统的重构提供了全新的解决方案,能够实现从静态方法到 Spring B…

2025妈妈杯数学建模C题完整分析论文(共36页)(含模型建立、可运行代码、数据)

2025 年第十五届 MathorCup 数学建模C题完整分析论文 目录 摘 要 一、问题分析 二、问题重述 三、模型假设 四、 模型建立与求解 4.1问题1 4.1.1问题1思路分析 4.1.2问题1模型建立 4.1.3问题1代码&#xff08;仅供参考&#xff09; 4.1.4问题1求解结果&#xff08;仅…

【Python爬虫详解】第一篇:Python爬虫入门指南

什么是网络爬虫&#xff1f; 网络爬虫&#xff08;Web Crawler&#xff09;是一种自动获取网页内容的程序。它可以访问网站&#xff0c;抓取页面内容&#xff0c;并从中提取有价值的数据。在信息爆炸的时代&#xff0c;爬虫技术可以帮助我们高效地收集、整理和分析互联网上的海…

【JavaWeb后端开发02】SpringBootWeb + Https协议

课程内容&#xff1a; SpringBootWeb 入门 Http协议 SpringBootWeb案例 分层解耦 文章目录 1. SpringBootWeb入门1.1 概述1.2 入门程序1.2.1 需求1.2.2 开发步骤1.2.3 常见问题 1.3 入门解析 2. HTTP协议2.1 HTTP概述2.1.1 介绍2.1.2 特点 2.2 HTTP请求协议2.2.1 介绍2.2.2…