线程系列:
Linux–线程的认识(一)
Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)
线程的互斥:临界资源只能在同一时间被一个线程使用
生产消费模型
信号量
信号量(Semaphore)是在多线程环境下用于控制对共享资源的访问的一种同步机制。
它可以确保在同一时刻只有一个线程能够访问共享资源,从而避免多个线程同时修改相同资源而导致的数据竞争和不确定结果。
信号量的主要作用是保护共享资源,确保在同一时刻只有一个线程能够对其进行访问。
通过信号量,线程在进入关键代码段之前必须获取信号量,一旦完成了对共享资源的操作,就释放信号量,以允许其他线程访问该资源。
特点
- 1.同步机制:信号量可用于线程之间的同步,确保在一个线程进入关键代码段时,其他线程必须等待,直到该线程释放信号量。
- 2.互斥访问:信号量可以实现对共享资源的互斥访问,即同一时刻只允许一个线程访问共享资源,避免了数据竞争。
- 3.计数器:信号量可以理解为一个计数器,它记录着可用的资源数量。当资源被一个线程占用时,计数器减少;当资源被释放时,计数器增加。
信号量与互斥量的区别
- 互斥量(Mutex) 是一种用于保护共享资源的锁,它只允许一个线程进入临界区,其他线程必须等待该线程释放锁才能进入。互斥量只有两种状态:锁定和非锁定。
- 信号量 则是一个更为通用的同步原语,它可以控制对一组资源的访问。信号量的值可以大于1,表示同时允许多个线程访问资源,这在某些情况下可能是必要的。信号量的实现原理涉及到原子操作和操作系统的底层支持,在不同的操作系统和编程语言中可能会有不同的实现方式。通常,操作系统提供了对信号量的原子操作支持,以确保在多线程环境下信号量的正确使用。
原子性
原子性(Atomicity)指的是一个或多个操作在执行过程中,不会被其他线程或进程的调度所打断,从而保证了数据的一致性和完整性。
原子性是一个重要的并发编程概念,它确保了一个操作或一组操作在执行时具有不可分割性,即这些操作要么全部执行成功,要么全部不执行,不会出现部分执行的情况。原子操作是不可分割的,要么完全执行,要么不执行,不存在中间状态。
临界资源
临界资源是指一次仅允许一个进程(或线程)使用的共享资源。这类资源如果同时被多个进程访问,可能会导致数据不一致或程序错误。
常见的临界资源包括物理设备(如打印机、磁带机等)和软件资源(如消息缓冲队列、全局变量、数组、缓冲区等)。
相关函数
semget()
用于获取一个信号量的标识符
#include<sys/sem.h>
#include<sys/type.h>
#include<sys/ipc.h>int semget(key_t key, int num_sems, int sem_flags);
- key:这是一个整数值,用于唯一标识一个信号量集。不相关的进程可以通过它访问同一个信号量集。当 key 的值为
IPC_PRIVATE
(通常是一个特殊的常量,表示私有)时,会创建一个新的、仅创建者进程可访问的信号量集。若 key 的值是由ftok()
等函数生成的,且sem_flags
中包含了IPC_CREAT
标志,则系统会尝试查找是否存在与 key 值相匹配的信号量集。 - num_sems:指定在信号量集中需要创建的信号量个数。这个参数在创建信号量集时有效,如果信号量集已存在,则忽略此参数。
- sem_flags:这是一组标志,用于控制
semget()
的行为。常用的标志包括IPC_CREAT
(如果不存在,则创建信号量集)和IPC_EXCL
(与 IPC_CREAT 结合使用,确保创建的信号量集是唯一的)。此外,sem_flags 的低9位还定义了信号量集的权限,类似于文件的访问权限。
返回成功时,semget() 返回一个正整数,即信号量集的标识符(IPC标识符)。
失败时,返回 -1,并设置 errno 以指示错误原因。
semctl()
用于对信号量的控制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h> int semctl(int semid, int semnum, int cmd, ...);
- semid:信号量集的标识符,即信号表的索引。
- semnum:信号集内的索引,用于存取信号集内的某个特定信号量。在某些命令中,此参数可能会被忽略。
- cmd:指定要执行的控制命令。根据命令的不同,可能需要额外的参数。
- … :对于某些命令,可能需要一个 union semun 类型的参数,用于传递或接收数据。
成功执行时,根据不同的命令返回不同的非负值。
失败时返回 -1,并设置 errno 以指示错误原因。
semop()
对信号量的操作
#include <sys/sem.h> int semop(int semid, struct sembuf *sops, unsigned nsops);
- semid:信号量集的标识符,通过semget()函数获取。
- sops:指向sembuf结构数组的指针,每个sembuf结构定义了一个要执行的操作。
- nsops:sops数组中sembuf结构的数量,即要执行的操作数。
sembuf 结构体定义在<sys/sem.h>或<linux/sem.h>中,用于指定对单个信号量的操作:
struct sembuf { unsigned short sem_num; /* 信号量的编号 */ short sem_op; /* 信号量的操作(正/负/零)*/ short sem_flg; /* 信号量的操作标志 */
};
返回成功为0,失败为-1;
循环队列
生产消费模型中我们用阻塞队列作为缓冲区,还可以用循环队列来作为缓冲区;
在循环队列中有两个下标,一个代表生产者的,另一个代表消费者的,生产者生产数据或消费者取出数据都向前移动1;
当队列为空时,那么生产者和消费者的下标都处于相同位置下,需要让生产者先生产,消费者后消费;
队列为满时,生产者和消费者的下标也处于相同的位置,需要让消费者先消费,生产者后生产;
对于其他情况,生产者和消费者的下标都不相同,那么生产者和消费者不就可以同时并发进行了;
sem_wait
函数用于对信号量进行减一操作,如果信号量的值大于零,则将其减一并允许线程(生产者)继续执行。如果信号量的值为零,则线程会被阻塞,直到信号量的值变为非零。这是一个原子操作,确保在任何时刻只有一个线程可以减少信号量的值。所以不用对P函数进一步判断;sem_post
函数用于对信号量进行加一操作,这通常用于释放资源或通知等待的线程资源已经可用(消费者从循环队列取出)。当调用sem_post
时,如果有一个或多个线程正在等待该信号量,那么其中一个线程将被唤醒并继续执行。
多生产-多消费
当有多个线程时,只要信号量>1,是允许多个线程访问共享资源的而我们这里对于循环队列的生产或者消费都是互斥的,也就是多个生产者时,同时只能有一个生产者对循环队列存入数据,所以这里我们还需要加上互斥锁,使生产者或者消费者有序进入循环队列存取数据;(2把锁,生产者和消费者各一把);
代码
Ring_Queue.hpp
#include<iostream>
#include<string>
#include<vector>
#include<semaphore.h>
#include<pthread.h>using namespace std;
template<class T>
class RingQueue
{
public:RingQueue(int cap): _cap(cap),_ring_queue(cap),_productor_step(0),_comsumer_step(0){sem_init(&_room_sem,0,_cap);sem_init(&_data_sem,0,0);pthread_mutex_init(&_productor_mutex,nullptr);pthread_mutex_init(&_consumer_mutex,nullptr);}void Enqueue(const T& in){P(_room_sem);//生产者对空间的判断//如果没有空间,那么将会阻塞于P函数,所以执行下列语句必定有空间Lock(_productor_mutex);_ring_queue[_productor_step++]=in;_productor_step%=_cap;Unlock(_productor_mutex);V(_data_sem);//生产者放入数据进入队列,让消费者的信号量++}void Pop(T* out){P(_data_sem);Lock(_consumer_mutex);*out=_ring_queue[_comsumer_step++];_comsumer_step%=_cap;Unlock(_consumer_mutex);V(_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}
private:void P(sem_t& sem){sem_wait(&sem);}void V(sem_t& sem){sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}//环形队列vector<T> _ring_queue;int _cap;//生产和消费的下标int _productor_step;int _comsumer_step;//定义信号量sem_t _room_sem;//生产者的sem_t _data_sem;//消费者的//定义锁,维护多生产多消费之间的互斥关系pthread_mutex_t _productor_mutex;pthread_mutex_t _consumer_mutex;
};
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include<iostream>
#include<string>
#include<pthread.h>
#include<functional>
#include<unistd.h>using namespace std;namespace ThreadMdule
{template<typename T>using func_t = std::function<void(T&,string name)>;template<typename T>class Thread{public:void Excute(){_func(_data,_threadname);}Thread(func_t<T> func, T& data, const std::string &name="none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void* threadroutine(void* args){Thread<T>* self=static_cast<Thread<T>*>(args);self->Excute();return nullptr;}bool start(){int n=pthread_create(&_tid,nullptr,threadroutine,this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T& _data; func_t<T> _func;bool _stop;};
}#endif
main.cc
#include"RingQueue.hpp"
#include"Thread.hpp"
#include<string>
#include<vector>
#include<unistd.h>
#include"Task.hpp"
#include<ctime>using namespace ThreadMdule;
using ringqueue_t=RingQueue<Task>;
void Productor(ringqueue_t& rq,string name)
{while (true){rq.Enqueue(Download);cout << "Productor : " << name << endl;}
}
void Consumer(ringqueue_t& rq,string name)
{while (true){sleep(1);Task t;rq.Pop(&t);std::cout << "Consumer handler task: " <<name <<endl;t();//执行任务}
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func,string who)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1)+"-"+who;threads->emplace_back(func, rq, name);}
}
void InitProductor(vector<Thread<ringqueue_t>>* threads,int num,ringqueue_t& rq)
{InitComm(threads,num,rq,Productor,"productor");
}
void InitConsumer(vector<Thread<ringqueue_t>>* threads,int num,ringqueue_t& rq)
{InitComm(threads,num,rq,Consumer,"consumer");
}void StartAll(vector<Thread<ringqueue_t>>& threads)
{for(auto& thread:threads){std::cout << "start: " << thread.name() << std::endl;thread.start();}
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads)
{for (auto &thread : threads){thread.Join();}
}
int main()
{ringqueue_t* rq=new ringqueue_t(10);vector<Thread<ringqueue_t>> threads;InitProductor(&threads,3,*rq);InitConsumer(&threads,2,*rq);StartAll(threads);WaitAllThread(threads);return 0;
}