3.1 条件竞争
恶性条件竞争通常发生于完成对多于一个的数据块的修改。例如对一个双向链表的结点的修改。该节点有两个指针。
避免条件竞争的两种方式:
方式一:确保只有进行修改的线程才能看到不变量被破坏时的中间状态。从其他访问线程的角度来
看,修改不是已经完成了,就是还没开始。
方式二:对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分
割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程。
3.2 使用互斥量
不建议直接使用std::mutex,要保证在所有的函数出口调用unlock();C++标准库为互斥量提供了一个RAII语法的模板类 std::lock_guard ,在构造时就能提供已锁的互斥量,并在析构的时候进行解锁。
#include <list>
#include <mutex>
#include <algorithm>std::list<int> some_list; // 1
std::mutex some_mutex; // 2void add_to_list(int new_value)
{std::lock_guard<std::mutex> guard(some_mutex); // 3some_list.push_back(new_value);
}bool list_contains(int value_to_find)
{std::lock_guard<std::mutex> guard(some_mutex); // 4std::scoped_lock guard(some_mutex);return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}
3.2.1 互斥锁解决条件竞争
切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。下面这个例子是将以参数的形式传递到用户提供的函数中。
#include <list>
#include <mutex>
#include <algorithm>
#include <thread>
#include <numeric>
#include <vector>
#include <functional>class some_data
{int a;std::string b;
public:void do_something() {a += 1;}
};//这个类包装有一个数据,对这个数据进行保护
class data_wrapper
{
private:some_data data;//这个互斥锁是保护data数据的std::mutex m;public:template<typename Function>void process_data(Function func){std::lock_guard<std::mutex> l(m);func(data); // 1 传递“保护”数据给用户函数 当保护数据传递了出去,该保护数据失去了保护。}
};some_data* unprotected;void malicious_function(some_data& protected_data)
{unprotected = &protected_data;
}data_wrapper x;void foo()
{x.process_data(malicious_function); // 2 传递一个恶意函数unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}
现在有一个问题是:即使使用了互斥量或其他机制保护了共享数据,就不必再为条件竞争所担忧吗?
答案是否定的。考虑双链表的例子,为了能让线程安全地删除一个节点,需要确保防止对这三个节点(待删除的节点及其前后相邻的节点)的并发访问。如果只对指向每个节点的指针进行访问保护,那就和没有使用互斥量一样,条件竞争仍会发生。看这个例子:
template<typename T, typename Container = std::deque<T>>
class stack
{
public:explicit stack(const Container&);bool empty() const;size_t size() const;T top();void push(T const&);void push(T&&);void pop();
};
即使stack的私有数据都在互斥锁的保护之下且没有将私有数据传递出去,虽然empty()和size()可能在返回时是正确的,但其它的结果是不可靠的;当它们返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的结果就有问题了。
stack<int> s;
if (! s.empty()){ // 1
int const value = s.top(); // 2
s.pop(); // 3
do_something(value);
}
在1和2执行中间,可能有来自另一个线程的pop()调用删除了最后一个元素,对一个空栈操作会导致未定义行为。
导致该问题的是接口设计问题。
std::stack 的设计人员将这个操作分为两部分:先获取顶部元素(top()),然后从栈中移
除(pop())的原因是:当将这两个操作合成一个的时候,如果我先将元素从栈中移除(这是一个节点,将指针指向修改一下),然后再复制该元素,当拷贝元素失败的时候,要弹出的数据消失了,但它的确从栈中移除了。
要解决上述提到的条件竞争有四种方式:
**方式一:**传入一个引用。
std::vector<int> result;//栈中的一个元素
some_stack.pop(result);
该方法的缺点是:需要临时构造一个节点的实例。构造函数可能需要参数,代码在这个阶段可能提供不了这个参数。还需要支持赋值操作。
方式二: 无异常抛出的拷贝构造函数或移动构造函数
将两个pop和top两个操作合二为一的时候可能出现拷贝元素失败的问题,如果该元素的拷贝构造函数和移动构造函数是无异常抛出就可以。
方式三: 返回元素的指针
这样就可以避免拷贝。
方式四: 1+2或者1+3
下面使用方式1和方式3写一个线程安全的堆栈类定义。
#include <list>
#include <mutex>
#include <algorithm>
#include <thread>
#include <numeric>
#include <vector>
#include <functional>
#include <deque>
#include <exception>
#include <memory> /*
* class empty_stack : std::exception
{
public:const char* what() const throw() {return "Empty stack exception: Cannot perform operation on empty stack";}
};
* 在C++11之前,throw()被用来指定函数不会抛出任何异常,如果函数抛出了异常,则会导致未定义行为。
在C++11中,应该使用noexcept关键字来代替throw()。在这种情况下,const char* what() const throw()声明了what()函数不会抛出异常。
这表示代码在执行what()函数时不会引发异常。这在异常处理中非常重要,因为如果在处理异常时又引发了新的异常,会导致程序终止。
*/
class empty_stack : std::exception
{
public:const char* what() const noexcept {return "Empty stack exception: Cannot perform operation on empty stack";}
};template<typename T>
class threadsafe_stack
{
private:std::stack<T> data;mutable std::mutex m;
public:threadsafe_stack(): data(std::stack<T>()) {}threadsafe_stack(const threadsafe_stack& other){std::lock_guard<std::mutex> lock(other.m);data = other.data; // 1 在构造函数体中的执行拷贝}threadsafe_stack& operator=(const threadsafe_stack&) = delete;void push(T new_value){std::lock_guard<std::mutex> lock(m);data.push(new_value);}//这两个pop操作的设计可以避免获取栈顶元素不对的问题。/** stack<int> s;if (! s.empty()){ // 1int const value = s.top(); // 2s.pop(); // 3do_something(value);}如果线程1执行完2后,线程3执行了2和3就会导致两个线程获取到了同一个值,但是pop了两次;使用下面的接口不会出现获取栈顶元素不对的问题。*/std::shared_ptr<T> pop(){std::lock_guard<std::mutex> lock(m);if (data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值data.pop();return res;}void pop(T& value){std::lock_guard<std::mutex> lock(m);if (data.empty()) throw empty_stack();value = data.top();data.pop();}bool empty() const{std::lock_guard<std::mutex> lock(m);return data.empty();}
};int main()
{while (true);return 0;
}
线程不安全的stack对于pop接口的设计中,如果有下面的情况存在会出现问题。
stack<int> s;if (! s.empty()){ // 1int const value = s.top(); // 2s.pop(); // 3do_something(value);}
如果线程1执行完位置2后,线程2执行了位置2和位置3就会导致两个线程获取到了同一个值,但是pop了两次;使用线程安全的stack的接口不会出现获取栈顶元素不对的问题。
对于这句代码的理解:if (data.empty()) throw empty_stack();
旧的stack会导致未定义行为,而这个将会有异常抛出,用户可以自行决定该如何做。
3.2.2 死锁
存在两个锁或者两个以上的锁的时候,可能会出现死锁。死锁是两个线程互相等待,从而什么耶做不了。
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁。
std::mutex是不可重入的互斥量,不能在同一线程中连续两次锁定。加几次锁必须解几次锁,不能多也不能少
举一个经典的死锁例子:
#include <iostream>
#include <thread>
#include <mutex>std::mutex m1;
std::mutex m2;void threadA()
{std::unique_lock<std::mutex> lock1(m1);std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 稍作延时,使线程B中的锁得到执行std::unique_lock<std::mutex> lock2(m2); // 线程A尝试获取锁m2
}void threadB()
{std::unique_lock<std::mutex> lock2(m2);std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 稍作延时,使线程A中的锁得到执行std::unique_lock<std::mutex> lock1(m1); // 线程B尝试获取锁m1
}int main()
{std::thread t1(threadA);std::thread t2(threadB);t1.join();t2.join();return 0;
}
3.2.2.1 使用std::lock同时加锁
#include <list>
#include <mutex>
#include <algorithm>
#include <thread>
#include <numeric>
#include <vector>
#include <functional>
#include <deque>
#include <exception>
#include <memory> // 这里的std::lock()需要包含<mutex>头文件
class some_big_object {};void swap(some_big_object& lhs, some_big_object& rhs) {}class X
{
private:some_big_object some_detail;std::mutex m;
public:X(some_big_object const& sd) :some_detail(sd) {}friend void swap(X& lhs, X& rhs){if (&lhs == &rhs) return;std::lock(lhs.m, rhs.m); // 1/*std::adopt_lock是一个标志,指示std::lock_guard对象构造函数不需要再次加锁互斥量。它假设互斥量已在当前线程上被提前锁定,std::lock_guard对象将负责互斥量的解锁*/std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);// 2std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);// 3swap(lhs.some_detail, rhs.some_detail);}
};
'if (&lhs == &rhs) return;
'这句代码是必须的。如果没有这句代码,可能导致同一个线程获取两次一个互斥量。
std::lock 要么将两个锁都锁住,要不一个都不锁。
C++17提供了 std::scoped_lock<> 一种新的RAII类型模板类型,与std::lock_guard<> 的功能等价。 std::scoped_lock能接受不定数量的互斥锁类型作为模板参数。
swap函数重写如下:
void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m,rhs.m); // 1
swap(lhs.some_detail,rhs.some_detail);
}
3.2.2.2 避免死锁的建议
建议一:避免嵌套锁
一个线程已获得一个锁时,再别去获取第二个。当你需要获取多个锁,使用一
个 std::lock 来做这件事,避免产生死锁。
建议二:避免在持有锁时调用用户提供的代码
建议三:使用固定顺序获取锁
建议四:使用锁的层次结构
3.2.2.3 adopt_lock和defer_lock两个参数
class X
{
private:some_big_object some_detail;std::mutex m;public:X(some_big_object const& sd) :some_detail(sd) {}friend void swap(X& lhs, X& rhs){if (&lhs == &rhs)return;std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);// 1std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);// 1 std::defer_lock 留下未上锁的互斥量std::lock(lock_a, lock_b); // 2 互斥量在这里上锁swap(lhs.some_detail, rhs.some_detail);}friend void swap(X& lhs, X& rhs){if (&lhs == &rhs)return;std::lock(lhs.m, rhs.m); // 1std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);// 2std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);// 3swap(lhs.some_detail, rhs.some_detail);}
};
从这两个版本的swap函数中体会一下两个参数的区别;
3.3 仅在初始化过程中保护共享数据
3.3.1 分析几个代码
//用互斥实现线程安全的延迟初始化
std::shared_ptr<some_resource> resource_ptr;
void foo() {std::unique_lock<std::mutex> lk(resource_mutex);if(!resource_ptr){resource_ptr.reset(new some_resource);}lk.unlock();resource_ptr->do_something();
}
虽然安全,但是每次调用foo,线程都会等待获取锁,性能太低。
双重检查带来的问题:
void undefined_behaviour_with_double_checked_locking() {if (!resource_ptr) {//1std::lock_guard<std::mutex> lk(resource_mutex);if (!resource_ptr) {//2resource_ptr.reset(new some_resource);//3}}resource_ptr->do_something();//4
}
这段代码可能导致的未定义行为,分析如下:
指令重排问题:
- 第一步: 为对象分配一片内存
- 第二步: 构造一个对象,存入已分配的内存区
- 第三步: 将指针指向这片内存区
但是有时候第二步和第三步会乱序,这是编译器优化的结果。
考虑如下场景: 当解锁发生了,3只做了第三步的操作,第二步操作还未进行,此时这个地址是一个野指针。当另一个线程进行do_something时就会出现未定义行为。
使用std::call_once和std::once_flag解决这个问题。
代码如下:
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;void init_resource() {resource_ptr.reset(new some_resource);
}void foo() {std::call_once(resource_flag, init_resource);resource_ptr->do_something();
}
3.3.2 关于"C++11标准规定初始化只会在某一个线程上单独发生,在初始化完成之前,其他线程不会越过静态数据的声明而继续运行"的理解
C++11标准确实规定了在静态数据成员初始化之前,其他线程不能越过其声明继续运行。这个规定适用于静态数据成员和局部静态变量。
具体来说,C++11规定了在多线程环境下,静态变量(包括静态数据成员和局部静态变量)的初始化是线程安全的。每个线程只会执行一次静态变量的初始化,而其他线程在初始化完成之前会被阻塞。
例如,对于如下代码:
void foo() {static int x = 1;// ...
}
在多线程环境下,每个线程第一次调用foo()
时,静态变量x
的初始化会被分配给一个线程,其作用域仅限于那个线程。其他线程在初始化完成之前无法访问x
,而只有在初始化完成后才能访问。
需要注意的是,对于普通的局部变量,C++标准没有规定其初始化的线程安全性。但是,大部分编译器会根据编译器的设定和优化策略来管理局部变量的初始化过程。在多线程环境下,如果确保局部变量的初始化线程安全性是必要的话,可以使用std::call_once
或其他线程同步机制来保证。
3.3.3 基于std::call_once和std::once_flag实现一个单例模式
class Singleton {
private:static Singleton* instance;static std::once_flag initFlag;Singleton() {}public:static Singleton* getInstance() {std::call_once(initFlag, []() {instance = new Singleton();});return instance;}
};
3.4 读写锁
对于这样一种场景, 数据更新很少,但也是存在的,即一般都是读取,但有时也会写;
共享锁即读锁: std::shared_lock <std::shared_mutex>;
排他锁即写锁: 对应std::lock_guard<std::shared_mutex>和std::unique_lock<std::shared_mutex>
class dns_cache {std::map<std::string, dns_entry> entries;//mutablemutable std::shared_mutex entry_mutex;public:dns_entry find_entry(std::string const& domain) const {std::shared_lock<std::shared_mutex> lk(entry_mutex);std::map<std::string, dns_entry>::const_iterator const it =entries.find(domain);return (it == entries.end()) ? dns_entry() : it->second;}void update_or_add_entry(std::string const& domain, dns_entry const& dns_details) {std::lock_guard<std::shared_mutex> lk(entry_mutex);entries[domain] = dns_details;}
};