这篇博客的重点在于代码实现,理论部分请看CSDN
一、单生产单消费
1.环形队列的实现
单生产单消费的情况下,我们只需要维护生产者和消费者之间的互斥和同步关系即可
将环形队列封装成一个类:首先给出整体框架,接着会说明每一个类内函数的实现
#pragma once#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>// 环形队列的默认大小
static const int gcap = 5;// 设置成模版类型,队列中可以放任意类型的数据
template <class T>
class RingQueue
{
private:// 封装系统调用sem_waitvoid P(sem_t &sem);// 封装系统调用sem_postvoid V(sem_t &sem);public:RingQueue()~RingQueue()// 生产者向队列里放数据void Push(const T &in);// 消费者从队列中取数据void Pop(T *out);private:std::vector<T> _queue; // 数组模拟环形队列int _cap; // 环形队列的容量sem_t _spaceSem; // 生产者 -> 空间资源sem_t _dataSem; // 消费者 -> 数据资源int _producerStep; // 生产者的位置(数组下标)int _consumerStep; // 消费者的位置(数组下标)
};
(1) void P(sem_t &sem);
封装系统调用sem_wait,便于在push和pop中使用
void P(sem_t &sem)
{int n = sem_wait(&sem);assert(n == 0);(void)n;
}
(2) void V(sem_t &sem);
封装系统调用sem_post,便于在push和pop中使用
void V(sem_t &sem)
{int n = sem_post(&sem);assert(n == 0);(void)n;
}
(3) RingQueue()
RingQueue(const int &cap = gcap): _queue(cap), _cap(cap)
{// 初始化信号量int n = sem_init(&_spaceSem, 0, _cap);assert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);// 初始化生产者和消费者的位置_productorStep = _consumerStep = 0;
}
(4) ~RingQueue()
~RingQueue()
{sem_destroy(&_spaceSem);sem_destroy(&_dataSem);
}
(5) void Push(const T &in);
void Push(const T &in)
{// 生产者要了一个空间,空间信号量--P(_spaceSem);// 把数据放进队列_queue[_producerStep++] = in;// 维护环状结构_producerStep %= _cap;// 队列新增了数据,数据信号量++V(_dataSem);
}
(6) void Pop(T *out);
// 消费者从队列中取数据
void Pop(T *out)
{// 消费者拿了一个数据,数据信号量--P(_dataSem);// 把数据拿出队列*out = _queue[_consumerStep++];// 维护环状结构_consumerStep %= _cap;// 队列多出了空间,空间信号量++V(_spaceSem);
}
2.上层调用逻辑
#include "RingQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>void *ProductorRoutine(void *rq)
{RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);// RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// // version1// int data = rand() % 10 + 1;// ringqueue->Push(data);// std::cout << "生产完成,生产的数据是:" << data << std::endl;// sleep(1);// version2// 构建/获取 任务 -- 花费时间int x = rand() % 10;int y = rand() % 5;char op = oper[rand() % oper.size()];Task t(x, y, op, mymath);// 生产任务ringqueue->Push(t);// 输出提示std::cout << "生产者派发了一个任务:" << t.toTaskString() << std::endl;sleep(1);}
}void *ConsumerRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// // version1// int data;// ringqueue->Pop(&data);// std::cout << "消费完成,消费的数据是:" << data << std::endl;// version2Task t;// 消费任务 -- 花费时间ringqueue->Pop(&t);std::cout << SelName() << ",消费者消费了一个任务:" << t() << std::endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid());RingQueue<int> *rq = new RingQueue<int>();// RingQueue<Task> *rq = new RingQueue<Task>();// 单生产,单消费pthread_t p, c;pthread_create(p + i, nullptr, ProductorRoutine, rq);pthread_create(c + i, nullptr, ConsumerRoutine, rq);pthread_join(p[i], nullptr);pthread_join(c[i], nullptr);delete rq;return 0;
}
二、多生产多消费
1.环形队列的实现
多生产多消费的情况下//...
多生产多消费的环形队列与单生产单消费的不同之处 -> 多了两把锁
#pragma once#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>// 环形队列的默认大小
static const int gcap = 5;// 设置成模版类型,队列中可以放任意类型的数据
template <class T>
class RingQueue
{
private:// 封装系统调用sem_waitvoid P(sem_t &sem);// 封装系统调用sem_postvoid V(sem_t &sem);public:RingQueue()~RingQueue()// 生产者向队列里放数据void Push(const T &in);// 消费者从队列中取数据void Pop(T *out);private:std::vector<T> _queue; // 数组模拟环形队列int _cap; // 环形队列的容量sem_t _spaceSem; // 生产者 -> 空间资源sem_t _dataSem; // 消费者 -> 数据资源int _producerStep; // 生产者的位置(数组下标)int _consumerStep; // 消费者的位置(数组下标)pthread_mutex_t _pmutex; // 生产者pthread_mutex_t _cmutex; // 消费者
};
(1) RingQueue()
RingQueue(const int &cap = gcap): _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, _cap);assert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);_productorStep = _consumerStep = 0;pthread_mutex_init(&_pmutex, nullptr);pthread_mutex_init(&_cmutex, nullptr);}
(2) ~RingQueue()
~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}
(3) void Push(const T &in);
// 生产者void Push(const T &in){//? 代码有没有优化的可能// 你认为:现加锁,后申请信号量;还是现申请信号量,在加锁?// pthread_mutex_lock(&_pmutex);// 申请到了信号量,意味着我一定能进行正常的生产P(_spaceSem);pthread_mutex_lock(&_pmutex);_queue[_productorStep++] = in;_productorStep %= _cap;pthread_mutex_unlock(&_pmutex);V(_dataSem);// pthread_mutex_unlock(&_pmutex);}
(4) void Pop(T *out);
// 消费者void Pop(T *out){// 你认为:现加锁,后申请信号量;还是现申请信号量,在加锁?// pthread_mutex_lock(&_cmutex);P(_dataSem);pthread_mutex_lock(&_cmutex);*out = _queue[_consumerStep++];_consumerStep %= _cap;pthread_mutex_unlock(&_cmutex);V(_spaceSem);// pthread_mutex_unlock(&_cmutex);}
2.上层调用逻辑
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>std::string SelName()
{char name[128];snprintf(name, sizeof(name), "thread[0x%x]", pthread_self());return name;
}void *ProductorRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// // version1// int data = rand() % 10 + 1;// ringqueue->Push(data);// std::cout << "生产完成,生产的数据是:" << data << std::endl;// sleep(1);// version2// 构建/获取 任务 -- 花费时间int x = rand() % 10;int y = rand() % 5;char op = oper[rand() % oper.size()];Task t(x, y, op, mymath);// 生产任务ringqueue->Push(t);// 输出提示std::cout << SelName() << ",生产者派发了一个任务:" << t.toTaskString() << std::endl;sleep(1);}
}void *ConsumerRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// // version1// int data;// ringqueue->Pop(&data);// std::cout << "消费完成,消费的数据是:" << data << std::endl;// version2Task t;// 消费任务 -- 花费时间ringqueue->Pop(&t);std::cout << SelName() << ",消费者消费了一个任务:" << t() << std::endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid());// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>();// 单生产,单消费pthread_t p[4], c[8];for (int i = 0; i < 4; i++)pthread_create(p + i, nullptr, ProductorRoutine, rq);for (int i = 0; i < 4; i++)pthread_create(c + i, nullptr, ConsumerRoutine, rq);// 多生产,多消费?--> 只要保证,最终进入临界区的是一个生产,一个消费就行!// 多生产,多消费的意义?--> for (int i = 0; i < 4; i++)pthread_join(p[i], nullptr);for (int i = 0; i < 4; i++)pthread_join(c[i], nullptr);delete rq;return 0;
}