代码
- 代码的名称是read_write_mutex.h
- 这个代码可用,但是未优化,还存在冗余的代码
- 尚未处理的问题:如果持有锁的进程异常退出(挂掉),其他进程会一直阻塞,目前还没有解决方案。
#pragma once
// `#pragma once` guards this header. The former `#define BOOST_THREAD_PTHREAD_SHARED_MUTEX_HPP`
// reused the include-guard macro of Boost.Thread's own pthread shared_mutex header, which
// would make that Boost header expand to nothing if it were included after this file.

// (C) Copyright 2006-8 Anthony Williams
// (C) Copyright 2012 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread_time.hpp>
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
#include <boost/thread/detail/thread_interruption.hpp>
#endif
#ifdef BOOST_THREAD_USES_CHRONO
#include <boost/chrono/system_clocks.hpp>
#include <boost/chrono/ceil.hpp>
#endif
#include <boost/thread/detail/delete.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>

// Keep this include last: it opens the ABI region that abi_suffix.hpp closes after the declarations.
#include <boost/config/abi_prefix.hpp>

namespace bip = boost::interprocess;
namespace chy
{class shared_mutex{private:class state_data{public:state_data () :shared_count(0),exclusive(false),upgrade(false),exclusive_waiting_blocked(false){}void assert_free() const{BOOST_ASSERT( ! exclusive );BOOST_ASSERT( ! upgrade );BOOST_ASSERT( shared_count==0 );}void assert_locked() const{BOOST_ASSERT( exclusive );BOOST_ASSERT( shared_count==0 );BOOST_ASSERT( ! upgrade );}void assert_lock_shared () const{BOOST_ASSERT( ! exclusive );BOOST_ASSERT( shared_count>0 );//BOOST_ASSERT( (! upgrade) || (shared_count>1));// if upgraded there are at least 2 threads sharing the mutex,// except when unlock_upgrade_and_lock has decreased the number of readers but has not taken yet exclusive ownership.}void assert_lock_upgraded () const{BOOST_ASSERT( ! exclusive );BOOST_ASSERT( upgrade );BOOST_ASSERT( shared_count>0 );}void assert_lock_not_upgraded () const{BOOST_ASSERT( ! upgrade );}bool can_lock () const{return ! (shared_count || exclusive);}void exclusive_blocked (bool blocked){exclusive_waiting_blocked = blocked;}void lock (){exclusive = true;}void unlock (){exclusive = false;exclusive_waiting_blocked = false;}bool can_lock_shared () const{return ! (exclusive || exclusive_waiting_blocked);}bool more_shared () const{return shared_count > 0 ;}unsigned get_shared_count () const{return shared_count ;}unsigned lock_shared (){return ++shared_count;}void unlock_shared (){--shared_count;}bool unlock_shared_downgrades(){if (upgrade) {upgrade=false;exclusive=true;return true;} else {exclusive_waiting_blocked=false;return false;}}void lock_upgrade (){++shared_count;upgrade=true;}bool can_lock_upgrade () const{return ! (exclusive || exclusive_waiting_blocked || upgrade);}void unlock_upgrade (){upgrade=false;--shared_count;}//private:unsigned shared_count;bool exclusive;bool upgrade;bool exclusive_waiting_blocked;};state_data state;
// boost::mutex state_change;bip::named_mutex state_change{bip::open_or_create,"mutex"};
// boost::condition_variable shared_cond;bip::named_condition shared_cond{bip::open_or_create,"read"};
// boost::condition_variable exclusive_cond;bip::named_condition exclusive_cond{bip::open_or_create,"write"};boost::condition_variable upgrade_cond;void release_waiters(){exclusive_cond.notify_one();shared_cond.notify_all();}public:BOOST_THREAD_NO_COPYABLE(shared_mutex)shared_mutex(){}~shared_mutex(){}void lock_shared(){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);
// boost::unique_lock<bip::named_mutex>lk(state_change);boost::unique_lock<bip::named_mutex> lk(state_change);while(!state.can_lock_shared()){shared_cond.wait(lk);}state.lock_shared();}bool try_lock_shared(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);if(!state.can_lock_shared()){return false;}state.lock_shared();return true;}//#if defined BOOST_THREAD_USES_DATETIME
// bool timed_lock_shared(boost::system_time const& timeout)
// {
//#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
// boost::this_thread::disable_interruption do_not_disturb;
//#endifboost::unique_lock<boost::mutex> lk(state_change);
// boost::unique_lock<bip::named_mutex>lk(state_change);
//
// while(!state.can_lock_shared())
// {
// if(!shared_cond.timed_wait(lk,timeout))
// {
// return false;
// }
// }
// state.lock_shared();
// return true;
// }// template<typename TimeDuration>
// bool timed_lock_shared(TimeDuration const & relative_time)
// {
// return timed_lock_shared(boost::get_system_time()+relative_time);
// }
//#endif
#ifdef BOOST_THREAD_USES_CHRONOtemplate <class Rep, class Period>bool try_lock_shared_for(const boost::chrono::duration<Rep, Period>& rel_time){return try_lock_shared_until(boost::chrono::steady_clock::now() + rel_time);}template <class Clock, class Duration>bool try_lock_shared_until(const boost::chrono::time_point<Clock, Duration>& abs_time){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);while(!state.can_lock_shared())//while(state.exclusive || state.exclusive_waiting_blocked){
// if(boost::cv_status::timeout==shared_cond.wait_until(lk,abs_time))
// {
// return false;
// }}state.lock_shared();return true;}
#endifvoid unlock_shared(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_lock_shared();state.unlock_shared();if (! state.more_shared()){if (state.upgrade){// As there is a thread doing a unlock_upgrade_and_lock that is waiting for ! state.more_shared()// avoid other threads to lock, lock_upgrade or lock_shared, so only this thread is notified.state.upgrade=false;state.exclusive=true;//lk.unlock();upgrade_cond.notify_one();}else{state.exclusive_waiting_blocked=false;//lk.unlock();}release_waiters();}}void lock(){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);while (state.shared_count || state.exclusive){state.exclusive_waiting_blocked=true;exclusive_cond.wait(lk);}state.exclusive=true;}#if defined BOOST_THREAD_USES_DATETIMEbool timed_lock(boost::system_time const& timeout){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);while(state.shared_count || state.exclusive){state.exclusive_waiting_blocked=true;if(!exclusive_cond.timed_wait(lk,timeout)){if(state.shared_count || state.exclusive){state.exclusive_waiting_blocked=false;release_waiters();return false;}break;}}state.exclusive=true;return true;}template<typename TimeDuration>bool timed_lock(TimeDuration const & relative_time){return timed_lock(boost::get_system_time()+relative_time);}
#endif
#ifdef BOOST_THREAD_USES_CHRONOtemplate <class Rep, class Period>bool try_lock_for(const boost::chrono::duration<Rep, Period>& rel_time){return try_lock_until(boost::chrono::steady_clock::now() + rel_time);}template <class Clock, class Duration>bool try_lock_until(const boost::chrono::time_point<Clock, Duration>& abs_time){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);while(state.shared_count || state.exclusive){state.exclusive_waiting_blocked=true;
// if(boost::cv_status::timeout == exclusive_cond.wait_until(lk,abs_time))
// {
// if(state.shared_count || state.exclusive)
// {
// state.exclusive_waiting_blocked=false;
// release_waiters();
// return false;
// }
// break;
// }}state.exclusive=true;return true;}
#endifbool try_lock(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);if(state.shared_count || state.exclusive){return false;}else{state.exclusive=true;return true;}}void unlock(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_locked();state.exclusive=false;state.exclusive_waiting_blocked=false;state.assert_free();release_waiters();}void lock_upgrade(){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);while(state.exclusive || state.exclusive_waiting_blocked || state.upgrade){shared_cond.wait(lk);}state.lock_shared();state.upgrade=true;}#if defined BOOST_THREAD_USES_DATETIMEbool timed_lock_upgrade(boost::system_time const& timeout){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);while(state.exclusive || state.exclusive_waiting_blocked || state.upgrade){if(!shared_cond.timed_wait(lk,timeout)){if(state.exclusive || state.exclusive_waiting_blocked || state.upgrade){return false;}break;}}state.lock_shared();state.upgrade=true;return true;}template<typename TimeDuration>bool timed_lock_upgrade(TimeDuration const & relative_time){return timed_lock_upgrade(boost::get_system_time()+relative_time);}
#endif
#ifdef BOOST_THREAD_USES_CHRONOtemplate <class Rep, class Period>bool try_lock_upgrade_for(const boost::chrono::duration<Rep, Period>& rel_time){return try_lock_upgrade_until(boost::chrono::steady_clock::now() + rel_time);}template <class Clock, class Duration>bool try_lock_upgrade_until(const boost::chrono::time_point<Clock, Duration>& abs_time){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);while(state.exclusive || state.exclusive_waiting_blocked || state.upgrade){
// if(boost::cv_status::timeout == shared_cond.wait_until(lk,abs_time))
// {
// if(state.exclusive || state.exclusive_waiting_blocked || state.upgrade)
// {
// return false;
// }
// break;
// }}state.lock_shared();state.upgrade=true;return true;}
#endifbool try_lock_upgrade(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);if(state.exclusive || state.exclusive_waiting_blocked || state.upgrade){return false;}else{state.lock_shared();state.upgrade=true;state.assert_lock_upgraded();return true;}}void unlock_upgrade(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);//state.upgrade=false;state.unlock_upgrade();if(! state.more_shared() ){state.exclusive_waiting_blocked=false;release_waiters();} else {shared_cond.notify_all();}}// Upgrade <-> Exclusivevoid unlock_upgrade_and_lock(){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_lock_upgraded();state.unlock_shared();
// while (state.more_shared())
// {
// upgrade_cond.wait(lk);
// }state.upgrade=false;state.exclusive=true;state.assert_locked();}void unlock_and_lock_upgrade(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_locked();state.exclusive=false;state.upgrade=true;state.lock_shared();state.exclusive_waiting_blocked=false;state.assert_lock_upgraded();release_waiters();}bool try_unlock_upgrade_and_lock(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_lock_upgraded();if( !state.exclusive&& !state.exclusive_waiting_blocked&& state.upgrade&& state.shared_count==1){state.shared_count=0;state.exclusive=true;state.upgrade=false;state.assert_locked();return true;}return false;}
#ifdef BOOST_THREAD_USES_CHRONOtemplate <class Rep, class Period>booltry_unlock_upgrade_and_lock_for(const boost::chrono::duration<Rep, Period>& rel_time){return try_unlock_upgrade_and_lock_until(boost::chrono::steady_clock::now() + rel_time);}template <class Clock, class Duration>booltry_unlock_upgrade_and_lock_until(const boost::chrono::time_point<Clock, Duration>& abs_time){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endif
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_lock_upgraded();if (state.shared_count != 1){
// for (;;)
// {
// boost::cv_status status = shared_cond.wait_until(lk,abs_time);
// if (state.shared_count == 1)
// break;
// if(status == boost::cv_status::timeout)
// return false;
// }}state.upgrade=false;state.exclusive=true;state.exclusive_waiting_blocked=false;state.shared_count=0;return true;}
#endif// Shared <-> Exclusivevoid unlock_and_lock_shared(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_locked();state.exclusive=false;state.lock_shared();state.exclusive_waiting_blocked=false;release_waiters();}#ifdef BOOST_THREAD_PROVIDES_SHARED_MUTEX_UPWARDS_CONVERSIONSbool try_unlock_shared_and_lock(){boost::unique_lock<boost::mutex> lk(state_change);state.assert_lock_shared();if( !state.exclusive&& !state.exclusive_waiting_blocked&& !state.upgrade&& state.shared_count==1){state.shared_count=0;state.exclusive=true;return true;}return false;}
#ifdef BOOST_THREAD_USES_CHRONOtemplate <class Rep, class Period>booltry_unlock_shared_and_lock_for(const chrono::duration<Rep, Period>& rel_time){return try_unlock_shared_and_lock_until(chrono::steady_clock::now() + rel_time);}template <class Clock, class Duration>booltry_unlock_shared_and_lock_until(const chrono::time_point<Clock, Duration>& abs_time){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endifboost::unique_lock<boost::mutex> lk(state_change);state.assert_lock_shared();if (state.shared_count != 1){for (;;){cv_status status = shared_cond.wait_until(lk,abs_time);if (state.shared_count == 1)break;if(status == cv_status::timeout)return false;}}state.upgrade=false;state.exclusive=true;state.exclusive_waiting_blocked=false;state.shared_count=0;return true;}
#endif
#endif// Shared <-> Upgradevoid unlock_upgrade_and_lock_shared(){
// boost::unique_lock<boost::mutex> lk(state_change);boost::unique_lock<bip::named_mutex>lk(state_change);state.assert_lock_upgraded();state.upgrade=false;state.exclusive_waiting_blocked=false;release_waiters();}#ifdef BOOST_THREAD_PROVIDES_SHARED_MUTEX_UPWARDS_CONVERSIONSbool try_unlock_shared_and_lock_upgrade(){boost::unique_lock<boost::mutex> lk(state_change);state.assert_lock_shared();if( !state.exclusive&& !state.exclusive_waiting_blocked&& !state.upgrade){state.upgrade=true;return true;}return false;}
#ifdef BOOST_THREAD_USES_CHRONOtemplate <class Rep, class Period>booltry_unlock_shared_and_lock_upgrade_for(const chrono::duration<Rep, Period>& rel_time){return try_unlock_shared_and_lock_upgrade_until(chrono::steady_clock::now() + rel_time);}template <class Clock, class Duration>booltry_unlock_shared_and_lock_upgrade_until(const chrono::time_point<Clock, Duration>& abs_time){
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONSboost::this_thread::disable_interruption do_not_disturb;
#endifboost::unique_lock<boost::mutex> lk(state_change);state.assert_lock_shared();if( state.exclusive|| state.exclusive_waiting_blocked|| state.upgrade){for (;;){cv_status status = exclusive_cond.wait_until(lk,abs_time);if( ! state.exclusive&& ! state.exclusive_waiting_blocked&& ! state.upgrade)break;if(status == cv_status::timeout)return false;}}state.upgrade=true;return true;}
#endif
#endif};typedef shared_mutex upgrade_mutex;
}#include <boost/config/abi_suffix.hpp>//#endif
测试代码
#include <cstdio>
#include <mutex>
#include <string>

#include <boost/thread/thread.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/ref.hpp>

#include "read_write_mutex.h"

namespace bip = boost::interprocess;

// Reader/writer lock used by every thread of this test program.
chy::shared_mutex global_mutex;

// Shared value: writer threads increment it, reader threads print it.
int global_num = 10;

// Reader thread
void read_thread(std::string &name){
// boost::lock_guard<bip::named_mutex> lock(global_mutex);//读锁定
// bip::named_mutex global_mutex(bip::open_or_create,"mtx");
// global_mutex.lock();boost::shared_lock<chy::shared_mutex> lock{global_mutex};printf("线程%s抢占了资源,global_num = %d\n",name.c_str(),global_num);boost::this_thread::sleep(boost::posix_time::seconds(1));printf("线程%s释放了资源...\n",name.c_str());
// global_mutex.unlock();
}//写线程
void write_thread(std::string &name){
// std::lock_guard<bip::named_mutex> lock(global_mutex);//写锁定
// bip::named_mutex global_mutex(bip::open_or_create,"mtx");
// global_mutex.lock();boost::lock_guard<chy::shared_mutex> lock{global_mutex};global_num++;//写线程改变数据的数值printf("线程%s抢占了资源,global_num = %d\n",name.c_str(),global_num);boost::this_thread::sleep(boost::posix_time::seconds(1));printf("线程%s释放了资源...\n",name.c_str());
// global_mutex.unlock();
}int main(){std::string read_thread_r1 = "read_thread_r1";std::string read_thread_r2 = "read_thread_r2";std::string read_thread_r3 = "read_thread_r3";std::string read_thread_r4 = "read_thread_r4";std::string read_thread_r5 = "read_thread_r5";std::string write_thread_w1 = "write_thread_w1";std::string write_thread_w2 = "write_thread_w2";boost::thread_group tg;tg.create_thread(boost::bind(read_thread,boost::ref(read_thread_r1)));tg.create_thread(boost::bind(read_thread,boost::ref(read_thread_r2)));tg.create_thread(boost::bind(read_thread,boost::ref(read_thread_r3)));tg.create_thread(boost::bind(read_thread,boost::ref(read_thread_r4)));tg.create_thread(boost::bind(read_thread,boost::ref(read_thread_r5)));tg.create_thread(boost::bind(write_thread,boost::ref(write_thread_w1)));tg.create_thread(boost::bind(write_thread,boost::ref(write_thread_w2)));tg.join_all();return 0;
}