文章目录
- 介绍
- 实现细节
- **BytesBuffer 类**
- **生产者和消费者**
- 性能优化
- 总结
介绍
本文介绍如何使用C++实现一个多线程安全的简单字节缓冲区,并进行性能测试。
实现细节
BytesBuffer 类
BytesBuffer
类通过使用条件变量和双缓冲区机制,实现了生产者和消费者之间的高效数据传输。该类的主要功能包括数据追加(append)和数据检索(retrieve)。下面是具体实现:
#ifndef BYTE_BUFFER_BUFFER_H_
#define BYTE_BUFFER_BUFFER_H_#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstring>
#include <mutex>
#include <vector>class BytesBuffer {public:static const size_t kInitialSize = 1024;explicit BytesBuffer(size_t capacity = kInitialSize): capacity_(capacity),buffer1_(capacity),buffer2_(capacity),current_buffer_(&buffer1_),temp_buffer_(&buffer2_),stop_(false) {}void append(const char* data, size_t length) {std::unique_lock<std::mutex> lock(mutex_);for (size_t i = 0; i < length; ++i) {while (!stop_ && current_buffer_->size() >= capacity_) {not_full_.wait(lock); // 等待足够空间写入}if (stop_) return;current_buffer_->push_back(data[i]);not_empty_.notify_one();}}size_t retrieve(char* target, size_t maxLength) {std::unique_lock<std::mutex> lock(mutex_);while (!stop_ && current_buffer_->empty()) {not_empty_.wait(lock); // 等待有数据可读}if (stop_) return 0;std::swap(current_buffer_, temp_buffer_);lock.unlock();size_t bytes_to_read = std::min(maxLength, temp_buffer_->size());std::memcpy(target, temp_buffer_->data(), bytes_to_read);temp_buffer_->clear();lock.lock();not_full_.notify_one();return bytes_to_read;}void stop() {std::unique_lock<std::mutex> lock(mutex_);stop_ = true;not_empty_.notify_all();not_full_.notify_all();}bool isStopped() const {return stop_.load();}private:size_t capacity_;std::vector<char> buffer1_;std::vector<char> buffer2_;std::vector<char>* current_buffer_;std::vector<char>* temp_buffer_;std::mutex mutex_;std::condition_variable not_empty_;std::condition_variable not_full_;std::atomic<bool> stop_;BytesBuffer(const BytesBuffer&) = delete;BytesBuffer& operator=(const BytesBuffer&) = delete;
};#endif // BYTE_BUFFER_BUFFER_H_
生产者和消费者
生产者和消费者函数分别向缓冲区追加数据和从缓冲区检索数据。
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include "bytes_buffer.hpp"void Producer(BytesBuffer* buffer, std::atomic<bool>& running) {const char data[] = "测试数据";while (running.load()) {buffer->append(data, std::strlen(data));}
}void Consumer(BytesBuffer* buffer, std::atomic<bool>& running) {char output[1024];while (running.load()) {buffer->retrieve(output, sizeof(output));}
}int main() {BytesBuffer buffer(1024);std::atomic<bool> running(true);std::thread producer_thread(Producer, &buffer, std::ref(running));std::thread consumer_thread(Consumer, &buffer, std::ref(running));// 运行测试5秒钟std::this_thread::sleep_for(std::chrono::seconds(5));// 信号线程停止并加入它们running.store(false);buffer.stop();producer_thread.join();consumer_thread.join();std::cout << "测试完成" << std::endl;return 0;
}
性能优化
- 减少锁的持有时间:在
retrieve
方法中,交换当前缓冲区和临时缓冲区后立即解锁,然后进行数据复制操作,这样减少了锁的持有时间。 - 使用双缓冲区:使用两个缓冲区
buffer1_
和buffer2_
,通过交换缓冲区实现双缓冲机制,减少锁竞争。生产者线程向current_buffer_
写入数据,而消费者线程从temp_buffer_
读取数据。
总结
通过实现和优化 BytesBuffer
类,我们可以方便地管理数据流,并确保在多线程环境下的安全性。