【项目设计】高并发内存池(Concurrent Memory Pool)

目录

1️⃣项目介绍

🍙项目概述

🍙知识储备

2️⃣内存池介绍

🍙池化技术

🍙内存池

🍙内存池主要解决的问题

🍥内碎片

🍥外碎片

🍙malloc

3️⃣ 定长内存池设计

4️⃣ 项目整体框架实现

5️⃣Thread Cache设计

🍙自由链表

🍙对齐映射规则设计

🍥对齐大小计算

🍥映射桶号计算

🍙ThreadCache类

🍥 申请内存

🍣慢开始反馈调节算法

🍥释放内存

🍥TLS(thread local storage)无锁访问 

6️⃣Central Cache设计

🍙SpanList链表结构设计

🍙Central Cache类

🍥申请内存

🍥释放内存

7️⃣Page Cache设计

🍙Page Cache类

🍥映射查找Span

🍥申请内存

🍥释放内存

8️⃣申请释放联调

🍙申请内存联调

🍙释放内存联调

9️⃣大于256Kb大块内存申请释放问题

🍙大块内存申请问题

🍙大块内存释放问题

🔟性能对比及基数树优化

🍙性能对比

🍙性能瓶颈分析

🍙基数树优化


1️⃣项目介绍

🍙项目概述

        本项目设计一个高并发内存池(Concurrent Memory Pool)基于Google开源项目tcmalloc(Thread-Caching Malloc),即线程缓存的malloc,实现了高效的多线程内存管理,可用于替代系统的内存分配函数(malloc、free),Go语言还把tcmalloc做了自己的内存分配器。

        本项目旨在把tcmalloc的核心精华框架部分简化后拿来,自己模拟实现出一个学习版的高并发内存池

🍙知识储备

        本项目会用到C/C++ 、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的知识。

2️⃣内存池介绍

🍙池化技术

        所谓 “池化技术” ,就是 程序先向系统申请过量的资源,然后自己管理,以备不时之需 。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
        在计算机中,有很多使用“ 这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。

🍙内存池

内存池的研究重点不是向操作系统申请内存,而是 对已申请到的内存的管理   
     
        内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间) 时,内存池才将之前申请的内存真正释放。

🍙内存池主要解决的问题

        内存池首先主要解决效率的问题,系统调用的性能开销是比较大的,当程序对堆的操作比较频繁时,这样做的结果会严重影响程序的性能,所以可以实现一个内存池对内存进行管理,而不是交给内核去进行系统调用。

        其次分配内存时,还要解决内存碎片的问题,内存碎片分为内碎片和外碎片

🍥内碎片

        内碎片的产生是因为申请内存空间时根据设计的对齐规则导致分配出去的空间有可能会有部分空间未被利用,这些在已经分配出去但未被使用的内存空间就是内碎片。

🍥外碎片

        外碎片的产生是因为2段空间不连续,碎片化,即使有足够的内存空间,也无法申请出来。

🍙malloc

        C/C++中我们要动态申请内存都是通过 malloc 去申请内存,但是我们要知道,实际我们不是直接使用系统调用去堆获取内存的,而是通过内存池去进行管理的,向系统获取一块大内存,然后切开分配给程序,当不够时再向系统申请大内存。malloc 就是一个内存池,底层设计是ptmalloc。
参考博客:

malloc的底层实现(ptmalloc)_z_ryan的博客-CSDN博客

3️⃣ 定长内存池设计

设计一个定长的内存池,为了将申请和释放与malloc分开,本项目要和malloc进行性能比较,那么各处实现就不能调用malloc以及对应的free,new和delete是C++的一个关键字,其底层调用了malloc和free,所以我们要避开使用C++的关键字,自己实现一个New和Delete。

定长内存池设计结构如下:

//定长内存池
template<class T>
class ObjectPool {
public:T* New(){T* obj = nullptr;//如果有还回的内存,直接使用还回的内存块if (_freeList){obj = (T*)_freeList;_freeList = *(void**)obj;//内存块中首个指针大小(头4/8字节)存的是下一个还回内存块的地址}else{//如果内存块为空或者剩余的内存块不足以继续申请T对象if (_remainbytes < sizeof(T)){_remainbytes = 128 * 1024;//128kb_memory = (char*)SystemAlloc(_remainbytes>>PAGE_SHIFT);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objsize;_remainbytes -= objsize;}//使用定位new调用对象的构造函数创建对象,不会自动分配内存new(obj)T;return obj;}void Delete(T* obj){//因为定位new不会管理内存释放,必须显示调用对象的析构函数obj->~T();//头插到freeList*(void**)obj = _freeList;_freeList = obj;}
private:char* _memory = nullptr;//指向内存块的指针void* _freeList = nullptr;//管理还回内存的自由链表 int _remainbytes = 0;//内存块中剩余的字节数
};

自由链表取到下一个内存块的地址设计在Thread Cache设计中自由链表模块有详细介绍,定长内存池在Windows下使用系统调用(VirtualAlloc)从堆中申请内存,在Linux下使用brk或mmap。

//堆上申请内存
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage * (1 << PAGE_SHIFT),MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}

有了内存还需要创建对象,这里采用定位new来调用对象的构造函数进行创建对象,因为定位new不会管理内存释放,所以我们在释放的时候要显示调用对象的析构函数,对资源进行清理,并且我们的释放实际上并不归还内存,而只是释放资源然后将内存挂在自由链表进行管理

4️⃣ 项目整体框架实现

高并发内存池整体框架——三级缓存
        现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc 本身其实已经很优秀,但本项目的原型 tcmalloc在多线程高并发的场景下更胜一筹 ,所以本项目实现的内存池需要考虑以下几方面的问题:
        🍘性能问题。
        🍘多线程环境下,锁竞争问题。
        🍘内存碎片问题。

高并发内存池(Concurrent Memory Pool)三级缓存:

        ⭐线程缓存(Thread Cache)——无锁

        ⭐中心缓存(Central Cache)——桶锁

        ⭐页缓存(Page Cache)       ——整体锁

设计:Thread Cache分配对象最大256Kb,根据定义的对齐映射规则计算出Thread Cache和Central Cache总桶数为208,Page Cache桶数(按页数)设计为129(0号桶不参与),采取线性映射,最大页数为128,假设1页8Kb,128*8Kb=1Mb,完全够给最大字节256Kb分4个。

static const size_t MAX_BYTES = 256 * 1024;//threadcache最大256kb
static const size_t NFREELISTS = 208;//使用static const代替define,208是根据定义的字节对齐算出的总共桶数
static const size_t NPAGES = 129;//总共Page桶数,128为最大页数,假设1页有8Kb,128*8Kb=1Mb,完全够给最大字节256Kb分4个
static const size_t PAGE_SHIFT = 13;//2^13 页大小8k

5️⃣Thread Cache设计

        Thread Cache:线程缓存是每个线程独有的,用于小于 256KB 的内存的分配, 线程从这里申请内 存不需要加锁,利用TLS无锁访问机制,每个线程独享一个 cache ,这也就是这个并发线程池高效的地方

        Thread Cache 哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表 。每个线程都会有一个thread cache 对象,这样每个线程在这里获取对象和释放对象时是无锁的。

🍙自由链表

        自由链表管理释放回来的小内存块和中心缓存中分配的未使用的小内存块,结构如下:

自由链表结构

         因为自由链表是用来管理小内存块的,所以其必须能够指向下一块小内存,那么当对象的大小<当前平台指针大小时,需要按指针的大小进行划分

        关于不同平台的问题,32位平台 指针大小4Byte,64位平台 指针大小8Byte。那么如何设计获取指针大小呢?

*(void**)    获取指针大小地址//获取结点obj存的下一个结点地址(前4/8字节),加static仅当前文件可见,防止重定义
static void*& NextObj(void* obj)
{return *(void**)obj;
}

        * 解引用本质上是对地址区间进行获取类型大小的内容,比如int*,对int*进行 * 解引用,实际上是获取int类型大小的地址内容,也就是4Byte的内容。

        void**是指针的指针,*(void**),就是对获取void*类型大小的地址内容,此时如果是32位平台就获得了4Byte大小内容,如果是64位平台就获得了8Byte大小内容。


        在Thread Cache中哈希桶每个桶就是一个自由链表,自由链表中一定会有插入、删除、判空等操作,并且我们还可以记录个数_size,_maxSize这个桶最多能挂多少个,那么这么多个自由链表就需要被管理,我们设计一个管理自由链表的结构:

//管理切好的小对象自由链表
class FreeList
{
public:void Push(void* obj){//头插NextObj(obj) = _freeList;_freeList = obj;++_size;}void PushRange(void* start, void* end,size_t n){NextObj(end) = _freeList;_freeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n){//头删assert(n >= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; i++){end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}void* Pop( ){//头删void* obj = _freeList;_freeList = NextObj(obj);--_size;return obj;}bool Empty(){return _freeList == nullptr;}size_t Size(){return _size;}size_t& MaxSize(){return _maxSize;}
private:void* _freeList = nullptr;size_t _maxSize = 1;//自由链表最大个数size_t _size = 0;
};

🍙对齐映射规则设计

🍥对齐大小计算

[1,128]8byte对齐freelist[0,16)
[128+1,1024]16byte对齐freelist[16,72)
[1024+1,8*1024]128byte对齐freelist[72,128)
[8*1024+1,64*1024]1024byte对齐freelist[128,184)
[64*1024+1,256*1024]8*1024byte对齐freelist[184,208)

        该设计规则除了第一个桶的内碎片浪费大,保证其他桶内碎片浪费整体保证在10%左右。

        (内碎片浪费率=浪费的字节/分配的字节),比如现在有129字节,就要分配144字节,只使用第一个16byte对齐桶的1个字节,浪费15字节,但总共分配了128+16=144字节,所以内碎片浪费率=15/144=10.4%

        根据设计规则,通过传入参数(字节数),进行简单逻辑判断跳转至子函数_RoundUp进行对齐后的字节数计算。

//对齐大小计算static inline size_t RoundUp(size_t bytes){assert(bytes <= MAX_BYTES);if (bytes <= 128){return _RoundUp(bytes, 8);}else if (bytes <= 1024){return _RoundUp(bytes, 16);}else if (bytes <= 8 * 1024){return _RoundUp(bytes, 128);}else if (bytes <= 64 * 1024){return _RoundUp(bytes, 1024);}else if (bytes <= 256 * 1024){return _RoundUp(bytes, 8*1024);}else{assert(false);}return -1;}

        对齐后的字节数计算函数(_RoundUp)设计我们学习参考tcmalloc的实现,采用位运算的方式进行,该设计思路十分巧妙,值得我们去学习使用。

	//计算对齐后的bytes大小static inline size_t _RoundUp(size_t bytes, size_t alignNum){return (bytes + alignNum - 1) & ~(alignNum - 1);}

🌰例子:

         bytes=7   alignNum=8
         alignNum-1=7        0000 0111
         ~(alignNum-1)       1111 1000
         7+8-1=15               0000 1111
         &                            0000 1000 = 8 = 对齐后所占大小
       
         bytes=9        alignNum=8
         9+8-1=16               0001 0000
         &                            0001 0000 = 16 = 对齐后所占大小

🍥映射桶号计算

        首先根据上面设计的对齐映射规则,我们可以计算得到对应桶号的区间,利用数组将区间桶号保存,再使用简单逻辑判断进入子函数(_Index)计算当前所在区间映射到的桶号,最终对齐映射的桶号=区间前的桶数+当前区间桶号

//计算映射在哪一个桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);//每个字节对齐数区间的最大链数(桶数)static int group[4] = { 16,56,56,56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group[1] + group[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group[2] + group[1] + group[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group[3] + group[2] + group[1] + group[0];}else {assert(false);}return -1;}

        同样的在这里学习参考tcmalloc的设计,巧妙使用位运算进行当前区间桶号计算,位运算比算术运算更加高效。

//计算当前对齐大小对应的所在桶号static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}

🌰例子:

        [1,8]    align_shift=3    1<<3=8
        ((1+8-1)>>3)-1=0    0号桶
        ...
        ((8+8-1)>>3)-1=0    0号桶
        [9,16]    align_shift=3    1<<3=8
        ((9+8-1)>>3)-1=1    1号桶
        ...
        ((16+8-1)>>3)-1=0    1号桶
        bytes=129    抛去bytes=128前的桶,只剩1bytes,再分配16字节对齐的0号桶,
        总桶号就是前128bytes桶号+当前16bytes的桶号

🍙ThreadCache类

class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);//释放对象时,链表过长时,回收内存回到中心缓存void ListTooLong(FreeList& list, size_t size);private:FreeList _freeLists[NFREELISTS];
};//TLS——无锁使变量在线程与线程之间独立
static __declspec(thread) ThreadCache* TLS_ThreadCache = nullptr;

🍥 申请内存

//申请内存
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = AMSize::RoundUp(size);size_t index = AMSize::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{//去中心缓存取return FetchFromCentralCache(index,alignSize);}
}
当内存申请 size<=256KB ,先获取到线程本地存储的 Thread Cache 对象,计算 size 映射的哈希桶自由链表下标i
⭐如果自由链表_freeLists[i] 中有对象,则直接 Pop 一个内存对象返回。
Pop()函数在上面的FreeList类中,因为是从自由链表上取走一个去使用,所以需要返回值void*
//头删
void* Pop( ){void* obj = _freeList;_freeList = NextObj(obj);--_size;return obj;}

⭐如果_freeLists[i] 中没有对象时,则批量从 Central Cache 中获取一定数量的对象,头插入到自由链表并返回一个对象。
void* ThreadCache::FetchFromCentralCache(size_t index,size_t alignSize)
{size_t batchNum = min(_freeLists[index].MaxSize(), AMSize::NumMoveSize(alignSize));if (_freeLists[index].MaxSize() == batchNum){//想修改返回值所以使用引用作为MaxSize返回值_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, alignSize);assert(actualNum >= 1);if (actualNum == 1){assert(start == end);return start;}else{//返回1个(start),剩下的(从start下一个开始)挂接到桶上_freeLists[index].PushRange(NextObj(start), end,actualNum-1);return start;}
}

     对于需求不同字节大小,从Central Cache获取的分配个数又需要考虑性能, 对于分配8bytes,可以多分配一些(但要有上限),对于256*1024bytes,则少分配些(但要有下限)
采用慢开始反馈调节算法
    1.最开始不会一次向Central Cache一次批量要太多,因为要太多可能用不完
    2.如果不要这个size大小内存需求,那么betchNum就会不断增长直到上限。
    3.size越大,一次向Central Cache要的batchNum就越小
    4.size越小,一次向Central Cache要的batchNum就越大 

🍣慢开始反馈调节算法

// 一次从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}

🌰如果只需要8Byte大小,从Central Cache获取批量数就是256*1024/8,其结果大于512,返回512个;如果需要256Kb大小,从Central Cache获取批量数就是256Kb/256Kb=1,其结果小于2,返回2个。

        这样设计批量在于确定上下限,不会使得从中心缓存获取的小块内存过多或过少,如果获取过多,一直不使用,达到一定数量时又会回收给Central Cache,多此一举,所以确定上下限。计算结果在上下限之间的就返回计算个数。

🍥释放内存

⭐当释放内存小于256Kb 时将内存释放回 Thread Cache ,计算 size 映射自由链表桶位置 i ,将对象 Push到_freeLists[i]
//释放内存
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);//找到映射的自由链表桶,对象插入进去size_t index = AMSize::Index(size);_freeLists[index].Push(ptr);//当链表长度大于一次批量申请的内存时就开始还一段list给CentralCache if (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}
⭐当链表的长度过长,则回收一部分内存对象到Central Cache
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

start和end在PopRange中是输出型参数,进入PopRange中进行头删将待回收的链表内存对象拿出来返还给Central Cache 。

🍥TLS(thread local storage)无锁访问 

        我们在设计中要求每一个线程都有一个独属于自己的ThreadCache类,如果我们把他ThreadCache类实现为全局的,那么必然每个线程共享这个类,势必会发生竞争问题,需要加锁。

        频繁的控制锁的加锁和解锁会增加时间成本,这显然和我们要的高性能不相符,所以这里提出一个变量存储方法TLS,线程局部存储TLS,该方法下:变量在当前线程下是全局可访问的,在线程和线程之间是独立局部的,这有效的实现了每个线程独属于自己的类,避免加锁。

//TLS——无锁使变量在线程与线程之间独立
static __declspec(thread) ThreadCache* TLS_ThreadCache = nullptr;

我们使用TLS机制,创建一个ThreadCache类指针,进行多线程下创建线程独立的类。该指针在申请和释放联调过程中调用。

6️⃣Central Cache设计

        Central Cache也是一个哈希桶结构,他的哈希桶的映射关系跟T hread Cache 是一样的。不同的是他的每个哈希桶位置挂是SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在span 的自由链表中。

🍙SpanList链表结构设计

Span管理以页为单位大小的大内存块

Span是以页为单位,那么就涉及到一个问题,页号在32位下,最高(2^32)/(2^13)=2^19,2^19我们需要4字节大小来表示,可以用size_t类型可以表示,但如果是64位下,页号最高(2^64)/(2^8)=2^51,我们需要8字节大小来表示,可以用unsigned long long类型。所以我们使用条件编译进行判断使用何种变量:

#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else//linux
#endif

细节:64位系统下,包含了宏_WIN32和_WIN64;如果把_WIN32放在最开始判断,那么就无法识别出64位系统,会一直识别为32位,所以我们将_WIN64放在最开始判断64位系统

但实际上size_t在64位下是unsigned long long 或者unsigned _int64类型(范围:[0,2^64 -1]),32位下是unsigned int类型。如果想要编写可移植的代码,应该避免直接使用int或long类型,而是要使用size_t类型。

 所以你也可以简化为:

#ifdef _WIN32typedef size_t PAGE_ID;//64位下也有宏_WIN32
#else//linux
#endif


Span里存储页号、页数、前后指针、切分小块内存的大小(用于释放的时候传参)、切分好的小块内存的数目(回收对象,如果Span内切分出去的对象全部回收,即_useCount=0,回收Span给PageCache进行页合并)、切好小块内存的自由链表、该Span是否被使用(用以合并Span判断)

struct Span
{PAGE_ID _pageId= 0;//大块内存起始页的页号size_t _n = 0;//页的数量,本质和PageCache中的SpanList数组(桶)下标一致,可以用来寻找挂接的桶位置Span* _next = nullptr;Span* _prev = nullptr;size_t _objSize = 0;//切好的对象大小size_t _useCount = 0;//切好小块内存,分配个thread_cache的计数void* _freeList = nullptr;//切好小块内存的自由链表bool _isUse = false;//判断是否被使用
};

SpanList带头双向循环链表,其结构如下:(一个头结点以及桶锁)

//带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_prev = _head;_head->_next = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}void PushFront(Span* span){Insert(Begin(), span);}void Insert(Span* pos, Span* newSpan){assert(pos);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;pos->_prev = newSpan;newSpan->_next = pos;}Span* PopFront( ){Span* span = _head->_next;Erase(span);return span;}void Erase(Span* pos){assert(pos);assert(pos != _head);//不能删带头结点Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}bool Empty(){return _head->_next == _head;}
private:Span* _head;
public:std::mutex _mtx;
};

🍙Central Cache类

        Central Cache:中心缓存是所有线程所共享, Thread Cache 按需从 central cache 中获取 的对象。Central Cache 合适的时机回收 Thread Cache 中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的
        Central Cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有 Thread Cache当 没有内存对象时才会找 Central Cache ,所以这里竞争不会很激烈

        Central Cache是所有线程共享的,所以只设计1个,并且当程序运行的时候我们就要创建出来,所以我们用单例模式的饿汉模式。

#pragma once
#include"Common.h"//因为所有线程对象共用一个CentralCache,
//所以设计成单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return _pInst;}//获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t size);//从中心缓存获取一定数量的对象给Thread Cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 将一定数量的对象释放到span跨度void ReleaseListToSpans(void* start, size_t size);private:SpanList _spanLists[NFREELISTS];
private://构造函数私有CentralCache(){}//禁止拷贝构造函数CentralCache(const CentralCache&) = delete;static CentralCache* _pInst;//声明
};

🍥申请内存

⭐当Thread Cache 中没有内存时,就会批量向 Central Cache 申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;Central Cache也有一个哈希映射的
spanList spanList 中挂着 span ,从 span中取出对象给Thread Cache,这个过程是需要加锁的,不
过这里使用的是一个桶锁,尽可能提高效率。
        从Central Cache中的span取对象,那么一定是Thread Cache的桶中没有剩余的对象,因为我们是从span中获取的,那么一定是一端连续的内存,我们 只需要首位地址就可以,而且需要将首位地址返回(设置为输出型参数) ,用来给Thread Cache头插挂接一段(PushRange)对象。
//从中心缓存获取一定数量的对象给Thread Cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = AMSize::Index(size);_spanLists[index]._mtx.lock();//加桶锁Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);//从span中获取batchNum个对象,如果不够batchNum个,有多少拿多少end = start = span->_freeList;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}//span中内存的自由链表指向分出后的余下内存结点span->_freeList = NextObj(end);//分出的最后个结点指向空NextObj(end) = nullptr;span->_useCount += actualNum;_spanLists[index]._mtx.unlock();return actualNum;
}

        这里使用桶锁,防止多个线程同时访问一个桶,造成线程安全问题。

        并且从Central Cache中的span切分(在GetOne中切分)batchNum对象给Thread Cache,但是可能实际上span并没剩下那么多,只能将剩下的分配给Thread Cache,所以需要统计一个实际值actualNum_useCount+=actualNum更新span中切分出去的对象,保证回收不会出错

        返回实际分配到的对象数目,在Thread Cache中返回1个使用,剩余的actualNum头插挂接到Central Cache对应的桶上

⭐Central Cache映射的spanList 中所有 span 的都没有内存以后,则需要向 Page Cache 申请一个新的span对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span中取对象给Thread Cache。
//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//查看当前的spanlist这时是否有 未分配对象的spanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}//先把CentralCache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞list._mtx.unlock();//走到此说明没有空闲span了,只能找PageCache要PageCache::GetInstance()->_pageMtx.lock();Span* span=PageCache::GetInstance()->NewSpan(AMSize::NumMovePage(size));span->_isUse = true;span->_objSize = size;//(小于256Kb)三级缓存中一定是从PageCache中去拿,存储对象大小PageCache::GetInstance()->_pageMtx.unlock();//切分span并挂接到桶,此时不需要加锁,因为这会其他线程访问不到这个span//计算span的大块内存的起始地址和大块内存的大小(字节数)char* start = (char*)(span->_pageId << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT;char* end = start + bytes;//把大块内存切成自由链表链接起来//先切一块下来做头,方便尾插,尾插是为了保存地址顺序span->_freeList = start;start += size;void* tail = span->_freeList;while (start < end){NextObj(tail) = start;tail = NextObj(tail);start += size;}//最后一个span内的小内存块指向空NextObj(tail) = nullptr;//切好span后,挂接到桶需要加锁list._mtx.lock();list.PushFront(span);return span;
}

        如果Central Cache当前桶有剩余的span,直接返回该span,不需要去Page Cache申请span。

        如果没有剩余span,解开桶锁,进入PageCache中获取span,获取后记录使用情况和存储对象大小,并且Page Cache实际上我们也只设计了1个,所以他也需要加锁。

        为什么要解开桶锁?

CentralCache是桶锁,PageCache是整个锁。

在CentralCache::GetOneSpan()中获取一个span,需要从Page获取Span时,先把桶锁解掉,如果此时线程1和2都执行GetOneSpan(),因为PageCache::NewSpan()有整个锁,产生阻塞,也不会产生混乱

也就是说CentralCache在此时解不解锁在获取Span时作用一样,但是我可以线程1在这个桶拿Span,并且线程2在这个桶释放Span,为了提高效率,所以我们解开桶锁

        页缓存获取span是按页来分配的,所以接口NewSpan需要传递页数,我们设计NumMovePage获取页数:传递申请的对象对齐大小,先进入NumMoveSize获取向Central Cache申请的span个数,对齐大小*个数=总Byte,总Byte/页大小=需要的页数,不满足1页给1页。

	static size_t NumMovePage(size_t size){size_t num = NumMoveSize(size);size_t npage = num * size;	//算出需要的总Byte大小npage >>= PAGE_SHIFT;		//总Byte大小/页大小=需要的页数if (npage == 0)npage = 1;return npage;}

        从Page Cache中获取span后,我们span中只存储了页信息,但没有他的地址信息,那我们怎么获得地址去管理连接内存对象呢?

        这里就要引入一个概念:页的起始地址=页号*页大小

                                                页的尾地址=起始地址+页的数量*页的大小

                                                页号=页的起始地址/页大小

        那么在相邻页之间地址,其地址大小小于后面一页的起始地址,➗页大小必定也能得到该页的页号。这在回收中有着重要作用。

        从Page Cache中获取到span后,我们通过上面的概念,可以计算出该span的起始地址和尾地址,我们再根据对象大小进行切分,因为内存物理上其实是连续的,而我们这里要在抽象的把他形成链式结构,我们就需要通过尾插来保证地址的连续。切好后将该span挂在Central Cache的桶。

⭐Central Cache的中挂的span 中_ useCount 记录分配了多少个对象出去,分配一个对象给Thread Cache,就 ++_useCount。

🍥释放内存

⭐当Thread Cache 过长或者线程销毁,则会将内存释放回 Central Cache 中的,释放回来时 _
useCount-- 。当 useCount 减到 0 时则表示所有对象都回到了 span ,则将 span 释放回 Page Cache
Page Cache 中会对前后相邻的空闲页进行合并。
// 将一定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{//找到在哪个桶上size_t index = AMSize::Index(size);_spanLists[index]._mtx.lock();//加锁,因为有桶锁防止多线程竞争//回收到spanwhile (start){void* next = NextObj(start);//找到对应的span,小内存(自由链表)头插Span* span = PageCache::GetInstance()->MapObjToSpan(start);NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;//如果为0,说明span切分出的小块内存都回来了,这个span可以再回收给PageCache,再尝试去前后页合并if (span->_useCount == 0){//从桶里拿掉这个span_spanLists[index].Erase(span);//知道span的页号就可以知道span的起始地址从而找到整块span,不需要考虑小块内存链表_freeList了span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;_spanLists[index]._mtx.unlock();//已经拿掉span了,可以释放桶锁给别人PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanLists[index]._mtx.lock();}start = next;}_spanLists[index]._mtx.unlock();
}

        头插回收一定数量对象到span,如果全部回收,即_useCount==0,则可以将该span拿给Page Cache进行页的合并。

        那么如何通过地址获取对应的span呢?我们就需要调用MapObjToSpan函数来获取,这将在下面介绍。

7️⃣Page Cache设计

Page Cache :页缓存是在C entral Cache 缓存上面的一层缓存,存储的内存是以页为单位存储及分
配的,C entral Cache 没有内存对象时,从P age Cache 分配出一定数量的 page ,并切割成定长大小
的大块内存,分配给 Central Cache 当一个 span 的几个跨度页的对象都回收以后,P age Cache
会回收C entral Cache 满足条件的 span 对象,并且合并相邻的页,组成更大的页,缓解内存碎片
的问题。

🍙Page Cache类

Page Cache我们在设计中也是只有一个, 所以设置成单例模式。

        并且在Page Cache中我们桶的映射规则与上面2级缓存不同,这里采用直接定址法,i号桶挂i页内存

        桶的个数根据需求而定,我们申请内存最大是256Kb,页大小为8K,也就是说我们要想申请一个256Kb的对象就必须要(256/8=32)32页的span,那么我们可以多分配一些,设置桶个数为128,128页可以申请4个256Kb对象。实际上128页就是1Mb大小。

        页缓存中主要对页进行操作,所以我们有必要对页和span建立一个映射关系,方便我们查找管理,所以使用哈希表unordered_map<PAGE_ID,Span*>

        对页缓存的访问需求实际上很少,所以我们使用一个整体锁来进行管理线程安全即可,避免频繁调用锁,消耗时间。

        在创建Span中,我们使用了最开始设计的定长内存池来申请和释放对象,与new和delete分离。

#pragma once#include"Common.h"
#include"ObjectPool.h"
//单例模式
class PageCache
{
public:static PageCache* GetInstance(){return _pInst;}//获取从对象到span的映射Span* MapObjToSpan(void* obj);//获取一个K页SpanSpan* NewSpan(size_t k);// 释放空闲span回到Pagecache,并合并相邻的spanvoid ReleaseSpanToPageCache(Span* span);std::mutex _pageMtx;//全局锁private:SpanList _spanLists[NPAGES];//页数作桶的映射下标std::unordered_map<PAGE_ID, Span*>_idSpanMap;ObjectPool<Span>_spanPool;
private:PageCache(){}PageCache(const PageCache&) = delete;static PageCache* _pInst;};

🍥映射查找Span

        根据Central Cache申请内存部分引入的概念,我们可以得知页的起始地址*页大小=页号,我们可以通过这个公式得到页号,然后在哈希表中查找到对应的span。

        这里我们使用RAII原则的unique_lock,构造时加锁,出作用域对象解锁,防止程序异常退出导致死锁,优化代码。

//通过页的起始地址找到页,从而映射找到span
Span* PageCache::MapObjToSpan(void* obj)
{//算页号PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;std::unique_lock<std::mutex>lock(_pageMtx);//RAII思想,构造时加锁,出作用域对象销毁调用析构函数解锁查找auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}

🍥申请内存

⭐当central cache page cache 申请内存时, page cache 先检查对应位置有没有 span ,如果没有 则向更大页寻找一个 span ,如果找到则分裂成两个 。比如:申请的是 4 page 4 page 后面没
有挂 span ,则向后面寻找更大的 span ,假设在 10 page 位置找到一个 span ,则将 10 page span分裂为一个 4 page span 和一个 6 page span
⭐如果找到_spanList[128] 都没有合适的 span ,则向系统使用 mmap brk 或者是 VirtualAlloc 等方式申请128 page span 挂在自由链表中,再重复 1 中的过程。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0);if (k > NPAGES - 1){//页数大于128,直接向堆申请void* ptr = SystemAlloc(k);//Span* span = new Span;Span* span=_spanPool.New();//页号*页大小=该页的起始地址span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;_idSpanMap[span->_pageId] = span;//记录pageId和span映射关系,方便释放的时候通过页找到span//_idSpanMap.set(span->_pageId, span);//基数树优化return span;}//先检查第k个桶里面有没有spanif (!_spanLists[k].Empty()){Span* kSpan= _spanLists[k].PopFront();//建立id和span的映射,方便CentralCache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;//_idSpanMap.set(kSpan->_pageId + i, kSpan);//基数树优化}return kSpan;//kSpan页返回给CentralCache}//检查后面桶里有没有spanfor (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();//Span* kSpan = new Span;Span* kSpan = _spanPool.New();//在nSpan的头部切下k页//k页span返回给CentralCache;nSpan再挂接到对应映射的位置kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;//更新编号nSpan->_n -= k;//既是剩余页数也是映射位置_spanLists[nSpan->_n].PushFront(nSpan);//挂接//存储nSpan的首尾页号跟nSpan映射,方便PageCache回收内存时进行合并查找_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//nSpan最后一个页号//_idSpanMap.set(nSpan->_pageId, nSpan);//_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);//基数树优化//建立id和span的映射,方便CentralCache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId+i] = kSpan;//_idSpanMap.set(kSpan->_pageId + i, kSpan);//基数树优化
;			}return kSpan;//kSpan页返回给CentralCache}}//走到这说明后面没有大页的span,这时需要去堆要一个128页的span//Span* bigSpan = new Span;Span* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
}
  • 如果申请页大于128页,则需要向堆申请,我们后续再说。
  • 如果该桶还有span,则直接取出span给Central Cache,并哈希表保存页号和span的映射。
  • 如果该桶没有,则从后面的桶中取span,并更新该span被切后的页号和页数再挂接到对应页号的桶上,建立页号和span的映射关系,方便后续回收。
  • 如果后续桶也没有span,则向系统堆申请128页的span,挂接到128号桶,再递归调用切出要的页span。

🍥释放内存

⭐如果central cache 释放回一个 span 则依次寻找 span 的前后 page id 的没有在使用的空闲 span 看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的 span ,减少 内存碎片
void PageCache::ReleaseSpanToPageCache(Span* span)
{//大于128页,直接还给堆if (span->_n > NPAGES - 1){void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);//delete span;_spanPool.Delete(span);return;}//尝试span前后页合并,缓解内存外碎片问题while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);//如果没有前面的页号,不合并了if (ret == _idSpanMap.end()){break;}//如果前面的相邻页span在使用,不合并了Span* prevSpan = ret->second;/*auto ret =(Span*) _idSpanMap.get(prevId);if (ret == nullptr){break;}Span* prevSpan = ret;*///基数树优化if (prevSpan->_isUse == true){break;}//如果合并超过128页的span,没办法管理,不合并了if (prevSpan->_n + span->_n > NPAGES - 1){break;}//合并span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;//合并了要删除挂接在桶上的prevSpan_spanLists[prevSpan->_n].Erase(prevSpan);//delete prevSpan;_spanPool.Delete(prevSpan);}while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end()){break;}Span* nextSpan = ret->second;/*auto ret = (Span*)_idSpanMap.get(nextId);if (ret == nullptr){break;}Span* nextSpan = ret;*///基数树优化if (nextSpan->_isUse == true){break;}if (span->_n + nextSpan->_n > NPAGES - 1){break;}//合并span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);//delete nextSpan;_spanPool.Delete(nextSpan);}//前后页合并后的span或者无法合并的span挂接到在PageCache对应桶_spanLists[span->_n].PushFront(span);span->_isUse = false;_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;/*_idSpanMap.set(span->_pageId, span);_idSpanMap.set(span->_pageId + span->_n - 1, span);*///基数树优化
}
  • 如果归还页大于128页,则直接还给堆,同样我们下面再讲。
  • 首先向相邻前页合并,再向相邻后页合并。
  • 如果相邻页没有就不合并跳出,如果相邻页正在使用就不合并跳出(这里为什么要使用_isUse而不使用_useCount==0呢?)如果合并页超过128,无法管理不合并跳出。
  • 走完前后页合并逻辑后,将页挂接到Page Cache的桶并建立映射关系。

为什么要使用_isUse而不使用_useCount==0来判断相邻页是否正在被使用呢?

        因为可能在给CentralCache划分span的时候,_usercount还未++,此时还是0,恰好有可能其他线程在PageCache判断此时划分给CentralCache的为0拿来合并,这就造成了线程安全的问题。

解决方法:span增加一个bool值,判断是否被使用

8️⃣申请释放联调

🍙申请内存联调

        接口ConcurrentAlloc联调程序申请内存 :

static void* ConcurrentAlloc(size_t size)
{if (TLS_ThreadCache == nullptr){//TLS_ThreadCache = new ThreadCache;static ObjectPool<ThreadCache>tcPool;TLS_ThreadCache = tcPool.New();}//cout << std::this_thread::get_id() << ";" << TLS_ThreadCache << endl;return TLS_ThreadCache->Allocate(size);}
}

🍙释放内存联调

        接口ConcurrentFree联调程序释放内存: 

static void ConcurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjToSpan(ptr);//通过映射关系找到spansize_t size = span->_objSize;assert(TLS_ThreadCache);TLS_ThreadCache->Deallocate(ptr, size);
}

9️⃣大于256Kb大块内存申请释放问题

🍙大块内存申请问题

我们三级缓存的设计主要考虑的是小于256Kb的对象,那如果大于256Kb我们如何处理呢?

  • 在Page Cache中我曾提到256Kb需要32页,但我们Page Cache设计的最大有128页。所以如果申请对象大于32页小于等于128页,我们可以直接向Page Cache申请内存
  • 如果大于128页,我们就需要向系统堆空间申请内存

修改联调程序:大于256Kb我们就直接去Page Cache

Page Cache中大于128页向堆申请内存,小于等于则继续按逻辑获取页内存。

🍙大块内存释放问题

  • 大于128页,直接向堆释放内存
  • 小于等于128页则继续走Page Cache逻辑页合并

修改释放联调程序:在这里能看出MapObjToSpan的价值,通过地址就可以映射找到span,并且为了获取存储对象大小,在span结构中增添_objSize

Page Cache大于128页向堆释放内存。

        并且在申请和释放的对象的过程中,我们使用了定长内存池创建释放对象,不使用new和delete使得可以和malloc进行性能比较。

🔟性能对比及基数树优化

🍙性能对比

        对比多线程下设计的高并发内存池和malloc的性能:分别对相同大小内存和不同大小内存进行申请和释放。

#include"ConcurrentAlloc.h"void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//固定大小内存//v.push_back(malloc((16 + i) % 8192 + 1));//不同大小内存}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, (unsigned int)(malloc_costtime + free_costtime));
}void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, (unsigned int)(malloc_costtime + free_costtime));
}int main()
{size_t n = 1000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
  • ntimes:单轮申请、释放次数
  • nworks:线程数
  • rounds:轮次数
  • 线程内部使用lambda表达式(C++11新特性),用于定义匿名函数,以值传递捕获k,以引用传递捕获其他父作用域的变量
  • 使用原子变量atomic(C++11新特性),不会导致多线程下数据竞争,注意:printf没法直接大于atomic类型对象,需要强转。

测试结果:性能有待优化

🍙性能瓶颈分析

我们使用VS自带的性能探查器进行时间检测。

根据检测结果,我们发现性能瓶颈点在MapObjToSpan的锁竞争上。

🍙基数树优化

        在tcmalloc中实际上在释放内存中对该处使用了基数树优化,那我们也学习使用基数树对我们的程序进行优化。

        单层基数树是直接地址映射法进行直接哈希,也就是说页号与span直接对应。

// 一层基数树(直接哈希)
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;//页数目,BITS是存储页号需要多少位,假设一页8K=2^13;32位下存储页号需要=(32-13)=19位void** array_;//指针数组
public:typedef uintptr_t Number;explicit TCMalloc_PageMap1( ) {size_t bytes = sizeof(void*) << BITS;//需要开辟的字节数size_t alignSize = AMSize::_RoundUp(bytes, 1 << PAGE_SHIFT);//bytes>2^18(256*1024),按页大小对齐array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);//按页分配内存memset(array_, 0, sizeof(void*) << BITS);}//返回映射值void* get(Number k) const {if ((k >> BITS) > 0) {//页号不在页数目范围return NULL;}return array_[k];}//建立映射void set(Number k, void* v) {array_[k] = v;}
};

        非类型模板参数BITS表示存储页号最多需要比特位的个数,32位下最大页号2^19次,此时BITS就是19,数组个数就是2^19,每个存储1个指针,所以数组总大小2^21=2M。

        64位下最大页号2^51次,此时BITS就是51,数组个数就是2^19,每个存储1个指针,所以数字总大小2^54=2^24G,这实在是太大了,所以我们需要继续分层。

        二层基数树实际上就是把BITS进行分层映射,在32位下,用前5比特位映射第一层,得到2^5个,后14位映射到第二层得到该页的span指针。总共占用大小2^5 * 2^14 * 4 =2^21=2M。和一层基数树开辟的大小是一样的,但是二层基数树最开始只需要开辟第一层,当需要某一页号进行映射再开辟第二层,而一层基数树一开始直接开辟全部。

//二层基数树(分层哈希)
template <int BITS>
class TCMalloc_PageMap2 {
private:static const int ROOT_BITS = 5;//前5个比特位static const int ROOT_LENGTH = 1 << ROOT_BITS;//2^5第一层存储元素个数static const int LEAF_BITS = BITS - ROOT_BITS;//19-5=14,剩下14个比特位static const int LEAF_LENGTH = 1 << LEAF_BITS;//2^14第二层存储元素个数// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH];typedef uintptr_t Number;explicit TCMalloc_PageMap2( ) {	memset(root_, 0, sizeof(root_));//第一层空间清理PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;//k低19位存储页号(合法的高位都是0),右移14位,获取19位中的前5位([18,14])确定第一层的下标const Number i2 = k & (LEAF_LENGTH - 1);//获取后13位与k与运算获得第二层的下标if ((k >> BITS) > 0 || root_[i1] == NULL)// 页号值超过范围或者页号映射的空间未开辟{return NULL;}return root_[i1]->values[i2];//返回映射的span指针}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);assert(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;//建立映射}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// 检查是否超出第一层下标范围if (i1 >= ROOT_LENGTH)return false;// 开辟空间if (root_[i1] == NULL)//第一层i1指向的空间未开辟 {static ObjectPool<Leaf>LeafPool;Leaf* leaf = (Leaf*)LeafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}//推进叶节点的地址key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;//移到下一页空间首地址}return true;}void PreallocateMoreMemory() {//将第二层空间全部开好Ensure(0, 1 << BITS);}
};

        设计Ensure函数进行需要页号时再开辟第二层空间,并且全部开辟内存消耗也不多,所以我们在构造的时候就全部开辟出来。

         32位可以使用一层和二层基数树,64位下需要使用三层基数树,分析过程和二层实际一样,省略。   

        本项目只在32位平台使用基数数优化,我们使用单层基数树优化代码:

当我们需要建立映射关系时就调用基数树函数set:

_idSpanMap.set(span->_pageId, span);

当我们需要读取映射关系时就调用基数树函数get:

auto ret = (Span*)_idSpanMap.get(id);

MapObjToSpan函数此时无需加锁:

//通过页的起始地址找到页,从而映射找到span
Span* PageCache::MapObjToSpan(void* obj)
{//算页号PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;auto ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret;
}

为什么无需加锁?

MapObjToSpan在进行读操作。

1.只有这两个函数中会去建立id和span的映射,也就是说会去写操作

2.基数树,写之前会提取开好空间,写数据过程中,不会动结构。

3.读写是分离的。线程1对这个位置读写操作时,线程2不可能对这个位置进行读写操作。

        我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,但是在这个位置地方进行读操作也绝不会进行写操作,因为我们在开始开辟这个位置的时候就已经写操作写好映射了,而建立映射的写操作都是在page cache进行的(页缓存中我们加了一把大锁,更不可能出现写操作的竞争);也不可能2个线程对同一个位置进行读操作,因为读操作是在释放对象过程中,这期间有桶锁,所以也不可能产生竞争。

再次性能测试,优化结果:多线程场景下性能比malloc好。


本项目最终性能优化后只实现了在32位下运行,如若64位下则不应使用基数树优化。

源码:

https://gitee.com/hao-welcome/ConcurrentMemoryPool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/63610.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

channel并发编程

不要通过共享内存通信&#xff0c;要通过通信共享内存。 channel是golang并发编程中一种重要的数据结构&#xff0c;用于多个goroutine之间进行通信。 我们通常可以把channel想象成一个传送带&#xff0c;将goroutine想象成传送带周边的人&#xff0c;一个传送带的上游放上物品…

打破对ChatGPT的依赖以及如何应对ChatGPT的错误和幻觉

​ OpenAI的ChatGPT是第一个真正流行的生成式AI工具&#xff0c;但它可能不是最好的。现在是时候扩大你的AI视野了。 ChatGPT成为了基于大语言模型(LLM)的聊天机器人的同义词。但是现在是时候停止对ChatGPT的痴迷&#xff0c;开始发现这个新世界中强大的替代品了。 首先&a…

【内推码:NTAMW6c】 MAXIEYE智驾科技2024校招启动啦

MAXIEYE智驾科技2024校招启动啦【内推码&#xff1a;NTAMW6c】 【招聘岗位超多&#xff01;&#xff01;公司食堂好吃&#xff01;&#xff01;】 算法类&#xff1a;感知算法工程师、SLAM算法工程师、规划控制算法工程师、目标及控制算法工程师、后处理算法工程师 软件类&a…

python 深度学习 解决遇到的报错问题4

目录 一、DLL load failed while importing _imaging: 找不到指定的模块 二、Cartopy安装失败 三、simplejson.errors.JSONDecodeError: Expecting value: line 1 column 1 (char 0) 四、raise IndexError("single positional indexer is out-of-bounds") 五、T…

操作系统中一些零散的知识点

第三章 内存管理 在虚拟内存系统中&#xff0c;虚拟内存的最大容量是由计算机的地址结构&#xff08;CPU寻址范围&#xff09;确定的&#xff0c;而虚拟内存的实际容量是受到“内存大小磁盘空间大小”、“地址线位数”共同制约&#xff0c;取二者最小值实现虚拟内存管理必须有…

Lvs+KeepAlived高可用高性能负载均衡

目录 1.环境介绍 2.配置keepalived 3.测试 1.测试负载均衡 2.测试RS高可用 3.测试LVS高可用 3.1测试lvs主服务宕机 3.2.测试lvs主服务器恢复 4.我在实验中遇到的错误 1.环境介绍 环境&#xff1a;centos7 RS1---RIP1:192.168.163.145 VIP 192.168.163.200 RS2---RIP2…

无涯教程-Android Studio函数

第1步-系统要求 您将很高兴知道您可以在以下两种操作系统之一上开始Android应用程序的开发- MicrosoftWindows10/8/7/Vista/2003(32或64位)MacOSX10.8.5或更高版本,最高10.9(小牛) GNOME或KDE桌面 第二点是,开发Android应用程序所需的所有工具都是开源的,可以从Web上下载。以…

【Interaction交互模块】ActionPublisher/ActionReciever

文章目录 需求案例原理0、相应准备1、发布器、接收器2、将把两者联系起来3、前提状态 补充 需求 Interactor只能将一个动作&#xff08;如按下手柄抓取键&#xff09;&#xff0c;传递给Interactble,如果要传两个或多个&#xff0c;就需要用发布器和接收器。 案例 右手柄抓取…

小疆智控CANOpen转PROFINET网关连接EA180C CANOPEN总线型伺服配置案例

1、首先新建一个工程&#xff0c;在CanOpen转Profinet网关配置软件中添加主站设备&#xff0c;如下图&#xff1b; 2、在CanOpen转Profinet网关设备点击导入EA180C CANOPEN总线型伺服 EDS 文件&#xff0c;右键添加从属设备&#xff1b; 3、设备设置站地址&#xff0c;如图&…

云原生架构:在云环境中构建弹性应用

随着云计算技术的快速发展&#xff0c;云原生架构已经成为现代软件开发的热门话题。作为一种在云环境中构建和运行应用程序的方法论&#xff0c;云原生架构强调弹性、可扩展性和灵活性&#xff0c;使开发者能够更好地应对复杂的业务需求。本文将深入探讨云原生架构的核心概念、…

TDesign表单rules通过函数 实现复杂逻辑验证输入内容

Element ui 中 我们可以通过validator 绑定函数来验证一些不在表单model中的值 又或者处理一下比较复杂的判断逻辑 TDesign也有validator 但比较直观的说 没有Element那么好用 这里 我们给validator绑定了我们自己的checkAge函数 这个函数中 只有一个参数 value 而且 如果你的…

一种改进多旋翼无人机动态仿真的模块化仿真环境研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

ChatGPT 总结前端HTML, JS, Echarts都包含哪些内容

AIGC ChatGPT ,BI商业智能, 可视化Tableau, PowerBI, FineReport, 数据库Mysql Oracle, Office, Python ,ETL Excel 2021 实操,函数,图表,大屏可视化 案例实战 http://t.csdn.cn/zBytu

Web3.0:重新定义互联网的未来

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Web3.0&#xff1a;重新定义互联网的未来 Web3.0是指下一代互联网&#xff0c;也称为“分布式互联网”。相比于Web1.0和Web2.0&#xff0c;Web3.0具有更强的去中心化、…

京东API接口解析,实现获得JD商品评论

要获取京东商品评论&#xff0c;需要使用京东的开放平台API接口。以下是一个基本的示例&#xff0c;解析并实现获取JD商品评论的API接口。 首先&#xff0c;你需要访问京东开放平台并注册一个开发者账号。注册完成后&#xff0c;你需要创建一个应用并获取到API的权限。 在获取…

SQL Server开启变更数据捕获(CDC)

一、CDC简介 变更数据捕获&#xff08;Change Data Capture &#xff0c;简称 CDC&#xff09;&#xff1a;记录 SQL Server 表的插入、更新和删除操作。开启cdc的源表在插入、更新和删除操作时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中&#xff0c;通过…

【OpenCV入门】第六部分——腐蚀与膨胀

文章结构 腐蚀膨胀开运算闭运算形态学方法梯度运算顶帽运算黑帽运算 腐蚀 腐蚀操作可以让图像沿着自己的边界向内收缩。OpenCV通过”核“来实现收缩计算。“核”在形态学中可以理解为”由n个像素组成的像素块“&#xff0c;像素块包含一个核心&#xff08;通常在中央位置&…

构建现代应用:Java中的热门架构概览

文章目录 1. 三层架构2. Spring框架3. 微服务架构4. Java EE&#xff08;Enterprise Edition&#xff09;5. 响应式架构6. 大数据架构7. 领域驱动设计&#xff08;Domain-Driven Design&#xff0c;DDD&#xff09;8. 安卓开发架构结论 &#x1f389;欢迎来到Java学习路线专栏~…

使用Arrays.asList生成的List集合,操作add方法报错

早上到公司&#xff0c;刚到工位&#xff0c;测试同事就跑来说"功能不行了&#xff0c;报服务器异常了&#xff0c;咋回事";我一脸蒙&#xff0c;早饭都顾不上吃&#xff0c;要来了测试账号复现了一下&#xff0c;然后仔细观察测试服务器日志&#xff0c;发现报了一个…

芯探科技--泛自动驾驶激光雷达解决方案

泛自动驾驶应用领域: 无人配送车 无人叉车 服务机器人 无人清扫车 …… 泛自动驾驶激光雷达解决方案介绍 在中低速移动过程中,类似无人配送车、无人叉车、服务型机器人、无人清扫车等具有自动驾驶功能的车辆,其需要对周围的环境进行探测,进而实现…