Sem.hpp(用于封装信号量):
#include <semaphore.h>
#include <unistd.h>

#include <cerrno>
#include <iostream>
#include <queue>
using namespace std;
/// Owning wrapper around an unnamed POSIX semaphore (sem_t).
///
/// The semaphore is initialized in the constructor and destroyed in the
/// destructor. P() and V() are the classic wait / signal operations.
class Sem
{
public:
    /// Initializes the semaphore with an initial count of `num`.
    /// The second argument to sem_init (pshared) is 0.
    explicit Sem(int num) { sem_init(&_sem, 0, num); }

    ~Sem() { sem_destroy(&_sem); }

    // Using a copy of a sem_t is undefined, and a copied wrapper would also
    // call sem_destroy twice on the same state, so copying is disabled.
    Sem(const Sem&) = delete;
    Sem& operator=(const Sem&) = delete;

    /// V (signal): increments the semaphore via sem_post.
    void V() { sem_post(&_sem); }

    /// P (wait): decrements the semaphore via sem_wait.
    ///
    /// sem_wait fails with EINTR when a signal handler interrupts it; in that
    /// case the semaphore was NOT acquired, so the wait is retried instead of
    /// returning to the caller as if it had succeeded.
    void P()
    {
        while (sem_wait(&_sem) == -1 && errno == EINTR)
        {
            // interrupted before acquiring; try again
        }
    }

private:
    sem_t _sem;
};
Task.hpp(封装任务队列):
#include "sem.hpp"
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;int num = 10;class Task
{
public:Task(): _space(num), _data(0){pthread_mutex_init(&_mtx, nullptr);}void Push(int x){_space.P(); // 生产者等待空间pthread_mutex_lock(&_mtx);q.push(x);cout << "生产了:" << x << endl;pthread_mutex_unlock(&_mtx);_data.V(); // 通知消费者数据已生产}void Pop(){_data.P(); // 消费者等待数据pthread_mutex_lock(&_mtx);cout << "消费了:" << q.front() << endl;q.pop();pthread_mutex_unlock(&_mtx);_space.V(); // 通知生产者空间已释放}private:Sem _space; // 空闲空间数量Sem _data; // 数据数量pthread_mutex_t _mtx;queue<int> q;
};
.cc:
#include"Task.hpp"
void* consumer(void* args)
{Task* t = (Task*)args;while(1){sleep(1);t->Pop();}return nullptr;
}
void* producer(void* args)
{Task* t = (Task*)args;while(1){t->Push(rand()%1000);}return nullptr;
}
int main()
{srand(time(nullptr));Task* t = new Task;pthread_t Consumer[5];pthread_t Producer[5];for(int i = 0; i < 5; i++){pthread_create(&Consumer[i], nullptr, consumer, (void*)t);pthread_create(&Producer[i], nullptr, producer, (void*)t);}for(int i = 0; i < 3; i++){pthread_join(Consumer[i], nullptr);pthread_join(Producer[i], nullptr);}delete t;return 0;
}