目录
一,项目介绍
1.1 关于高并发内存池
1.2 关于池化技术
1.3 关于malloc
二,定长内存池实现
2.1 实现详情
2.2 完整代码
三,高并发内存池整体设计
四,threadcache设计
4.1 整体设计
4.2 哈希桶映射对齐规则
4.3 申请内存函数实现
4.4 TLS无锁访问
五,centralcache设计
5.1 整体设计
5.2 Span的设计
5.3 完善thread cache类的申请内存函数
5.4 centralcache类的实现
六,pagecache设计
6.1 整体设计
6.2 page cache详细设计
七,申请内存过程联调
7.1 第一次申请
7.2 第二次申请
7.3 第1025次申请
八,回收内存设计
8.1 threadcache回收内存
8.2 centralcache回收内存
8.3 pagecache回收内存
九,回收内存过程联调
十,优化
10.1 大于256KB内存块的申请和释放
10.2 释放对象时优化不传大小
10.3 读取映射关系时的加锁问题
十一,多线程下对比malloc性能测试
十二,使用基数树优化
12.1 性能瓶颈分析
12.2 关于基数树
12.3 使用基数树优化代码
12.4 最终测试
12.5 周边问题
一,项目介绍
1.1 关于高并发内存池
当前项目是实现一个高并发的内存池,原型为 Google 的一个开源项目 tcmalloc ,全称 Threead-Caching Malloc(现成缓存malloc),能够将内存配合线程进行高效的管理,最终实现的接口用于替换系统内存分配的的相关函数(mallco,free)
-
tcmalloc以比传统malloc更高的的效率出名,比如微软的Go语言就直接用它做了自己底层的内存分配器
-
该项目目的是为了学习tcmalloc的精华,理解其核心思想,简化tcmalloc的核心框架,进而模拟实现一个迷你版的高并发内存池
-
该项目设计的技术栈有:C/C++,数据结构,操作系统内存管理,单例模式,多线程与线程安全等
1.2 关于池化技术
- “池化技术”就是程序预先向系统申请一块资源,然后由程序自行管理,之所以要这么做,是因为申请资源和释放都有成本的,就比如你给对方打电话,然后说一个子打一次电话,说十个字就要打十次电话,所以不如打一次电话一次性说十个字
- 每次申请资源时直接从“池”中获取,释放时将资源重新放回“池中”,这样可以使程序申请释放内存的效率变高很多
- 在计算机中,除“内存池”外,还有线程池,对象池,连接池等,所以池化技术的应用非常广泛
问题:内存池主要解决什么问题?
解答:内存池主要是为了提高申请释放内存时的效率,能够避免让程序因为频繁的申请和释放内存导致整体效率降低;除此之外,我们还需要尝试解决外部碎片和内部碎片的问题:
- 外部碎片:我们有一些空闲的小内存块,但由于这些内存块不连续,所以就可能导致总体的空内存够用,但是还是程序还是会申请失败的问题
- 内部碎片:主要是由于一些对齐的需求,导致分配出去的空间中一些内存无法使用
所以要同时兼顾解决两种内存碎片是本项目的一大亮点,同时也是一大难点
1.3 关于malloc
- C/C++中我们要动态申请内存是通过malloc函数去申请的,包括C++中的new实际上也是封装了malloc函数,本质上还是malloc去申请
- malloc申请内存的流程大致如下图:
malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如微软的VS系列中的malloc就是微软自行实现的,而Linux下的gcc用的是glibc中的ptmalloc
问题:有malloc了为啥还要搞一个内存池呢?
解答:malloc其实就是一个通用的内存池,在任何场景下都可以使用;但是malloc并不是针对某种场景专门设计的,这也意味着malloc在任何场景下都不会有很高的性能,而内存池就是抓住了这一点,内存池的任务就是为了提高效率
二,定长内存池实现
定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们可以将其性能做到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们 申请/释放 的都是固定大小的内存块
实现定长内存池的目的是为了熟悉一下对简单内存池的控制,其次,这个定长内存池后面会作为高并发内存池的一个基础组件
2.1 实现详情
①如何实现定长?
方法一:使用非类型模板参数,使得该内存池中申请到的对象的大小都是N
template<size_t N> //非类型模板参数
class ObjectPool
{};
方法二:定长内存池也叫做对象池,在创建对象池时,对象池可以根据传入的对象类型的大小来实现定长,因为传入对象的大小都是固定的
template<class T> //也可以用T来代替N实现定长的作用,因为T的大小是固定的
class ObjectPool
{};
②定长内存池的成员变量有哪些?
private://void* _memory = nullptr;char* _memory = nullptr; //void*在切割内存池时++会不好用,所以换成char*,它代表一个字节,方便申请时+操作size_t _remainBytes = 0; //表示当前内存池中还剩下的空间大小,防止用完后继续申请造成越界void* _freeList = nullptr; //自由链表,用来管理进程还给我的空间
对于向堆申请到的大块内存,我们可以用一个指针来对其进行管理,然后再用一个变量来记录这块内存的长度。
再然后,释放回来的内存块也需要管理,由于释放回来的内存块都是一个一个小的块,我们可以将这些内存块搞成一个链表,这个链表叫做自由链表,第三个变量就是指向这个自由链表的指针
③内存池如何管理进程释放回来的内存块?
- 进程还回来的定长内存块,用自由链表组织取来,由于链表的每个节点需要一个相互连接的指针,所以我们可以让内存块的前4个字节(32位)或8个字节(64位)作为连接指针,里面存的就是后面内存块的起始地址
- 因此我们的操作进行一个简单的头插,先往内存块的前4或前8个字节里面写入一个指向自由链表中第一个节点的指针,然后更新 _freeList 即可:
代码就这样写:
但是有个问题,上面的程序在32位下能跑,在64位下就挂掉了:32位下我们对内存块的前4个字节进行写入,64位下我们就需要对内存块的钱8个字节进行写入,但由于我们是int*,int*解引用后只能访问4个字节,需要把int换成long long,但是我们并不知道目前是32位还是64位,所以有了下面的问题
问题:如何让obj在32位下访问4个字节,在64位下访问8个字节?
解答:我们可以先把obj强转成(void**),然后再进行解引用,如下代码:
*(void**)obj = _freeList; //void**解引用看的是void*,32位下void*是四字节,64位下void*是八字节(这里其实只要是二级指针都可以,int**也可以)
- 首先obj最开始是T*,强转成了void**一个二级指针,int* obj解引用后就是取一个int的大小也就是4字节,这个32位和64位下都是固定的
- 而void**解引用后看的是void*的大小,而这个void*就很有讲究了,在32位下void*的大小是4字节,64位下void*的大小是8字节(注意是这个指针的大小,不是这个指针访问空间的大小)
- 而对void**解引用后,访问的空间大小取决于void*的大小,而void*的大小在32位下是4字节,在64位下是8字节,所以也就达到了我们的目的,如上述代码
所以我们的释放函数Delete就是:
void Delete(T* obj)
{//直接头插,其中_freeList是指向自由链表第一个节点的指针(不是哨兵节点哦)//显示调用析构函数清理对象obj->~T();//此时obj是一个T*,所以我也不知道obj是多大,所以我们把obj强转成一个int*,然后再对这个int*解引用就可以访问到内存块的前4个字节了//*(int*)obj = _freeList; //读取内存块的前4个字节,然后解引用进行写入//32位下,上面的程序没问题,但是64位下就挂了,因为32位下是四字节,64位下是8个字节了,所以32位下用int,64位下要用long long*(void**)obj = _freeList; //void**解引用看的是void*,32位下void*是四字节,64位下void*是八字节(这里其实只要是二级指针都可以,int**也可以)_freeList = obj;
}
④内存池如何为我们申请对象?
最开始的时候,先申请一大块内存
申请资源时,内存池应该优先把还回来的内存块对象再次重复利用,因此如果自由链表当中有内存块的话,就直接从自由链表头删一个内存块进行返回即可
如果自由链表当中没有合适内存块,那么我们就在大块内存中切出合适长度的内存块进行返回,当内存块切出后及时更新_memory指针的指向以及的_remainBytes值即可
两个注意事项:
- 由于我们后面进程释放内存块给内存池时,我们需要将内存块头插到自由链表中,那么我们必须保证切出来的空间至少能存下一个地址,假设用户只申请了1个字节,我们也要至少切4个字节或8个字节然后递过去
- 此外,当大块剩余的大块内存已经不足以切出合适大小时,我们应该再次向堆申请一块更大的空间,并更新 _memory指针和 _remainBytes 的值
所以我们的New函数这样写:
T* New()
{T* obj = nullptr; //指向申请的内存块的开头,把开头的指针返回给上一层if (_freeList != nullptr) //优先把还回来的内存块对象重复利用,看一下自由链表{void* next = *(void**)_freeList;obj = (T*)_freeList;_freeList = next;return obj;}else //自由链表没有合适内存块时,再去大块内存去切{if (_remainBytes < sizeof(T)) //当剩余的空间不够一个T对象大小时,重新开大空间{_memory = (char*)malloc(128 * 1024);_remainBytes = 128 * 1024;if (_memory == nullptr) throw std::bad_alloc(); //开失败了,表示系统也没有更多的内存块了,抛异常}obj = (T*)_memory;//32位下void*大小是4字节,64位下是8字节,如果T的大小也就是进程要申请空间的大小不足以存下一个指针时,我们要单独处理一下size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}//只开了空间未初始化,也不好//定位new,显示调用T的构造函数初始化new(obj)T;return obj;
}
⑤如何跳过malloc直接向堆申请空间?
上面我们是用malloc的,所以我们是先调用malloc,然后malloc再去向系统申请,那么我们可以不可以跳过malloc,我们自己直接去向系统申请呢?
①在Windows下:我们用的接口是一个 VirtualAlloc函数,这个函数的作用就是直接跳过malloc,按页为单位,直接向堆要内存空间
②在Linux下也有这样的系统调用:
- brk函数:作用是将数据段的最高指针
- mmap函数:是在进程的虚拟地址空间中(在堆栈中间,称为文件映射区的区域),找一块空闲的虚拟内存
这两种方式分配的都是虚拟内存,并没有时机分配物理内存。在第一次访问的时候会发生缺页中断,操作系统分配物理内存,然后建立虚拟内存和物理内存之间的映射关系
下面通过条件编译分别实现下:
#ifdef _WIN32#include <Windows.h>
#else//...
#endif//直接去堆上申请按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}
2.2 完整代码
我们的定长内存池包括头文件和测试代码:
下面是ObjectPool.h的代码:
#pragma once
#include<iostream>
#include<vector>
#include<time.h>#ifdef _WIN32
#include <Windows.h>
#else
//...
#endif//直接去堆上申请按页申请空间
inline static void* SystemAlloc(size_t t)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, t << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else // linux下brk mmap等//由于brk和mmap涉及到虚拟内存的扩展和收缩,有点复杂,所以linux环境下还是用mallocvoid* ptr = malloc(128 * 1024)
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}//定长内存池
//template<size_t N> //非类型模板参数
template<class T> //也可以用T来代替N实现定长的作用,因为T的大小是固定的
class ObjectPool
{
public:T* New(){T* obj = nullptr; //指向申请的内存块的开头,把开头的指针返回给上一层if (_freeList != nullptr) //优先把还回来的内存块对象重复利用,看一下自由链表{void* next = *(void**)_freeList;obj = (T*)_freeList;_freeList == next;return obj;}else //自由链表没有合适内存块时,再去大块内存去切{//32位下void*大小是4字节,64位下是8字节,如果T的大小也就是进程要申请空间的大小不足以存下一个指针时,我们要单独处理一下size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);if (_remainBytes < objSize) //当剩余的空间不够一个T对象大小时,重新开大空间{_remainBytes = 128 * 1024; //每次开空间后,更新剩余空间大小//_memory = (char*)malloc(128 * 1024);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr) throw std::bad_alloc(); //开失败了,表示系统也没有更多的内存块了,抛异常}obj = (T*)_memory;_memory += objSize;_remainBytes -= objSize;}//只开了空间未初始化,也不好//定位new,显示调用T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){//直接头插,其中_freeList是指向自由链表第一个节点的指针(不是哨兵节点哦)//显示调用析构函数清理对象obj->~T();//此时obj是一个T*,所以我也不知道obj是多大,所以我们把obj强转成一个int*,然后再对这个int*解引用就可以访问到内存块的前4个字节了//*(int*)obj = _freeList; //读取内存块的前4个字节,然后解引用进行写入//32位下,上面的程序没问题,但是64位下就挂了,因为32位下是四字节,64位下是8个字节了,所以32位下用int,64位下要用long long*(void**)obj = _freeList; //void**解引用看的是void*,32位下void*是四字节,64位下void*是八字节(这里其实只要是二级指针都可以,int**也可以)_freeList = obj;}private://void* _memory = nullptr;char* _memory = nullptr; //void*在切割内存池时++会不好用,所以换成char*,它代表一个字节,方便申请时+操作size_t _remainBytes = 0; //表示当前内存池中还剩下的空间大小,防止用完后继续申请造成越界void* _freeList = nullptr; //自由链表,用来管理进程还给我的空间
};
下面是test.cpp的代码,主要负责测定长内存池的性能:
#include"ObjectPool.h"struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 3;// 每轮申请释放多少次const size_t N = 1000000;std::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i) v1.push_back(new TreeNode);for (int i = 0; i < N; ++i) delete v1[i];v1.clear();}size_t end1 = clock();ObjectPool<TreeNode> TNPool;std::vector<TreeNode*> v2;v2.reserve(N);size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i) v2.push_back(TNPool.New());for (int i = 0; i < N; ++i) TNPool.Delete(v2[i]);v2.clear();}size_t end2 = clock();std::cout << "传统 new 和 delete:" << end1 - begin1 << std::endl;std::cout << "定长内存池 New 和 Delete:" << end2 - begin2 << std::endl;
}int main()
{TestObjectPool();
}
结果如下:
上述源码地址放在文章结尾
三,高并发内存池整体设计
项目主要解决什么问题?
现代很多开发环境都是多核多线程,那么申请内存必然存在激烈的锁竞争问题,malloc内部源码虽然也有针对多线程的申请锁的设计,但是我们项目的原型tcmalloc在多线程高并发的场景下能拥有更高的小,所以我们实现的内存池主要考虑下面几个问题:
- 性能问题
- 多线程环境下,锁竞争问题
- 内存碎片问题
整体设计
主要由下面三个层次组成:
- thread cache:每个线程都会有一个这个,用于分配小于等于256KB的内存,由于每个线程都有一个,所以没有锁,能提高效率
- central cache:这个是所有线程共享的,所以有锁,作用是按时机分配或回收thread cache的内存,比如thread内存不够时给它内存,thread内存过多时回收内存,简单来说就是线程缓存受中心缓存管理
- page cache: 负责向操作系统申请并管理以页为单位的大块内存,会在合适时机对central cache的内存进行补给和回收,主要将回收的内存尽可能进行合并,组成更大的连续内存块,缓解内存碎片问题
其实central cache是一个哈希桶的结构,只有当多个线程访问到一个桶的时候才会加锁,所以这里的锁竞争不会很激烈,后面会详细讲解
四,threadcache设计
4.1 整体设计
- 前面我们实现了定长内存池,由于其只支持固定大小内存块的申请和释放,所以里面只需要一个自由链表管理释放回来的内存块;那么现在我们要支持申请和释放不同大小的内存块,那么就需要多个自由链表来管理,因此thread cache实际上是一个哈希桶结构,每个桶都是一个自由链表
- thread cache支持小于等于256KB也就是256 * 1024字节的内存空间的申请,如果我们针对每种字节数的内存块都搞一个自由链表进行管理,那么就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量空间,显然得不偿失
- 这时我们可以做一些平衡的牺牲,和结构体那样对字节数自定义一种对齐规则,假设我们的对齐数是8,那么thread cache的结构如下图:
- 此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推
当线程要申请某一大小的内存块时:
- ①先经过计算得到对齐后的字节数,找到对应的哈希桶
- ②如果该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回
- ③如果该自由链表已经为空了,那么就向下一层的central cache进行获取
但此时由于对齐的原因,就可能会导致一些碎片化的内存无法利用:比如线程只申请了6字节的内存,而thread cache给了8字节,这多给出的2字节就浪费掉了,这就是内存碎片的内部碎片
由于后面我们会频繁使用到自由链表,于是干脆将其单独分离出来组成一个类,放到一个公共头文件 Common.h:
static void*& NextObj(void* obj) //获取空间的前四个或前八个字节
{return *(void**)obj;
}//自由链表,管理切分好的小对象
class FreeList
{
public:void Push(void* obj) //头插{assert(obj);NextObj(obj) = _freeList;_freeList = obj;++_size;}void PushRange(void* start, void* end, size_t n) //支持插入多个对象{assert(start);assert(end);NextObj(end) = _freeList;_freeList = start;_size += n;}void* Pop() //头删{assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);--_size;return obj;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}
private:void* _freeList = nullptr;size_t _maxSize = 1; //size_t _size = 0; //记录桶的数据个数
};
4.2 哈希桶映射对齐规则
映射规则我们也搞成一个类,叫做SizeClass,也放在Common.h里
①如何对齐
- 上面说过我们要自己定义一个对齐规则,而对齐规则主要就是看对齐数,8这个数字是最合适的,因为不论是32位还是64位机器,都至少要保证内存块能存下一个指针
- 但是如果是8,那么我们需要256 * 1024 / 8 = 32768 个桶,这个数量还是有点多了,所以我们可以分阶段,每个阶段匹配一个对齐数(tcmalloc源码里面更复杂,下面是简化版):
每阶段字节数 | 每阶段对齐数 | 哈希桶下标 |
---|---|---|
[1, 128] | 8 | [0, 16) |
[128 + 1, 1024] | 16 | [16, 72) |
[1024 + 1, 8 * 1024] | 128 | [72, 128) |
[8 * 1024 + 1, 64 * 1024) | 1024 | [128, 184) |
[64 * 1024 + 1, 256 * 1024] | 8 * 1024 | [184, 208) |
浪费率:
公式:浪费率 = 浪费的字节数 / 对齐后的字节数
- 以上面的公式为例,我们要得到最大浪费率,就要让分子最大,分母最小
- 比如以[128 + 1, 1024]这个区间为例,对齐数是16,那么最大浪费的字节数是15,最小对齐就是129要变成16的倍数,也就是144
- 那么该区间的最大浪费率就是 15 / 144 ≈ 10.4%
- 同理,后面两个区间的最大浪费率也都在10%左右,这个浪费率已经控制得非常小了,当然tcmalloc的浪费率更低,当然也很复杂
对齐我们需要搞两个函数,一个用来获取向上对齐后的字节数,一个获取对应哈希桶的下标:
//计算对象大小的对齐映射规则
class SizeClass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128] 8byte对齐 freelist[0,16),第一个只能是8字节对齐,因为 存了指针的// [128+1,1024] 16byte对齐 freelist[16,72)// [1024+1,8*1024] 128byte对齐 freelist[72,128)// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)//获取向上对齐后的字节数static inline size_t RoundUp(size_t bytes);//获取对应哈希桶的下标static inline size_t Index(size_t bytes);
};
在获取某一字节数向上对齐的字节数时,先判断该字节数属于哪一个阶段,然后调用子函数根据每个阶段的对齐数得到最终thread cache要分配给进程的字节数:
static const size_t PAGE_SHIFT = 13;static inline size_t RoundUp(size_t size) 搞成内联是因为方便其它类不需要通过创建对象就能通过SizeClass访问该函数
{if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size < 256 * 1024){return _RoundUp(size, 8*1024);}else //大于256K{return _RoundUp(size, 1 << PAGE_SHIFT);}
}
//第一个参数是要申请的字节数,第二个参数是该阶段的对齐数
static inline size_t _RoundUp(size_t size, size_t alignNum)
{//1,常规方法// size_t//if (size % alignNum == 0) return size; //正好是对应阶段的对齐数的倍数,所以不需要处理//else return (size / alignNum + 1) * alignNum;//2,巧妙方法return ((size + alignNum - 1) & ~(alignNum - 1));//假设是区间[1, 8],那么前面括号的size可能是1到8的任意一个数,最大值就是8 + 8 - 1 = 15//假设alignNum是8,也就是这个区间的对齐数,那么右边括号就是7,7的二进制为000111,取反后就是111000//左边括号可以表示的数在[8, 15],假设是15,二进制为001111,所以15与7的结果就是001000,是8//由于7的二进制的后面三位全是0,所以就算size是该区间的任何值,与的结果的后三位也都会是0,而前面的保持不变//再举个例子,比如size是3,对齐数是8,前面括号的值是10,二进制是001010;后面括号仍然是7,二进制是000111//7取反后是111000,所以拿这个二进制和10与一下,结果仍然是001000,还是8//上面两个例子就可以说明,通过这种方法最终得到的值的二进制就都与8对齐了//当对齐数是其它区间的对齐数时也是同样的结果
}
再之后,我们需要根据size先知道该字节数属于哪一个区间,然后通过子函数返回该字节数对应的哈希桶的下标:
// 计算映射的是哪一个自由链表桶
static inline size_t Index(size_t bytes)
{assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3); //第二个参数是2的次方,因为子函数是直接通过位移得到对齐数的}else if (bytes <= 1024) { //计算出的桶是在该区间的桶的数量,所以要加上全面区间桶的数量才是算出的桶的下标return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;
}static inline size_t _Index(size_t size, size_t alignNum)
{/*if (bytes % alignNum == 0)return bytes / alignNum - 1;elsereturn bytes / alignNum;*/return ((size + (1 << alignNum) - 1) >> alignNum) - 1;
}
4.3 申请内存函数实现
①类的总体设计
按照上面的规则,我们自由链表个数是208,thread cache允许申请的最大内存是256KB,所以我们现在Common.h里定义好参数:
tatic const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREE_LIST = 208; //总的桶的数量
然后就是ThreadCache类,我们需要实现四个成员函数(目前先实现申请内存的函数,后面三个函数后面再讲):
class ThreadCache
{
public:void* Allocate(size_t size); //线程就通过这个函数来申请空间void Deallocate(void* ptr, size_t size) //size要告诉我在哪个桶//如果要申请的内存块大于256KB,就通过这个函数从中心缓存获取更多资源void* FetchFromCentralCache(size_t index, size_t size);//如果还给我的内存块数量超过了桶的最大存储数目,就要往上层传递void ListTooLong(FreeList& list, size_t size);private:FreeList _freeLists[NFREE_LIST]; //开256 * 1024个字节的空间
};
目前我们主要实现申请和释放内存的那两个,后面两个我们到后面设计中心缓存时在来设计
②线程申请内存的函数实现
线程往thread cache申请内存时,先计算对齐后的实际分配数和对应的哈希桶下标, 如果申请的空间大小在自由链表上有合适的空间,就返回这个空间;如果不满足就去中心缓存申请:
void* Allocate(size_t size) //线程就通过这个函数来申请空间
{assert(size <= MAX_BYTES); //MAX_BYTES在公共头文件中定义,为256 * 1024 KBsize_t alignSize = SizeClass::RoundUp(size); //计算size_t index = SizeClass::Index(size); //一共是208个桶,计算该字节数在哪一个桶,然后再去这个桶里拿内存if (!_freeLists[index].Empty()) //如果申请的空间大小在自由链表上有空闲的空间,就返回这个空间return _freeLists[index].Pop();elsereturn FetchFromCentralCache(index, alignSize); //后面实现
}
4.4 TLS无锁访问
有个问题,我们的线程如何才能获取到这个thread cache呢?
要实现每个线程都能无锁访问自己的thread cache,需要用到线程本地存储TLS(Thread Local Storage),这是一种变量存储方法,通过TLS创建的变量在它所在的线程是全局可访问的,但是不能呢被其它线程访问到,这样就确保了独立性
Windows和Linux下都有自己的TLS机制:
Windows:线程本地存储(Thread Local Storage) - 坦坦荡荡 - 博客园
Linux:Thread Local Storage(线程局部存储)TLS - 知乎
直接在ThreadCache.h文件下加上:
static _declspec(thread) ThreadCache* pTLS_ThreadCache = nullptr;
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{pTLSThreadCache = new ThreadCache;
}
下面是对thread cache分配内存和验证TLS的测试:
由于我们不知道线程要申请和释放的内存是多大,所以我们不能让线程直接访问thread cache的Allocate函数,需要再封装一层,在头文件ConcurrentAlloc.h里面实现:
#pragma once#include"Common.h"
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size) //线程通过调用这个函数来获取资源
{if (size > MAX_BYTES) //如果线程申请的内存小于256KB,就先获取属于自己的thread cache,然后再调用它的Allocate{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLS_ThreadCache == nullptr){pTLS_ThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLS_ThreadCache << endl;return pTLS_ThreadCache->Allocate(size);}else //如果大于256KB,就去上层找{//后面实现}
}static void ConcurrentFree(void* ptr) //线程通过这个函数来释放内存
{//后面实现
}
然后我们搞一个UnitTest.cpp文件专门用来测试:
include"ConcurrentAlloc.h"
void TLStest()
{//每个线程都各自打印TLS的地址且地址不变,验证了TLS是存在的std::thread t1([]() {for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(6);}});t1.join();std::thread t2([]() {for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(7);}});//如果把t1.join()写在这里,由于是一起执行,打印结果会混乱不清t2.join();
}int main()
{TLStest();return 0;
}
五,centralcache设计
5.1 整体设计
thread cache自己最先向操作系统申请256KB大小的内存,当这256KB空间用完之后,thread cache就要向central cache申请了,并且当thread cache拥有超过256KB的空间时,要把多余的内存还给central,这就是central cache主要的作用
central cache的结构与thread cache一样都是哈希桶结构,并且遵循相同的对齐映射规则,这样的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去取内存就行了,并且共用同一个映射规则,发生错误了也容易排查
- 中心缓存是所有线程共享的,所以要有加锁访问
- 中心缓存在加锁时不是全加的,因为它和thread cache都是哈希桶结构,桶与桶之间是没有竞争关系的,所以只有当多个线程通过映射规则访问到同一个桶时才会加锁(这个锁我们叫做“桶锁”)
- thread cache的每个桶下面挂的都是自由链表,是一个一个已经切好的内存块,而central cache挂的是一个一个的span:
- span本身有两个指针,一个负责SpanList链表的连接,一个则指向另一个自由链表,这个自由链表里就算一个一个切好了的内存块,这些内存块的大小是和哈希桶的编号是对应的
然后就是中心缓存的申请和释放内存:
申请内存:
- 当thread cache内存用完时,就会批量向central cache申请一些,这里的批量申请有点类似与Tcp协议拥塞控制检测网络拥堵法检测报文的那个慢开始算法
- 然后去central cache找对应的桶,然后去一个一个遍历span去找合适的内存块并返回,这个过程是要加锁的,不过是桶锁所以效率不会降低很多
- 当中心缓存里的SpanList的所有Span都没有空间时,会再去向page cache申请一个更大的Span,然后按大小切好作为自由链表连接到一起,然后再取合适的Span给thread cache
- 再central cache的Span类中搞一个变量use_count记录分配了多少个对象出去,每分配一个出去就++use_count
释放内存:
- 当thread cache的内存超过256KB或者线程销毁时,会将内存释放会central cache中并--use_count,当use_count减到0时表示所有的对象都回到了Span中,则将Span释放给page cache,然后page cache会将内存块进行合并
问题:如上图,为什么需要多个Span?为什么第三个Span是空的?
解答:
- 假设只有一个Span,假设在8Byte这个桶下,那么一页要切成8字节的小块,那么就要切一千多个小块,我们不可能把这1000多个块一次性就给thread cache,所以一次给thread cache一个批量的,比如一次给5个或8个,这就是慢开始
- 所以我们的逻辑就是,刚开始只有一个Span,下面挂4个小块,当这四个小块用完后,就再来了一个Span里面再挂几个小块,就这样一路下去,就变成了多个Span
- 然后再是以上图为例,可以看到第三个Span是空的表明第三个Span的小块全被申请走了,但是前面两个Span不为空,为啥呢?不是说好的只有一个span的小块用完之后才会再来一个Span的吗?
- 因为线程不仅仅是社申请,也在释放,假设第三个Span分配完后,线程把空间又还回给第一和第二个Span了,这就出现了上图的情况
所以Central cache起到的作用主要是“承上启下”
5.2 Span的设计
central cache里面挂的也是一个一个的哈希桶,每个桶是一个链表,但是不是和thread cache一样的自由链表,而是先搞了一层SpanList链表,然后链表的每个节点就是一个一个的Span,每个Span再挂的就是自由链表
所以我们要搞两个东西,一个是Span,一个是SpanList,这两玩意儿都放在Common.h头文件里
Span设计
- 每个进程都有自己的地址空间,在32位下,进程地址空间的大小是2^32,在64位下,就是2^64,页的大小一般设4KB或8KB,我们以8KB也就是2^13个字节为例
- 在32位下,进制地址空间可以被切分为2^(32 - 13) = 2^19个页;而在64位下达到了2^51个页
- 如果需要给页号编号,在64位下的取值返回是[0, 2^51),因此不能直接用一个无符号整型来存储页号,这里我们采用条件编译来解决这个问题,如下代码:
//在32位下,进程地址空间为4G,2^32个字节,一个页是8K,为2^13,所以会被分成2^19大概50w个页
//在64位下,进程地址空间为16G,2^64字节,一个页8K,会被分成2^51个页
//所以需要分开搞,用条件编译
#ifdef _WIN64 //在Win32配置下,_Win32有定义,_Win64没有定义,但是在x64环境下,_Win32和_Win64都有定义,所以要把_WIN64放在前面
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else//linux
#endifstruct Span //span,管理多个连续页大块内存跨度结构
{PAGE_ID _pageId = 0; //大块内存起始页的页号,便于之后page cache进行前后页的合并size_t _n = 0; //表示该Span拥有页的数量Span* _next = nullptr; //双向链表Span* _prev = nullptr;size_t _useCount = 0; //表示已经分配给thread cache的小块内存的计数void* _freeList = nullptr; //切好的小块内存的自由链表bool _isUse = false; //表示该span是否在被使用,默认为false,为了加锁使用size_t _objSize = 0; //这个表示切好的对象的大小,方便释放内存时计算大小
};
设计成双链表是为了方便把内存还给page cache后删除这个span,所以搞成了带头双向循环的链表结构
SpanList设计
//带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head; //如果为真,就返回空}void PushFront(Span* span){Insert(Begin(), span);}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan) //在指定位置前面插入{assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head); //不能把哨兵干掉Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:Span* _head;
public:std::mutex _mtx; //桶锁
};
//注意,删除的span不需要delete释放,因为是还给page cache
5.3 完善thread cache类的申请内存函数
当线程通过thread cache申请空间时,如果申请的空间大于256KB或者thread cache里已经没有足够的空间时,thread cache就会去向central cache要内存
上面说过,thread cache找中心缓存要空间时,采用的慢开始算法:
size_t NumMoveSize(size_t size) //thread cache一次从中心缓存获取多少个对象(慢开始算法)
{assert(size > 0);int num = MAX_BYTES / size; //先计算出单个对象大小//如果对象很小,一次给你多一点点,如果对象很大,给你少一点点if (num < 2) num = 2;if (num > 512) num = 512;return num; //就是给一个折中的大小
}
通过满开始算法,就可以让thread cache找中心缓存要内存时一次别要太多减少浪费:
//如果要申请的内存块大于256KB或剩余空间不足时,就去从中心缓存获取更多资源
void* FetchFromCentralCache(size_t index, size_t size)
{//慢开始反馈调节算法//1,一次不会向中心缓存要很多、、太多对象,因为太多了用不完会浪费//2,随着申请对象数量增加,batchNum会不断++直至上限//3,size越大,一次向central cache要的batchNum就越小//3,size越小,一次向central cache要的batchNum就越大size_t batchNum = min(_freeLists[index].MaxSize(), NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;//往central cache要内存,我们要batchNum个size大小的内存,然后通过start和end返获得开始和结束地址//假设我们要10,但是中心缓存的对应的桶下面没有10了,那我thread cache要不要呢?虽然不够但我还是要//假设我只要1个,但是由于前面满开始算法,可能会给我2个或者更多//所以返回值表示中心缓存实际给我们的个数size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum > 0);//批量申请的好处:假设我只要一个,中心缓存给了我3个,那么我把剩下的两个挂在自由链表上,那么当下一次我又只要一个的时候,就不需要再去page里要了if (actualNum == 1){assert(start == end);return start;}else //走到这里就是获取了多个,就把这多个挂到对应的桶的自由链表里去{_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);return start;}
Common.h里的FreeList类也要提供支持插入多个对象:
//class FreeList
void PushRange(void* start, void* end, size_t n) //支持插入多个对象
{assert(start);assert(end);NextObj(end) = _freeList;_freeList = start;_size += n;
}
在C++的algorithm头文件中有一个min函数,这是一个函数模板,而在windows.h头文件中也有一个min,这是一个宏。由于调用函数模板时需要进行参数类型的推演,因此当我们调用min函数时,编译器会优先匹配windows.h当中以宏的形式实现的min,此时当我们以std::min的形式调用min函数时会编译报错,这时我们只能选择将std::去掉,让编译器调用windows.h当中的min宏。
#include <algorithm>//<algorithm>算法头文件里有一个函数模板,但是windows.h里面有个宏,冲突了
#ifdef _WIN32#include <windows.h>
#else//Linux相关头文件
#endif
5.4 centralcache类的实现
①结构设计
- 由于central cache所有线程共享,所以我们搞成单例模式并提供一个全局访问点,这样所有线程就都可以通过这个访问点访问中心缓存了(这里我们搞成饿汉模式)
#pragma once
#include "Common.h"
#include "PageCache.h"//它也是一个哈希桶,thread cache下面挂的是切好的小对象,但是不能每个位置给桶,所以进行了一定程度的对齐和哈希映射
//它定义了一个管理大块内存对象的结构——span,每个桶下面有很多span,是双向链表
//单例模式
class CentralCache
{
public:static CentralCache* GetInstance() { return &_sInst; } //全局访问点//获取一个非空的SpanSpan* GetOneSpan(SpanList& list, size_t size);//从中心缓存获取一定数量的对象给thread cache,该函数计算从span中申请的对象数量size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);void ReleaseListToSpans(void* start, size_t size);private:SpanList _spanLists[NFREE_LIST]; //thread cache有几个桶,我就有几个桶
private://单例模式CentralCache() {} //把构造函数搞成私有CentralCache(const CentralCache&) = delete; //把拷贝构造干掉static CentralCache _sInst; //一上来直接搞一个对象
};
②获取一定数量的对象给thread cache
//获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size); //算出在桶中对应的位置_spanLists[index]._mtx.lock(); //桶锁,对单个桶加锁,防止多个线程访问到同一个桶Span* span = GetOneSpan(_spanLists[index], size); //要获取一个非空的spanassert(span); //找到的span不为空assert(span->_freeList); //span里也不为空start = span->_freeList; //将span的地址通过start返回,相当于将内存返回给了thread cache//①假设span有4个块,thread要3个,我们让end走两步走到第三个块,然后把start和end返回//②然后第三个块的前4或8个字节指向的是第四个块的地址,所以我们直接将end也就是第三个块的前4或8个地址赋值给span,相当于把这三个块切出去了end = start;size_t actualNum = 1; //该变量用来返回,表示实际给了threaad cache了多少个对象//如果span中只有4个对象,但是thread cache要5个,那么end访问到最后会崩,所以这种情况就是有多少给多少for (size_t i = 0; i < batchNum - 1; i++){if (NextObj(end) == nullptr) break;end = NextObj(end); //end往后移actualNum++;}span->_freeList = NextObj(end); //赋值span,完成切块NextObj(end) = nullptr; //将end的前4或8个字节赋值为空,使其成为链表的最后一个节点span->_useCount += actualNum;_spanLists[index]._mtx.unlock(); //解锁return actualNum; //从span中获取batchNum个对象,如果不够batchNum,有几个返回几个
}
③获取一个非空的Span
- GetOneSpan这个函数就是在central cache里获取一个span然后返回去
- 如果central cache一个span都没有了,就去page cache里获取,并且把获取到的页进行切分,然后放进central cache的桶里面
问题:如何将page给我的大块span进行切分?
解答:page cache给我的是一个Span对象指针,而Span本身不存储大块内存,只负责存储大块内存的信息,所以我们需要算出大块内存的起始地址
- ①我们可以用这个span起始页号乘以一页的大小就能得到这个span的起始地址
- ②然后用这个span的页数乘以一页的大小就可以得到整个span锁管理的内存块的大小
- ③最后用起始地址加上内存块的大小就可以得到内存块的结束位置
再然后就是切分的详细过程,可以配合代码加注释理解
//计算一次向page cache要几页空间,返回要的页数
size_t NumMovePage(size_t size)
{size_t num = NumMoveSize(size); //计算一次要多少个对象size_t npage = num * size; //计算一次要多少个字节,对象数目*单个对象大小npage >>= PAGE_SHIFT; //总对象除以一个页的大小,结果就是要申请的页数if (npage == 0) npage = 1; //如果要申请的大小一个页都没有,我们默认申请一个页return npage;
}//获取一个非空的Span
Span* GetOneSpan(SpanList& list, size_t size)
{//先在中心缓存查看有没有非空的SpanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr)return it; //说明有空余内存块的span,返回剩余的elseit = it->_next; //往后遍历}//走到这里说明没有任何有内存块的span了,需要往page cache获取list._mtx.unlock(); //先把中心缓存的桶锁解掉,这样如果其它线程释放内存对象回来时不会阻塞PageCache::GetInstance()->GetMtx().lock(); //往page cache申请时需要把整个page cache锁住,原因后面会讲Span* span = PageCache::GetInstance()->NewSpan(NumMovePage(size)); //找page cache要一定数量的spanspan->_isUse = true; //表明该span正在使用,等释放时再设为falsespan->_objSize = size; //保存切的数量PageCache::GetInstance()->GetMtx().unlock();//下面就是开始切分page cache给我的大块span,这个不需要加锁,因为这会儿其它线程访问不到这个spanchar* pageStart = (char*)(span->_pageId << PAGE_SHIFT); //算起始地址size_t bytes = span->_n << PAGE_SHIFT; //计算span的大块内存的大小(单位:字节)char* pageEnd = pageStart + bytes; //计算内存块的结束地址//下面就是把大块内存切成自由链表链接起来span->_freeList = pageStart; //先把大块内存的起始地址放在链表头节点上pageStart += size; //找到下一个对象大小的位置void* tail = span->_freeList; //辅助指针,负责遍历大内存块while (pageStart < pageEnd){NextObj(tail) = pageStart; //把当前空间前4个字节搞成下一个空间的地址tail = NextObj(tail); //tail指向下一个位置,然后继续把下一个位置的前4个或8个字节搞成下下一个空间的地址pageStart += size; //pageStart的地址也往后移//整个切分的过程其实就是把一块连续空间分成相等大小空间的起始地址头插到自由链表里,所以空间本质是还是连续的}NextObj(tail) = nullptr; //标记链表结尾list._mtx.lock(); //当切好span以后需要把span挂到桶里去的时候再重新加锁list.PushFront(span);return span;
}
六,pagecache设计
6.1 整体设计
当thread cache内存不够或者要大于256KB的空间时,找central cache要空间,然后当central cache没有合适的Span时,central cache则会去找page cache要更大的空间
page cache的基础结构和前面两个一样,都是哈希桶,但是和前面两个有很大区别:
- page cache采用的是直接定址法,,比如1号桶挂的都是大小为1页的span,2号桶挂的是大小为2页的span,依次类推
- page cache里的span是没有切分的完整的span,因为page cache是服务于central cache的,所以page cache只会把一个完整的span给central cache,至于如何对这个span细分那就是central cache的事情了
然后就是page cache的申请和释放内存
申请内存:
- 当central cache向page cache要内存时,page cache先检查自己的对应大小的桶有没有span,有就切一个返回,如果没有则向更大页的桶找一个,如果找到了就分裂成两个。加入我要4页page,但是4号桶没有,就一直往上面找,找到10号桶有page,就把这10页的page分成4页和6页的两个page
- 如果到128号桶都没有合适的span,则page cache调用mmap,brk或者VirtualAlloc等方式正式向系统申请一个128页的大内存,然后挂在链表中,接着重复上面的过程
- 所以刚开始的时候page cache里没有任何的span,所以最开始是向堆申请128页大小的内存块,而page cache里面那些小的span和最终要返回给central cache的span都是由这个128页的span切出来的
- 并且每次申请都是申请128页大小的span,所以我们可以套用我们之前的定长内存池
释放内存:
- 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看看是否可以合并,如果合并继续向前寻找,这样就可以将切小的内存合并成一个大的span,减少内存碎片
问题:为什么最大是128
解答:线程申请的单个对象最大限制为256KB也就是256 * 1024个字节,前面说过一页大小是2或8KB,我们定义为8KB,所以128个页能被切分成4个256KB的对象,是足够的,并且这个最大不一定非得是128,可以根据具体的需求进行设置
6.2 page cache详细设计
①框架设计
- central cache是桶锁,这是因为多个thread cache向中心缓存申请时,是可以同时访问不同的桶的;但是当central cache往page cache要内存时,是同时向page cache申请的,所以在访问page cache的桶的时候要加锁
- page cache要加全局锁不能加桶锁,因为我们可能要访问page cache的多个桶,如果page cache用桶锁就会有频繁的加锁和解锁,得不偿失,所以我们用全局锁把整个page cache锁住
- page cache也可也以用单例模式,这里我们仍然用饿汉模式,刚开始就创建变量,然后提供一个全局访问点
#pragma once#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"
class PageCache
{
public:static PageCache* GetInstance() { return &_sInst; } //全局访问点//获取从对象到span的映射Span* MapObjectToSpan(void* obj); //后面实现Span* NewSpan(size_t k); //新获取一个k数量页的Span返回给central cachevoid ReleaseSpanToPageCache(Span* span) //将中心缓存中完整的span还给page cache重新融合成一个大页std::mutex& GetMtx() { return _pageMtx; }private: //单例模式 -- 饿汉模式PageCache() {}PageCache(const PageCache&) = delete;
private:SpanList _spanLists[NPAGES]; //129大小,因为数组下标是从0开始的,1映射1,128映射128ObjectPool<Span> _spanPool; //如果需要new,可以直接套用我们前面的定长内存池,再次优化性能std::mutex _pageMtx; //不能用桶锁,需要全部锁起来,因为是整个PageCache活动的,和span的分裂和合并有关系static PageCache _sInst; //饿汉模式,一开始就把对象搞出来
};
②申请内存设计
Span* NewSpan(size_t k) //新获取一个k数量页的Span{assert(k > 0 && k < NPAGES);//先到第k号桶去找里面有没有spanif (!_spanLists[k].Empty()) //如果有span,就返回{Span* kSpan = _spanLists[k].PopFront();return kSpan;}else //第K号桶是空的,就去后面的桶找并做切分{for (size_t i = k + 1; i < NPAGES; i++) //循环往后面找{//找到不为空的桶,就开始切,最后就是k页的span返回给中心缓存,n-k页的span重新挂到n-k位置的桶去if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront(); //找到一个大的span,切成k页和n-k页的两个span//Span* kSpan = new Span;Span* kSpan = _spanPool.New();//在nSpan的头部切一个k页下来,只要变页号就可以了kSpan->_pageId = nSpan->_pageId; //kSpan是管理nSpan的,也就是上面图片每个内存块前面的那个spankSpan->_n = k; //保存页数nSpan->_pageId += k; nSpan->_n -= k; //nSpan被拿走了k个页//把切出来的n-k个页再挂到n-k号桶上去去_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}//循环如果走完后,意味着page cache里也没有span了,所以要直接向操作系统申请一个128页的span//比如线程第一次申请内存时,central cache和page cache里是啥也没有的,就会走到这个位置 //Span* bigSpan = new Span;Span* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1); //向系统申请申请128页的空间bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; //设置页号bigSpan->_n = NPAGES - 1; //设置页的数量,就是129-1=128个页_spanLists[bigSpan->_n].PushFront(bigSpan); //把申请出来的空间挂到对应的桶上面然后递归调用自己return NewSpan(k); //为了避免代码重复,所以递归调用自己}}
七,申请内存过程联调
7.1 第一次申请
申请内存的过程我们用单线程即可,多线程不方便调试,只要观察在一个桶当中能否正确申请到内存即可,下面是测试函数:
void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);
①当线程第一次申请内存时,线程通过TLS获取到自己专属的thread cache对象,然后通过这个thread cache对象去申请内存
②第一次申请的是6字节,通过计算索引到了thread cache的第0号桶,但是此时第0号桶中没有对象,所以thread cache要向central cache申请
③thread cache一次最多向central cache申请8字节大小也就是512个对象, 所以在向中心缓存申请内存前,还需要将该上限值与自由链表的_maxSize进行比较,而第一次申请8字节,也就是一个对象,所以得出本次thread cache向central申请1个8字节的对象。
由于我们是使用的慢开始算法,第一次申请一个对象;第二次如果又申请一个对象,就会自增长到申请2个
④然后就是central cache找span给thread cache了,先加锁,获取到一个非空的span,然后将这个span简单处理后返回给thread cache就ok了
⑤然后就是要获取一个非空的span,先找到第0号桶,遍历span双链表,看看有没有非空的span,但是第一次申请肯定是没有的,所以我们就要先向page cache申请。
当page cache给我们一个大span后,我们需要将这个span进行加工,然后挂到我自己的桶里面,然后返回一个非空的span
⑥然后就是向page cache要空间,但是page cache是要根据页申请的,所以我们需要先计算下要向page cache申请多少个页
⑦然后就是page cache给大块的span给central cache了,但是第一次申请对象时page cache的所有桶中都是没有span的,所以要直接向堆申请一个128页的大小的bigSpan
申请到之后,填充span的信息,然后挂到第128号桶下,递归调用自己
⑧然后把128页的内存拿出来,切成1页和127页的两个span,把1页的span返回,把127页的span再次挂到127号桶下,完成page cache给central cache内存的过程
7.2 第二次申请
①当线程第二次申请8字节时,由于thread cache的第0号桶第一次向central cache申请的8字节给了线程第一次申请的6字节了,所以thread cache得再次向cache申请对象
②第二次向central cache申请对象时,由于慢增长机制,所以第二次会向central cache申请2个8字节大小的空间,第三次就会申请3个,这就是满增长
③然后就是central cache分配span给thread cache了,由于第一次central cache向page cache申请了一页的内存块,并将其切成了1024个8字节大小的对象,因此这次central cache是可以直接从0号桶搞2个8字节空间给thread cache的,不用再向page cache申请
④线程只要1个8字节大小的对象,所以thread cache把1个对象返回,另一个对象就挂在自己的第0号桶上,这样第三次再次申请1个对象时,就不再去central cache要了,第二次申请挂在第0号桶的那个返回即可
7.3 第1025次申请
- 我们先让线程申请1024次8字节对象,然后观察在第1025次申请时,central cache是否会再向page cache申请内存块,以此测试代码是否正确
- central cache第一次向page cache申请一页内存,这一页内存被 切成了1024个8字节大小的对象,所以当第1025次申请时,正常情况下会再次向page cache申请内存块
- 可以看到当第1025次申请时,central cache第0号桶中的这个span的_useCount已经增加到了1024,也就是说这1024个对象已经全部给线程用了,此时central cache就要再向page cache申请一页的span来切分
- 往后就是page cache把127号桶的那个127页大小的span拿出来,切成1页和126页的两个span,把1页的span返回给central cache,126页的那个继续挂到126号桶上
每次调试页号不一样是正常的,因为页号是根据地址来的,每次重新编译运行后的地址也是不一样的
八,回收内存设计
8.1 threadcache回收内存
- 当线程把空间还给thread cache后,需要将该对象插入到对应哈希桶的自由链表当中即可
- 但是随着还回来的空间不断增多,我们对应桶的自由链表也会越来越长,所以这些堆积在一个线程的thread cache是一种浪费,我们应该把这些内存还给central cache好让其它线程去申请
- 所以当thread cache中某个桶的自由链表超过它一次批量向central cache申请的个数时,就要把这个自由链表里的内存还给central cache
//class ThreadCache
void Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);size_t index = SizeClass::Index(size); //找对应的桶号_freeLists[index].Push(ptr); //把指针放进去// 当链表长度大于一次批量申请的内存时就开始还一段list给central cacheif (_freeLists[index].Size() >= _freeLists[index].MaxSize())ListTooLong(_freeLists[index], size);
}
还给central cache,就是从自由链表中一次取出多个对象,然后将这些对象还给central cache
//如果还给我的内存块数量超过了桶的最大存储数目,就要往上层传递
void ListTooLong(FreeList& list, size_t size)
{void* start = nullptr; //做输出型参数void* end = nullptr;list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
然后我们的自由链表也需要支持一次删除多个对象的接口
void PopRange(void*& start, void*& end, size_t n)
{assert(n <= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; i++) //取n个,只要控制end就好了{//n - 1是因为end指向第一个节点,我走两步即可,三个返回给你,第三个后面要置空end = NextObj(end);}_freeList = NextObj(end); //把后面的空间移到前面来NextObj(end) = nullptr;_size -= n;
}
补充:
当thread cache的某个自由链表过长时,就把这个自由链表中的所有对象全都还给central cache,但是我们的PushRange接口还是设置了三个参数,第三个参数表示指定数量的对象,因为有时候即使链表很长了,但是我们不一定要把所有对象全还给central cache,这样是为了代码的可修改性
并且我们这里只是简单实现,tcmalloc源码考虑的更多更多,比如判断thread cache是否要还对象给central cache时,还综合考虑了每个thread cache整体大小。比如当某个thread cache的总占用超过一定阈值时,就将该thread cache的部分对象还一些给central
8.2 centralcache回收内存
- 当thread cache中某个自由链表太长时,会将自由链表中地这些对象还给central cache中对应的span
- 但是central cache的每个桶中不止一个span,所以我们计算出还回来的对象应该挂到哪个桶里,还要知道应该挂到这个桶的哪个span下
- 那么如何根据thread cache还给我的内存块的地址推测出这个内存块原本所在的span?
- 我们大体的解决方法就是先存储页号和span之间的映射,当thread cache还一个内存块给central cache时,是给一个地址给中心缓存的,然后就可以根据之前建立的映射,根据页号就可以找到span了
①如 page何根据内存块的地址找到其所在的页号
公式:该页的页号 = 该页的地址 / 该页的大小
假设一页的大小是100,那么地址0~99的页通过计算都等于0,而地址100~188都属于第1页
②如何找到一个对象对应的span(映射机制)
- 在page cache第一次分配span给central cache时就可以建立页号和span之间的映射,这个映射在后面page cache合并页块时也会用到,我们使用C++的unordered_map用作映射容器
Span* NewSpan(size_t k) //新获取一个k数量页的Span
{assert(k > 0 && k < NPAGES);//先到第k号桶去找里面有没有spanif (!_spanLists[k].Empty()) //如果有span,就返回{Span* kSpan = _spanLists[k].PopFront();//建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}else //第K号桶是空的,就去后面的桶找并做切分{for (size_t i = k + 1; i < NPAGES; i++) //循环往后面找{//找到不为空的桶,就开始切,最后就是k页的span返回给中心缓存,n-k页的span重新挂到n-k位置的桶去if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront(); //找到一个大的span,切成k页和n-k页的两个span//Span* kSpan = new Span;Span* kSpan = _spanPool.New();//在nSpan的头部切一个k页下来,只要变页号就可以了kSpan->_pageId = nSpan->_pageId; //kSpan是管理nSpan的,也就是上面图片每个内存块前面的那个spankSpan->_n = k; //保存页数nSpan->_pageId += k; nSpan->_n -= k; //nSpan被拿走了k个页//把切出来的n-k个页再挂到n-k号桶上去去_spanLists[nSpan->_n].PushFront(nSpan);//建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//循环如果走完后,意味着page cache里也没有span了,所以要直接向操作系统申请一个128页的span//比如线程第一次申请内存时,central cache和page cache里是啥也没有的,就会走到这个位置 //Span* bigSpan = new Span;Span* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1); //向系统申请申请128页的空间bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; //设置页号bigSpan->_n = NPAGES - 1; //设置页的数量,就是129-1=128个页_spanLists[bigSpan->_n].PushFront(bigSpan); //把申请出来的空间挂到对应的桶上return NewSpan(k); //为了避免代码重复,所以递归调用自己}
}
- 我们还要提供一个提供一个能让central cache根据页号获取span的函数
//class PageCache
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj)
{PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); //算出页号auto ret = _idSpanMap.find(id); //根据页号查找映射关系if (ret != _idSpanMap.end()) //找到了{return ret->second; //返回该对象属于的span指针}else //没找到{//一般来说不会找不到,如果找不到说明代码逻辑有问题,应全面检查assert(false);return nullptr;}
}
③central cache回收内存
- 知道了thread cache还给我的内存块属于哪个span后,就可以把这写内存块挂到对应的span自由链表中了,然后更新一下span的_useCount计数即可
- 如果在之后某个span的_useCount减为0了,说明span的正在使用的小块全部回来了,我们需要把这个span再还给page cache
//class CentralCache
void ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size); //先算要还给哪个桶_spanLists[index]._mtx.lock();//开始把还给我的对象再还给对应的spanwhile (start){void* next = NextObj(start);Span* span = PageCache::GetInstance()->MapObjectToSpan(start); //获取对象对应的span//然后只需要把对象头插到对应的span即可NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;if (span->_useCount == 0) //当这个span的正在使用的小块全部回来了,我们需要还给page cache{//这个span就可以再回收给page cache了,page cache就可以去做前后页的合并_spanLists[index].Erase(span); //将这个span从链表中整个分离出来span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;//释放span给page cache时,使用page cache的大锁就可以了//这时把桶锁解掉,为了让其它线程能够访问这个桶_spanLists[index]._mtx.unlock();PageCache::GetInstance()->GetMtx().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span); //还给page cachePageCache::GetInstance()->GetMtx().unlock();_spanLists[index]._mtx.lock();}start = next; //往后走}_spanLists[index]._mtx.unlock();
}
8.3 pagecache回收内存
- 当central cache中某个span的_useCount减到0了,那么central cache就要将这个span还给page cache了
- 大体过程起始很简单,page cache只要将换回来的span挂到对应的哈希桶上就行了
- 但是我们的page cache最主要需要解决的问题是“缓解内存碎片”,所以需要将换回来的span与其它空闲的span进行合并
合并的过程分为前合并和后合并:
- 假设还回来的span的起始页号是num,该span管理n页的空间
- 那么首先往前合并时,就要判断第num-1页对应的span是否空闲(在一个桶内),如果空闲就进行合并,假设8页的span和8页的span合并成了一个16页的span,那么就要继续把这个16页的span继续往前面找空闲的span继续合并,直到不能合并位置;往后合并也是同理
- 所以在合并span时,是需要通过页号获取对应span的,所以我们之前的映射就派上用场了
- 当我们通过页号找到对应的span时,这个span可能挂在page cache也可能挂在central cache,因为page cache刚分配一个span给central cache时,这个span的_useCount默认就是等于0的,这时候我们正在对该span进行切分的时候,page cache就把这个span拿去合并了,这显然不对
- 这时候我们span结构中的_isUse成员就派上用场了,负责标记该span是否正在被使用
下面就是page cache前后页的合并的具体步骤
①建立映射
- 注意,起始page cache和central cache都有一个映射,之前的那个映射是page cache要分配给central cache的span的映射,而page cache自己也是有span的,所以这个映射是建立page cache自己的span的映射
②进行span的合并
- 对span前后的页,尝试进行合并,缓解外内存碎片问题
- 假如我的pageId为1000,n为1,我们可以找999页或者1001页是不是空闲的
- 假如我的pageId为2000,n为5,我们可以找1999页或者2005页是不是空闲的,然后1999的为2,我就可以合并为一个7页的大页,然后2005的n为5,我可以再合成一个12页的
- 那么这样是咋缓解内存碎片的问题的呢?假设我们不进行合并,那么就有两个5页的span和一个2页的span,假设central cache要申请一个12页的span,虽然我们的总数是够的,但是因为是分散的,无法分出12页的span给central cache,这就是内存碎片问题,加上合并后该问题就能缓解
void ReleaseSpanToPageCache(Span* span) //将中心缓存中完整的span还给page cache重新融合成大页
{while (1) //向前合并{PAGE_ID prevId = span->_pageId - 1; //先找前面的页有没有空闲的auto ret = _idSpanMap.find(prevId);if (ret == _idSpanMap.end()) break; //没有找到空闲的,无法合并,直接breakSpan* prevSpan = ret->second;if (prevSpan->_isUse == true) break; //前面的页正在使用,不合并if (prevSpan->_n + span->_n > NPAGES - 1) break; //合并出超过128页的span没办法管理,也不合并//走到这里就可以往前合并了,其实合并就是改变下对应span的属性span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;//合并完后把空的span干掉_spanLists[prevSpan->_n].Erase(prevSpan);//delete prevSpan;_spanPool.Delete(prevSpan);}while (1) //向后合并{//假设一个span有5个页,所以我们需要跳过这个span才能找到下一个span的起始位置//假设页号是1000,有5个页,就要走到1004的位置才是下一个span的开始,也就是1000 + 5 - 1PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end()) break; //和前面一样Span* nextSpan = ret->second;if (nextSpan->_isUse == true) break;if (nextSpan->_n + span->_n > NPAGES - 1) break;//走到这里开始和后面的span合并,往后合并就不需要改变页号了span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);//delete nextSpan;_spanPool.Delete(nextSpan);}_spanLists[span->_n].PushFront(span); //把合并好的span挂回双链表去span->_isUse = false;//然后老样子,建立该span与其尾页的映射_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;
}
九,回收内存过程联调
①线程释放内存函数
- 前面说过,由于我们不知道线程要申请和释放的内存是多大,所以我们不能让线程直接访问thread cache的Allocate和Deallocate函数
- 需要再封装一层,在头文件ConcurrentAlloc.h里面实现:
static void ConcurrentFree(void* ptr, size_t size)
{pTLS_ThreadCache->Deallocate(ptr, size);
}
②主函数实现
- 前面申请内存时,我们是进行了三次申请,那么老样子,我们这次仍然进行三次申请,但加了三次释放,以此来测试我们的释放流程是否符合预期
void TestFreeAlloc()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);ConcurrentFree(p1, 6);ConcurrentFree(p2, 8);ConcurrentFree(p3, 1);
}
- 这三次申请和释放的对象大小对齐后都是8字节,所以涉及的桶就是thread cache和central cache的第0号桶,以及page cache1号桶
③thread cache的释放流程
- 前两次释放不会把内存块还给central cache
- 当第三次释放时,_size的值为3了,所以就要把对象还给central cache了
- 先将0号桶的对象全部弹出,然后调用central cache的ReleaseListToSpans函数来回收内存块到span
④central cache的释放流程
- 由于我们还回来了3个对象,所以循环会指向三次,_useCount也会减减三次
- 当第三次减减时,_useCount为0时,就要将这个span近一步还给page
⑤page cache回收流程
- 当page cache拿到内存时,第一件事就是向往前或往后合并缓解内存碎片问题,但是由于第一次申请是把128切成了1页和127页的span,所以不会往前合并
- 往后合并时,找到127页的span合并成一个128页的span,然后将原来127页的span干掉,紧接着把这个128页的span挂到第128号桶中,建立span与首尾页的映射,最后再将span的状态设置成未使用即可
十,优化
10.1 大于256KB内存块的申请和释放
我们可以将我们涉及到的内存块大小分为三个阶段:(一页的大小是8K = 8 * 1024字节)
释放内存的大小 | 申请方式 | 释放方式 |
---|---|---|
x <= 256KB(32页) | 向thread cache申请 | 释放给thread cache |
32页 < x <= 128页 | 向page cache申请 | 释放给page cache |
x >= 128页 | 直接向堆申请 | 直接释放给堆 |
①申请大块内存
我们前面实现对齐函数RoundUp时,早就预先对大于256K的情况做了判断:
所以对之前的申请逻辑就要新增一些东西了,当申请对象大于256KB时,不用再向thread cache申请了,直接调用page cache的NewSpan申请指定页数的span即可
static void* ConcurrentAlloc(size_t size) //线程通过调用这个函数来获取资源
{if (size < MAX_BYTES) //如果线程申请的内存小于256KB,就先获取属于自己的thread cache,然后再调用它的Allocate{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLS_ThreadCache == nullptr){static ObjectPool<ThreadCache> tcPool;pTLS_ThreadCache = tcPool.New();//pTLS_ThreadCache = new ThreadCache;}return pTLS_ThreadCache->Allocate(size);}else //如果大于256KB,就去上层找{size_t alignSize = SizeClass::RoundUp(size);size_t kpage = alignSize >> PAGE_SHIFT;PageCache::GetInstance()->GetMtx().lock();Span* span = PageCache::GetInstance()->NewSpan(kpage);span->_objSize = size;PageCache::GetInstance()->GetMtx().unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}
}
256KB就32页,当申请的页数大于128页时,直接向堆申请,如果小于128页就继续在page cache里申请
Span* NewSpan(size_t k) //新获取一个k数量页的Span
{assert(k > 0 && k < NPAGES);if (k > NPAGES - 1) //当k大于256K,就直接向堆申请{void* ptr = SystemAlloc(k);//Span* span = new Span;Span* span = _spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;_idSpanMap[span->_pageId] = span; //存一下span的页号,方便释放return span;}//先到第k号桶去找里面有没有spanif (!_spanLists[k].Empty()) //如果有span,就返回{Span* kSpan = _spanLists[k].PopFront();//建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}else //第K号桶是空的,就去后面的桶找并做切分{for (size_t i = k + 1; i < NPAGES; i++) //循环往后面找{//找到不为空的桶,就开始切,最后就是k页的span返回给中心缓存,n-k页的span重新挂到n-k位置的桶去if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront(); //找到一个大的span,切成k页和n-k页的两个span//Span* kSpan = new Span;Span* kSpan = _spanPool.New();//在nSpan的头部切一个k页下来,只要变页号就可以了kSpan->_pageId = nSpan->_pageId; //kSpan是管理nSpan的,也就是上面图片每个内存块前面的那个spankSpan->_n = k; //保存页数nSpan->_pageId += k; nSpan->_n -= k; //nSpan被拿走了k个页//把切出来的n-k个页再挂到n-k号桶上去去_spanLists[nSpan->_n].PushFront(nSpan);//存储nSpan的页号跟nSpan的映射,方便page cache回收内存时进行前后合并_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//-1是因为假如我从1000页开始,到1004就是5个span了,所以是1000 + 5 - 1,所以要-1//建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//循环如果走完后,意味着page cache里也没有span了,所以要直接向操作系统申请一个128页的span//比如线程第一次申请内存时,central cache和page cache里是啥也没有的,就会走到这个位置 //Span* bigSpan = new Span;Span* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1); //向系统申请申请128页的空间bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; //设置页号bigSpan->_n = NPAGES - 1; //设置页的数量,就是129-1=128个页_spanLists[bigSpan->_n].PushFront(bigSpan); //把申请出来的空间挂到对应的桶上return NewSpan(k); //为了避免代码重复,所以递归调用自己}
}
②释放大块内存
释放我们是一样的,当释放的内存块大于256KB时,我们直接调用page cache的释放逻辑,所以我们要先获取对应的span,把锁加好在释放:
static void ConcurrentFree(void* ptr, size_t size)
{if (size < MAX_BYTES){assert(pTLS_ThreadCache);pTLS_ThreadCache->Deallocate(ptr, size);}else{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);PageCache::GetInstance()->GetMtx().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMtx().unlock();}
}
然后在page cache释放内存时,如果span的大小大于128页了,则直接释放给堆,否则继续由page cache回收
对于SystemFree和SystemAlloc是一样的,效率比free更快
//Common.h
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else//linux下sbrk unmmap等,这里仍然用freefree(ptr);
#endif
}
③测试
void BigAlloc()
{void* p1 = ConcurrentAlloc(257 * 1024);ConcurrentFree(p1, 257 * 1024);void* p2 = ConcurrentAlloc(129 * 8 * 1024);ConcurrentFree(p2, 129 * 8 * 1024);
}
①申请257KB大小内存
- 当申请大于256KB内存块时,直接向page cache申请,由于257KB对齐后是33页,所以page cache向内存申请128页大小的span然后分成33页和96页的span,把33页的span直接返回
- 释放时也是直接调用page cache回收函数,由于是33页,不大于128页,所以会尝试将33页进行向上合并,最终和95页的合并成128页,再将128页的span挂到第128号桶下
②申请129页大小内存
- 首先129页是大于256KB的,所以会先调用page cache的申请函数
- 然后page cache再次判断,发现要申请的129页是大于128页的,所以直接去找堆申请,建立好映射后返回129页的span
- 释放129页内存时,也是直接还给堆,不过这个不确定,因为我们page cache的最大页数是可以自定义的,假如我们把桶的最大存储页数改为200,那么就不会直接释放到堆里去了
10.2 释放对象时优化不传大小
我们的申请和释放还是有点不方便,因为目前我们申请内存时指明了大小,但是在释放时仍然要指定大小,太麻烦了,而且联想到malloc和free,free是直接传入一个指针不需要传入大小的,所以我们可以把我们的释放逻辑函数也搞成只传指针的
这时候我们span结构中的_objSize成员派上用场了,这个代表这个span管理的内存块被切成一个个对象的大小
最后我们释放逻辑的代码可以优化成不传大小的了
static void ConcurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if (size < MAX_BYTES){assert(pTLS_ThreadCache);pTLS_ThreadCache->Deallocate(ptr, size);}else{PageCache::GetInstance()->GetMtx().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMtx().unlock();}
}
多线程测试代码:
void TestMultiThread()
{std::thread t1([]() {std::vector<void*> v;for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(6);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e);}});std::thread t2([]() {std::vector<void*> v;for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(6);}for (auto e : v){ConcurrentFree(e);}});t1.join();t2.join();
}
10.3 读取映射关系时的加锁问题
- span与页号的映射关系是存储在PageCache类中的,当我们访问这个映射关系时需要加锁,因为page cache申请和释放时也会访问甚至改变映射关系,所以我们需要加锁
- 我们可以使用C++的只能指针,也可以用普通的锁
十一,多线程下对比malloc性能测试
下面是多线程下对比malloc的测试代码:
#include"ConcurrentAlloc.h"
#include<stdio.h>
//调试技巧:1,函数栈帧 + 条件断点 2,// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, (int)free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}// 三个参数分别代表单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){/*cout << i << endl;if (j == 1 && i == 1024){int x = 0;}*/v.push_back(ConcurrentAlloc(16));//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n", nworks, rounds, ntimes, (int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (int)free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
①第一次测试
固定大小内存的释放和申请:
v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));
结果如下:
每个线程都是申请释放固定大小的对象,那么就全都访问的是各自thread cache的同一个桶,所以对应的都会访问到central cache的同一个桶,那么central cache就没用了
所以下面的第二次测试就是测试申请和释放不同大小的内存
②第二次测试
第二次测试是测试不同大小内存的申请和释放
v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
结果如下:
③分析结果
- 可以发现,我们现在的内存池的效率和malloc比起来还是有差距的,所以下面我们来分析下效率低下的原因在哪里,然后尝试解决
十二,使用基数树优化
12.1 性能瓶颈分析
VS2022环境下的性能分析步骤
我们要分析我们自己的内存池,所以我们把申请次数调小一点,并屏蔽malloc的:
int main()
{size_t n = 1000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;//BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
①打开性能诊断
在debug模式下,我们的调试选框打开后有一个选项叫做“性能诊断”(不同版本的VS可能叫法不一样):
点击后会重新打开一个窗口
②查看结果
前面是系统内置函数,可以发现我们自己写的函数就这两个的时间占比最多,加起来达到了差不多70%
而在Deallocate中:
再扒开ReleaseListToSpans看:
- 可以看到是我们MapObjectToSpan这个函数里的这个锁消耗了很多时间
- 其次就是我们central cache的那个桶锁
③分析结果
- 上面的结果表示了,我们自己实现的内存池效率低下的主要原因就是在锁的竞争上面,也就是调用MapObjectToSpan函数访问映射关系时的加锁问题
- tcmalloc中针对这一点使用了基数树进行优化,可以使在读取这个映射关系时可以不加锁
12.2 关于基数树
基数树是一个分层的哈希表,根据层数可以分为单层基数树,二层基数树,三层基数树等
①单层基数树
- 单层基数树采用直接定址法的哈希表,key就是页号value就是一个指针,本质就是一个数组
- 最终数组的长度就是2^19 = 524288,每一个页号对应span的地址就存储在数组对应下标的位置,总共占用位置 2^19 * 4 = 2 ^ 21 = 2M左右
- 所以单层基数树只在32位下可以用,如果放到64位下,那么数组的大小就是2^51 * 8 = 2^23 G,显然不行,所以如果是64位我们要使用三层基数树
下面是单层基数树的代码,当我们需要建立映射时调用set函数,读取映射时调用get即可:
template <int BITS>
class TCMalloc_PageMap1
{
public:typedef uintptr_t Number;explicit TCMalloc_PageMap1() {size_t size = sizeof(void*) << BITS; //要开辟的数组大小size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}void* get(Number k) const {if ((k >> BITS) > 0) return NULL;return array_[k]; //返回页号对应的span}void set(Number k, void* v) {assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS - 1]之内array_[k] = v;}
private:static const int LENGTH = 1 << BITS; //页的数量void** array_; //value就是一个指针
};
②二层基数树
- 二层基数树就是在单层基数树的基础上,把value的指针进行再映射,比如用前5个比特位再基数树的第一层进行映射,得到对应的第二层,然后用剩下的比特位再进行映射,最终得到该页的span*
- 第一层占用2^5 * 4 = 2^7比特空间,第二层和单层基数树一样占用大约2M的空间
- 一层基数树一开始就需要把2M的空间全部开出来,而二层数组不是,一开始只需要把第一层的数组搞出来,当某一页号映射时再开辟对应的第二层数组即可
下面是二层基数树代码:
template <int BITS>
class TCMalloc_PageMap2
{
private:static const int ROOT_BITS = 5; //第一层给5个二进制位,对应页号的前5个比特位static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储个个数static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储个数struct Leaf { void* values[LEAF_LENGTH]; }; //第一层数组中存储的元素类型Leaf* root_[ROOT_LENGTH]; //第一层数组void* (*allocator_)(size_t);
public:typedef uintptr_t Number;explicit TCMalloc_PageMap2() {memset(root_, 0, sizeof(root_)); //初始化第一层空间Ensure(0, 1 << BITS);//在32位下。我们将二层基数树的两层数组全开辟出来也就消耗了2M的空间,所以我们可以最开始就全搞出来,64位下就要写在set里面了}void* get(Number k) const {const Number i1 = k >> LEAF_BITS; //获取第一层对应下标const Number i2 = k & (LEAF_LENGTH - 1); //获取第二层对应下标if ((k >> BITS) > 0 || root_[i1] == NULL) return NULL; //当页号不在范围内或者无映射关系直接返回空return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS; //第一层对应下标const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应下标ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v; //建立映射}//该函数是我为了保证当需要建立某一页号与span之间的映射关系时//需要先调用该函数确保用于映射的空间是已经开辟了的,没有就现场开辟bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;if (i1 >= ROOT_LENGTH) return false; //映射页号超出返回,返回错误if (root_[i1] == NULL) //第一层对应下标空间未开辟,直接现场开辟{static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}
};
③三层基数树
看了单层和二层基数树,那么三层基数树也不难了,就是在二层基数树的基础上,搞了三层数组,映射三次:
- 三层基数树适用于64位,此时当要建立某一页号的映射关系时,再开辟对应的数组空间,而没有建立映射的页号就可以摆在那不管不开辟空间,此时就能很大程度地节省内存空间
下面是三层基数树的代码:
template <int BITS>
class TCMalloc_PageMap3
{
private:static const int INTERIOR_BITS = (BITS + 2) / 3; //第1和2层对应页号static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第1和2层存储元素static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数static const int LEAF_LENGTH = 1 << LEAF_BITS; //第三次存储元素的个数struct Node { Node* ptrs[INTERIOR_LENGTH]; };struct Leaf { void* values[LEAF_LENGTH]; };Node* root_; //根节点Node* NewNode(){Node* result = reinterpret_cast<Node*>();if (result != NULL) memset(result, 0, sizeof(*result));return result;}
public:typedef uintptr_t Number;explicit TCMalloc_PageMap3() { root_ = NewNode(); } //构造函数void* get(Number k) const{const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //获取第一层对应下标const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层const Number i3 = k & (LEAF_LENGTH - 1); //第三层//页号超出返回,或对应的空间未开辟if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) return NULL;return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回页号对应的span*}void set(Number k, void* v){ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一层const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层const Number i3 = k & (LEAF_LENGTH - 1); //第三层Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立映射}bool Ensure(Number start, size_t n){for (Number key = start; key <= start + n - 1;){const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); //第一层对应的下标const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) return false; //下标超出返回,直接返回错if (root_->ptrs[i1] == NULL) //第一层下标对应空间为开辟,现场开辟{Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二层下标对应空间未开辟{Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}
};
12.3 使用基数树优化代码
①然后我们就可以用基数树代替我们的unordered_map了:
- 但是我们目前就在32位下进行测试,因为64位和Linux一样,需要做一些额外的处理
②然后我们前面所有和映射相关的地方都要改下,首先从NewSpan开始:
③然后就是释放函数:
①最后就是造成我们效率低下的罪魁祸首MapObjectToSpan函数了
-
//获取从对象到span的映射 Span* MapObjectToSpan(void* obj) {PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); //算出页号//std::unique_lock<std::mutex> lock(_pageMtx); //RAII的智能指针类,自动构建锁和释放锁//auto ret = _idSpanMap.find(id); //根据页号查找映射关系//if (ret != _idSpanMap.end()) //找到了//{// return ret->second; //返回该对象属于的span指针//}//else //没找到//{// assert(false);// return nullptr;//}//用这个就不用加锁了,其它的线程访问unordered_map的时候,它底层的红黑树可能会旋转,节点之间的关系可能会变,所以要加锁//用哈希表的话,线程1要扩容,但是扩容后的结果还没替换原来的表时,线程2也来扩容,这样就会有很大的问题,所以要加锁//1,只有在ReleaseSpanToPageCache和NewSpan这两个函数中才会去建立id和span的映射,也就是说会去写//2,基数树,写之前会提前开好空间,写数据过程中,不会动结构//3,它的读写是分离的。假如线程1对一个位置读写的时候,线程2不可能对这个位置写,间接完成了锁的功能auto ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret; }
- STL的容器都是没有线程安全的,C++的map的底层是红黑树,unordered_map的底层是哈希表,两者再插入数据时底层的结构都有可能变化,比如红黑树插入数据后会旋转,哈希表插入后可能会扩容,造成数据不一致问题,需要加锁
- 而基数树我们是采用的固定长度的空间,不论什么时候去获取某个页的映射,都是在一个固定的位置进行读取
- 建立映射操作都是在page cache进行的,也就是说,读取映射的都是对应的span的_useCount不为0的页,建立映射时都是对应的span的_useCount等于0的页,所以读取和建立同一个页的映射操作不会同时进行
12.4 最终测试
现在可以看到我们的效率比malloc快了差不多十倍了
12.5 周边问题
问题:tcmalloc是如何替代malloc的?原理是什么?
解答:Google开源的tcmalloc是会直接替换malloc的,不同平台替换的方式不同,比如Unix系统上的glibc使用了weak alias的技术替换;Windows使用了hook的钩子技术进行替换,不用改原来的代码,只需用钩子将代码使用malloc的地方勾过来让其只需该内存池的代码,这样所有的malloc的调用都跳转到了tcmalloc的逻辑(所谓的外挂就是这样搞得,用来进行系统层面的函数更改)
问题:能否将threadcache和centralcache合并成一个,减掉一层?
解答:不能,前面说过CentralCache作用是承上启下,假设去掉中心缓存,那么thread cache就直接和central cache直接对接,会产生许多问题
- 本来span的切分工作是central cache搞的,去掉central cache后,page cache中的span就同时存在切好的和未切好的,那么就有可能给了thread cache一部分,但是自己却仍然有一部分
- 并且在申请内存时,page cache不知道哪些是切好的哪些没切所以如何把这些混乱的span分配给thread cache是一个大问题
- page cache是用一个大锁把桶全部锁住的,所以就效率这块远远没central cache的桶锁高,得不偿失
总结:
- central cache负责均衡多个线程之间的同一个大小的内存需求
- central cache负责管理切好的span,page cache负责管理大块span,能够让span合理分配使用
- central cache的桶锁的效率远比page cache的大锁高
问题:
- thread cache销毁了,但是但是它还有内存没释放给central cache怎么办
- 假设线程有内存泄漏,有可能还有一些内存没回来或者挂在thread cache中,但是这个线程销毁了,导致这些内存没有回到central cache怎么办
解答:给项目注册一个回调函数,规定只要线程结束,就调用回调函数先把thread cache里的内存释放给central cache,可以在创建TLS时就绑定回调函数,在new线程的thread cache时绑定一个回调函数,一旦线程挂了崩溃了就调用这个回调函数进行清理
自主实现的定长内存池源码:高并发内存池/定长内存池 · 小堃学编程/项目集合 - 码云 - 开源中国
tcmalloc项目完整源码地址为:GitHub - google/tcmalloc
或者国内镜像:tcmalloc: TCMalloc (google-perftools) 是用于优化C++写的多线程应用,比glibc 2.3的malloc快。这个模块可以用来让MySQL在高并发下内存占用更加稳定。