C++高效死锁检测——实现原理与应用(基于强连通分量)

背景

在项目使用多进程、多线程过程中,因争夺资源而造成一种资源竞态,所以需加锁处理。如下图所示,线程 A 想获取线程 B 的锁,线程 B 想获取线程 C 的锁,线程 C 想获取线程 D 的锁, 线程 D 想获取线程 A 的锁,从而构建了一个资源获取环,当进程或者线程申请的锁处于相互交叉锁住的情况,就会出现死锁,它们将无法继续运行。

在这里插入图片描述

死锁的存在是因为有资源获取环的存在,所以只要能检测出资源获取环,就等同于检测出死锁的存在。

设计方案

本文实现的是一个锁管理器,提供加锁解锁功能,同时提供检测死锁功能,出现死锁后释放部分资源来解决死锁。死锁的检测是通过检测死锁图中有没有环来实现的,如果对于请求同一资源的两个锁L1和L2(其对应的进程为P1和P2),L1已经获得资源而L2在等待,则死锁图中有一条边P2->P1。

有向图中环的检测,即找到图中所有的强连通分量,使用Tarjan算法来实现,可以在O(E+V)时间找到所有的环。死锁图一般是比较稀疏的图,存储使用邻接表。

锁的数据结构为:

class Lock {
public:Lock(int p, int res, int stat) {pid = p;res_id = res;state = stat;}int pid;int res_id;int state; //0 == locked, 1 == waiting
};

锁的状态有两种,已持有,等待。对于同一个资源加的锁放在链表中,方便检索和随机位置的删除。如果一个锁L1是资源R1对应链表的头,则他是一个已经持有的锁,链表其他位置的锁Ln都在阻塞等待L1释放,因此在死锁图中新建 Ln.pid -> L1.pid 边。

锁管理器的类声明如下,实现了后台线程进行死锁检测。

class LockManager {
public:static LockManager& getInstance();~LockManager();// 获取锁,返回一个指向Lock对象的shared_ptr,ret表示结果shared_ptr<Lock> getLock(int pid, int res_id, int& ret);// 查找指定pid和res_id的锁,返回一个指向Lock对象的shared_ptrshared_ptr<Lock> findLock(int pid, int res_id);// 释放锁int releaseLock(shared_ptr<Lock> lock);// 检测是否有死锁,若有,设置tokill为需要解除锁的pidbool isDeadLock(int& pid);void print();// 启动死锁检测器,interval为检测间隔void startDetection(int interval);void stopDeadlockDetector();private:LockManager();// 计算强连通分量(SCC),存储在pid_to_SCCid中void calSCC(map<int, vector<int>>& pid_to_SCCid);// 释放指定pid的所有锁void releaseProcess(int pid); // 内部获取锁的实现,返回一个指向Lock对象的shared_ptr,possible表示是否可能发生死锁shared_ptr<Lock> getLockInternal(int pid, int res_id, bool& possible);// 内部释放锁的实现,返回操作结果,possible表示是否可能发生死锁int releaseLockInternal(shared_ptr<Lock> lock, bool& possible);void detectDeadlock();map<int, list<shared_ptr<Lock>>> res_to_locklist;// 资源ID到锁列表的映射map<int, pair<int, list<shared_ptr<Lock>>>> pid_to_locks;// 进程ID到锁计数和锁列表的映射map<int, list<int>> lock_graph;// 锁图,表示进程之间的等待依赖关系set<int> pid_set;// 进程ID集合,用于跟踪所有活跃的进程ID// 用于后台线程检测死锁的参数thread* deadlock_checker;// 指向死锁检测器线程的指针bool stop;int check_interval;mutex mtx;
};

具体实现

创建锁节点

获取锁:当一个进程 (pid) 请求一个资源 (res_id) 的锁时,会调用 getLock 方法。该方法首先检查该进程是否已经拥有该资源的锁(通过 findLock 方法)。

shared_ptr<Lock> LockManager::getLock(int pid, int res_id, int& ret) {bool deadlock_possible;ret = 0;if (findLock(pid, res_id) != nullptr) {ret = 1; return nullptr;}mtx.lock();auto p = getLockInternal(pid, res_id, deadlock_possible);mtx.unlock();return p;
}

内部获取锁逻辑:

  • 如果没有找到重复的锁,会调用内部方法 getLockInternal 来实际创建并获取锁。
  • 在 getLockInternal 方法中,会根据资源是否已经被其他进程锁定来创建不同状态的锁对象:
  • 如果资源没有被锁定,创建一个状态为 0 的锁对象(表示已锁定)。
  • 如果资源已经被其他进程锁定,创建一个状态为 1 的锁对象(表示等待)。
shared_ptr<Lock> LockManager::getLockInternal(int pid, int res_id, bool& deadlock_possible) {deadlock_possible = false; // 初始无死锁if (!res_to_locklist.count(res_id)) res_to_locklist[res_id] = list<shared_ptr<Lock>>{};if (!pid_to_locks.count(pid)) pid_to_locks[pid] = make_pair(0, list<shared_ptr<Lock>>{});pid_set.insert(pid); // 加入线程shared_ptr<Lock> newlock;if (res_to_locklist[res_id].size() == 0) {newlock = make_shared<Lock>(pid, res_id, 0);res_to_locklist[res_id].push_back(newlock);pid_to_locks[pid].first++;pid_to_locks[pid].second.push_back(newlock);}else {newlock = make_shared<Lock>(pid, res_id, 1);res_to_locklist[res_id].push_back(newlock);pid_to_locks[pid].second.push_back(newlock);if (pid_to_locks[pid].first > 0) {const auto& first_lock = res_to_locklist[res_id].front();int p0id = first_lock->pid;if (!lock_graph.count(pid)) lock_graph[pid] = list<int>{};lock_graph[pid].push_back(p0id);deadlock_possible = true; // 可能发生死锁需要检查}}return newlock;
}

释放锁

调用 releaseLock 方法释放锁对象。releaseLock 方法调用 releaseLockInternal 方法,实际进行锁释放操作。

内部释放锁的逻辑:

  • 从 res_to_locklist 中移除该锁对象。
  • 从 pid_to_locks 中移除该锁对象。
  • 如果锁对象的状态是 0(已锁定)且资源上仍有其他锁,则将资源上的下一个等待锁(状态为 1)转换为已锁定状态(状态为 0),并更新相应进程的锁计数。
  • 更新依赖关系图 (lock_graph):
  • 移除当前进程到所有依赖于它的进程的边。
  • 添加新的依赖关系,即新的持有锁的进程到其他等待进程的边。
int LockManager::releaseLock(shared_ptr<Lock> lock) {bool deadlock_possible;mtx.lock();releaseLockInternal(lock, deadlock_possible);mtx.unlock();return 0;
}
int LockManager::releaseLockInternal(shared_ptr<Lock> lock, bool& deadlock_possible) {deadlock_possible = false; // 初始化为没有死锁的可能性int pid = lock->pid; // 获取锁的进程IDint res_id = lock->res_id; // 获取锁的资源IDauto& locklist = res_to_locklist[res_id]; // 获取资源对应的锁列表locklist.remove(lock); // 从资源的锁列表中移除该锁pid_to_locks[pid].second.remove(lock); // 从进程的锁列表中移除该锁printf("release lock(pid=%d, res_id=%d, state=%d)\n", pid, res_id, lock->state);if (lock->state == 0 && locklist.size() > 0) { // 如果释放的是已锁定状态的锁,且资源上还有其他等待的锁pid_to_locks[pid].first--; // 减少该进程的锁计数int p0id = locklist.front()->pid; // 获取新获得锁的进程IDlocklist.front()->state = 0; // 将等待的锁状态改为已锁定pid_to_locks[p0id].first++; // 增加新获得锁的进程的锁计数for (auto it = locklist.begin(); it != locklist.end(); it++) { // 更新依赖关系图int p1id = (*it)->pid;lock_graph[p1id].remove(pid); // 移除指向释放锁的进程的依赖关系printf("remove edge(%d->%d)\n", p1id, pid);if (p1id != p0id) {lock_graph[p1id].push_back(p0id); // 添加新的依赖关系,指向新获得锁的进程printf("add edge(%d->%d)\n", p1id, p0id);}}// 可能导致死锁,需要检查deadlock_possible = true;}return 0;
}

Kosaraju算法

对反向图进行拓扑排序,并按照拓扑排序的逆序进行深度优先搜索 (DFS),是为了高效地找到原始图中的强连通分量 (SCC)。这种方法称为 Kosaraju算法,其主要思想是:

拓扑排序 确定访问顺序:

  • 对反向图进行拓扑排序,可以得到一个访问顺序,使得在原图中从某个节点出发的所有可能路径都被访问到。
  • 拓扑排序保证了在原图中,某个节点的所有后继节点在排序中都在它之前。这有助于后续步骤中的 SCC 检测。

逆序DFS 高效找到 SCC:

  • 按照拓扑排序的逆序进行 DFS 确保每次从尚未访问的节点出发时,能够遍历一个完整的强连通分量。
  • 由于拓扑排序的逆序保证了我们从图的“后面”开始访问(即从没有后继节点的节点开始),所以每次 DFS 都会完全包含一个 SCC。

这种方法的效率很高,因为每个节点和每条边都只被访问两次(一次在拓扑排序时,一次在逆序 DFS 时),所以 Kosaraju 算法的时间复杂度是 O(V + E),其中 V 是节点数,E 是边数。

死锁检测是通过计算锁图的强连通分量 (SCC) 来实现的。首先通过 reverseGraph 方法构建锁图的反向图。

void reverseGraph(map<int, list<int>>& origin, map<int, list<int>>& dest) {for (const auto& p : origin) {int e = p.first;const auto& vec = p.second;for (auto v : vec) {if (!dest.count(v)) dest[v] = list<int>{};dest[v].push_back(e);}}
}

通过 topoSort 方法对反向图进行拓扑排序,按照拓扑排序的逆序进行DFS。

void dfs(map<int, list<int>>& graph, int cur, set<int>& visited, vector<int>& topo_order) {if (visited.count(cur)) return;visited.insert(cur);for (auto x : graph[cur]) {dfs(graph, x, visited, topo_order);}topo_order.push_back(cur);
}void topoSort(map<int, list<int>>& graph, set<int>& pid_set, vector<int>& topo_order) {set<int> visited;for (auto x : pid_set) {dfs(graph, x, visited, topo_order);}
}

通过深度优先搜索 (DFS) 来计算锁图中的强连通分量。检查是否存在强连通分量。如果存在,说明有死锁,并选择一个进程进行终止以打破死锁。

void LockManager::calSCC(map<int, vector<int>>& pid_to_SCCid) {map<int, list<int>> reverse_graph;reverseGraph(lock_graph, reverse_graph); // 构建反向图vector<int> topo_order;topoSort(reverse_graph, pid_set, topo_order); // 对反向图进行拓扑排序set<int> visited;for (int i = topo_order.size() - 1; i >= 0; i--) { // 按照拓扑排序的逆序进行DFSauto vec = vector<int>{};dfs(lock_graph, topo_order[i], visited, vec);if (vec.size() > 1) pid_to_SCCid[i] = vec; // 找到强连通分量}
}

后台检测死锁

isDeadLock是实际进行检测死锁的函数:

计算强连通分量 (SCC):调用 calSCC 方法,计算图中的强连通分量并将结果存储。每个强连通分量表示一个可能的死锁环。

遍历每个强连通分量:对于每个强连通分量中的进程,打印其进程ID,并找到锁数量最少的进程。

选择要终止的进程:如果找到了锁数量最少的进程,将其标记为 tokill。通过终止该进程来打破死锁。

如果存在强连通分量,返回 true 表示存在死锁

bool LockManager::isDeadLock(int& tokill) {map<int, vector<int>> SCCid_to_pids;calSCC(SCCid_to_pids); // 计算强连通分量for (const auto& cyc : SCCid_to_pids) { // 遍历每个强连通分量const auto& vec = cyc.second;printf("detected deadlock: ");int minlocks = 1e8;int minpid = -1;for (auto it = vec.begin(); it != vec.end(); it++) { // 打印并找到最少锁的进程printf("%d->", *it);int nlock = pid_to_locks[*it].first;if (nlock < minlocks) {minlocks = nlock;minpid = *it;}}printf("%d\n", vec.front());if (minpid != -1) {printf("will release pid=%d(%d) to break deadlock\n", minpid, minlocks);tokill = minpid; // 选择需要终止的进程}}return SCCid_to_pids.size() > 0; // 如果存在强连通分量,说明存在死锁
}

detectDeadlock方法是一个后台线程,用于定期检测死锁并处理死锁。

LockManager::LockManager() {stop = false;check_interval = 1;deadlock_checker = new thread([this] { this->detectDeadlock(); });
}
void LockManager::detectDeadlock() {using std::chrono::system_clock;while (!stop) {int tokill;mtx.lock();while (isDeadLock(tokill)) {releaseProcess(tokill);}mtx.unlock();// 检查死锁std::time_t tt = system_clock::to_time_t(system_clock::now());struct std::tm* ptm = std::localtime(&tt);ptm->tm_sec += check_interval;std::this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));}
}

结果展示

使用下文的完整代码测试,set lock设置锁,release lock释放,

int main() {doGetLock(1, 2);doGetLock(1, 3);doGetLock(2, 2);doGetLock(3, 3);doGetLock(2, 3);doGetLock(3, 2);doReleaseLock(1, 2);doReleaseLock(1, 3);std::this_thread::sleep_for(std::chrono::seconds(2));doGetLock(5, 5);doGetLock(6, 6);doGetLock(5, 6);doGetLock(6, 5);int pid, resid;char tmp[40];while (scanf("%s %d %d", tmp, &pid, &resid) != EOF) {if (string(tmp) == "lock") {doGetLock(pid, resid);}else {doReleaseLock(pid, resid);}}std::this_thread::sleep_for(std::chrono::seconds(1));LockManager::getInstance().stopDeadlockDetector();return 0;
}
set lock(pid=1, res_id=2)
set lock(pid=1, res_id=3)
set lock(pid=2, res_id=2)
set lock(pid=3, res_id=3)
set lock(pid=2, res_id=3)
set lock(pid=3, res_id=2)
release lock(pid=1, res_id=2, state=0)
remove edge(2->1)
remove edge(3->1)
add edge(3->2) //锁交还2
release lock(pid=1, res_id=3, state=0)
remove edge(3->1)
remove edge(2->1)
add edge(2->3)
detected deadlock: 3->2->3
will release pid=3(1) to break deadlock
release lock(pid=3, res_id=3, state=0)
remove edge(2->3)
release lock(pid=3, res_id=2, state=1)
erase pid 3
set lock(pid=5, res_id=5)
set lock(pid=6, res_id=6)
set lock(pid=5, res_id=6)
set lock(pid=6, res_id=5)
detected deadlock: 6->5->6
will release pid=6(1) to break deadlock
release lock(pid=6, res_id=6, state=0)
remove edge(5->6)
release lock(pid=6, res_id=5, state=1)
erase pid 6

设置锁:

  • set lock(pid=1, res_id=2): 进程1请求资源2的锁。
  • set lock(pid=1, res_id=3): 进程1请求资源3的锁。
  • set lock(pid=2, res_id=2): 进程2请求资源2的锁。
  • set lock(pid=3, res_id=3): 进程3请求资源3的锁。
  • set lock(pid=2, res_id=3): 进程2请求资源3的锁。
  • set lock(pid=3, res_id=2): 进程3请求资源2的锁。

释放锁:

  • release lock(pid=1, res_id=2, state=0): 进程1释放资源2的锁(锁处于已锁定状态)。
  • remove edge(2->1): 从进程2到进程1的依赖关系被移除。
  • remove edge(3->1): 从进程3到进程1的依赖关系被移除。
  • add edge(3->2): 添加新的依赖关系,从进程3到进程2。
  • release lock(pid=1, res_id=3, state=0): 进程1释放资源3的锁(锁处于已锁定状态)。
  • remove edge(3->1): 从进程3到进程1的依赖关系被移除。
  • remove edge(2->1): 从进程2到进程1的依赖关系被移除。
  • add edge(2->3): 添加新的依赖关系,从进程2到进程3。

检测到死锁:

  • detected deadlock: 3->2->3: 检测到由进程3和进程2组成的死锁循环。
  • will release pid=3(1) to break deadlock: 选择进程3(持有一个锁)来解除死锁。

解除死锁:

  • release lock(pid=3, res_id=3, state=0): 释放进程3持有的资源3的锁(锁处于已锁定状态)。
  • remove edge(2->3): 移除从进程2到进程3的依赖关系。
  • release lock(pid=3, res_id=2, state=1): 释放进程3持有的资源2的锁(锁处于等待状态)。
  • erase pid 3: 从锁管理器中移除进程3。

设置新的锁:

  • set lock(pid=5, res_id=5): 进程5请求资源5的锁。
  • set lock(pid=6, res_id=6): 进程6请求资源6的锁。
  • set lock(pid=5, res_id=6): 进程5请求资源6的锁。
  • set lock(pid=6, res_id=5): 进程6请求资源5的锁。

再次检测到死锁:

  • detected deadlock: 6->5->6: 检测到由进程6和进程5组成的死锁循环。
  • will release pid=6(1) to break deadlock: 选择进程6(持有一个锁)来解除死锁。

解除新的死锁:

  • release lock(pid=6, res_id=6, state=0): 释放进程6持有的资源6的锁(锁处于已锁定状态)。
  • remove edge(5->6): 移除从进程5到进程6的依赖关系。
  • release lock(pid=6, res_id=5, state=1): 释放进程6持有的资源5的锁(锁处于等待状态)。
  • erase pid 6: 从锁管理器中移除进程6。

完整代码

#include <iostream>
#include <memory>
#include <map>
#include <list>
#include <vector>
#include <set>
#include <string>
#include <thread>
#include <chrono>
#include <mutex>
using namespace std;class Lock {
public:Lock(int p, int res, int stat) {pid = p;res_id = res;state = stat;}int pid;int res_id;int state; //0 == locked, 1 == waiting
};typedef pair<int, list<shared_ptr<Lock> > > pivec;class LockManager {
public:static LockManager& getInstance();~LockManager();// 获取锁,返回一个指向Lock对象的shared_ptr,ret表示结果shared_ptr<Lock> getLock(int pid, int res_id, int& ret);// 查找指定pid和res_id的锁,返回一个指向Lock对象的shared_ptrshared_ptr<Lock> findLock(int pid, int res_id);// 释放锁int releaseLock(shared_ptr<Lock> lock);// 检测是否有死锁,若有,设置tokill为需要解除锁的pidbool isDeadLock(int& pid);void print();// 启动死锁检测器,interval为检测间隔void startDetection(int interval);void stopDeadlockDetector();private:LockManager();// 计算强连通分量(SCC),存储在pid_to_SCCid中void calSCC(map<int, vector<int>>& pid_to_SCCid);// 释放指定pid的所有锁void releaseProcess(int pid); // 内部获取锁的实现,返回一个指向Lock对象的shared_ptr,possible表示是否可能发生死锁shared_ptr<Lock> getLockInternal(int pid, int res_id, bool& possible);// 内部释放锁的实现,返回操作结果,possible表示是否可能发生死锁int releaseLockInternal(shared_ptr<Lock> lock, bool& possible);void detectDeadlock();map<int, list<shared_ptr<Lock>>> res_to_locklist;// 资源ID到锁列表的映射map<int, pair<int, list<shared_ptr<Lock>>>> pid_to_locks;// 进程ID到锁计数和锁列表的映射map<int, list<int>> lock_graph;// 锁图,表示进程之间的等待依赖关系set<int> pid_set;// 进程ID集合,用于跟踪所有活跃的进程ID// 用于后台线程检测死锁的参数thread* deadlock_checker;// 指向死锁检测器线程的指针bool stop;int check_interval;mutex mtx;
};LockManager::LockManager() {stop = false;check_interval = 1;deadlock_checker = new thread([this] { this->detectDeadlock(); });
}LockManager::~LockManager() {stopDeadlockDetector();
}LockManager& LockManager::getInstance() {static LockManager inst;return inst;
}shared_ptr<Lock> LockManager::getLock(int pid, int res_id, int& ret) {bool deadlock_possible;ret = 0;if (findLock(pid, res_id) != nullptr) {ret = 1; return nullptr;}mtx.lock();auto p = getLockInternal(pid, res_id, deadlock_possible);mtx.unlock();return p;
}shared_ptr<Lock> LockManager::getLockInternal(int pid, int res_id, bool& deadlock_possible) {deadlock_possible = false; if (!res_to_locklist.count(res_id)) res_to_locklist[res_id] = list<shared_ptr<Lock>>{};if (!pid_to_locks.count(pid)) pid_to_locks[pid] = make_pair(0, list<shared_ptr<Lock>>{});pid_set.insert(pid);shared_ptr<Lock> newlock;if (res_to_locklist[res_id].size() == 0) {newlock = make_shared<Lock>(pid, res_id, 0);res_to_locklist[res_id].push_back(newlock);pid_to_locks[pid].first++;pid_to_locks[pid].second.push_back(newlock);}else {newlock = make_shared<Lock>(pid, res_id, 1);res_to_locklist[res_id].push_back(newlock);pid_to_locks[pid].second.push_back(newlock);if (pid_to_locks[pid].first > 0) {const auto& first_lock = res_to_locklist[res_id].front();int p0id = first_lock->pid;if (!lock_graph.count(pid)) lock_graph[pid] = list<int>{};lock_graph[pid].push_back(p0id);deadlock_possible = true;}}return newlock;
}shared_ptr<Lock> LockManager::findLock(int pid, int res_id) {shared_ptr<Lock> result = nullptr;mtx.lock();for (const auto& p : pid_to_locks[pid].second) {if (p->res_id == res_id) result = p;}mtx.unlock();return result;
}int LockManager::releaseLock(shared_ptr<Lock> lock) {bool deadlock_possible;mtx.lock();releaseLockInternal(lock, deadlock_possible);mtx.unlock();return 0;
}int LockManager::releaseLockInternal(shared_ptr<Lock> lock, bool& deadlock_possible) {deadlock_possible = false;int pid = lock->pid;int res_id = lock->res_id;auto& locklist = res_to_locklist[res_id];locklist.remove(lock);pid_to_locks[pid].second.remove(lock);printf("release lock(pid=%d, res_id=%d, state=%d)\n", pid, res_id, lock->state);if (lock->state == 0 && locklist.size() > 0) {pid_to_locks[pid].first--;int p0id = locklist.front()->pid;locklist.front()->state = 0;pid_to_locks[p0id].first++;for (auto it = locklist.begin(); it != locklist.end(); it++) {int p1id = (*it)->pid;lock_graph[p1id].remove(pid); printf("remove edge(%d->%d)\n", p1id, pid);if (p1id != p0id) {lock_graph[p1id].push_back(p0id);printf("add edge(%d->%d)\n", p1id, p0id);}}deadlock_possible = true;}return 0;
}bool LockManager::isDeadLock(int& tokill) {map<int, vector<int>> SCCid_to_pids;calSCC(SCCid_to_pids);for (const auto& cyc : SCCid_to_pids) {const auto& vec = cyc.second;printf("detected deadlock: ");int minlocks = 1e8;int minpid = -1;for (auto it = vec.begin(); it != vec.end(); it++) {printf("%d->", *it);int nlock = pid_to_locks[*it].first;if (nlock < minlocks) {minlocks = nlock;minpid = *it;}}printf("%d\n", vec.front());if (minpid != -1) {printf("will release pid=%d(%d) to break deadlock\n", minpid, minlocks);tokill = minpid;}}return SCCid_to_pids.size() > 0;
}void LockManager::releaseProcess(int pid) {if (pid_to_locks.count(pid)) {list<shared_ptr<Lock>> tmplist = pid_to_locks[pid].second;for (const auto& p_lock : tmplist) {bool possible; releaseLockInternal(p_lock, possible);}pid_to_locks.erase(pid);};if (lock_graph.count(pid)) lock_graph.erase(pid);pid_set.erase(pid);printf("erase pid %d\n", pid);
}//死锁检测void LockManager::startDetection(int interval) {if (deadlock_checker != nullptr) {check_interval = interval;deadlock_checker = new thread([this] {this->detectDeadlock();});}
}void LockManager::stopDeadlockDetector() {stop = true;if (deadlock_checker && deadlock_checker->joinable()) {printf("deadlock detector is stoped\n");deadlock_checker->join();deadlock_checker = nullptr;}
}void LockManager::detectDeadlock() {using std::chrono::system_clock;while (!stop) {int tokill;mtx.lock();while (isDeadLock(tokill)) {releaseProcess(tokill);}mtx.unlock();// 检查死锁std::time_t tt = system_clock::to_time_t(system_clock::now());struct std::tm* ptm = std::localtime(&tt);ptm->tm_sec += check_interval;std::this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));}
}// 计算SCC的辅助函数void reverseGraph(map<int, list<int>>& origin, map<int, list<int>>& dest) {for (const auto& p : origin) {int e = p.first;const auto& vec = p.second;for (auto v : vec) {if (!dest.count(v)) dest[v] = list<int>{};dest[v].push_back(e);}}
}void dfs(map<int, list<int>>& graph, int cur, set<int>& visited, vector<int>& topo_order) {if (visited.count(cur)) return;visited.insert(cur);for (auto x : graph[cur]) {dfs(graph, x, visited, topo_order);}topo_order.push_back(cur);
}void topoSort(map<int, list<int>>& graph, set<int>& pid_set, vector<int>& topo_order) {set<int> visited;for (auto x : pid_set) {dfs(graph, x, visited, topo_order);}
}void printVec(vector<int>& vec) {printf("vector[%d", vec.front());for (auto it = vec.begin() + 1; it != vec.end(); it++) {printf(",%d", *it);}printf("]\n");
}void LockManager::calSCC(map<int, vector<int>>& pid_to_SCCid) {map<int, list<int>> reverse_graph;reverseGraph(lock_graph, reverse_graph);vector<int> topo_order;topoSort(reverse_graph, pid_set, topo_order);set<int> visited;for (int i = topo_order.size() - 1; i >= 0; i--) {auto vec = vector<int>{};dfs(lock_graph, topo_order[i], visited, vec);if (vec.size() > 1) pid_to_SCCid[i] = vec;}
}void LockManager::print() {cout << "pid to locks:[\n";for (const auto& p : pid_to_locks) {printf("(%d, %d, [", p.first, p.second.first);for (const auto& q : p.second.second) {printf("(pid=%d, res_id=%d, state=%d),", q->pid, q->res_id, q->state);}printf("])\n");}cout << ']' << endl << "res to locks:[\n";for (const auto& p : res_to_locklist) {printf("(%d, [", p.first);for (const auto& q : p.second) {printf("(pid=%d, res_id=%d, state=%d),", q->pid, q->res_id, q->state);}printf("])\n");}cout << ']' << endl << "graph:[\n";for (const auto& p : lock_graph) {int e = p.first;printf("%d:[", e);for (const auto& q : p.second) {printf("%d,", q);}printf("]\n");}cout << ']' << endl << "pid_set:[";for (auto x : pid_set) printf("%d, ", x);cout << ']' << endl;
}int doGetLock(int pid, int rid) {static auto& lock_manager = LockManager::getInstance();int ret;printf("set lock(pid=%d, res_id=%d)\n", pid, rid);lock_manager.getLock(pid, rid, ret);if (ret == 1) {printf("lock(pid=%d, res_id=%d) is duplicated, lock failed\n", pid, rid);}return ret;
}int doReleaseLock(int pid, int rid) {static auto& lock_manager = LockManager::getInstance();auto lock = lock_manager.findLock(pid, rid);if (lock != nullptr) lock_manager.releaseLock(lock);else printf("no such lock(pid=%d, res_id=%d)\n", pid, rid);return 0;
}int main() {doGetLock(1, 2);doGetLock(1, 3);doGetLock(2, 2);doGetLock(3, 3);doGetLock(2, 3);doGetLock(3, 2);doReleaseLock(1, 2);doReleaseLock(1, 3);std::this_thread::sleep_for(std::chrono::seconds(2));doGetLock(5, 5);doGetLock(6, 6);doGetLock(5, 6);doGetLock(6, 5);int pid, resid;char tmp[40];while (scanf("%s %d %d", tmp, &pid, &resid) != EOF) {if (string(tmp) == "lock") {doGetLock(pid, resid);}else {doReleaseLock(pid, resid);}}std::this_thread::sleep_for(std::chrono::seconds(1));LockManager::getInstance().stopDeadlockDetector();return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/840609.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

回溯大法总结

前言 本篇博客将分两步来进行&#xff0c;首先谈谈我对回溯法的理解&#xff0c;然后通过若干道题来进行讲解&#xff0c;最后总结 对回溯法的理解 回溯法可以看做蛮力法的升级版&#xff0c;它在解决问题时的每一步都尝试所有可能的选项&#xff0c;最终找出所以可行的方案…

【Git】版本控制工具——Git介绍及使用

目录 版本控制版本控制系统的主要目标分类小结 分布式版本控制系统——GitGit特点Git与SVN的区别Git的工作机制 Git安装Git 团队协作机制团队内协作跨团队协作远程仓库远程仓库的作用有以下几个方面远程仓库操作流程/团队协作流程 Git分支什么是分支分支的好处 Git的常用命令Gi…

【热门话题】CentOS 常见命令指南

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 CentOS 常见命令指南一、文件与目录操作1. 切换目录2. 查看当前目录3. 列出目录…

SVM影像组学特征

近期做一个影像组学的分类模型 做的是一个胃癌T分期的模型&#xff0c;我刷选统计出一些胃癌区域的特征&#xff0c;如图&#xff1a;有癌症面积、体积等等 下面要做一个SVM&#xff08;支持向量机&#xff09;分类的模型&#xff0c;导入该文件&#xff0c;进行二分类&#x…

MFC密码对话框之间数据传送实例(源码下载)

新建一个login工程项目对话框&#xff0c;主对话框IDD_LOGIN_DIALOG中一个显示按钮IDC_BUTTON1、一个密码按钮IDC_BUTTON2。添加一个密码对话框IDD_DIALOG1&#xff0c;添加类password&#xff0c;在对话框中添加一个编辑框IDC_EDIT1、一个确定按钮IDC_BUTTON1。 程序功能&…

百度集团:AI重构,走到哪了?

内有自家公关一号“自曝”狼性文化&#xff0c;主动制造舆论危机。 外有&#xff0c;OpenAI、谷歌、字节、华为等大模型劲敌扎堆迭代新产品&#xff0c; 强敌环伺。 今天我们要说的是早就从BAT掉队的——百度。 最近&#xff0c;在武汉Aapollo Day 2024上&#xff0c;百度发布了…

抖音小店新规重磅来袭!事关店铺流量!商家的福音来了?

大家好&#xff0c;我是喷火龙。 就在前两天&#xff0c;抖店发布了新规&#xff0c;我给大家总结了一下&#xff0c;无非就是两点。 第一点&#xff1a;保证金下调&#xff0c;一证开多店。 第二点&#xff1a;新品上架破10单&#xff0c;有流量扶持。 咱来细细的解读&…

零基础HTML教程(34)--HTML综合实例

文章目录 1. 背景2. 开发流程2.1 网站功能设计2.2 建立网站目录结构2.3 开发首页2.2 生平简介页2.3 经典诗词页2.4 苏轼图集页2.5 留言板 3. 小结 1. 背景 通过前面33篇文章的学习&#xff0c;我们对HTML有了一个比较全面的了解。 本篇&#xff0c;我们编写一个网站实例&…

Unity在Windows平台播放HEVC/H.265格式视频的底层原理

相关术语、概念 HEVC/H.265 HEVC&#xff08;High Efficiency Video Coding&#xff09;是一种视频压缩标准&#xff0c;也被称为H.265。它是一种高效的视频编码标准&#xff0c;可以提供比之前的标准&#xff08;如H.264&#xff09;更高的压缩率&#xff0c;同时保持较高的…

ssm141餐厅点菜管理系统+vue

餐厅点菜管理系统的设计与实现 摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0c;所以对于信息的宣传和管…

大数据之Hive函数大全

&#x1f527; Hive函数大全 更多大数据学习资料请关注公众号“大数据领航员"免费领取 一、数学函数 1、取整函数: round 1.函数描述 返回值语法结构功能描述doubleround(double a)返回double类型的整数值部分&#xff08;遵循四舍五入&#xff09; 2.例程 hive>…

自定义RedisTemplate序列化器

大纲 RedisSerializerFastJsonRedisSerializer自定义二进制序列化器总结代码 在《RedisTemplate保存二进制数据的方法》一文中&#xff0c;我们将Java对象通过《使用java.io库序列化Java对象》中介绍的方法转换为二进制数组&#xff0c;然后保存到Redis中。实际可以通过定制Red…

智能化让幼儿园管理更加规范

在各个学龄阶段&#xff0c;幼儿园一向都是家长的教师最为操心的&#xff0c;一方面幼儿园孩子自主才能差&#xff0c;安全问题需求分外注重&#xff0c;另一方面&#xff0c;幼儿园孩子年纪小、缺少必定的认知才能和区分才能&#xff0c;需求加强引导。 那么怎么进步幼儿园孩子…

D60SB120-ASEMI整流桥D60SB120参数、封装、尺寸

编辑&#xff1a;ll D60SB120-ASEMI整流桥D60SB120参数、封装、尺寸 型号&#xff1a;D60SB120 品牌&#xff1a;ASEMI 封装&#xff1a;D-SB 批号&#xff1a;2024 最大重复峰值反向电压&#xff1a;1200V 最大正向平均整流电流(Vdss)&#xff1a;60A 功率(Pd)&#x…

力扣--哈希表13.罗马数字转整数

首先我们可以知道&#xff0c;一个整数&#xff0c;最多由2个罗马数字组成。 思路分析 这个方法能够正确将罗马数字转换为阿拉伯数字的原因在于它遵循了罗马数字的规则&#xff0c;并且对这些规则进行了正确的编码和处理。 罗马数字规则 罗马数字由以下字符组成&#xff1a…

运维笔记.MySQL.基于mysqldump数据备份与恢复

运维专题 MySQL.基于mysqldump数据备份与恢复 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite:http://thispage.tech/Email: [email protected]. Shenzhen ChinaAddress of this article:https://blog.csdn.net/qq_2855…

多微信如何高效管理?一台电脑就能搞定!

对于有多个微信号的人来说&#xff0c;管理这些微信无疑是一道难题。 今天&#xff0c;就给大家分享一个能够让你高效管理多个微信号的神器——个微管理系统&#xff0c;下面&#xff0c;就一起来看看它都有哪些功能吧&#xff01; 1、多号同时登录在线 系统支持多个微信号同…

【综合类型第 39 篇】《我的创作纪念日》成为创作者的第2048天

这是【综合类型第 39 篇】&#xff0c;如果觉得有用的话&#xff0c;欢迎关注专栏。 前言 无意间看了一眼CSDN的私信&#xff0c;提示我 Allen Su &#xff0c;不知不觉今天已经是你成为创作者的 第2048天 啦&#xff0c;为了纪念这一天&#xff0c;我们为您准备了一份专属小…

2. PCI总线基本概念

PCI即Peripheral Componet Interconnect&#xff0c;中文意思是“外围器件互联”&#xff0c;是由PCISIG推出的一种局部并行总线标准。PCI总线是由ISA总线发展而来&#xff0c;是一种同步的独立于处理器的32位或64位局部总线。目前&#xff0c;PCI总线广泛应用于连接显卡&#…

操作抖音小店一直不出单怎么办?只需要做好这两点就可以了!

大家好&#xff0c;我是电商小V 最近很多新手小伙伴来咨询我说自己操作抖音小店&#xff0c;自己的店铺长时间不出单应该怎么办&#xff1f;今天咱们就来详细的说一下&#xff0c; 咱们要清楚的就是自己的店铺不出&#xff0c;只需要咱们做好这两点就可以了&#xff0c; 第一点…