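MemoryStore is responsible for storing Blocks in memory. By keeping broadcast data, RDDs, and shuffle data in memory, Spark reduces its dependence on disk I/O and improves read and write efficiency.
MemoryStore relies on the services provided by MemoryManager.
In what form does a Block exist in memory? Spark abstracts an in-memory Block as the trait MemoryEntry, which is defined as follows.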
    private sealed trait MemoryEntry[T] {
      def size: Long             // size of this Block
      def memoryMode: MemoryMode // memory mode the Block is stored in
      def classTag: ClassTag[T]  // class tag of the Block's element type
    }

MemoryEntry has two implementations, shown below.
    // a MemoryEntry holding deserialized values
    private case class DeserializedMemoryEntry[T](
        value: Array[T],
        size: Long,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      val memoryMode: MemoryMode = MemoryMode.ON_HEAP
    }

    // a MemoryEntry holding serialized bytes
    private case class SerializedMemoryEntry[T](
        buffer: ChunkedByteBuffer,
        memoryMode: MemoryMode,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      def size: Long = buffer.size
    }

BlockManager implements the trait BlockEvictionHandler and overrides its dropFromMemory method. When BlockManager constructs the MemoryStore, it passes a reference to itself as the blockEvictionHandler constructor argument, so the BlockEvictionHandler that MemoryStore sees is the BlockManager itself. BlockManager and its dropFromMemory implementation are covered in part 7 of this series (spark源码阅读-存储体系7).
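To make the "BlockManager passes itself as the handler" relationship concrete, here is a minimal sketch. All names (EvictionHandler, ToyMemoryStore, ToyBlockManager) are hypothetical stand-ins, block ids are plain strings, and nothing here is Spark's actual API; it only shows the wiring pattern described above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for BlockEvictionHandler.
trait EvictionHandler {
  def dropFromMemory(blockId: String): Unit
}

// Hypothetical stand-in for MemoryStore: it only knows the handler trait.
class ToyMemoryStore(handler: EvictionHandler) {
  def evict(blockId: String): Unit = handler.dropFromMemory(blockId)
}

// Hypothetical stand-in for BlockManager: it is the handler, and hands
// `this` to the store it constructs.
class ToyBlockManager extends EvictionHandler {
  val dropped = ArrayBuffer.empty[String]
  val memoryStore = new ToyMemoryStore(this)

  override def dropFromMemory(blockId: String): Unit = {
    dropped += blockId // record the drop; a real handler would decide where the block goes
    ()
  }
}
```

With this wiring, calling `evict` on the store ends up in the manager's `dropFromMemory`, which is why the store never needs to know about the manager's concrete type.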
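MemoryStore has the following members:
entries: the mapping from block ID to its MemoryEntry, stored in a LinkedHashMap with an initial capacity of 32 and a load factor of 0.75.
onHeapUnrollMemoryMap/offHeapUnrollMemoryMap: the mappings from a TaskAttemptId to the amount of unroll memory that task occupies on-heap and off-heap, respectively.
unrollMemoryThreshold: the initial amount of unroll memory requested before unrolling a block, controlled by the spark.storage.unrollMemoryThreshold configuration item (default 1MB).

In addition, several methods describe the MemoryStore model conceptually: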
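maxMemory: the sum of on-heap and off-heap storage memory. With StaticMemoryManager this value is fixed; with UnifiedMemoryManager it varies.
memoryUsed: the sum of on-heap and off-heap storage memory already in use.
currentUnrollMemory: the amount of memory currently occupied by unrolling, computed from onHeapUnrollMemoryMap/offHeapUnrollMemoryMap above.
blocksMemoryUsed: the storage memory in use excluding unroll memory (that is, the memory that actually stores blocks), equal to memoryUsed minus currentUnrollMemory.
currentUnrollMemoryForThisTask: the memory the current task attempt thread occupies for unrolling Blocks.
numTasksUnrolling: the number of tasks currently using MemoryStore to unroll Blocks.

With these members in mind, let us look at the memory model of MemoryStore. Compared with MemoryManager, MemoryStore offers a coarser, higher-level memory model: the distinction between on-heap and off-heap memory in MemoryManager's model is transparent in MemoryStore's model, and so is the "soft" boundary between storage memory and execution memory in UnifiedMemoryManager. (My understanding of "transparent" here is that no boundary is visible at all; I am not sure whether that is right.)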
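[Figure: MemoryStore memory model]

As the figure shows, the storage of MemoryStore is divided into three regions: the memory occupied by the many MemoryEntry objects held in the entries member, blocksMemoryUsed; the memory occupied by unrolling, as recorded in onHeapUnrollMemoryMap or offHeapUnrollMemoryMap, currentUnrollMemory; and the rest, which is memory not yet in use. Unrolling a Block is like reserving a seat in everyday life: a mark is placed in memory to say the space is taken, so that a shortage of memory is discovered before the data is written rather than in the middle of the write.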
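The relationship between the three regions can be checked with a small toy model. This is a sketch under stated assumptions, not Spark's implementation: ToyAccounting and its methods are hypothetical, a single `used` counter stands in for the storage-memory counter that MemoryManager keeps, and memory modes are ignored.

```scala
import scala.collection.mutable

// Hypothetical toy model of the accounting described above.
class ToyAccounting(val maxMemory: Long) {
  private var used = 0L                                            // stands in for MemoryManager's counter
  private val unrollMemoryMap = mutable.HashMap.empty[Long, Long]  // taskAttemptId -> reserved bytes

  def memoryUsed: Long = used
  def currentUnrollMemory: Long = unrollMemoryMap.values.sum
  def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory    // memory that really holds blocks

  private def acquire(bytes: Long): Boolean =
    if (bytes <= maxMemory - used) { used += bytes; true } else false

  // "Reserve a seat": fails up front if the memory is not available.
  def reserveUnroll(task: Long, bytes: Long): Boolean = {
    val ok = acquire(bytes)
    if (ok) unrollMemoryMap(task) = unrollMemoryMap.getOrElse(task, 0L) + bytes
    ok
  }

  // Release up to `bytes` of the task's reservation (all of it by default).
  def releaseUnroll(task: Long, bytes: Long = Long.MaxValue): Unit =
    unrollMemoryMap.get(task).foreach { held =>
      val toRelease = math.min(bytes, held)
      used -= toRelease
      if (held == toRelease) unrollMemoryMap.remove(task)
      else unrollMemoryMap(task) = held - toRelease
    }

  def storeBlock(bytes: Long): Boolean = acquire(bytes)
}
```

For example, with `maxMemory = 100`, reserving 30 bytes of unroll memory and storing a 50-byte block leaves 20 bytes unused, so a second 30-byte reservation is refused before any data is written.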
MemoryStore provides many methods for storing and reading Block data.
The getSize method returns the size of the MemoryEntry (the in-memory form of a Block) for the given BlockId.
    def getSize(blockId: BlockId): Long = {
      entries.synchronized { // synchronize access to entries
        entries.get(blockId).size
      }
    }

The putBytes method writes the Block for the given BlockId (already wrapped in a ChunkedByteBuffer) into memory.
    def putBytes[T: ClassTag](
        blockId: BlockId,
        size: Long,
        memoryMode: MemoryMode,
        _bytes: () => ChunkedByteBuffer): Boolean = {
      // the Block must not already be present
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
      if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) { // acquire logical memory
        // We acquired enough memory for the block, so go ahead and put it
        val bytes = _bytes() // obtain the Block's data
        assert(bytes.size == size)
        val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
        entries.synchronized {
          entries.put(blockId, entry) // write the Block into memory (the entries cache)
        }
        logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        true
      } else {
        false
      }
    }

The reserveUnrollMemoryForThisTask method reserves memory for the current task attempt to unroll the given Block. (I still do not fully understand what purpose unrolling serves here.)
    def reserveUnrollMemoryForThisTask(
        blockId: BlockId,
        memory: Long,
        memoryMode: MemoryMode): Boolean = {
      memoryManager.synchronized {
        // acquire unroll memory
        val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
        if (success) {
          // update the mapping from taskAttemptId to its unroll memory size
          val taskAttemptId = currentTaskAttemptId()
          val unrollMemoryMap = memoryMode match {
            case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
            case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap // off-heap (direct) memory
          }
          unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
        }
        success // report whether the reservation succeeded
      }
    }

The releaseUnrollMemoryForThisTask method releases unroll memory held by the current task attempt thread. Its implementation is as follows.
    def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
      val taskAttemptId = currentTaskAttemptId()
      memoryManager.synchronized {
        val unrollMemoryMap = memoryMode match {
          case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
          case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
        }
        if (unrollMemoryMap.contains(taskAttemptId)) { // proceed only if the task has an entry in the map
          val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
          if (memoryToRelease > 0) {
            unrollMemoryMap(taskAttemptId) -= memoryToRelease
            // delegate to MemoryManager.releaseUnrollMemory to release the memory
            memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
          }
          if (unrollMemoryMap(taskAttemptId) == 0) {
            // the task no longer holds any unroll memory, so remove its entry from the map
            unrollMemoryMap.remove(taskAttemptId)
          }
        }
      }
    }

The putIteratorAsValues method writes the Block for the given BlockId (already converted to an Iterator) into memory. A Block placed in memory can be large, so writing the whole object into memory at once may cause an OOM error. To avoid this, the Block is first converted to an Iterator, which is then unrolled incrementally while periodically checking whether enough unroll memory is available. The method involves many variables; to make it easier to follow, their meanings are explained first and the implementation is analyzed afterwards.
elementsUnrolled: the number of elements unrolled so far.
keepUnrolling: whether MemoryStore still has enough memory to continue unrolling the Block (the Iterator).
initialMemoryThreshold: the same as unrollMemoryThreshold, the initial amount of memory requested before unrolling any Block. It can be changed through the spark.storage.unrollMemoryThreshold property (default 1MB).
memoryCheckPeriod: how often to check whether enough memory is available; fixed at 16. Although the name suggests a period, it is not measured in time but in the number of elements unrolled, elementsUnrolled.
memoryThreshold: the amount of memory the current task has reserved for unrolling the Block.
memoryGrowthFactor: the factor by which the request grows when unroll memory runs short; fixed at 1.5.
unrollMemoryUsedByThisBlock: the unroll memory already used by this Block. It starts at 0 and becomes initialMemoryThreshold once the initial reservation succeeds.
vector: tracks the data produced by each iteration over the Block (the Iterator).

    private[storage] def putIteratorAsValues[T](
        blockId: BlockId,
        values: Iterator[T],
        classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
      var elementsUnrolled = 0 // number of elements unrolled so far
      var keepUnrolling = true
      val initialMemoryThreshold = unrollMemoryThreshold
      val memoryCheckPeriod = 16
      var memoryThreshold = initialMemoryThreshold
      val memoryGrowthFactor = 1.5
      var unrollMemoryUsedByThisBlock = 0L
      var vector = new SizeTrackingVector[T]()(classTag)
      keepUnrolling =
        reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
      if (!keepUnrolling) {
        logWarning(s"Failed to reserve initial memory threshold of " +
          s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
      } else {
        unrollMemoryUsedByThisBlock += initialMemoryThreshold
      }
      // keep reading from the Iterator, adding each element to the tracking vector
      while (values.hasNext && keepUnrolling) {
        vector += values.next()
        if (elementsUnrolled % memoryCheckPeriod == 0) { // periodic check
          val currentSize = vector.estimateSize()
          if (currentSize >= memoryThreshold) {
            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
            keepUnrolling =
              reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
            if (keepUnrolling) {
              unrollMemoryUsedByThisBlock += amountToRequest
            }
            memoryThreshold += amountToRequest
          }
        }
        elementsUnrolled += 1
      }
      if (keepUnrolling) { // enough unroll memory was obtained, so store the data in memory
        val arrayValues = vector.toArray
        vector = null
        val entry =
          new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
        val size = entry.size
        // convert memory used for unrolling the Block into memory used for storing it
        def transferUnrollToStorage(amount: Long): Unit = {
          memoryManager.synchronized {
            releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
            val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
            assert(success, "transferring unroll memory to storage memory failed")
          }
        }
        val enoughStorageMemory = {
          if (unrollMemoryUsedByThisBlock <= size) {
            val acquiredExtra = memoryManager.acquireStorageMemory(
              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
            if (acquiredExtra) {
              transferUnrollToStorage(unrollMemoryUsedByThisBlock)
            }
            acquiredExtra
          } else {
            // unrollMemoryUsedByThisBlock > size: give back the excess unroll memory
            val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
            releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
            transferUnrollToStorage(size)
            true
          }
        }
        if (enoughStorageMemory) {
          entries.synchronized {
            entries.put(blockId, entry)
          }
          logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
            blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
          Right(size)
        } else {
          assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
            "released too much unroll memory")
          Left(new PartiallyUnrolledIterator(
            this,
            MemoryMode.ON_HEAP,
            unrollMemoryUsedByThisBlock,
            unrolled = arrayValues.toIterator,
            rest = Iterator.empty))
        }
      } else {
        logUnrollFailureMessage(blockId, vector.estimateSize())
        Left(new PartiallyUnrolledIterator(
          this,
          MemoryMode.ON_HEAP,
          unrollMemoryUsedByThisBlock,
          unrolled = vector.iterator,
          rest = values))
      }
    }

There is also a method that writes the Block for the given BlockId into memory as serialized bytes (wrapped in a ChunkedByteBuffer).
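The incremental unrolling loop of putIteratorAsValues is easier to follow in isolation. The sketch below is a simplified model, not Spark code: UnrollSketch, `sizeOf`, and `budget` are hypothetical, a running sum replaces SizeTrackingVector's size estimate, and a plain counter replaces the call into MemoryManager. It keeps the same check period (16), growth factor (1.5), and request formula as the method above.

```scala
object UnrollSketch {
  // Returns Right(bytes reserved) if the whole iterator was unrolled,
  // or Left(number of elements unrolled) if a reservation was refused.
  def unroll[T](
      values: Iterator[T],
      sizeOf: T => Long,      // hypothetical per-element size estimate
      budget: Long,           // hypothetical cap on unroll memory
      initialThreshold: Long): Either[Int, Long] = {
    val memoryCheckPeriod = 16
    val memoryGrowthFactor = 1.5
    var reserved = 0L
    def reserve(bytes: Long): Boolean =
      if (reserved + bytes <= budget) { reserved += bytes; true } else false

    var keepUnrolling = reserve(initialThreshold)
    var memoryThreshold = initialThreshold
    var currentSize = 0L
    var elementsUnrolled = 0
    while (values.hasNext && keepUnrolling) {
      currentSize += sizeOf(values.next())
      // Only look at the size once every memoryCheckPeriod elements.
      if (elementsUnrolled % memoryCheckPeriod == 0 && currentSize >= memoryThreshold) {
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        keepUnrolling = reserve(amountToRequest)
        memoryThreshold += amountToRequest
      }
      elementsUnrolled += 1
    }
    if (keepUnrolling) Right(reserved) else Left(elementsUnrolled)
  }
}
```

For instance, `UnrollSketch.unroll[Int](Iterator.fill(100)(0), _ => 10L, 10000L, 100L)` grows the reservation from 100 to 255, 495, and finally 975 bytes. The final reservation is slightly below the 1000 bytes actually unrolled because the check only runs every 16 elements, which is one reason the real method reconciles unroll memory against the measured entry size before storing the Block.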
The getBytes method reads the Block for the given BlockId (wrapped in a ChunkedByteBuffer) from memory.
    def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
      val entry = entries.synchronized { entries.get(blockId) }
      entry match {
        case null => None
        case e: DeserializedMemoryEntry[_] => // only serialized Blocks can be read this way
          throw new IllegalArgumentException("should only call getBytes on serialized blocks")
        case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
      }
    }

The getValues method reads the Block for the given BlockId (wrapped in an Iterator) from memory.
    def getValues(blockId: BlockId): Option[Iterator[_]] = {
      val entry = entries.synchronized { entries.get(blockId) }
      entry match {
        case null => None
        case e: SerializedMemoryEntry[_] => // only Blocks that are not serialized can be read this way
          throw new IllegalArgumentException("should only call getValues on deserialized blocks")
        case DeserializedMemoryEntry(values, _, _) =>
          val x = Some(values)
          x.map(_.iterator)
      }
    }

The remove method removes the Block for the given BlockId from memory.
    def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
      val entry = entries.synchronized {
        entries.remove(blockId) // remove the Block from memory
      }
      if (entry != null) {
        entry match {
          case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
          case _ =>
        }
        // release the logical storage memory
        memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
        logDebug(s"Block $blockId of size ${entry.size} dropped " +
          s"from memory (free ${maxMemory - blocksMemoryUsed})")
        true // removed successfully
      } else {
        false // nothing was removed
      }
    }

The clear method empties the MemoryStore.
    def clear(): Unit = memoryManager.synchronized {
      entries.synchronized {
        entries.clear()
      }
      // clear the unroll memory bookkeeping
      onHeapUnrollMemoryMap.clear()
      offHeapUnrollMemoryMap.clear()
      // release all storage memory
      memoryManager.releaseAllStorageMemory()
      logInfo("MemoryStore cleared")
    }

The evictBlocksToFreeSpace method evicts Blocks in order to free space for storing a new Block.
    private[spark] def evictBlocksToFreeSpace(
        blockId: Option[BlockId],
        space: Long,                      // amount of space that eviction needs to free
        memoryMode: MemoryMode): Long = { // memory mode of the Block
      assert(space > 0)
      memoryManager.synchronized {
        var freedMemory = 0L // amount of memory freed so far
        // RDD id of the block being added, obtained by applying getRddId to blockId
        val rddToAdd = blockId.flatMap(getRddId)
        // BlockIds of the Blocks selected for eviction
        val selectedBlocks = new ArrayBuffer[BlockId]
        def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
          entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
        }
        // This is synchronized to ensure that the set of entries is not changed
        // (because of getValue or getBytes) while traversing the iterator, as that
        // can lead to exceptions.
        entries.synchronized {
          val iterator = entries.entrySet().iterator()
          // while the selected space is still less than required, keep picking evictable Blocks
          while (freedMemory < space && iterator.hasNext) {
            val pair = iterator.next()
            val blockId = pair.getKey
            val entry = pair.getValue
            if (blockIsEvictable(blockId, entry)) { // the Block is evictable
              // We don't want to evict blocks which are currently being read, so we need to obtain
              // an exclusive write lock on blocks which are candidates for eviction. We perform a
              // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
              if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
                selectedBlocks += blockId
                freedMemory += pair.getValue.size // freedMemory is only a running tally at this point
              }
            }
          }
        }

        def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
          val data = entry match {
            case DeserializedMemoryEntry(values, _, _) => Left(values)
            case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
          }
          val newEffectiveStorageLevel =
            blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
          if (newEffectiveStorageLevel.isValid) {
            // The block is still present in at least one store, so release the lock
            // but don't delete the block info
            blockInfoManager.unlock(blockId)
          } else {
            // The block isn't present in any store, so delete the block info so that the
            // block can be stored again
            blockInfoManager.removeBlock(blockId)
          }
        }

        if (freedMemory >= space) { // evict only if the selected Blocks free at least the required space
          var lastSuccessfulBlock = -1
          try {
            logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
              s"(${Utils.bytesToString(freedMemory)} bytes)")
            (0 until selectedBlocks.size).foreach { idx =>
              val blockId = selectedBlocks(idx)
              val entry = entries.synchronized {
                entries.get(blockId)
              }
              // This should never be null as only one task should be dropping
              // blocks and removing entries. However the check is still here for
              // future safety.
              if (entry != null) {
                dropBlock(blockId, entry)
                afterDropAction(blockId)
              }
              lastSuccessfulBlock = idx
            }
            logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
              s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
            freedMemory
          } finally {
            // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
            // with InterruptedException
            if (lastSuccessfulBlock != selectedBlocks.size - 1) {
              // the blocks we didn't process successfully are still locked, so we have to unlock them
              (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
                val blockId = selectedBlocks(idx)
                blockInfoManager.unlock(blockId)
              }
            }
          }
        } else {
          // the evictable space is smaller than required, so release the write locks
          // on the Blocks that had been selected for eviction
          blockId.foreach { id =>
            logInfo(s"Will not store $id")
          }
          selectedBlocks.foreach { id =>
            blockInfoManager.unlock(id)
          }
          0L
        }
      }
    }

The contains method checks whether the MemoryStore holds the Block for the given BlockId.
    def contains(blockId: BlockId): Boolean = {
      entries.synchronized { entries.containsKey(blockId) }
    }
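The selection logic of evictBlocksToFreeSpace shown earlier follows an all-or-nothing pattern that can be sketched without any of the locking. This is a simplified model under stated assumptions: EvictionSketch and its parameters are hypothetical, block ids are strings, entries map an id directly to a size, a Scala LinkedHashMap stands in for the entries map, and the write locks and dropFromMemory call are omitted.

```scala
import scala.collection.mutable

object EvictionSketch {
  // Walks entries in map order, tallying evictable blocks until `space` is covered.
  // Blocks are removed only if the tally reaches `space`; otherwise nothing changes.
  // Returns the number of bytes freed (0 when eviction is abandoned).
  def evict(
      entries: mutable.LinkedHashMap[String, Long],
      space: Long,
      evictable: String => Boolean): Long = {
    var freedMemory = 0L
    val selectedBlocks = mutable.ArrayBuffer.empty[String]
    val iterator = entries.iterator
    while (freedMemory < space && iterator.hasNext) {
      val (id, size) = iterator.next()
      if (evictable(id)) {
        selectedBlocks += id
        freedMemory += size // a tally only; nothing has been removed yet
      }
    }
    if (freedMemory >= space) {
      selectedBlocks.foreach(id => entries.remove(id))
      freedMemory
    } else {
      0L
    }
  }
}
```

With entries `a -> 40, b -> 30, c -> 50` and `b` not evictable, asking for 60 bytes selects `a` and `c` and frees 90 bytes. Asking the remaining map for 100 bytes frees nothing and leaves `b` in place, mirroring the branch that unlocks the selected Blocks and returns 0L.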