文章目录
- 1. 引言
- 2. SPSC 环形缓冲区设计思想
- 3. 缓冲区类定义
- 4. 追加数据
- 5. 读取数据
- 6. 完整代码
- 7. 基准测试
- 7.1. 测试代码
- 8. 执行结果
1. 引言
本文将介绍如何实现无锁字节缓冲区(LockFreeBytesBuffer),并通过Google Benchmark对其性能进行测试。该缓冲区设计为单生产者单消费者(SPSC)模型,以简化线程安全问题。
2. SPSC 环形缓冲区设计思想
在环形缓冲区的上下文中,实现零拷贝的方法之一是通过直接操作缓冲区的指针,而不是复制数据。这种方式需要修改接口,使得消费者直接访问缓冲区的数据,而不是通过中间变量进行复制。
在设计LockFreeBytesBuffer时遵循了以下原则:
- 线程安全:使用原子操作确保数据一致性。
- 内存效率:使用固定大小的缓冲区,避免动态内存分配。
LockFreeBytesBuffer使用一个固定大小的环形缓冲区来存储数据,并通过原子操作管理读写索引。生产者线程将数据追加到缓冲区,消费者线程从缓冲区读取数据。
为了确保单生产者单消费者模型下的线程安全,我们需要确保:
- 生产者和消费者线程不会同时写入或读取同一块内存。
- 使用原子操作来更新索引,以确保读写操作的正确性和可见性。
3. 缓冲区类定义
下面是LockFreeBytesBuffer类的定义:
class LockFreeBytesBuffer {public:static const std::size_t kBufferSize = 1024U; // 缓冲区大小LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) {std::memset(buffer_, 0, kBufferSize);}bool Append(const char* data, std::size_t length) noexcept;std::size_t BeginRead(const char** target) noexcept;void EndRead(std::size_t length) noexcept;private:char buffer_[kBufferSize];std::atomic<std::size_t> reader_index_;std::atomic<std::size_t> writer_index_;
};
4. 追加数据
生产者线程调用Append
方法将数据写入缓冲区。该方法首先检查缓冲区是否有足够的空间存放新数据。如果有足够的空间,它将数据写入缓冲区,并更新写索引。
bool LockFreeBytesBuffer::Append(const char* data, std::size_t length) noexcept {const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed);const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire);const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize;if (length > free_space) {return false; // 缓冲区满}const std::size_t pos = current_write_index % kBufferSize;const std::size_t first_part = std::min(length, kBufferSize - pos);std::memcpy(&buffer_[pos], data, first_part);std::memcpy(&buffer_[0], data + first_part, length - first_part);writer_index_.store(current_write_index + length, std::memory_order_release);return true;
}
5. 读取数据
消费者线程调用BeginRead
方法获取缓冲区中可用的数据。该方法返回一个指向缓冲区中数据的指针,并更新读索引。消费者处理完数据后,调用EndRead
方法标记数据已读。
std::size_t LockFreeBytesBuffer::BeginRead(const char** target) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire);const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize;if (available_data == 0U) {return 0U; // 缓冲区空}const std::size_t pos = current_read_index % kBufferSize;*target = &buffer_[pos];return std::min(available_data, kBufferSize - pos); // 返回第一段数据的大小
}void LockFreeBytesBuffer::EndRead(std::size_t length) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);reader_index_.store(current_read_index + length, std::memory_order_release);
}
6. 完整代码
以下是完整的LockFreeBytesBuffer
实现:
#ifndef EMBEDDED_BYTE_BUFFER_H_
#define EMBEDDED_BYTE_BUFFER_H_#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstring>class LockFreeBytesBuffer {public:static const std::size_t kBufferSize = 1024U; // 缓冲区大小LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) {std::memset(buffer_, 0, kBufferSize);}// 将数据追加到缓冲区bool Append(const char* data, std::size_t length) noexcept;// 获取指向缓冲区的读指针std::size_t BeginRead(const char** target) noexcept;// 标记数据已读void EndRead(std::size_t length) noexcept;private:char buffer_[kBufferSize];std::atomic<std::size_t> reader_index_;std::atomic<std::size_t> writer_index_;
};bool LockFreeBytesBuffer::Append(const char* data, std::size_t length) noexcept {const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed);const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire);const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize;if (length > free_space) {return false; // 缓冲区满}const std::size_t pos = current_write_index % kBufferSize;const std::size_t first_part = std::min(length, kBufferSize - pos);std::memcpy(&buffer_[pos], data, first_part);std::memcpy(&buffer_[0], data + first_part, length - first_part);writer_index_.store(current_write_index + length, std::memory_order_release);return true;
}std::size_t LockFreeBytesBuffer::BeginRead(const char** target) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire);const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize;if (available_data == 0U) {return 0U; // 缓冲区空}const std::size_t pos = current_read_index % kBufferSize;*target = &buffer_[pos];return std::min(available_data, kBufferSize - pos); // 返回第一段数据的大小
}void LockFreeBytesBuffer::EndRead(std::size_t length) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);reader_index_.store(current_read_index + length, std::memory_order_release);
}#endif // EMBEDDED_BYTE_BUFFER_H_
7. 基准测试
为了评估LockFreeBytesBuffer的性能,我们使用Google Benchmark库编写了基准测试程序。该程序创建生产者和消费者线程,并在基准测试循环中进行数据读写操作。
7.1. 测试代码
#include <benchmark/benchmark.h>#include <cstring>
#include <thread>#include "LockFreeBytesBuffer.h"void Producer(LockFreeBytesBuffer* buffer, bool* stop) noexcept {const char data[] = "test data";while (!*stop) {buffer->Append(data, std::strlen(data));// 模拟生产数据的延迟std::this_thread::sleep_for(std::chrono::microseconds(10));}
}void Consumer(LockFreeBytesBuffer* buffer, bool* stop) noexcept {const char* data = nullptr;while (!*stop) {const std::size_t length = buffer->BeginRead(&data);if (length > 0U) {// 处理数据buffer->EndRead(length);// 模拟处理数据的延迟std::this_thread::sleep_for(std::chrono::microseconds(10));}}
}// Benchmark函数
static void BM_Buffer(benchmark::State& state) {LockFreeBytesBuffer buffer;bool stop = false;std::thread producer_thread(Producer, &buffer, &stop);std::thread consumer_thread(Consumer, &buffer, &stop);for (auto _ : state) {// 在这里进行基准测试const char data[] = "benchmark data";buffer.Append(data, std::strlen(data));const char* target = nullptr;const std::size_t length = buffer.BeginRead(&target);buffer.EndRead(length);}stop = true; // 停止生产者和消费者线程producer_thread.join();consumer_thread.join();
}// 注册benchmark
BENCHMARK(BM_Buffer)->Threads(1);int main(int argc, char** argv) {::benchmark::Initialize(&argc, argv);::benchmark::RunSpecifiedBenchmarks();return 0;
}
8. 执行结果
安装Google Benchmark:如果还没有安装Google Benchmark,可以按照Google Benchmark GitHub上的说明进行安装。
编译: g++ -O2 buffer_benchmark.cpp -lbenchmark -lpthread -o buffer_benchmark
运行基准测试程序:
$ ./buffer_benchmark
2024-06-23T14:47:30+00:00
Running ./buffer_benchmark
Run on (2 X 2096.07 MHz CPU s)
CPU Caches:L1 Data 32 KiB (x2)L1 Instruction 64 KiB (x2)L2 Unified 512 KiB (x2)L3 Unified 4096 KiB (x2)
Load Average: 0.00, 0.00, 0.00
***WARNING*** Library was built as DEBUG. Timings may be affected.
--------------------------------------------------------------
Benchmark Time CPU Iterations
--------------------------------------------------------------
BM_Buffer/threads:1 10.8 ns 10.8 ns 64004334