目录
- 1 概述
- 2 实现
- 3 测试
- 3 运行
1 概述
最近研究了C++11的并发编程的线程/互斥/锁/条件变量,利用互斥/锁/条件变量实现一个支持多线程并发的阻塞队列,队列大小没有限制。
阻塞队列是一个模板类,有两个模板参数:参数1是元素类型,参数2是容器类型,可以是std::deque或std::list,默认是std::deque。入队操作不会阻塞;出队操作在队列为空时阻塞,直到有元素入队。
其类图为:
2 实现
实现代码如下:
#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H
#include <deque>
#include <mutex>
#include <utility>
#include <condition_variable>

/// Unbounded FIFO queue whose operations are serialized by an internal mutex.
///
/// Producers never block: push()/emplace() append an element and wake one
/// waiting consumer. pop() blocks the caller while the queue is empty.
///
/// @tparam T        Element type (only used to form the default Sequence).
/// @tparam Sequence Underlying container; must provide push_back,
///                  emplace_back, front, pop_front, empty and size
///                  (e.g. std::deque<T> or std::list<T>).
template<typename T, typename Sequence = std::deque<T>>
class block_queue
{
    Sequence queue_;
    // mutable: empty() and size() are const but still have to take the lock.
    mutable std::mutex mutex_;
    std::condition_variable cv_;

public:
    typedef typename Sequence::value_type value_type;
    typedef typename Sequence::size_type size_type;
    typedef std::unique_lock<std::mutex> lock_type;

    block_queue() = default;

    // The mutex and condition variable are neither copyable nor movable,
    // so the queue is pinned in place as well.
    block_queue(block_queue const&) = delete;
    block_queue(block_queue&&) = delete;
    block_queue& operator=(block_queue const&) = delete;
    block_queue& operator=(block_queue&&) = delete;

    /// @return true if the queue held no elements at the moment of the call.
    bool empty() const
    {
        lock_type lock(mutex_);
        return queue_.empty();
    }

    /// @return the number of queued elements at the moment of the call.
    size_type size() const
    {
        lock_type lock(mutex_);
        return queue_.size();
    }

    /// Appends a copy of @p value and wakes one waiting consumer.
    void push(value_type const& value)
    {
        {
            lock_type lock(mutex_);
            queue_.push_back(value);
        }
        // Notify after the lock is released so the woken thread does not
        // immediately block on the mutex again.
        cv_.notify_one();
    }

    /// Moves @p value into the queue and wakes one waiting consumer.
    void push(value_type&& value)
    {
        {
            lock_type lock(mutex_);
            queue_.push_back(std::move(value));
        }
        cv_.notify_one();
    }

    /// Constructs an element in place from @p args and wakes one waiting
    /// consumer. Arguments are perfectly forwarded, so rvalues are moved
    /// rather than copied and move-only element types are supported.
    template<class... Args>
    void emplace(Args&&... args)
    {
        {
            lock_type lock(mutex_);
            queue_.emplace_back(std::forward<Args>(args)...);
        }
        cv_.notify_one();
    }

    /// Removes and returns the oldest element, blocking while the queue is
    /// empty.
    value_type pop()
    {
        lock_type lock(mutex_);
        // The predicate form re-checks emptiness after every wake-up, which
        // guards against spurious wake-ups.
        cv_.wait(lock, [this] { return !queue_.empty(); });
        value_type value = std::move(queue_.front());
        queue_.pop_front();
        return value;
    }
};
#endif
说明:
- 三个入队接口:
- push(T const&) 左值入队
- push(T &&) 右值入队
- emplace() 构造参数入队
- 一个出队接口
- pop()
3 测试
基于cpptest的测试代码如下:
/// Producer/consumer helpers sharing one block_queue of strings.
///
/// @tparam Sequence Container type handed to block_queue as its storage.
template<typename Sequence>
struct Function4BQ
{
    block_queue<std::string, Sequence> queue;
    std::mutex mutex;   // taken by consume2 around its output and counter update
    int counter = 0;    // number of items consumed so far

    /// Pops @p n items one after another, printing each to std::cerr and
    /// counting it. Does not take `mutex`.
    void consume1(size_t n)
    {
        std::cerr << "\n";
        size_t remaining = n;
        while (remaining != 0)
        {
            std::cerr << "I get a " << queue.pop() << std::endl;
            ++counter;
            --remaining;
        }
    }

    /// Pops a single item, then reports it together with @p id and bumps
    /// the counter while holding `mutex`.
    void consume2(size_t id)
    {
        std::string fruit = queue.pop();
        std::lock_guard<std::mutex> guard(mutex);
        std::cerr << "\nI get a " << fruit << " in thread(" << id << ")" << std::endl;
        ++counter;
    }

    /// Enqueues "<fruit> pie" for every fruit via emplace().
    void product1(std::vector<std::string> & fruits)
    {
        for (std::vector<std::string>::size_type i = 0; i != fruits.size(); ++i)
        {
            queue.emplace(fruits[i] + " pie");
        }
    }

    /// Enqueues "<fruit> pie" for every fruit via the rvalue push().
    void product2(std::vector<std::string> & fruits)
    {
        for (std::vector<std::string>::size_type i = 0; i != fruits.size(); ++i)
        {
            queue.push(fruits[i] + " pie");
        }
    }

    /// Enqueues every fruit unchanged via the copying push().
    void product3(std::vector<std::string> & fruits)
    {
        for (std::vector<std::string>::size_type i = 0; i != fruits.size(); ++i)
        {
            queue.push(fruits[i]);
        }
    }
};
// Fixture aliases: one backed by std::deque, one backed by std::list.
using Function4BqDeque = Function4BQ<std::deque<std::string>>;
using Function4BqList = Function4BQ<std::list<std::string>>;
void BlockQueueSuite::one_to_one()
{Function4BqDeque function;std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};std::thread threads[2];threads[0] = std::thread(&Function4BqDeque::product1, std::ref(function), std::ref(fruits));threads[1] = std::thread(&Function4BqDeque::consume1, std::ref(function), fruits.size());for(auto &thread : threads)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)function.counter = 0;threads[0] = std::thread(&Function4BqDeque::product2, std::ref(function), std::ref(fruits));threads[1] = std::thread(&Function4BqDeque::consume1, std::ref(function), fruits.size());for(auto &thread : threads)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)function.counter = 0;threads[0] = std::thread(&Function4BqDeque::product3, std::ref(function), std::ref(fruits));threads[1] = std::thread(&Function4BqDeque::consume1, std::ref(function), fruits.size());for(auto &thread : threads)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)}void BlockQueueSuite::one_to_multi()
{Function4BqList function;std::vector<std::string> fruits{"Apple", "Banana", "Pear", "Plum", "Pineapple"};std::thread product;std::vector<std::thread> consumes(fruits.size());for(size_t i = 0; i < consumes.size(); ++i)consumes[i] = std::thread(&Function4BqList::consume2, std::ref(function), i);product = std::thread(&Function4BqList::product1, std::ref(function), std::ref(fruits));product.join();for(auto &thread : consumes)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)function.counter = 0;for(size_t i = 0; i < consumes.size(); ++i)consumes[i] = std::thread(&Function4BqList::consume2, std::ref(function), i);product = std::thread(&Function4BqList::product2, std::ref(function), std::ref(fruits));product.join();for(auto &thread : consumes)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)function.counter = 0;for(size_t i = 0; i < consumes.size(); ++i)consumes[i] = std::thread(&Function4BqList::consume2, std::ref(function), i);product = std::thread(&Function4BqList::product3, std::ref(function), std::ref(fruits));product.join();for(auto &thread : consumes)thread.join();TEST_ASSERT_EQUALS(fruits.size(), function.counter)
}
说明:
- 函数one_to_one测试一个生产者对应一个消费者(容器类型使用std::deque)。
- 函数one_to_multi测试一个生产者对应多个消费者(容器类型使用std::list)
3 运行
BlockQueueSuite: 0/2
I get a Apple pie
I get a Banana pie
I get a Pear pie
I get a Plum pie
I get a Pineapple pieI get a Apple pie
I get a Banana pie
I get a Pear pie
I get a Plum pie
I get a Pineapple pieI get a Apple
I get a Banana
I get a Pear
I get a Plum
I get a Pineapple
BlockQueueSuite: 1/2
I get a Apple pie in thread(0)I get a Banana pie in thread(1)I get a Pear pie in thread(2)I get a Plum pie in thread(3)I get a Pineapple pie in thread(4)I get a Apple pie in thread(0)I get a Banana pie in thread(1)I get a Pear pie in thread(2)I get a Plum pie in thread(3)I get a Pineapple pie in thread(4)I get a Apple in thread(0)I get a Banana in thread(1)I get a Pear in thread(2)I get a Plum in thread(4)I get a Pineapple in thread(3)
BlockQueueSuite: 2/2, 100% correct in 0.009150 seconds
Total: 2 tests, 100% correct in 0.009150 seconds
分析:
- 从结果看入队顺序和出队顺序是一致的。