Spark Source Code Reading: Storage System 2 (Block Info Manager)

Tech, 2023-12-03

Abstract

BlockInfoManager is one of the sub-components of BlockManager. It manages Block locks using shared locks and exclusive locks, i.e. read locks and write locks.

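Contents

1. lockForWriting: acquire the write lock
2. get: get the BlockInfo for a BlockId
3. unlock: release the lock on the Block for a BlockId
4. downgradeLock: downgrade a lock
5. lockNewBlockForWriting: acquire the write lock when writing a new Block
6. releaseAllLocksForTask: release all Block locks held by the given task
7. size: return the size of infos
8. entries: traverse infos and return an iterator
9. removeBlock: remove the BlockInfo for a BlockId
10. clear: clear all information in BlockInfoManager and notify every thread waiting on a lock managed by BlockInfoManager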

BlockInfoManager does perform some simple management of BlockInfo, but its main job is managing the lock resources of Blocks.

Basic concepts of Block locks

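BlockInfoManager is one of the sub-components of BlockManager. It manages Block locks using shared locks and exclusive locks, i.e. read locks and write locks. A read lock can be held by several task attempts at the same time, while a write lock is exclusive: as the lockForWriting code later in this article shows, it is granted only when the Block has no writer and no readers.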

Member attributes of BlockInfoManager

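TaskAttemptId: essentially a type alias for Long, used to represent the ID of one task attempt.
infos: stores the mapping from BlockId to BlockInfo. This is why the BlockInfo structure has no field for the BlockId.
writeLocksByTask: stores the mapping from a TaskAttemptId to the blocks on which that task holds a write lock. Note that the BlockIds are kept in a set, meaning one task attempt can hold write locks on zero or more blocks.
readLocksByTask: stores the mapping from a TaskAttemptId to the blocks on which that task holds a read lock. One task attempt can likewise hold read locks on zero or more blocks.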
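To make the shape of these four members concrete, here is a minimal sketch. It is not Spark's code: a String stands in for BlockId, `SimpleBlockInfo`, `LockBookkeeping`, `recordWrite`, `recordRead` and the `NoWriter` value are hypothetical names, and plain Scala maps replace whatever collections Spark actually uses. The per-block read count reflects the unlock code later in this article, where a task may hold the same read lock more than once.

```scala
import scala.collection.mutable

object LockState {
  type TaskAttemptId = Long          // alias for Long, as described above
  val NoWriter: TaskAttemptId = -1L  // assumed sentinel meaning "no writer"
}

// Simplified stand-in for BlockInfo: note that it has no BlockId field.
class SimpleBlockInfo {
  var readerCount: Int = 0
  var writerTask: LockState.TaskAttemptId = LockState.NoWriter
}

class LockBookkeeping {
  import LockState._

  // BlockId -> BlockInfo: the key lives in the map, not inside the value.
  val infos = mutable.HashMap.empty[String, SimpleBlockInfo]
  // Task attempt -> set of blocks it holds a write lock on.
  val writeLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.Set[String]]
  // Task attempt -> (block -> number of read locks held on that block).
  val readLocksByTask = mutable.HashMap.empty[TaskAttemptId, mutable.Map[String, Int]]

  def recordWrite(task: TaskAttemptId, blockId: String): Unit =
    writeLocksByTask.getOrElseUpdate(task, mutable.Set.empty[String]) += blockId

  def recordRead(task: TaskAttemptId, blockId: String): Unit = {
    val counts = readLocksByTask.getOrElseUpdate(task, mutable.Map.empty[String, Int])
    counts(blockId) = counts.getOrElse(blockId, 0) + 1
  }
}
```

A set is enough for write locks because a write lock is exclusive, whereas read locks need a count per block.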

Implementation of Block locks

1. lockForWriting: acquire the write lock

    def lockForWriting(
        blockId: BlockId,
        blocking: Boolean = true): Option[BlockInfo] = synchronized {
      logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
      do {
        infos.get(blockId) match { // look up the BlockInfo for this BlockId in infos
          case None => return None
          case Some(info) =>
            if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
              // The write lock is free and no one is reading this Block:
              // take the write lock and return. Otherwise fall through.
              info.writerTask = currentTaskAttemptId
              writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
              logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
              return Some(info)
            }
        }
        if (blocking) {
          // Wait until a thread holding a lock releases it and wakes us up.
          // This can wait indefinitely.
          wait()
        }
      } while (blocking)
      None
    }
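downgradeLock and lockNewBlockForWriting later in this article call lockForReading, which is not listed here. The sketch below shows how a read-side counterpart could look next to the write-side logic above. It is a simplified model under stated assumptions, not Spark's implementation: `MiniLockManager`, `Info` and `register` are hypothetical names, a String stands in for BlockId, the task ID is passed explicitly instead of being read from the current task, and per-task bookkeeping is left out.

```scala
import scala.collection.mutable

// Simplified stand-in for BlockInfo: only the two lock fields.
class Info {
  var readerCount: Int = 0
  var writerTask: Long = -1L // -1 means "no writer" (assumed sentinel)
}

class MiniLockManager {
  private val NoWriter = -1L
  private val infos = mutable.HashMap.empty[String, Info]

  def register(blockId: String): Unit = synchronized { infos(blockId) = new Info }

  // Shared lock: granted whenever no task holds the write lock.
  def lockForReading(blockId: String, blocking: Boolean = true): Option[Info] = synchronized {
    var result: Option[Info] = None
    var done = false
    while (!done) {
      infos.get(blockId) match {
        case None => done = true // unknown block: give up
        case Some(info) if info.writerTask == NoWriter =>
          info.readerCount += 1
          result = Some(info)
          done = true
        case Some(_) =>
          if (blocking) wait() else done = true // retry after a wake-up, or give up
      }
    }
    result
  }

  // Exclusive lock: granted only when there is no writer and no reader.
  def lockForWriting(task: Long, blockId: String, blocking: Boolean = true): Option[Info] =
    synchronized {
      var result: Option[Info] = None
      var done = false
      while (!done) {
        infos.get(blockId) match {
          case None => done = true
          case Some(info) if info.writerTask == NoWriter && info.readerCount == 0 =>
            info.writerTask = task
            result = Some(info)
            done = true
          case Some(_) =>
            if (blocking) wait() else done = true
        }
      }
      result
    }

  // Release one lock on the block and wake up every waiting thread.
  def unlock(blockId: String): Unit = synchronized {
    infos.get(blockId).foreach { info =>
      if (info.writerTask != NoWriter) info.writerTask = NoWriter
      else if (info.readerCount > 0) info.readerCount -= 1
    }
    notifyAll()
  }
}
```

With `blocking = false` a failed attempt returns `None` at once. With `blocking = true` the caller sleeps in `wait()` until some `unlock` calls `notifyAll()`, then re-checks the condition, which is why the check sits inside a loop.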

2. get: get the BlockInfo for a BlockId

    private[storage] def get(blockId: BlockId): Option[BlockInfo] = synchronized {
      infos.get(blockId)
    }

3. unlock: release the lock on the Block for a BlockId

    def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
      val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
      logTrace(s"Task $taskId releasing lock for $blockId")
      val info = get(blockId).getOrElse { // look up the BlockInfo for this BlockId
        throw new IllegalStateException(s"Block $blockId not found")
      }
      if (info.writerTask != BlockInfo.NO_WRITER) {
        info.writerTask = BlockInfo.NO_WRITER // release the write lock
        writeLocksByTask.removeBinding(taskId, blockId)
      } else {
        // release one read lock
        assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
        info.readerCount -= 1
        val countsForTask = readLocksByTask(taskId)
        // A result greater than 0 means this task acquired the read lock more than once
        // and still holds it.
        val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
        assert(newPinCountForTask >= 0,
          s"Task $taskId release lock on block $blockId more times than it acquired it")
      }
      notifyAll()
    }

4. downgradeLock: downgrade a lock

    def downgradeLock(blockId: BlockId): Unit = synchronized {
      logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
      val info = get(blockId).get // look up the BlockInfo for this BlockId
      require(info.writerTask == currentTaskAttemptId,
        s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
          s" block $blockId")
      unlock(blockId) // release the write lock currently held
      val lockOutcome = lockForReading(blockId, blocking = false) // acquire a read lock
      assert(lockOutcome.isDefined)
    }
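The downgrade above releases the write lock and then takes a read lock. Because both steps run inside one `synchronized` block on the same object, no other thread can take the write lock in between, so the non-blocking read attempt cannot fail. The sketch below models this for a single block. `BlockLock`, `tryWrite`, `tryRead` and `downgrade` are hypothetical names chosen for illustration, not Spark APIs.

```scala
// A read/write lock for one block, guarded by this object's monitor.
class BlockLock {
  private val NoWriter = -1L // assumed sentinel meaning "no writer"
  private var writerTask = NoWriter
  private var readerCount = 0

  def tryWrite(task: Long): Boolean = synchronized {
    if (writerTask == NoWriter && readerCount == 0) { writerTask = task; true } else false
  }

  def tryRead(): Boolean = synchronized {
    if (writerTask == NoWriter) { readerCount += 1; true } else false
  }

  def unlock(): Unit = synchronized {
    if (writerTask != NoWriter) writerTask = NoWriter
    else if (readerCount > 0) readerCount -= 1
    notifyAll()
  }

  // Release the write lock and take a read lock without leaving the monitor.
  // JVM monitors are reentrant, so the nested synchronized calls do not deadlock.
  def downgrade(task: Long): Unit = synchronized {
    require(writerTask == task, s"Task $task does not hold the write lock")
    unlock()
    val acquired = tryRead()
    assert(acquired)
  }

  def readers: Int = synchronized { readerCount }
  def writer: Option[Long] = synchronized {
    if (writerTask == NoWriter) None else Some(writerTask)
  }
}
```

After a downgrade the former writer counts as an ordinary reader: other readers can join, but a new writer has to wait until every reader has unlocked.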

5. lockNewBlockForWriting: acquire the write lock when writing a new Block

    def lockNewBlockForWriting(
        blockId: BlockId,
        newBlockInfo: BlockInfo): Boolean = synchronized {
      logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
      lockForReading(blockId) match { // first try to acquire a read lock
        case Some(info) =>
          // The Block for this BlockId already exists.
          // Block already exists. This could happen if another thread races with us to compute
          // the same block. In this case, just keep the read lock and return.
          false // no need to acquire the write lock
        case None =>
          // Block does not yet exist or is removed, so we are free to acquire the write lock
          infos(blockId) = newBlockInfo // register the new BlockInfo
          lockForWriting(blockId) // acquire the write lock
          true
      }
    }

6. releaseAllLocksForTask: release all Block locks held by the given task

    def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
      val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
      // remove this task attempt's entry from readLocksByTask
      val readLocks =
        readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
      // remove this task attempt's entry from writeLocksByTask
      val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
      for (blockId <- writeLocks) { // release each write lock
        infos.get(blockId).foreach { info =>
          assert(info.writerTask == taskAttemptId)
          info.writerTask = BlockInfo.NO_WRITER // release the write lock
        }
        blocksWithReleasedLocks += blockId
      }
      readLocks.entrySet().iterator().asScala.foreach { entry => // release each read lock
        val blockId = entry.getElement
        val lockCount = entry.getCount
        blocksWithReleasedLocks += blockId
        get(blockId).foreach { info =>
          info.readerCount -= lockCount
          assert(info.readerCount >= 0)
        }
      }
      notifyAll()
      blocksWithReleasedLocks
    }
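The same bulk release can be tried out on a small model. This is a sketch under assumptions, not Spark's code: `TaskLockTable`, `write`, `read` and `releaseAll` are hypothetical names, a String stands in for BlockId, a plain map of counts replaces the multiset, and `write` and `read` only record locks without checking whether they could be granted.

```scala
import scala.collection.mutable

class Info {
  var readerCount: Int = 0
  var writerTask: Long = -1L // -1 means "no writer" (assumed sentinel)
}

class TaskLockTable {
  private val infos = mutable.HashMap.empty[String, Info]
  private val writeLocksByTask = mutable.HashMap.empty[Long, mutable.Set[String]]
  private val readLocksByTask = mutable.HashMap.empty[Long, mutable.Map[String, Int]]

  def info(blockId: String): Info = synchronized { infos.getOrElseUpdate(blockId, new Info) }

  // Record a write lock (bookkeeping only, no conflict check in this sketch).
  def write(task: Long, blockId: String): Unit = synchronized {
    info(blockId).writerTask = task
    writeLocksByTask.getOrElseUpdate(task, mutable.Set.empty[String]) += blockId
  }

  // Record one read lock; the same task may read-lock a block several times.
  def read(task: Long, blockId: String): Unit = synchronized {
    info(blockId).readerCount += 1
    val counts = readLocksByTask.getOrElseUpdate(task, mutable.Map.empty[String, Int])
    counts(blockId) = counts.getOrElse(blockId, 0) + 1
  }

  // Drop every lock held by `task` and return the affected block IDs.
  def releaseAll(task: Long): Seq[String] = synchronized {
    val released = mutable.ArrayBuffer.empty[String]
    writeLocksByTask.remove(task).foreach { blocks =>
      blocks.foreach { id => infos(id).writerTask = -1L; released += id }
    }
    readLocksByTask.remove(task).foreach { counts =>
      // Subtract this task's whole count at once; other tasks' reads are untouched.
      counts.foreach { case (id, n) => infos(id).readerCount -= n; released += id }
    }
    notifyAll() // wake up threads waiting for any of these locks
    released.toList
  }
}
```

Keeping a per-task index is what makes this cheap: the release only touches the blocks this task actually locked instead of scanning all of infos.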

7. size: return the size of infos

8. entries: traverse infos and return an iterator
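The article gives no code for size and entries. One plausible shape, shown here only as a sketch, is to read the map's size and to hand out an iterator over a snapshot taken under the lock. `InfoTable` and `put` are hypothetical names, and the snapshot approach is an assumption of this sketch rather than a quote from Spark.

```scala
import scala.collection.mutable

class InfoTable[K, V] {
  private val infos = mutable.HashMap.empty[K, V]

  def put(key: K, value: V): Unit = synchronized { infos(key) = value }

  // Number of entries currently tracked.
  def size: Int = synchronized { infos.size }

  // Copy the entries while holding the monitor and iterate over the copy,
  // so later changes to `infos` cannot disturb a caller that is still iterating.
  def entries: Iterator[(K, V)] = synchronized { infos.toVector.iterator }
}
```

The trade-off is that the iterator may be stale: entries added or removed after the call are not reflected in it.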

9. removeBlock: remove the BlockInfo for a BlockId

    def removeBlock(blockId: BlockId): Unit = synchronized {
      logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
      infos.get(blockId) match { // look up the BlockInfo for this BlockId
        case Some(blockInfo) =>
          if (blockInfo.writerTask != currentTaskAttemptId) {
            // Only the task attempt that holds the write lock may remove the BlockInfo;
            // otherwise throw an exception.
            throw new IllegalStateException(
              s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
          } else {
            infos.remove(blockId) // remove the BlockInfo
            blockInfo.readerCount = 0 // reset the BlockInfo's reader count to 0
            blockInfo.writerTask = BlockInfo.NO_WRITER // reset writerTask to NO_WRITER
            // remove the binding between this task attempt and the BlockId
            writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
          }
        case None =>
          throw new IllegalArgumentException(
            s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
      }
      notifyAll() // notify all threads waiting on the lock of this Block
    }

10. clear: clear all information in BlockInfoManager and notify every thread waiting on a lock managed by BlockInfoManager.
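No code is listed for clear either. Following the description above, a minimal sketch could reset every BlockInfo, empty the three maps and wake up all waiters. The names `ClearableManager` and `addWriteLocked` are hypothetical, a String stands in for BlockId, and the exact steps are an assumption based on the description and on what removeBlock does for a single block.

```scala
import scala.collection.mutable

class Info {
  var readerCount: Int = 0
  var writerTask: Long = -1L // -1 means "no writer" (assumed sentinel)
}

class ClearableManager {
  private val infos = mutable.HashMap.empty[String, Info]
  private val writeLocksByTask = mutable.HashMap.empty[Long, mutable.Set[String]]
  private val readLocksByTask = mutable.HashMap.empty[Long, mutable.Map[String, Int]]

  // Register a block that is already write-locked by `task` (helper for the demo).
  def addWriteLocked(task: Long, blockId: String): Info = synchronized {
    val info = new Info
    info.writerTask = task
    infos(blockId) = info
    writeLocksByTask.getOrElseUpdate(task, mutable.Set.empty[String]) += blockId
    info
  }

  def size: Int = synchronized { infos.size }

  def clear(): Unit = synchronized {
    // Reset the lock state first, so callers still holding a reference to an
    // Info object see it as unlocked.
    infos.valuesIterator.foreach { info =>
      info.readerCount = 0
      info.writerTask = -1L
    }
    infos.clear()
    readLocksByTask.clear()
    writeLocksByTask.clear()
    notifyAll() // waiters wake up, find the block gone, and can give up
  }
}
```

In the lockForWriting code earlier in this article, a woken thread re-reads infos and returns None when the block no longer exists, which is why the notification matters here.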
