在上一篇文章中,我们对块管理器BlockManager的一部分方法进行了介绍,本文将介绍剩余部分。
此方法用于任务尝试线程对持有的所有Block的锁进行释放。
def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = { blockInfoManager.releaseAllLocksForTask(taskAttemptId) // 就是调用BlockInfoManager的方法 }用于获取Block。如果Block存在,则获取此Block并返回BlockResult,否则调用makeIterator方法计算Block,并持久化后返回BlockResult或Iterator。
def getOrElseUpdate[T]( blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { // Attempt to read the block from local or remote storage. If it's present, then we don't need // to go through the local-get-or-put path. get[T](blockId)(classTag) match { case Some(block) => return Left(block) case _ => // Need to compute the block. } // Initially we hold no locks on this block. doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match { case None => // Block已经成功存储到内存 // doPut() didn't hand work back to us, so the block already existed or was successfully // stored. Therefore, we now hold a read lock on the block. val blockResult = getLocalValues(blockId).getOrElse { // Since we held a read lock between the doPut() and get() calls, the block should not // have been evicted, so get() not returning the block indicates some internal error. releaseLock(blockId) throw new SparkException(s"get() failed for block $blockId even though we held a lock") } // We already hold a read lock on the block from the doPut() call and getLocalValues() // acquires the lock again, so we need to call releaseLock() here so that the net number // of lock acquisitions is 1 (since the caller will only call release() once). releaseLock(blockId) Left(blockResult) case Some(iter) => // Block在存储到内存时发生了错误 // The put failed, likely because the data was too large to fit in memory and could not be // dropped to disk. Therefore, we need to pass the input iterator back to the caller so // that they can decide what to do with the values (e.g. process them without caching). Right(iter) } }此方法用于将Block数据写入存储体系。
def putIterator[T: ClassTag]( blockId: BlockId, values: Iterator[T], level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(values != null, "Values is null") doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match { // 调用doPutIterator方法 case None => // 返回None, 说明Block已经成功存储到内存 true case Some(iter) => // 若返回Some(),说明计算得到的Block存储到内存时发生了错误 // Caller doesn't care about the iterator values, so we can close the iterator here // to free resources earlier iter.close() false } }此方法用于创建并获取DiskBlockObjectWriter,通过DiskBlockObjectWriter可以跳过对DiskStore的使用,直接将数据写入磁盘。
def getDiskWriter( blockId: BlockId, file: File, serializerInstance: SerializerInstance, bufferSize: Int, writeMetrics: ShuffleWriteMetricsReporter): DiskBlockObjectWriter = { val syncWrites = conf.get(config.SHUFFLE_SYNC) new DiskBlockObjectWriter(file, serializerManager, serializerInstance, bufferSize, syncWrites, writeMetrics, blockId) // 属性syncWrites将决定DiskBlockObjectWriter将数据写入磁盘时是采取同步方式还是异步方式,默认是异步的 }此方法的作用是获取由单个对象组成的Block。
def getSingle[T: ClassTag](blockId: BlockId): Option[T] = { get[T](blockId).map(_.data.next().asInstanceOf[T]) }此方法用于将由单个对象组成的Block写入存储体系。
def putSingle[T: ClassTag]( blockId: BlockId, value: T, level: StorageLevel, tellMaster: Boolean = true): Boolean = { putIterator(blockId, Iterator(value), level, tellMaster) }此方法用于从内存中删除Block,当Block的存储级别允许写入磁盘,Block将被写入磁盘。此方法主要在内存不足,需要从内存腾出空闲空间时使用。
private[storage] override def dropFromMemory[T: ClassTag]( blockId: BlockId, data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { logInfo(s"Dropping block $blockId from memory") // 确认当前任务尝试线程是否已经持有BlockId对应的写锁 val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) var blockIsUpdated = false val level = info.level // 如果允许,则将Block写入磁盘 if (level.useDisk && !diskStore.contains(blockId)) { logInfo(s"Writing block $blockId to disk") data() match { case Left(elements) => diskStore.put(blockId) { channel => val out = Channels.newOutputStream(channel) serializerManager.dataSerializeStream( blockId, out, elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]]) } case Right(bytes) => diskStore.putBytes(blockId, bytes) } blockIsUpdated = true } // 将Block从内存中删除 val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L val blockIsRemoved = memoryStore.remove(blockId) if (blockIsRemoved) { blockIsUpdated = true } else { logWarning(s"Block $blockId could not be dropped from memory as it does not exist") } val status = getCurrentBlockStatus(blockId, info) // 获取当前Block的状态 if (info.tellMaster) { reportBlockStatus(blockId, status, droppedMemorySize) // 向BlockManagerMaster报告Block状态 } if (blockIsUpdated) { addUpdatedBlockStatusToTaskMetrics(blockId, status) // 更新任务度量信息 } status.storageLevel // 返回Block的存储级别 }此方法的作用为删除指定的Block。
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = { logDebug(s"Removing block $blockId") blockInfoManager.lockForWriting(blockId) match { // 获取指定Block的写锁 case None => // The block has already been removed; do nothing. logWarning(s"Asked to remove block $blockId, which does not exist") case Some(info) => // 从存储体系中删除Block removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster) addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty) } }此方法的作用为移除指定Rdd的所有Block。
def removeRdd(rddId: Int): Int = { // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks. logInfo(s"Removing RDD $rddId") val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId) // 从entries中找出所有的RDDBlockId blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) } // 遍历删除所有找出的BlockId blocksToRemove.size }此方法的作用为移除属于指定的Broadcast的所有Block。可以看出其实现与2.22 相似。
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = { logDebug(s"Removing broadcast $broadcastId") val blocksToRemove = blockInfoManager.entries.map(_._1).collect { case bid @ BroadcastBlockId(`broadcastId`, _) => bid } blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) } blocksToRemove.size }