✨个人主页: 熬夜学编程的小林
💗系列专栏: 【C语言详解】 【数据结构详解】【C++详解】【Linux系统编程】【Linux网络编程】【项目详解】
目录
1、pagecache
1.1、整体设计
1.2、核心实现
1.3、获取Span
1.3.1、获取一个非空的Span
1.3.2、获取一个K页的span
1.3.3、加锁版获取非空span
1、pagecache
1.1、整体设计
page cache 与 central cache结构的相同之处
central cache的结构与thread cache 是相同的,都是哈希桶结构,并且page cache的每个哈希桶中挂的也是一个个的span,这些span也是按照双链表的结构链接起来的。
page cache 与 central cache结构的不同之处
1、page cache与 central cache的映射规则不同(包括thread cache)。page cache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,一次类推。
2、central cache每个桶中的span被切成一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span由central cache自己来决定。
page cache当中究竟有多少个桶,这就要看你最大想挂多大页的span了,我们这里就最大挂128页的span,为了让桶号与页号对应起来,我们可以将第0号桶空出来,因此我们需要将哈希桶的个数设置成129。
// page cache中哈希桶的个数
static const size_t NPAGES = 129;
为什么这里最大挂128页的span呢?
因为线程申请单个对象最大是256KB,而128页可以被切成4个256KB的对象,因此是足够的。当然,如果你想在page cache中挂更大的span也是可以的,根据具体的需求进行设置即可。
在page cache中获取一个n页的span
1、如果central cache要获取一个n页的span,那我们就可以在page cache的第n号桶中取出一个span返回给central cache即可。
2、如果第n号桶中没有span,这时我们并不是直接向堆申请一个n页的span,而是继续在后面的桶中寻找span。
- 当第n号桶中没有span时,我们可以找第n+1号桶,因为我们可以将第n+1页的span分成一个n页的span和一个1页的span,这时我们就可以将n页的span返回,而将切分后的1页挂接到1号桶中。
- 当后面的桶当中都没有span,这时我们就只能向堆申请一个128页的内存块,并将其用一个span结构管理起来,然后将128页的span切分成n页的span和128-n页的span,其中n页的span返回给central cache,而128-n页的span挂到第128-n号桶中。
- 注意:当我们直接向堆申请以页为单位的内存时,我们应该尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时我们可以将其切小后分配给线程,当线程将内存释放后我们又可以将其合并成大块连续的内存。
实际上,我们每次向堆申请的都是128页大小的内存块,central cache要的这些span实际都是由128页的span切分出来的。
1.2、核心实现
1、当每个线程的thread cache没有内存时都会向central cache申请,此时多个线程的thread cache如果访问的不是central cache的同一个桶,那么这些线程是可以同时进行访问的。这时central cache的多个桶可能同时向page cache申请内存,索引page cache也是存在线程安全问题的,因此在访问page cache时也必须要加锁。
- 但是page cache这里不能使用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。
- 换句话说,在访问page cache时,我们可能需要访问page cache中的多个桶,如果page cache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此我们在访问page cache时没有使用桶锁,而是用一把大锁将整个page cache锁住。
2、page cache在整个进程中只能存在一个,因此我们也需要将其设置为单例模式。
// 单例模式
class PageCache
{
public:// 提供一个全局访问点static PageCache* GetInstance(){return &_sInst;}std::mutex _pageMtx; // 大锁,设置公有方便调用
private:SpanList _spanLists[NPAGES];// C++98,构造函数私有PageCache(){}// C++11,禁止拷贝PageCache(const PageCache&) = delete;static PageCache _sInst;
};
当程序运行起来后我们就立马创建该对象即可。
// 类外初始化静态成员
PageCache PageCache::_sInst;
1.3、获取Span
1.3.1、获取一个非空的Span
thread cache向central cache申请对象时,central cache需要先从对应的哈希桶中获取到一个非空的Span,然后从这个非空的Span中取出指定大小的对象返回给thread cache。那central cache到底是如何从对应的哈希桶中获取到一个非空的Span的呢?
1、遍历central cache对应哈希桶中的双链表,如果该双链表中有非空的Span,那么直接将Span返回即可。为了方便遍历这个双链表,我们可以模拟迭代器的方式,给SpanList类提供Begin和End成员函数,分别获取双链表的第一个Span结点和最后一个Span结点的下一个结点,也就是头结点。
// 管理Span的链表结构
class SpanList
{
public:Span* Begin(){return _head->_next;}Span* End(){return _head;}
private:Span* _head; // 双向链表头结点指针
public: std::mutex _mtx; // 桶锁,方便类外访问
};
2、但如果遍历双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请内存块了。
那具体是向page cache申请多大的内存块呢?
我们可以根据具体所需对象的大小来决定,就像之前我们根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,现在我们是根据对象的大小计算出,central cache一次应该向page cache申请几页的内存块。
我们可以先根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,然后将这个上限值乘以单个对象的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。也就是说,central cache向page cache申请内存时,要求申请到的内存尽量能够满足thread cache向central cache申请时的上限。
class SizeClass
{
public: // central cache一次向page cache获取多少页static size_t NumMovePage(size_t size){// 计算出thread cache一次向central cache申请对象的个数上限size_t num = NumMoveSize(size); // num 个size 大小的对象所需的字节数size_t nPage = num * size;nPage >>= PAGE_SHIFT; // 将字节数转换为页数if (nPage == 0)nPage = 1; // 至少给一页return nPage;}
};
- PAGE_SHIFT代表页大小转换偏移,我们这里以页的大小为8K为例,PAGE_SHIFT的值就是13。
// 页大小转换偏移,即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;
- 注意:当central cache申请到若干页的span后,还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中。
如何找到一个span所管理的内存块呢?
首先需要计算出该span的起始地址,我们可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址,然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置。
明确了这块内存的起始和结束位置后,我们就可以进行切分了。根据所需对象的大小,每次从大块内存切出一块固定大小的内存块尾插到span的自由链表中即可。
为什么是尾插呢?
因为我们如果是将切好的对象尾插到自由链表,这些对象看起来是按照链式结构链接起来的,而实际它们在物理上是连续的,这时当我们把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 1.先在list中寻找非空的SpanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}// 2.list中没有非空的Span,只能向page cache申请Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));// 3.计算span大块内存的其实地址和大块内存的大小(字节数)char* start = (char*)(span->_pageID << PAGE_SHIFT); // 起始地址size_t bytes = span->_n << PAGE_SHIFT; // 内存大小char* end = start + bytes; // 结束地址// 4.把大块内存切成自由链表链接起来// a.先切一小块做头,方便尾插(定义一个尾结点指针)span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1; // 用于调试while (start < end){++i;Next(tail) = start;tail = Next(tail); // tail = startstart += size;}Next(tail) = nullptr; // 将尾结点指向空// b.切好Span以后,将切好的Span挂到桶里面(头插)list.PushFront(span);return span;//return nullptr;
}
1.在list中寻找非空的Span
2.申请内存和计算起始地址
3.把大块内存切成自由链表链接起来
注意:当我们把span切分好之后,需要将这个切分好的span挂接到central cache对应的哈希桶中。因此SpanList类需要提供一个将span插入到双链表的接口。此处我们选择头插,这样当central cache下一次从该双链表获取非空span时,一来就能找到。
由于SpanList类已经实现了Insert和Begin函数,这里实现双链表的头插直接在双链表的Begin位置进行Insert(在头结点插入数据)即可。
// 管理Span的链表结构
class SpanList
{
public:// 头插Spanvoid PushFront(Span* span){Insert(Begin(), span); // Begin之前插入span}
private:Span* _head; // 双向链表头结点指针
public: std::mutex _mtx; // 桶锁,方便类外访问
};
1.3.2、获取一个K页的span
当我们调用上述的GetOneSpan从central cache的某个哈希桶获取一个非空的span时,如果遍历哈希桶中的双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就需要向page cache申请若干页的span了,下面我们来分析如何从page cache获取一个k页的span。
1、因为page cache是直接按照页数进行映射的,因此我们要从page cache获取一个k页的span,就应该 直接先去找page cache的第k号桶, 如果第k号桶中有span,那我们直接头删一个span返回给central cache就行了。所以我们这里需要再给 SpanList类添加对应的Empty和PopFront函数。
// 管理Span的链表结构
class SpanList
{
public:// 判断是否为空bool Empty(){return _head == _head->_next;}// 头删SpanSpan* PopFront(){Span* front = _head->_next;Erase(front);return front;}
private:Span* _head; // 双向链表头结点指针
public: std::mutex _mtx; // 桶锁,方便类外访问
};
2、如果page cache的第k号桶中没有span,我们就应该继续找后面的桶,只要后面任意一个桶中有一个n页的span,我们就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页span返回给central cache,再将n-k页的span挂接到page cache的第n-k号桶中。
3、如果后面的桶中也都没有span,此时我们需要向堆申请一个128页的span了,在向堆申请内存时,直接调用我们封装的SystemAlloc函数即可。
注意:向堆申请内存后得到的是这块内存的起始地址,此时我们需要将该地址转换为页号。由于我们向堆申请内存时都是按照页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES); // 保证在桶的范围内// 1.检查第k个桶里面有没有Span,有则返回if (!_spanLists[k].Empty()){return _spanLists[k].PopFront();}// 2.检查一下后面的桶里面有没有span,有则将其切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的头部切一个k页下来kSpan->_pageID = nSpan->_pageID;kSpan->_n = k;// 更新nSpan位置nSpan->_pageID += k;nSpan->_n -= k;// 将剩下的挂到对应映射的位置_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}// 3.没有大页的span,找堆申请128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1); // 申请128页内存bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1; // 页数_spanLists[bigSpan->_n].PushFront(bigSpan);// 递归调用自己(避免代码重复)return NewSpan(k);
}
1.第k个桶中有Span
2.第k个桶后面有Span
3.没有大页的span,找堆申请128页的span
说明:当我们向堆申请到128页的span后,需要将其切分成k页的span和128-k页的span,但是为了尽量避免出现重复的代码,此处最好不要再编写对应的切分代码。我们可以先将申请到的128页的span挂接到page cache对应的哈希桶中,然后再递归调用该函数即可,此时在往后找span时就一定会在第128号桶找到该span,然后进行切分。
有一个问题:当central cache向page cache申请内存时,central cache对应的哈希桶是处于加锁的状态的,那在访问page cache之前我们应不应该把central cache对应的桶锁解掉呢?
这里建议在访问page cache前,先把central cache对应的桶锁解掉。虽然此时central cache的这个桶当中是没有内存供其他thread cache申请的,但thread cache除了申请内存还会释放内存,如果在访问page cache前将central cache对应的桶锁解掉,那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被阻塞。
因此在调用NewSpan函数之前,我们需要先将central cache对应的桶锁解掉,然后再将page cache的大锁加上,当申请到k页的span后,我们需要将page cache的大锁解掉,但此时我们不需要立刻获取到central cache中对应的桶锁。因为central cache拿到k页的span后还会对其进行切分操作,因此我们可以在span切好后需要将其挂到central cache对应的桶上时,再获取对应的桶锁。
1.3.3、加锁版获取非空span
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 1.先在list中寻找非空的SpanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞list._mtx.unlock();// 2.list中没有非空的Span,只能向page cache申请PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));PageCache::GetInstance()->_pageMtx.unlock();// 3.计算span大块内存的其实地址和大块内存的大小(字节数)char* start = (char*)(span->_pageID << PAGE_SHIFT); // 起始地址size_t bytes = span->_n << PAGE_SHIFT; // 内存大小char* end = start + bytes; // 结束地址// 4.把大块内存切成自由链表链接起来// a.先切一小块做头,方便尾插(定义一个尾结点指针)span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1; // 用于调试while (start < end){++i;Next(tail) = start;tail = Next(tail); // tail = startstart += size;}Next(tail) = nullptr; // 将尾结点指向空// b.切好Span以后,将切好的Span挂到桶里面(头插),再加锁list._mtx.lock();list.PushFront(span);return span;//return nullptr;
}