A friend of mine competed in the CSCC2024 database system track, and I worked through a number of optimization cases with him; in the end he placed 2nd out of 325 nationwide. This post records the problems we discussed and optimized (I recommend downloading the source code and searching through it as you read, or you will get lost).
Row locks using too much memory
Q: In the TPC-C test, row locks take up a huge amount of memory. We suspect glibc's malloc is managing memory poorly; in production you could just switch to another allocator such as jemalloc, but the OJ only has glibc.
Relevant code:
/**
 * @description: Acquire a row-level shared lock
 * @return {bool} whether the lock was acquired
 * @param {Transaction*} txn pointer to the transaction requesting the lock
 * @param {Rid&} rid RID of the record to lock
 * @param {int} tab_fd fd of the table the record belongs to
 */
bool LockManager::lock_shared_on_record(Transaction *txn, const Rid &rid, int tab_fd) {
    std::lock_guard lock(latch_);
    if (!check_lock(txn)) {
        return false;
    }
    LockDataId lock_data_id(tab_fd, rid, LockDataType::RECORD);
    auto &&it = lock_table_.find(lock_data_id);
    if (it == lock_table_.end()) {
        it = lock_table_.emplace(std::piecewise_construct, std::forward_as_tuple(lock_data_id),
                                 std::forward_as_tuple()).first;
        it->second.oldest_txn_id_ = txn->get_transaction_id();
    }
    auto &lock_request_queue = it->second;
    for (auto &lock_request: lock_request_queue.request_queue_) {
        // If this transaction already holds a shared (or stronger, X) lock in this queue, succeed
        if (lock_request.txn_id_ == txn->get_transaction_id()) {
            // Getting here means either the first request was granted immediately or its wait is over
            assert(lock_request.granted_);
            return true;
        }
    }
    // If another transaction holds an X lock, fail (no-wait)
    // if (lock_request_queue.group_lock_mode_ == GroupLockMode::X || lock_request_queue.group_lock_mode_ == GroupLockMode::IX || lock_request_queue.group_lock_mode_ == GroupLockMode::SIX) {
    //     lock_request_queue.cv_.notify_all();
    //     throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
    // }
    // First request: check the queue for conflicting transactions
    // Check for conflicting locks and apply wait-die logic
    if (lock_request_queue.group_lock_mode_ == GroupLockMode::X || lock_request_queue.group_lock_mode_ == GroupLockMode::IX ||
        lock_request_queue.group_lock_mode_ == GroupLockMode::SIX) {
        if (txn->get_transaction_id() > lock_request_queue.oldest_txn_id_) {
            throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
        }
        lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
        lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::SHARED);
        std::unique_lock<std::mutex> ul(latch_, std::adopt_lock);
        auto &&cur = lock_request_queue.request_queue_.begin();
        lock_request_queue.cv_.wait(ul, [&lock_request_queue, txn, &cur]() {
            for (auto &&it = lock_request_queue.request_queue_.begin(); it != lock_request_queue.request_queue_.end(); ++it) {
                if (it->txn_id_ != txn->get_transaction_id()) {
                    if (it->lock_mode_ != LockMode::SHARED || it->granted_) {
                        return false;
                    }
                } else {
                    cur = it;
                    break;
                }
            }
            return true;
        });
        cur->granted_ = true;
        lock_request_queue.group_lock_mode_ = static_cast<GroupLockMode>(
            std::max(static_cast<int>(GroupLockMode::S), static_cast<int>(lock_request_queue.group_lock_mode_)));
        ++lock_request_queue.shared_lock_num_;
        txn->get_lock_set()->emplace(lock_data_id);
        ul.release();
        return true;
    }
    // Every lock request updates the oldest transaction id
    if (txn->get_transaction_id() < lock_request_queue.oldest_txn_id_) {
        lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
    }
    // Append this transaction's request to the lock request queue
    lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::SHARED, true);
    // Update the queue's group lock mode to shared
    lock_request_queue.group_lock_mode_ = GroupLockMode::S;
    ++lock_request_queue.shared_lock_num_;
    txn->get_lock_set()->emplace(lock_data_id);
    return true;
}

/**
 * @description: Acquire a row-level exclusive lock
 * @return {bool} whether the lock was acquired
 * @param {Transaction*} txn pointer to the transaction requesting the lock
 * @param {Rid&} rid RID of the record to lock
 * @param {int} tab_fd fd of the table the record belongs to
 */
bool LockManager::lock_exclusive_on_record(Transaction *txn, const Rid &rid, int tab_fd) {
    std::lock_guard lock(latch_);
    if (!check_lock(txn)) {
        return false;
    }
    LockDataId lock_data_id(tab_fd, rid, LockDataType::RECORD);
    auto &&it = lock_table_.find(lock_data_id);
    if (it == lock_table_.end()) {
        it = lock_table_.emplace(std::piecewise_construct, std::forward_as_tuple(lock_data_id),
                                 std::forward_as_tuple()).first;
        it->second.oldest_txn_id_ = txn->get_transaction_id();
    }
    auto &lock_request_queue = it->second;
    for (auto &lock_request: lock_request_queue.request_queue_) {
        // This transaction already has a request in the queue
        if (lock_request.txn_id_ == txn->get_transaction_id()) {
            assert(lock_request.granted_);
            if (lock_request.lock_mode_ == LockMode::EXCLUSIVE) {
                return true;
            }
            // If no other transaction is reading this record, upgrade to an exclusive lock
            if (lock_request.lock_mode_ == LockMode::SHARED && lock_request_queue.request_queue_.size() == 1) {
                lock_request.lock_mode_ = LockMode::EXCLUSIVE;
                lock_request_queue.group_lock_mode_ = GroupLockMode::X;
                lock_request_queue.shared_lock_num_ = 0;
                return true;
            }
            assert(lock_request.lock_mode_ == LockMode::SHARED);
            // The timestamps in the queue are not necessarily in descending order, so the oldest
            // transaction id is tracked in a separate oldest_txn_id_ variable; a transaction blocked
            // in this wait queue cannot be requesting any other lock in the meantime.
            // Whether or not the lock is granted, the request enters the wait queue first;
            // granted_ is set once the lock is actually obtained.
            if (txn->get_transaction_id() > lock_request_queue.oldest_txn_id_) {
                // A younger transaction requests the lock: abort it (wait-die)
                throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
            }
            lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
            lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::EXCLUSIVE);
            std::unique_lock<std::mutex> ul(latch_, std::adopt_lock);
            auto &&cur = lock_request_queue.request_queue_.begin();
            // Wait condition: no granted request ahead of ours in the queue
            lock_request_queue.cv_.wait(ul, [&lock_request_queue, txn, &cur]() {
                for (auto &&it = lock_request_queue.request_queue_.begin(); it != lock_request_queue.request_queue_.end(); ++it) {
                    if (it->txn_id_ != txn->get_transaction_id()) {
                        if (it->granted_) {
                            return false;
                        }
                    } else {
                        cur = it;
                        break;
                    }
                }
                return true;
            });
            cur->granted_ = true;
            lock_request_queue.group_lock_mode_ = GroupLockMode::X;
            txn->get_lock_set()->emplace(lock_data_id);
            ul.release();
            return true;
        }
    }
    // Another transaction holds a lock on this record: apply wait-die
    if (lock_request_queue.group_lock_mode_ != GroupLockMode::NON_LOCK) {
        if (txn->get_transaction_id() > lock_request_queue.oldest_txn_id_) {
            // A younger transaction requests the lock: abort it (wait-die)
            throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
        }
        lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
        lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::EXCLUSIVE);
        std::unique_lock<std::mutex> ul(latch_, std::adopt_lock);
        auto &&cur = lock_request_queue.request_queue_.begin();
        // Wait condition: no granted request ahead of ours in the queue
        lock_request_queue.cv_.wait(ul, [&lock_request_queue, txn, &cur]() {
            for (auto &&it = lock_request_queue.request_queue_.begin(); it != lock_request_queue.request_queue_.end(); ++it) {
                if (it->txn_id_ != txn->get_transaction_id()) {
                    if (it->granted_) {
                        return false;
                    }
                } else {
                    cur = it;
                    break;
                }
            }
            return true;
        });
        cur->granted_ = true;
        lock_request_queue.group_lock_mode_ = GroupLockMode::X;
        txn->get_lock_set()->emplace(lock_data_id);
        ul.release();
        return true;
    }
    if (txn->get_transaction_id() < lock_request_queue.oldest_txn_id_) {
        lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
    }
    // Append this transaction's request to the lock request queue
    lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::EXCLUSIVE, true);
    // Update the queue's group lock mode to exclusive
    lock_request_queue.group_lock_mode_ = GroupLockMode::X;
    // Record the row-level exclusive lock in the transaction's lock set
    txn->get_lock_set()->emplace(lock_data_id);
    return true;
}
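Both paths above implement wait-die deadlock prevention; stripped of the queue bookkeeping, the decision made at every conflict is just the following (a sketch, with txn_id_t standing in for the repo's transaction id type):

// Wait-die: transaction ids double as timestamps; a smaller id means older.
// On conflict, an older requester is allowed to wait, while a younger one is
// aborted immediately (DEADLOCK_PREVENTION) and must retry.
bool may_wait(txn_id_t requester_id, txn_id_t oldest_in_queue) {
    return requester_id <= oldest_in_queue;
}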
A: Why does lock_table_ keep emplacing entries without ever releasing them?
Q: The request_queue_ inside lock_table_'s value (LockRequestQueue) does erase its elements on unlock. But since the same row (LockDataId) may well be locked again later, we never release the lock_table_ key itself.
Relevant code:
/**
 * @description: Release a lock
 * @return {bool} whether the unlock succeeded
 * @param {Transaction*} txn pointer to the transaction releasing the lock
 * @param {LockDataId} lock_data_id id of the lock to release
 */
bool LockManager::unlock(Transaction *txn, const LockDataId &lock_data_id) {
    std::lock_guard lock(latch_);
    auto &txn_state = txn->get_state();
    // The transaction has finished; it may not unlock anymore
    if (txn_state == TransactionState::COMMITTED || txn_state == TransactionState::ABORTED) {
        return false;
    }
    if (txn_state == TransactionState::GROWING) {
        txn_state = TransactionState::SHRINKING;
    }
    std::unordered_map<LockDataId, LockRequestQueue>::iterator it;
    std::unordered_map<IndexMeta, std::unordered_map<LockDataId, LockRequestQueue> >::iterator ii;
    if (lock_data_id.type_ == LockDataType::GAP) {
        ii = gap_lock_table_.find(lock_data_id.index_meta_);
        if (ii == gap_lock_table_.end()) {
            return true;
        }
        it = ii->second.find(lock_data_id);
        if (it == ii->second.end()) {
            return true;
        }
    } else {
        it = lock_table_.find(lock_data_id);
        if (it == lock_table_.end()) {
            return true;
        }
    }
    auto &lock_request_queue = it->second;
    auto &request_queue = lock_request_queue.request_queue_;
    auto &&request = request_queue.begin();
    for (; request != request_queue.end(); ++request) {
        if (request->txn_id_ == txn->get_transaction_id()) {
            break;
        }
    }
    if (request == request_queue.end()) {
        return true;
    }
    // A transaction may hold several locks on one record, e.g. S and IX
    do {
        // Maintain the queue's counters
        if (request->lock_mode_ == LockMode::SHARED || request->lock_mode_ == LockMode::S_IX) {
            --lock_request_queue.shared_lock_num_;
        }
        if (request->lock_mode_ == LockMode::INTENTION_EXCLUSIVE || request->lock_mode_ == LockMode::S_IX) {
            --lock_request_queue.IX_lock_num_;
        }
        // Remove this lock request
        request_queue.erase(request);
        request = request_queue.begin();
        for (; request != request_queue.end(); ++request) {
            if (request->txn_id_ == txn->get_transaction_id()) {
                break;
            }
        }
    } while (request != request_queue.end());
    // Maintain the queue's group lock mode; an empty queue means no lock
    // TODO: erase the entry from the lock table
    if (request_queue.empty()) {
        lock_request_queue.group_lock_mode_ = GroupLockMode::NON_LOCK;
        lock_request_queue.oldest_txn_id_ = INT32_MAX;
        // Wake up waiting transactions
        lock_request_queue.cv_.notify_all();
        // if (lock_data_id.type_ == LockDataType::GAP) {
        //     // Overlapping gap locks must be woken up as well
        //     for (auto &[data_id, queue]: ii->second) {
        //         if (lock_data_id.gap_.isCoincide(data_id.gap_)) {
        //             queue.cv_.notify_all();
        //         }
        //     }
        // }
        return true;
    }
    // Otherwise find the strongest lock mode and the oldest transaction left in the queue
    auto max_lock_mode = LockMode::INTENTION_SHARED;
    for (auto &request: request_queue) {
        max_lock_mode = std::max(max_lock_mode, request.lock_mode_);
        if (request.txn_id_ < lock_request_queue.oldest_txn_id_) {
            lock_request_queue.oldest_txn_id_ = request.txn_id_;
        }
    }
    lock_request_queue.group_lock_mode_ = static_cast<GroupLockMode>(static_cast<int>(max_lock_mode) + 1);
    // Wake up waiting transactions
    lock_request_queue.cv_.notify_all();
    // if (lock_data_id.type_ == LockDataType::GAP) {
    //     // Overlapping gap locks must be woken up as well
    //     for (auto &[data_id, queue]: ii->second) {
    //         if (lock_data_id.gap_.isCoincide(data_id.gap_)) {
    //             queue.cv_.notify_all();
    //         }
    //     }
    // }
    return true;
}
A: But the problem right now is memory usage — shouldn't you erase the entries in lock_table_ (otherwise the hash table keeps growing)?
Q: erase doesn't seem to actually release the memory, though; it just returns it to the allocator's pool.
A: Returning it to the pool is enough. Memory that isn't handed back to the OS only keeps other programs from allocating it; your own program is unaffected. The problem here is that your own program itself is using too much memory, so this isn't the issue from that link.
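That said, if you do want lock_table_ itself to stop growing, the `// TODO: erase the entry from the lock table` spot in unlock is the natural place; a minimal sketch reusing the iterators from the code above:

// In unlock, once the request queue is empty, drop the whole map entry so
// lock_table_ stops accumulating keys. (Sketch only: any transaction still
// blocked in cv_.wait on this queue must have been woken and removed first,
// otherwise erasing the entry destroys a condition variable still in use.)
if (request_queue.empty()) {
    lock_request_queue.cv_.notify_all();
    if (lock_data_id.type_ == LockDataType::GAP) {
        ii->second.erase(it);
        if (ii->second.empty()) {
            gap_lock_table_.erase(ii);
        }
    } else {
        lock_table_.erase(it);
    }
    return true;
}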
Async disk writes made things slower, not faster
Async write code:
DiskScheduler::DiskScheduler(DiskManager *disk_manager) : disk_manager_(disk_manager) {
    // TODO(P1): remove this line after you have implemented the disk scheduler API
    // throw NotImplementedException(
    //     "DiskScheduler is not implemented yet. If you have finished implementing the disk scheduler, "
    //     "please remove the throw exception line in `disk_scheduler.cpp`.");
    // Spawn the background thread
    stop_thread_ = false;
    background_thread_.emplace([&] { StartWorkerThread(); });
    // background_thread_->detach();
}

DiskScheduler::~DiskScheduler() {
    // Put a `std::nullopt` in the queue to signal to exit the loop
    // for (auto &[_, req] : request_queue_) {
    //     std::ignore = _;
    //     req.Put(std::nullopt);
    // }
    stop_thread_ = true;
    if (background_thread_.has_value()) {
        if (background_thread_.value().joinable()) {
            background_thread_->join();
        }
    }
}

void DiskScheduler::Schedule(DiskRequest r) {
    request_queue_[r.page_id_.page_no].Put(std::make_optional(std::move(r)));
}

void DiskScheduler::ScheduleRead(Page &page) {
    auto e = request_queue_[page.get_page_no()].TryReadFromQueue();
    if (e.has_value() && e.value().has_value()) {
        auto &last_req = e.value();
        memcpy(page.get_data(), last_req->data_, PAGE_SIZE);
    } else {
        disk_manager_->read_page(page.get_page_id().fd, page.get_page_id().page_no, page.get_data(), PAGE_SIZE);
        request_queue_[page.get_page_no()].LoadBuffer(DiskRequest(page.get_page_id(), page.get_data()));
    }
}

void DiskScheduler::StartWorkerThread() {
    while (!stop_thread_) {
        for (auto &[_, req] : request_queue_) {
            std::ignore = _;
            while (auto e = req.Get()) {
                if (!e.has_value()) {
                    break;
                }
                auto &r = e.value();
                disk_manager_->write_page(r.page_id_.fd, r.page_id_.page_no, r.data_, PAGE_SIZE);
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}
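For reference, request_queue_ above maps a page number to a channel of pending requests with Put/Get; the channel type isn't shown in the post, but a minimal thread-safe version in the style of BusTub's Channel<T> looks roughly like this (TryReadFromQueue/LoadBuffer, used by ScheduleRead, are extra helpers over the same buffer and are omitted):

#include <mutex>
#include <optional>
#include <queue>

// Minimal thread-safe channel: Put appends, Get pops, returning nullopt when
// empty (matching how StartWorkerThread drains it in a loop).
template <class T>
class Channel {
public:
    void Put(T item) {
        std::lock_guard<std::mutex> lk(m_);
        q_.push(std::move(item));
    }
    std::optional<T> Get() {
        std::lock_guard<std::mutex> lk(m_);
        if (q_.empty()) return std::nullopt;
        T item = std::move(q_.front());
        q_.pop();
        return item;
    }
private:
    std::mutex m_;
    std::queue<T> q_;
};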
Caller-side scheduling code:
/**
 * @description: Update the page's data: if it is dirty, write it back to disk first, then switch
 *               to the new page, updating the page metadata (data, is_dirty, page_id) and the page table
 * @param {Page*} page page to write back
 * @param {PageId} new_page_id new page_id
 * @param {frame_id_t} new_frame_id new frame id
 */
void BufferPoolInstance::update_page(Page *page, PageId new_page_id, frame_id_t new_frame_id) {
    // Todo:
    // 1. If the page is dirty, write it back to disk and clear the dirty flag
    // 2. Update the page table
    // 3. Reset the page's data and update its page id
    if (page->is_dirty()) {
#ifdef ENABLE_LOGGING
        // When evicting a dirty page whose lsn is above the persisted lsn, flush the log to disk first
        if (log_manager_ != nullptr && page->get_page_lsn() > log_manager_->get_persist_lsn()) {
            log_manager_->flush_log_to_disk();
        }
#endif
#ifdef ENABLE_ASYNC_DISK
        auto it = disk_scheduler_.find(page->get_page_fd());
        if (it == disk_scheduler_.end()) {
            it = disk_scheduler_.emplace(page->get_page_fd(), disk_manager_).first;
        }
        it->second.Schedule({page->get_page_id(), page->get_data()});
#else
        disk_manager_->write_page(page->get_page_id().fd, page->get_page_id().page_no, page->get_data(), PAGE_SIZE);
#endif
        page->is_dirty_ = false;
    }
    page_table_.erase(page->get_page_id());
    page_table_[new_page_id] = new_frame_id;
    // TODO: new_page needs the reset; update does not, since the data gets overwritten anyway
    page->reset_memory();
    page->id_ = new_page_id;
    page->pin_count_ = 0;
}
A: Roughly how many elements does this disk_scheduler_ map hold?
Q: One disk_scheduler_ per table, and each disk_scheduler_ owns one thread. The test case we're running has 9 tables.
Q: Most of the SQL in TPC-C goes through indexes, so the buffer pool hit rate is very high and evictions are rare — maybe 20-odd for one particular statement, and zero or single digits for the rest.
A: So before async writes, write_page only stalled occasionally; now nine threads keep sleeping 1000 ms each, which may well be hurting the main thread. Don't start the thread when the DiskScheduler is constructed — start it lazily, e.g. on Put, only spawning a thread once the number of pending requests crosses some threshold, and shut it down once the queue drains. Alternatively, implement it with a semaphore so the system only wakes the thread when there is actual work; that's quite lightweight. This implementation is a useful reference:
class AsyncIncreWriteThread : public QThread {
private:
    // Singleton; no lazy initialization needed here — the async incremental write thread is always required.
    static AsyncIncreWriteThread* asyncIncreWriteThread;
    SwitchRecordStore* recordStore;
    QSemaphore semaphore;
    int32_t batchSize;
    atomic_int32_t unInCreWriteCount; // Safe under concurrency; the semaphore may be force-released, but this count stays reliable

    AsyncIncreWriteThread(const int32_t& size) {
        this->recordStore = new SwitchRecordStore();
        this->batchSize = size;
        this->unInCreWriteCount = 0;
    }

    // The destructor releases everything synchronously; async control happens only at the very top level
    ~AsyncIncreWriteThread() {
        MemUtil::clearPtrMem(recordStore);
    }

    void addComitInfo(const vector<pair<ResourceInfo, vector<Record*>>>& upDateInfo,
                      const vector<pair<ResourceInfo, vector<IndexRecord*>>>& indexUpDateInfo,
                      const vector<pair<ResourceInfo, vector<uint64_t>>>& bitMapUpDateInfo) {
        // The count is measured in records
        const int32_t& recCount = recordStore->addComitInfo(upDateInfo, indexUpDateInfo, bitMapUpDateInfo);
        unInCreWriteCount.fetch_add(recCount); // update the count before releasing the semaphore
        semaphore.release(recCount);
    }

    void forceIncreWriteToDisk() {
        semaphore.release(batchSize); // release enough permits to force the async write to start
        while (unInCreWriteCount.load() != 0); // spin until done; returns immediately if already 0
    }

    bool isResUnIncreWriteToDisk(const ResourceInfo& resInfo) {
        return recordStore->isResUnIncreWriteToDisk(resInfo);
    }

protected:
    virtual void run() {
        while (true) {
            // Acquisition here is not strict: callers can release extra permits to force a flush
            semaphore.acquire(batchSize);
            if (unInCreWriteCount.load() == 0) { continue; } // nothing to write; go back to waiting
            // Write the records to disk and update the sync progress
            IOUtil::updateSyncProgress(recordStore->increWriteToDisk() * Record::getRecordSize());
            unInCreWriteCount.store(0);
        }
    }
};
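Translated to the DiskScheduler above using C++20's std::counting_semaphore, the same idea might look like this (a sketch, not the repo's code; it reuses the Channel from the earlier sketch and the post's DiskManager/DiskRequest/PAGE_SIZE names):

#include <atomic>
#include <semaphore>
#include <thread>

// Sketch: the worker sleeps on a semaphore instead of polling with sleeps.
// Schedule() releases one permit per request, so the OS only wakes the
// worker when there is actual work; no 100 ms / 1000 ms sleep loops.
class AsyncWriter {
public:
    explicit AsyncWriter(DiskManager *dm) : disk_manager_(dm), worker_([this] { Run(); }) {}
    ~AsyncWriter() {
        // Real code would drain the queue before stopping
        stop_ = true;
        pending_.release();       // wake the worker so it can observe stop_
        worker_.join();
    }
    void Schedule(DiskRequest r) {
        queue_.Put(std::move(r)); // thread-safe channel from the sketch above
        pending_.release();       // one permit per pending request
    }
private:
    void Run() {
        while (true) {
            pending_.acquire();   // blocks with no CPU cost until work arrives
            if (stop_) return;
            if (auto r = queue_.Get()) {
                disk_manager_->write_page(r->page_id_.fd, r->page_id_.page_no, r->data_, PAGE_SIZE);
            }
        }
    }
    DiskManager *disk_manager_;
    Channel<DiskRequest> queue_;
    std::counting_semaphore<> pending_{0};
    std::atomic<bool> stop_{false};
    std::thread worker_;          // declared last so it starts after the members above
};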
Unbalanced buffer pool sharding
Q: The buffer pool sees tens of millions of fetch_page calls during transaction execution. Right now I use one big lock for concurrency, which is inefficient; profiling shows the buffer pool almost never writes to disk, while reads and lock waits take most of the time. So I want to shard the buffer pool to relieve the lock contention (previously one pool / one lock, now several). But distributing pages across pools by hash isn't balancing the load — with two pools, one fills up completely while the other is only 1/3 used.
Relevant code:
Page *BufferPoolManager::new_page(PageId *page_id) {
    *page_id = {page_id->fd, disk_manager_->allocate_page(page_id->fd)};
    return instances_[get_instance_no(*page_id)]->new_page(page_id);
}
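get_instance_no isn't shown in the post; presumably it reduces the PageId hash modulo the number of pool instances, along these lines:

// Assumption: shard selection is the PageId hash modulo the pool count, so
// balance depends entirely on how well the hash's low bits are mixed.
size_t BufferPoolManager::get_instance_no(const PageId &page_id) const {
    return std::hash<PageId>{}(page_id) % instances_.size();
}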
PageId definition:
struct PageId {
    int fd; // fd of the page's disk file once opened; locates the open file in memory
    page_id_t page_no = INVALID_PAGE_ID;

    friend bool operator==(const PageId &x, const PageId &y) { return x.fd == y.fd && x.page_no == y.page_no; }
    friend bool operator!=(const PageId &x, const PageId &y) { return !(x == y); }

    bool operator<(const PageId &x) const {
        if (fd != x.fd) return fd < x.fd;
        return page_no < x.page_no;
    }

    std::string toString() {
        return "{fd: " + std::to_string(fd) + " page_no: " + std::to_string(page_no) + "}";
    }

    // inline int64_t Get() const {
    //     return (static_cast<int64_t>(fd << 16) | page_no);
    // }
};

// Custom hash for PageId, used to build unordered_map<PageId, frame_id_t, PageIdHash>
// struct PageIdHash {
//     size_t operator()(const PageId &x) const { return (x.fd << 16) | x.page_no; }
// };

namespace std {
template<>
struct hash<PageId> {
    size_t operator()(const PageId &obj) const {
        // return (obj.fd << 16) | obj.page_no;
        std::size_t h1 = std::hash<int>{}(obj.fd);
        std::size_t h2 = std::hash<page_id_t>{}(obj.page_no);
        return (h1 << 1) ^ (h2);
    }
};
}
Q: In our workload, fd barely varies while page_no varies a lot. Perhaps we should design a hash that correlates more strongly with page_no.
A: Is this one file (one fd) per table?
Q: Right — 9 tables, 9 fds, and the tables vary a lot in size.
A: In the current hash, the term that gets shifted left only ever contributes even values, so taken % 2 the result's parity is decided entirely by the other, unshifted term. Whether the two pools balance then comes down to whether that term's parity is evenly distributed across your keys: with a few large tables among many small ones, if a large table's fd hashes even, the first pool gets most of the elements.
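One fix in the direction the Q suggests is to run both fields through a real integer mixer, so the low bits that `% pool_count` actually consumes depend on every input bit. A sketch — the splitmix64 finalizer used here is one common choice, not necessarily what the repo adopted:

#include <cstdint>
#include <functional>

// Mix all input bits into all output bits (splitmix64 finalizer).
inline uint64_t mix64(uint64_t x) {
    x ^= x >> 30; x *= 0xbf58476d1ce4e5b9ULL;
    x ^= x >> 27; x *= 0x94d049bb133111ebULL;
    x ^= x >> 31;
    return x;
}

namespace std {
template<>
struct hash<PageId> {
    size_t operator()(const PageId &obj) const {
        // Pack fd and page_no into one word, then mix, so the low bits
        // used by "hash % pool_count" depend on both fields.
        uint64_t key = (static_cast<uint64_t>(static_cast<uint32_t>(obj.fd)) << 32) |
                       static_cast<uint32_t>(obj.page_no);
        return static_cast<size_t>(mix64(key));
    }
};
}

With this, every table's pages spread across the pools regardless of the fds' parity.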
Bison ast::SemValue copies taking a lot of time
A: After some analysis, the slowness comes from the parser being shared by all connections, so threads are always waiting on its lock. It needs to become one parser per connection (one per thread). First, modify the flex and bison configuration — mainly adding the two options %option reentrant and %param {void *xx}:
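The config snippet itself is missing from the post; for a typical flex/bison setup the changes look roughly like this (a pure parser usually also wants %option bison-bridge on the flex side and %define api.pure full on the bison side, added here as assumptions):

/* lexer.l — flex side */
%option reentrant
%option bison-bridge      /* yylval becomes a parameter instead of a global */

/* parser.y — bison side */
%define api.pure full
%param { void *scanner }  /* threaded through both yylex() and yyparse() */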
After this change, every generated parser function gains an extra yyscanner parameter, and each thread simply passes its own:
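The usage snippet is also missing; per the standard reentrant flex API, each connection thread would do something like this (assuming the statement text sits in a std::string named sql):

// One scanner per connection thread; names follow the standard reentrant
// flex API (yylex_init / yy_scan_string / yyparse / yylex_destroy).
yyscan_t scanner;
yylex_init(&scanner);
YY_BUFFER_STATE buf = yy_scan_string(sql.c_str(), scanner);
int rc = yyparse(scanner);   // the generated yyparse now takes the scanner
yy_delete_buffer(buf, scanner);
yylex_destroy(scanner);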
With one scanner per thread, parse_tree also has to become per-thread:
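The original snippet is missing here too; the smallest change is to make the root thread-local (a sketch, assuming the result was stored in a single global std::shared_ptr as in RMDB's ast::parse_tree):

// Before: one parse-tree root shared by every connection (assumed original form)
// std::shared_ptr<ast::TreeNode> ast::parse_tree;

// After: one root per connection thread
namespace ast {
    thread_local std::shared_ptr<TreeNode> parse_tree;
}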
Q: It's a lot faster now, but the flame graph barely changed?
A: The flame graph counts samples, not time.
Q: It samples once every fixed interval, so the sample count should reflect time too, shouldn't it?
A: With multiple threads running, each sampling pass collects that many more samples at once, so the real time and the counts in the graph should differ by a factor that scales with the thread count.
Source repository
Kosthi/RMDB-2024: first-prize entry in the CSCC2024 database management system track (2/325) (github.com)
My friend is looking for a job!! If you're hiring database systems engineers, go DM him!! He's seriously good.