并发编程(19)——引用计数型无锁栈

文章目录

  • 十九、day19
  • 1. 引用计数
  • 2. 代码实现
    • 2.1 单引用计数器无锁栈
    • 2.2 双引用计数器无锁栈
  • 3. 本节的一些理解

十九、day19

上一节我们学习通过侯删链表以及风险指针与侯删链表的组合两种方式实现了并发无锁栈,但是这两种方式有以下缺点:

第一种方式:如果 pop 操作频繁被多个线程调用,待删除的节点将不断累积,导致待删除列表无法及时回收,进而占用大量内存。此时我们需要一种机制,每次调用 pop 函数结束后都需要判断侯删链表中的节点是否可以被 delete,即风险指针。

第二种方式

  1. 如何正确且高效地分配这块内存是一个难点。

  2. 为了删除一个节点,需要遍历风险指针数组,检查该节点是否被某个风险指针引用。同时,在处理待删除节点时,也需要从风险数组中选取合适的节点记录其地址,这进一步增加了开销。

  3. 而且因为该方法受专利保护,所以在一些场合可能无法使用

在这一节中,我们学习如何通过引用计数来实现无锁栈以及安全回收机制。

参考:

恋恋风辰官方博客

《c++并发编程》中无锁栈的实现为什么要用双引用计数器_无锁栈 双重引用技术-CSDN博客

C++实现“无锁”双buffer - 知乎


1. 引用计数

本节的引用计数通过两个计数器实现:一个是存储在节点内部的 内部计数器,另一个是与指针绑定在一起的 外部计数器。外部计数器和指针被封装在一个结构体 counted_node_ptr 中,因此栈顶指针的类型从原生指针(void⭐、int⭐、char⭐、node⭐…)修改为 counted_node_ptr

struct counted_node_ptr {int external_count; // 外部计数器node* ptr;          // 原始指针,指向节点
};

具体机制如下:

  • 当线程加载 head 指针时,外部计数器会加 1,表示有新的线程在引用这个节点。
  • 当线程不再引用某个节点时,内部计数器会减 1。

通过将 内部计数器外部计数器 相加,就可以得到该节点的实际引用计数。当引用计数归零时,说明没有线程再访问这个节点,此时可以安全地删除它。这种设计确保了节点的引用计数在并发环境下能够准确管理,同时避免了资源泄露的问题。数据结构如下图所示

在这里插入图片描述

图片来源:https://blog.csdn.net/ld_long/article/details/137692475

当线程尝试从栈中弹出数据时,会按照以下步骤操作:

  1. 读取栈顶指针
    线程首先读取 head 指针(栈顶指针),将其值存入线程的局部变量 local_head 中,同时将 head 指针的外部计数器加 1。这个过程是原子的,因此能够确保线程安全。
  2. 同步计数器
    local_head 的外部计数器更新为与 head 的外部计数器相同。随后,线程可以通过 local_head 访问节点。
  3. 判断能否弹出节点
    • 如果在 local_head 载入后,发现 local_head 的指针值与当前 head 不一致,说明有其他线程在此期间修改了 head。此时,当前线程无法弹出节点,因为它载入的 local_head 已作废。
    • 线程会将节点的内部计数器减 1(因为当前线程不再指向该节点)。如果内部计数器变为 0(说明没有其他线程再引用这个节点),当前线程负责删除节点;否则,什么也不做。
  4. 成功弹出节点
    • 如果 local_head 的指针值仍与 head 相同,说明没有其他线程修改 head,当前线程可以成功弹出节点。
    • 线程将 head 指针更新为当前节点的下一个节点(即 head.ptr->next),这个更新操作是原子的。
  5. 更新计数器
    • 弹出节点后,将 local_head 的外部计数器减 1,因为 head 指针已经不再指向这个节点。
    • 再次将 local_head 的外部计数器减 1,因为 local_head 也准备不再指向这个节点(实际上,可以一次性减 2,这里分开解释是为了说明两个来源)。
    • 然后,将 local_head 的外部计数器值加到节点的内部计数器上。
  6. 判断节点是否需要删除
    • 如果内部计数器因此变为 0(即它原本是外部计数器的相反数),说明没有线程再引用该节点,当前线程负责删除节点。
    • 如果内部计数器不为 0,则无需删除节点。

通过这样的流程,可以确保节点在无人引用时才被安全回收,同时减少不必要的内存操作,提升了效率和线程安全性。

这段文字光看很难看懂,我们通过代码进行解释:

2. 代码实现

2.1 单引用计数器无锁栈

在此之前,我们需要知道:该方式必须通过两个计数器才能实现,如果只用一个,会有一些问题。我们先通过只用一个计数器实现引用计数,进而说明为什么只能使用两个计数器实现。

template<typename T>
class single_ref_stack {
public:single_ref_stack():head(nullptr) {}~single_ref_stack() {//循环出栈while (pop());}void push(T const& data);std::shared_ptr<T> pop();   
private:struct ref_node {//1 数据域智能指针std::shared_ptr<T>  _data;//2 引用计数std::atomic<int> _ref_count;//3  下一个节点ref_node* _next;ref_node(T const& data_) : _data(std::make_shared<T>(data_)),_ref_count(1), _next(nullptr) {}};//头部节点std::atomic<ref_node*> head;
};

上段代码是一个单引用的无锁栈实现,内容很简单不多于解释,我们只需要注意一点:计数器引用计数必须要使用原子类型,为了保证多线程修改下仍是线程安全的。

接下来实现 pop() 和 push():

void push(T const& data) {auto new_node = new ref_node(data);new_node->next = head.load();while (!head.compare_exchange_weak(new_node->next, new_node));
}

push() 函数很简单,就是将数据插入至栈链表的表头,并且调用原子操作 compare_exchange_weakhead 修改为 new_node,即将 head 指针指向新插入的表头节点 new_node

std::shared_ptr<T> pop() {ref_node* old_head = head.load();for (;;) {if (!old_head) { // 空栈检查return std::shared_ptr<T>();}//1 只要执行pop就对引用计数+1++(old_head->_ref_count);//2 比较head和old_head想等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, old_head->_next)) {auto cur_count = old_head->_ref_count.load();auto new_count;//3  循环重试保证引用计数安全更新do {//4 减去本线程增加的1次和初始的1次new_count = cur_count - 2;} while (!old_head->_ref_count.compare_exchange_weak(cur_count, new_count));//返回头部数据std::shared_ptr<T> res;//5  交换数据res.swap(old_head->_data);//6if (old_head->_ref_count == 0) {delete old_head;}return res;}else {//7 if (old_head->_ref_count.fetch_sub(1) == 1) {delete old_head;}}}
}

该函数仅有两种返回值:空栈返回一个空的智能指针;非空栈返回一个指向弹出数据的智能指针。执行流程如下:

  1. 进行空栈检查,如果是空栈,直接返回空的 std::shared_ptr<T>(),否则继续执行
  2. 因为节点结构体的成员增加了一个原子类型的引用计数,这里增加当前线程对节点(old_head)的引用计数(因为计数器是 std::atomic<int> 类型的整型原子变量,所以 ++ 是原子操作),避免该节点(old_head)在操作过程中被其他线程删除
  3. 使用 CAS(Compare-And-Swap) 操作尝试将 headold_head 更新为 old_head->_next。如果成功说明当前线程成功弹出头节点,否则说明其他线程在此期间修改了 head,需要重新尝试。
  4. 如果判断成功,则 head 被修改为 old_head->_next。按照上一节的做法,这里需要将 old_head 加入至侯删链表,但是在本节中我们需要在这里更新引用计数
    1. 首先,读取 old_head 的引用计数(我们在 pop 函数最开始将其 +1,保证其他线程不会删除它),并将其赋值给 cur_count
    2. 然后将引用计数减去 2(pop 函数一开始加了1,每个节点初始化的时候默认构造函数将引用计数设置为 1,一共增加了两次 1)
    3. 使用 CAS(Weak) 重试,保证引用计数能被安全更新到 new_count
    4. 引用计数修改成功之后,将弹出节点 old_head 的数据存储至 std::shared_ptr<T> 类型的智能指针中,并判断 old_head 的引用计数是否为 0,如果为 0 则直接将 old_head 删除,否则保留
    5. 最后返回存有 old_head->_data 的智能指针 res
  5. 如果判断失败,说明其他线程在此期间修改了 head。判断失败的线程可以理解为抢占失败的线程,因为 pop 函数的一开始会将栈表头节点的引用计数 +1,所以如果抢占失败,需要在 pop 函数结束前将引用计数 -1。如果引用 -1 之后的值为 1,则表示其他对该节点 pop 的线程已经结束,当前线程是最后引用此指针的线程,我们需要在该线程中将 old_head 删除。

但是该函数存在隐含的风险

  • 风险1:假设线程 1 和线程 2 同时调用pop函数,并已经读取了head 的值,现在准备将 old_head 的引用计数 +1,但线程 2 率先完成了pop函数操作,将引用计数 _ref_count 更新为 0 并删除了 old_head。此时,线程 1 恰好继续执行第 1 步代码,却尝试访问已经被删除的 old_head,因此导致程序崩溃。
  • 风险2:假设线程 1 和线程 2 同时调用 pop函数,并先后将 old_head 的引用计数 +1(此时 old_head 的引用计数总计为 3),假如线程 1 率先通过 if 的判断,将 head 成功更新为 old_head->next;而线程 2 会因为 CAS 会读取到 head 最新修改的值,即 old_head->next,因为 head 最新的值和 old_head 不同,所以会将 old_head 修改为 old_head->next,线程 2 走进 else 分支之后判断的 old_head->ref_count 其实是 old_head 修改后的引用计数,即 old_head->next->ref_count,为1(只有初始化的时候才 +1,其他线程没有对它的引用计数进行操作),从而会将其删除。其他线程如果要弹出 old_head->next 的时候会发现节点已经不存在,报错。流程如下图所示:

在这里插入图片描述

  • 假设线程 1 和线程 2 同时调用 pop函数,并先后将 old_headnode1)的引用计数 +1(此时 old_head 的引用计数总计为 3),如上图中的流程1
  • 假如线程 1 率先通过 if 的判断,将 head 成功更新为 old_head->next,即node2,如上图的流程2
  • 线程 2 会因为 CAS 会读取到 head 最新修改的值,即 old_head->nextnode2),因为 head 最新的值和 old_head 不同,所以会将 old_head 修改为和 head 相同的值 old_head->next,如上图的流程3
  • 因为线程 2 没通过 if 的判断条件,从而走进了 else 分支,else分支中会执行 old_head->_ref_count.fetch_sub(1) == 1操作,此时 old_head 指向节点的引用计数是 1。注意,fetch_sub_ref_count 减 1 后返回的是减之前的值,即1,而不是减之后的值 0,所以这里虽然将 _ref_count 从 1 减为 0 ,但是 if 条件仍然会判断成功,从而将 old_head 删除
  • 假如线程 3 在线程 2 执行完之后调用了 pop 函数想要将 node2 弹出,按正常流程来走的话,会读取 head 指向的节点,即 node2,但因为 node2 已经被线程 2删除,所以这里在读取 head.load() 的时候会报错,此时 head 指向空指针 nullptr,如下图所示:

在这里插入图片描述

解决方法就是将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合

将原本数据结构体 ref_node 分解为 ref_nodenode 两个结构体,前者用于存储引用计数,后者用于存储数据。

struct node {//1 数据域智能指针std::shared_ptr<T>  _data;//2  下一个节点ref_node _next;node(T const& data_) : _data(std::make_shared<T>(data_)) {}
};
struct ref_node {// 引用计数std::atomic<int> _ref_count;node* _node_ptr;ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}ref_node():_node_ptr(nullptr),_ref_count(0){}
};

如果将 head 存储为指针类型而不是副本类型

std::atomic<ref_node*> head; // 头节点

注意:这里的头节点存储的类型是 ref_node* 而不是 node*

那么 pop() 函数需修改如下:

std::shared_ptr<T> pop() {//0 处ref_node* old_head = head.load();for (;;) {//1 只要执行pop就对引用计数+1并更新到head中ref_node* new_head;do {new_head = old_head;new_head->_ref_count += 1; // 7} while (!head.compare_exchange_weak(old_head, new_head));//4 确保 head == old_head == new_head old_head = new_head;auto* node_ptr = old_head->_node_ptr; // 获取当前节点的 node(存储数据的结构体)if (node_ptr == nullptr) { // 判断空栈return  std::shared_ptr<T>();}//2 比较head和old_head想等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, node_ptr->_next)) {// 要返回的值std::shared_ptr<T> res;//5 交换智能指针res.swap(node_ptr->_data);//6 增加的数量int increase_count = old_head->_ref_count.fetch_sub(2);//3 判断仅有当前线程持有指针则删除if (increase_count == 2) {delete node_ptr;}return res;}else {if (old_head->_ref_count.fetch_sub(1) == 1) {delete node_ptr;}}}
}
  • 第一个 while 是为了在多线程下安全地更新头节点的引用计数
  • 因为 headold_headnew_head 都是 ref_node 类型的结构体,这里必须使用 auto* node_ptr = old_head->node_ptr 获取和当前引用计数匹配的数据节点,使用该节点判断是否存在内容;而 head 的下一个节点并不是 head ->node_ptr,而是 head ->node_ptr->next
  • 第二个 while 是为了更新 head 的值,将表头节点从链表中删除(从链表中删除而不是从内存中删除)。
  • 如果 head 更新成功,则将 old_head ->node_ptr 弹出,而不是将 old_head 弹出,old_head 表示的仅仅只是引用计数而不是数据。
  • int increase_count = old_head->_ref_count.fetch_sub(2)是为了获取引用计数增加的数量(ref_count.fetch_sub(2) 会将 ref_count 的值减去 2 并返回 _ref_count 的旧值,而不是减去 2 后的新值),如果确实只增加了 2,那么表示当前线程是弹出节点的唯一持有者,可以直接删除。

如果我们将 head存储为指针类型而不是副本类型,那么上面的代码有以下风险:

  • 风险1:和上面引用计数和数据耦合为一个结构体时的风险2相同,会造成误删:

    假设线程 1 比线程 2 先执行,线程 1 在步骤 2 进行比较并交换操作(CAS)后,会将 head 更新为新的值。而此时线程 2 也尝试执行同样的 CAS 操作,但由于 head 已被线程 1 修改,线程 2 的 CAS 操作会失败。

    此时,线程 2 会进入 else 分支进行处理,并更新 old_head 为当前的 head 值。然而,此时线程 2 中的 old_head 的引用计数为 1,而 old_head 指向的数据实际上是新的 head 所指向的数据节点。如果线程 2 随后误将 old_head 指向的数据节点删除,就会导致问题。因为线程 2 删除的实际上是当前 head 所指向的节点,而它并未真正弹出节点或修改 head 的值。这会导致其他线程在尝试弹出栈顶元素时访问到已被删除的节点,从而引发程序崩溃。

  • 风险2

    假设线程 1 和线程 2 都执行完了第 0 步的代码,此时它们读取到的 old_head 值是相同的。线程 1 抢先一步执行,完成了第 5 步操作,并准备执行第 3 步判断逻辑时,线程 2 抢占了 CPU 开始执行第 7 步的代码。

    虽然线程 2 的 while 循环会检测到 old_head 和当前 head 不同,并因此进行重试,更新了 old_head,但在 do 循环的第一次迭代中,线程 2 的 old_head 和线程 1 的 old_head 仍然指向同一个节点。此时,线程 2 修改了 old_head 的引用计数。这会导致线程 1 在执行第 3 步时,引用计数不满足删除条件(因为线程 2 将old_head的计数加一,因为head、old_head、new_head的类型都是指针类型,指向的是同一块内存(指针),所以会反映到线程1中。将此时为3,线程 1 加一,线程 2 加一,初始化加一),因此不会进入 if 逻辑删除节点。

    与此同时,因为线程2在2处之后while会不断重试,线程2的head已经和old_head指向不同了,导致线程2也不会回收old_head内部节点指向的数据。最终,old_head 节点中的数据既没有被线程 1 删除,也没有被线程 2 回收,导致内存泄漏。

    问题根源在于,多个线程对 old_head 的引用计数存在竞争,但删除逻辑对引用计数的操作未能同步,导致某些节点被遗漏回收。

  • 一种和风险2相似但是正常的情况

    假设线程 1 和线程 2 都执行完了第 0 步的代码,此时两者读取的 old_head 值是相同的。接下来,线程 1 比线程 2 先获得 CPU 时间片并继续执行,而线程 2 因未抢占到 CPU 时间片停在了第 1 步。

    线程 1 按照顺序继续执行,最终到达第 3 步并将 node_ptr 所指向的节点删除。同时,head 已更新为新的栈顶元素,也就是 old_head 的下一个节点。

    这时,线程 2 抢占到了 CPU 时间片,继续执行第 1 步的代码,并在第一个 while 循环中因为 CAS 失败将 old_head 更新为当前 head 的值。此时,因为 new_head 是指针,所以对 new_head 的修改能反映到old_head 本身,old_head的引用计数也增加了 1,变为 2,但它实际指向的是下一个节点(即新的栈顶)。

    在这种情况下,线程 2 仍会进入 if 条件(不是进入 else 分支),对新的 old_head 节点执行删除操作。由于此时的 old_head 和引用计数逻辑匹配,这种情况是正常的,不会引发错误。

我们总结如下:

  1. 在实现引用计数内存回收机制时,应尽量避免直接存储指针,因为存储指针会带来多个线程同时操作同一块内存的风险。我们这里需要将 head 的类型从 std::atomic<ref_node*> 修改为 std::atomic<ref_node>,使用副本类型而不是指针类型。

  2. 引入一个专门用于记录减少引用计数的计数器。为了确保在多个线程中修改该计数器的安全性,可以将其设计为 node 类的成员变量。由于 node 类的实例是通过指针存储在栈的节点中的,多个线程能够安全地访问和修改它。

通过将引用计数分为“增加”和“减少”两部分,并将减少的计数器放入 node 类中,我们能够更清晰地追踪各线程对节点的引用情况,从而避免因引用计数不准确导致的内存泄漏或竞争问题。即双引用计数器版本。

  1. 一个节点是否可以被回收,取决于其整体引用计数(增加的引用计数加减少的引用计数)是否为 0。只有当引用计数为 0 时,节点才可以被安全地删除。

2.2 双引用计数器无锁栈

有两种双引用计数器无锁栈的实现方式,一种是博主恋恋风辰的实现方式:将增加计数器放在 ref_node 结构体中,将减少计数器放在 node 结构体中;另一种是C++并发编程书中的实现方式,使用了内存序进行了优化,具体实现相同,只不过后者将一些功能分离成了单独的函数,并通过内存序增加了效率。后者我们在下一节学习,本节只学习第一种。

#pragma once
#include <atomic>template<typename T>
class single_ref_stack {
public:single_ref_stack(){}~single_ref_stack() {//循环出栈while (pop());}void push(T const& data) {auto new_node =  ref_node(data);new_node._node_ptr->_next = head.load();while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));}std::shared_ptr<T> pop() {ref_node old_head = head.load();for (;;) {//1 只要执行pop就对引用计数+1并更新到head中ref_node new_head;do {new_head = old_head;new_head._ref_count += 1;} while (!head.compare_exchange_weak(old_head, new_head));old_head = new_head;auto* node_ptr = old_head._node_ptr;if (node_ptr == nullptr) {return  std::shared_ptr<T>();}//2 比较head和old_head想等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, node_ptr->_next)) {//要返回的值std::shared_ptr<T> res;//交换智能指针res.swap(node_ptr->_data);//增加的数量int increase_count = old_head._ref_count - 2;if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {delete node_ptr;}return res;}else {if (node_ptr->_dec_count.fetch_sub(1) == 1) {delete node_ptr;}}}}private:struct ref_node;struct node {//1 数据域智能指针std::shared_ptr<T>  _data;//2  下一个节点ref_node _next;node(T const& data_) : _data(std::make_shared<T>(data_)) {}// 减少引用计数器std::atomic<int>  _dec_count;};struct ref_node {// 增加引用计数int  _ref_count;node* _node_ptr;ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}ref_node():_node_ptr(nullptr),_ref_count(0){}};//头部节点std::atomic<ref_node> head;
};

针对单引用计数器提出的结论,我们做如下修改:

  1. 增加一个减少计数器,用于记录引用计数减少的次数,存储至 node 结构体中。因为 head 存储的是 std::atomic<ref_node> 类型,所以 _ref_count 的类型是 int 而不用是 std::atomic<int>ref_node 的数据只需每个线程自己看到就行,不用分享给其他线程。

    因为 _ref_count 的类型是 int,所以其他线程没办法看到,为了让其他线程看到 ref_count 的值,我们在 pop 内部将 _ref_count - 2 的值加到了 _dec_count 上面,dec_count原子变量,其他线程可以看到。而且我们通过CAS可以修改head的值,因为head是原子类型,所以head的引用计数也会被其他线程看到。一共有两种方式。

    struct ref_node;struct node {//1 数据域智能指针std::shared_ptr<T>  _data;//2  下一个节点ref_node _next;node(T const& data_) : _data(std::make_shared<T>(data_)) {}//减少的数量std::atomic<int>  _dec_count;};struct ref_node {// 引用计数int  _ref_count;node* _node_ptr;ref_node( T const & data_):_node_ptr(new node(data_)), _ref_count(1){}ref_node():_node_ptr(nullptr),_ref_count(0){}};
  1. head 的类型从 std::atomic<ref_node*> 修改为 std::atomic<ref_node>
std::atomic<ref_node> head;

push() 函数的实现如下:

void push(T const& data) {auto new_node =  ref_node(data);new_node._node_ptr->_next = head.load();while (!head.compare_exchange_weak(new_node._node_ptr->_next, new_node));
}

pop() 函数的实现如下:

std::shared_ptr<T> pop() {ref_node old_head = head.load();for (;;) {//1 只要执行pop就对引用计数+1并更新到head中ref_node new_head;//2do {new_head = old_head;new_head._ref_count += 1;} while (!head.compare_exchange_weak(old_head, new_head));old_head = new_head;//3auto* node_ptr = old_head._node_ptr;if (node_ptr == nullptr) {return  std::shared_ptr<T>();}//4 比较head和old_head相等则交换否则说明head已经被其他线程更新if (head.compare_exchange_strong(old_head, node_ptr->_next)) {//要返回的值std::shared_ptr<T> res;//交换智能指针res.swap(node_ptr->_data);//5  增加的数量int increase_count = old_head._ref_count - 2;//6  if (node_ptr->_dec_count.fetch_add(increase_count) == -increase_count) {delete node_ptr;}return res;}else {//7if (node_ptr->_dec_count.fetch_sub(1) == 1) {delete node_ptr;}}}
}
  1. 当多个线程并发调用 pop 时,可能会有线程在代码的第 2 处不断重试。这种重试的原因是 headold_head 之间的某些字段(如引用计数或节点node地址)可能已经被其他线程更新。但无论如何,head 使用的是副本存储(std::atomic<ref_node>),即使重试增加了引用计数,也不会影响到其他线程,因为每个线程操作的都是自己的 old_head 副本
  2. 在代码的第 3 处,我们从 old_head 中取出 node_ptr,用来保存指向节点的地址。这样,后续操作中可以安全地减少该节点的引用计数(比如 6 和 7 处的 node_ptr->_dec_count.fetch_add(x))。因为多个线程可能同时操作同一个 node_ptr 指向的节点,所以引用计数使用了原子变量,以确保多个线程对其修改时相互可见且线程安全。
  3. 在第 4 处,我们通过比较交换(compare_exchange_strong)操作来判断 old_headhead 是否一致。因为 head 、new_head、old_head 存储的是副本类型,所以多个线程看到的 old_head 的值可能不一样(因为每个线程的 old_head 都是独立的副本,互不干涉),但我们能保证仅有一个线程进入if逻辑,进入的线程就是old_head和head匹配的那个。 如果一致,说明当前线程成功抢占操作权,进入 if 逻辑;如果不一致,说明 head 已被其他线程更新,当前线程无法进入 if,需要进行后续的 else 逻辑处理。值得注意的是,即使多个线程同时看到不同的 old_head,也只有一个线程能进入 if 条件,其它线程都会失败。
  4. 在第 5 处,我们已经确定当前线程需要处理的 node_ptr 节点。此时,我们定义了 res,用来保存弹出的数据。在调整引用计数时,先减去 2,其中 1 表示当前线程自身的操作,另 1 表示 初始化的 1。接着计算其他并发线程操作该节点的数量,存储在 increase_count 中。最终,是否删除节点取决于增加和减少的引用计数之和是否为 0。
  5. 在第 6 处,我们使用 fetch_add 操作来调整 _dec_count,并返回操作前的值。如果返回值为负的 increase_count,说明当前线程是唯一操作该节点的线程(因为没有其他线程增加引用计数),此时可以安全删除该节点。如果返回值不是负的 increase_count,则说明还有其他线程正在操作此节点,不能删除。

示例

假设有两个线程 t1t2 执行 pop() 函数,假如线程 t1 先于线程 t2 执行 2 处代码:

  • head 的类型是std::atomic<ref_node>,而 new_headold_head 的类型是 ref_node,所以在线程 t1t2 中的 old_headnew_head 互相独立互不影响。
  • 线程 t1 将属于自身线程的 old_headnew_head 副本的引用计数 +1,并将 head 中的数据修改为和 old_head 相同的数据内容,head 的增计数器 _ref_count 值此时为 2。
  • 当线程 t1 执行完 2 处代码后,线程 t2 开始执行,此时因为 CAS 机制使得线程 t2 专属的 old_head 中的增引用计数器被修改为和 head 相同的内容,即 2。因为 head 的原子类型,所以线程 t1head 的修改会被线程 t2 看到,此时线程 t2 看到的 head 的增引用计数器的值为 2,在线程 t2 第二次 CAS 重试之后,会将 old_head 的增计数器 +1,从 2 增到 3。(所以虽然 _ref_count 的类型是 int,但是通过这一种方式可以间接的让其他线程看到)。

假如线程 t1 先于线程 t2 执行 4 处代码:

  • 因为 head.load() == old_head,所以线程 t1 会进入 if 分支并将 head 更新到 node_ptr->_next

  • increase_count 表示除本线程对目标节点的引用外,其他线程对该目标节点的引用数量,这里 -2 是将线程 t1 对目标节点的引用减去,再将初始化时对引用的初始化减一,一共减去二。increase_count 此时应该为 1,表示除了线程 t1 在引用目标节点外,还有线程 t2 在引用目标节点(除了将 head 更新这种方式,也可以通过这种方式让其他线程看到引用计数的修改increase_count 表示其他线程对目标节点的引用数量)

线程 t1 在执行 6 处代码之前,一共有两种可能:

  • 第一种可能:

    线程 t2 还未执行 7 处代码,此时 _dec_count 的默认初始值为 0,线程 t1_dec_countincrease_count ,即加 1,并返回 _dec_count 原本的值 0;此时不满足 if 的判断,即 0 != -1,所以不会删除数据节点。

    线程 t2 在线程 t1 执行完 6 处代码之后执行 7 处代码,此时 _dec_count 被线程 t1 增加到了 1,然后在 if 判断中对 _dec_count - 1 并返回 _dec_count 初值,即 1;因为 1 == 1,表示当前仅有线程 t2 引用目标节点,可以直接删除。

  • 第二种可能:

    线程 t2 已经执行了 7 处代码,此时 _dec_count 的默认初始值为 0,线程 t2_dec_count 减 1 ,并返回 0;因为 0 != 1,所以线程 t2 不会将数据内容删除。

    线程 t1 在线程 t2 执行完 7 处代码后执行 6 处代码,此时 _dec_count 的值被线程 t2 修改为 -1,然后在 if 判断中将 _dec_countincrease_count,并返回 -1;因为 -1 == -1,此时仅有线程 t1 引用了目标节点,线程 t1 负责将节点删除。

headstd::atomic<ref_node> 类型,old_headnew_headref_node 类型。head 的修改能及时反映到不同线程上(在局一致性内存序下)

3. 本节的一些理解

为什么需要两个计数器,一个是否足够?

如果只用一个计数器,我们首先要考虑把它放在哪里。直觉上,可能会想到将计数器放在节点内部,但这样是行不通的。因为计数器的目的是避免空悬指针解引用(访问已删除的节点),但如果计数器在节点内部,访问计数器本身就需要先访问节点。如果节点已经被删除,这就会导致问题(所以我们将引用计数与指针解耦,将引用计数从结构体提出来,不与指针耦合,将一个结构体拆解成了两个)。

举个场景:线程 A 读取了 head 指针,并存到 local_head 中。这时线程 A 还没来得及通过 local_head 访问节点内部的计数器并增加计数,而线程 B 率先操作了节点的计数器,加了 1,然后弹出了节点,并在操作完成后让计数器减 1,发现计数变成了 0,于是删除了节点。此时,线程 A 再试图通过 local_head 访问节点时,节点已经被删除了,出现了空悬指针解引用。

因此,如果只使用一个计数器,它就不能放在节点内部,而需要放在节点外部。计数器需要和 head 指针处于同一“层级”(即数据结构体node和ref_node相互关联,我们通过ref_node->node_ptr即可访问数据内容,通过node->next即可访问下一个节点的计数器),成为所有线程都能直接操作的数据。

为了实现这一点,我们可以把计数器和 head 指针合二为一。这样,读取 head 指针和增加计数器的操作可以原子地进行,避免在这两步之间出现时间差(如果计数器和 head 是两个独立变量,操作之间无法保证原子性)。理想情况下,每当有一个指针指向 head 节点,计数器加 1;当指针不再指向该节点时,计数器减 1。但这种设计在实现时会遇到复杂问题。使用一个计数器无法实现,我们需要使用两个计数器来实现。

为什么一个计数器无法实现?

设想以下过程:

  1. 线程 A 加载 headnew_head(类型为 ref_node),如果 head 没有变化,就增加 new_head 的计数器,并更新 head 的计数器。最后将 new_head 的计数器赋值给 old_head,此时,三者的计数器都变为 2。注意,这些操作发生在线程 A 访问节点之前,因此确保节点不会被删除。
  2. 在线程 A 还没来得及访问节点时,线程 B 也加载了 head 到它的 local_head。线程 B 增加了计数器,此时计数器值变为 3(表明两个线程正在引用该节点,在此之前线程B会经历一次CAS,因为head已经被线程A修改,所以线程B的old_head会读取head最新的值,此时线程B即可读取到线程A对引用计数修改的最新值,再+1会变为3)。接着,线程 B 将 head 修改为指向下一个节点,并让 head 的计数器减 2,计数器变回 1。
  3. 线程 B 完成操作后退出,此时节点只被线程A引用,计数器保持为 1。线程 A 继续操作,但发现 old_head(3) 和当前的 head(1) 已不一致,放弃弹出该节点,并进入 else 分支让减计数器减 1,计数器变为 0。

问题是,上述过程仅仅是在两个计数器的共同配合下才会实现的。如果仅有一个计数器,那么计数器永远不会减到 0,导致节点永远不会被删除。这是因为不同线程对计数器的操作是独立的,无法相互感知。

为什么需要两个计数器?

为了解决这个问题,计数器必须与节点绑定,且每个节点只能有一个全局计数器,就像 shared_ptr 的设计一样。然而,shared_ptr 无法实现对 head 的无锁、原子性操作,因此必须引入第二个计数器。

两个计数器的设计允许:

  • 一个计数器跟随 head,用于线程安全地读取和操作(增计数器)。
  • 另一个计数器绑定到节点,用于判断节点是否可以安全删除(减计数器)。

这样的设计虽然复杂,但能确保计数的正确性和节点的安全性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/65086.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【微信小程序】2|轮播图 | 我的咖啡店-综合实训

轮播图 引言 在微信小程序中&#xff0c;轮播图是一种常见的用户界面元素&#xff0c;用于展示广告、产品图片等。本文将通过“我的咖啡店”小程序的轮播图实现&#xff0c;详细介绍如何在微信小程序中创建和管理轮播图。 轮播图数据准备 首先&#xff0c;在home.js文件中&a…

tortoisegit推送失败

tortoisegit推送失败 git.exe push --progress -- "origin" testLidar:testLidar /usr/bin/bash: gitgithub.com: No such file or directory fatal: Could not read from remote repository. Please make sure you have the correct access rights and the reposit…

WebGIS实战开源项目:智慧机场三维可视化(学习笔记)

From&#xff1a;新中地 1.简介 智慧机场解决方案&#xff0c;基于数字化大平台&#xff0c;融合AI、大数据、IoT、视频云、云计算等技术&#xff0c;围绕机场“运控、安防、服务”三大业务领域&#xff0c;构建“出行一张脸”及“运行一张图”两大场景化解决方案。 https://…

C++----类与对象(下篇)

再谈构造函数 回顾函数体内赋值 在创建对象时&#xff0c;编译器通过调用构造函数&#xff0c;给对象中各个成员变量一个合适的初始值。 class Date{ public: Date(int year, int month, int day) { _year year; _month month; _day day; } private: int _year; int _mo…

重装系统后的那点事儿

每次重装系统完&#xff0c;总是有这有那的问题&#xff0c;比如这几个常见问题&#xff0c;各种C盘的数据最好在重装系统前备份哦&#xff0c;不然有点小麻烦&#xff0c;要下载好多东西&#xff01;&#xff01; 文章目录 需要新应用打开此 ms-paint新链接搜索框不见了 需要…

ROS2中通过launch读取.yaml配置文件启动节点

环境&#xff1a;Ubuntu22.04&#xff0c;ROS2-humble 通过修改.yaml配置文件中的参数&#xff0c;可以不用重新编译源代码进行软件调试。 1.yaml文件格式 bag_to_image_node&#xff1a;运行的ROS2节点名称 参数格式参考如下&#xff1a; bag_to_image_node:ros__parameters…

链原生 Web3 AI 网络 Chainbase 推出 AVS 主网, 拓展 EigenLayer AVS 场景

在 12 月 4 日&#xff0c;链原生的 Web3 AI 数据网络 Chainbase 正式启动了 Chainbase AVS 主网&#xff0c;同时发布了首批 20 个 AVS 节点运营商名单。Chainbase AVS 是 EigenLayer AVS 中首个以数据智能为应用导向的主网 AVS&#xff0c;其采用四层网络架构&#xff0c;其中…

【文档搜索引擎】搜索模块的完整实现

调用索引模块&#xff0c;来完成搜索的核心过程 主要步骤 简化版本的逻辑&#xff1a; 分词&#xff1a;针对用户输入的查询词进行分词&#xff08;用户输入的查询词&#xff0c;可能不是一个词&#xff0c;而是一句话&#xff09;触发&#xff1a;拿着每个分词结果&#xf…

MySQL 数据库优化详解【Java数据库调优】

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c; 忍不住分享一下给大家。点击跳转到网站 学习总结 1、掌握 JAVA入门到进阶知识(持续写作中……&#xff09; 2、学会Oracle数据库入门到入土用法(创作中……&#xff09; 3、手把…

ensp 关于acl的运用和讲解

ACL&#xff08;Access Control List&#xff0c;访问控制列表&#xff09;是一种常用于网络设备&#xff08;如路由器、交换机&#xff09;上的安全机制&#xff0c;用于控制数据包的流动与访问权限。ACL 可以指定哪些数据包允许进入或离开某个网络接口&#xff0c;基于不同的…

突围边缘:OpenAI开源实时嵌入式API,AI触角延伸至微观世界

当OpenAI宣布开源其名为openai-realtime-embedded-sdk的实时嵌入式API时&#xff0c;整个科技界都为之震惊。这一举动意味着&#xff0c;曾经遥不可及的强大AI能力&#xff0c;如今可以被嵌入到像ESP32这样的微型控制器中&#xff0c;真正地将AI的触角延伸到了物联网和边缘计算…

知识中台与人工智能:融合赋能企业智能化知识服务与决策

在数字化、智能化的时代背景下&#xff0c;企业面临着前所未有的机遇与挑战。为了提升知识管理与服务的能力&#xff0c;推动企业的智能化转型与发展&#xff0c;知识中台与人工智能的融合应用正成为新的趋势。知识中台作为连接数据、知识与业务的核心平台&#xff0c;能够为企…

《Web 项目开发之旅》

一、项目简介 介绍项目的背景与目标&#xff0c;说明为什么要开展这个 Web 项目。展示项目最终完成后的整体页面截图&#xff0c;让读者对项目外观有初步印象。 二、技术选型 阐述在项目中使用的前端技术&#xff08;如 HTML、CSS、JavaScript 框架等&#xff09;、后端技术…

VSCode搭建Java开发环境 2024保姆级安装教程(Java环境搭建+VSCode安装+运行测试+背景图设置)

名人说&#xff1a;一点浩然气&#xff0c;千里快哉风。—— 苏轼《水调歌头》 创作者&#xff1a;Code_流苏(CSDN) 目录 一、Java开发环境搭建二、VScode下载及安装三、VSCode配置Java环境四、运行测试五、背景图设置 很高兴你打开了这篇博客&#xff0c;更多详细的安装教程&…

【GIS教程】使用GDAL实现栅格转矢量(GeoJSON、Shapefile)- 附完整代码

文章目录 一、 应用场景1、GeoJSON2、ESRI Shapefile3、GDAL 二、基本思路1、数据准备2、重投影&#xff08;可选&#xff09;3、创建空的矢量图层4、栅格转矢量 三、完整代码四、总结五、拓展&#xff08;使用ArcGIS工具进行栅格转矢量&#xff09; 一、 应用场景 TIFF格式的…

计算机毕业设计PySpark+Hadoop中国城市交通分析与预测 Python交通预测 Python交通可视化 客流量预测 交通大数据 机器学习 深度学习

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

JVM系列(十三) -常用调优工具介绍

最近对 JVM 技术知识进行了重新整理&#xff0c;再次献上 JVM系列文章合集索引&#xff0c;感兴趣的小伙伴可以直接点击如下地址快速阅读。 JVM系列(一) -什么是虚拟机JVM系列(二) -类的加载过程JVM系列(三) -内存布局详解JVM系列(四) -对象的创建过程JVM系列(五) -对象的内存分…

Hive其四,Hive的数据导出,案例展示,表类型介绍

目录 一、Hive的数据导出 1&#xff09;导出数据到本地目录 2&#xff09;导出到hdfs的目录下 3&#xff09;直接将结果导出到本地文件中 二、一个案例 三、表类型 1、表类型介绍 2、内部表和外部表转换 3、两种表的区别 4、练习 一、Hive的数据导出 数据导出的分类&…

使用RKNN进行YOLOv8人体姿态估计的实战教程:yolov8-pose.onnx转yolov8-pose.rknn+推理全流程

之前文章有提到“YOLOv8的原生模型包含了后处理步骤,其中一些形状超出了RK3588的矩阵计算限制,因此需要对输出层进行一些裁剪”,通过裁剪后得到的onnx能够顺利的进行rknn转换,本文将对转rnkk过程,以及相应的后处理进行阐述。并在文末附上全部源码、数据、模型的百度云盘链…

Pytorch | 利用NI-FGSM针对CIFAR10上的ResNet分类器进行对抗攻击

Pytorch | 利用NI-FGSM针对CIFAR10上的ResNet分类器进行对抗攻击 CIFAR数据集NI-FGSM介绍背景算法流程 NI-FGSM代码实现NI-FGSM算法实现攻击效果 代码汇总nifgsm.pytrain.pyadvtest.py 之前已经针对CIFAR10训练了多种分类器&#xff1a; Pytorch | 从零构建AlexNet对CIFAR10进行…