Spark source code reading - Storage system 7 (BlockManager), part 1


BlockManager runs on every node (both the Driver and the Executors) and manages blocks stored in memory, on disk, and in off-heap memory, whether those blocks live locally or on remote nodes. In the narrow sense, the storage system is just the BlockManager; in the broad sense, it covers every BlockManager in the Spark cluster together with BlockInfoManager, DiskBlockManager, DiskStore, MemoryManager, MemoryStore, the BlockManagerMaster that manages all BlockManagers in the cluster, and the BlockTransferService on each node that provides block upload and download services.

一、Initialization of the BlockManager

Every Driver and Executor creates a BlockManager when it creates its own SparkEnv. The BlockManager can only be used after its initialize method has been called.
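Before looking at the body of initialize, it helps to know where it is called from. The snippet below is a rough, paraphrased sketch of the call sites (in SparkContext for the driver and in the Executor constructor for executors); the exact surrounding code and argument names vary between Spark versions. The implementation of initialize itself follows.

// Driver side (paraphrased from SparkContext, not a literal quote): the BlockManager is
// initialized once the application id is known.
_env.blockManager.initialize(_applicationId)

// Executor side (paraphrased from the Executor constructor): the same call, with the
// application id taken from the SparkConf.
env.blockManager.initialize(conf.getAppId)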

def initialize(appId: String): Unit = {
  // Initialize the BlockTransferService (i.e. NettyBlockTransferService)
  blockTransferService.init(this)
  externalBlockStoreClient.foreach { blockStoreClient =>
    blockStoreClient.init(appId)
  }
  // Set the block replication policy
  blockReplicationPolicy = {
    val priorityClass = conf.get(config.STORAGE_REPLICATION_POLICY)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.getConstructor().newInstance().asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }

  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

  val idFromMaster = master.registerBlockManager(
    id,
    diskBlockManager.localDirsString,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)

  // Determine this BlockManager's BlockManagerId: prefer the id returned by the master
  blockManagerId = if (idFromMaster != null) idFromMaster else id

  // Determine the shuffleServerId: when the external shuffle service is enabled, build a
  // BlockManagerId pointing at the external shuffle service port; otherwise it is the
  // BlockManager's own BlockManagerId
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    // Register with the external shuffle service
    registerWithExternalShuffleServer()
  }

  hostLocalDirManager = if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
      !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
    externalBlockStoreClient.map { blockStoreClient =>
      new HostLocalDirManager(
        futureExecutionContext,
        conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE),
        blockStoreClient,
        blockManagerId.host,
        externalShuffleServicePort)
    }
  } else {
    None
  }

  logInfo(s"Initialized BlockManager: $blockManagerId")
}

二、Methods provided by BlockManager

BlockManager provides many methods that underpin Spark's storage system, so it is worth walking through them.

    2.1 reregister

This method re-registers the BlockManager with the BlockManagerMaster and reports all block information to it.

def reregister(): Unit = {
  // TODO: We might need to rate limit re-registering.
  logInfo(s"BlockManager $blockManagerId re-registering with master")
  // Register this BlockManager with the BlockManagerMaster
  master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
    maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
  // Report all block information
  reportAllBlocks()
}
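A typical trigger for reregister is the executor heartbeat: if the driver's HeartbeatReceiver no longer knows about this BlockManager (for example after a driver-side timeout), its response asks the executor to re-register. The snippet below is a paraphrased sketch of that path in Executor.reportHeartBeat, not a literal quote:

// Paraphrased from Executor.reportHeartBeat: when the driver's HeartbeatResponse indicates
// the BlockManager is unknown, re-register it, which also reports all blocks again.
if (response.reregisterBlockManager) {
  logInfo("Told to re-register on heartbeat")
  env.blockManager.reregister()
}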

The reportAllBlocks method that reports all block information is implemented as follows:

private def reportAllBlocks(): Unit = {
  logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
  for ((blockId, info) <- blockInfoManager.entries) {
    val status = getCurrentBlockStatus(blockId, info)
    if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
      logError(s"Failed to report $blockId to master; giving up.")
      return
    }
  }
}

As the code shows, reportAllBlocks iterates over all BlockId-to-BlockInfo mappings managed by the BlockInfoManager and performs the following steps.

1) Call getCurrentBlockStatus (shown below) to obtain the block's status information (BlockStatus).
2) If the block's BlockStatus needs to be reported to the BlockManagerMaster, call tryToReportBlockStatus (also shown below) to report it.

private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
  info.synchronized {
    info.level match {
      case null =>
        BlockStatus.empty
      case level =>
        val inMem = level.useMemory && memoryStore.contains(blockId)
        val onDisk = level.useDisk && diskStore.contains(blockId)
        val deserialized = if (inMem) level.deserialized else false
        val replication = if (inMem || onDisk) level.replication else 1
        val storageLevel = StorageLevel(
          useDisk = onDisk,
          useMemory = inMem,
          useOffHeap = level.useOffHeap,
          deserialized = deserialized,
          replication = replication)
        val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
        BlockStatus(storageLevel, memSize, diskSize)
    }
  }
}

private def tryToReportBlockStatus(
    blockId: BlockId,
    status: BlockStatus,
    droppedMemorySize: Long = 0L): Boolean = {
  val storageLevel = status.storageLevel
  val inMemSize = Math.max(status.memSize, droppedMemorySize)
  val onDiskSize = status.diskSize
  master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
}
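To make the "narrowing" performed by getCurrentBlockStatus concrete, here is a small self-contained sketch. It is illustrative only: Level and Status are stand-ins for Spark's StorageLevel and BlockStatus, and currentStatus mirrors the idea that the reported level reflects where the block actually lives right now, not the level that was requested when it was put.

object BlockStatusSketch {
  // Illustrative stand-ins for StorageLevel and BlockStatus (not the Spark classes).
  case class Level(useMemory: Boolean, useDisk: Boolean, deserialized: Boolean, replication: Int)
  case class Status(level: Level, memSize: Long, diskSize: Long)

  // Mirrors the narrowing logic: keep only the tiers where the block is actually present.
  def currentStatus(requested: Level, inMem: Boolean, onDisk: Boolean,
      memSize: Long, diskSize: Long): Status = {
    val level = Level(
      useMemory = inMem,
      useDisk = onDisk,
      deserialized = if (inMem) requested.deserialized else false,
      replication = if (inMem || onDisk) requested.replication else 1)
    Status(level, if (inMem) memSize else 0L, if (onDisk) diskSize else 0L)
  }

  def main(args: Array[String]): Unit = {
    // A block requested as MEMORY_AND_DISK that has been evicted from memory is reported
    // as living on disk only, with memSize = 0.
    val requested = Level(useMemory = true, useDisk = true, deserialized = true, replication = 1)
    println(currentStatus(requested, inMem = false, onDisk = true, memSize = 0L, diskSize = 4096L))
    // Status(Level(false,true,false,1),0,4096)
  }
}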

    2.2 getLocalBytes

This method reads the data of the block identified by the given BlockId from the local storage system. In earlier versions the result was wrapped in a ChunkedByteBuffer; the version shown below (from a more recent release) returns it as BlockData.

def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  assert(!blockId.isShuffle, s"Unexpected ShuffleBlockId $blockId")
  blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
}

private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): BlockData = {
  val level = info.level
  logDebug(s"Level for block $blockId is $level")
  // In order, try to read the serialized bytes from memory, then from disk, then fall back to
  // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
  if (level.deserialized) {
    // The block is stored deserialized: try the DiskStore first, then the MemoryStore
    // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
    if (level.useDisk && diskStore.contains(blockId)) {
      // Note: we purposely do not try to put the block back into memory here. Since this branch
      // handles deserialized blocks, this block may only be cached in memory as objects, not
      // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
      // cache the block's deserialized objects since that caching may not have a payoff.
      diskStore.getBytes(blockId)
    } else if (level.useMemory && memoryStore.contains(blockId)) {
      // The block was not found on disk, so serialize an in-memory copy:
      new ByteBufferBlockData(serializerManager.dataSerializeWithExplicitClassTag(
        blockId, memoryStore.getValues(blockId).get, info.classTag), true)
    } else {
      handleLocalReadFailure(blockId)
    }
  } else {
    // storage level is serialized
    // The block is stored serialized: try the MemoryStore first, then the DiskStore
    if (level.useMemory && memoryStore.contains(blockId)) {
      new ByteBufferBlockData(memoryStore.getBytes(blockId).get, false)
    } else if (level.useDisk && diskStore.contains(blockId)) {
      val diskData = diskStore.getBytes(blockId)
      maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
        .map(new ByteBufferBlockData(_, false))
        .getOrElse(diskData)
    } else {
      handleLocalReadFailure(blockId)
    }
  }
}

As the assert at the top shows, getLocalBytes only handles blocks that are not shuffle blocks.

    2.3 getLocalBlockData

This method obtains the data of a local block.

override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {
    // For a shuffle block, obtain the data through the ShuffleManager's ShuffleBlockResolver
    shuffleManager.shuffleBlockResolver.getBlockData(blockId)
  } else {
    // Not a shuffle block: read it through getLocalBytes
    getLocalBytes(blockId) match {
      case Some(blockData) =>
        new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
      case None =>
        // If this block manager receives a request for a block that it doesn't have then it's
        // likely that the master has outdated block statuses for this block. Therefore, we send
        // an RPC so that this block is marked as being unavailable from this block manager.
        reportBlockStatus(blockId, BlockStatus.empty)
        throw new BlockNotFoundException(blockId.toString)
    }
  }
}

    2.4 getLocalValues

This method reads block data from the local BlockManager.

def getLocalValues(blockId: BlockId): Option[BlockResult] = {
  logDebug(s"Getting local block $blockId")
  // Acquire the read lock of the block corresponding to this BlockId
  blockInfoManager.lockForReading(blockId) match {
    case None =>
      logDebug(s"Block $blockId was not found")
      None
    case Some(info) =>
      val level = info.level
      logDebug(s"Level for block $blockId is $level")
      val taskContext = Option(TaskContext.get())
      if (level.useMemory && memoryStore.contains(blockId)) {
        // Prefer reading the block data from the MemoryStore
        val iter: Iterator[Any] = if (level.deserialized) {
          memoryStore.getValues(blockId).get
        } else {
          serializerManager.dataDeserializeStream(
            blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
        }
        // We need to capture the current taskId in case the iterator completion is triggered
        // from a different thread which does not have TaskContext set; see SPARK-18406 for
        // discussion.
        val ci = CompletionIterator[Any, Iterator[Any]](iter, {
          releaseLock(blockId, taskContext)
        })
        Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
      } else if (level.useDisk && diskStore.contains(blockId)) {
        // Otherwise read the block data from the DiskStore
        val diskData = diskStore.getBytes(blockId)
        val iterToReturn: Iterator[Any] = {
          if (level.deserialized) {
            val diskValues = serializerManager.dataDeserializeStream(
              blockId,
              diskData.toInputStream())(info.classTag)
            maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
          } else {
            val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
              .map { _.toInputStream(dispose = false) }
              .getOrElse { diskData.toInputStream() }
            serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
          }
        }
        val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
          releaseLockAndDispose(blockId, diskData, taskContext)
        })
        Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
      } else {
        handleLocalReadFailure(blockId)
      }
  }
}
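User code rarely calls getLocalValues directly; BlockManager.get tries the local read first and falls back to fetching from a remote BlockManager. The method below is a close paraphrase of get (minor details differ across Spark versions):

def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}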

    2.5 putBytes

Before introducing putBytes we first need to look at doPut, which performs the actual block write.

private def doPut[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[_],
    tellMaster: Boolean,
    keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {

  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")

  val putBlockInfo = {
    val newInfo = new BlockInfo(level, classTag, tellMaster)
    // Acquire the write lock for the block
    if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
      newInfo
    } else {
      logWarning(s"Block $blockId already exists on this machine; not re-adding it")
      // The block already exists. If the caller does not need to keep a read lock, the current
      // thread must release the read lock it now holds.
      if (!keepReadLock) {
        // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
        releaseLock(blockId)
      }
      return None
    }
  }

  val startTimeNs = System.nanoTime()
  var exceptionWasThrown: Boolean = true
  val result: Option[T] = try {
    // Invoke putBody to perform the actual write
    val res = putBody(putBlockInfo)
    exceptionWasThrown = false
    if (res.isEmpty) {
      // The block was successfully stored
      if (keepReadLock) {
        // The caller wants to keep reading the block, so downgrade the write lock to a read lock
        blockInfoManager.downgradeLock(blockId)
      } else {
        // Otherwise release the lock
        blockInfoManager.unlock(blockId)
      }
    } else {
      // The write failed, so remove the block
      removeBlockInternal(blockId, tellMaster = false)
      logWarning(s"Putting block $blockId failed")
    }
    res
  } catch {
    // An exception was thrown during the write.
    // Since removeBlockInternal may throw exception,
    // we should print exception first to show root cause.
    case NonFatal(e) =>
      logWarning(s"Putting block $blockId failed due to exception $e.")
      throw e
  } finally {
    // This cleanup is performed in a finally block rather than a `catch` to avoid having to
    // catch and properly re-throw InterruptedException.
    if (exceptionWasThrown) {
      // If an exception was thrown then it's possible that the code in `putBody` has already
      // notified the master about the availability of this block, so we need to send an update
      // to remove this block location.
      removeBlockInternal(blockId, tellMaster = tellMaster)
      // The `putBody` code may have also added a new block status to TaskMetrics, so we need
      // to cancel that out by overwriting it with an empty block status. We only do this if
      // the finally block was entered via an exception because doing this unconditionally would
      // cause us to send empty block statuses for every block that failed to be cached due to
      // a memory shortage (which is an expected failure, unlike an uncaught exception).
      addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
    }
  }
  val usedTimeMs = Utils.getUsedTimeNs(startTimeNs)
  if (level.replication > 1) {
    logDebug(s"Putting block ${blockId} with replication took $usedTimeMs")
  } else {
    logDebug(s"Putting block ${blockId} without replication took ${usedTimeMs}")
  }
  result
}
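The putBody closure is what differs between the various put paths (putBytes, putIterator, and the BlockStoreUpdater.save shown next). Its contract with doPut is simple, as the hedged sketch below illustrates; tryToStoreLocally is a hypothetical stand-in for the real store logic, not a Spark method:

// A minimal sketch of the putBody contract (tryToStoreLocally is hypothetical):
val failedSize: Option[Long] =
  doPut(blockId, level, classTag, tellMaster = true, keepReadLock = false) { info =>
    if (tryToStoreLocally(blockId, info)) None // None => success; doPut releases or downgrades the lock
    else Some(info.size)                       // Some(...) => failure; doPut removes the block
  }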

Now let's look at the implementation of putBytes.

def putBytes[T: ClassTag](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true): Boolean = {
  require(bytes != null, "Bytes is null")
  val blockStoreUpdater =
    ByteBufferBlockStoreUpdater(blockId, level, implicitly[ClassTag[T]], bytes, tellMaster)
  // The updater's save method then calls doPut
  blockStoreUpdater.save()
}

def save(): Boolean = {
  doPut(blockId, level, classTag, tellMaster, keepReadLock) { info =>
    val startTimeNs = System.nanoTime()

    // Since we're storing bytes, initiate the replication before storing them locally.
    // This is faster as data is already serialized and ready to send.
    val replicationFuture = if (level.replication > 1) {
      Future {
        // Start an asynchronous task that replicates the block data to the storage systems
        // of other nodes via the replicate method.
        // This is a blocking action and should run in futureExecutionContext which is a cached
        // thread pool.
        replicate(blockId, blockData(), level, classTag)
      }(futureExecutionContext)
    } else {
      null
    }

    if (level.useMemory) {
      // Memory is preferred.
      // Put it in memory first, even if it also has useDisk set to true;
      // We will drop it to disk later if the memory store can't hold it.
      val putSucceeded = if (level.deserialized) {
        saveDeserializedValuesToMemoryStore(blockData().toInputStream())
      } else {
        saveSerializedValuesToMemoryStore(readToByteBuffer())
      }
      if (!putSucceeded && level.useDisk) {
        // Memory was insufficient, so fall back to writing the block to disk
        logWarning(s"Persisting block $blockId to disk instead.")
        saveToDiskStore()
      }
    } else if (level.useDisk) {
      // The storage level only uses disk
      saveToDiskStore()
    }

    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    if (blockWasSuccessfullyStored) {
      // Now that the block is in either the memory or disk store,
      // tell the master about it.
      info.size = blockSize
      if (tellMaster && info.tellMaster) {
        reportBlockStatus(blockId, putBlockStatus)
      }
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    }
    logDebug(s"Put block ${blockId} locally took ${Utils.getUsedTimeNs(startTimeNs)}")
    if (level.replication > 1) {
      // Wait for the asynchronous replication to finish
      try {
        ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for replication to finish", t)
      }
    }
    if (blockWasSuccessfullyStored) {
      None
    } else {
      Some(blockSize)
    }
  }.isEmpty
}
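As an example of a caller, TorrentBroadcast stores every serialized piece of a broadcast variable through putBytes so that other executors can later fetch the pieces. The snippet is paraphrased from TorrentBroadcast.writeBlocks, not a literal quote:

// Paraphrased from TorrentBroadcast.writeBlocks: each piece is stored as serialized bytes
// (memory first, spilling to disk), and reported to the master so other executors can find it.
if (!blockManager.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
  throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}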

    2.6 getRemoteBlock

This method fetches block data, in serialized byte form, from a remote BlockManager.

private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
  val startTimeNs = System.nanoTime()
  val locations = master.getLocations(blockIds).toArray
  logDebug(s"Got multiple block location in ${Utils.getUsedTimeNs(startTimeNs)}")
  locations
}

private[spark] def getRemoteBlock[T](
    blockId: BlockId,
    bufferTransformer: ManagedBuffer => T): Option[T] = {
  logDebug(s"Getting remote block $blockId")
  require(blockId != null, "BlockId is null")

  // Because all the remote blocks are registered in driver, it is not necessary to ask
  // all the slave executors to get block status.
  val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host)
  if (locationsAndStatusOption.isEmpty) {
    logDebug(s"Block $blockId is unknown by block manager master")
    None
  } else {
    val locationsAndStatus = locationsAndStatusOption.get
    val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize)

    locationsAndStatus.localDirs.flatMap { localDirs =>
      val blockDataOption =
        readDiskBlockFromSameHostExecutor(blockId, localDirs, locationsAndStatus.status.diskSize)
      val res = blockDataOption.flatMap { blockData =>
        try {
          Some(bufferTransformer(blockData))
        } catch {
          case NonFatal(e) =>
            logDebug("Block from the same host executor cannot be opened: ", e)
            None
        }
      }
      logInfo(s"Read $blockId from the disk of a same host executor is " +
        (if (res.isDefined) "successful." else "failed."))
      res
    }.orElse {
      fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus).map(bufferTransformer)
    }
  }
}
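getRemoteBlock is generic in its result type; the more commonly used getRemoteValues and getRemoteBytes are thin wrappers that differ only in the bufferTransformer they pass in. The two methods below are a close paraphrase of the Spark 3.x code (signatures may vary slightly between versions):

def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val ct = implicitly[ClassTag[T]]
  getRemoteBlock(blockId, (data: ManagedBuffer) => {
    val values =
      serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
    new BlockResult(values, DataReadMethod.Network, data.size)
  })
}

def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  getRemoteBlock(blockId, (data: ManagedBuffer) => {
    new ChunkedByteBuffer(data.nioByteBuffer())
  })
}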

The implementation of fetchRemoteManagedBuffer is as follows:

private def fetchRemoteManagedBuffer(
    blockId: BlockId,
    blockSize: Long,
    locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer] = {
  // If the block size is above the threshold, we should pass our FileManger to
  // BlockTransferService, which will leverage it to spill the block; if not, then passed-in
  // null value means the block will be persisted in memory.
  val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
    remoteBlockTempFileManager
  } else {
    null
  }
  var runningFailureCount = 0
  var totalFailureCount = 0
  val locations = sortLocations(locationsAndStatus.locations)
  val maxFetchFailures = locations.size
  var locationIterator = locations.iterator
  while (locationIterator.hasNext) {
    val loc = locationIterator.next()
    logDebug(s"Getting remote block $blockId from $loc")
    val data = try {
      val buf = blockTransferService.fetchBlockSync(loc.host, loc.port, loc.executorId,
        blockId.toString, tempFileManager)
      if (blockSize > 0 && buf.size() == 0) {
        throw new IllegalStateException("Empty buffer received for non empty block")
      }
      buf
    } catch {
      case NonFatal(e) =>
        runningFailureCount += 1
        totalFailureCount += 1

        if (totalFailureCount >= maxFetchFailures) {
          // Give up trying anymore locations. Either we've tried all of the original locations,
          // or we've refreshed the list of locations from the master, and have still
          // hit failures after trying locations from the refreshed list.
          logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
            s"Most recent failure cause:", e)
          return None
        }

        logWarning(s"Failed to fetch remote block $blockId " +
          s"from $loc (failed attempt $runningFailureCount)", e)

        // If there is a large number of executors then locations list can contain a
        // large number of stale entries causing a large number of retries that may
        // take a significant amount of time. To get rid of these stale entries
        // we refresh the block locations after a certain number of fetch failures
        if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
          locationIterator = sortLocations(master.getLocations(blockId)).iterator
          logDebug(s"Refreshed locations from the driver " +
            s"after ${runningFailureCount} fetch failures.")
          runningFailureCount = 0
        }

        // This location failed, so we retry fetch from a different one by returning null here
        null
    }

    if (data != null) {
      // If the ManagedBuffer is a BlockManagerManagedBuffer, the disposal of the
      // byte buffers backing it may need to be handled after reading the bytes.
      // In this case, since we just fetched the bytes remotely, we do not have
      // a BlockManagerManagedBuffer. The assert here is to ensure that this holds
      // true (or the disposal is handled).
      assert(!data.isInstanceOf[BlockManagerManagedBuffer])
      return Some(data)
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}

    2.7 getStatus

This method returns the status of a block.

def getStatus(blockId: BlockId): Option[BlockStatus] = {
  blockInfoManager.get(blockId).map { info =>
    val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
    val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
    BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
  }
}

    2.8 downgradeLock

This method downgrades the write lock on a block held by the current thread to a read lock.

def downgradeLock(blockId: BlockId): Unit = {
  blockInfoManager.downgradeLock(blockId)
}

    2.9 releaseLock

This method releases the lock on a block held by the current thread.

def releaseLock(blockId: BlockId, taskContext: Option[TaskContext] = None): Unit = {
  val taskAttemptId = taskContext.map(_.taskAttemptId())
  // SPARK-27666. When a task completes, Spark automatically releases all the blocks locked
  // by this task. We should not release any locks for a task that is already completed.
  if (taskContext.isDefined && taskContext.get.isCompleted) {
    logWarning(s"Task ${taskAttemptId.get} already completed, not releasing lock for $blockId")
  } else {
    blockInfoManager.unlock(blockId, taskAttemptId)
  }
}

2.10 registerTask

Registers a task attempt with the BlockInfoManager.

def registerTask(taskAttemptId: Long): Unit = {
  // This simply delegates to BlockInfoManager's registerTask method
  blockInfoManager.registerTask(taskAttemptId)
}
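registerTask is called once per task attempt before the task body runs, so that the BlockInfoManager can later find and release every lock held by that attempt. The line below is a paraphrased sketch of the call site in Task.run:

// Paraphrased from Task.run: register this attempt with the BlockManager before executing the
// task, so that all block locks it acquires can be tracked and released when the task finishes.
SparkEnv.get.blockManager.registerTask(taskAttemptId)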

To be continued... This post has become too long, so it is split into two parts.
