1. 项目介绍
1.1 这个项目是做什么的
1.2 这个项目主要应用的技术
2. 内存池
内存池是指程序预先从操作系统 申请一块足够大内存 ,此后,当程序中需要申请内存的时候,不是直接 向操作系统申请,而是 直接从内存池中获取 ;同理,当 程序释放内存 的时候,并不真正将内存返回给操作系统,而是 返回内存池 。当程序退出( 或者特定时间 ) 时,内存池才将之前申请的内存真正释放。
2.1 池化技术
2.2 内存池解决的问题
- 当然是解决效率的问题了
- 比如现在有385byte的空间,但是我们要申请超过256byte的空间却申请不出来
因为这两块空间碎片化,不连续了,这种碎片叫做外碎片 - 外碎片:是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。
2.3 malloc
3. 定长内存池
1. 性能达到极致 2. 不考虑内存碎片等问题
3.1 主要思路
- step1:先开辟一大段的空间并且用指针_memeory指向
- step1: 又为了解决_memeory指向空间的小 < 申请空间的大小,引入了_remainBytes记录剩余空间
- step1: 申请空间,就会从这里面拿,而释放的空间,我选择用_freeList指针指向这段空间,并一一链接,做到回收并重复利用
- step1: 而在_freeList连接的空间里,我用前4或8个字节,记录下一个空间的地址(这就避免了重新定义变量去记录)
- step2: 这时我就意识到,申请空间需要先从_freeList里面拿,看看是否有回收的空间可以重复利用
3.2 ObjectPool.h
#pragma once
#include <iostream>
#include <vector>
#include <windows.h>
#include <new>
using std::cout;
using std::endl;template<class T>
class ObjectPool
{
public:T* New() {size_t objsize = sizeof(T) < sizeof(void*) ? 4 : sizeof(T);T* obj = NULL;// step2:申请空间有限从回收链表_freeList里拿if (_freeList) {obj = (T*)_freeList;_freeList = *(void**)obj;}else {// step1// 小于1个指针给1个指针的大小if (_remainBytes < objsize) {_remainBytes = 128 * 1024;_memeory = (char*)malloc(_remainBytes);// 开个128kb的空间if (_memccpy == NULL) {throw std::bad_alloc();// 直接抛出异常}}obj = (T*)_memeory;_memeory += objsize;_remainBytes -= objsize;}new(obj)T;// 定位new,就是初始化这段空间return obj;}void Delete(T* obj) {// step1// 将这些空间一一连接起来,回收并利用obj->~T();// 显示调用析构函数*(void**)obj = _freeList;// *(void**)解引用就是void*的大小,也就是一个指针的大小_freeList = obj;}
private:char* _memeory = NULL;// 指向内存块 头部 的指针size_t _remainBytes = 0;// 指向内存块 剩余 的指针void* _freeList = NULL;// 指向回收链表的 头指针
};
3.3 Test.c
#include "ObjectPool.h"
struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 30;// 每轮申请释放多少次const size_t N = 1000000;//100wstd::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();ObjectPool<TreeNode> TNPool;std::vector<TreeNode*> v2;v2.reserve(N);size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < 100000; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}int main()
{TestObjectPool();return 0;
}
- 在32位relase模式下,可以明显看出定长内存池的效率明显比malloc高很多
3.4 事后反思
3.4.1 反思一
ps: 还有一个很巧妙的设计点,为了解决不知道是32位的系统,还是64位的系统,我选择解引用2级指针,而2级指针的解引用就是1级指针的大小,4或8
3.4.2 反思二
ps: 在我设计的定长内存池中,我还是用了malloc开闭一大段空间作为内存池,这样还是没有避开malloc,之后我就想到直接调用系统接口开辟空间
- windows下向堆申请页为单位的大块内存的接口->VirtualAlloc
- Linux下向堆申请页为单位的大块内存的接口->brk和mmap
4. 高并发内存池整体框架设计
4.1 主要解决的问题
a. 性能问题
b. 多线程环境下,锁竞争问题
4.2 三大组成部分
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。
central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。 - page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小 的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache 会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片 的问题。
5. 高并发内存池--thread cache
thread cache 是哈希桶结构,每个桶是一个按桶位置 映射 大小的内存块对象的 自由链表 。每个线程都会有一个thread cache 对象,这样每个线程在这里获取对象和释放对象时是 无锁的
5.1 主要思路
-
由于后面需要重复包含某些头文件,这里可以把它们放在Comment.h中,方便后面调用
同理,也需要把freelist自由链表封装成类, -
而在ThreadCache中需要Allocate和Deallocate接口,因为是哈希桶结构(挂的是自由链表)
所以申请空间,会先在自由链表中拿,不够或没有会去找centrallcache拿,则又需要一个FetchFromcentrallcache接口,释放空间,会先返回自由链表中挂起,多了又会返回给centrallcache, -
同时需要考虑一个比较重要的就是哈希桶的映射规则,
计算对齐数(无法避免内碎片),和计算几号桶
static const size_t MAX_SIZE = 256 * 1024;// 最大字节数
static const size_t NFREELISTS = 208;// 最大自由链表数 - 为了保证每一个线程都有一个ThreadCache,且不会相互影响,这时需要引入TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
(线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这就保持了数据的线程独立性,而熟知的全局变量,是所以线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度) - 为了后面测试代码,这里需要再整体封装一层,在Concurrent.h中实现ConcurrentAlloc和ConcurrentFree
5.1 Commen.h
#pragma once
#include <iostream>
#include <vector>
#include <windows.h>
#include <new>
#include <assert.h>
#include <thread>
static const size_t MAX_SIZE = 256 * 1024;// 最大字节数
static const size_t NFREELISTS = 208;// 最大自由链表数
using std::cout;
using std::endl;// 这里是引用返回,这里本质就是返回obj前面4个字节的地址
static void*& NextObj(void* obj) {return *(void**)obj;
}// 管理内存块的自由链表
class FreeList {
public:// 头插void push(void* obj) {assert(obj);NextObj(obj) = _freeList;_freeList = obj;}// 头删void* pop() {assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool empty() {return _freeList == nullptr;}
private:void* _freeList = nullptr;// 管理一个一个的小对象
};// 映射对齐规则
class SizeClass {
public:// 计算对齐数static inline size_t _RoundUp(size_t size, size_t Align_Num) {return ((size + Align_Num - 1) & ~(Align_Num - 1));// 位运算的效率更高}// 把函数定义成静态成员函数,就可以直接调用static size_t RoundUp(size_t size) {if (size <= 128) {return _RoundUp(size, 8);// 8字节对齐}else if (size <= 1024) {return _RoundUp(size, 16);}else if (size <= 8 * 1024) {return _RoundUp(size, 128);}else if (size <= 64 * 1024) {return _RoundUp(size, 1024);}else if (size <= 256 * 1024) {return _RoundUp(size, 8 * 1024);}else {assert(false);// 直接报错return -1;// 暂时不考虑254kb}}// 计算几号桶static size_t _IndexNum(size_t size,size_t Align_left) {return ((size + (1 << Align_left) - 1) >> Align_left) - 1;}static size_t IndexNum(size_t size) {int group_num[4] = { 16,72,128,184 };if (size <= 128) {return _IndexNum(size, 3);//根据对齐数->这里我直接传的是2几次方}else if (size <= 1024) {return _IndexNum(size-128, 4)+group_num[0];}else if (size <= 8 * 1024) {return _IndexNum(size-1024,7)+group_num[1];}else if (size <= 64 * 1024) {return _IndexNum(size-8*1024, 10) + group_num[2];}else if (size <= 256 * 1024) {return _IndexNum(size-64*1024, 13) + group_num[3];}else {assert(false);// 直接报错return -1;// 暂时不考虑254kb}}
};
5.2 Concurrent.h
5.3 ThreadCache.h
5.4 ThreadCache.cpp
5.5 Test.cpp
#include "Concurrent.h"void Thread1() {for (int i = 0; i < 5; i++) {void* obj = ConcurrentAlloc(9);//ConcurrentFree(obj, 9);}
}void Thread2() {for (int i = 0; i < 5; i++) {void* obj = ConcurrentAlloc(18);//ConcurrentFree(obj, 18);}
}void TestThreadCache() {std::thread t1(Thread1);t1.join();std::thread t2(Thread2);t2.join();}
int main()
{TestThreadCache();return 0;
}
5.6 事后反思
5.6.1 映射规则
5.6.2 缺少联动性
- ThreadCache申请空间,会先在自由链表中拿,不够或没有会去找centrallcache拿(未实现)
- 释放空间,会先返回自由链表中挂起,多了又会返回给centrallcache(未实现)
6.高并发内存池-CentrallCache
6.1 主要思路
- 首先需要把span和spanList实现出来, 其中spanList应该是一个带头双向循环链表,
- 由于存在多线程共同竞争的问题,这里可以使用单例模式中的饿汉模式
- 由于上一层,thread cache申请空间不够,会在centrall cache中拿空间,所以需要完善并实现FetchFromcentrallcache
- thread cache拿空间的时候,需要拿一批空间,这里具体需要拿多少个,可以使用慢调节算法
在Sizeclass(映射规则)中实现一个NumMoveSize,size_t n = MAX_SIZE / size;
if(n<2)就返回2,if(n>512)就返回512,其他就返回n
可以再控制一下慢调节算法,在自由链表中加入max_num变量,使其第一次拿给一个,第二次拿给二个(也可以增长的再快点)
size_t batchNum = min(_freeLists[Index].MaxNum(), SizeClass::NumMoveSize(size));
也不是想拿几个就拿几个,不够的话就是有多少给多少,size_t actualNum; - thread cache拿空间的时候,从代码中来看是一段自由链表,所以又需要void* start = nullptr, * end = nullptr;记录头尾,返回是第一个小内存块,而多的就需要挂起,
- centrall cache中需要实现一个函数,从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
因为存在多个线程同时竞争的问题,所以需要加锁
给对象申请一批空间,应该从非空的span中拿,又需要实现一个找非空spand函数,
注意:如果是一开始的情况,是找不到非空span的则又需要从page cache中拿空间
Span* GetOneSpan(SpanList& list, size_t batchNum);// 暂时不实现 - FetchRangeObj它的返回值,应该是size_t,返回的是thread cache中centrall cache中拿的数量(并不是想拿多少就拿多少,不够的话就是有多少给多少)
6.2 CentrallCache.h
6.3 CentrallCache.cpp
#pragma once
#include "CentrallCache.h"CentrallCache CentrallCache::_sInst;
Span* CentrallCache::GetOneSpan(SpanList& list, size_t batchNum) {return nullptr;// 这里如果没有找到非空span,那么就可能从page Chache中拿内存
}size_t CentrallCache::FetchRangeObj(void*& start, void*& end,\size_t batchNum, size_t size) {size_t Index = SizeClass::IndexNum(size);// 几号桶// 因为存在多个线程同时竞争的问题,所以需要加锁_spanlists[Index]._mtx.lock();Span* newSpan = GetOneSpan(_spanlists[Index], batchNum);// 得到一个非空spanassert(newSpan);assert(newSpan->_freeList);size_t i = 0, actualNum = 1;start = newSpan->_freeList;end = newSpan->_freeList;while (i < batchNum-1 && NextObj(end)) {end = NextObj(end);i++;actualNum++;}newSpan->_freeList = NextObj(end);NextObj(end) = nullptr;// 断开_spanlists[Index]._mtx.lock();return actualNum;
}
6.3 其他新增
6.4 其他完善
6.5 事后反思
6.5.1 单例模式
- 由于存在多线程竞争同一块资源的问题,直接把CentrallCache类设计成饿汉模式最好
6.5.2 慢调节算法
- thread cache在centrall cache中拿数据,并不是只拿一个而是一批,具体是多少个需要计算
- size_t batchNum = min(_freeLists[Index].MaxNum(), SizeClass::NumMoveSize(size));
- size_t actualNum = CentrallCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
6.5.4 span中的成员变量
- PAGE_ID _pageId = 0;// 大块内存起始页的页号
- 在32位下,PAGE_ID 应该是size_t
- 在64位下,PAGE_ID 应该是unsigned long long
- 暂时未使用,需要和下一层联动 size_t _n = 0;// 页的数量
- 暂时未使用,需要和下一层联动 size_t _useCount = 0;// 已经使用页的数量
6.5.3 SpanList中的span和自由链表FreeList
- thread cache在centrall cache中拿数据,其实是在centrall cache中找到一个非空span,拿一段空间给thread cache,从代码上看就是一段自由链表
6.5.4 缺少联动性
- centrall cache中GetOneSpan找非空span的时候,有可能找不到,这时就应该去下一层page cache 中拿空间
- thread cache和centrall cache都只写了一部分申请的逻辑,而释放的逻辑暂时没写,因为要和下一层联动
7.高并发存池-Pagecahe Cache
记住 地址 = 页号 * 8kb
7.1 主要思路
- 因为还是存在多进程竞争的问题,所以这里还是把page cache设计成单列模式
- 虽然page cache也是哈希桶结构,但它是直接映射的,1page -> 1page,128page->page
- 在上一层CentrallCache 中没有span的时候,会从page cache通过NewSpan拿一个span,但具体拿几页的span,还是需要通过慢增长算法-> NumMovePage
- CentrallCache在得到这个大块内存span之后,需要把它切分成自由链表的形式,并挂在CentrallCache对应的哈希桶中
char* start = (char*)(span->_pageId << PAGE_SHIFT);// 起始地址 = 地址 * 8 * 1024
size_t bytes = span->_n << PAGE_SHIFT;// 页号* 8 * 1024
char* end = start + bytes; - 通过上面的思路,page cache中就需要实现NewSpan函数(获取某一页的大块内存)
如果在page cache的哈希桶对应的k页上有大块内存,则直接PopFront
否则就会从k+1页开始遍历,找一块大块内存,并切分成k页的span + n-k页的span
走到这里就说明 page cache为空,就需要从系统中申请内存(windows下,SystemAlloc)
7.2 加锁问题
- 毫无疑问page cache中肯定是需要加锁的,
- 但是不能设计成桶锁,因为page cache中会涉及到页的合并,要2page的没有它就会去找3page的,会将这个3page的切分1page + 2page的2个大块内存
同时也可能会存在2个线程,1个线程要1page,另一个线程要2page,需要同时拆分大page的情况 - 因为NewSpan中肯定会涉及多线程竞争的问题,则这里可以直接考虑把锁加在外面
central cache是通过GetOneSpan得到一个k页的span,则这里就可以先把centrall cache的桶锁解掉,这样如果有其他线程释放内存对象回来,就不会阻塞
7.3 PageCache.cpp
#include "PageCache.h"
PageCache PageCache::_sInst;// 初始化Span* PageCache::NewSpan(size_t k) {assert(k > 0 && k < NPAGE);if (!_spanlists[k].Empty()) {return _spanlists[k].PopFront();}// 去其他页中找,并分割for (int i = k + 1; i < NPAGE; i++) {if (!_spanlists[i].Empty()) {Span* nSpan = _spanlists[i].PopFront();Span* kSpan = new Span;// 在nSpan的头部切一个k页下来kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;// 注意可能有问题nSpan->_n = i - k;_spanlists[nSpan->_n].PushFront(nSpan);// 将剩余的放回去return kSpan;}}// 如果走到这里,说明就需要向系统申请空间了Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGE - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGE - 1;_spanlists[bigSpan->_n].PushFront(bigSpan);// 将这个128页的span又放到128号桶中return NewSpan(k);// 递归再走一遍,这时第128页的桶就是非空了,就可以切分,调用了
}
7.4 PageCache.h
#pragma once
#include "Commen.h"class PageCache {
public:// 创建一个接口使其能调用这个接口static PageCache* GetInstance() {return &_sInst;}Span* NewSpan(size_t k);// 获取一个k页的spanstd::mutex _pagemtx;// 这是一把大锁
private:SpanList _spanlists[NPAGE];private:// 单例模式PageCache() {};// 构造函数私有化PageCache(const PageCache&) = delete;// 禁掉拷贝构造PageCache& operator=(PageCache&) = delete;// 禁掉赋值重载static PageCache _sInst;// 单列模式 -> 饿汉模式,一开始就定义好
};
7.5 其他新增
// 管理连续页的跨度结构
class SpanList {
public:// 必须写构造函数SpanList() {pHead = new Span;pHead->_next = pHead;pHead->_prev = pHead;}Span* Begin() {return pHead->_next;}Span* End() {return pHead;}bool Empty() {return pHead == pHead->_next;}void PushFront(Span* span){Insert(Begin(), span);}Span* PopFront() {Span* front = pHead->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan) {Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_next = pos;pos->_prev = newSpan;newSpan->_prev = prev;}void Erase(Span* pos) {Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:Span* pHead = nullptr;// 头节点
public:std::mutex _mtx;// 桶锁
};
7.6 其他完善
7.7 事后反思
7.7.1 缺少联动性
- 其实项目走到这里,高并发内存池的申请过程 也走完了,其中的释放过程需要回收,内存空间,3层都是相互关联,
- 剩下的就需要把释放逻辑完成,再优化这个项目,再测试这个项目的性能
8. 申请过程联调
8.1 修改代码错误
- 项目一开始,thread cache中没有内存,会通过FetchFromcentrallcache函数向centrall cache中拿内存,这时应该是最好还是传桶号 + 对齐数
- span中的_useCount主要是在释放逻辑中起作用
8.2 验证centrall cache切分大块内存时连接成功
- 在这个项目的一开始,thread cache中没有内存,它会向centrall cache中申请内存,而centrall cache中没有内存,就会向page cache中申请内存,而它又没有,就会向系统申请一个128page的大块内存
- page cache会将这个128page分成 1page(返回) + 127page(挂起)
- 而centrall cache再得到这个1page的内存块,就会切分成一个一个的span并相互连接起来,最后再返回第一个span,
8.3 验证在centrall cache多次向page cache拿1page没有问题
- 项目的一开始是,page cache会向系统拿128page的内存块,
并分成1page(返回)+127page(挂起) - centrall cache用完了1page,再向page cache中拿时,page cache就会把127page 切分成1page(返回给centrall cache) + 126page(挂起)
9.释放并回收过程
9.1 thread cache释放逻辑
9.1.1 完善FreeList
- FreeList自由链表中需要引入_size记录个数,相应的pop,push就需要_size--和_size++
- FreeList的PopRange和PushRange最好保持一致,(void*& start, void*& end, size_t n)
9.1.2 ListTooLong
- 当这个自由链表的个数大于 一次批量的个数时,就会调用ListTooLong函数进行回收
9.2 Centrall cache释放逻辑
9.2.1 页号 与 span*的映射
std::unordered_map<PAGE_ID, Span*> _idSpanMap;// 页号 和 span*的映射
这里最后还是定义在page cache中,因为在后面页的合并,page cache也需要
- 因为多进程直接相互竞争,不断申请,不断释放,则这一段自由链表上挂的小内存块,有大有小,则就可以通过映射解决
std::unordered_map<PAGE_ID, Span*> _idSpanMapl;// 页号 和 span*的映射
9.2.2 ReleaseListToSpans
void CentrallCache::ReleaseListToSpans(void* start, size_t size) {// 把这段自由链表放到span中size_t Index = SizeClass::IndexNum(size);while (start) {void* next = NextObj(start);//记录下一个// 这里每次的span都可能不一样Span* span = PageCache::GetInstance()->MapObjectToSpan(start);// 头插一下NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;// 如果下面这个条件为真的话,说明span的切分出的所有小块内存都还回来了// 这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并if (span->_useCount == 0) {_spanlists[Index].Erase(span);// 只是断开了连接,但是没有删除span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;// 防止多进程阻塞的问题,这里最后还是先把桶锁解开_spanlists[Index]._mtx.unlock();PageCache::GetInstance()->_pagemtx.lock();//再加一把大锁PageCache::GetInstance()->ReleaseSpanToPageCache(span);// 交给page cache让它合并PageCache::GetInstance()->_pagemtx.unlock();// 解锁_spanlists[Index]._mtx.lock();}start = next;}return;
}
- 当Centrall cache给thread cache的span都还回来了,就可以把它交给page cache进行页的合并
- 这时最后先把Centrall cache的桶锁去掉,再把page cache的大锁加上,这样就不会出现多进程的阻塞的问题,
9.3 Page cache释放逻辑
9.3.1 向前向后合并
- 如果 central cache 释放回一个 span , 则依次寻找 span 的前后 page id 的没有在使用的 空闲span , 看是否可以合并,如果合并继续向前或向后寻找。这样就可以将切小的内存合并收缩成大的 span ,减少 内存碎片
- page cache在系统中拿了个大块内存,并切分之后, 剩下的nSpan(页号是n-k)也是需要映射的,但此时只需要 映射前_pageId和后_pageId就行了
9.3.2 ReleaseSpanToPageCache
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span) {// 对前后页进行合并,缓解内存碎片的问题while (1) {PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);// 前面的页号没有,就不合并了if (ret == _idSpanMap.end()) {break;}// 前面相邻页的span在使用,不合并了Span* prevSpan = ret->second;if (prevSpan->_isUse == true) {break;}// 合并超过128page的span没办法管理了,不合并了if (prevSpan->_n + span->_n > NPAGE - 1) {break;}// 合并成大块的页,也是需要映射的,被合并的page也是需要头删的span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanlists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}while (1) {PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);// 前面的页号没有,就不合并了if (ret == _idSpanMap.end()) {break;}// 前面相邻页的span在使用,不合并了Span* nextSpan = ret->second;if (nextSpan->_isUse == true) {break;}// 合并超过128page的span没办法管理了,不合并了if (nextSpan->_n + span->_n > NPAGE - 1) {break;}// 合并成大块的页,也是需要映射的,被合并的page也是需要头删的span->_n += nextSpan->_n;_spanlists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}// 最后合并成的新大块page也是需要插入 + 映射的_spanlists[span->_n].PushFront(span);span->_isUse = false;_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId+span->_n-1] = span;return;
}
9.4 释放过程联调
9.4.1 验证能在page cache中发生页的合并
void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);void* p4 = ConcurrentAlloc(7);void* p5 = ConcurrentAlloc(8);void* p6 = ConcurrentAlloc(8);void* p7 = ConcurrentAlloc(8);cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;cout << p4 << endl;cout << p5 << endl;ConcurrentFree(p1, 6);ConcurrentFree(p2, 8);ConcurrentFree(p3, 1);ConcurrentFree(p4, 7);ConcurrentFree(p5, 8);ConcurrentFree(p6, 8);ConcurrentFree(p7, 8);
}
9.4.2 验证多线程下也会发生页的合并
void MultiThreadAlloc1()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(6);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 6);}
}void MultiThreadAlloc2()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(16);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 16);}
}void TestMultiThread()
{std::thread t1(MultiThreadAlloc1);std::thread t2(MultiThreadAlloc2);t1.join();t2.join();
}
- 当_useCount等于0就表示,centrall cache分配给thread cache中所有的span还回来了
- 则就需要调用page cache中的ReleaseSpanToPageCache把这几个span合并成一个大块的span(这里是合成128page)
10.收尾工作
10.1 解决申请内存 大于 256KB
10.1.1 ConcurrentAlloc
void* ConcurrentAlloc(size_t size) {if (size > MAX_SIZE){size_t alignSize = SizeClass::RoundUp(size);size_t kpage = alignSize >> PAGE_SHIFT;PageCache::GetInstance()->_pagemtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kpage);// span->_pageId = size;PageCache::GetInstance()->_pagemtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else {if (pTLSThreadCache == nullptr) {pTLSThreadCache = new ThreadCache;}//cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}
}
- <= 256KB 使用三层缓存
- > 256KB
- 128*8K >= size > 32 * 8K 就直接去page cache中拿内存
- size > 128*8k 直接去找系统堆
10.1.2 NewSpan
10.1.3 ConcurrentFree
10.1.4 ReleaseSpanToPageCache
- 由于在Newspan的时候 大于128kb的字节也做了映射处理,所以这里释放就可以统一处理
10.1.5 测试程序
void BigAlloc()
{void* p1 = ConcurrentAlloc(257 * 1024);ConcurrentFree(p1, 257 * 1024);void* p2 = ConcurrentAlloc(129 * 8 * 1024);ConcurrentFree(p2, 129 * 8 * 1024);
}
10.2 使用定长内存池配合脱离使用new
- 在这个项目里使用new最多的就是在page cache中,而为了使这个 项目脱离new,就可以使用上次的定长内存池,这所有的new 和delete都需要改造
- ObjectPool<Span> _spanPool;// 定长内存池
10.3 释放对象时优化为不传对象大小
- 只要在span中定义一个_objSize用来标识,记录每次切分的span大小就可以
- 至于为什么以前的释放要传size,主要是因为要区分是大于256kb,还是小于256kb的
10.3.1 对MapObjectToSpan的加锁
- 因为会涉及到多线程访问MapObjectToSpan,所以需要加锁
11. 性能测试
#include"ConcurrentAlloc.h"// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
12. 总结
一个高并发内存池是用于管理和分配内存的数据结构,它可以在多线程或多进程环境下高效地处理内存分配和释放的需求。以下是一个简单的设计思路:
- 内存块管理:内存池可以使用一个固定大小的内存块数组来管理可用的内存块。每个内存块都有一个标志位来表示是否已被分配。
- 分配算法:内存池可以使用一种高效的分配算法,如首次适应(First Fit)或最佳适应(Best Fit),来选择合适大小的内存块进行分配。
- 并发控制:为了支持高并发,内存池需要实现并发控制机制,如互斥锁或读写锁,以确保在多线程或多进程环境下的安全访问。
- 内存回收:当内存块不再使用时,需要将其标记为可用,并添加到可用内存块列表中,以便下次分配时可以重复利用。
- 扩展性:为了支持更高的并发和更大的内存需求,内存池可以实现动态扩展机制,当可用内存不足时,可以动态增加内存块的数量。
- 错误处理:内存池应该能够处理内存分配失败的情况,并提供相应的错误处理机制,如返回错误码或抛出异常。