目录
- 1 概述
- 2 实现
- 3 测试
- 4 运行
1 概述
最近研究了C++11的并发编程的线程/互斥/锁/条件变量,利用互斥/锁/条件变量实现一个支持多线程并发的环形队列,队列大小通过模板参数传递。
环形队列是一个模板类,有两个模板参数,参数1是元素类型,参数2是队列大小,默认是10。入队操作如果队列已满则阻塞,出队操作如果队列为空则阻塞。
其类图为:
2 实现
#ifndef RING_QUEUE_H
#define RING_QUEUE_H
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <utility>
/// Fixed-capacity, blocking ring queue guarded by one mutex and two
/// condition variables.
///
/// @tparam T element type; must be default-constructible (the storage is a
///           plain array of T), assignable and move-constructible.
/// @tparam N capacity of the queue, must be non-zero (default 10).
///
/// push() blocks while the queue is full and pop() blocks while it is empty.
/// Elements leave the queue in the order they entered it. The queue itself
/// is neither copyable nor movable.
template<typename T, std::size_t N = 10>
class ring_queue
{
    static_assert(N != 0, "ring_queue capacity must be non-zero");

public:
    using value_type = T;
    using size_type  = std::size_t;
    using pos_type   = std::size_t;
    using lock_type  = std::unique_lock<std::mutex>;

    ring_queue() = default;
    ring_queue(ring_queue const&) = delete;
    ring_queue(ring_queue&&) = delete;
    ring_queue& operator=(ring_queue const&) = delete;
    ring_queue& operator=(ring_queue&&) = delete;

    /// @return the fixed capacity N.
    size_type spaces() const { return N; }

    /// @return true when no element is stored at the moment of the call.
    ///
    /// The free-slot counter is used instead of comparing the read and write
    /// positions: those positions are also equal when the ring is full.
    bool empty() const
    {
        lock_type lock(mutex_);
        return space_size_ == N;
    }

    /// @return number of elements stored at the moment of the call.
    size_type size() const
    {
        lock_type lock(mutex_);
        return N - space_size_;
    }

    /// Copies @p value into the queue, blocking while the queue is full.
    void push(value_type const& value) { enqueue(value); }

    /// Moves @p value into the queue, blocking while the queue is full.
    void push(value_type&& value) { enqueue(std::move(value)); }

    /// Removes and returns the oldest element, blocking while the queue is
    /// empty.
    value_type pop()
    {
        lock_type lock(mutex_);
        read_cv_.wait(lock, [this] { return space_size_ != N; });

        // Move straight into the result; the bookkeeping is only updated
        // afterwards, so a throwing move leaves the queue unchanged.
        value_type value(std::move(queue_[read_pos_]));
        ++space_size_;
        read_pos_ = next_pos(read_pos_);

        // Release the mutex before notifying so the woken producer does not
        // immediately block on it.
        lock.unlock();
        write_cv_.notify_one();
        return value;
    }

private:
    /// Shared implementation of both push() overloads; forwards @p value
    /// into the slot at the write position.
    template<typename U>
    void enqueue(U&& value)
    {
        {
            lock_type lock(mutex_);
            write_cv_.wait(lock, [this] { return space_size_ != 0; });
            queue_[write_pos_] = std::forward<U>(value);
            --space_size_;
            write_pos_ = next_pos(write_pos_);
        }
        // Notify outside the critical section (see pop()).
        read_cv_.notify_one();
    }

    /// @return the position following @p pos, wrapping around at N.
    static pos_type next_pos(pos_type pos) { return (pos + 1) % N; }

private:
    value_type queue_[N];
    pos_type read_pos_ = 0;
    pos_type write_pos_ = 0;
    size_type space_size_ = N;       // number of free slots
    mutable std::mutex mutex_;       // mutable: locked by the const observers
    std::condition_variable write_cv_;  // signalled when a slot becomes free
    std::condition_variable read_cv_;   // signalled when an element arrives
};
#endif
说明:
- 实现利用了一个固定大小数组/一个读位置/一个写位置/互斥/写条件变量/读条件变量/空间大小变量。
- 两个入队接口:
- push(T const&) 左值入队
- push(T &&) 右值入队
- 一个出队接口
- pop()
3 测试
基于cpptest的测试代码如下:
/// Test fixture shared by the producer and consumer threads: a two-slot
/// string queue, a mutex used by consume2, and a counter of consumed items.
struct Function4RingQueue
{
    ring_queue<std::string, 2> queue;
    std::mutex mutex;
    int counter = 0;

    /// Pops @p n items one after another, printing each and counting it.
    /// The counter is updated without taking the mutex.
    void consume1(size_t n)
    {
        std::cerr << "\n";
        for (size_t remaining = n; remaining != 0; --remaining)
        {
            std::cerr << "I get a " << queue.pop() << std::endl;
            ++counter;
        }
    }

    /// Pops a single item, then prints it and bumps the counter while
    /// holding the mutex. @p id only labels the output.
    void consume2(size_t id)
    {
        std::string fruit = queue.pop();

        std::lock_guard<std::mutex> guard(mutex);
        std::cerr << "\nI get a " << fruit << " in thread(" << id << ")" << std::endl;
        ++counter;
    }

    /// Pushes every fruit with " pie" appended (a temporary string).
    void product1(std::vector<std::string> & fruits)
    {
        for (size_t i = 0; i < fruits.size(); ++i)
        {
            queue.push(fruits[i] + std::string(" pie"));
        }
    }

    /// Pushes every fruit unchanged (an lvalue string).
    void product2(std::vector<std::string> & fruits)
    {
        for (size_t i = 0; i < fruits.size(); ++i)
        {
            queue.push(fruits[i]);
        }
    }
};
void RingQueueSuite::one_to_one()
{Function4RingQueue function;std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};std::thread threads[2];threads[0] = std::thread(&Function4RingQueue::product1, std::ref(function), std::ref(fruits));threads[1] = std::thread(&Function4RingQueue::consume1, std::ref(function), fruits.size());for(auto &thread : threads)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)function.counter = 0;threads[0] = std::thread(&Function4RingQueue::product2, std::ref(function), std::ref(fruits));threads[1] = std::thread(&Function4RingQueue::consume1, std::ref(function), fruits.size());for(auto &thread : threads)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)
}void RingQueueSuite::one_to_multi()
{Function4RingQueue function;std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};std::thread product;std::vector<std::thread> consumes(fruits.size());for(size_t i = 0; i < consumes.size(); ++i)consumes[i] = std::thread(&Function4RingQueue::consume2, std::ref(function), i);product = std::thread(&Function4RingQueue::product1, std::ref(function), std::ref(fruits));product.join();for(auto &thread : consumes)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)function.counter = 0;for(size_t i = 0; i < consumes.size(); ++i)consumes[i] = std::thread(&Function4RingQueue::consume2, std::ref(function), i);product = std::thread(&Function4RingQueue::product2, std::ref(function), std::ref(fruits));product.join();for(auto &thread : consumes)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)
}
- 函数one_to_one测试一个生产者对应一个消费者。
- 函数one_to_multi测试一个生产者对应多个消费者。
4 运行
RingQueueSuite: 0/2
I get a Apple pie
I get a Banana pie
I get a Pear pie
I get a Plum pie
I get a Pineapple pieI get a Apple
I get a Banana
I get a Pear
I get a Plum
I get a Pineapple
RingQueueSuite: 1/2
I get a Apple pie in thread(1)I get a Banana pie in thread(0)I get a Pear pie in thread(2)I get a Plum pie in thread(4)I get a Pineapple pie in thread(3)I get a Apple in thread(0)I get a Banana in thread(1)I get a Plum in thread(3)I get a Pear in thread(2)I get a Pineapple in thread(4)
RingQueueSuite: 2/2, 100% correct in 0.007452 seconds
Total: 2 tests, 100% correct in 0.007452 seconds
分析:
- 从结果看入队顺序和出队顺序是一致的。多消费者测试中个别打印顺序(如Plum先于Pear)与入队顺序不同,是因为consume2先出队、再单独加锁打印,打印顺序不一定等于出队顺序。