Conditions
- What's A Condition Variable?
- Boost.Interprocess Condition Types And Headers
- Anonymous condition example
What's A Condition Variable?
- 在前面的例子中,一个mutex被用来锁定,但我们不能用它来有效地等待,直到满足继续的条件。一个条件变量可以做两件事。
- wait: 线程被阻塞,直到其他线程通知它可以继续,因为导致等待的条件已经消失。
- 通知:线程通知其他线程可以继续。线程向一个被阻塞的线程或所有被阻塞的线程发送一个信号,告诉它们引起它们等待的条件已经消失。
- 条件变量中的等待总是与一个mutex相关联。在等待条件变量之前,必须先锁定该mutex。当在条件变量上等待时,线程解锁mutex并原子性地等待。
- 当线程从等待函数中返回时(例如因为信号或超时),mutex对象再次被锁定。
Boost.Interprocess Condition Types And Headers
- Boost.Interprocess offers the following condition types:
- #include <boost/interprocess/sync/interprocess_condition.hpp>
- interprocess_condition。一个匿名的条件变量,可以放在共享内存或内存映射文件中,与 boost::interprocess::interprocess_mutex 一起使用。
- #include <boost/interprocess/sync/interprocess_condition_any.hpp>
- interprocess_condition_any.一个匿名的条件变量,可以放在共享内存或内存映射文件中,用于任何锁类型。一个匿名的条件变量,可以放在共享内存或内存映射文件中,用于任何锁类型。
- #include <boost/interprocess/sync/named_condition.hpp>
- named_condition.一个命名的条件变量,与 named_mutex 一起使用。与named_mutex一起使用的命名条件变量。
- #include <boost/interprocess/sync/named_condition_any.hpp>
- named_condition: 一个命名的条件变量,可用于任何类型的锁。
-
命名条件与匿名条件类似,但它们与命名的mutexes结合使用。好几次,我们不想用同步数据来存储同步对象。我们希望使用相同的数据改变同步方法(从进程间到进程内,或者没有任何同步)。将进程共享的匿名同步与同步数据一起存储将禁止这样做。我们希望通过网络或其他通信方式发送同步数据。发送过程共享的同步对象就没有任何意义了。
Anonymous condition example
- 想象一下,一个进程,将一个跟踪写到一个简单的共享内存缓冲区,另一个进程逐一打印。第一个进程写入跟踪并等待另一个进程打印数据。为了达到这个目的,我们可以使用两个条件变量:第一个条件变量用来阻止发送者,直到第二个进程打印信息,第二个条件变量用来阻止接收者,直到缓冲区有痕迹要打印。
- 共享内存跟踪缓冲区(doc_anonymous_condition_shared_data.hpp)
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>struct trace_queue
{enum { LineSize = 100 };trace_queue(): message_in(false){}//Mutex to protect access to the queueboost::interprocess::interprocess_mutex mutex;//Condition to wait when the queue is emptyboost::interprocess::interprocess_condition cond_empty;//Condition to wait when the queue is fullboost::interprocess::interprocess_condition cond_full;//Items to fillchar items[LineSize];//Is there any messagebool message_in;
};
- 这是该进程的主进程。创建共享内存,将缓冲区放置在那里,并开始逐一写入消息,直到写下 "最后一条消息",表示没有更多的消息要打印。
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"using namespace boost::interprocess;int main ()
{//Erase previous shared memory and schedule erasure on exitstruct shm_remove{shm_remove() { shared_memory_object::remove("MySharedMemory"); }~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }} remover;//Create a shared memory object.shared_memory_object shm(create_only //only create,"MySharedMemory" //name,read_write //read-write mode);try{//Set sizeshm.truncate(sizeof(trace_queue));//Map the whole shared memory in this processmapped_region region(shm //What to map,read_write //Map it as read-write);//Get the address of the mapped regionvoid * addr = region.get_address();//Construct the shared structure in memorytrace_queue * data = new (addr) trace_queue;const int NumMsg = 100;for(int i = 0; i < NumMsg; ++i){scoped_lock<interprocess_mutex> lock(data->mutex);if(data->message_in){data->cond_full.wait(lock);}if(i == (NumMsg-1))std::sprintf(data->items, "%s", "last message");elsestd::sprintf(data->items, "%s_%d", "my_trace", i);//Notify to the other process that there is a messagedata->cond_empty.notify_one();//Mark message buffer as fulldata->message_in = true;}}catch(interprocess_exception &ex){std::cout << ex.what() << std::endl;return 1;}return 0;
}
- 第二个过程打开共享内存,打印每一条消息,直到收到 "最后一条消息 "消息。
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"using namespace boost::interprocess;int main ()
{//Erase previous shared memory and schedule erasure on exitstruct shm_remove{shm_remove() { shared_memory_object::remove("MySharedMemory"); }~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }} remover;//Create a shared memory object.shared_memory_object shm(create_only //only create,"MySharedMemory" //name,read_write //read-write mode);try{//Set sizeshm.truncate(sizeof(trace_queue));//Map the whole shared memory in this processmapped_region region(shm //What to map,read_write //Map it as read-write);//Get the address of the mapped regionvoid * addr = region.get_address();//Construct the shared structure in memorytrace_queue * data = new (addr) trace_queue;const int NumMsg = 100;for(int i = 0; i < NumMsg; ++i){scoped_lock<interprocess_mutex> lock(data->mutex);if(data->message_in){data->cond_full.wait(lock);}if(i == (NumMsg-1))std::sprintf(data->items, "%s", "last message");elsestd::sprintf(data->items, "%s_%d", "my_trace", i);//Notify to the other process that there is a messagedata->cond_empty.notify_one();//Mark message buffer as fulldata->message_in = true;}}catch(interprocess_exception &ex){std::cout << ex.what() << std::endl;return 1;}return 0;
}
- 通过条件变量,一个进程如果不能继续工作就可以阻塞,当满足继续工作的条件时,另一个进程可以唤醒它。
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstring>
#include "doc_anonymous_condition_shared_data.hpp"using namespace boost::interprocess;int main ()
{//Create a shared memory object.shared_memory_object shm(open_only //only create,"MySharedMemory" //name,read_write //read-write mode);try{//Map the whole shared memory in this processmapped_region region(shm //What to map,read_write //Map it as read-write);//Get the address of the mapped regionvoid * addr = region.get_address();//Obtain a pointer to the shared structuretrace_queue * data = static_cast<trace_queue*>(addr);//Print messages until the other process marks the endbool end_loop = false;do{scoped_lock<interprocess_mutex> lock(data->mutex);if(!data->message_in){data->cond_empty.wait(lock);}if(std::strcmp(data->items, "last message") == 0){end_loop = true;}else{//Print the messagestd::cout << data->items << std::endl;//Notify the other process that the buffer is emptydata->message_in = false;data->cond_full.notify_one();}}while(!end_loop);}catch(interprocess_exception &ex){std::cout << ex.what() << std::endl;return 1;}return 0;
}