Linux生产者消费者模型
- Linux生产者消费者模型详解
- 生产者消费者模型
- 生产者消费者模型的概念
- 生产者消费者模型的特点
- 生产者消费者模型优点
- 基于BlockingQueue的生产者消费者模型
- 基于阻塞队列的生产者消费者模型
- 模拟实现基于阻塞队列的生产消费模型
- 基础实现
- 生产者消费者步调调整
- 条件唤醒优化
- 基于计算任务的扩展
- 总结
Linux生产者消费者模型详解
生产者消费者模型
生产者消费者模型的概念
生产者消费者模型通过一个容器解决生产者与消费者的强耦合问题。
- 通信方式:生产者不直接与消费者交互,而是将数据放入容器;消费者从容器取数据。
- 容器作用:缓冲区,解耦生产者与消费者,平衡双方处理能力。
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的经典场景,具有以下特点:
- 三种关系:
- 生产者与生产者:互斥(竞争容器访问)。
- 消费者与消费者:互斥(竞争容器访问)。
- 生产者与消费者:互斥(共享容器)+同步(生产消费顺序)。
- 两种角色:生产者与消费者(线程或进程)。
- 一个交易场所:内存缓冲区(如队列)。
互斥原因:容器是临界资源,需用互斥锁保护,多线程竞争访问。
同步原因:
- 容器满时,生产者需等待,避免生产失败。
- 容器空时,消费者需等待,避免消费失败。
- 同步确保有序访问,防止饥饿,提高效率。
注意:互斥保证数据正确性,同步实现线程协作。
生产者消费者模型优点
- 解耦:生产者与消费者独立运行,通过容器间接交互。
- 支持并发:生产者生产时,消费者可同时消费。
- 支持忙闲不均:容器缓冲数据,平衡处理速度差异。
对比函数调用(紧耦合),生产者消费者模型是松耦合设计,生产者无需等待消费者处理。
基于BlockingQueue的生产者消费者模型
基于阻塞队列的生产者消费者模型
在多线程编程中,**阻塞队列(Blocking Queue)**是实现生产者消费者模型的常用数据结构。
- 与普通队列的区别:
- 队列空时,取元素操作阻塞,直到有数据。
- 队列满时,放元素操作阻塞,直到有空间。
- 应用场景:类似管道通信。
模拟实现基于阻塞队列的生产消费模型
基础实现
以单生产者、单消费者为例,使用C++ queue
实现阻塞队列:
BlockQueue.hpp:
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>#define NUM 5template<class T>
class BlockQueue {
private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }
public:BlockQueue(int cap = NUM) : _cap(cap) {pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex); // 队列满,等待}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); // 唤醒消费者}void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex); // 队列空,等待}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); // 唤醒生产者}
private:std::queue<T> _q; // 阻塞队列int _cap; // 容量上限pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _full; // 满条件变量pthread_cond_t _empty; // 空条件变量
};
main.cpp:
#include "BlockQueue.hpp"
#include <unistd.h>void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
说明:
- 单生产者单消费者:无需维护生产者间或消费者间的互斥。
- 互斥:
_mutex
保护队列。 - 同步:
_full
和_empty
条件变量控制生产消费顺序。 - 条件判断:用
while
防止伪唤醒。 - 运行结果:生产者与消费者步调一致,每秒交替生产消费。
生产者消费者步调调整
-
生产快,消费慢:
void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }
- 生产者快速填满队列后等待,消费者消费一个后唤醒生产者,后续步调一致。
-
生产慢,消费快:
void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }
- 消费者初始等待生产者生产,消费后继续等待,步调随生产者。
条件唤醒优化
调整唤醒条件,例如队列数据量超一半时唤醒消费者,小于一半时唤醒生产者:
void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex);}_q.push(data);if (_q.size() >= _cap / 2) {pthread_cond_signal(&_empty); // 超一半唤醒消费者}pthread_mutex_unlock(&_mutex);
}
void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();if (_q.size() <= _cap / 2) {pthread_cond_signal(&_full); // 少于一半唤醒生产者}pthread_mutex_unlock(&_mutex);
}
- 效果:生产者快速填满队列后等待,消费者消费至一半以下才唤醒生产者。
基于计算任务的扩展
将队列存储类型改为任务类,扩展功能:
Task.hpp:
#pragma once
#include <iostream>class Task {
public:Task(int x = 0, int y = 0, char op = 0) : _x(x), _y(y), _op(op) {}void Run() {int result = 0;switch (_op) {case '+': result = _x + _y; break;case '-': result = _x - _y; break;case '*': result = _x * _y; break;case '/': if (_y == 0) { std::cout << "Warning: div zero!" << std::endl; result = -1; }else { result = _x / _y; } break;case '%': if (_y == 0) { std::cout << "Warning: mod zero!" << std::endl; result = -1; }else { result = _x % _y; } break;default: std::cout << "error operation!" << std::endl; break;}std::cout << _x << " " << _op << " " << _y << "=" << result << std::endl;}
private:int _x, _y;char _op;
};
main.cpp:
#include "BlockQueue.hpp"
#include "Task.hpp"void* Producer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;const char* ops = "+-*/%";while (true) {int x = rand() % 100;int y = rand() % 100;char op = ops[rand() % 5];Task t(x, y, op);bq->Push(t);std::cout << "Producer task done" << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;while (true) {sleep(1);Task t;bq->Pop(t);t.Run();}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<Task>* bq = new BlockQueue<Task>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
- 功能:生产者生成计算任务,消费者执行计算并输出结果。
- 扩展性:通过定义不同
Task
类实现多样化任务处理。
总结
- 模型核心:通过容器解耦生产者与消费者,支持并发与忙闲不均。
- 实现关键:阻塞队列结合互斥锁与条件变量,确保互斥与同步。
- 灵活性:可调整步调、唤醒条件,或扩展为复杂任务处理。