项目一:高并发内存池

1. 项目介绍

1.1 这个项目是做什么的

当前项目是实现一个高并发的内存池,他的原型是 google 的一个 开源项目tcmalloc tcmalloc 全称
Thread-Caching Malloc ,即线程缓存的 malloc 实现了高效的多线程内存管理,用于替代系统的内存 分配相关的函数(malloc free )。
tcmalloc的代码量和复杂度上升了很多,再加上自己的能力有限的情况下,我主要是把tcmalloc 最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcamlloc 的精华,

1.2 这个项目主要应用的技术

C/C++ 、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁
等等方面的知识

2. 内存池

内存池是指程序预先从操作系统 申请一块足够大内存 ,此后,当程序中需要申请内存的时候,不是直接 向操作系统申请,而是 直接从内存池中获取
同理,当 程序释放内存 的时候,并不真正将内存返回给操作系统,而是 返回内存池 。当程序退出( 或者特定时间 ) ,内存池才将之前申请的内存真正释放

 2.1 池化技术

所谓 池化技术 ,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过 量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率

2.2 内存池解决的问题

  •  当然是解决效率的问题了
  • 比如现在有385byte的空间,但是我们要申请超过256byte的空间却申请不出来
    因为这两块空间碎片化,不连续了,这种碎片叫做外碎片
  • 外碎片:是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求

2.3 malloc

C/C++ 中要动态申请内存都是通过 malloc 去申请内存,而实际我们不是直接去堆获取内存的
malloc 就是一个内存池 malloc() 相当于向操作系统 “批发 了一块较大的内存空间,然后 “零售” 给程序用。当全部“ 售完 或程序有大量的内存需求时,再根据实际需求向操作系统 进货

 

3. 定长内存池 

1. 性能达到极致 2. 不考虑内存碎片等问题

malloc 其实就是一个通用的大众货,什么场景下都可以用,
但是什么场景下都可以用就意味着什么场景下都不会有很高的性能

3.1 主要思路 

  •  step1:先开辟一大段的空间并且用指针_memeory指向
  • step1: 又为了解决_memeory指向空间的小 < 申请空间的大小,引入了_remainBytes记录剩余空间
  • step1: 申请空间,就会从这里面拿,而释放的空间,我选择用_freeList指针指向这段空间,并一一链接,做到回收并重复利用
  • step1: 而在_freeList连接的空间里,我用前4或8个字节,记录下一个空间的地址(这就避免了重新定义变量去记录)
  • step2: 这时我就意识到,申请空间需要先从_freeList里面拿,看看是否有回收的空间可以重复利用

3.2 ObjectPool.h

#pragma once
#include <iostream>
#include <vector>
#include <windows.h>
#include <new>
using std::cout;
using std::endl;template<class T>
class ObjectPool
{
public:T* New() {size_t objsize = sizeof(T) < sizeof(void*) ? 4 : sizeof(T);T* obj = NULL;// step2:申请空间有限从回收链表_freeList里拿if (_freeList) {obj = (T*)_freeList;_freeList = *(void**)obj;}else {// step1// 小于1个指针给1个指针的大小if (_remainBytes < objsize) {_remainBytes = 128 * 1024;_memeory = (char*)malloc(_remainBytes);// 开个128kb的空间if (_memccpy == NULL) {throw std::bad_alloc();// 直接抛出异常}}obj = (T*)_memeory;_memeory += objsize;_remainBytes -= objsize;}new(obj)T;// 定位new,就是初始化这段空间return obj;}void Delete(T* obj) {// step1// 将这些空间一一连接起来,回收并利用obj->~T();// 显示调用析构函数*(void**)obj = _freeList;// *(void**)解引用就是void*的大小,也就是一个指针的大小_freeList = obj;}
private:char* _memeory = NULL;// 指向内存块 头部 的指针size_t _remainBytes = 0;// 指向内存块 剩余 的指针void* _freeList = NULL;// 指向回收链表的 头指针
};

3.3 Test.c

#include "ObjectPool.h"
struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 30;// 每轮申请释放多少次const size_t N = 1000000;//100wstd::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();ObjectPool<TreeNode> TNPool;std::vector<TreeNode*> v2;v2.reserve(N);size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < 100000; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}int main()
{TestObjectPool();return 0;
}

  •  在32位relase模式下,可以明显看出定长内存池的效率明显比malloc高很多

3.4 事后反思 

3.4.1 反思一

ps: 还有一个很巧妙的设计点,为了解决不知道是32位的系统,还是64位的系统,我选择解引用2级指针,而2级指针的解引用就是1级指针的大小,4或8

3.4.2 反思二 

ps: 在我设计的定长内存池中,我还是用了malloc开闭一大段空间作为内存池,这样还是没有避开malloc,之后我就想到直接调用系统接口开辟空间

  • windows下向堆申请页为单位的大块内存的接口->VirtualAlloc
  • Linux向堆申请页为单位的大块内存的接口->brk和mmap

4. 高并发内存池整体框架设计

4.1 主要解决的问题 

a.   性能问题

b.   多线程环境下,锁竞争问题

c.    内存碎片问题

4.2 三大组成部分 

concurrent memory pool 主要由以下 3 个部分构成:
  1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方
  2. central cache:中心缓存是所有线程所共享,thread cache按需从central cache获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的
    central cache是存在竞争的,所以从这里取内存对象是需要加锁首先这里用的是桶锁,其次只有thread cache没有内存对象时才会找central cache,所以这里竞争不会很激烈
  3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小 的小块内存,分配给central cache当一个span的几个跨度页的对象都回收以后,page cache 会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片 的问题。

5. 高并发内存池--thread cache 

thread cache 是哈希桶结构,每个桶是一个按桶位置 映射 大小的内存块对象的 自由链表 每个线程都会有一个thread cache 对象,这样每个线程在这里获取对象和释放对象时是 无锁的

5.1 主要思路

  1. 由于后面需要重复包含某些头文件,这里可以把它们放在Comment.h中,方便后面调用
    同理,也需要把freelist自由链表封装成类

  2. 而在ThreadCache中需要AllocateDeallocate接口,因为是哈希桶结构(挂的是自由链表)
    所以申请空间,会先在自由链表中拿,不够或没有会去找centrallcache拿,则又需要一个FetchFromcentrallcache接口,释放空间,会先返回自由链表中挂起,多了又会返回给centrallcache,

  3. 同时需要考虑一个比较重要的就是哈希桶的映射规则
    计算对齐数(无法避免内碎片)和计算几号桶
    static const size_t MAX_SIZE = 256 * 1024;// 最大字节数
    static const size_t NFREELISTS = 208;// 最大自由链表数

  4.  为了保证每一个线程都有一个ThreadCache,且不会相互影响,这时需要引入TLS thread local storage
    static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
    (线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这就保持了数据的线程独立性,而熟知的全局变量,是所以线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度)
  5. 为了后面测试代码,这里需要再整体封装一层,在Concurrent.h中实现ConcurrentAllocConcurrentFree

 5.1 Commen.h

#pragma once
#include <iostream>
#include <vector>
#include <windows.h>
#include <new>
#include <assert.h>
#include <thread>
static const size_t MAX_SIZE = 256 * 1024;// 最大字节数
static const size_t NFREELISTS = 208;// 最大自由链表数
using std::cout;
using std::endl;// 这里是引用返回,这里本质就是返回obj前面4个字节的地址
static void*& NextObj(void* obj) {return *(void**)obj;
}// 管理内存块的自由链表
class FreeList {
public:// 头插void push(void* obj) {assert(obj);NextObj(obj) = _freeList;_freeList = obj;}// 头删void* pop() {assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool empty() {return _freeList == nullptr;}
private:void* _freeList = nullptr;// 管理一个一个的小对象
};// 映射对齐规则
class SizeClass {
public:// 计算对齐数static inline size_t _RoundUp(size_t size, size_t Align_Num) {return ((size + Align_Num - 1) & ~(Align_Num - 1));// 位运算的效率更高}// 把函数定义成静态成员函数,就可以直接调用static size_t RoundUp(size_t size) {if (size <= 128) {return _RoundUp(size, 8);// 8字节对齐}else if (size <= 1024) {return _RoundUp(size, 16);}else if (size <= 8 * 1024) {return _RoundUp(size, 128);}else if (size <= 64 * 1024) {return _RoundUp(size, 1024);}else if (size <= 256 * 1024) {return _RoundUp(size, 8 * 1024);}else {assert(false);// 直接报错return -1;// 暂时不考虑254kb}}// 计算几号桶static size_t _IndexNum(size_t size,size_t Align_left) {return ((size + (1 << Align_left) - 1) >> Align_left) - 1;}static size_t IndexNum(size_t size) {int group_num[4] = { 16,72,128,184 };if (size <= 128) {return _IndexNum(size, 3);//根据对齐数->这里我直接传的是2几次方}else if (size <= 1024) {return _IndexNum(size-128, 4)+group_num[0];}else if (size <= 8 * 1024) {return _IndexNum(size-1024,7)+group_num[1];}else if (size <= 64 * 1024) {return _IndexNum(size-8*1024, 10) + group_num[2];}else if (size <= 256 * 1024) {return _IndexNum(size-64*1024, 13) + group_num[3];}else {assert(false);// 直接报错return -1;// 暂时不考虑254kb}}
};

 5.2 Concurrent.h

5.3 ThreadCache.h

5.4  ThreadCache.cpp

5.5 Test.cpp

#include "Concurrent.h"void Thread1() {for (int i = 0; i < 5; i++) {void* obj = ConcurrentAlloc(9);//ConcurrentFree(obj, 9);}
}void Thread2() {for (int i = 0; i < 5; i++) {void* obj = ConcurrentAlloc(18);//ConcurrentFree(obj, 18);}
}void TestThreadCache() {std::thread t1(Thread1);t1.join();std::thread t2(Thread2);t2.join();}
int main()
{TestThreadCache();return 0;
}

5.6 事后反思

5.6.1 映射规则

5.6.2  缺少联动性

  • ThreadCache申请空间,会先在自由链表中拿,不够或没有会去找centrallcache拿(未实现)
  • 释放空间,会先返回自由链表中挂起,多了又会返回给centrallcache(未实现)

6.高并发内存池-CentrallCache

central cache 也是一个 哈希桶 结构,他的哈希桶的映射关系跟 thread cache 是一样的。
不同的是他的每个哈希桶位置挂是SpanList 链表结构,每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在span 的自由链表中

6.1 主要思路 

  

  1. 首先需要把span和spanList实现出来, 其中spanList应该是一个带头双向循环链表
  2. 由于存在多线程共同竞争的问题,这里可以使用单例模式中的饿汉模式
  3. 由于上一层,thread cache申请空间不够,会在centrall cache中拿空间,所以需要完善并实现FetchFromcentrallcache
  4.  thread cache拿空间的时候,需要拿一批空间,这里具体需要拿多少个,可以使用慢调节算法
    Sizeclass(映射规则)中实现一个NumMoveSize,size_t n = MAX_SIZE / size;
    if(n<2)就返回2,if(n>512)就返回512,其他就返回n

    可以再控制一下慢调节算法,在自由链表中加入max_num变量,使其第一次拿给一个,第二次拿给二个(也可以增长的再快点)
    size_t batchNum = min(_freeLists[Index].MaxNum(), SizeClass::NumMoveSize(size));
    也不是想拿几个就拿几个,不够的话就是有多少给多少,size_t actualNum;
  5.   thread cache拿空间的时候,从代码中来看是一段自由链表,所以又需要void* start = nullptr, * end = nullptr;记录头尾,返回是第一个小内存块,而多的就需要挂起,
  6. centrall cache中需要实现一个函数,从中心缓存获取一定数量的对象给thread cache
    size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
    因为存在多个线程同时竞争的问题,所以需要加锁
    给对象申请一批空间,应该从非空的span中拿,又需要实现一个找非空spand函数,
    注意:如果是一开始的情况,是找不到非空span的则又需要从page cache中拿空间
    Span* GetOneSpan(SpanList& list, size_t batchNum);// 暂时不实现
  7. FetchRangeObj它的返回值,应该是size_t,返回的是thread cache中centrall cache中拿的数量(并不是想拿多少就拿多少,不够的话就是有多少给多少)

6.2 CentrallCache.h 

6.3 CentrallCache.cpp 

#pragma once
#include "CentrallCache.h"CentrallCache CentrallCache::_sInst;
Span* CentrallCache::GetOneSpan(SpanList& list, size_t batchNum) {return nullptr;// 这里如果没有找到非空span,那么就可能从page Chache中拿内存
}size_t CentrallCache::FetchRangeObj(void*& start, void*& end,\size_t batchNum, size_t size) {size_t Index = SizeClass::IndexNum(size);// 几号桶// 因为存在多个线程同时竞争的问题,所以需要加锁_spanlists[Index]._mtx.lock();Span* newSpan = GetOneSpan(_spanlists[Index], batchNum);// 得到一个非空spanassert(newSpan);assert(newSpan->_freeList);size_t i = 0, actualNum = 1;start = newSpan->_freeList;end = newSpan->_freeList;while (i < batchNum-1 && NextObj(end)) {end = NextObj(end);i++;actualNum++;}newSpan->_freeList = NextObj(end);NextObj(end) = nullptr;// 断开_spanlists[Index]._mtx.lock();return actualNum;
}

 6.3 其他新增

6.4 其他完善 

6.5 事后反思 

6.5.1 单例模式 

  • 由于存在多线程竞争同一块资源的问题,直接把CentrallCache类设计成饿汉模式最好

6.5.2 慢调节算法

  • thread cache在centrall cache中拿数据,并不是只拿一个而是一批,具体是多少个需要计算
  • size_t batchNum = min(_freeLists[Index].MaxNum(), SizeClass::NumMoveSize(size));
  • size_t actualNum = CentrallCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);

6.5.4 span中的成员变量

  • PAGE_ID _pageId = 0;// 大块内存起始页的页号
    • 在32位下,PAGE_ID 应该是size_t
    • 在64位下,PAGE_ID 应该是unsigned long long
  • 暂时未使用,需要和下一层联动 size_t _n = 0;// 页的数量
  • 暂时未使用,需要和下一层联动 size_t _useCount = 0;// 已经使用页的数量 

6.5.3 SpanList中的span和自由链表FreeList

  • thread cache在centrall cache中拿数据,其实是在centrall cache中找到一个非空span,拿一段空间给thread cache,从代码上看就是一段自由链表

6.5.4 缺少联动性

  • centrall cache中GetOneSpan找非空span的时候,有可能找不到,这时就应该去下一层page cache 中拿空间
  • thread cache和centrall cache都只写了一部分申请的逻辑,而释放的逻辑暂时没写,因为要和下一层联动 

7.高并发存池-Pagecahe Cache

记住 地址  = 页号 * 8kb 

central cache 也是一个 哈希桶 结构,他的哈希桶的映射关系跟Centrall Cache 是一样的。
不同的是他的每个span都是 一大块内存

 7.1 主要思路

  1. 因为还是存在多进程竞争的问题,所以这里还是把page cache设计成单列模式
  2. 虽然page cache也是哈希桶结构,但它是直接映射的,1page -> 1page,128page->page
  3. 在上一层CentrallCache 中没有span的时候,会从page cache通过NewSpan拿一个span,但具体拿几页的span,还是需要通过慢增长算法-> NumMovePage
  4. CentrallCache在得到这个大块内存span之后,需要把它切分成自由链表的形式,并挂在CentrallCache对应的哈希桶中
    char* start = (char*)(span->_pageId << PAGE_SHIFT);// 起始地址 = 地址 * 8 * 1024
    size_t bytes = span->_n << PAGE_SHIFT;// 页号* 8 * 1024
    char* end = start + bytes;
  5. 通过上面的思路,page cache中就需要实现NewSpan函数(获取某一页的大块内存)
    如果在page cache的哈希桶对应的k页上有大块内存,则直接PopFront
    否则就会从k+1页开始遍历,找一块大块内存,并切分成k页的span + n-k页的span
    走到这里就说明 page cache为空,就需要从系统中申请内存(windows下,SystemAlloc)

 7.2 加锁问题

  • 毫无疑问page cache中肯定是需要加锁的,
  • 但是不能设计成桶锁,因为page cache中会涉及到页的合并,要2page的没有它就会去找3page的,会将这个3page的切分1page + 2page的2个大块内存
    同时也可能会存在2个线程,1个线程要1page,另一个线程要2page,需要同时拆分大page的情况
  • 因为NewSpan中肯定会涉及多线程竞争的问题,则这里可以直接考虑把锁加在外面
    central cache是通过GetOneSpan得到一个k页的span,则这里就可以先把centrall cache的桶锁解掉,这样如果有其他线程释放内存对象回来,就不会阻塞

7.3 PageCache.cpp

#include "PageCache.h"
PageCache PageCache::_sInst;// 初始化Span* PageCache::NewSpan(size_t k) {assert(k > 0 && k < NPAGE);if (!_spanlists[k].Empty()) {return _spanlists[k].PopFront();}// 去其他页中找,并分割for (int i = k + 1; i < NPAGE; i++) {if (!_spanlists[i].Empty()) {Span* nSpan = _spanlists[i].PopFront();Span* kSpan = new Span;// 在nSpan的头部切一个k页下来kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;// 注意可能有问题nSpan->_n = i - k;_spanlists[nSpan->_n].PushFront(nSpan);// 将剩余的放回去return kSpan;}}// 如果走到这里,说明就需要向系统申请空间了Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGE - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGE - 1;_spanlists[bigSpan->_n].PushFront(bigSpan);// 将这个128页的span又放到128号桶中return NewSpan(k);// 递归再走一遍,这时第128页的桶就是非空了,就可以切分,调用了
}

 7.4 PageCache.h

#pragma once
#include "Commen.h"class PageCache {
public:// 创建一个接口使其能调用这个接口static PageCache* GetInstance() {return &_sInst;}Span* NewSpan(size_t k);// 获取一个k页的spanstd::mutex _pagemtx;// 这是一把大锁
private:SpanList _spanlists[NPAGE];private:// 单例模式PageCache() {};// 构造函数私有化PageCache(const PageCache&) = delete;// 禁掉拷贝构造PageCache& operator=(PageCache&) = delete;// 禁掉赋值重载static PageCache _sInst;// 单列模式 -> 饿汉模式,一开始就定义好
};

7.5 其他新增

// 管理连续页的跨度结构
class SpanList {
public:// 必须写构造函数SpanList() {pHead = new Span;pHead->_next = pHead;pHead->_prev = pHead;}Span* Begin() {return pHead->_next;}Span* End() {return pHead;}bool Empty() {return pHead == pHead->_next;}void PushFront(Span* span){Insert(Begin(), span);}Span* PopFront() {Span* front = pHead->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan) {Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_next = pos;pos->_prev = newSpan;newSpan->_prev = prev;}void Erase(Span* pos) {Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:Span* pHead = nullptr;// 头节点
public:std::mutex _mtx;// 桶锁
};

7.6 其他完善

7.7 事后反思

7.7.1 缺少联动性

  • 其实项目走到这里,高并发内存池的申请过程 也走完了,其中的释放过程需要回收,内存空间,3层都是相互关联,
  • 剩下的就需要把释放逻辑完成,再优化这个项目,再测试这个项目的性能

8. 申请过程联调

8.1 修改代码错误

  •  项目一开始,thread cache中没有内存,会通过FetchFromcentrallcache函数向centrall cache中拿内存,这时应该是最好还是传桶号 + 对齐数

  •  span中的_useCount主要是在释放逻辑中起作用

8.2 验证centrall cache切分大块内存时连接成功 

  • 在这个项目的一开始,thread cache中没有内存,它会向centrall cache中申请内存,而centrall cache中没有内存,就会向page cache中申请内存,而它又没有,就会向系统申请一个128page的大块内存
  • page cache会将这个128page分成 1page(返回) + 127page(挂起)
  • centrall cache再得到这个1page的内存块,就会切分成一个一个的span并相互连接起来,最后再返回第一个span

8.3 验证在centrall cache多次向page cache拿1page没有问题

  • 项目的一开始是,page cache会向系统拿128page的内存块
    并分成1page(返回)+127page(挂起)
  •  centrall cache用完了1page,再向page cache中拿时,page cache就会把127page 切分成1page(返回给centrall cache) + 126page(挂起)

9.释放并回收过程 

9.1 thread cache释放逻辑

9.1.1 完善FreeList

  • FreeList自由链表中需要引入_size记录个数,相应的pop,push就需要_size--和_size++
  • FreeList的PopRangePushRange最好保持一致,(void*& start, void*& end, size_t n)

9.1.2 ListTooLong

  • 当这个自由链表的个数大于 一次批量的个数时,就会调用ListTooLong函数进行回收

9.2 Centrall cache释放逻辑 

9.2.1 页号 与 span*的映射 

std::unordered_map<PAGE_ID, Span*> _idSpanMap;// 页号 和 span*的映射

这里最后还是定义在page cache中,因为在后面页的合并,page cache也需要

  • 因为多进程直接相互竞争,不断申请,不断释放,则这一段自由链表上挂的小内存块,有大有小,则就可以通过映射解决
    std::unordered_map<PAGE_ID, Span*> _idSpanMapl;// 页号 和 span*的映射

9.2.2 ReleaseListToSpans 

void CentrallCache::ReleaseListToSpans(void* start, size_t size) {// 把这段自由链表放到span中size_t Index = SizeClass::IndexNum(size);while (start) {void* next = NextObj(start);//记录下一个// 这里每次的span都可能不一样Span* span = PageCache::GetInstance()->MapObjectToSpan(start);// 头插一下NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;// 如果下面这个条件为真的话,说明span的切分出的所有小块内存都还回来了// 这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并if (span->_useCount == 0) {_spanlists[Index].Erase(span);// 只是断开了连接,但是没有删除span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;// 防止多进程阻塞的问题,这里最后还是先把桶锁解开_spanlists[Index]._mtx.unlock();PageCache::GetInstance()->_pagemtx.lock();//再加一把大锁PageCache::GetInstance()->ReleaseSpanToPageCache(span);// 交给page cache让它合并PageCache::GetInstance()->_pagemtx.unlock();// 解锁_spanlists[Index]._mtx.lock();}start = next;}return;
}
  • Centrall cachethread cache的span都还回来了,就可以把它交给page cache进行页的合并
  • 这时最后先把Centrall cache的桶锁去掉,再把page cache的大锁加上,这样就不会出现多进程的阻塞的问题,

9.3 Page cache释放逻辑 

9.3.1 向前向后合并

  • 如果 central cache 释放回一个 span 则依次寻找 span 的前后 page id 的没有在使用的 空闲span 看是否可以合并,如果合并继续向前或向后寻找。这样就可以将切小的内存合并收缩成大的 span ,减少 内存碎片
  • page cache在系统中拿了个大块内存,并切分之后, 剩下的nSpan(页号是n-k)也是需要映射的,但此时只需要 映射前_pageId和后_pageId就行了

9.3.2 ReleaseSpanToPageCache

// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span) {// 对前后页进行合并,缓解内存碎片的问题while (1) {PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);// 前面的页号没有,就不合并了if (ret == _idSpanMap.end()) {break;}// 前面相邻页的span在使用,不合并了Span* prevSpan = ret->second;if (prevSpan->_isUse == true) {break;}// 合并超过128page的span没办法管理了,不合并了if (prevSpan->_n + span->_n > NPAGE - 1) {break;}// 合并成大块的页,也是需要映射的,被合并的page也是需要头删的span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanlists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}while (1) {PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);// 前面的页号没有,就不合并了if (ret == _idSpanMap.end()) {break;}// 前面相邻页的span在使用,不合并了Span* nextSpan = ret->second;if (nextSpan->_isUse == true) {break;}// 合并超过128page的span没办法管理了,不合并了if (nextSpan->_n + span->_n > NPAGE - 1) {break;}// 合并成大块的页,也是需要映射的,被合并的page也是需要头删的span->_n += nextSpan->_n;_spanlists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}// 最后合并成的新大块page也是需要插入 + 映射的_spanlists[span->_n].PushFront(span);span->_isUse = false;_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId+span->_n-1] = span;return;
}

9.4  释放过程联调

9.4.1 验证能在page cache中发生页的合并

void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);void* p4 = ConcurrentAlloc(7);void* p5 = ConcurrentAlloc(8);void* p6 = ConcurrentAlloc(8);void* p7 = ConcurrentAlloc(8);cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;cout << p4 << endl;cout << p5 << endl;ConcurrentFree(p1, 6);ConcurrentFree(p2, 8);ConcurrentFree(p3, 1);ConcurrentFree(p4, 7);ConcurrentFree(p5, 8);ConcurrentFree(p6, 8);ConcurrentFree(p7, 8);
}

 9.4.2 验证多线程下也会发生页的合并

void MultiThreadAlloc1()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(6);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 6);}
}void MultiThreadAlloc2()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(16);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 16);}
}void TestMultiThread()
{std::thread t1(MultiThreadAlloc1);std::thread t2(MultiThreadAlloc2);t1.join();t2.join();
}

  • 当_useCount等于0就表示,centrall cache分配给thread cache中所有的span还回来了
  • 则就需要调用page cache中的ReleaseSpanToPageCache把这几个span合并成一个大块的span(这里是合成128page) 

10.收尾工作

10.1 解决申请内存 大于 256KB

10.1.1 ConcurrentAlloc

void* ConcurrentAlloc(size_t size) {if (size > MAX_SIZE){size_t alignSize = SizeClass::RoundUp(size);size_t kpage = alignSize >> PAGE_SHIFT;PageCache::GetInstance()->_pagemtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kpage);// span->_pageId = size;PageCache::GetInstance()->_pagemtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else {if (pTLSThreadCache == nullptr) {pTLSThreadCache = new ThreadCache;}//cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}
}
  • <= 256KB 使用三层缓存
  • > 256KB
    • 128*8K >= size > 32 * 8K 就直接去page cache中拿内存
    • size > 128*8k 直接去找系统堆

10.1.2 NewSpan 

10.1.3 ConcurrentFree

10.1.4  ReleaseSpanToPageCache

  • 由于在Newspan的时候 大于128kb的字节也做了映射处理,所以这里释放就可以统一处理

 10.1.5 测试程序

void BigAlloc()
{void* p1 = ConcurrentAlloc(257 * 1024);ConcurrentFree(p1, 257 * 1024);void* p2 = ConcurrentAlloc(129 * 8 * 1024);ConcurrentFree(p2, 129 * 8 * 1024);
}

10.2 使用定长内存池配合脱离使用new

  •  在这个项目里使用new最多的就是在page cache中,而为了使这个 项目脱离new,就可以使用上次的定长内存池,这所有的new delete都需要改造
  • ObjectPool<Span> _spanPool;// 定长内存池

10.3 释放对象时优化为不传对象大小 

  •  只要在span中定义一个_objSize用来标识,记录每次切分的span大小就可以
  • 至于为什么以前的释放要传size,主要是因为要区分是大于256kb,还是小于256kb

10.3.1 对MapObjectToSpan的加锁

  •  因为会涉及到多线程访问MapObjectToSpan,所以需要加锁

11. 性能测试 

#include"ConcurrentAlloc.h"// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (size_t)free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}

12. 总结

 一个高并发内存池是用于管理和分配内存的数据结构,它可以在多线程或多进程环境下高效地处理内存分配和释放的需求。以下是一个简单的设计思路:

  • 内存块管理:内存池可以使用一个固定大小的内存块数组来管理可用的内存块。每个内存块都有一个标志位来表示是否已被分配。
  • 分配算法:内存池可以使用一种高效的分配算法,如首次适应(First Fit)或最佳适应(Best Fit),来选择合适大小的内存块进行分配。
  • 并发控制:为了支持高并发,内存池需要实现并发控制机制,如互斥锁或读写锁,以确保在多线程或多进程环境下的安全访问。
  • 内存回收:当内存块不再使用时,需要将其标记为可用,并添加到可用内存块列表中,以便下次分配时可以重复利用。
  • 扩展性:为了支持更高的并发和更大的内存需求,内存池可以实现动态扩展机制,当可用内存不足时,可以动态增加内存块的数量。
  • 错误处理:内存池应该能够处理内存分配失败的情况,并提供相应的错误处理机制,如返回错误码或抛出异常。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/687643.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯备赛_python_BFS搜索算法_刷题学习笔记

1 bfs广度优先搜索 1.1 是什么 1.2怎么实现 2案例学习 2.1.走迷宫 2.2.P1443 马的遍历 2.3. 九宫重排&#xff08;看答案学的&#xff0c;实在写不来&#xff09; 2.4.青蛙跳杯子&#xff08;学完九宫重排再做bingo&#xff09; 2.5. 长草 3.总结 1 bfs广度优先搜索 【P…

1.初识Tauri

文章目录 一、前言二、基本认识三、js与rust通信四、构建应用 一、前言 原文以及后续文章可点击查看&#xff1a;初识Tauri。 Tauri是一款比较新的跨平台桌面框架&#xff0c;也是我目前最喜欢的一个框架&#xff0c;其官网为&#xff1a;Tauri 它的作用其实和Electron很像&…

【PyQt】在PyQt5的界面上集成matplotlib绘制的图像

文章目录 0 前期教程1 概述2 matplotlib2.1 库导入2.2 图片的各个部分解释2.3 代码风格2.4 后端 3 集成matplotlib图像到pyqt界面中3.1 使用到的模块3.2 理解Qt Designer中的“控件提升”3.3 界面与逻辑分离的思路3.4 扩展 0 前期教程 【PyQt】PyQt5进阶——串口上位机及实时数…

transformer-Attention is All You Need(一)

1. 为什么需要transformer 循环模型通常沿输入和输出序列的符号位置进行因子计算。通过在计算期间将位置与步骤对齐&#xff0c;它们根据前一步的隐藏状态和输入产生位置的隐藏状态序列。这种固有的顺序特性阻止了训练样本内的并行化&#xff0c;这在较长的序列长度上变得至关重…

STM32-开发工具

开发过程中可能用到的工具 1、烧录下载调试工具ST-LINK ST-LINK&#xff0c;是ST(意法半导体)推出的调试编程工具&#xff0c;适用于STM32系列芯片的USB接口的下载及在线仿真器。 2、串口调试工具/串口下载工具 串口调试工具是一种用于通过串口通信协议与目标设备进行数据交…

源码网打包,目前有3000多个资源

源码网打包&#xff0c;目前有3000多个资源 需要赶快下手吧&#xff0c;到手可以使用&#xff0c;搭建好和本站一样&#xff0c;全网唯一 优化缩略图演示&#xff1a;https://www.htm.ink默认缩略图演示&#xff1a;https://blog.htm.ink网站截图

const--类的常量成员函数

在C中,为了禁止成员函数修改数据成员的值,可以将它设置为常量成员函数。设置常量成员函数的方法是在函数原型的后面加上const,形式如下: class x { …………… T f(t1,t2) const{} ………… }; 常量成员函数的作用&#xff1a; 将成员函数设置为const&#xff0c;表明该成员函…

FMEA的六大分类——SunFMEA软件

FMEA是一种预防性的质量工具&#xff0c;通过对产品设计或过程的故障模式进行分析&#xff0c;评估其可能产生的影响&#xff0c;从而采取相应的措施来降低产品的故障风险。根据分析的范围和目的&#xff0c;FMEA可以分为以下几种类型&#xff0c;今天sun fmea软件系统和大家一…

理解孟子思想,传承中华文化

为了更好地了解和传承中华文化&#xff0c;加深对孟子思想的认识与理解&#xff0c;探究孟子思想在现代社会的传承与发展&#xff0c;2024年2月18日&#xff0c;曲阜师范大学计算机学院“古韵新声&#xff0c;格物致‘知’”实践队队员崔本迪在山东省泰安市东平县进行了深入的调…

vue-路由(六)

阅读文章你可以收获什么&#xff1f; 1 明白什么是单页应用 2 知道vue中的路由是什么 3 知道如何使用vueRouter这个路由插件 4 知道如何如何封装路由组件 5 知道vue中的声明式导航router-link的用法 6 知道vue中的编程式导航的使用 7 知道声明式导航和编程式导航式如何传…

代码随想录算法训练营第33天| Leetcode1005.K次取反后最大化的数组和、134. 加油站、135. 分发糖果

文章目录 Leetcode 1005.K次取反后最大化的数组和Leetcode 134. 加油站Leetcode 135. 分发糖果 Leetcode 1005.K次取反后最大化的数组和 题目链接&#xff1a;Leetcode 1005.K次取反后最大化的数组和 题目描述&#xff1a; 给你一个整数数组 nums 和一个整数 k &#xff0c;按…

根据三维点坐标使用matplotlib绘制路径轨迹

需求&#xff1a;有一些点的三维坐标&#xff08;x&#xff0c;y&#xff0c;z&#xff09;&#xff0c;需要绘制阿基米德螺旋线轨迹图。 points.txt 0.500002, -0.199996, 0.299998 0.500545, -0.199855, 0.299338 0.501112, -0.199688, 0.298704 0.501701, -0.199497, 0.298…

在Linux系统中设置HTTP隧道以实现网络穿透和端口转发

在数字化世界中&#xff0c;网络穿透和端口转发成为了许多开发者和系统管理员必备的技能。而在Linux系统中&#xff0c;通过设置HTTP隧道&#xff0c;我们可以轻松实现这一目标&#xff0c;让我们的服务即便在内网环境中也能被外部世界所访问。 那么&#xff0c;如何在Linux系…

一文搞懂设计模式—观察者模式

本文已收录至Github&#xff0c;推荐阅读 &#x1f449; Java随想录 微信公众号&#xff1a;Java随想录 文章目录 使用场景实现方式Java对观察者模式的支持Guava对观察者模式的支持Spring对观察者模式的支持 优缺点 观察者模式&#xff08;Observer Pattern&#xff09;是一种…

小型洗衣机哪个牌子质量好?小型洗衣机十大排名

清洗内衣内裤这些贴身衣物确实是一件比较头疼的事&#xff0c;有的小伙子由于工作的劳累通常在洗完澡后并不喜欢直接清洗内衣内裤&#xff0c;会存上几天再扔到洗衣机里&#xff0c;这样做是很不可取的&#xff0c;因为穿过的内裤很久不洗就会滋生细菌&#xff0c;另外&#xf…

Java使用Documents4j实现Word转PDF(知识点+案例)

文章目录 前言源码获取一、认识Documents4j二、快速集成2.1、pom.xml依赖2.2、word转PDF实现项目目录WordUtils.javaDemo6.java测试效果 参考文章资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里…

Linux-系统资源管理的命令

目录 查看CPU&#xff1a;more /proc/meminfo 查看内存数据&#xff1a;free -m / free -h 查看系统版本&#xff1a;more /etc/issue 查看操作系统的类型&#xff1a;uname -a 查看主机名称&#xff1a;hostname 查看磁盘空间&#xff1a;df -h 查看某个目录空间…

【解决(几乎)任何机器学习问题】:处理分类变量篇(上篇)

这篇文章相当长&#xff0c;您可以添加至收藏夹&#xff0c;以便在后续有空时候悠闲地阅读。 本章因太长所以分为上下篇来上传&#xff0c;请敬请期待 很多⼈在处理分类变量时都会遇到很多困难&#xff0c;因此这值得⽤整整⼀章的篇幅来讨论。在本章中&#xff0c;我将 讲述不同…

快速搞懂时间序列数据平稳检验

在对时间序列数据进行分析预测时&#xff0c;平稳时间序列数据预测效果更好。所以首先要检测数据是否平稳&#xff0c;没有趋势的时间序列数据&#xff0c;我们称为平稳的&#xff0c;即随着时间的推移&#xff0c;表现出恒定的方差&#xff0c;具有恒定的自相关结构。本文介绍…

Linux 虚拟机在线热扩容分区

介绍 本教程是用于Linux虚拟机在调整虚拟硬盘大小后&#xff0c;进行在线不重启热扩容分区大小。 适用于RHEL 7以上的版本及衍生发行版。&#xff08;如Centos、Rocky Linux、Alma Linux等&#xff09; 硬盘分区在线热扩容 刷新硬盘容量 echo 1 > /sys/block/sda/device…