✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】
目录
1、POSIX信号量
2、基于环形队列的生产消费模型
2.1、代码实现
2.1.1、RingQueue基本结构
2.1.2、PV操作
2.1.3、构造析构函数
2.1.4、生产者入队
2.1.5、消费者出队
2.2、代码测试
2.2.1、内置类型
2.2.2、类类型
2.2.3、多生产多消费
1、POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一弹生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):
2、基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
多线程如何在环形队列中进行生产和消费?1.单生产单消费 2.多生产多消费
1.队列为空,让谁先访问?生产者先生产
2.队列为满,让谁先访问?消费者来消费
3.队列不为空 && 队列不为满 -- 生产和消费同时进行
2.1、代码实现
2.1.1、RingQueue基本结构
RingQueue使用模板类实现,当单生产单消费的时候可以不加锁,因为使用信号量就可以实现同步与互斥,当多生产多消费的时候需要进行加锁,且为两把锁!!!
template<typename T>
class RingQueue
{
private:// 等待信号量,信号量-1void P(sem_t& s);// 发布信号量(归还资源),信号量+1void V(sem_t& s);
public:// 构造函数初始化队列大小,信号量以及互斥锁RingQueue(int max_cap):_ringqueue(max_cap)/*构造队列成员个数*/,_max_cap(max_cap),_c_step(0),_p_step(0);// 生产者生产数据void Push(const T& in);// 消费者使用数据void Pop(T* out);// 析构函数释放信号量和互斥锁~RingQueue();
private:std::vector<T> _ringqueue; // vector实现队列int _max_cap; // 最大容量int _c_step; // 消费者位置int _p_step; // 生产者位置sem_t _data_sem; // 消费者关心数据资源sem_t _space_sem; // 生产者关心空间资源pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};
2.1.2、PV操作
P操作等待信号量,V操作发布信号量!
// 等待信号量,信号量-1
void P(sem_t& s)
{sem_wait(&s);
}
// 发布信号量(归还资源),信号量+1
void V(sem_t& s)
{sem_post(&s);
}
2.1.3、构造析构函数
构造函数申请队列空间,初始化信号量和互斥锁(多生产多消费需要使用互斥锁),析构函数释放信号量和互斥锁!!!
// 构造函数初始化队列大小,信号量以及互斥锁
RingQueue(int max_cap):_ringqueue(max_cap)/*构造队列成员个数*/,_max_cap(max_cap),_c_step(0),_p_step(0)
{sem_init(&_data_sem,0,0); // 第一个0表示线程共享,第二个0表示信号量初始值为0sem_init(&_space_sem,0,max_cap);// pthread_mutex_init(&_c_mutex,nullptr);// pthread_mutex_init(&_p_mutex,nullptr);
}
// 析构函数释放信号量和互斥锁
~RingQueue()
{sem_destroy(&_data_sem);sem_destroy(&_space_sem);// pthread_mutex_destroy(&_c_mutex);// pthread_mutex_destroy(&_p_mutex);
}
2.1.4、生产者入队
生产者生产数据(无锁版本)!!!
// 生产者生产数据
void Push(const T& in)
{// 信号量:是一个计数器,是资源的预订机制。预订:在外部,可以不判断资源是否满足,就可以知道内部资源的情况!P(_space_sem); // 信号量这里,对资源进行使用,申请,为什么不判断一下条件是否满足???信号量本身就是判断条件!_ringqueue[_p_step] = in; // 生产数据_p_step++;_p_step %= _max_cap; // 循环轮转V(_data_sem); // 归还数据资源,不为满归还
}
2.1.5、消费者出队
消费者使用数据(无锁版本)!!!
// 消费者使用数据
void Pop(T* out)
{P(_data_sem); *out = _ringqueue[_c_step];_c_step++;_c_step %= _max_cap;V(_space_sem);
}
2.2、代码测试
2.2.1、内置类型
Consumer
void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){// 1.消费int data;rq->Pop(&data);// 2.处理数据std::cout << "Consumer->" << data << std::endl;}
}
Productor
void *Productor(void *args)
{srand(time(nullptr) ^ getpid());RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){// 1.构造数据int x = rand() % 10 + 1; // [1,10]// 2.生产数据rq->Push(x);std::cout << "Productor->" << x << std::endl;sleep(1);}
}
主函数
int main()
{RingQueue<int> *rq = new RingQueue<int>(5);// 单生产,单消费pthread_t c, p;pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
运行结果
2.2.2、类类型
Consumer
void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task>*>(args);while (true){// 1.消费Task t;rq->Pop(&t);// 2.处理数据t();std::cout << "Consumer->" << t.result() << std::endl;}
}
Productor
void *Productor(void *args)
{srand(time(nullptr) ^ getpid());RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// 1.构造数据int x = rand() % 10 + 1; // [1,10]usleep(x * 1000);int y = rand() % 10 + 1; Task t(x,y);// 2.生产数据rq->Push(t);std::cout << "Productor->" << t.debug() << std::endl;sleep(1);}
}
主函数
int main()
{RingQueue<Task> *rq = new RingQueue<Task>(5);// 单生产,单消费pthread_t c, p;pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
运行结果
2.2.3、多生产多消费
多生产多消费能够保证生产与消费之间的同步与互斥,但是不能保证生产与生产,消费与消费之间的同步与互斥,因此需要在入队与出队时加锁!!!
生产者入队
// 生产者生产数据
void Push(const T& in)
{// 信号量:是一个计数器,是资源的预订机制。预订:在外部,可以不判断资源是否满足,就可以知道内部资源的情况!P(_space_sem); // 信号量这里,对资源进行使用,申请,为什么不判断一下条件是否满足???信号量本身就是判断条件!pthread_mutex_lock(&_p_mutex); // 给生产者上锁,解决多生产多消费数据不一致问题,放在P后效率更高_ringqueue[_p_step] = in; // 生产数据_p_step++;_p_step %= _max_cap; // 循环轮转pthread_mutex_unlock(&_p_mutex); // 解锁V(_data_sem); // 归还数据资源,不为满归还
}
消费者出队
// 消费者使用数据
void Pop(T* out)
{P(_data_sem); pthread_mutex_lock(&_c_mutex);*out = _ringqueue[_c_step];_c_step++;_c_step %= _max_cap;pthread_mutex_unlock(&_c_mutex);V(_space_sem);
}
主函数
int main()
{RingQueue<Task> *rq = new RingQueue<Task>(5);// 多生产,多消费pthread_t c1,c2,p1,p2,p3;pthread_create(&c1, nullptr, Consumer, rq);pthread_create(&c2, nullptr, Consumer, rq);pthread_create(&p1, nullptr, Productor, rq);pthread_create(&p2, nullptr, Productor, rq);pthread_create(&p3, nullptr, Productor, rq);pthread_join(c1,nullptr);pthread_join(c2,nullptr);pthread_join(p1,nullptr);pthread_join(p2,nullptr);pthread_join(p3,nullptr);return 0;
}
无锁
加锁