一、Linux线程同步
同步是相对于竞争的概念;
同步就是在保证安全的前提下啊,按照一定的顺序访问临界资源;
所有的资源一定是先访问的临界资源,申请失败然后才进行排队的;互斥锁保证的是来访问的进程只允许有一个进入,剩下的都失败然后按照一定的顺序进行排队,排队之后所就不管了,除非线程被唤醒;换句话说互斥是前提,同步是结果;
同步的方式:条件变量依赖互斥锁使用;
1.1条件变量
条件变量:1.提供一种简单的通知机制;2.提供一个等待队列,即使用队列的方式使得线程之间具有一定的顺序;
互斥锁和条件变量都是需要被管理起来的,即原生线程库会为它们创建对应的内核结构,然后使用数据结构将它们维护起来;即凡是可以创建多个对象的,都要进行管理;
线程执行传参函数是尽量保证不要传递主线程栈内元素的地址,防止被影响所有最好传递的是拷贝;
typedef union
{struct{int __lock;unsigned int __futex;__extension__ unsigned long long int __total_seq;__extension__ unsigned long long int __wakeup_seq;__extension__ unsigned long long int __woken_seq;void *__mutex;//内部有mutex是有一定的耦合度的unsigned int __nwaiters;unsigned int __broadcast_seq;} __data;char __size[__SIZEOF_PTHREAD_COND_T];__extension__ long long int __align;
} pthread_cond_t;
1.1.1条件变量的初始化和释放
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);//条件变量的释放
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);//条件变量的申请
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//全局或者静态条件变量的申请和释放
1.1.2条件变量的等待
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
//将申请锁失败的线程进行排队,第二个参数是锁;
1.1.3条件变量的唤醒等待
线程被唤醒后会去上次等待的地方继续往下执行;
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒所有队列中的线程
int pthread_cond_signal(pthread_cond_t *cond);//唤醒队首线程
//线程被唤醒后需要重新获取锁;
1.1.4使用
pthread_mutex_lock(&mutex);//用来阻塞申请不成功的线程
pthread_cond_wait(&cond, &mutex);//先释放锁再排队
//将被阻塞的线程进行排队,传递锁的原因就是进入等待队列是会锁所自动释放,否则就进入等待就没有锁了,导致死锁问题;
如上操作最终就转变成了,所有新线程不管是否申请锁成功都是先进行排队,控制逻辑交给主线程完成;通过创建完线程每一个线程睡眠一小会,来控制顺序;
1.2同步周边问题
1.2.1为什么要让线程休眠(排队)
因为临界资源不就绪,临界资源是二元的就绪和不就绪,就绪可以访问,不就绪不可以访问,此时最好休眠就行,不要一直申请做无效的事;
根据临界资源的状态判断来决定线程是否排队并且排队的线程是否被唤醒;临界资源要保证共享资源加互斥;
即最终访问临界资源的方式就变成了先对临界资源进行判断,所以要在判断前就加锁,然后分情况1.在临界资源就绪的情况下只允许一个执行流访问;2.在临界资源不就绪的情况下,执行的是同步而不是竞争;
二、CP模型(consumer producer)
CP模型是一个安全高效实现同步互斥的模型;
总结:表现为三种关系两种角色和一个通信场所;单执行流传参会使得耦合度过高,而一个执行流将参数传到缓冲区,另一个执行流获取参数本质上就是解耦合传参的设计;
2.1生活中的CP模型
超市:1.需求大,满足生产者一次性大量生产,因为生产少,生产作业线就没有意义,需求少,生产多了就浪费;2.存在现成的商品,满足消费者即需即有的特点;本质上超市就是将数据进行缓存;支持忙闲不均(可以实现并发),生产和消费解耦合;
生产者:关心超市剩余空间;
消费者:关心超市剩余商品;
2.2计算机中的CP模型
本质是计算机中的执行流进行通信并且是安全高效的通信,会存在并发问题;
数据的生命周期包括,产生数据、发送数据、读取数据、处理数据;要注意的是:1.数据发送以大批量的方式可以提高效率;2产生数据是需要消耗时间的;
CP在计算机中都是线程,超市就是一块特定的内存空间,通信要求的共享资源,商品就是数据;
存在并发问题的场景:
1.生产者之间;2消费者之间;生产者和消费者之间;
生产者:先互斥后竞争;
消费者:先互斥后竞争;
生产者和消费者:先互斥后同步(先生产后消费,因为数据是生产者生产的,生产者对数据的竞争能力强,会导致消费者长时间饥饿);
2.3CP模型实现方式
2.3.1基于阻塞队列的CP模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出,管道类似于阻塞队列;
存在伪唤醒的情况,由于多个生产线程被唤醒后对于锁的竞争能力较强,会导致消费线程长时间得不到CPU产生饥饿,而且竞争成功的生产线程会从wait的位置继续向后执行,这样会因为没有从临界资源的判断开始执行产生数据不一致问题;即对于临界资源的判断使用循环方式,这样从wait之后执行直接进行下一次的判断;
#pragma once
#include <queue>
#include <pthread.h>
template <class T>
class blockqueue
{static const int defaultnum = 20;public:blockqueue(int maxcapacity = defaultnum) : maxcapacity_(maxcapacity){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&p_cond_, nullptr);pthread_cond_init(&c_cond_, nullptr);low_water = 5;high_water = 15;}~blockqueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&p_cond_);pthread_cond_destroy(&c_cond_);}public:T pop(){pthread_mutex_lock(&mutex_);while (q_.size() == 0){pthread_cond_wait(&c_cond_, &mutex_);}T ret = q_.front();q_.pop();if (q_.size() <= low_water)pthread_cond_signal(&p_cond_);pthread_mutex_unlock(&mutex_);return ret;}void push(const T &value){pthread_mutex_lock(&mutex_);while (q_.size() == maxcapacity_){pthread_cond_wait(&p_cond_, &mutex_);}q_.push(value);if (q_.size() >= high_water)pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}private:std::queue<T> q_; // 本质上还一个共享资源,是有线程安全的int maxcapacity_; // 用于临界资源判断是否就绪// int mincapacity_; // 用于临界资源判断是否就绪pthread_mutex_t mutex_; // 由于queue容器是不安全的,所以想要加锁保护pthread_cond_t p_cond_; // 由于生产者和消费者之间的关系是不一样的需要进行判断是同步和竞争pthread_cond_t c_cond_;int low_water;int high_water;
};
2.4正确看待CP模型的高效问题
高效体现在当前线程发送数据的同时其他线程可以处理数据;当前线程读取数据的同时其他线程可以生产数据;而发送数据和读取数据即生产者和消费者之间本质上是串行的;
三、POSIX信号量
阻塞队列中的队列是当做一份共享资源使用的;通过加锁保证共享资源的安全性;有时候共享资源也可以被看作多份;分成多少个小块共享资源就最多允许多少个线程访问一整个共享资源;使用信号量保证块共享资源数量和线程的数量匹配并且保证每一个线程访问的是不同的块共享资源;申请和释放信号量要求必须是原子的;可以使用互斥锁保证信号量的PV操作是原子的;访问一个划分了多个区域的共享资源要先申请信号量;
使用信号量执行PV操作之后就不需要对临界资源是否就绪进行判断了,因为申请信号量的时候就已经进行了临界资源的判断了;
3.1信号量的初始化和释放
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);//信号量的初始化
//第一个参数表示信号量;
//第二个参数pshared为0表示允许线程间进行通信,为1表示进程间进行通信;
//第三个参数表示信号量的初始值;
int sem_destroy(sem_t *sem);//信号量的释放
//参数是信号量
3.2信号量的PV操作
#include <semaphore.h>
int sem_wait(sem_t *sem);//阻塞式地等待,P操作将信号量-1;
int sem_trywait(sem_t *sem);//非阻塞地等待,失败直接返回;int sem_post(sem_t *sem);//发布信号量,V操作,将信号量+1;
四、基于环形队列的生产消费模型
4.1环形队列的原理
使用数组模拟环形队列,即使用下标++并且%=数组长度;
使用两个下标(head tail)来维护这个数组,当生产一个数据,*head = value,head ++ 并 head %= size,当消费一个数据*tail = 0,tail ++ 并 tail %= size;
需要注意的是环形队列空和满时,两个下标都是指向同一个位置,会导致不容易判断为空还是满;
解决方式:
1.使用计数器维护,默认为0,push数据就++,pop数据就–;2空一个位置,如果head+1等于tail就停止;
4.2环形队列生产消费模型的原理
三个原则:
1.指向同一个位置是不可以同时访问;
2.消费者速度不能比生产者快;
3.生产者不能超过消费者一圈;
即不空或者不满的时候是可以并发的发送数据和读取数据的,指向同一个位置是必须等待,为空时消费者等待生产者生产,满是生产者等待消费者消费;
4.3信号量实现环形队列生产消费模型
对于生产者用剩余空间的数量来设置信号量spacesem,默认大小为环形队列的大小,对于消费者使用剩余数据数量来设置信号量datasem,默认大小为0;
1.指向同一个位置时,其中的一个信号量一定为零,导致不可以进行P操作,也就是满足同一位置不能同时访问;
2.因为spacesem开始不为零,所以消费者的速度不能比生产者快;因为spacesem为0时,datasem大小为环形队列的大小,这时生产者就无法P成功,也就是说生产者不可能超过消费者一圈;
3.当执行的不是同一个位置时,也就是不空或者不满时,由于信号量都不为0,所以都会申请信号量成功允许并发访问;
生产者先申请spacesem,然后生产成功执行Vdatasem,因为生产成功对于消费者就是多了一个消费数据;然后消费者就可以先申请datasem,消费成功执行Vspacesem;
注意数组的空间大小保持不变,只是下标变化;由于多个线程对于下标访问存在线程安全,所以需要进行加锁;
生产者和消费者如果使用一把锁就会导致生产者和消费者无法并发发送和读取数据;所以使用两把锁;另外使用锁要在PV操作之间,因为1.信号量PV本身是原子的。2.PV操作和锁的申请释放应该是并行的;
总结:对于环形队列CP模型大部分时间只有两个线程,一个生产者和一个消费者,队列空或者满的时候,只有一个线程,生产者或消费者;而阻塞队列CP模型则任何时刻只有一个线程;
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>template <class T>
class ringqueue
{
public:ringqueue(int capacity = defaultnum) : v_(capacity), capacity_(capacity), c_step_(0), p_step_(0){sem_init(&c_data_sem_, 0, 0);sem_init(&p_space_sem_, 0, capacity_);pthread_mutex_init(&cmutex_, nullptr);pthread_mutex_init(&pmutex_, nullptr);}void push(const T &value) // 发送数据{p(&p_space_sem_);lock(&pmutex_);v_[p_step_++] = value;p_step_ %= capacity_;unlock(&pmutex_);v(&c_data_sem_);}void pop(T *out) // 读取数据{p(&c_data_sem_);lock(&cmutex_);*out = v_[c_step_++];c_step_ %= capacity_;unlock(&cmutex_);v(&p_space_sem_);}~ringqueue(){sem_destroy(&c_data_sem_);sem_destroy(&p_space_sem_);pthread_mutex_destroy(&cmutex_);pthread_mutex_destroy(&pmutex_);}private:void p(sem_t *sem){sem_wait(sem);}void v(sem_t *sem){sem_post(sem);}void lock(pthread_mutex_t *mutex){pthread_mutex_lock(mutex);}void unlock(pthread_mutex_t *mutex){pthread_mutex_unlock(mutex);}private:std::vector<T> v_; // 只用数组来模拟环形队列int capacity_; // 用来维护数组的大小int c_step_; // 消费者下标int p_step_; // 生产者下标sem_t c_data_sem_; // 消费信号量sem_t p_space_sem_; // 生产信号量pthread_mutex_t cmutex_;pthread_mutex_t pmutex_;static const int defaultnum = 20; // 成员常量
};