目录
线程互斥
相关概念
互斥量mutex
互斥量的接口
初始化互斥量
销毁互斥量
互斥量加锁/解锁
可重入VS线程安全
概念
可重入与线程安全的联系
可重入与线程安全的区别
死锁
死锁的四个必要条件
避免死锁
避免死锁的算法
线程同步
条件变量
条件变量函数 初始化
销毁
等待条件满足
唤醒等待
CP问题
代码实现
线程互斥
相关概念
- 临界资源:多线程执行流共享的资源就叫临界资源。
- 临界区:每个线程内部,访问临界资源的代码,就叫临界区。
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
互斥量mutex
为了解决多个线程并发的操作共享变量所带来的问题,本质上就是需要一把锁,Linux中提供的锁被称为互斥量。
互斥量的接口
初始化互斥量
初始化互斥量有两种方法:
方法一:静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
方法二:动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);参数:mutex:要初始化的互斥量attr:NULL
销毁互斥量
销毁互斥量时需要注意:
- 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁。
- 不要销毁一个已经加锁的互斥量。
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁。
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁/解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);……int pthread_mutex_unlock(pthread_mutex_t *mutex);//返回值 : 成功返回 0, 失败返回错误号
调用 pthread_mutex_lock 时,会有以下几种情况:
- 互斥量处于未锁的状态,该函数就会将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定了互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_mutex_lock调用就会就入阻塞(执行流被挂起),等待互斥量解锁。
可重入VS线程安全
概念
线程安全:多个线程并发同一段代码的时候,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,就会出现该问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数再重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入,否则就是不可重入函数。
可重入与线程安全的联系
- 函数是可重入的,那就是线程安全的。
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全的也不是可重入的。
可重入与线程安全的区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
加锁的本质:用时间换取安全
加锁的表现:线程对于临界区代码串行执行
加锁原则:尽量的保证临界区代码,越少越好
在纯互斥的环境下,如果锁分配不够合理,容易导致其他线程的饥饿问题!当然不是说只要有互斥,必有饥饿。适合纯互斥的场景,就用互斥。
让所有的线程,获取锁按照一定的顺序,也就是按照一定的顺序获取资源--同步。
死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
只有一把锁也是会出现死锁的情况,一个线程再持有锁的情况下,再申请锁,就会出现死锁。
死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用。(前提)
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已经获得的资源保持不放。(原则1)
- 不剥夺条件:一个执行流已经获得的资源,再未使用完之前,不能强行剥夺。(原则2)
- 循环等待条件:若干个执行流之间形成一种头尾相接的循环等待资源的关系。(重要条件)
避免死锁
- 破坏死锁的四个必要条件(请求与保持/不剥夺/循环等待)
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁的算法
- 死锁检测算法
- 银行家算法
线程同步
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
条件变量函数 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);参数:cond:要初始化的条件变量attr:NULL(条件变量的属性)
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);参数:cond:要在这个条件变量上等待mutex:互斥量
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond); //唤醒在等待队列的全部线程int pthread_cond_signal(pthread_cond_t *cond);//唤醒等待队列的第一个线程
CP问题
consumer producter
在编写生产消费模型前,我们先梳理一下两者涉及的概念
三种关系
生产者 VS 生产者 互斥关系
消费者 VS 消费者 互斥关系
生产者 VS 消费者 互斥关系,只有互斥可能会导致消费者饥饿问题,所有还要有同步
两种角色
生产者和消费者
一个交易场所
特定结构的内存空间
优点
- 支持忙闲不均
- 生产和消费进行解耦
代码实现
blockqueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{static const int defaultnum = 5;static int data=0;
public:BlockQueue(int maxcap = defaultnum):maxcap_(maxcap){phtread_mutex_init(&mutex_,nullptr);pthread_cond_init(&c_cond_,nullptr);pthread_cond_init(&p_cond_,nullptr);low_water_ = maxcap_/3;high_water_ = (maxcap_*2)/3;}T &pop(){pthread_mutex_lock(&mutex_);if(q.size() == 0){//没有东西了,消费者去排队挂起pthread_cond_wait(&c_cond_,&mutex_);}T out = q.pop();//消费了一个,一定有空间给生产者生产,所以这里唤醒生产者if(q.size()<=low_water_)pthread_cond_signal(&p_cond_);pthread_mutex_unlock(&mutex_);return out;}void push(const T &in){pthread_mutex_lock(&mutex_);if(q.size() == maxcap_){//1.调用的时候自动释放锁//2.生产的到达极值不能继续生产了pthread_cond_wait(&p_cond_,&mutex_);}//1.队列没满 2.被唤醒q.push(in);//生产了一个,可以通知消费者来消费了if(q.size()>=high_water_)pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_)}~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}
private:std::queue<T> q_;int maxcap_; //极值,到多少就不生产了int low_water_;int high_water_; pthread_mutex_t mutex_;pthread_cond_t c_cond_; //消费者条件变量pthread_cond_t p_cond_; //生产者条件变量};
main.cc
#include "BlockQueue.hpp"void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);while(true){//消费int data = bq->pop();std::cout<<"消费了一个数据:"<< data <<std::endl;}
}
void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){//生产data++;bq->push(data);std::cout<<"生产了一个数据:"<<data<<std::end;}
}
int main()
{BlockQueue<int> *bq = new BlockQueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq; return 0;
}
makefile
test_blockqueue:main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clea:rm -f test_blockqueue