高并发内存池项目

目录

  • 项目简介
  • 什么是内存池
    • 池化技术
    • 内存池
  • 内存池主要解决的问题
  • 定长内存池的设计
  • 高并发内存池的整体框架设计
  • thread cache
    • thread cache的整体设计
    • thread cache哈希桶的对齐规则
    • threadcacheTLS无锁访问
  • central cache
    • centralcache整体设计
    • central cache 结构设计
    • central cache核心实现
  • page cache
    • page 整体设计
    • page cache结构设计
    • page cache 中获取一个非空span
  • 申请内存过程联调
  • threadcache回收内存
  • centralcache回收内存
  • pagecache回收内存
  • 释放内存过程联调
  • 大于256KB的大块内存申请问题
  • 定长内存池引入
  • 释放对象时优化为不传对象大小
  • 多线程环境下对比malloc测试
  • 性能瓶颈分析

项目简介

当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。

我们这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcamlloc的精华,这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。

什么是内存池

首先我们先来了解一下内存池:

池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。

在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。

内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

内存池主要解决的问题

内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。

在这里插入图片描述

如上图所示,这就是内存碎片,我们还需要了解,内存碎片分为内碎片和外碎片,我们上图所示的就为外碎片,外碎片就是就是指一些空闲的连续内存区域空间太小,这些内存空间不连续导致合计的内存足够,但是并不能满足一些内存的申请需求。内碎片是因为一些对齐的需求,导致分配出去的空间中一些内存无法被利用。后序实现过程中我们就会了解到。

malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的。

而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。

在这里插入图片描述

定长内存池的设计

申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习他目的有两层,先熟悉一下简单内存池是如何控制的,然后他会作为我们后面内存池的一个基础组件。

定长内存池是针对固定内存空间大小的内存池的申请和释放,因此我们可以将其性能发挥到极致,同样,我们也不需要考虑内存碎片的问题,因为我们申请和释放的都是固定大小的空间。

如何实现定长?

在类中可以有多种方法去实现定长这种操作,比如说非类型模板参数,我们可以使用非类型模板参数使所有内存池的大小为N。

template<size_t N>
class ObjectPool
{//....
};

同样,我们也可以通过传入对象的方法来确定内存池的大小,比如传入的参数为int,该内存池就支持4个字节大小内存的申请和释放。

template<class T>
class ObjectPool
{//....
};

接下来我们就需要向内存申请一块空间,在windows下我们可以使用VirtualAlloc函数,在Linux下,可以调用brk或mmap函数。

在这儿我们还需要考虑平台的问题,如果我们的程序需要再Linux下运行怎么办,所以在这儿最好使用条件编译,我们可以通过条件编译将对应平台下向堆申请内存的函数进行封装,此后我们就不必再关心当前所在平台,当我们需要直接向堆申请内存时直接调用我们封装后的SystemAlloc函数即可。

#ifdef _WIN32#include <Windows.h>
#else#endif//直接去堆上按页申请空间
static inline void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else//Linux下方法
#endifif (ptr == nullptr){throw std::bad_alloc();}return ptr;
}

定长内存池的成员变量

我们需要在堆上申请一段空间,所以我们需要用一个指针来指向这片空间,我们还需要知道我们申请的内存是大小,也需要一个变量来进行记录。为了方便我们在后续的的实现过程中容易对这段空间进行分配,我们可以将指针定义为char类型的,如果我们需要向后移动n个字节,进行加n操作就可以了。

在这里插入图片描述

对于那些已经释放回来的定长内存块我们也需要进行管理,因为接下来还会继续使用它,所以我们可以创建用一个链表将他们连接起来,方便进行管理,这个链表被称为自由链表,我们需要创建一个指针指向这个自由链表,方便可以找到他。

在这里插入图片描述
所以,我们创建的定长内存池中需要包含三个成员变量:

  • _memory - 执行大块内存的指针;
  • _freeLists - 指向自由链表头结点的指针;
  • _remainBytes - 所需申请的大块内存空间大小。

如何对释放的对象进行管理?

对于内存池中已经释放的对象,我们需要将其插入到自由链表中,这个自由链表并不是一个真正意义上的链表,他只是在我们已经分配好的内存块中选取4个(32位机器下)和8个(64位机器下)字节空间来存储后面内存块的起始地址。

在这儿我们就需要考虑如何让选取的这4个或者8个字节的空间分别在对应的机器上被识别出来了,因为如果我们使用int变量的话,不管在32位还是64位机器上,都是4个字节。在这儿我们设置为二级指针即可,二级指针指向的是一级指针的地址,对于指针来说,在32位机器上就是4个字节,在64位机器上就是8个字节。

此时对于我们已经释放的对象,就可以采用头插的方式将其插入我们的自由链表中:

在这里插入图片描述

void*& NextObj(void* ptr)
{return *(void**)ptr;
}

对于释放的对象,我们需要调用其析构函数来释放对象,否则会造成内存泄漏,同时再将释放掉的内存块头插进自由链表中。

void Delete(T* obj)
{//显示调用析构函数进行资源清理obj->T();//头插结点NextObj(obj) = _freeLists;_freeLists = obj;
}

如何申请对象?

首先,我们需要查看自由链表表中是否存在未使用的内存块,如果有,我们就使用它,方法就是从自由链表中头删一个节点下来,返回即可;

在这里插入图片描述

如果此时自由链表中没有的话,就需要去堆上重新申请一个大内存块下来,然后对其进行切分,在这儿我们需要注意要记录_memory指针指向的位置和申请的内存块的 _remainBytes的大小。

在这里插入图片描述

在这里我们需要注意的是,我们所切分的内存块的大小必须满足能够存储一个指针大小的空间,因为在这块内存中存储一个起始的地址。

而且,当我们被切割剩下的的内存块不足以分配一个对象时,就需要我们重新向堆上申请一个大内存块,操作跟上面一样。

代码如下:

T* New()
{T* obj = nullptr;//优先使用自由链表中还回来的内存块if (_freeLists){void* next = NextObj(_freeLists);obj = (T*)_freeLists;_freeLists = next;}else{//剩余内存空间大小不够一个sizeof(T)时,就开辟空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}//否则就直接进行切分obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}//定位new表达式初始化objnew(obj)T;return obj;
}

特别注意的是,空间此时已经申请出来了,但是我们并没有初始化,此时就可以使用定位new表达式在类内进行初始化。

性能测试

此时我们定长内存池已经实现完成了,整体代码如下:

#pragma once#include <iostream>
#include <vector>
using std::cout;
using std::endl;#ifdef _WIN32#include <windows.h>
#else#endif//直接去堆上按页申请空间
static inline void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else//Linux下方法
#endifif (ptr == nullptr){throw std::bad_alloc();}return ptr;
}//template<size_t N>
template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;//优先使用自由链表中还回来的内存块if (_freeLists){void* next = NextObj(_freeLists);obj = (T*)_freeLists;_freeLists = next;}else{//剩余内存空间大小不够一个sizeof(T)时,就开辟空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;_memory = (char*)SystemAlloc(_remainBytes);if (_memory == nullptr){throw std::bad_alloc();}}//否则就直接进行切分obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}//定位new表达式初始化objnew(obj)T;return obj;}void*& NextObj(void* ptr){return *(void**)ptr;}void Delete(T* obj){//显示调用析构函数进行资源清理obj->~T();//头插结点NextObj(obj) = _freeLists;_freeLists = obj;}
private:char* _memory = nullptr; //指向大内存块的指针size_t _remainBytes = 0; //内存块的大小void* _freeLists = nullptr;// 自由链表的头节点
};

接下来将其与库中的malloc进行对比,看看谁的效率更高,测试代码如下:

struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 1000000;// 每轮申请释放多少次const size_t N = 5;std::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();std::vector<TreeNode*> v2;v2.reserve(N);ObjectPool<TreeNode> TNPool;size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < N; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}

在这里插入图片描述
我们可以发现,定长内存池的整体效率是高于我们的new(底层调用malloc实现)的,因为我们的定长内存池就是针对申请定长对象而设计的,所以在某些特殊场景下的效率更高。

高并发内存池的整体框架设计

高并发内存池解决的问题

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。

  1. 性能问题。
  2. 多线程环境下,锁竞争问题。
  3. 内存碎片问题。

高并发内存池的整体框架

concurrent memory pool主要由以下3个部分构成:

  • thread cache:线程缓存是每个线程独立拥有,用于小于256kb内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也是他高效的最主要原因;
  • central cache:中心缓存时所有线程共享,thread cache按需从中心缓存中获取对象,central cache会在合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,其他线程吃紧,达到内存分配在多个线程中更均衡的调度的目的,central cache作为所有线程共享,就需要进行加锁操作,确保线程安全问题,这儿我们使用的是桶锁。同时,只有thread cache中没有可以分配的内存时,才会去central cache中进行申请,这儿的竞争也不是那么的激烈;
  • page cache:页缓存是在central cache上面的一层缓存,存储的缓存是以页为单位进行存储和分布的,当central cache中没有内存时,page cache会分配一定数量的page,切割成定长大小的小块内存,分配给central cache,当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

在这里插入图片描述

注意:

  • thread cache在这儿主要解决了锁竞争的问题,因为每个线程都有相对应的一个thread cache,需要内存就去相对应的一个thread cache中进行申请就可以了,完全不需要加锁解锁,提高了效率;
  • central cache称为中心缓存,当thread cache中存在大量不用的内存时,我们会将其返回给central cache以供其他的线程进行申请,而当thread cache中没有没有内存时,central cache会给他分配内存,而且由于我们使用的是桶锁,不同大小的内存会在不同的桶中去申请,锁的竞争也大大的减少了。
  • page cache就主要负责以页为单位的大内存,如果central cache中没有内存了,就会去 page cache中申请,如果page cache中也没有内存了,就回去堆中按页进行申请,同时,还会帮我们解决内存碎片的问题,后面我们就会介绍到。

thread cache

thread cache的整体设计

thread cache整体就是一个哈希桶结构,每个桶中存放的就是一个自由链表,我们前面实现的定长内存池是申请一个固定大小的内存,所以只需要一个自由链表,对于我们的thread cache来说,每个桶的位置都会对应一个自由链表。

但是如果我们对每个字节都映射一个桶的话,256KB数据此时就需要20多万个桶,很显然太多了,所以我们此时让他以某种对齐规则进行向上对齐,比如我们以8字节数据为例,那么1 ~ 8字节对应申请8字节内存块,9 ~ 16字节就申请16字节内存,以此内推下去:

在这里插入图片描述

当线程需要申请某一大小的内存时,就会经过相应的计算找到对应桶的位置,然后去看自由链表中有没有空内存块,如果有,就从自由链表中头删一个节点下来,将该内存块分配给该线程,如果自由链表中没有空内存块,就会去central cache中获取。

在此过程就会出现我需要5字节内存大小,但是此时就直接会给你一个8字节内存大小的内存块,多出来的3字节就浪费了,无法被利用,这就是内碎片,他是因为某些对齐的原因而无法得到利用。

我们可以对我们的自由链表相应的调用接口进行封装,首先提供一个头插(Push) 和 头删(Pop)函数:

static void*& NextObj(void* obj)
{return *(void**)obj;
}//自由链表:管理切分好的小对象
class FreeList
{
pu//头插void Push(void* obj){NextObj(obj) = _freeList;_freeList = obj;}//头删void* Pop(){assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);return obj;}
private:void* _freeList = nullptr; // 头结点指针
};

最终我们就可以总结出来,自由链表的个数对应的就是桶的个数,而桶的个数是由相应的对齐规则所决定的,thread cache就相当于一个数组一样。

thread cache哈希桶的对齐规则

我们需要保证每一个内存块最少可以存储一个指针大小,即4字节(32位机器下)和8字节(64位机器下),所以我们可以以8字节为对齐规则,但是如果全部以8字节为对齐规则,此时依然会有32768个桶,桶的数量依然是很多的,所以我们就可以让不同范围的字节数按照不同的对齐规则进行对齐,规则如下:

字节数范围对齐数哈希桶下标
[1, 128]8[0, 16)
[128 + 1, 1024]16[16, 72 )
[1024 + 1, 8 * 1024]128[72, 128)
[8 * 1024 + 1, 64 * 1024]1024[128, 184)
[64 * 1024 + 1, 256 * 1024]8 * 1024[184, 256)

通过以上的对齐规则,我们就可以发现,除去[1, 128]区间内的字节数,在其余区间内我们均可以将空间的浪费率控制在10%左右,比如在[128, 1024]区间内,最大的浪费字节数为15,此时的浪费率就为:15 / 144 * 100% ≈ 10.42%, 在[1025, 8 * 1024]区间内,最大浪费字节数为127,此时浪费率就为:127 / 1152 * 100% ≈ 11.01%,以此类推…,都控制在10%左右。

对齐映射规则的制定

接下来我们就可以提供一个用来对齐映射规则的类:

类中需要包含两个函数,一个函数对应的是求对齐以后的字节数的字节数,另一个函数为对应的哈希桶的下标,后序需要频繁的在多个文件中调用到这两个函数,所以我们可以将这两个函数设置为静态函数并且用inline进行修饰。

//计算对象大小的对齐规则
class SizeClass
{
public://获取向上对齐后的字节数static inline size_t RoundUp(size_t size);//获取对齐后对应的哈希桶的下标static inline size_t Index(size_t bytes);
};

首先我们获取向上对齐后的字节数,我们可以通过一个子函数来获取:

//获取向上对齐后的字节数
static inline size_t RoundUp(size_t size)
{if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}
}

对于计算对齐后的字节数,我们首先可以想到的一种方法就是下面这种:

static inline size_t _RoundUp(size_t size, size_t alignNum)
{size_t alignSize = 0;if (size % alignNum != 0){alignSize = (size / alignNum + 1) * alignNum;}else{alignSize = size;}return alignSize;
}

我们还可以通过位运算来进行计算,这儿也推荐使用位运算来进行操作,因为位运算的效率始终是高于正常的加减乘除的。

	static inline size_t _RoundUp(size_t size, size_t alignNum){return (size + alignNum - 1) & ~(alignNum - 1);}

我们举例说明一下大家就可以理解这种方式了,我们假设size的大小为8字节,此时对应的alignNum就为8,按上面的计算方式转化为二进制就如下图所示:

在这里插入图片描述
此时我们的向上对齐数就是最终取值就为2 ^ 3 = 8。

我们在意size为12为例,此时对应的alignNum就为16,转换为上面的二进制关系就如下所示:

在这里插入图片描述

此时向上对齐数的取值就为2 ^ 4 = 16。

接下来我们就需要获取对齐以后对应的哈希桶的下标了,首先我们应该判断字节数属于哪一区间,然后调用子函数继续进行处理。

//获取对齐后对应的哈希桶的下标//获取对齐后对应的哈希桶的下标static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128){return _Index(bytes, 3);}else if (bytes <= 1024){return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];}else if (bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];}else{assert(false);}return -1;}

同样,对于子函数我们最容易想到的也是下面这种写法:

static inline size_t _Index(size_t bytes, size_t alignNum)
{if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}
}

但是我们会更倾向于位运算的方法,效率更高:


static inline size_t _Index(size_t bytes, size_t align_shift)
{return (bytes + (1 << align_shift - 1) >> align_shift) - 1;
}

我们以10字节数据按8字节对齐为例,此时align_shift就是3,如下图所示:

在这里插入图片描述
最后-1是因为下标是从0开始的,所以需要-1。

ThreadCache类的实现

经过最终的计算,我们可以得出自由链表的数量为208个,也就是对应哈希桶的数量,而我们的ThreadCache允许申请的最大内存为256KB。

//ThreadCache可以申请最大的内存块
static const size_t MAX_BYTES = 256 * 1024;//自由链表的数量,即哈希桶的数量
static const size_t NFREELIST = 208;

接下来对Thread Cache类进行定义,我们需要定义一个存储208个自由链表的数组,目前只需要提供一个可以申请对象的Allocate函数即可,后续再进行增加:

#include "Common.h"class ThreadCache
{//从ThreadCache中申请对象void* Allocate(size_t size);private://存储自由链表的数组,即哈希桶FreeList _freeList[NFREELIST];
};

通过ThreadCache申请对象时,我们需要根据对应的size大小求出对应哈希桶的位置,如果对应的哈希桶中的自由链表有空闲的内存块,就从自由链表上面弹出一个,如果没有,就需要去Central Cache中去申请,我们在这儿先提供一个FetchFromCentralCache函数,后面在实现。

void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);//获取对齐数size_t alignSize = SizeClass::RoundUp(size);//获取对应哈希桶下标size_t Index = SizeClass::Index(size);if (!_freeList[Index].empty()){//如果对应哈希桶不为空,直接弹出一个内存块return _freeList[Index].Pop();}else{//如果对应哈希桶为空,直接去Central Cache中获取空间return FetchFromCentralCache(alignSize, Index);}
}

threadcacheTLS无锁访问

每一个线程都应该有自己独立的Thread Cache,因为我们创建Thread Cache的目的就是要进行无锁访问,
所以该Thread Cache不可以创建为全局的,全局变量所有的线程均可访问,就需要频繁地进行加锁解锁,增加了代码的复杂性和拖慢了效率。

而我们需要实现每个线程独立访问自己的Thread Cache,就需要使用线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该方法可以保证它所在的线程是可以被全局访问的,但是不能被其他线程所访问到,这样就保证了数据线程的独立性。

//TLS无锁访问
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

每个线程被创建了以后并不会立即就拥有自己的Thread Cache,而是需要通过调用相关的内存申请接口才会去获取到自己的Thread Cache。

//通过TLS无锁访问获取属于自己的Thread Cache
static void* ConcurrentAlloc(size_t size)
{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}

central cache

centralcache整体设计

当某一个线程申请内存时,如果对应的thread cache中对应的自由链表不为空,就直接从自由链表中弹出一个内存块下来,如果thread cache中对应的自由链表为空,我们就需要去central cache中获取内存。

central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。这样就方便如果thread cache中没有内存了,就可以直接去central cache中对应的哈希桶中去获取,不需要在遍历一遍数组,加快了效率。

central cache与thread cache的不同之处

  • thread cache作为每个线程独享,并不需要进行引入互斥锁,而central cache是所有线程共享的,获取的过程中必须加锁解锁,否则就会引发线程安全的问题;
  • 但是central cache的加锁并不是给整个central cache加锁,而是加桶锁,针对于每个桶进行加锁,只有当不同线程去访问同一个桶时,才会出现锁竞争的情况,而不同的线程访问不同的锁,是不会出现锁竞争的情况的;
  • central cache与thread cache还有一个不同点就在于thread cache的哈希桶中链接的是一个个的内存块,而central cache的哈希桶中链接的是一个个的span,每个span管理的都是以页为单位的大块内存,每个桶里的若干个span以双链表的方式连接起来,而每个span下面又会链接一个自由链表用于保存一块块的切分好的内存,根据其所在的哈希桶位置切分成对应的大小。

在这里插入图片描述

central cache 结构设计

页号类型

在32位机器下,进程地址空间的大小为2 ^ 32,在64位机器下,进程地址空间的大小为2 ^ 64,一般页的大小保持在4K或者是8K,我们以8K为例,如果是在32位机器下,页的大小就为2 ^ 32 / 2 ^ 13 = 2 ^ 19,在64位机器下,页的大小就为2 ^ 64 / 2 ^ 13 = 2 ^ 51,所以并不能单纯的将页号设置为long long类型,在此我们需要用到条件编译,在32位机器下就是size_t类型,在64位机器下就是long long 类型:

#ifdef _WIN64typedef long long PAGE_ID
#elif _WIN32typedef size_t PAGE_ID;
#else// linux
#endif

在32位下,_WIN32有定义,_WIN64没有定义;而在64位下,_WIN32和_WIN64都有定义。因此在条件编译时,我们应该先判断_WIN64是否有定义,再判断_WIN32是否有定义。

span结构

span是一个管理以页为单位的大块内存的双链表结构,所以其对应的参数设置应该如下:

// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; //存储大块内存的页号size_t _n = 0; // 页的数量Span* _prev = nullptr;Span* _next = nullptr;size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 每个Span对应的自由链表
};

参数说明

  • _pageId:因为Span是对是以页为单位管理大块内存的,为了在后续过程将Span还给page cache是方便进行合并,所以在这儿我们对页号进行记录;
  • _n:每个Span管理的页数是不固定的,所以我们也需要对对应的页数进行记录;
  • _prev,_next:Span作为一个双向链表结构,必须存在头尾指针;
  • _useCount:当Span每分配出去一个大块内存时,_useCount就进行++操作,当thread cache还回来一个内存块时,_useCount就进行- -操作,当_useCount为0时,表示所有的内存都还回来了,此时就可以将该Span还给page cache了;
  • _freeList:每个Span都会对应一个自由链表,将管理的大块内存链接起来。

SpanList的实现

接下来我们就需要实现一个双链表结构,在这儿我们先提供一个插入和删除函数即可:

//双链表结构
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);//不能删除哨兵位头结点assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:Span* _head;
};

central cache的结构

central cache与thread cache对应的结构是一样的,桶的数量都是208个,但是central cache每个桶中双链表结构将一个个Span链接起来。

class ThreadCache
{
public://......
private:SpanList _spanList[NFREELIST];
};

central cache核心实现

central cache核心代码实现

central cache在整个进程中只有一个,所以这儿我们可以将其设置为单例模式,整个过程指初始化一次,单例模式分为饿汉模式和懒汉模式,在这儿我们就以饿汉模式来实现:


class CentralCache
{
public://提供一个全局访问点访问对象static CentralCache* GetInstance(){return &_Inst;}
private:SpanList _spanList[NFREELIST];
private://构造函数设置为私有CentralCache(){}//拷贝构造和赋值运算符重载删除,防止调用CentralCache(const CentralCache&) = delete;CentralCache& operator=(const CentralCache&) = delete;//提供一个静态的对象static CentralCache _Inst;
};

central cache中还需要提供一个单例对象,而且在程序一开始就创建一个单例对象出来。

CentralCache CentralCache::_Inst;

慢开始反馈调节算法

thread cache每次向central cache获取内存时,我们应该分配多少给他呢?看起来好像分配多少都不合适,如果一次分配的太少了,就会频繁的来进行获取,如果一次分配的太多了,可能大部分用不到,造成空间的浪费。

所以在这儿我们提供了一个慢反馈调节算法,当thread cache向central cache获取内存时,对于小对象我们就可以分给他多一点,对于大对象我们就可以分给他少一点。

我们可以根据具体对象的大小,计算出所给对象的个数,但是又不能太多,所以我们将thread cache可以获得的对象控制在[2,512]之间,并且提供一下函数进行控制。

// 一次从central cache中获取对少个对象
size_t NumMoveSize(size_t size)
{assert(size > 0);int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;
}

但是对于小对象来说,一次分配512个也还是太多了,所以我们又可以在_freeList中设置一个_maxSize成员变量,初始值设置为1,提供一个公有成员函数来获取这个变量,这样每一个自由链表都会对应一个自己的_maxSize。

//自由链表:管理切分好的小对象
class FreeList
{
public:size_t& MaxSize(){return _maxSize;}
private:void* _freeList = nullptr; // 头结点指针size_t _maxSize = 1;
};

当每次thread cache向central cache获取对象时,我们会取_maxSize与计算出来的对象的个数的最小值,如果此次采用的是_maxSize的值,我们会将thread cache中该自由链表中_maxSize的值加1。

所以第一次进行申请时申请到的都是1个,但是在后序的申请过程中,每次_maxSize的值都会加1,直到增长到超出计算的值以后就不会在增长了,此后申请到的对象个数就是计算出的个数。

从中心缓存获取对象

我们通过慢反馈调节算法向central cache获取对象以后,接下来就需要线程分配,如果某个线程此时申请的就是一个对象,我们直接返回即可,因为thread cache在central cache中获取的原因就是因为thread cache中此时并没有空闲的对象,所以当只申请下来一个的时候就直接分配给该线程使用了,如果一次申请多个对象的情况,我们就需要将一个对象分配给线程,再将剩下的对象链接到自由链表上面,方便后序的分配。

void* ThreadCache::FetchFromCentralCache(size_t size, size_t Index)
{// 慢开始反馈调节算法// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完// 2、如果你不断有这个size大小内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大size_t bathNum = std::min(_freeList[Index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeList[Index].MaxSize() == bathNum){_freeList[Index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;// 去centralcache中获取实际数量的内存块size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, bathNum, size);assert(actualNum >= 1);//如果大小为1,直接就将内存块分配出去if (actualNum == 1){assert(start == end);return start;}//如果不为1,需要先分配出去一个,再将剩余的挂在自由链表上else{_freeList[Index].PushRange(NextObj(start), end);return start;}
}

从中心缓存获取一定数量的对象

此时我们需要去central cache中获取一定数量的对象,需要找到对应的哈希桶结构,然后去span中获取内存块,因为我们需要的是一段内存块,所以我们知道其开始和结束为止即可。

// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{//计算对象大小所对应哈希桶的下标size_t index = SizeClass::Index(size);//加桶锁_spanLists[index]._mtx.lock();// 获取一个非空的spanSpan* span = CentralCache::GetInstance()->GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList); //span对应的自由链表不为空// 开始获取对应数量的内存块// 从span中获取batchNum个对象// 如果不够batchNum个,有多少拿多少start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;// 统计实际分配的内存块个数while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);i++;actualNum++;}// 将剩余的自由链表链接到spanspan上面span->_freeList = NextObj(end);// 取出来的那一段end位置指向nullptrNextObj(end) = nullptr;//解锁_spanLists[index]._mtx.unlock();return actualNum;
}
  • 当我们找到对应的哈希桶位置的时候,我们需要获取的是一个非空的span,因为空的span中并没有链接任何东西,我们需要去一个非空的span中进行获取,即使这个span中并没有足够数量的内存块,我们依旧会获取,有多少我们就获取多少;
  • 由于central cache是所有线程共享的,所以在申请对象的过程中我们需要进行加锁操作,当申请对象完成以后,我们又需要进行结果,这样就避免了线程安全问题;
  • 所以最终我们申请到的对象数量可能会与我们计算出来的数量不一样,因此我们需要统计我们实际申请的内存块的数量,而且我们也并不需要担心我们申请到的内存块的数量比计算的少,因为thread cache本质上是想申请一个内存块分配给线程的,我们一次分配多个只是为了将剩余的对象链接到thread cache对应的自由链表中,后续进行分配的时候更加方便。

申请到的一段对象的插入问题

我们此时申请到的是一段对象,分配一个出去以后要将剩余的对象插入到自由链表中,所以我们需要在FreeList中提供一个插入一段内存块的函数。

//自由链表:管理切分好的小对象
class FreeList
{
public:// 插入一段对象void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}
private:void* _freeList = nullptr; // 头结点指针
};

page cache

page 整体设计

page cache和central cache一样,对应的也是哈希桶结构,而且每个哈希桶中中的一个个的span也是以双链表的形式链接起来的。

page cache和central cache不同之处就在于thread cache和central cache的映射规则是相同的,但是page cache哈希桶的映射规则就是直接定址法,例如1号桶映射的就是1页span,2号桶映射的就是2页span,以此类推。

还有一个不同之处就在于page cache中的span并没有被切分,我们知道central cache中每个桶的span都被切分为了一个一个对应大小的对象,以供thread cache进行获取,而page cache是对central cache服务的,central cache向page cache获取的就是一整个固定页数的span,并不需要被切分,如何切分是central cache自己来决定的。

在这里插入图片描述

在这里我们选择最大悬挂128页的桶,我们需要使对应的桶号跟页数对应起来,所以将哈希桶的个数设置为129。

为什么选择悬挂128页的桶呢?

因为我们将页的大小设置为8K,128页对应就是1024K,可以被切分为4个256KB的对象,因此是足够的,同样,如果我们需要悬挂更大的span也是可以的。

//page cache 中哈希桶的个数
static const size_t NPAGES = 129;

如何向page cache申请内存

  • 当central cache中没有空闲的span时,此时central cache就会向page cache申请一个span,假设此时central cache此时向page cache申请一个n页的span;
  • 如果page cache中第n个桶中没有空闲的span,此时我们并不是立即向堆申请一个n页的span,而是在第n + 1个桶中去寻找,如果第n + 1个桶中有空闲的span,我们会将第n + 1页切分为一个n页的span和一个1页的span,将切分出来的n页span分配给central cache,而剩下的1页的span挂在对应1页哈希桶的后面,以此类推;
  • 上面我们不直接在堆上申请内存的原因就在于我们应该尽量一次申请一块大一点儿的内存,因为page cache最后还需要将空闲的span进行合并,所以我们需要保持空间的连续性才可以,如果频繁的向堆申请小块内存,会造成大量的内存空间上不连续。
  • 只有当走到128页时page cache中依然没有空闲的span,此时就会去堆上申请一个128页的内存块下来,然后将其切分为一个n页的span和一个128-n页的span,将这个n页span分配给central cache,这个128-n页的span链接在对应哈希桶的后面;
  • 其实我们每次向堆申请的都是128页的内存块,central cache中的这些span都是由128页的span切分出来的。

page cache结构设计

page cache实现方式

  • page cache作为所有线程共享的结构,所以在这儿我们也可以将page cache设置为单例模式;
  • page cache作为所有线程共享,就需要加锁,我们实现central cache过程中加的为桶锁,但是在实现page cache时就不是桶锁了,这儿需要一个全局的大锁将整个哈希桶给锁住,因为thread cache向central cache申请内存块时,他们两个映射规则相同,直接到对应的哈希桶中去获取就可以,所以我们只需要一个桶锁就可以完成,而central cache向page cache申请span时,对应的桶里面如果没有就会去下一个桶中进行寻找,操作的是整个结构,而且后续page cache还需要对span进行合并,也是整个结构的操作,如果使用桶锁就会频繁的加锁和解锁,极大的影响了效率,所以在这儿我们使用的是一把大锁。
class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}//获取一个k页大小的spanSpan* NewSpan(size_t k);
public://提供一把大锁std::mutex _pageMtx;
private:SpanList _spanList[NPAGES];private://构造函数设置为私有PageCache(){}//拷贝构造设置为删除PageCache(const PageCache&) = delete;//提供一个静态的对象static  PageCache _sInst;
};

然后程序运行起来就立即创建一个单例对象。

PageCache PageCache::_sInst;

page cache 中获取一个非空span

thread cache向central cache中申请内存时,需要去central cache中对应的哈希桶中的span里寻找,本质上就是遍历这个双链表结构,找到空闲的span并且不为空然后给thread cache分配对象,我们可以构造一个类似于迭代器的方法,在SpanList中提供一个begin()和end()函数然后进行遍历。

//双链表结构
class SpanList
{
public:Span* begin(){return _head->_next;}Span* end(){return _head;}
private:Span* _head;
public:std::mutex _mtx;
};

如果遍历完整个双链表,依然没有找到空闲或者不为空的span,此时central cache就需要向page cache申请span了。

具体申请多大的内存块是由我们实际的size所决定的,前面我们已经计算出了thread cache向central cache申请的内存块的数量,我们可以通过该数量和实际size的大小来决定central cache要向page cache申请几页的span。

首先我们应该计算出thread cache一次向central cache申请内存块的上线数量,然后通过该数量和单个对象的大小计算出整体的字节数,再将字节数转换为页数,如果转化出来的页数小于1页就给他分配一页,大于1页是几页就分配几页。

//计算从page cache中获取页的大小
static size_t NumMovePage(size_t size)
{assert(size > 0);//计算thread cache向central cache 申请内存块上限数量size_t num = NumMoveSize(size);//num个size大小对象所占的字节数size_t nPage = num * size;//转换为页数nPage >>= PAGE_SHIFT;if (nPage == 0){nPage = 1;}return nPage;
}

其中PAGE_SHIFT为页偏移大小转换,一页大小为8KB。

//页大小转换偏移,1页为8KB
static const size_t PAGE_SHIFT = 13;

当central cache申请到一个n页span以后,还需要将span切分为一个个对应大小的对象,然后挂到该span对应的自由链表中。

所以我们就需要找到对应span的起始地址,当我们申请一个span对象成功以后,此时我们是知道对应的页号的,第一页的地址为0,那么第n页的地址就也能够该是该页号乘以一页的大小即可,而起始地址加上该页的大小就是对应的结束位置。

知道起始地址和结束地址以后,我们就可以对大块内存进行切分,切分为一个个的小内存块将他链接到该spam的自由链表上。

注意,在这而我们采用的是尾差是方式,因为对应的小对象是从一个大的对象切分下来的,从头开始切分,如果我们进行尾插的话,切分以后自由链表中的每个内存块看着也是连续的,这时当我们把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。

// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//遍历双链表,寻找有没有空闲并且不为空的spanSpan* it = list.begin();while (it != list.end()){if (it != nullptr){return it; }else{it = it->_next;}}//遍历完也没有找到,此时就需要去page cache中申请Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));// 找到该span的起始地址char* start = (char*)(span->_pageId << PAGE_SHIFT);// 对应的字节数size_t bytes = span->_n << PAGE_SHIFT;// 结束位置char* end = start + bytes;//此时就可以进行切分//可以先切分一块下来当做头,方便尾插span->_freeList = start;start = start + size;void* tail = span->_freeList;int i = 1;//尾插while (start < end){i++;NextObj(tail) = start;tail = NextObj(tail);start += size;}//最后一个位置指向空NextObj(end) = nullptr;//将该span头插在双链表中list.PushFront(span);return span;
}

我们还要在SpanList中实现一个头插函数,直接复用Insert函数即可:

//双链表结构
class SpanList
{
public://头插void PushFront(Span* span){Insert(begin(), span);}Span* begin(){return _head->_next;}
private:Span* _head;
public:std::mutex _mtx;
};

获取一个K页的span

当遍历完central cache中某个哈希桶的双链表都没有找到一个空闲的span时,就在page cache中去申请,假设我们此时需要申请一个k页的span。

我们直接去page cache中k页中遍历双链表寻找有没有空闲的span,如果有,我们就可以直接将这个span弹出并返回,所以我们还需要再_spanLists类中Empty函数和PopFront函数,用于获得对应的span。

//双链表结构
class SpanList
{
public:bool Empty(){return _head->_next == _head;}void* PopFront(){Span* front = _head->_next;Erase(front);return front;}
private:Span* _head;
public:std::mutex _mtx;
};

如果第k页找不到空闲的span,就去第k 页往后进行寻找,只要后面任意一个桶有k页span,我们就可以对它进行切分,假设是第n个桶,此时就可以切分为一个k页的span和一个n - k页的span,将k页span分配给central cache,n - k页的span链接到对应的哈希桶后面。

如果遍历128页以后依然没有空闲span,就需要向堆申请一个128页的内存块,然后切分成一个k页和128 - k页的span,将k页span分配给central cache,128 - k页的span链接到对应的哈希桶后面。

Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);// 如果第k页不为空,弹出一个头部span给central cacheif (!_spanList[k].Empty()){return _spanList[k].PopFront();}// 如果为空就继续向后寻找for (int i = k + 1; k < NPAGES; i++){//如果找到一个不为空的,就进行切分if (!_spanList[i].Empty()){// 在span头部切一个k页span下来// 将这个k页span返回// 将剩下的n - k 页span挂在对应哈希桶上面Span* nSpan = _spanList[i].PopFront();Span* kSpan = new Span;kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;_spanList[nSpan->_n].PushFront(nSpan);return kSpan;}}// 走到这里,证明page cache中并没有span,需要向堆申请Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr << PAGE_SHIFT;bigSpan->_n = NPAGES - 1;//将申请下来的span挂在page cache上_spanList[bigSpan->_n].PushFront(bigSpan);//复用递归进行调用return NewSpan(k);
}

当page cache中没有span时,我们向堆申请一个大内存块,然后将其挂在128页的哈希桶后面,随后开始切分,此时我们并不需要重新编写代码,只需要复用前面切分的步骤,然后递归调用即可,防止了代码冗余。

page cache 锁的设置

我们前面已经介绍到page cache是加了一把大锁,那么我们应该怎么控制加锁的顺序呢?

当我们遍历central cache对应的哈希桶的双链表时,没有找到空闲的span,此时我们就需要去page cache中获取,在这儿我们要做的就是先将central cache对应哈希桶的桶锁给解掉,因为尽管此时central cache中这个桶里啥也没有,但是thread cache是会将空闲的内存块还给central cache的,如果我们此时没有解开这个桶锁,thread cache还内存块的过程就会阻塞进行等待。

所以我们在向page cache申请内存块之前,需要先将桶锁解掉,然后再page cache中加一把大锁,在申请到k页span以后,我们并不需要立即再加上这个桶锁,因为我们还需要对这个申请来的span进行切分,切分完毕以后才会将其挂在对应的桶上去,所以在切分完毕以后再去申请桶锁。

// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//遍历双链表,寻找有没有空闲并且不为空的spanSpan* it = list.begin();while (it != list.end()){if (it != nullptr){return it; }else{it = it->_next;}}// 此时central cache中并没有找到span,解掉桶锁,去page cache中寻找list._mtx.unlock();//给page cache加上一把大锁PageCache::GetInstance()->_pageMtx.lock();//遍历完也没有找到,此时就需要去page cache中申请Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));// 申请完毕以后解锁PageCache::GetInstance()->_pageMtx.unlock();// 找到该span的起始地址char* start = (char*)(span->_pageId << PAGE_SHIFT);// 对应的字节数size_t bytes = span->_n << PAGE_SHIFT;// 结束位置char* end = start + bytes;//此时就可以进行切分//可以先切分一块下来当做头,方便尾插span->_freeList = start;start = start + size;void* tail = span->_freeList;int i = 1;//尾插while (start < end){i++;NextObj(tail) = start;tail = NextObj(tail);start += size;}//最后一个位置指向空NextObj(end) = nullptr;//切分完毕,头插切好的span以前在加桶锁list._mtx.lock();//将该span头插在双链表中list.PushFront(span);return span;
}

申请内存过程联调

在这儿我们注意一点,thread cache需要通过TLS调用自己的专属thread cache对象,我们需要有一个windows.h的头文件,由于在windows.h中也存在一个min的宏,此时会与我们algorithm库中的min函数发生冲突,所以此时我们直接用windows.h中的min即可。

我们先单线程场景下调试我们的程序,因为多线程场景下过于复杂,接下来以申请三次内存为例:

void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;
}

当线程第一次申请内存时,会直接通过TLS调用自己的专属thread cache对象,然后通过这个对象进行内存的申请。

在这里插入图片描述

接下来就会调用Allocate函数去thread cache中申请内存,此时就会获得对齐数为8,thread cache中对应的哈希桶的下标为0。

在这里插入图片描述

由于第一次申请内存,此时thread cache 0 下标哈希桶位置什么也没有,所以就会去central cache中申请内存,我们此时就需要计算出我们需要的内存块的数量,然后去central cache中申请,对应8字节数据所对应的上限数量为512个,但是在这儿我们使用的是慢反馈调节算法,需要会挑选_maxSize与上限数量的最小值,所以bathNum得值为1.

然后_maxSize的值会进行自增,下一次向central cache申请的内存块数量就会变为2个。

在这里插入图片描述

但是central cache中对应哈希桶里的span可能并不一定存在足够数量的内存块,所以我们需要获取到实际我们所能够申请到数量,此时依然去central cache中对应的哈希桶的位置寻找,所以index依然为0。

我们需要获取thread cache 0号桶中一个非空的span,在此之前需要对0号桶进行加锁操作,表示有线程正在进行访问。

在这里插入图片描述

在central cache 0 号桶找一个非空的span,其实就是遍历双链表结构,从这个双链表中获取一个非空的span。

在这里插入图片描述

遍历完成以后,会发现并没有找到,接下来就会去page cache中申请一个对应页数的span,但是在此之前还需要将0号桶的桶锁解掉,因为我们需要考虑thread cache还对象的问题,然后给page cache加上一把大锁,之后开始向page cache申请内存。

在这里插入图片描述

由于我们申请的为8字节的数据,所以对应的num个数应该为512个,对应页的字节数4096个,不够一页,就直接给他分配一页。

在这里插入图片描述

第一次申请内存,此时page cache中128个桶中都是空的,所以需要向堆申请一个128页的内存块下来,然后进行分割。

在这里插入图片描述

调用监视窗口可以看见此时page cache向堆申请了一个128页的内存块。

在这里插入图片描述
我们可以计算对应的数据是否正确。

在这里插入图片描述

接下来就会将128页的span插入到对应的128页的桶中,随后在进行切分出一个一页的和一个127页的,将127页的挂在对应127号桶的后面。

在这里插入图片描述

从page cache中申请内存完毕以后,会将page cache加上的那把大锁解开,然后对申请到的一页span进行切分。

切分过程就是获取到这个span的起始地址和结束地址,然后根据我们实际需要获取到的对齐数的大小进行切分。

在这里插入图片描述

在确定内存块的开始和结束后,就可以将其切分成一个个8字节大小的对象挂到该span的自由链表中了。在调试过程中通过内存监视窗口可以看到,切分出来的每个8字节大小的对象的前四个字节存储的都是下一个8字节对象的起始地址。

在这里插入图片描述

切分结束以后就需要获取0号桶的桶锁,然后再将切分好的span头插到0号桶的自由链表中。

在这里插入图片描述

由于thread cache只向central cache申请了一个对象,因此拿到这个非空的span后,直接从这个span里面取出一个对象即可,此时该span的_useCount也由0变成了1。

在这里插入图片描述

由于此时thread cache实际只向central cache申请到了一个对象,因此直接将这个对象返回给线程即可,其余对象依然挂在自由链表中。

在这里插入图片描述

此后,只要只要某个线程向thread申请小于8字节的内存块,就会直接去 0 号桶的自由链表中申请,因为最开始已经申请了1024个8byte的内存块,直到用完以后再继续在central cache中申请,如果一次申请超过8字节的内存块,就会根据对齐数,重复上面的步骤,申请内存。

为了进一步测试代码的正确性,我们可以做这样一个测试:让线程申请1024次8字节的对象,然后通过调试观察在第1025次申请时,central cache是否会再向page cache申请内存块。

void TestConcurrentAlloc2()
{for (size_t i = 0; i < 1024; ++i){void* p1 = ConcurrentAlloc(6);cout << p1 << endl;}void* p2 = ConcurrentAlloc(8);cout << p2 << endl;
}

当我们申请1024次对象以后,我们就会看见此时central cache的_useCount就累加到了1024,当第1025次申请对象时,此时central cache就需要向page cache申请内存了。

在这里插入图片描述

向page cache申请内存时,并不会直接向堆申请,因为刚刚已经向堆申请了128页的内存,被切分为了1页和127页,所以此次向page cache申请内存时,只需要将127页的span继续进行切分为1页和126页,将1页的span分配给central cache,再将126页span挂在page cache的相应哈希桶上。

在这里插入图片描述

threadcache回收内存

当某些线程申请的内存不再被使用,我们就可以将其还给thread cache,供后续调用过程使用,但是随着回收内存块数量的增加,可能会存在大量的内存块不被使用,这也是一种浪费,所以我们可以讲这些内存块和换个central cache,供其他的线程使用。

如果thread cache某个桶当中自由链表的长度超过它一次批量向central cache申请的对象个数,那么此时我们就要把该自由链表当中的这些对象还给central cache。

//释放内存
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size < MAX_BYTES);//计算对应size所对应在thread cache中哈希桶的位置size_t index = SizeClass::Index(size);//将该内存块插入到对应哈希桶下标的自由链表中_freeList[index].Push(ptr);//如果此时自由链表中的内存块的数量大于批量申请的内存时,就将这一段内存归还给central cacheif (_freeList[index].Size() >= _freeList[index].MaxSize()){ListTooLong(_freeList[index], size);}
}

当自由链表中的内存块的数量过长需要归还给central cache时我们的做法是从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给central cache中对应的span即可。

// 释放对象链表过长时,回收内存回到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;//获取这段内存的起始位置list.PopRange(start, end, list.MaxSize());//将这段内存归还给central cacheCentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

所以我们的FreeList中需要支持一个Size函数来记录自由链表的个数,因此需要新增一个_size变量来进行记录,这样我们的PushRange就需要在插入对象以后进行更新_size,还需要提供一个PopRange函数来获取一段对象,方便将其换个central cache。

//自由链表:管理切分好的小对象
class FreeList
{
public://头插void Push(void* obj){assert(obj);NextObj(obj) = _freeList;_freeList = obj;_size++;}// 插入一段对象void PushRange(void* start, void* end, size_t n){NextObj(end) = _freeList;_freeList = start;_size += n;}// 取出一段对象void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; i++){end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}//头删void* Pop(){assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);_size--;return obj;}bool empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}
private:void* _freeList = nullptr; // 头结点指针size_t _maxSize = 1;size_t _size = 0;
};

对于PushRange的最后一个参数,应该是我们实际获取到的内存块的数量 - 1,因为我们会先分派一个给相应的线程,然后再将剩下的挂在自由链表上面。

_freeList[Index].PushRange(NextObj(start), end, actualNum - 1);

我们并不是将所有的对象都归还给了central cache,而是在设计PopRange的时候规定一定数量的对象,我们的目的是为了保证不会出现某一个thread cache拥有过多的对象,而其他thread cache却很闲的问题,所以我们不一定要将thread cache中的所有对象归还给central cache。

centralcache回收内存

thread cache归还对象给central cache时,只能找到对应桶的位置,但是在central cache中,每个桶都会存在多个span,我们需要知道知道thread cache归还回来的对象需要挂在哪个span上面,这又是如何计算呢?

如何计算对应的span?

我们现在能知道的就是对应页的地址,也就是这个对象的地址,页的地址是由页号乘以对应每页的字节数而得来的,所以我们也可以反推出对应的页号,我们只需要除以对应的每页的大小即可。

当我们找到对应页号以后,又该如何计算对应的span呢?在这儿我们的解决方法是建立页号与 span 之间的映射关系,由于在合并的时候我们也会用到这个映射关系,所以我们可以将这个哈希表建立在page cache中,提供一个接口,让central cache也可以获得。

class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}//获取一个k页大小的spanSpan* NewSpan(size_t k);
public://提供一把大锁std::mutex _pageMtx;
private:SpanList _spanList[NPAGES];//建立页号与span的映射关系std::unordered_map<PAGE_ID, Span*> isSpanMap;private://构造函数设置为私有PageCache(){}//拷贝构造设置为删除PageCache(const PageCache&) = delete;//提供一个静态的对象static  PageCache _sInst;
};

每当page cache 分配对象给central cache时,都会建立页号与span的映射关系,所以此后thread cache还对象给central cache时,就会通过这个映射关系找到对应的span。

所以在调用NewSpan这个接口获取k页span时就需要建立好对应的映射关系。

Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);// 如果第k页不为空,弹出一个头部span给central cacheif (!_spanList[k].Empty()){Span* kSpan = _spanList[k].PopFront();//返回之前建立对应页号与span之间映射关系for (PAGE_ID i = 0; i < kSpan->_n - 1; i++){isSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}// 如果为空就继续向后寻找for (size_t i = k + 1; i < NPAGES; i++){// 如果找到一个不为空的,就进行切分if (!_spanList[i].Empty()){// 在span头部切一个k页span下来// 将这个k页span返回// 将剩下的n - k 页span挂在对应哈希桶上面Span* nSpan = _spanList[i].PopFront();Span* kSpan = new Span;kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;_spanList[nSpan->_n].PushFront(nSpan);//返回之前建立对应页号与span之间映射关系for (PAGE_ID i = 0; i < kSpan->_n - 1; i++){isSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}// 走到这里,证明page cache中并没有span,需要向堆申请Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;// 将申请下来的span挂在page cache上_spanList[bigSpan->_n].PushFront(bigSpan);// 复用递归进行调用return NewSpan(k);
}

此时就可以通过对象的地址获取到对应的页号,在通过页号获取到对应的span了。

// 获取对应映射的span
Span* PageCache::MapObjectToSpan(void* obj)
{assert(obj);//获取对应页号PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);auto ret = isSpanMap.find(id);if(ret != isSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}

这儿我们要通过页号来查找span,此时我们需要查找的页号一定是建立过映射关系的,如果出现报错,就证明我们前面的代码逻辑上就出现了错误。

centralcache回收内存

当thread cache归还对象给central cache时,我们只需要遍历这些对象。然后将其插入到对应的span中即可,同时也要更新对应的_useCount。

当central cache中的 _useCount减小到0的时候,证明此时已经将central cache中该位置span下对应的对象全部归还,此时就需要将central cache中的该span归还给page cache进行合并。

//将thread cache归还的对象链接到对应的span下
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{assert(start);assert(size <= MAX_BYTES);// 计算对应下标size_t index = SizeClass::Index(size);//加桶锁_spanLists[index]._mtx.lock();while (start){void* next = NextObj(start);//获取对象对应映射的spanSpan* span = PageCache::GetInstance()->MapObjectToSpan(start);//头插进spanNextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;// 说明此时该span当中对象已经还还完了// 就要将该span归还给page cacheif (span->_useCount == 0){_spanLists[index].Erase(span);span->_freeList = nullptr;span->_prev = nullptr;span->_next = nullptr;// 此时需要将桶锁解掉,然后再加上page cache的大锁_spanLists[index]._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();//将该span归还给page cachePageCache::GetInstance()->ReleaseSpanToPageCache(span);// 归还完成以后解锁PageCache::GetInstance()->_pageMtx.unlock();//加桶锁_spanLists[index]._mtx.lock();}start = next;}//解锁_spanLists[index]._mtx.unlock();
}

这儿需要注意的是central cache的桶锁与page cache大锁的配合使用的问题,当我们我们在进行遍历之间就需要先加上桶锁,然后开始遍历,归还对象,当_useCount减为0时,就需要将该span还给page cache,此时先要把桶锁解开,然后再加page cache的那一把大锁,归还完成以后解开,之后还需要在加上桶锁,因为我们遍历的操作还没完成,依次循环下去,直到遍历完成,再解开桶锁即可。

我们将span归还给page cache本质是将他从双链表中移除,由于page cache中并不需要一个一个的自由链表,所以我们可以将自由链表置空,同时也要将该span的前后指针也置空,因为后续我们需要将他插入进page cache对应的双链表中。

pagecache回收内存

对于central cache 还回来的span,我们并不是只将他插入到page cache对应的哈希桶里就可以了,我们还有尝试对他进行合并,这样就有效的解决了内存碎片的问题。

如何进行合并?

合并可以分为向前合并和向后合并。假设我们此时页号为num,页数为n的span需要进行合并,向前合并就是指我们去找页号为num - 1的页是不是空闲的,如果空闲就合并,依次向前进行合并。而向后合并就是我们去找num + n 页是不是空闲的,如果空闲,就合并,然后在依次向后进行合并。只有当合并的页数超过128页或者是没有空闲的时候才不能合并。

那我么我们如何判断对应的页是不是空闲的呢?首先肯定是不能使用_useCount进行判断的,因为进行合并肯定是对page cache中的空闲页进行合并的,如果是在多线程情况下,可能会存在某一个页正在被切分,此时_useCount依然为0,还没有被切分完就拿去合并了,显然是不合理的。所以我们需要再span中提供一个变量,来表示该span是否被使用,就可以很好的解决此问题了。

// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; //存储大块内存的页号size_t _n = 0; // 页的数量Span* _prev = nullptr;Span* _next = nullptr;size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 每个Span对应的自由链表bool isUse = false; //来标记该span是否空闲
};

当我们申请带一个新的NewSpan时,我们就将该span状态标记为true。

//给page cache加上一把大锁
PageCache::GetInstance()->_pageMtx.lock();
//遍历完也没有找到,此时就需要去page cache中申请
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
// 标记该span正在被使用
span->isUse = true;
// 申请完毕以后解锁
PageCache::GetInstance()->_pageMtx.unlock();

当central cache归还一个span以后,我们将该span状态标记为false。

// 此时需要将桶锁解掉,然后再加上page cache的大锁
_spanLists[index]._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();
//将该span归还给page cache
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
// 将该span状态标记为false
span->isUse = false;
// 归还完成以后解锁
PageCache::GetInstance()->_pageMtx.unlock();

而且合并的时候补助需要被分配的页的页号与对应span的关系。而且还需要切分以后未被分配的页对应的页号与span的关系,因为我们是需要向前先后找空闲的span进行合并的,所以我们在page cache中申请一个k页span时,还需要将未分配的span对应的页和span之间的映射关系也建立出来,也就是建立其首位的页号与span的映射关系。

Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);// 如果第k页不为空,弹出一个头部span给central cacheif (!_spanList[k].Empty()){Span* kSpan = _spanList[k].PopFront();//返回之前建立对应页号与span之间映射关系for (PAGE_ID i = 0; i < kSpan->_n; i++){isSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}// 如果为空就继续向后寻找for (size_t i = k + 1; i < NPAGES; i++){// 如果找到一个不为空的,就进行切分if (!_spanList[i].Empty()){// 在span头部切一个k页span下来// 将这个k页span返回// 将剩下的n - k 页span挂在对应哈希桶上面Span* nSpan = _spanList[i].PopFront();Span* kSpan = new Span;kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;_spanList[nSpan->_n].PushFront(nSpan);//建立未分配的span对应的页与span之间的对应关系isSpanMap[nSpan->_pageId] = nSpan;isSpanMap[nSpan->_pageId + nSpan->_n] = nSpan;//返回之前建立对应页号与span之间映射关系for (PAGE_ID i = 0; i < kSpan->_n - 1; i++){isSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}// 走到这里,证明page cache中并没有span,需要向堆申请Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;// 将申请下来的span挂在page cache上_spanList[bigSpan->_n].PushFront(bigSpan);// 复用递归进行调用return NewSpan(k);
}

以上操作完成以后,就可以对page cache中的span进行合并了。

// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{// 向前合并while (1){// 获取页号PAGE_ID prevId = span->_pageId - 1;auto ret = isSpanMap.find(prevId);// 前面没有页号,不合并,直接breakif (ret == isSpanMap.end()){break;}Span* prevSpan = ret->second;// 如果前一个span正在被使用,不合并,直接breakif (prevSpan->isUse == true){break;}// 如果对应span加起来的页数超过128页,不合并,直接breakif (prevSpan->_n + span->_n > NPAGES - 1);{break;}//合并span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;// 释放掉原来的未被合并的span_spanList[prevSpan->_n].Erase(prevSpan);delete prevSpan;}//向后合并while (1){// 获取向后页号PAGE_ID nextId = span->_pageId + span->_n;auto ret = isSpanMap.find(nextId);// 后面没有页号,不合并,直接breakif (ret == isSpanMap.end()){break;}//如果后面的span正在被使用,不合并,直接breakSpan* nextSpan = ret->second;if (nextSpan->isUse == true){break;}// 如果对应span加起来的页数超过128页,不合并,直接breakif (span->_n + nextSpan->_n > NPAGES - 1){break;}//合并span->_n += nextSpan->_n;// 释放_spanList[nextSpan->_n].Erase(nextSpan);delete nextSpan;}// 将合并完成的span插入对应哈希桶,并设置状态为false_spanList[span->_n].PushFront(span);span->isUse = false;// 建立对应映射关系isSpanMap[span->_pageId] = span;isSpanMap[span->_pageId + span->_n - 1] = span;
}

释放内存过程联调

接下来对释放内存的过程进行演示,首先我们设置一个ConcurrentFree函数,使每个线程都可以调用自己thread cache来释放对象。

//通过TLS无锁访问释放属于自己的Thread Cache
static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}

由于之前之前进行是三次内存的申请,所以接下来我们进行三次内存的释放,来理解一下这个释放流程。

void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;ConcurrentFree(p1, 6);ConcurrentFree(p2, 8);ConcurrentFree(p3, 1);
}

我们一共申请了三次内存,这三次内存对齐后大小均为8字节,所以对应的thread cache中的桶号为0,page cache 中对应的页号为1。

我们后续释放内存的一系列操作都是在thread cache和central cache的0号桶,page cache的1号桶中完成的,在第三次申请对象以后,此时thread cache对应的0号桶已经空了,是没有对象的,调用监视窗口就可以发现_maxSize已经为3了,但是_size依然为0。

在这里插入图片描述

而当我们第一次释放完对象以后,_size就会++,变为1,说明此时thread cache中自由链表中内存块的数量只有1个,并不会还给central cache。

在这里插入图片描述
此时就会继续调用ConcurrentFree函数第2次释放对象,_size增加到2,此时也不会将这段对象还给central cache。

在这里插入图片描述

接下来第3次释放内存,_size会++到3,此时_size就会等于_maxSize,就需要将thread cache中的自由链表换个central cache了。

在这里插入图片描述

在thread cache将自由链表还给central cache时,是一次性将这段空间还给central cache的,_size就会直接变为0,然后再调用ReleaseListToSpans函数就这段空间归还给central cache。

在这里插入图片描述

进入central cache以后归还对象以前,需要先将0号桶的桶锁加上,然后通过对应计算出来的页号与span之间的映射关系,找到0号桶中对应的span,将这段对象头插到span中,并且还需要将central cache中_useCount进行- -操作,来判断是否已经将对象归还完毕,此时对应的_useCount就变为2。

在这里插入图片描述

我们申请了3个对象,都是从该span上切分下来的,所以只有当3个对象都归还完毕以后,_useCount就会- -为0,此时就需要将central cache中的span归还给page cache。

在这里插入图片描述

将central cache中的对象归还给page cache之前,我们需要将0号桶的桶锁解开,然后在加上page cache的那一把大锁,调用ReleaseSpanToPageCache函数将span归还给page cache,此时我们需要将该span对应的头尾指针以及_freeLists都置空,因为后续page cache需要进行合并,而page cache接收到对应的span以后,就会进行相应的合并。

在这里插入图片描述

由于这1页是从第128页的头部切割下来的,所以并不会进行向前合并,只会走向后合并的步骤,由于第127页并没有被分配。所以走向后合并的步骤时根据映射关系一定可以找到第127页,进行合并,合并以后需要记得将原来的第127号同后面的span给delete掉,因为已经合并了,原来的就没有用了,合并完成以后再重新建立映射关系,将该span的状态设置为未使用即可。

在这里插入图片描述

当page cache 完成对象合并以后,也就意味着这次central cache归还对象已经完成,就需要解掉page cache中的那把大锁,然后再将桶锁加上,继续检查是否还需要将其他对象归还给central cache中span,实际上此时已经归还完毕了,直接解开桶锁。

在这里插入图片描述

以上就位内存释放的流程。

大于256KB的大块内存申请问题

对于小于256KB的内存,我们只需要去thread cache中进行申请就可以了,对于大于256KB的内存,我们可以直接去page cache中申请,但是page cache也只有128页,对应就是可以申请4个256KB的内存,对于大于128页的内存,那就需要直接去堆中进行申请。

内存大小申请方式
0 ~ 256KB(32页)通过thread cache申请内存
32页~ 128页直接向page cache cache申请内存
> 128页直接向堆申请内存

对于大于256KB的内存,也是需要向上进行对齐的,但是此时就是按页进行对齐了。

所以此时我们就需要对我们的RoundUp函数进行修改:

	//获取向上对齐后的字节数static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{// 按页进行对齐return _RoundUp(size, (1 << PAGE_SHIFT));}}

对于申请内存的逻辑此时也需要进行修改了,对于大于256KB的内存就需要向page cache 直接进行申请了。而小于256KB的才会调用TLS通过无锁的方式向堆申请。

//通过TLS无锁访问获取属于自己的Thread Cache
static void* ConcurrentAlloc(size_t size)
{//当数据大于256KB时向page cache直接申请if (size > MAX_BYTES){//计算对齐数size_t alignSize = SizeClass::RoundUp(size);//获取页数size_t kPage = alignSize >> PAGE_SHIFT;//直接向page cache申请内存PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kPage);PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}
}

直接去page cache申请内存时,我们前面编写的业务逻辑只能保证128页以内的内存的申请,如果大于128页,就需要直接向堆申请内存,所以对于我们的NewSpan的逻辑还需进行调整。

Span* PageCache::NewSpan(size_t k)
{assert(k > 0);//当页数大于128页时,直接向堆申请if (k > NPAGES - 1){void* ptr = SystemAlloc(k);Span* span = new Span;span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;// 建立映射关系isSpanMap[span->_pageId] = span;return span;}// 如果第k页不为空,弹出一个头部span给central cacheif (!_spanList[k].Empty()){Span* kSpan = _spanList[k].PopFront();//返回之前建立对应页号与span之间映射关系for (PAGE_ID i = 0; i < kSpan->_n - 1; i++){isSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}// 如果为空就继续向后寻找for (size_t i = k + 1; i < NPAGES; i++){// 如果找到一个不为空的,就进行切分if (!_spanList[i].Empty()){// 在span头部切一个k页span下来// 将这个k页span返回// 将剩下的n - k 页span挂在对应哈希桶上面Span* nSpan = _spanList[i].PopFront();Span* kSpan = new Span;kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;_spanList[nSpan->_n].PushFront(nSpan);//建立未分配的span对应的页与span之间的对应关系isSpanMap[nSpan->_pageId] = nSpan;isSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//返回之前建立对应页号与span之间映射关系for (PAGE_ID i = 0; i < kSpan->_n; i++){isSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}// 走到这里,证明page cache中并没有span,需要向堆申请Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;// 将申请下来的span挂在page cache上_spanList[bigSpan->_n].PushFront(bigSpan);// 复用递归进行调用return NewSpan(k);
}

释放逻辑

此时我们在释放的过程中也需要进行判断,看释放给那个对象。

内存大小释放方式
0 ~ 256KB(32页)释放给thread cache
32页~ 128页释放给page cache
> 128页释放给堆

这儿释放的逻辑和申请的逻辑很像,对于小于256KB的数据,通过TLS无锁访问直接是否给自己的thread cache,对于大于256KB的内存就直接释放给page cache,我们只需要知道他的起始地址就可以了,所以一开始对于大于256KB的内存我们只建立了起始页号与对应span的映射关系。

//通过TLS无锁访问释放属于自己的Thread Cache
static void ConcurrentFree(void* ptr, size_t size)
{if (size > MAX_BYTES){//通过映射关系获取到对应的spanSpan* span = PageCache::GetInstance()->MapObjectToSpan(ptr);//调用ReleaseSpanToPageCache来释放对象PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}

所以我们对应ReleaseSpanToPageCache函数的逻辑也要进行修改,当小于128页时是归还给page cache,而大于128页时就是归还给堆了,归还给堆的接口叫做VirtualFree,我们可以封装一个SystemFree的接口,当我们需要将内存释放给堆时直接调用SystemFree即可。

SystemFree接口

static inline void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else//Linux下方法
#endif
}

ReleaseSpanToPageCache函数的逻辑修改

// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//如果对应空间大于128页,就释放给堆if (span->_n > NPAGES - 1){void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);delete span;return;}// 向前合并while (1){// 获取页号PAGE_ID prevId = span->_pageId - 1;auto ret = isSpanMap.find(prevId);// 前面没有页号,不合并,直接breakif (ret == isSpanMap.end()){break;}Span* prevSpan = ret->second;// 如果前一个span正在被使用,不合并,直接breakif (prevSpan->isUse == true){break;}// 如果对应span加起来的页数超过128页,不合并,直接breakif (prevSpan->_n + span->_n > NPAGES - 1){break;}//合并span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;// 释放掉原来的未被合并的span_spanList[span->_n].Erase(prevSpan);delete prevSpan;}//向后合并while (1){// 获取向后页号PAGE_ID nextId = span->_pageId + span->_n;auto ret = isSpanMap.find(nextId);// 后面没有页号,不合并,直接breakif (ret == isSpanMap.end()){break;}//如果后面的span正在被使用,不合并,直接breakSpan* nextSpan = ret->second;if (nextSpan->isUse == true){break;}// 如果对应span加起来的页数超过128页,不合并,直接breakif (span->_n + nextSpan->_n > NPAGES - 1){break;}//合并span->_n += nextSpan->_n;// 释放_spanList[nextSpan->_n].Erase(nextSpan);delete nextSpan;}// 将合并完成的span插入对应哈希桶,并设置状态为false_spanList[span->_n].PushFront(span);span->isUse = false;// 建立对应映射关系isSpanMap[span->_pageId] = span;isSpanMap[span->_pageId + span->_n - 1] = span;
}

接下来我们申请一个大于256KB的空间尝试一下,来理解他的申请和释放过程。

void TestConcurrentAlloc3()
{void* p1 = ConcurrentAlloc(257 * 1024);void* p2 = ConcurrentAlloc(129 * 8 * 1024);cout << p1 << endl;cout << p2 << endl;ConcurrentFree(p1, 257 * 1024);ConcurrentFree(p2, 129 * 8 * 1024);
}

当我们申请的内存大于256KB但小于128页时,直接去page cache中进行申请,此时会k的大小会为33页,此时page cache中并没有任何span,所以会去向堆直接申请一个128页的内存下来,然后切分为33页和95页,将33页的分配给某个线程,剩下的95页挂在对应哈希桶的后面。

在这里插入图片描述

释放内存同样也是,会直接调用ReleaseSpanToPageCache函数直接释放给page cache,然后再由page cache 进行合并,合并成为一个128页的span,然后将原来的span delete释放掉,将128页的span挂在128号桶的位置上。

在这里插入图片描述

对于大于128页的内存,会直接调用SystemAlloc接口直接向堆申请,我们这儿申请的是一个129页的内存下来,然后建立对应映射关系。

在这里插入图片描述

大于128页的内存释放同样也是直接释放给堆就可以了,通过对应的映射关系,找到这个span,然后调用ReleaseSpanToPageCache函数直接释放给堆。

在这里插入图片描述

定长内存池引入

我们实现的高并发内存是要代替malloc来进行内存申请的,但是在我们当前的代码下并没有完全的摆脱malloc,因为有些场景下我们在使用new来创建对象,new底层实际上还是malloc来实现的,所以我们此时最开始实现的定长内存池也就派上用场。

我们需要在page cache中引入一个定长内存池,为span对象来创建空间,就可以避免使用new,因此在page cache 中定义一个_spanPool,用于span对象的申请和释放。

class PageCache
{
public://......
private://引入定长内存池ObjectPool<Span> _spanPool;
};

对于代码中任意使用了new的位置都可以使用该_spanPool进行替换,使用了delete的位置也就可以使用该_spanPool释放。

//申请span对象
Span* span = _spanPool.New();
//释放span对象
_spanPool.Delete(span);

此外,每个线程第一次申请内存时都会创建其专属的thread cache,而这个thread cache目前也是new出来的,我们也需要对其进行替换。

if (pTLSThreadCache == nullptr)
{std::mutex tcMtx;//pTLSThreadCache = new ThreadCache;static ObjectPool<ThreadCache> tcPool;tcMtx.lock();pTLSThreadCache = tcPool.New();tcMtx.unlock();
}

这里我们将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建自己的thread cache时,都在个定长内存池中申请内存就行了。

但注意在从该定长内存池中申请内存时需要加锁,防止多个线程同时申请自己的ThreadCache对象而导致线程安全问题。

释放对象时优化为不传对象大小

我们平时使用malloc函数的时候,需要传入申请内存的大小,在我们释放对象的时候,只需要传入被释放对象的指针即可。

而我们自己实现的tcmalloc在释放对象时是需要传入对象的大小的,因为我们需要知道对应对象的大小才知道他要归还的对象是谁,而且我们需要改size计算对应哈希桶的位置。在这儿我们就可以进行一下优化,如果我们想做到只传入一个指针就可以解决问题,就需要建立对应对象地址和对象大小的关系。

我们现在可以通过对象的地址找到对应的span,那么我们就可以在span结构中再增加一个_objSize成员,该成员代表着这个span管理的内存块被切成的一个个对象的大小。

// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; //存储大块内存的页号size_t _n = 0; // 页的数量Span* _prev = nullptr;Span* _next = nullptr;size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 每个Span对应的自由链表bool isUse = false; //来标记该span是否空闲size_t _objSize = 0; //切好的小对象的大小
};

获取一个k页的span都是在page cache中获取的,所以我们在获取到这个k也的span以后,可以将对应的小对象的个数保存下来。

	//遍历完也没有找到,此时就需要去page cache中申请Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));span->_objSize = size;

此时当我们释放对象时,就可以直接从对象的span中获取到该对象的大小,准确来说获取到的是对齐以后的大小。

//通过TLS无锁访问释放属于自己的Thread Cache
static void ConcurrentFree(void* ptr)
{//通过映射关系获取到对应的spanSpan* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if (size > MAX_BYTES){//调用ReleaseSpanToPageCache来释放对象PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}

对于映射关系的加锁问题

对应的页号与span的映射关系我们是使用哈希表来实现的,STL容器本身就是线程不安全的,由于我们在添加映射关系的过程是在page cache中实现的,会有一把大锁进行约束,所以保证了映射的线程安全问题,但是我们在访问容器的过程中就会出现线程安全的问题。

我们获取span的过程并没有添加任何锁结构,如果我们是在central cache访问这个映射关系,或是在调用ConcurrentFree函数释放内存时访问这个映射关系,那么就存在线程安全的问题。因为此时可能其他线程正在page cache当中进行某些操作,并且该线程此时可能也在访问这个映射关系,因此当我们在page cache外部访问这个映射关系时是需要加锁的。

此时我们就可以考虑使用C++当中的unique_lock:

// 获取对应映射的span
Span* PageCache::MapObjectToSpan(void* obj)
{assert(obj);//获取对应页号PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);//构造时自动加锁,析构时解锁std::unique_lock<std::mutex> lock(_pageMtx);auto ret = isSpanMap.find(id);if(ret != isSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}

多线程环境下对比malloc测试

下面我们在多线程场景下对我们实现的高并发内存池进行测试:

测试代码

#include "ConcurrentAlloc.h"// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, (unsigned int)(malloc_costtime + free_costtime));
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(ConcurrentAlloc(16));//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, (unsigned int)(malloc_costtime + free_costtime));
}int main()
{size_t n = 1000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}

首先,我们来进行一下固定大小内存的申请和释放:

v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));

运行程序,我们就会发现此时效果并不明显:

在这里插入图片描述

因为我们此时申请的固定大小的内存,就会通过thread cache固定的去central cache的某个桶中桶中均匀的进行申请,并不是均匀分布的,此时central cache中的桶锁也就没有起多大作用了,固定的向某一个申请空间和释放空间,所以对比于malloc 效果并不明显。

接下来我们看一下不同大小内存的申请和释放:

v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));

运行后可以看到,由于申请和释放内存的大小是不同的,此时central cache当中的桶锁就起作用了,ConcurrentAlloc的效率也有了较大增长,但相比malloc来说还是差一点点。

在这里插入图片描述

性能瓶颈分析

对比于malloc,我们实现的高并发内存池还是存在差距的,接下来我们使用VS下的性能分析工具来进行一线性能分析。

通过分析结果可以看到,光是Deallocate和MapObjectToSpan这两个函数就占用了一半多的时间。

在这里插入图片描述

而在Deallocate函数中,调用ListTooLong函数时消耗的时间是最多的。

在这里插入图片描述

在ListTooLong函数中,调用ReleaseListToSpans函数时消耗的时间是最多的。

在这里插入图片描述

在ReleaseListToSpans函数中,调用MapObjectToSpan函数时消耗的时间是最多的。

在这里插入图片描述

最终就会发现,调用该函数时会消耗这么多时间就是因为锁的原因,此当前项目的瓶颈点就在锁竞争上面,需要解决调用MapObjectToSpan函数访问映射关系时的加锁问题。tcmalloc当中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁。

在这里插入图片描述
如果想对基数树进行了解,可以去去看一下tcmalloc 的源码是如何实现的,我们在这儿设计的这个高并发内存池主要是为了学习的,中间存在很多不足的地方,以上就是对于整个高并发内存池的设计。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/668541.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis——高级主题

介绍Redis的高级主题&#xff0c;包括服务器配置、Redis事务、Redis发布和订阅、Pipeline批量发送请求、数据备份与恢复等。 1、服务器配置 在Windows和Linux的Redis服务器里面&#xff0c;都有一个配置文件。Redis配置文件位于Redis安装目录下&#xff0c;在不同操作系统下&…

Redis核心技术与实战【学习笔记】 - 21.Redis实现分布式锁

概述 在《20.Redis原子操作》我们提到了应对并发问题时&#xff0c;除了原子操作&#xff0c;还可以通过加锁的方式&#xff0c;来控制并发写操作对共享数据的修改&#xff0c;从而保证数据的正确性。 但是&#xff0c;Redis 属于分布式系统&#xff0c;当有多个客户端需要争…

【Java知识手册】一.Java开发工具和前言

文章目录 1 Java前言1.1 简介1.2 Java环境搭建1.3 程序的开发步骤1.4 idea开发工具1.5用idea开发一个helloworld 前言&#xff1a;以初学着的身份&#xff0c;准备在该平台整理点最近学习的知识&#xff0c;方便后续查看相关的技术点&#xff0c;有兴趣的可以一块交流学习。目标…

机器学习本科课程 大作业 多元时间序列预测

1. 问题描述 1.1 阐述问题 对某电力部门的二氧化碳排放量进行回归预测&#xff0c;有如下要求 数据时间跨度从1973年1月到2021年12月&#xff0c;按月份记录。数据集包括“煤电”&#xff0c;“天然气”&#xff0c;“馏分燃料”等共9个指标的数据&#xff08;其中早期的部分…

接口测试要测试什么?

一. 什么是接口测试&#xff1f;为什么要做接口测试&#xff1f; 接口测试是测试系统组件间接口的一种测试。接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是要检查数据的交换&#xff0c;传递和控制管理过程&#xff0c;以及系统间的相互…

Maven 安装教程

一、安装地址 1.官网安装最新版本 2.其他版本&#xff0c;我这里是maven-3/3.6.2 二、配置环境 1. 点击此电脑鼠标右击->属性->高级系统设置->环境变量 &#xff0c;配置系统变量->新建&#xff1a;MAVEN_HOME 2.配置path 路径 &#xff1a;%MAVEN_HOME%\bin 三、安…

ChatGPT之制作短视频

引言 今天带来了如何使用 ChatGPT和剪映来制作简单的短视频教程&#xff0c;在这其中 ChatGPT的作用主要是帮我们生成文案&#xff0c;剪映的功能就是根据文案自动生成视频&#xff0c;并配上一些图片、动画、字幕和解说。 ChatGPT生成文案 首先&#xff0c;我们需要使用提示…

Anaconda的安装及其配置

一、简介 Anaconda是一个开源的包、环境管理器&#xff0c;主要具有以下功能和特点&#xff1a; 提供conda包管理工具&#xff1a;可以方便地创建、管理和分享Python环境&#xff0c;用户可以根据自己的需要创建不同的环境&#xff0c;每个环境都可以拥有自己的Python版本、库…

常用排序算法(Java版本)

1 引言 常见的排序算法有八种&#xff1a;交换排序【冒泡排序、快速排序】、插入排序【直接插入排序、希尔排序】、选择排序【简单选择排序、堆排序】、归并排序、基数排序。 2 交换排序 所谓交换&#xff0c;就是序列中任意两个元素进行比较&#xff0c;根据比较结果来交换…

nginx slice模块的使用和源码分析

文章目录 1. 为什么需要ngx_http_slice_module2. 配置指令3. 加载模块4. 源码分析4.1 指令分析4.2 模块初始化4.3 slice模块的上下文4.2 $slice_range字段值获取4.3 http header过滤处理4.4 http body过滤处理5 测试和验证 1. 为什么需要ngx_http_slice_module 顾名思义&#…

程序员为什么不喜欢关电脑,这回答很霸道!

在大家的生活中&#xff0c;经常会发现这样一个现象&#xff1a;程序员经常不关电脑。 至于程序员不关电脑的原因&#xff0c;众说纷纭。 其中这样的一个程序员&#xff0c;他的回答很霸道&#xff1a; “因为我是程序员&#xff0c;我有权选择不关电脑。我需要在任何时候都能够…

C++一维数组

个人主页&#xff1a;PingdiGuo_guo 收录专栏&#xff1a;C干货专栏 铁汁们大家好呀&#xff0c;我是PingdiGuo_guo&#xff0c;今天我们来学习一下数组&#xff08;一维&#xff09;。 文章目录 1.数组的概念与思想 2.为什么要使用数组 3.数组的特性 4.数组的操作 1.定义…

0 代码自动化测试:RF 框架实现企业级 UI 自动化测试

前言 现在大家去找工作&#xff0c;反馈回来的基本上自动化测试都是刚需&#xff01;没有自动化测试技能&#xff0c;纯手工测试基本没有什么市场。 但是很多人怕代码&#xff0c;觉得自动化测试就需要代码&#xff01;代码学习起来很难&#xff01; 当然代码学习不难&#xf…

优思学院|精益生产-改变制造业的革命性理念

在今日这个变幻莫测、竞争如潮的市场环境中&#xff0c;企业如同海上的帆船&#xff0c;面临着狂风巨浪的考验。在这样的大背景之下&#xff0c;精益生产&#xff08;Lean Production&#xff09;这一理念&#xff0c;宛如一盏明灯&#xff0c;指引着无数企业穿越迷雾&#xff…

安科瑞消防设备电源监控系统在杭后旗医院项目的设计与应用

摘要&#xff1a;本文简述了消防设备电源的组成原理&#xff0c;分析了消防设备电源监控系统在应用中的设计依据和相关规范。通过安科瑞消防设备电源监控系统在杭后旗医院项目的实例介绍&#xff0c;阐述了消防设备电源功能的实现及其重要意义。 关键词&#xff1a;消防设备电…

【jenkins】主从机制及添加Slave节点操作

一、master-slave 日常构建Jenkins任务中&#xff0c;会经常出现下面的情况&#xff1a; 自动化测试需要消耗大量的 CPU 和内存资源&#xff0c;如果服务器上还有其他的服务&#xff0c;可能会造成卡顿或者宕机这样的情况&#xff1b; Jenkins 平台上除了这个项目&#xff0c…

【Linux】解决:为什么重复创建同一个【进程pid会变化,而ppid父进程id不变?】

前言 大家好吖&#xff0c;欢迎来到 YY 滴Linux 系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过Linux的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的…

开源软件在技术革新和行业变革中的作用

引言&#xff1a; 在数字化浪潮推动下&#xff0c;开源软件以其独特的魅力重构了软件开发的生态系统&#xff0c;成为技术创新和行业变革的催化剂。它通过低成本、高协作性、极致透明度的特征&#xff0c;成为企业和个人的首选。本文将深度探讨开源软件的影响力&#xff0c;展…

【C++刷题】二叉树的深搜

二叉树的深搜 一、计算布尔二叉树的值1、题目描述2、代码3、解析 二、求根节点到叶节点数字之和1、题目描述2、代码3、解析 三、二叉树剪枝1、题目描述2、代码3、解析 四、验证二叉搜索树1、题目描述2、代码3、解析 五、二叉搜索树中第K小的元素1、题目描述2、代码3、解析 六、…

16.docker删除redis缓存数据、redis常用基本命令

1.进入redis容器内部 &#xff08;1&#xff09;筛选过滤出redis容器 docker ps | grep "redis"&#xff08;2&#xff09;进入redis容器 #说明&#xff1a;d24为redis容器iddocker exec -it d24 /bin/bash2.登陆redis (1) 进入redis命令行界面 redis-cli说明&a…