P4 并发控制

文章目录

    • Task1 锁管理器
      • LockTable
      • UnLockTable
      • LockRow
      • UnLockRow
    • Task2 死锁检测
    • Task3 并发查询执行器
      • Isolation Level
      • seq_scan_executor
      • insert_executor
      • delete_executor
      • transaction_manager

Task1 锁管理器

LockManager类包含两个属性类,分别是LockRequestLockRequestQueue,行锁请求还是表锁请求都适配LockRequest,其属性:

/** Txn_id of the txn requesting the lock */
txn_id_t txn_id_;
/** 请求锁的锁定模式 */
LockMode lock_mode_;
/** Oid of the table for a table lock; oid of the table the row belong to for a row lock */
table_oid_t oid_;  // 若是表锁则表示表的Oid;若是行锁则表示行所属表的oid
/** 为行锁时是行RID;未用于表锁 */
RID rid_;  // RID应该是唯一的
/** 锁是否被授予 */
bool granted_{false};

一个资源(行或表)可能有多个事务进行请求,所以有了LockRequestQueue来先入先出地处理来的请求,其属性:

/** 同一资源(表或行)的锁请求列表 */
std::list<std::shared_ptr<LockRequest>> request_queue_;/** 通知此rid上被阻止的事务 */
std::condition_variable cv_;  // 条件变量
/** 升级事务的TXN id(如有) */
txn_id_t upgrading_ = INVALID_TXN_ID;  // 正在此资源上尝试锁升级的事务id.
/** 配合 */
std::mutex latch_;  // 与cv_配合使用实现等待资源,即这边锁住,cv_那边等待std::vector<size_t> lock_vector_;  // 该表或该行已授予的各个锁的数量,5种锁

LockManager类的属性:

TransactionManager *txn_manager_;/** 结构,用于保存给定表oid的锁请求 */
std::unordered_map<table_oid_t, std::shared_ptr<LockRequestQueue>> table_lock_map_;  // 每个表对应的的锁队列
/** Coordination */
std::mutex table_lock_map_latch_;/** 结构,用于保存给定RID的锁请求 */
std::unordered_map<RID, std::shared_ptr<LockRequestQueue>> row_lock_map_;
/** Coordination */
std::mutex row_lock_map_latch_;std::atomic<bool> enable_cycle_detection_;
std::thread *cycle_detection_thread_;/** 等待图表示。 */
// std::unordered_map<txn_id_t, std::vector<txn_id_t>> waits_for_;
std::mutex waits_for_latch_;
// 重新设计wait_for_,使得若t1等待t2的资源,令t2指向t1,释放t2时减少所有等待t2的事务的入度
std::map<txn_id_t, std::set<txn_id_t>> waits_for_;  // 保证每次都从txn_id_t最小的事务开始扫描
std::map<txn_id_t, int> visited_;  // 表示一个节点未被访问过(0),正在访问(1),已经被访问完(2)
bool hascycle_;                    // 是否有环
std::unordered_map<txn_id_t, std::unordered_set<table_oid_t>> map_txn_table_; // 记录事务锁住的表
std::unordered_map<txn_id_t, std::unordered_set<RID>> map_txn_row_; // 记录事务锁住的行

LockTable

T
F
T
T
F
T
T
F
F
T
F
T
F
T
T
F
F
T
T
F
LockTable()
获取隔离级别和事务状态
事务状态是提交或回滚
退出,返回false
事务状态是增长阶段
隔离级别是可重复读
DealLockOfTable()
隔离级别是读未提交
要加锁不是X锁或IX锁
读未提交下加共享锁异常
隔离级别是读已提交
隔离级别是可重复读
收缩状态加锁异常
隔离状态是读未提交
要加锁是X锁或IX锁
隔离级别是读已提交
要加锁是S锁或IS锁
//lock_manager.h的LOCK_NOTE(部分)
/**
REPEATABLE_READ:事务需要获取所有锁。所有锁都允许处于GROWING状态在收缩状态下不允许有锁
READ_COMMITTED:事务需要获取所有锁。所有锁都允许处于GROWING状态只有IS、S锁允许处于收缩状态
READ_UNCOMMITTED:事务只需要接收IX、X锁。在GROWING状态下允许使用X、IX锁。S, IS, SIX是不允许的
**/

DealLockOfTable()要做的就是进行加锁的准备工作。先将table_lock_map_属性加锁,其表示表oid和锁请求队列间的映射,若其中无对应oid的内容则初始化oid的键值对。解锁table_lock_map_加锁表的锁请求队列,查找该表的锁队列,当前事务是否曾请求过,若未请求过就新建一个锁请求std::make_shared<LockRequest>(txn_id, lock_mode, oid);插入锁队列中,调用WaitForLock()返回。若已锁住该表,若这次请求的锁和之前的模式一样,将队列解锁,返回true。剩下的情况就是两次锁模式不同,尝试锁升级(说是升级,其实是锁模式切换,去旧换新),若有其他事务在升级该表的其他锁(),则解锁队列、设置事务状态为回滚、抛出UPGRADE_CONFLICT异常。取得队列中旧锁请求,判断锁升级是否兼容:

//lock_manager.h
/**
LOCK UPGRADE:在已经锁定的资源上调用Lock()应该具有以下行为:- 如果请求的锁模式与当前持有的锁模式相同,lock()应该返回true,因为它已经拥有该锁。- 如果请求的锁模式不同,lock()应该升级事务持有的锁。基本上应该有三个步骤来执行锁升级一般- 1。检查升级的前提条件- 2。放下当前锁,保留升级位置- 3。等待新锁授予正在升级的锁请求应该优先于同一资源上等待的其他锁请求。在升级时,只允许以下转换:IS -> [S, X, IX, SIX]S  -> [X, SIX]IX -> [X, SIX]SIX-> [X]任何其他升级都被认为是不兼容的,这样的尝试应该将TransactionState设置为ABORTED,并抛出TransactionAbortException (INCOMPATIBLE_UPGRADE)。此外,应该只允许一个事务在给定资源上升级其锁。同一资源上的多个并发锁升级应该将TransactionState设置为ABORTED,并抛出TransactionAbortException (UPGRADE_CONFLICT)。
**/

升级锁其实就是将锁请求队列中lock_vector_旧锁模式对应的计数减一,然后将reuest_queue_中的旧锁删除(erase()),调用RemoveFromSet()将事务中与旧锁模式相对应的表锁集合中的表oid删除,恢复granted_、设置upgrading_为当前事务id、更新锁模式、锁请求队列request_queue_中添加新锁请求,调用WaitForLock()等待授予锁。

WaitForLock()就是真正授予锁的函数了,前面一个while循环是为了阻塞其他线程进行加锁,之后和清除旧锁过程类似,granted_置true、调用InsertToSet()将表oid插入表锁集合中、锁队列的lock_vector_中对应锁模式的计数器加一、将upgrading_恢复。

WaitForLock()开头的while循环作用是实现线程的阻塞等待锁资源。具体来说,它使用了一个条件变量cv_和一个互斥锁std::unique_lock<std::mutex> mylock(lock_queue->latch_, std::adopt_lock);std::adopt_lock托管了lock_queue->latch_unique_lock)来实现线程的阻塞和唤醒。循环条件是CanGrantLock()返回false,其作用是根据upgranding_(当前请求对应的事务正在升级锁,是最高优先级)和锁请求在request_queue_中的顺序(没有事务正在升级锁,迭代器遍历找到第一个与其他所有已被授予的锁兼容CompatibleLock()但还未被授予!granted_的那个锁请求)判断当前请求现在是否适合获得锁。

只要不满足条件,进入循环,lock_queue->cv_.wait(mylock);阻塞并释放托管的lock_queue->latch直到解锁表的时候notify_all()唤醒,之后检测事务状态,若是ABORTED则直接从request_queue_中删除,因为处理死锁的线程会把死锁的事务的状态置为ABORTED

UnLockTable

分别处理:

  1. table_lock_map_中没有表oid到锁请求队列的映射:ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD异常
  2. HaveRowLock()通过获取事务的两种行锁集合(s_row_lock_set_x_row_lock_set_)检测该事务在该表oid的行上有锁未释放:TABLE_UNLOCKED_BEFORE_UNLOCKING_ROWS异常
  3. 迭代器遍历requeset_queue_检测事务没有表oid的锁:ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD异常

处理完异常状态后:

T
F
T
T
F
T
F
F
开始
是共享锁
隔离级别是可重复读&&事务状态是增长阶段
是排他锁&&事务状态是增长阶段
事务状态更新为收缩阶段
事务状态更新为收缩阶段
隔离级别是读未提交
唤醒所有线程,返回false
解锁

之后真正进入解锁代码,同加锁,lock_vector_对应锁模式加一,删除request_queue_中锁请求,RemoveFromSet()删除事务的特定锁模式对应的表锁集合中的表oid,唤醒所有线程,解锁锁请求队列。

LockRow

加行锁和加表锁类似,区别在于:

  1. 行锁不支持意愿锁
  2. 收缩阶段时,加X锁直接LOCK_ON_SHRINKING异常
  3. 收缩阶段,可重复读和读未提交都不允许加锁,LOCK_ON_SHRINKINGLOCK_SHARED_ON_READ_UNCOMMITTED异常
  4. 收缩阶段,读已提交只允许加S锁

DealLockOfRow()DealLockOfTable()类似,区别在于:

  1. 行锁开头需要检查本事务是否有相应表锁CheckTableLock()(取事务的行锁集合查找对应表的oid)没有就直接抛异常TABLE_LOCK_NOT_PRESENT
  2. 升级锁时,因为只有两种锁,所以判断兼容就只需要判断不能是X锁转S锁即可

WaitForRowLock()WaitForLock()极其相似,逻辑相同。

UnLockRow

解锁与解表锁同。

Task2 死锁检测

锁管理器应该在后台线程中运行死锁检测,定期构建等待图,并根据需要中止事务以消除死锁。

您可能需要从成员变量 txn_manager_ 访问事务的状态。如果’ txn_manager_ ‘被设置为’ nullptr ', ’ StartDeadlockDetection '将不会被调用,并且您不需要检测死锁。

/** 等待图表示。 */
std::mutex waits_for_latch_;
// 重新设计wait_for_,使得若t1等待t2的资源,令t2指向t1,释放t2时减少所有等待t2的事务的入度
std::map<txn_id_t, std::set<txn_id_t>> waits_for_;  // 保证每次都从txn_id_t最小的事务开始扫描
std::map<txn_id_t, int> visited_;  // 表示一个节点未被访问过(0),正在访问(1),已经被访问完(2)
//visited_的下标和waits_for_中的key和value集合里的值是匹配的
bool hascycle_;                    // 是否有环
std::unordered_map<txn_id_t, std::unordered_set<table_oid_t>> map_txn_table_;  // 记录事务锁住的表
std::unordered_map<txn_id_t, std::unordered_set<RID>> map_txn_row_; // 记录事务锁住的行

等待图一般都是等待的事务指向被等待的事务,但这里反过来了,因为之后需要当死锁时删除等待的事务,这样更快。

AddEdge()将等待的事务插入insert()进被等待的事务key对应的集合中。

RemoveEdge()删除erase()事务。

HasCycle()判断等待图中是否有环,先将访问列表visited_中所有结点初始化为0,之后遍历waits_for_,只要key事务未访问过就DFS(),使用的是深度优先搜索:

// 从txn_id开始深度优先遍历
void LockManager::DFS(txn_id_t txn_id) {visited_[txn_id] = 1;  // 该节点正在被访问for (const txn_id_t id : waits_for_[txn_id]) {//遍历等待该事务的事务集合if (visited_[id] == 0) {DFS(id);if (hascycle_) {//当递归找到一个环,直接返回return;}} else if (visited_[id] == 1) {hascycle_ = true;  // id被访问过, 现在又将被访问, 有环return;}}visited_[txn_id] = 2;  // 所有与txn_id相关的节点均已被访问,改为2, 无环
}

DFS()返回后若找到一个环就遍历visited_,如果是无环的事务结点会被改置为2,还为1的只会是return返回的有环结点,将该结点赋给入参,返回true。

GetEdgeList()嵌套循环waits_for_将边记录下返回。

RunCycleDetection()死锁检验线程的运行函数:

std::thread *cycle_detection_thread_;
std::atomic<bool> enable_cycle_detection_;void StartDeadlockDetection() {BUSTUB_ENSURE(txn_manager_ != nullptr, "txn_manager_ is not set.")enable_cycle_detection_ = true;cycle_detection_thread_ = new std::thread(&LockManager::RunCycleDetection, this);
}

RunCycleDetection()是一个循环函数,由enable_cycle_detection_控制,每次循环都会清空并重新创建等待图waits_for_并记录事务锁住的表和行(map_txn_table_map_txn_row_),然后内部一个循环,由HasCycle()控制,只要有死锁就设置死锁事务状态为回滚,删除waits_for_中死锁事务相关的边(即等待关系),然后从死锁事务锁住的表和行(map_txn_table_map_txn_row_)中获取表和行的id,从tabe_lock_map_row_lock_map_中获得死锁锁住的表和行的锁请求队列,lock_vector_中该死锁事务的锁模式计数器减一,RemoFromSet()RemoveFromRowSet()删除锁集合中的表oid和rid,删除锁请求队列中死锁事务的请求,lock_queue->cv_.notify_all();唤醒锁住的表和行的其他请求线程。

Task3 并发查询执行器

Isolation Level

  • 一个事务应该为所有写操作持有X锁,直到它提交或终止,无论它的隔离级别如何。
  • 对于 REPEATABLE_READ,一个事务应该为所有读操作持有S锁,直到提交或终止。
  • 对于 READ_COMMITTED,一个事务应该为所有读操作占用S锁,但可以立即释放它们。
  • 对于 READ_UNCOMMITTED,事务不需要为读操作获取任何S锁。

seq_scan_executor

Init()锁表,Next()锁表

//Init()开头
isl_ = exec_ctx_->GetTransaction()->GetIsolationLevel();
if (exec_ctx_->IsDelete()) {  // 当前操作是delete或updateTryLockTable(LockManager::LockMode::INTENTION_EXCLUSIVE, plan_->table_oid_);  // IX锁
} else {if (isl_ == IsolationLevel::REPEATABLE_READ || isl_ == IsolationLevel::READ_COMMITTED) {TryLockTable(LockManager::LockMode::INTENTION_SHARED, plan_->table_oid_);  // IS锁}
}//Next()开头
if (exec_ctx_->IsDelete()) {TryLockRow(LockManager::LockMode::EXCLUSIVE, plan_->table_oid_, it_->GetRID());  // 删除或更新操作加写锁
} else {if (isl_ == IsolationLevel::REPEATABLE_READ || isl_ == IsolationLevel::READ_COMMITTED) {TryLockRow(LockManager::LockMode::SHARED, plan_->table_oid_, it_->GetRID());  // 只读操作加读锁}
}

TryLockXXX()是将加锁函数try-catch包围,拼凑出错时的异常信息,解锁TryUnLockXXX()同理。

Next()获取元组后,判断元组未删除(pair.first.is_deleted_)若执行器上下文属性exec_ctx_is_delete_(标识这次查询表是否是为了删除/更改表中数据)为false,且隔离级别为READ_COMMITTED,就立即解锁行。已删除时直接解锁行。当查询到表末尾后,隔离状态是READ_COMMITTED且执行器不会删除/修改数据,则解锁表。

insert_executor

try {if (!exec_ctx_->GetLockManager()->LockTable(exec_ctx_->GetTransaction(), LockManager::LockMode::INTENTION_EXCLUSIVE,plan_->table_oid_)) {throw ExecutionException(std::string("Insert Table Ix lock fail"));}
} catch (TransactionAbortException &e) {throw ExecutionException(std::string("Insert Table Ix lock fail"));
}

delete_executor

如果你在执行器context中正确地基于’ IsDelete() ‘实现了’ SeqScanExecutor ',那么你不需要在这个执行器中使用任何锁。

transaction_manager

主要实现两个函数:TransactionManager::Abort()TransactionManager::Commit()

Commit()先调用ReleaseLocks()释放事务持有的所有锁:先从事务的各个锁集合中收集行、表锁到新容器中,然后嵌套循环解锁行锁、循环解锁表锁。之后设置事务状态为COMMITTED

Abort()负责恢复事务的写集中的所有更改。每次对表进行插入、删除时都会在事务的table_write_set_index_write_set_中插入一条TableWriteRecordIndexWriteRecord,其中有操作的各种信息(如操作的表oid、对应的表堆、行rid,索引oid等)。遍历两个写集合,对里面存放的修改过的元组(主要是插入和删除的)的元信息进行回退(设置is_deleted_),然后弹出集合,设置事务状态为ABOTTED

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/114901.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flink中使用GenericWriteAheadSink的优缺点

背景 GenericWriteAheadSink是flink中提供的实现几乎精确一次输出的数据汇抽象类&#xff0c;本文就来看一下使用GenericWriteAheadSink的优缺点 GenericWriteAheadSink的优缺点 先看一下GenericWriteAheadSink的原理图 优点&#xff1a; 几乎可以精确一次的输出&#xf…

漏洞扫描系统的主要功能有哪些

漏洞扫描系统是一种自动化的工具&#xff0c;用于发现和报告计算机网络系统中的安全漏洞。这些漏洞可能包括软件漏洞、配置错误、不安全的网络设备等。漏洞扫描系统的主要功能包括以下几个方面&#xff1a; 目标识别&#xff1a;漏洞扫描系统首先需要识别目标系统的基本信息&am…

在Word中,图片显示不全

在今天交作业的时候&#xff0c;发现了一个非常SB的事情&#xff0c;把图片复制过去显示不完全&#xff1a; 使用文心一言查看搜索了一下&#xff0c;发现可能是以下几种原因&#xff1a; 图片所在行的行高设置不正确。可以重新设置行高&#xff0c;具体步骤包括打开图片显示…

【网络编程】基于epoll的ET模式下的Reactor

需要云服务器等云产品来学习Linux的同学可以移步/-->腾讯云<--/-->阿里云<--/-->华为云<--/官网&#xff0c;轻量型云服务器低至112元/年&#xff0c;新用户首次下单享超低折扣。 目录 一、Reactor介绍 二、基于epoll的ET模式下的Reactor计算器代码 1、Tcp…

[计算机入门] 应用软件介绍(娱乐类)

3.21 应用软件介绍(娱乐类) 3.21.1 音乐&#xff1a;酷狗 音乐软件是一类可以帮助人们播放、管理和发现音乐的应用程序。它们提供了丰富的音乐内容&#xff0c;用户可以通过搜索、分类浏览或个性化推荐等方式找到自己喜欢的歌曲、专辑或艺术家。音乐软件还通常支持创建和管理…

Python基础入门例程1-NP1 Hello World!

描述 将字符串 Hello World! 存储到变量str中&#xff0c;再使用print语句将其打印出来。 输入描述&#xff1a; 无 输出描述&#xff1a; 一行输出字符串Hello World! 解答&#xff1a; str "Hello World!" print(str) 解释说明&#xff1a; 赋值变量&…

Leetcode系列(双语)——GO两数之和

文章目录 两数之和 &#xff08;Two Sum&#xff09;解题拓展——SolutionSolution 1: Brute ForceCodeLanguage: Go 两数之和 &#xff08;Two Sum&#xff09; Given an array of integers nums and an integer target, return indices of the two numbers such that they ad…

Metabase:简单快捷的商业智能与数据分析工具 | 开源日报 No.61

moby/moby Stars: 66.8k License: Apache-2.0 Moby 是一个由 Docker 创建的开源项目&#xff0c;旨在实现和加速软件容器化。它提供了工具包组件的“乐高集”&#xff0c;可以将它们组装成基于容器的自定义系统的框架。组件包括容器生成工具、容器注册表、业务流程工具、运行时…

Django系列之DRF搜索和过滤

1. model之间关系 class Publish(models.Model):name models.CharField(max_length32)city models.CharField(max_length8)email models.CharField(max_length32)class Book(models.Model):title models.CharField(max_length32)price models.DecimalField(max_digits5, …

互联网Java工程师面试题·Java 面试篇·第四弹

目录 59、我们能自己写一个容器类&#xff0c;然后使用 for-each 循环码&#xff1f; 60、ArrayList 和 HashMap 的默认大小是多数&#xff1f; 61、有没有可能两个不相等的对象有有相同的 hashcode&#xff1f; 62、两个相同的对象会有不同的的 hash code 吗&#xff1f; …

[AUTOSAR][诊断管理][ECU][$14] 清除诊断相关信息

文章目录 一、简介(1)应用场景(2)清除DTC原理(3) 请求格式二、示例代码(1) 14_cls_dtc_info.c三、 常见bug大揭秘一、简介 根据ISO14119-1标准中所述,诊断服务14主要用于Client向Server(ECU)请求清除诊断相关信息。 (1)应用场景 一般而言,14诊断服务,主要应用场景…

Kubernetes 通过 Deployment 部署Jupyterlab

概要 在Kubernetes上部署jupyterlab服务&#xff0c;链接Kubernetes集群内的MySQL&#xff0c;实现简单的数据开发功能。 前置条件 镜像准备&#xff1a;自定义Docker镜像--Jupyterlab-CSDN博客 MySQL-Statefulset准备&#xff1a;StatefulSet 简单实践 Kubernetes-CSDN博客…

​​​​​​​2022年上半年 软件设计师 上午试卷(1-32)

以下关于冯诺依曼计算机的叙述中&#xff0c;不正确的是 &#xff08;1&#xff09; 。 &#xff08;1&#xff09; A. 程序指令和数据都采用二进制表示 B. 程序指令总是存储在主存中&#xff0c;而数据则存储在高速缓存中 C. 程序的功能都由中央处理器&#xff08;CPU&…

5、k8s部署Nginx Proxy Manager

前言 Nginx-Proxy-Manager 是一个基于 Web 的 Nginx 服务器管理工具&#xff0c;它允许用户通过浏览器界面轻松地管理和监控 Nginx 服务器。通过 Nginx-Proxy-Manager&#xff0c;可以获得受信任的 SSL 证书&#xff0c;并通过单独的配置、自定义和入侵保护来管理多个代理。用…

Linux自有服务与软件包管理

服务是一些特定的进程&#xff0c;自有服务就是系统开机后就自动运行的一些进程&#xff0c;一旦客户发出请求&#xff0c;这些进程就自动为他们提供服务&#xff0c;windows系统中&#xff0c;把这些自动运行的进程&#xff0c;称为"服务" 举例&#xff1a;当我们使…

5.5G移动通信技术

5.5G即5G-Advanced&#xff0c;是一种移动通信技术。 5.5G 是 5G 和 6G 之间的过渡阶段&#xff0c;将在速率、时延、连接规模和能耗方面全面超越现有 5G&#xff0c;有望实现下行万兆和上行千兆的峰值速率、毫秒级时延和低成本千亿物联。按照国际标准组织 3GPP 定义&#xff…

Linux:用户和权限

Linux&#xff1a;用户和权限 1. 认知root用户1.1 root用户&#xff08;超级管理员&#xff09;1.2 su和exit命令1.3 sudo命令1.3.1 为普通用户配置sudo认证 2. 用户、用户组管理2.1 用户组管理2.2 用户管理2.3 getent命令 3. 查看权限控制3.1 认知权限信息3.1.1 案例 4. 修改权…

rust学习——引用与借用(references-and-borrowing)

引用与借用&#xff08;references-and-borrowing&#xff09; 先看一个返回参数的所有权的代码 fn main() {let s1 String::from("hello");let (s2, len) calculate_length(s1);println!("The length of {} is {}.", s2, len); }fn calculate_length(…

二、BurpSuite Intruder暴力破解

一、介绍 解释&#xff1a; Burp Suite Intruder是一款功能强大的网络安全测试工具&#xff0c;它用于执行暴力破解攻击。它是Burp Suite套件的一部分&#xff0c;具有高度可定制的功能&#xff0c;能够自动化和批量化执行各种攻击&#xff0c;如密码破解、参数枚举和身份验证…

Promise详解:手写Promise底层-实现Promise所有的功能和方法

前言 目标&#xff1a;封装一个promise&#xff0c;更好的理解promise底层逻辑需求&#xff1a;实现以下promise所有的功能和方法 如下图所示一、构造函数编写 步骤 1、定义一个TestPromise类&#xff0c; 2、添加构造函数&#xff0c; 3、定义resolve/reject&#xff0c; 4、…