How memory is used is one of the important differences between Spark and Hadoop. Hadoop treats memory purely as a compute resource, whereas Spark, besides using memory for computation, also dedicates part of it to the storage system. Spark uses MemoryManager to manage the memory consumed by both the storage system and in-memory computation.
A memory pool is essentially a logical partitioning of physical memory that helps Spark tasks use memory sensibly at runtime. Spark logically divides memory into on-heap and off-heap memory; this distinction is called the memory mode (MemoryMode) and is defined in the enum type MemoryMode.
```java
@Private
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}
```

Note: the on-heap memory here cannot simply be equated with the JVM's Java heap; it is only a portion of the JVM heap. Off-heap memory, in turn, is space that Spark allocates directly in the worker node's system memory through the sun.misc.Unsafe API. Either kind of memory needs a memory pool to manage it; the pool is defined in MemoryPool.
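To make the off-heap side concrete, here is a small sketch of allocating raw memory outside the JVM heap with sun.misc.Unsafe, similar in spirit to what Spark's unsafe allocator does internally. The reflection step is needed because the `theUnsafe` field is not public; the sizes and values are arbitrary examples.

```scala
// Obtain the Unsafe singleton via reflection (its field is not public).
val field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
field.setAccessible(true)
val unsafe = field.get(null).asInstanceOf[sun.misc.Unsafe]

val address = unsafe.allocateMemory(1024L) // 1 KB outside the JVM heap
unsafe.putLong(address, 42L)               // write straight to raw memory
val readBack = unsafe.getLong(address)     // read it back
println(readBack)
unsafe.freeMemory(address)                 // no GC here: off-heap memory must be freed manually
```

Unlike heap objects, this memory is invisible to the garbage collector, which is exactly why Spark must track it through its own memory pools.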
```scala
private[memory] abstract class MemoryPool(lock: Object) { // lock: the object that guards this pool for thread safety
  @GuardedBy("lock")
  private[this] var _poolSize: Long = 0 // size of the pool, in bytes

  /**
   * Returns the current size of the pool, in bytes.
   */
  final def poolSize: Long = lock.synchronized {
    _poolSize
  }

  /**
   * Returns the amount of free memory in the pool, in bytes.
   */
  final def memoryFree: Long = lock.synchronized {
    _poolSize - memoryUsed
  }

  /**
   * Expands the pool by `delta` bytes.
   */
  final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  /**
   * Shrinks the pool by `delta` bytes.
   */
  final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    require(delta <= _poolSize)
    require(_poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }

  /**
   * Returns the amount of used memory in this pool (in bytes).
   */
  def memoryUsed: Long
}
```

The figure below shows MemoryPool's memory model and its two concrete implementations.
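The semantics of these methods can be demonstrated with a runnable sketch. Since the real class is `private[memory]`, the code below uses a trimmed copy of MemoryPool plus a hypothetical subclass `SimplePool` (not part of Spark) that tracks usage with a single counter:

```scala
// Trimmed copy of MemoryPool, enough to exercise poolSize / memoryFree / resize.
abstract class MemoryPool(lock: Object) {
  private var _poolSize: Long = 0
  final def poolSize: Long = lock.synchronized { _poolSize }
  final def memoryFree: Long = lock.synchronized { _poolSize - memoryUsed }
  final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }
  final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0 && delta <= _poolSize && _poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }
  def memoryUsed: Long
}

// Hypothetical subclass for illustration only.
class SimplePool(lock: Object) extends MemoryPool(lock) {
  private var _used: Long = 0
  def memoryUsed: Long = lock.synchronized { _used }
  def acquire(bytes: Long): Boolean = lock.synchronized {
    if (bytes <= memoryFree) { _used += bytes; true } else false
  }
}

val pool = new SimplePool(new Object)
pool.incrementPoolSize(1000) // the pool is a logical plan: growing it allocates nothing
println(pool.acquire(600))   // true: 600 of 1000 bytes now in use
println(pool.memoryFree)     // 400
println(pool.acquire(500))   // false: only 400 bytes free
```

Note that incrementPoolSize does not allocate anything; the pool only bookkeeps how much of an externally provided budget is in use, which is what "logical planning of physical memory" means above.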
Figure: the Spark memory pool model

In the figure, StorageMemoryPool is the memory pool used by the storage system, and ExecutionMemoryPool is the memory pool used by the computing engine.
This article covers only StorageMemoryPool; ExecutionMemoryPool will be discussed together with the computing system.
StorageMemoryPool is a logical abstraction over the physical memory used for storage; by managing storage memory logically, it improves how efficiently the Spark storage system uses memory. StorageMemoryPool has the following attributes and methods.
- memoryMode: the memory mode, an enum value.
- poolName: the name of the pool, corresponding to memoryMode.
- _memoryUsed: the amount of memory already used, in bytes.
- _memoryStore: the MemoryStore associated with this StorageMemoryPool.
- setMemoryStore: sets _memoryStore.
- releaseAllMemory: releases all memory in the pool, i.e., sets _memoryUsed to 0.

These implementations are straightforward, so they are not described in detail.
acquireMemory: this method acquires numBytes of memory for the Block corresponding to blockId.
```scala
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
  val numBytesToFree = math.max(0, numBytes - memoryFree)
  acquireMemory(blockId, numBytes, numBytesToFree)
}
```

As the code shows, this essentially delegates to the overloaded acquireMemory:
```scala
def acquireMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long): Boolean = lock.synchronized {
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)
  if (numBytesToFree > 0) { // evict blocks to free up numBytesToFree bytes of space
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
  }
  // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
  // back into this StorageMemoryPool in order to free memory. Therefore, these variables
  // should have been updated.
  val enoughMemory = numBytesToAcquire <= memoryFree // proceed only if enough space is free
  if (enoughMemory) {
    _memoryUsed += numBytesToAcquire // increase the amount of used memory
  }
  enoughMemory // whether storage space for the Block was successfully acquired
}
```

releaseMemory: this method releases memory.
```scala
def releaseMemory(size: Long): Unit = lock.synchronized {
  if (size > _memoryUsed) {
    logWarning(s"Attempted to release $size bytes of storage " +
      s"memory when we only have ${_memoryUsed} bytes")
    _memoryUsed = 0 // releasing more than is in use: reset used memory to 0
  } else {
    _memoryUsed -= size // decrease the amount of used memory
  }
}
```

freeSpaceToShrinkPool: this method frees the given amount of space and shrinks the pool accordingly.
```scala
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
  // If the free space covers spaceToFree, it is reclaimed directly; otherwise
  // blocks must be evicted to give up the remainder.
  val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  if (remainingSpaceToFree > 0) {
    // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
    val spaceFreedByEviction =
      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
    // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
    // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
    spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  } else {
    spaceFreedByReleasingUnusedMemory
  }
}
```
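A worked example makes the shrinking arithmetic easy to follow; all the numbers below are hypothetical:

```scala
// Suppose the pool holds 1000 bytes, of which 700 are occupied by blocks,
// and we want to shrink it by 500 bytes.
val poolSize = 1000L
val memoryUsed = 700L
val memoryFree = poolSize - memoryUsed                   // 300 bytes are idle
val spaceToFree = 500L
val freedFromUnused = math.min(spaceToFree, memoryFree)  // 300: idle space is reclaimed first
val remainingToFree = spaceToFree - freedFromUnused      // 200: must come from evicting blocks
println(s"$freedFromUnused bytes from free space, $remainingToFree bytes by evicting blocks")
```

Only when the idle space is insufficient does the pool fall back to evictBlocksToFreeSpace, which is the expensive path.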
We now turn to MemoryManager, which has the following attributes:

- conf: the SparkConf.
- numCores: the number of CPU cores.
- onHeapStorageMemory: the amount of on-heap memory used for storage.
- onHeapExecutionMemory: the amount of on-heap memory used for execution.
- onHeapStorageMemoryPool: the on-heap storage memory pool, sized by onHeapStorageMemory.
- offHeapStorageMemoryPool: the off-heap storage memory pool.
- onHeapExecutionMemoryPool: the on-heap execution memory pool, sized by onHeapExecutionMemory.
- offHeapExecutionMemoryPool: the off-heap execution memory pool.
- maxOffHeapMemory: the maximum off-heap memory, set via the spark.memory.offHeap.size property (default 0).
- offHeapStorageMemory: the amount of off-heap memory used for storage; the storage share of off-heap memory is controlled by the spark.memory.storageFraction property (default 0.5).
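The off-heap split implied by the last two properties can be sketched in a few lines (the 1 GB size here is just an example value for spark.memory.offHeap.size):

```scala
// How the off-heap budget splits between storage and execution.
val maxOffHeapMemory = 1024L * 1024 * 1024      // e.g. spark.memory.offHeap.size = 1g
val storageFraction = 0.5                        // spark.memory.storageFraction (default)
val offHeapStorageMemory = (maxOffHeapMemory * storageFraction).toLong
val offHeapExecutionMemory = maxOffHeapMemory - offHeapStorageMemory
println(s"off-heap storage: $offHeapStorageMemory, off-heap execution: $offHeapExecutionMemory")
```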
Based on these attributes, the figure below shows the four memory pools managed by MemoryManager.

Figure: the four memory pools managed by MemoryManager

MemoryManager provides methods used by the storage system as well as methods used by task computation. This section covers only the methods related to the storage system:
- maxOnHeapStorageMemory: returns the maximum on-heap memory available for storage; implemented by subclasses.
- maxOffHeapStorageMemory: returns the maximum off-heap memory available for storage; implemented by subclasses.
- setMemoryStore: sets the MemoryStore on onHeapStorageMemoryPool and offHeapStorageMemoryPool.

acquireStorageMemory: acquires memory of the requested size (numBytes) from on-heap or off-heap memory to store the Block corresponding to blockId.
```scala
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
```

acquireUnrollMemory: acquires memory of the requested size (numBytes) from on-heap or off-heap memory to unroll the Block corresponding to blockId.
```scala
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
```

releaseStorageMemory: releases the given amount of on-heap or off-heap memory.
```scala
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}
```

releaseAllStorageMemory: releases all on-heap and off-heap storage memory.
```scala
final def releaseAllStorageMemory(): Unit = synchronized {
  onHeapStorageMemoryPool.releaseAllMemory()
  offHeapStorageMemoryPool.releaseAllMemory()
}
```

releaseUnrollMemory: releases the given amount of unroll memory.
```scala
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  releaseStorageMemory(numBytes, memoryMode)
}
```

storageMemoryUsed: the total storage memory in use across onHeapStorageMemoryPool and offHeapStorageMemoryPool.
```scala
final def storageMemoryUsed: Long = synchronized {
  onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}
```

MemoryManager has two subclasses: StaticMemoryManager and UnifiedMemoryManager. StaticMemoryManager preserves the static memory management mechanism left over from early Spark versions, and it has no off-heap memory model. Under static memory management, the sizes of a Spark application's storage memory and execution memory are fixed at runtime (versions before Spark 1.6.0). Starting with Spark 1.6.0, UnifiedMemoryManager is the default memory manager. It provides a unified memory management mechanism: at runtime, storage memory and execution memory share a single memory space, and the sizes of the two regions can be adjusted dynamically. The rest of this article describes UnifiedMemoryManager.
Building on MemoryManager's memory model, UnifiedMemoryManager turns the boundary between execution memory and storage memory into a "soft" boundary: either side may borrow the other's free memory.
Compared with MemoryManager, the four memory pools in UnifiedMemoryManager are arranged differently.
Figure: the four memory pools in UnifiedMemoryManager

In the figure above, onHeapStorageMemoryPool and onHeapExecutionMemoryPool are merged, so the on-heap memory is treated as a whole: the free space (the region labeled memoryFree) of either the storage side or the execution side can be used by the other side. This is the soft boundary.
Next, let's look at how UnifiedMemoryManager implements its parent's methods.
maxOnHeapStorageMemory: the maximum on-heap storage memory equals the total heap memory minus the heap memory already in use for execution. (Because of the soft boundary, the two sides effectively share memory.)
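In Spark's source (the 2.x line), UnifiedMemoryManager implements this as follows, where maxHeapMemory is the total on-heap memory governed by the manager:

```scala
override def maxOnHeapStorageMemory: Long = synchronized {
  maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
```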
acquireStorageMemory: acquires memory of the requested size (numBytes) from on-heap or off-heap memory to store the Block corresponding to blockId.
```scala
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Pattern-match on the memory mode to select the execution pool, the storage
  // pool, and the maximum storable size for that mode
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  if (numBytes > maxMemory) { // the requested space exceeds the maximum: return false
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  if (numBytes > storagePool.memoryFree) {
    // There is not enough free memory in the storage pool, so try to borrow free memory from
    // the execution pool.
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storagePool.acquireMemory(blockId, numBytes)
}
```

acquireUnrollMemory: acquires memory of the requested size from on-heap or off-heap memory to unroll the Block corresponding to blockId; it is essentially a call to acquireStorageMemory.
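That delegation is visible in the source of UnifiedMemoryManager (2.x line), where the override is a one-line forward:

```scala
override def acquireUnrollMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  acquireStorageMemory(blockId, numBytes, memoryMode)
}
```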