目录
1.环形队列
2.加上信号量的理解
3.代码
1.环形队列
环形队列使用vector封装出来的。
环形队列可以实现并发生产和消费,就是在消费的同时也可以生产。
这个是建立在生产者消费者位置不重合的情况下。
因为位置重合之后,环形队列为空或者满,
为空的时,只能让生产者先生产,消费者后消费,
为满时,消费者先消费,生产者后生产。
如何实现环形的效果
当下标遍历到vector末尾的时候, 下标 %=判断环形队列为空还是为满 vector容量,下标就回到数组的开始。
当生产者消费者重合时
1.队列为空(访问环形队列,让生产者先生产)
2.队列为满(访问环形队列,让消费者先消费)
当生产者与消费者不重合时
队列一定不为空&&不为满,这个时候生产者可以生产,消费者可以消费。
2.加上信号量的理解
生产者只关心环形队列有没有空间让他生产
消费者只关心环形队列有没有数据让他消费
信号量是一种资源预定机制,预定成功之后肯定会有空间和数据,给消费者和生产者。
所以生产者在生产之前要对空间的信号量进行p操作,生产完成之后要对数据的信号量进行v操作
所以消费者在生产之前要对数据的信号量进行p操作,消费完成之后要对空间的信号量进行v操作
3.代码
ringqueue.hpp
#ifndef __RINGQUEUEHPP__
#define __RINGQUEUEHPP__
#include<pthread.h>
#include<thread>
#include<vector>
#include<semaphore.h>
#include"Task.hpp"
#include <iostream>
#include <string.h>
template<class T>
class ringqueue
{
private:void p(sem_t &sem){int ret = sem_wait(&sem);if(ret == -1){std::cout<< strerror(errno) <<std::endl;}}void v(sem_t &sem){int ret = sem_post(&sem);if(ret == -1){std::cout<< strerror(errno) <<std::endl;}}public:ringqueue(int cap = 8):_ringqueue(cap),_cap(cap),p_step(0),c_step(0){sem_init(&_room,0,cap);sem_init(&_data,0,0);pthread_mutex_init(&c_mutex,nullptr);pthread_mutex_init(&p_mutex,nullptr);}//生产者向队列里生产void enqueue(T in){ p(_room); // 申请空间pthread_mutex_lock(&p_mutex);_ringqueue[p_step++] = in;p_step %= _cap;pthread_mutex_unlock(&p_mutex);v(_data); // 生产之后资源++} void pop(T &out){//int roomnum = 0;//int datanum = 0;//sem_getvalue(&_room,&roomnum);//sem_getvalue(&_data,&datanum);//std::cout<< roomnum <<" " <<datanum <<std::endl;p(_data); //申请资源pthread_mutex_lock(&c_mutex);out = _ringqueue[c_step++];c_step %= _cap;pthread_mutex_unlock(&c_mutex);v(_room);}~ringqueue(){sem_destroy(&_room);sem_destroy(&_data);pthread_mutex_destroy(&p_mutex);pthread_mutex_destroy(&c_mutex);}private:std::vector<T> _ringqueue;int _cap;//容量int p_step ;//生产者下次生产的位置int c_step ;//消费者下次消费的位置sem_t _room; //空间sem_t _data; //资源pthread_mutex_t p_mutex; //为了多生产之间的互斥关系pthread_mutex_t c_mutex; //为了多消费之间的互斥关系
};#endif
Task.hpp
#ifndef __TASKHPP__
#define __TASKHPP__#include<string>
#include<iostream>
class Task
{
public:Task():taskname("未知任务"){}Task(std::string name):taskname(name){}void excute(){std::cout<<"Excute " << taskname <<std::endl;}std::string & name(){return taskname;}
private:std::string taskname;
};#endif
Main.cc
#include "Task.hpp"
#include "ringqueue.hpp"
#include <iostream>
#include <thread>
#include <string>
#include <vector>
#include <time.h>
#include <unistd.h>
std::vector<std::string> Taskarr{"Download", "UPLoad", "cacluate"};void product(void* args)
{ringqueue<Task>* rq = (ringqueue<Task>*)args;srand(time(nullptr));while (true){sleep(1);Task task(Taskarr[rand() % 3]);rq->enqueue(task);std::cout << "product task "<< task.name() << std::endl;}
}//void consum(ringqueue<Task> &rq)
//用指針的方式传递参数,把ringqueue的指针传过来,
void consum(void* args)
{ringqueue<Task>* rq = (ringqueue<Task>*)args;sleep(6);while (true){sleep(1);Task task;rq->pop(task);task.excute(); }
}
//是不是这里的问题 这里传过去的时候发生了拷贝
//是的
void product_start(std::vector<std::thread> &threadss, ringqueue<Task> &rq, int num)
{for (int i = 0; i < num; i++){threadss.emplace_back(product, (void*)&rq);}
}void consumer_start(std::vector<std::thread> &threadss, ringqueue<Task> &rq, int num)
{for (int i = 0; i < num; i++){threadss.emplace_back(consum, (void*)&rq);}
}void Waitall(std::vector<std::thread> &threadss)
{for (auto &thread : threadss){thread.join();}
}int main()
{std::vector<std::thread> threadss;ringqueue<Task> rq(5);product_start(threadss, rq,3);consumer_start(threadss, rq, 5);Waitall(threadss);return 0;
}