绪论
并发编程纷繁复杂,其中用于线程同步的主要工具——条件变量,虽然精悍,但是要想正确灵活的运用却并不容易。
对于条件变量的理解有三个难点:
- 为什么
wait
函数需要将解锁和阻塞、唤醒和上锁这两对操作编程原子的? - 为什么
wait
函数需要配合while
进行使用? - 通知线程是应该先
notify
再unlock
还是先unlock
后notify
?
希望大家看完下面的介绍能够得到想要的答案。想要了解更多关于C++并发编程信息可以移步的我仓库:C++并发编程
条件变量
C++提供了两种条件变量的实现:std::condition_variable
和std::condition_variable_any
。前者只能和std::mutex
配合使用,后者只需要符合互斥的标准即可。因为std::condition_variable_any
更通用,所以可能产生额外的开销,如果没什么特殊需要,尽可能使用std::condition_variable
条件变量是非常重要的线程同步的手段(目前我认为是最重要的),因此对其的深入理解至关重要。
-
条件变量总是和互斥一起配合使用,互斥用于保护共享数据,条件变量用于
- 通知(通知线程)
- 判断该共享数据是否满足条件(等待线程)
-
通知线程往往先通过互斥保护共享数据,对数据进行一定的修改后再发送通知(
notify_one()、notify_all()
)。需要注意的是我们应尽可能在临界区内发送通知,从而避免可能出现的优先级翻转和条件变量失效问题。虽然临界区外通知可以让等待线程一旦被唤醒就能立即解锁互斥查看是否满足情况,但是在Pthread进行wait morphint后基本上两者没有性能上的差距。详细的分析可以参考博客:条件变量用例–解锁与signal的顺序问题。notify_one()
理论上只会唤醒一个等待线程,适用于共享变量数量发生变化的情况,例如通知消息队列中的消息个数增加。notify_all()
会唤醒所有等待该条件变量的线程,适用于共享变量状态发生变化的情况,例如通知所有工作线程开始计算。
-
等待线程先获得互斥,然后将锁和判定条件传递给
wait
函数等待返回。-
wait
函数首先会根据判断条件判断是否满足条件(返回true
)-
如果满足条件,则直接返回(互斥依旧上锁)
-
如果不满足条件,则阻塞等待,并解锁互斥(让其他线程得以修改共享数据的状态)。直到被
notify
函数唤醒,再次上锁,判断条件是否满足。这里的阻塞和解锁、唤醒和上锁都是原子的,就是为了避免两个动作分别执行出现的条件竞态。- 解锁和阻塞是原子的:lock → !pred() → unlock → sleep;如果变量的改变以及唤醒事件发生在unlock和sleep中间,那么你不会检测到,也就是错过了这次唤醒。假如下次唤醒依赖于此次唤醒的成功(也就是说不会主动唤醒第二次),那么将发生死锁。
- 唤醒和上锁是原子的:wakeup → lock → !pred :如果条件在wakeup和lock之间从满足变成了不满足(不是因为其他等待线程修改,而是因为负责唤醒的线程自己再次修改了条件),那么此次唤醒将失败。假如后面条件的再次满足依赖于此次条件满足成功(也就是说条件不会再主动满足),那么将发生死锁。
需要理解的是上面的死锁的出现是有限定条件的(例如唤醒之间的依赖、条件满足的依赖),虽然大多数情况下没有这么严格的条件,但是工具本身需要避免这种危险的情况。
原子操作保证了重要的唤醒和条件满足都能够至少被一个等待线程看到。
-
可以看到
wait
函数内部需要解锁互斥,所以就不能使用不提供unlock
函数的lock_guard
,而应该使用和互斥有相同接口的unique_lock
。
-
-
其实C++的线程库是对pthread库的封装,因此也可以像pthread库一样只传入互斥,解锁并等待通知,一旦接收到通知后再上锁,然后在一个
while
循环中进行判断。while (!pred()) {cond_.wait(lk); //调用pthread_cond_wait }
对于传入判定条件的版本,其实内部也是这样的一个封装罢了。
-
-
之所以说
notify_one()
理论上只会唤醒一个等待线程是因为存在调用一次notify_one()
却唤醒了多个线程的可能性,甚至有时候没有调用notify
等待线程都被唤醒,称这种意外唤醒等待线程的情况为伪唤醒。按照C++标准的规定,这种伪唤醒出现的数量和频率都不确定,因此要求等待线程的判定函数不能有副作用(可重用),并且需要在唤醒后再次判断条件是否满足,如果不满足则需要重新等待。这也是为什么上面的代码使用while
进行条件判断而不是if
的原因。
消息队列
//
// Created by edward on 22-11-16.
// use condtion_variable to genenrate a thread safe message queue
//#include "utils.h"
#include <mutex>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <thread>
#include <string>template<typename T>
class MessageQueue {
public:void push(T t) {std::lock_guard lk(mtx_); //互斥保护数据queue_.push(std::move(t));cond_.notify_one(); //临界区内发送通知,避免优先级反转和条件变量失效}T pop() {T frnt;std::unique_lock lk(mtx_);cond_.wait(lk, [&](){return !queue_.empty();});frnt = std::move(queue_.front());queue_.pop();return frnt;}
private:mutable std::mutex mtx_;mutable std::condition_variable cond_;std::queue<T> queue_;
};using namespace std;template<typename T>
void data_prepare(MessageQueue<T> &messageQueue) {T t;while (cin >> t) {messageQueue.push(std::move(t));}
}template<typename T>
void data_process(MessageQueue<T> &messageQueue) {T t;int idx = 0;while (true) {t = messageQueue.pop(); //数据的处理在临界区外edward::print("[", idx++, "]:", t);}
}int main() {MessageQueue<string> messageQueue;edward::print("test begin:");thread preparer(data_prepare<string>, ref(messageQueue));thread processer(data_process<string>, ref(messageQueue));preparer.join();//不用等待processer,如果preparer结束,则直接推出进程return 0;
}
运行结果
其中用到了我自己写的库函数头文件utils
,如果想要了解更多信息可以移步C++ 工具函数库