Linux —— POSIX信号量 - 基于环形队列的生产消费模型
- POSIX信号量
- 信号量的概念
- POSIX信号量的类型
- 信号量的操作
- POSIX信号量函数
- 基于环形队列的生产消费模型
- 设计思路
- 同步和安全性
- 代码
POSIX信号量
信号量的概念
POSIX信号量是一种用于进程和线程之间同步的机制,主要用于控制对共享资源的访问。信号量是一种同步原语,通常表现为一个整数值,表示可用资源的数量。信号量的值不小于0。如果一个进程尝试将信号量的值减少到小于0,操作将被阻塞,知道信号量的值增加到允许的范围。
POSIX信号量的类型
POSIX信号量主要分为两种类型:
- 命名信号量:
- 具有一个名称,可以通过该名称在不同的进程间访问。命名信号量通常用于进程间的同步。
- 使用
sem_open()
函数创建或打开命名信号量
2.未命名信号量: - 不具有名称,通常存在于内存中,适用于同一进程内的多个进程之间的同步。
- 未命名信号量可以通过
sem_init()
函数进行初始化。
信号量的操作
信号量的基本操作包括:
- P操作(等待操作):使用
sem_wait()
函数实现,尝试减少信号量的值。如果信号量的值为0,则调用线程将被阻塞,直到信号量的值大于0。 - V操作(释放操作):使用
sem_post()
函数实现,增加信号量的值,通知其他等待的线程或进程信号量的可用性。
POSIX信号量函数
POSIX信号量提供了一组函数来创建、操作和销毁信号量。以下是一些常用的POSIX信号量函数及其参数和返回值:
- sem_init
功能: 初始化未命名信号量。
原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem
: 指向信号量对象的指针。pshared
: 如果为0,则信号量仅用于线程间同步;如果非0,则信号量可用于进程间同步。
value: 信号量的初始值。
返回值:
- 成功时返回0;失败时返回-1,并设置errno以指示错误类型。
- sem_destroy
功能: 销毁信号量。
原型:
int sem_destroy(sem_t *sem);
参数:
- ·sem·: 指向要销毁的信号量对象的指针。
返回值: - 成功时返回0;失败时返回-1,并设置errno。
- sem_wait
功能: 进行 P 操作(等待操作),尝试获取信号量。
原型:
int sem_wait(sem_t *sem);
参数:
sem
: 指向信号量对象的指针。
返回值:
- 成功时返回0;失败时返回-1,并设置errno。如果信号量的值为0,调用线程将被阻塞。
- sem_post
- 功能: 进行V操作(释放操作),增加信号量的值。
- 原型:
int sem_post(sem_t *sem);
参数:
sem
: 指向信号量对象的指针。
返回值:
- 成功时返回0;失败时返回-1,并设置errno。
基于环形队列的生产消费模型
下面通过STL的vector来设计一个环形队列,环形队列采用数组模拟,用模运算来模拟环状特性:
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来
判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
环形队列实际上就是一个链表,只是在理解上理解为一个环形的:
设计思路
- 使用环形队列:
- 环形队列是一种循环使用固定大小内存的数据结构,非常适合生产者-消费者模型。
- 通过维护生产者指针
(_p_step)
和消费者指针(_c_step)
,可以实现对环形队列的环形访问。
- 使用信号量:
- 使用两个信号量:
_data_sem
(数据信号量)和_space_sem(
空间信号量)。 _data_sem
表示队列中可用数据的数量,初始值为0。_space_sem
表示队列中可用空间的数量,初始值为队列的最大容量(_max_cap)
。
- 使用互斥量:
- 使用两个互斥量:
_p_mutex
(生产者锁)和_c_mutex
(消费者锁)。 - 互斥量用于保护生产者和消费者对队列的并发访问。
同步和安全性
- 生产者:
- 生产者首先使用 P
(_space_sem)
申请可用空间,如果没有可用空间,会被阻塞。 - 获取
_p_mutex
锁,保护对队列的写入操作。 - 将数据写入队列,并更新生产者指针
_p_step
。 - 释放
_p_mutex
锁。 - 使用 V
(_data_sem)
发布一个数据可用信号。
- 消费者:
- 消费者首先使用 P
(_data_sem)
申请可用数据,如果没有可用数据,会被阻塞。 - 获取
_c_mutex
锁,保护对队列的读取操作。 - 从队列中读取数据,并更新消费者指针
_c_step
。 - 释放
_c_mutex
锁。 - 使用 V
(_space_sem)
发布一个空间可用信号。
- 同步和安全性:
- 信号量确保了生产者和消费者对队列的访问是同步的。
- 生产者在有可用空间时才能写入,消费者在有可用数据时才能读取。
- 互斥量确保了生产者和消费者对队列的并发访问是安全的。
- 每个线程在访问队列时都会获取相应的互斥量,保证了临界区的互斥访问。
- 其他考虑:
- 在构造函数中初始化信号量和互斥量。
- 在析构函数中销毁信号量和互斥量。
- 确保在异常情况下,信号量和互斥量也能被正确销毁。
代码
- RingQueue.hpp 环形队列的实现
#pragma once#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <semaphore.h>template <typename T>
class RingQueue
{
private:void P(sem_t &s){sem_wait(&s);}void V(sem_t &s){sem_post(&s);}public:RingQueue(int max_cap) : _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0){sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, max_cap);pthread_mutex_init(&_c_mutex, nullptr);pthread_mutex_init(&_p_mutex, nullptr);}void Push(const T &in) // 生产者{// 信号量本身就是一种资源预约机制,无需判断,即可知道资源的内部情况P(_space_sem);pthread_mutex_lock(&_p_mutex);_ringqueue[_p_step] = in;_p_step++;_p_step %= _max_cap;pthread_mutex_unlock(&_p_mutex);V(_data_sem);}void Pop(T *out) // 消费者{P(_data_sem);pthread_mutex_lock(&_c_mutex);*out = _ringqueue[_c_step];_c_step++;_c_step %= _max_cap;pthread_mutex_unlock(&_c_mutex);V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);pthread_mutex_destroy(&_c_mutex);pthread_mutex_destroy(&_p_mutex);}private:std::vector<T> _ringqueue;int _max_cap;int _c_step;int _p_step;sem_t _data_sem; // 消费者 数据信号量sem_t _space_sem; // 生产者 空间信号量pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};
- Task.hpp Task类的实现
#pragma once
#include <iostream>class Task
{
public:Task(){}Task(int x , int y):_x(x),_y(y){}~Task(){}void Excute(){_result = _x + _y;}std::string Debug(){std::string msg = std::to_string(_x) + " + " + std::to_string(_y) + " = " + " ? ";return msg;}std::string Result(){std::string msg = std::to_string(_x) + " + " + std::to_string(_y) + " = " + std::to_string(_result);return msg;}void operator()(){Excute();}
private:int _x;int _y;int _result;
};
- main.cc 代码上层的调用逻辑
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>void *Consumer(void*args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while(true){Task t;// 1. 消费rq->Pop(&t);// 2. 处理数据t();std::cout << "Consumer-> " << t.Result() << std::endl;}
}
void *Productor(void*args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while(true){sleep(1);// 1. 构造数据int x = rand() % 10 + 1; //[1, 10]usleep(x*1000);int y = rand() % 10 + 1;Task t(x, y);// 2. 生产rq->Push(t);std::cout << "Productor -> " << t.Debug() << std::endl;}
}int main()
{srand(time(nullptr) ^ getpid());RingQueue<Task> *rq = new RingQueue<Task>(5);// 单单pthread_t c1, c2, p1, p2, p3;pthread_create(&c1, nullptr, Consumer, rq);pthread_create(&c2, nullptr, Consumer, rq);pthread_create(&p1, nullptr, Productor, rq);pthread_create(&p2, nullptr, Productor, rq);pthread_create(&p3, nullptr, Productor, rq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0;
}