C++项目 —— 基于多设计模式下的同步&异步日志系统(4)(双缓冲区异步任务处理器(AsyncLooper)设计)
- 异步线程
- 什么是异步线程?
- C++ 异步线程简单例子
- 代码解释
- 程序输出
- 关键点总结
- 扩展:使用 `std::thread` 实现异步线程
- 分辨同步异步线程
- 1. 看调用是否立即返回
- 同步线程(如排队买奶茶)
- 异步线程(如外卖下单)
- 2. 看是否有专门的工作线程
- 同步
- 异步
- 3. 看资源访问方式
- 同步
- 异步
- 4. 典型场景对比
- 代码示例对比
- 同步日志
- 异步日志
- 双缓冲区异步任务处理器
- 设计思想:异步线程 + 数据池
- 核心思想
- 任务池的设计:双缓冲区阻塞数据池
- 1. 双缓冲区的工作机制
- 2. 减少锁冲突
- 3. 内存管理优化
- 优势分析
- 1. 提高任务处理效率
- 2. 减少锁冲突
- 3. 降低内存分配开销
- 与其他设计方案的对比
- 1. 循环队列
- 2. 单缓冲区
- 3. 双缓冲区的优势
- 总结
- 缓冲区实现
- 总结
- loop.hpp中与线程有关的操作
- 1. 线程的创建与管理
- 2. 线程同步与通信
- 3. 线程入口函数
- 4. 数据生产与消费
- 5. 线程安全的设计
- 总结
- placeholders::_1
- 示例代码
- 说明
- 输出结果
- 具体作用
- 详细解释
- 为什么需要 `std::placeholders::_1`?
- 示例对比
- 不使用 `std::bind` 和占位符:
- 使用 `std::bind` 和占位符:
- 总结
我们上次把同步日志器的编写已经差不多完成了,如果还没有看过同步日志器编写的小伙伴可以点击这里:
https://blog.csdn.net/qq_67693066/article/details/147309275?spm=1011.2415.3001.5331
我们今天主要来编写异步任务处理器,但是在这之前我们得了解一下什么是异步线程:
异步线程
什么是异步线程?
异步线程指的是程序中的一种执行模式,其中某些任务可以在主线程或其他线程中并行运行,而不会阻塞主线程的执行。换句话说,异步线程允许我们在不等待某个操作完成的情况下继续执行其他任务。
在 C++ 中,异步线程可以通过标准库中的 <thread>
或 <future>
来实现。常见的异步编程工具包括:
std::thread
:用于创建和管理线程。std::async
:用于以异步方式执行任务,并返回结果。std::future
和std::promise
:用于在线程间传递数据。
C++ 异步线程简单例子
以下是一个使用 std::async
实现异步线程的简单例子:
#include <iostream>
#include <future> // std::async, std::future
#include <chrono> // std::this_thread::sleep_for
#include <thread> // std::this_thread// 模拟一个耗时的任务
int slowTask(int x) {std::cout << "Task started with input: " << x << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作int result = x * x;std::cout << "Task completed. Result: " << result << std::endl;return result;
}int main() {// 使用 std::async 启动一个异步任务std::future<int> resultFuture = std::async(std::launch::async, slowTask, 5);// 主线程继续执行其他工作std::cout << "Main thread is doing other work..." << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟主线程的工作std::cout << "Main thread finished its work." << std::endl;// 获取异步任务的结果(如果任务未完成,会阻塞直到结果可用)int result = resultFuture.get();std::cout << "Final result from async task: " << result << std::endl;return 0;
}
代码解释
-
slowTask
函数:- 这是一个模拟的耗时任务,接收一个整数参数
x
,经过 2 秒的延迟后返回x * x
的结果。
- 这是一个模拟的耗时任务,接收一个整数参数
-
std::async
:std::async
是一个函数模板,用于启动异步任务。- 参数
std::launch::async
表示任务将在一个新线程中异步执行。 - 第二个参数是任务函数(
slowTask
),后面的参数是传递给任务函数的参数(这里是5
)。
-
主线程的行为:
- 在启动异步任务后,主线程继续执行自己的工作(例如打印消息或进行其他计算)。
- 当主线程需要获取异步任务的结果时,调用
resultFuture.get()
。如果异步任务尚未完成,这一步会阻塞,直到结果可用。
-
std::future
:std::future
是一个模板类,用于存储异步任务的结果。- 调用
get()
方法可以获取异步任务的返回值。
程序输出
运行上述代码,输出可能如下(顺序可能会因线程调度而略有不同):
Main thread is doing other work...
Task started with input: 5
Main thread finished its work.
Task completed. Result: 25
Final result from async task: 25
关键点总结
-
异步执行:
std::async
允许我们异步地运行任务,而不会阻塞主线程。
-
线程管理:
- 使用
std::future
可以方便地获取异步任务的结果。
- 使用
-
非阻塞与阻塞:
- 异步任务启动后,主线程可以继续执行其他任务。
- 调用
future.get()
时,如果任务未完成,主线程会被阻塞,直到结果可用。
扩展:使用 std::thread
实现异步线程
如果你不想使用 std::async
,也可以直接使用 std::thread
来实现类似的功能:
#include <iostream>
#include <thread>
#include <chrono>// 模拟一个耗时的任务
void slowTask(int x) {std::cout << "Task started with input: " << x << std::endl;std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "Task completed. Result: " << (x * x) << std::endl;
}int main() {// 创建一个线程执行异步任务std::thread t(slowTask, 5);// 主线程继续执行其他工作std::cout << "Main thread is doing other work..." << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "Main thread finished its work." << std::endl;// 等待线程完成t.join();return 0;
}
在这个版本中,std::thread
直接启动一个线程,主线程通过 t.join()
等待线程完成。
说直白的,就是主线程创建了一个子线程,本来是由主线程完成的,现在由这个子线程完成。
分辨同步异步线程
要判断一个线程模型是同步还是异步,可以从以下几个关键特征进行区分。我用最简单的比喻和代码示例来说明:
1. 看调用是否立即返回
同步线程(如排队买奶茶)
void syncLog(const std::string& msg) {std::ofstream file("log.txt");file << msg; // 必须等待文件写入完成// 调用者在这里阻塞等待
}
- 特征:函数执行完所有操作(包括I/O)后才返回
- 类比:就像在奶茶店排队,必须等到拿到奶茶才能离开
异步线程(如外卖下单)
void asyncLog(const std::string& msg) {_buffer.push(msg); // 只是把消息放入队列// 立即返回,后台线程会处理写入
}
- 特征:函数只把任务放入队列/缓冲区后立即返回
- 类比:像点外卖,下单后就可以做其他事情,外卖员会异步配送
2. 看是否有专门的工作线程
同步
// 没有额外线程,直接在当前线程处理
syncLog("error"); // 直接卡在这里写文件
异步
class AsyncLogger {std::thread _worker; // 关键:有专门的工作线程void workerThread() {while (true) { /* 处理队列中的任务 */ }}
};
3. 看资源访问方式
同步
void writeFile(const std::string& data) {// 直接访问资源(文件/网络)_file.write(data); // 同步I/O
}
异步
void asyncWrite(const std::string& data) {_queue.push(data); // 只是提交任务// 实际由其他线程从_queue取出数据后写文件
}
4. 典型场景对比
特征 | 同步 | 异步 |
---|---|---|
调用阻塞 | 是 | 否 |
工作线程 | 无(当前线程处理) | 有专门线程 |
性能影响 | 受I/O速度限制 | 几乎不影响主线程 |
代码复杂度 | 简单 | 需要线程安全控制 |
适合场景 | 简单应用、低频操作 | 高性能服务、高频日志 |
代码示例对比
同步日志
// 当前线程直接写文件(阻塞)
void logSync(const std::string& msg) {std::lock_guard<std::mutex> lock(_mutex);_file << msg; // 同步写入,耗时操作!
}
异步日志
// 只是提交任务到队列
void logAsync(const std::string& msg) {std::lock_guard<std::mutex> lock(_mutex);_queue.push(msg); // 内存操作,极快_cond.notify_one(); // 通知工作线程
}// 工作线程
void workerThread() {while (true) {std::string msg = _queue.pop(); // 从队列取任务_file << msg; // 实际耗时操作在这里}
}
只要记住:异步=把任务丢给别人处理,自己不等结果,就能轻松区分了!
双缓冲区异步任务处理器
设计思想:异步线程 + 数据池
核心思想
通过引入异步线程和双缓冲区数据池,将任务的生产和消费分离。生产者负责生成任务并将其放入任务池,而消费者(异步线程)从任务池中取出任务并执行。这种设计的目标是提高任务处理效率,减少锁冲突,并降低内存分配和释放的开销。
任务池的设计:双缓冲区阻塞数据池
1. 双缓冲区的工作机制
- 两个缓冲区:任务池由两个独立的缓冲区组成,分别称为主缓冲区和备用缓冲区。
- 主缓冲区:用于存放生产者提交的任务。
- 备用缓冲区:在主缓冲区被消费者完全处理后,与主缓冲区交换角色。
- 交换机制:
- 当主缓冲区中的所有任务被消费者处理完毕后,主缓冲区和备用缓冲区的角色互换。
- 这样,消费者可以立即开始处理新的任务,而生产者可以继续向新的主缓冲区添加任务。
2. 减少锁冲突
- 在传统的单缓冲区或循环队列中,每次任务的添加或取出都需要加锁,导致生产者和消费者之间的锁冲突频繁发生。
- 而双缓冲区通过批量处理的方式减少了锁冲突的概率:
- 生产者只需要在向主缓冲区添加任务时加锁,消费者只需要在处理完主缓冲区的所有任务后进行一次锁操作来交换缓冲区。
- 这种方式大大减少了锁的使用频率,降低了线程间的竞争。
3. 内存管理优化
- 双缓冲区的设计避免了频繁申请和释放内存空间。
- 缓冲区的大小通常是固定的,生产者和消费者只需在缓冲区内操作,无需动态分配或释放内存。
- 这不仅提高了性能,还减少了内存碎片化的风险。
优势分析
1. 提高任务处理效率
- 消费者可以一次性处理主缓冲区中的所有任务,而不是逐条处理。这种方式减少了任务调度的开销,提升了整体效率。
2. 减少锁冲突
- 由于锁操作只发生在缓冲区交换时,而非每次任务的添加或取出,锁冲突的概率显著降低。
- 生产者和消费者之间的交互更加高效,尤其在高并发场景下表现尤为明显。
3. 降低内存分配开销
- 固定大小的缓冲区避免了频繁的内存申请和释放操作,减少了系统资源的消耗。
- 同时,固定大小的设计也使得内存管理更加可控,避免了因动态内存分配引发的问题。
与其他设计方案的对比
1. 循环队列
- 循环队列是一种常见的任务池实现方式,但其每次任务的添加或取出都需要加锁,容易引发生产者和消费者之间的锁冲突。
- 此外,循环队列通常需要动态调整大小,这会增加内存管理的复杂性。
2. 单缓冲区
- 单缓冲区的设计简单,但在高并发场景下,锁冲突问题尤为突出。
- 生产者和消费者需要频繁地争夺缓冲区的访问权,导致性能下降。
3. 双缓冲区的优势
- 相比循环队列和单缓冲区,双缓冲区通过批量处理和缓冲区交换机制,减少了锁冲突和内存管理的开销。
- 它在保证高效的同时,提供了一种简单且可靠的解决方案。
总结
双缓冲区阻塞数据池的设计思想旨在通过批量处理和缓冲区交换,减少锁冲突和内存分配的开销,从而提高任务处理效率。它特别适用于高并发场景下的任务池设计,能够有效平衡生产者和消费者之间的关系,提升系统的整体性能。
不过在这之前,我们得要实现一下缓冲区:
缓冲区实现
创建一个头文件,用来专门实现缓冲区:
buffer.hpp
#ifndef __M_BUFFER_H__
#define __M_BUFFER_H__
#include "utils.hpp"
#include <vector>
#include <cassert>namespace logs
{#define DEFAULT_BUFFER_SIZE (1 * 1024 * 1024)#define THRESHOLD_BUFFER_SIZE (8 * 1024 * 1024)#define INCREAMENT_BUFFER_SIZE (1 * 1024 * 1024)class Buffer{public:Buffer(): _buffer(DEFAULT_BUFFER_SIZE), _writer_idx(0), _read_idx(0){}// 1、写入操作void push(const char *data, size_t len){// 1.首先判断是否需要扩容ensureEnoughSize(len);std::copy(data, data + len, &_buffer[_writer_idx]);moveWriter(len);}//返回可读数据的起始地址const char* begin(){return &_buffer[_read_idx];}size_t writeAbleSize(){return (_buffer.size() - _writer_idx);}size_t readAbleSize(){return (_writer_idx - _read_idx);}void reset(){_read_idx = 0; // 缓冲区所有空间都是空闲的_writer_idx = 0; // 与_writer_idx相等表示没有数据可读}// 对一个buffer进行一个交换void swap(Buffer &buffer){_buffer.swap(buffer._buffer);// 交换指针std::swap(_read_idx, buffer._read_idx);std::swap(_writer_idx, buffer._writer_idx);}void moveReader(size_t len){assert(len <= readAbleSize());_read_idx += len;}// 判断缓冲区是否为空bool empty(){return (_read_idx == _writer_idx);}private:// 扩容操作void ensureEnoughSize(size_t len){if (len <= writeAbleSize()){return;}size_t new_size = 0;if (_buffer.size() < THRESHOLD_BUFFER_SIZE){new_size = _buffer.size() * 2;}else{new_size = _buffer.size() + INCREAMENT_BUFFER_SIZE;}_buffer.resize(new_size);}// 对读写指针进行向后偏移操作void moveWriter(size_t len){assert(len + _writer_idx <= _buffer.size());_writer_idx += len;}std::vector<char> _buffer; // 缓冲区size_t _writer_idx; // 写指针size_t _read_idx; // 读指针};
}#endif
这是一个缓冲区的定义,我们要的是一个双缓冲区,所以我们再建一个头文件:
loop.hpp
#ifndef __M_LOOP_H__
#define __M_LOOP_H__#include "utils.hpp"
#include <condition_variable>
#include <thread>
#include <mutex>
#include <functional>
#include <atomic>
#include "message.hpp"
#include "buffer.hpp"
#include "loop.hpp"// 命名空间
namespace logs
{using Functor = std::function<void(Buffer &)>;class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;enum class AsyncType{ASYNC_SAFE, // 安全模式,对空间索取有限制ASYNC_UNSAFE // 非安全模式,对空间索取无限制};AsyncLooper(const Functor &cb, AsyncType looper_type = AsyncType::ASYNC_SAFE): _stop(false), _thread(std::thread(&AsyncLooper::threadEntry, this)), _callback(cb), _looper_type(_looper_type){}~AsyncLooper(){stop();}void stop(){_stop = true;_cond_con.notify_all(); // 唤醒所有工作线程_thread.join();}void push(const char *data, size_t len){// 无限扩容或固定大小std::unique_lock<std::mutex> lock(_mutex);// 条件变量空值,如缓冲区空间大小大于数据长度,则可以添加数据if (_looper_type == AsyncType::ASYNC_SAFE)_cond_pro.wait(lock, [&](){ return _pron_buf.writeAbleSize() >= len; });// 如果走下来,可以向缓冲区添加数据_pron_buf.push(data, len);// 唤醒消费者对缓冲区的数据进行处理_cond_con.notify_one();}private:// 线程入口函数void threadEntry(){while (1){// 1.判断生产缓冲区有没有数据,有则交换,无则阻塞std::unique_lock<std::mutex> lock(_mutex);// 若是当前是退出前被唤醒,或者有数据被唤醒,则返回真,继续向下运行,否则重新进入休眠_cond_con.wait(lock, [&](){ return _stop || !_pron_buf.empty(); });退出标志被设置,且生产缓冲区无数据if (_stop && _pron_buf.empty()){break;}_con_buf.swap(_pron_buf);// 2.唤醒生产者_cond_pro.notify_all();}// 3.被唤醒后,对消费缓冲区进行数据处理_callback(_con_buf);// 4.初始化消费缓冲区_con_buf.reset();}Functor _callback; // 具体对缓冲区进行处理的回调函数,由异步工作器使用者传入private:AsyncType _looper_type; // 异步日志器类型std::atomic<bool> _stop; // 停止标志位Buffer _pron_buf; // 生产缓冲区Buffer _con_buf; // 消费缓冲区std::mutex _mutex;std::condition_variable _cond_pro; // 生产条件变量std::condition_variable _cond_con; // 消费者条件变量std::thread _thread; // 异步工作器对应的工作线程};
} // namespace logs#endif
这里的_pron_buf 和 _con_buf就是两个缓冲区,当_pron_buf缓冲区充满数据之后,就会和_con_buf进行交换,_pron_buf里面的数据就会到_con_buf里面去,这个时候工作线程就会从_con_buf中拿出数据进行工作。
异步任务处理器的构建是基于以上两个文件的,这个时候回到我们之前定义logger的头文件里面去:
#ifndef __M_LOGGER_H__
#define __M_LOGGER_H__
#include "utils.hpp"
#include "message.hpp"
#include "utils.hpp"
#include "level.hpp"
#include "sink.hpp"
#include <atomic>
#include <mutex>
#include <iostream>
#include <memory>
#include <ctime>
#include <vector>
#include <cassert>
#include <sstream>
#include <stdarg.h>
#include "loop.hpp"namespace logs
{class BaseLogger{public:using ptr = std::shared_ptr<BaseLogger>;BaseLogger(const std::string &logger_name,Loglevel::value limt_level,Formetter::ptr &formetter,std::vector<BaseSink::ptr> &sinks): _logger_name(logger_name), _limt_level(limt_level), _formetter(formetter), _sinks(sinks.begin(), sinks.end()){}const std::string &get_logger_name(){return _logger_name;}/*完成构造日志消息对象过程并进行格式化,得到格式化后的日志消息字符串---然后进行输出*/void debug(const std::string &file, size_t line, const std::string fmt, ...){// 如果限制输出的日志等级比debug高,则直接返回if (Loglevel::value::DEBUG < _limt_level){return;}// 声明参数列表变量va_list ap;va_start(ap, fmt); // 初始化fmt是最后一个固定的参数char *res; // 声明缓冲区int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap); // 释放参数列表变量serialize(Loglevel::value::DEBUG, file, line, res);free(res);}void info(const std::string &file, size_t line, const std::string fmt, ...){// 如果限制输出的日志等级比debug高,则直接返回if (Loglevel::value::INFO < _limt_level){return;}// 声明参数列表变量va_list ap;va_start(ap, fmt); // 初始化fmt是最后一个固定的参数char *res; // 声明缓冲区int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap); // 释放参数列表变量serialize(Loglevel::value::INFO, file, line, res);free(res);}void warn(const std::string &file, size_t line, const std::string fmt, ...){// 如果限制输出的日志等级比debug高,则直接返回if (Loglevel::value::WARN < _limt_level){return;}// 声明参数列表变量va_list ap;va_start(ap, fmt); // 初始化fmt是最后一个固定的参数char *res; // 声明缓冲区int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap); // 释放参数列表变量serialize(Loglevel::value::WARN, file, line, res);free(res);}void error(const std::string &file, size_t line, const std::string &fmt, ...){// 1.通过传入的参数构造一个日志对象,进行日志的格式化,最终落地if (Loglevel::value::ERROR < _limt_level){return;}// 2.对fmt格式化字符串和不定参进行字符串组织,得到日志消息字符串va_list ap;va_start(ap, fmt);char *res;int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap);serialize(Loglevel::value::ERROR, file, line, res);free(res);}void fatal(const std::string &file, size_t line, const std::string &fmt, ...){// 1.通过传入的参数构造一个日志对象,进行日志的格式化,最终落地if (Loglevel::value::FATAL < _limt_level){return;}// 2.对fmt格式化字符串和不定参进行字符串组织,得到日志消息字符串va_list ap;va_start(ap, fmt);char *res;int ret = vasprintf(&res, fmt.c_str(), ap);if (ret == -1){std::cout << "vasprintf failed!\n";return;}va_end(ap);serialize(Loglevel::value::FATAL, file, line, res);free(res);}protected:void serialize(Loglevel::value level, const std::string &file_name,size_t line, char *str){// 1.构造msg对象logs::logMsg msg(level, file_name, line, _logger_name, str);// 2.利用Formetter进行消息格式化std::stringstream ss;_formetter->format(ss, msg);// 3.落地方向的输出log(ss.str().c_str(), ss.str().size());}/*抽象接口完成实际的落地输出---不同的日志器会有不同的落地方式*/virtual void log(const char *data, size_t len) = 0;protected:std::mutex _mutex; // 锁std::string _logger_name; // 日志器名称std::atomic<Loglevel::value> _limt_level; // 日志等级Formetter::ptr _formetter; // 格式化消息指针std::vector<BaseSink::ptr> _sinks; // 落地方向};class SyncLogger : public BaseLogger{public:SyncLogger(const std::string &logger_name,Loglevel::value limt_level,Formetter::ptr &formetter,std::vector<BaseSink::ptr> &sinks): BaseLogger(logger_name, limt_level, formetter, sinks) {}protected:void log(const char *data, size_t len){// 1.上锁std::unique_lock<std::mutex> _lock(std::mutex);if (_sinks.empty())return;for (auto &sink : _sinks){sink->log(data, len);}}};enum class loggerType{LOGGER_SYNC,LOGGER_ASYNC};class AsyncLogger : public BaseLogger{public:AsyncLogger(const std::string &logger_name, Loglevel::value level,Formetter::ptr &formatter,std::vector<BaseSink::ptr> &sinks,logs::AsyncLooper::AsyncType looper_type): BaseLogger(logger_name, level, formatter, sinks), _looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type)){}void log(const char *data, size_t len){_looper->push(data,len);}void realLog(Buffer& buffer){if (_sinks.empty())return;for (auto &sink : _sinks){sink->log(buffer.begin(), buffer.readAbleSize());}}private:AsyncLooper::ptr _looper;};
}#endif
我们可以来测试一下:
#include "utils.hpp"
#include "level.hpp"
#include "message.hpp"
#include "fometter.hpp"
#include "sink.hpp"
#include "logger.hpp"int main()
{logs::Formetter formatter("abc[%d{%H:%M:%S}][%c]%T%m%n");logs::Formetter::ptr fmt_ptr = std::make_shared<logs::Formetter>(formatter);auto st1 = logs::SinkFactory::create<logs::StdoutSink>();std::vector<logs::BaseSink::ptr> sinks = {st1};std::string logger_name = "asynclogger";logs::BaseLogger::ptr logger(new logs::AsyncLogger(logger_name, logs::Loglevel::value::DEBUG, fmt_ptr, sinks,logs::AsyncLooper::AsyncType::ASYNC_SAFE));logger->debug("main.cc", 53, "%s","格式化功能测试....");// 5. 确保异步日志线程有足够时间处理std::this_thread::sleep_for(std::chrono::milliseconds(100)); }
总结
loop.hpp中与线程有关的操作
在这段代码中,与线程相关的操作和函数主要包括以下几个方面:
1. 线程的创建与管理
-
线程的创建:
- 在
AsyncLooper
构造函数中,通过std::thread
创建一个新的线程_thread
,并将其绑定到成员函数threadEntry
。
这个线程是异步工作器的核心,负责处理缓冲区中的数据。_thread(std::thread(&AsyncLooper::threadEntry, this))
- 在
-
线程的销毁:
- 在析构函数
~AsyncLooper()
中调用stop()
函数,确保线程安全退出。~AsyncLooper() {stop(); }
- 在析构函数
-
线程的停止:
stop()
函数用于优雅地停止线程:- 设置
_stop
标志位为true
,通知线程准备退出。 - 调用
_cond_con.notify_all()
唤醒所有等待的线程。 - 使用
_thread.join()
等待线程执行完毕后回收资源。void stop() {_stop = true;_cond_con.notify_all(); // 唤醒所有工作线程_thread.join(); }
- 设置
2. 线程同步与通信
为了确保线程之间的安全协作,使用了互斥锁(std::mutex
)和条件变量(std::condition_variable
)。
-
互斥锁 (
std::mutex
):_mutex
用于保护共享资源(如生产缓冲区_pron_buf
和消费缓冲区_con_buf
),防止多个线程同时访问导致数据竞争。- 在
push()
和threadEntry()
中使用std::unique_lock<std::mutex>
对_mutex
加锁。
-
条件变量 (
std::condition_variable
):-
生产者条件变量
_cond_pro
:- 用于控制生产者的阻塞和唤醒。
- 在
push()
中,当缓冲区空间不足时,生产者会调用_cond_pro.wait()
阻塞,直到消费者释放空间。_cond_pro.wait(lock, [&]() { return _pron_buf.writeAbleSize() >= len; });
- 消费者在处理完数据后,调用
_cond_pro.notify_all()
唤醒所有等待的生产者。_cond_pro.notify_all();
-
消费者条件变量
_cond_con
:- 用于控制消费者的阻塞和唤醒。
- 在
threadEntry()
中,当生产缓冲区为空时,消费者会调用_cond_con.wait()
阻塞,直到有新数据可用。_cond_con.wait(lock, [&]() { return _stop || !_pron_buf.empty(); });
- 生产者在向缓冲区添加数据后,调用
_cond_con.notify_one()
唤醒一个消费者。_cond_con.notify_one();
-
3. 线程入口函数
threadEntry()
:- 这是线程的主要执行逻辑,负责从生产缓冲区
_pron_buf
中读取数据并处理。 - 主要流程:
- 判断生产缓冲区是否有数据。如果有,则交换生产缓冲区和消费缓冲区;如果没有,则阻塞等待。
- 如果退出标志
_stop
被设置且生产缓冲区为空,则线程退出。 - 调用回调函数
_callback(_con_buf)
处理消费缓冲区中的数据。 - 初始化消费缓冲区
_con_buf
,以便下一次使用。
void threadEntry() {while (1){std::unique_lock<std::mutex> lock(_mutex);_cond_con.wait(lock, [&]() { return _stop || !_pron_buf.empty(); });if (_stop && _pron_buf.empty()){break;}_con_buf.swap(_pron_buf);_cond_pro.notify_all();}_callback(_con_buf);_con_buf.reset(); }
- 这是线程的主要执行逻辑,负责从生产缓冲区
4. 数据生产与消费
-
生产数据:
push(const char *data, size_t len)
是生产者接口,用于向生产缓冲区_pron_buf
添加数据。- 在安全模式下(
ASYNC_SAFE
),如果缓冲区空间不足,生产者会阻塞等待。if (_looper_type == AsyncType::ASYNC_SAFE)_cond_pro.wait(lock, [&]() { return _pron_buf.writeAbleSize() >= len; });
-
消费数据:
- 消费者线程通过
threadEntry()
不断从生产缓冲区中获取数据,并调用用户提供的回调函数_callback
进行处理。
- 消费者线程通过
5. 线程安全的设计
-
原子变量
_stop
:- 使用
std::atomic<bool>
类型的_stop
标志位,确保多线程环境下的安全读写。std::atomic<bool> _stop;
- 使用
-
双重检查机制:
- 在
threadEntry()
中,使用_stop && _pron_buf.empty()
来判断是否需要退出线程,避免误退出或死锁。
- 在
总结
这段代码通过多线程技术实现了生产者-消费者模型,核心线程相关的操作包括:
- 线程的创建、管理和销毁。
- 使用互斥锁和条件变量实现线程同步与通信。
- 提供生产者接口
push()
和消费者逻辑threadEntry()
。 - 通过原子变量和双重检查机制确保线程安全。
这种设计适用于异步日志记录等场景,能够高效地处理大量并发数据。
placeholders::_1
下面是一个简单的例子来展示 std::bind
和 std::placeholders::_1
的使用方法。这个例子定义了一个类 Calculator
,其中包含一个成员函数 add
,该函数接受两个参数:一个整数和另一个整数引用。我们将通过 std::bind
将这个成员函数绑定到一个对象实例,并使用占位符 _1
来预留第二个参数的位置。
示例代码
#include <iostream>
#include <functional> // 包含 std::bind 和 std::placeholdersclass Calculator {
public:// 成员函数 add 接受一个整数 n 和一个整数引用 mvoid add(int n, int& m) {m += n;std::cout << "After adding " << n << ", m = " << m << std::endl;}
};int main() {Calculator calc; // 创建 Calculator 类的实例int value = 10;// 使用 std::bind 绑定成员函数 add 到 calc 实例,并预留第二个参数的位置auto boundFunc = std::bind(&Calculator::add, &calc, 5, std::placeholders::_1);// 调用绑定后的函数对象,传递 value 变量作为第二个参数boundFunc(value);return 0;
}
说明
-
Calculator::add
:- 这个成员函数接收两个参数:一个整数
n
和一个整数引用m
。它将n
加到m
上,并输出结果。
- 这个成员函数接收两个参数:一个整数
-
std::bind
:- 我们使用
std::bind
函数将Calculator::add
成员函数绑定到calc
对象实例上,并固定第一个参数为5
(即每次调用都会加上5
),同时使用std::placeholders::_1
占位符表示第二个参数将在实际调用时传入。
- 我们使用
-
boundFunc
:boundFunc
是一个可调用对象,当调用它并传入一个整数引用时,实际上会调用calc.add(5, /*传入的值*/)
。
-
boundFunc(value);
:- 在这里,我们调用了
boundFunc
并传入了value
变量作为第二个参数。这将导致Calculator::add
函数被调用,执行value += 5
操作,并打印出更新后的value
值。
- 在这里,我们调用了
输出结果
当你运行上述代码时,程序将输出:
After adding 5, m = 15
这表明原始的 value
变量从 10
变为了 15
,因为 5
被加到了 value
上。通过这种方式,我们可以看到如何使用 std::bind
和 std::placeholders::_1
来创建一个绑定特定参数的函数对象,并在调用时动态地提供剩余的参数。
放到代码中:
具体作用
在这段代码中:
_looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type))
std::bind
用于将成员函数AsyncLogger::realLog
绑定到一个可调用对象。this
表示当前对象(即AsyncLogger
的实例),作为成员函数的第一个隐式参数。std::placeholders::_1
是一个占位符,表示这个绑定的函数对象在调用时需要接收一个参数,并将该参数传递给AsyncLogger::realLog
的第一个显式参数。
换句话说,std::placeholders::_1
在这里的作用是预留一个位置,使得 AsyncLooper
的回调函数可以在运行时动态地传入一个参数(比如 Buffer &
类型的数据)。
详细解释
假设 AsyncLogger::realLog
的定义如下:
void AsyncLogger::realLog(Buffer &buffer);
通过 std::bind
和 std::placeholders::_1
,我们创建了一个函数对象,其行为等价于:
void callback(Buffer &buffer) {this->realLog(buffer);
}
这个函数对象会被传递给 AsyncLooper
的构造函数,作为 _callback
回调函数。当 AsyncLooper
调用 _callback
时,它会将一个 Buffer
对象作为参数传递给 realLog
。
为什么需要 std::placeholders::_1
?
-
适配函数签名:
AsyncLooper
的构造函数期望一个std::function<void(Buffer &)>
类型的回调函数。AsyncLogger::realLog
是一个成员函数,需要绑定到某个对象(this
)才能调用。- 使用
std::placeholders::_1
可以确保绑定后的函数对象能够接受一个参数,并将其正确传递给realLog
。
-
灵活性:
- 占位符允许我们灵活地控制参数的传递顺序和数量。例如,如果有多个参数,可以使用
_1
,_2
,_3
等分别表示不同的参数位置。
- 占位符允许我们灵活地控制参数的传递顺序和数量。例如,如果有多个参数,可以使用
示例对比
不使用 std::bind
和占位符:
如果直接传递成员函数指针,编译器会报错,因为成员函数需要绑定到一个对象:
_looper(std::make_shared<AsyncLooper>(&AsyncLogger::realLog, looper_type));
// 错误:无法直接传递成员函数指针
使用 std::bind
和占位符:
通过 std::bind
,我们可以将成员函数绑定到当前对象,并预留参数位置:
_looper(std::make_shared<AsyncLooper>(std::bind(&AsyncLogger::realLog, this, std::placeholders::_1), looper_type));
// 正确:绑定后的函数对象符合 std::function<void(Buffer &)> 的要求
总结
std::placeholders::_1
的作用是为绑定的函数对象预留一个参数位置,使得 AsyncLogger::realLog
可以在运行时接受一个参数(如 Buffer &
)。这种机制让 std::bind
更加灵活,能够适配不同函数签名的需求。