【条件变量导读】条件变量是多线程中比较灵活而且容易出错的线程同步手段,比如:虚假唤醒、为啥条件变量要和互斥锁结合使用?windows和linux双平台下,初始化、等待条件变量的api一样吗?
本文将分别为您介绍条件变量在windows和linux平台下的用法和注意事项,好!直接进入主题。
条件变量的使用场景可以用如下流程图进行阐述。
我们需反复判断一个多线程共享条件是否满足,一直到该条件满足为止(由于该条件被多个线程操作)。因此每次判断前进行加锁操作,判断完毕后解锁。但上述逻辑存在严重的效率问题,假设我们解锁离开临界区后,其他线程修改了条件,导致条件满足了;此时程序仍然需要睡眠 n 秒后才能得到反馈。因此我们需要这样一种机制:
某个线程 A 在条件不满足的情况下,主动让出互斥锁,让其他线程去争夺这把锁,当前线程A在此处等待,等待条件的满足;一旦条件满足,其他线程释放锁,并通知条件满足,线程A就可以被立刻唤醒并能获取到互斥锁对象。
1、Windows下条件变量的用法
具体条件变量的定义和api,我就不介绍了,大家参考如下示例程序,就能很轻松地掌握条件变量地初始化,本文地重点是介绍条件变量地用法及注意事项。
#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>class ThreadTask
{public:ThreadTask(int taskId){m_taskId = taskId;}void doTask(){std::cout << " threadId: " << std::this_thread::get_id() << " do Task, taskId: " << m_taskId << std::endl;}private:int m_taskId;
};/定义全局互斥锁对象
std::mutex myMutex;
//定义全局的windows条件变量
std::condition_variable myCv;
/全局任务队列
std::list<ThreadTask*> taskList;void* consumeThread()
{while (true){/判全局条件(公共队列taskList是否为空)前,先加锁std::unique_lock<std::mutex> lk(myMutex);while (taskList.empty()){ /*如果条件不满足,那继续等待条件变量满足条件同时立刻让出刚占有的互斥锁对象,让其他线程去争抢*/myCv.wait(lk); } //假设条件满足了,当前线程将从myCv.wait(lk)返回,//并立刻获取互斥锁对象操作公共的全局队列ThreadTask* pTask = taskList.front();//头部弹任务taskList.pop_front();if (!pTask)continue;pTask->doTask();delete pTask;pTask = nullptr;}return nullptr;
}void* produceThread()
{int taskId = 0;while (true){ThreadTask* pTask = nullptr;{std::lock_guard<std::mutex> lk(myMutex);taskId++;pTask = new ThreadTask(taskId);taskList.push_back(pTask);std::cout << "thread: " << std::this_thread::get_id() << " produce a Task, taskId: " << taskId << std::endl;}/*生产完任务,通知消费线程consumeThread条件满足释放锁资源myMutex*/myCv.notify_one();std::this_thread::sleep_for(std::chrono::seconds(1));}return nullptr;
}int main()
{std::thread consumeThread1(consumeThread);std::thread consumeThread2(consumeThread);std::thread consumeThread3(consumeThread);std::thread produceThread(produceThread);if (produceThread.joinable())produceThread.join();if (consumeThread1.joinable())consumeThread1.join();if (consumeThread2.joinable())consumeThread2.join();if (consumeThread3.joinable())consumeThread3.join();return 0;
}
程序运行的结果:
可以看出生产线程生产完任务塞到公共队列中去,通知消费线程去公共队列中取任务,一共四个线程在操作公共队列taskList,并没有出现资源冲突的情况。这便是条件变量使用的妙处!
从上述代码中可以看到,条件变量竟然在等待一把互斥锁。
std::unique_lock<std::mutex> lk(myMutex);
while (taskList.empty())myCv.wait(lk);
为啥条件变量要和互斥锁配合一起使用?我们可以假设下面这段伪码,互斥锁和条件变量分开使用。
lock(myMutex)
while (taskList.empty())
{//释放锁unlock(myMutex);/再等待条件cvcond_wait(&cv);//再加锁lock(myMutex)
}
假设线程当前线程(线程A)执行到第5行代码,释放了锁,此时操作系统把CPU时间片分配给另外一个等待myMutex的线程B,随后线程B释放信号,表明条件cv已经满足,等到线程A争抢到CPU时间片之后,就已经错过了线程B释放的信号了,那么线程B将永远阻塞在cond_wait()接口上。
解锁和等待条件变量必须是原子性的操作,要么都成功,要么都不成功,否则就很难保证线程的同步。
还有虚假唤醒的问题,何为虚假唤醒,就是 myCv.wait(lk)接口突然返回了,但它并不是被其它线程的信号唤醒的,可能是被操作系统某个中断信号给唤醒的,此时并没有相应的任务需要处理,如果继续让线程走下去,就可能会有问题,所以为了防止这种虚假唤醒的现象,我们外部循环去判断公共队列是否为空,如果为空,那就继续等待。这是Linux服务端面试必问的考点,请同学们慎重。
好,介绍完条件变量在windows下的用法,那么接着看下条件变量在linux下的用法。
2、Linux下条件变量的用法
条件变量的用法流程和windows的差不多,主要差异就是创建线程、初始化条件变量、等待条件变量的api接口不一样。
那,直接上代码!
#include <iostream>
#include <pthread.h>
#include <error.h>
#include <list>
#include <unistd.h>
#include <semaphore.h>
using namespace std;class ThreadTask
{public:ThreadTask(int taskId){m_taskId = taskId;}void doTask(){cout << " doTask taskId : " << m_taskId << " thread Id: " << pthread_self() << endl;}private:int m_taskId;
};pthread_mutex_t myMutex;
pthread_cond_t myCond;
list<ThreadTask*> taskList; void* consumeThread(void* param)
{while(true){pthread_mutex_lock(&myMutex);while(taskList.empty()){pthread_cond_wait(&myCond, &myMutex); }ThreadTask* pTask = taskList.front();taskList.pop_front();pthread_mutex_unlock(&myMutex);if (pTask == nullptr)continue;pTask->doTask();delete pTask;pTask = nullptr;}return NULL;
}
void* produceThread(void* param)
{int taskID = 0;ThreadTask* pTask = NULL;while (true){pTask = new ThreadTask(taskID);pthread_mutex_lock(&myMutex);taskList.push_back(pTask);std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; pthread_mutex_unlock(&myMutex);//释放信号量,通知消费者线程pthread_cond_signal(&myCond);taskID++;sleep(1);}return NULL;
}int main()
{pthread_mutex_init(&myMutex, NULL);pthread_cond_init(&myCond, NULL);//创建3个消费者线程pthread_t consumerThreadID[5];for (int i = 0; i < 3; ++i){pthread_create(&consumerThreadID[i], NULL, consumeThread, NULL);}//创建一个生产者线程pthread_t producerThreadID;pthread_create(&producerThreadID, NULL, produceThread, NULL);pthread_join(producerThreadID, NULL);for (int i = 0; i < 3; ++i){pthread_join(consumerThreadID[i], NULL);}pthread_cond_destroy(&myCond);pthread_mutex_destroy(&myMutex); return 0;
}
Linux平台下运行的结果: