本文从源码角度分析spark统一内存管理的实现原理。
统一内存管理对象的创建
统一内存管理对象在SparkEnv中进行创建和管理,这样内存管理就在Driver和Executor端中都可以使用。在SparkEnv的create函数中,创建内存管理对象的实现代码如下:
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) val memoryManager: MemoryManager = if (useLegacyMemoryManager) { new StaticMemoryManager(conf, numUsableCores) } else { // spark2默认使用统一内存管理模式,所以执行这里 UnifiedMemoryManager(conf, numUsableCores) }
从以上代码片段可知,使用静态内存管理还是统一内存管理,是由参数:spark.memory.useLegacyMode决定的。从spark-2.0开始默认都是使用统一内存管理,一般不会修改该参数。
所以,一般情况下,默认会创建统一内存管理:UnifiedMemoryManager对象。这几个对象之间的关系,如图1所示:
图1 内存管理对象和SparkContext
统一内存管理初始化
在创建统一内存管理对象时,会进行初始化操作。为了便于管理和分配内存,在初始化初始化时会把内存分成几个部分:预留内存,用户内存,执行和存储内存。
统一内存管理对象初始化时的主要步骤如下:
(1)计算JVM可用的最大内存,保存在变量:systemMemory中,默认从参数spark.testing.memory获取值但一般不设置,所以会获取:Runtime.getRuntime.maxMemory的值。
(2)计算需要预留的内存数:reservedMemory,先取参数:spark.testing.reservedMemory的值,但一般不设置,此时使用默认值:300M。
(3)计算系统使用内存的最小值,它是预留内存的1.5倍,也就是:minSystemMemory=reservedMemory * 1.5,若系统使用内存比这个值小:systemMemory < minSystemMemory,则报错:请增加spark.driver.memory的值。
(4)获取executor的内存值:val executorMemory = conf.getSizeAsBytes("spark.executor.memory"),若executorMemory < minSystemMemory,则报错:请增加spark.executor.memory的值。
(5)计算系统可用内存的总量,系统内存-预留内存,得到spark可以使用的总内存:usableMemory = systemMemory - reservedMemory
(6)计算任务执行和存储可用内存总量。计算公式是:usableMemory * memoryFraction。其中memoryFraction是一个小数,是配置项spark.memory.fraction的值,默认值是0.6。
(7)最大可用内存已经计算出来了,此时可以创建UnifiedMemoryManager对象了,代码如下:
new UnifiedMemoryManager( conf, maxHeapMemory = maxMemory, onHeapStorageRegionSize = (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, numCores = numCores)
从创建统一内存管理对象的代码中可以看出,默认情况下任务的执行内存和存储内存是个占50%。可以通过参数spark.memory.storageFraction来调整执行内存和存储内存的占比。
完成统一内存初始化后,内存的划分情况如图2所示:
图2 统一内存初始化内存分布
统一内存管理的实现
前面已经说明,统一内存管理是在UnifiedMemoryManager类中实现的。下面我们来分析统一内存管理的实现逻辑。
该类的声明如下:
private[spark] class UnifiedMemoryManager private[memory] ( conf: SparkConf, val maxHeapMemory: Long, onHeapStorageRegionSize: Long, numCores: Int)
统一内存管理为spark提供了灵活使用内存的机制。它把一块大的可使用的内存分成执行内存和存储内存。执行内存主要被Executor在执行任务时使用,而存储内存主要用来存储数据块。
该类的成员变量说明如下
onHeapStorageRegionSize堆内内存区的大小,以字节为单位。该内存区不是静态保留的; 执行器可以在必要时进行借用。仅当实际存储内存使用量超过此区域时,才能清除缓存块。
maxHeapMemory最大可用堆内存。该成员变量是通过函数getMaxMemory计算而来的,具体的计算方法见下面的分析。
numCores核数。
获取执行内存
在执行当前任务内存不足时会需要申请执行内存。申请内存的过程可能会向存储内存池(StorageMemoryPool)借用一部分内存,并把这部分内存添加到执行内存池(ExecutionMemoryPool)中。能够向存储内存池借用内存必须满足以下条件之一:
(1)存储池的空闲内存大于0;
(2)存储是否已经借用了执行池的内存。通过:存储内存池目前的大小减去初始化设置的存储内存池的大小是否大于0来进行判断,也就是计算storagePool.poolSize - storageRegionSize是否大于0。若大于0(已借用)表示可以分配。
在借用存储内存时,可能会把存储池中的内存释放一部分,若这部分内存的rdd设置了useDisk级别,还会把这些内存的数据写入磁盘,否则,这些内存中的存储数据就丢失了。
内存块的释放是在MemoryStore对象中完成(后面的文章会详细分析这实现),官方文档中提到过,释放老的内存块的算法是LRU(最近最少使用),这是由于在MemoryStore中内存块是以LinkedHashMap的结构组织的,在链表的头部就是“最近最少使用”的内存块。这部分内容在分析MemoryStore的实现时再继续讲解。
下面分析获取执行内存操作的实现逻辑。
acquireExecutionMemory函数
在统一内存管理中实现获取执行内存的函数是:acquireExecutionMemory。该函数的原型如下:
override private[memory] def acquireExecutionMemory( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized {...}
该函数尝试为目前的执行任务获取numBytes执行内存。对于该函数需要注意以下几点:
(1)它尝试获取numBytes字节大小的内存,返回能够获取的字节数,若返回0,则表示无法分配内存;
(2)它是同步函数,所以当有多个任务调用该函数时可能会阻塞,直到有足够的内存,这样做是为了在把数据进行持久化之前,让每个任务都有机会获取到1/2N的内存(其中N是运行的任务数)。
(3)当老的任务占用很多内存,而新任务数又不断增加时,阻塞就可能会发生。
实现逻辑
获取执行内存操作的实现逻辑如下:
(1)根据参数memoryMode的值来选择操作:若是堆内模式(ON_HEAP),获取堆内的执行和存储池总量和堆内可用存储内存总量,以及总的堆内内存大小。若是堆外模式(OFF_HEAP),获取堆外的执行和存储池总量和堆外可用存储内存总量,以及总的堆外内存大小。这一步的代码实现如下:
val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match { // 堆内模式 case MemoryMode.ON_HEAP => ( onHeapExecutionMemoryPool, onHeapStorageMemoryPool, onHeapStorageRegionSize, maxHeapMemory) // 堆外模式 case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, offHeapStorageMemory, maxOffHeapMemory) }
(2)判断是否需要增加执行内存池(ExecutionPool)。当执行内存池中空闲内存量小于需要申请的内存量时,则会尝试增加执行池。尝试增加执行池的过程,本质上就是向存储池StorageMemoryPool借用内存的过程。能够成功借用存储池的内存,需要满足以下两个条件之一:1)存储池有空闲内存;2)存储池的量大于初始化的量。(也就是说,已经向执行内存池借用了一些内存,存储池大小增加了)
另外,这个过程可能执行多次,每次尝试都必须能够获取到一些内存,可能会清除掉一些内存中的数据块,以防其他任务在缓存大的数据块和清除数据之间进行反复。那么,为什么每次只能清除一些内存呢?这是因为在MemoryStore中,内存是以MemoryEntry对象来组织和管理的,清理时也是以这个为单位进行的,而每个这样的对象的大小是不同的。
尝试增加执行内存池大小的实现代码如下:
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { if (extraMemoryNeeded > 0) { // 可以分配内存的条件:1.存储池有空闲内存 或 2.存储池已经借用了执行池的内存 val memoryReclaimableFromStorage = math.max( storagePool.memoryFree, storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // 通过下面的函数来释放存储内存池的内存,减少存储内存池的大小。 val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) // 到这里,说明存储内存池的空间已经释放,这一步只需要减少存储内存池的大小即可 storagePool.decrementPoolSize(spaceToReclaim) // 增加执行内存池大小的量 executionPool.incrementPoolSize(spaceToReclaim) } } }
要注意的是,执行内存池将借用的内存均匀地分配给活动任务,以限制每个任务的执行内存分配。保持这个大于执行池大小是很重要的,这不考虑可以通过清除存储而释放的潜在内存。另外,这个数量应该保持在“maxMemory”以下,以便在任务中执行内存分配的公平性,否则,任务可能占用超过其平均份额的执行内存。
(3)然后调用executionPool.acquireMemory来获取内存,该函数的声明如下:
private[memory] def acquireMemory( numBytes: Long, // 想要获取的内存数 taskAttemptId: Long, // 想要获取内存的任务数 maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, // 一个回调函数,用来增长内存池的大小 computeMaxPoolSize: () => Long = () => poolSize // 回调函数,用来获取某个时刻允许获取内存的最大值 ): Long = lock.synchronized
该函数尝试为给定任务获取numBytes大小的内存,并返回获取到的内存大小。该函数可能会阻塞,直到有足够的内存再返回。该函数的执行逻辑大致如下:
添加任务到taskMemory这个map中,该map保存了任务id和申请的内存大小的对应关系。
调用maybeGrowExecutionPool回调函数来向storeage申请内存,若内存不够该函数会释放掉一些存储内存。一次释放的内存可能不够,所以该函数可能会尝试多次。
maybeGrowExecutionPool会调用memoryStore.evictBlocksToFreeSpace函数,在该函数中会根据rdd和内存模式等参数来清除一些内存块,释放对应大小的内存,具体的实现过程在后面分析。
获取存储内存
获取存储内存的过程比获取执行内存的过程要相对简单。因为,获取存储内存时不会强制释放正在使用的执行内存,而只能从执行池的空闲内存中申请。
所以,申请存储内存的步骤主要是以下几步:
(1)判断需要申请的内存数量,是否大于存储池的空闲内存量。若大于(存储池的内存量不够),则向执行池的空闲内存申请一部分内存。要注意:可能执行池的空闲内存也不够,或根本就没有空闲内存。
(2)调用存储池的内存获取函数获取内存,若空闲内存不够,则需要从存储池中按LRU算法释放一部分内存。
获取存储内存是在函数acquireStorageMemory中实现,下面我们来分析一下该函数的具体实现。
acquireStorageMemory函数
该函数的原型如下:
override def acquireStorageMemory( blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean = synchronized {...}
该函数的参数:
memoryMode: MemoryMode:该参数是内存的模式,主要有两种:ON_HEAP或OFF_HEAP。
numBytes:需要申请的内存大小,单位是bytes
blockId:数据块的ID,也是可能会被释放的数据块。若该id为空,则会通过LRU算法寻找需要释放块对应的内存。
该函数是一个同步函数,若是多个线程同时调用该函数,可能会阻塞。
实现分析
该函数的主要实现逻辑如下:
(1)根据参数memoryMode来获取此种模式下的最大可以用存储内存,保存在变量maxMemory中。
(2)判断内存申请量(即参数numBytes)是否大于maxMemory,若申请内存大于最大可用内存,会失败。报错:该blockid的数据块需要的内存超过最大使用内存。
(3)若申请的内存大小:numBytes大于存储池的空闲内存大小,则需要从执行池中“借用”一些空闲内存。借用的意思是,从执行池的空闲内存中获取一部分内存,但要注意:最多从执行池中借用空闲内存量,不会释放任务正在使用的执行内存。实现代码如下:
if (numBytes > storagePool.memoryFree) { // 所需内存量大于可用存储空闲内存量,需要从执行池中申请一部分内存 val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes - storagePool.memoryFree) //最多获取执行池中空闲的内存量大小 executionPool.decrementPoolSize(memoryBorrowedFromExecution) // 执行内存池减少内存数 storagePool.incrementPoolSize(memoryBorrowedFromExecution) // 存储内存池增加对应内存 }
注意:这一步是体现统一内存思想的重要的一步。
(4)若能够从执行内存池中借用成功,这一步就直接在存储内存池中申请内存了。代码很简单,就是调用存储内存池的内存申请函数:
storagePool.acquireMemory(blockId, numBytes)
storagePool#acquireMemory函数
该函数来完成存储池的内存申请工作。要注意,此时的存储池可能有空闲的内存,也可能没有空闲内存。当存储池没有空闲内存时,需要把已有的某些数据块从存储池中清除,以满足当前数据块的存储需要。
该函数的实现逻辑如下:
(1) 计算需要释放的内存量
需要申请的内存量减去空闲的内存量,就是需要释放的内存量。也就是说,需要从已经使用的存储内存块中释放一部分内存。
val numBytesToFree = math.max(0, numBytes - memoryFree)
(2) 第(1)步已经计算出来需要释放的内存量了。下面调用StorageMemoryPool.acquireMemory函数来申请内存,释放一定的数据块。该函数会调用MemoryStore.evictBlocksToFreeSpace来清除数据块。会被清除的数据块的判断如下:
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = { //存储模式相同,且blockId没有被RDD占用 或则不是要替换相同RDD的不同数据块 entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))}
若有可以释放的数据块,还需要获取一把写锁,加锁的目的是防止目前还有其他的线程在读该数据块。当锁获取成功后,就可以开始删除数据块了,具体的删除过程是通过blockInfoManager.removeBlock来进行的。该函数会把需要清除的元数据和数据块从blockManager中删除。
释放内存块:MemoryStore#evictBlocksToFreeSpace函数
这是MemoryStore类的成员函数,该函数完成内存块的释放,若存储级别包含useDisk还会把内存中的数据保存到磁盘中。该函数的原型如下:
private[spark] def evictBlocksToFreeSpace( blockId: Option[BlockId], space: Long, memoryMode: MemoryMode): Long = {
其中的blockId是数据块的id,每个id都对应一个内存块。释放内存块的逻辑如下:
(1)遍历内存块的队列。这是一个LinkedHashMap,最后一次被访问的内存块节点会放到链表的后面,这样最近没有被访问的内存块就在队列的头部。
(2)检查内存块是否可以被释放。释放内存块需要满足以下条件:
1)内存块的模式必须和参数中memoryMode的值相等;
2)该blockId对应的内存块没有被其他RDD占用,或则不是要替换相同RDD的不同数据块。
(3)若满足以上两个条件,就会释放该内存块。释放内存块的过程如下:
1)确认内存块的写锁已经锁上了;
2)通过blockId的信息检查存储级别是否包含useDisk,若包含则把内存的数据写入到磁盘上。写入磁盘 的过程是通过DiskStore对象来完成的。
(4)由于实际的内存是通过MemoryStore来管理的,所以,最后一步就是从memoryStore中删除并释放blockId对应的内存块,并减少MemoryStore的内存数量。到此,就完成了内存释放的整个过程。至于MemoryStore是如何释放内存的,会在分析MemoryStore时进行分析。
计算可用堆内存储内存:maxOnHeapStorageMemory函数
该函数用来计算堆内可用内存,逻辑很简单:就是使用总的堆内存储内存-为执行器可分配的堆内内存:
override def maxOnHeapStorageMemory: Long = synchronized { // 计算可用堆内内存 maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed }
计算对外存储内存:maxOffHeapStorageMemory函数
该函数用来计算可用堆外内存:使用总堆外内存-为执行器分配的堆外内存:
override def maxOffHeapStorageMemory: Long = synchronized { // 计算可用堆外内存 maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed }
小结
本文讲述了spark统一内存管理的实现原理。从实现层面来看,Spark的统一内存管理都是在UnifiedMemoryManager类中实现。不管是执行还是存储内存不足时,都可以向对方借用内存。但内存不足时,可以根据LRU来释放存储正在使用的内存,但不能释放执行时正在使用的内存。
另外,最终的内存块释放和数据块的持久化是通过MemoryStore,DiskStore以及BlockManager这几个系统来完成的,这些组件的原理会在后面的文章中继续分析。