摘要
BlockInfoManager是BlockManager的子组件之一,它对Block锁的管理采用了共享锁与排他锁,即读锁与写锁。
1. lockForWriting: 锁定写(获取写锁) 2. get: 获取BlockId对应的BlockInfo 3. unlock: 释放BlockId对应Block上的锁 4. downgradeLock: 锁降级 5. lockNewBlockForWriting: 在写新Block时获取写锁 6. releaseAllLocksForTask: 释放给定的task所占用的所有Block的锁 7. size: 返回infos的大小 8. entries: 遍历infos,返回迭代器 9. removeBlock: 移除BlockId对应的BlockInfo 10. clear: 清除BlockInfoManager中的所有信息,并通知所有在BlockInfoManager管理的锁上等待的线程。
目录
BlockInfoManager虽然对BlockInfo进行了一些简单的管理,但其主要职责是管理Block的锁资源。
Block锁的基本概念
BlockInfoManager是BlockManager的子组件之一,它对Block锁的管理采用了共享锁与排他锁,即读锁与写锁。
BlockInfoManager的成员属性
TaskAttemptId:本质是Long类型的重命名,用来表示一次Task尝试的ID。
infos:存储BlockId与BlockInfo的映射关系,这就是为什么BlockInfo结构中并没有包含BlockId对应的字段。writeLocksByTask:存储TaskAttemptId与该Task获取了写锁的块之间的映射关系。注意映射的值是BlockId的集合,也就是说一次Task尝试可以获取0个或多个块的写锁。readLocksByTask:存储TaskAttemptId与该Task获取了读锁的块之间的映射关系。一次Task尝试也可以获取0个或多个块的读锁,并且可以对同一个块多次获取读锁,因此这里使用带计数的多重集合(Multiset)来记录该Task对每个块加读锁的次数。
Block锁的实现
1. lockForWriting: 锁定写(获取写锁)
/**
 * Acquires the write lock on `blockId` for the current task attempt.
 *
 * Runs while holding this object's monitor (`synchronized`). Returns None immediately if no
 * BlockInfo is registered for `blockId` in `infos`. Otherwise the lock is granted only when the
 * block has no writer (`writerTask == BlockInfo.NO_WRITER`) and no readers (`readerCount == 0`);
 * on success the block is recorded in `writeLocksByTask` for the current task attempt.
 *
 * @param blockId  the block to write-lock
 * @param blocking if true, wait() on this object's monitor and retry until the lock is granted
 *                 or the block disappears; if false, make a single attempt
 * @return Some(info) if the write lock was acquired; None if the block does not exist, or if
 *         `blocking` is false and the lock could not be taken immediately
 */
def lockForWriting(
blockId: BlockId,
blocking: Boolean = true): Option[BlockInfo] = synchronized {
logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
do {
infos.get(blockId) match { // Look up the BlockInfo registered for this BlockId
// `return` here is a non-local return out of the synchronized closure.
case None => return None
case Some(info) =>
if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
// No writer and no readers: take the write lock and return; otherwise fall through to wait/retry.
info.writerTask = currentTaskAttemptId
writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
return Some(info)
}
}
if (blocking) {
wait() // Releases the monitor until some thread calls notifyAll() (e.g. unlock/removeBlock); no timeout, so this may wait indefinitely
}
} while (blocking)
None
}
2. get: 获取BlockId对应的BlockInfo.
/**
 * Returns the BlockInfo registered for `blockId`, or None if no such block is tracked.
 *
 * Reads the `infos` map while holding this object's monitor.
 *
 * @param blockId the block to look up
 * @return the registered BlockInfo, if any
 */
private[storage] def get(blockId: BlockId): Option[BlockInfo] = synchronized {
  infos.get(blockId)
}
3. unlock: 释放BlockId对应Block上的锁
/**
 * Releases one lock held on `blockId`.
 *
 * If the block currently has a writer, the write lock is cleared and its binding is removed from
 * `writeLocksByTask`. Otherwise one read lock is released: the block's `readerCount` is
 * decremented and one occurrence is removed from the task's read-lock multiset. All threads
 * waiting on this object's monitor are notified afterwards.
 *
 * @param blockId       the block whose lock is released
 * @param taskAttemptId the task attempt releasing the lock; defaults to the current task attempt
 * @throws IllegalStateException if no BlockInfo is registered for `blockId`
 */
def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
  val owner = taskAttemptId.getOrElse(currentTaskAttemptId)
  logTrace(s"Task $owner releasing lock for $blockId")
  val blockInfo = get(blockId) match {
    case Some(found) => found
    case None => throw new IllegalStateException(s"Block $blockId not found")
  }
  if (blockInfo.writerTask == BlockInfo.NO_WRITER) {
    // No writer, so this call releases a read lock.
    assert(blockInfo.readerCount > 0, s"Block $blockId is not locked for reading")
    blockInfo.readerCount -= 1
    // NOTE(review): assumes the per-task multiset's remove(e, n) returns the count *before*
    // removal (Guava Multiset contract), hence the "- 1"; a positive result means the task
    // still holds further read locks on this block.
    val remainingPins: Int = readLocksByTask(owner).remove(blockId, 1) - 1
    assert(remainingPins >= 0,
      s"Task $owner release lock on block $blockId more times than it acquired it")
  } else {
    blockInfo.writerTask = BlockInfo.NO_WRITER
    writeLocksByTask.removeBinding(owner, blockId)
  }
  notifyAll()
}
4. downgradeLock: 锁降级
/**
 * Downgrades the current task attempt's write lock on `blockId` to a read lock.
 *
 * The method holds this object's monitor for its whole duration. Other threads that synchronize
 * on it therefore cannot take the block between the write lock being released and the read
 * lock being acquired.
 *
 * @param blockId the block whose write lock is downgraded
 * @throws IllegalStateException    if no BlockInfo is registered for `blockId`
 * @throws IllegalArgumentException if the current task attempt does not hold the write lock
 */
def downgradeLock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  require(info.writerTask == currentTaskAttemptId,
    s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
      s" block $blockId")
  unlock(blockId) // release the write lock
  // Non-blocking: we still hold the monitor, so the read lock must be immediately available.
  val lockOutcome = lockForReading(blockId, blocking = false)
  assert(lockOutcome.isDefined,
    s"Task $currentTaskAttemptId failed to acquire a read lock on block $blockId after downgrade")
}
5. lockNewBlockForWriting: 在写新Block时获取写锁
/**
 * Registers `newBlockInfo` for `blockId` and write-locks it, unless the block already exists.
 *
 * A read lock is tried first via `lockForReading(blockId)`:
 *  - If it succeeds, the block already exists (e.g. another thread raced us to compute it).
 *    The read lock is kept and `false` is returned.
 *  - Otherwise `newBlockInfo` is stored in `infos` and the write lock is taken for the current
 *    task attempt, and `true` is returned.
 *
 * @param blockId      the block to create
 * @param newBlockInfo the BlockInfo to register if the block does not exist yet
 * @return true if the new block was registered and write-locked; false if it already existed
 *         (the caller then holds a read lock on it)
 */
def lockNewBlockForWriting(
    blockId: BlockId,
    newBlockInfo: BlockInfo): Boolean = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
  // Probe with a *read* lock: if the block already exists we keep this lock instead of writing.
  lockForReading(blockId) match {
    case Some(_) =>
      // Block already exists. This could happen if another thread races with us to compute
      // the same block. In this case, just keep the read lock and return.
      false
    case None =>
      // Block does not yet exist or is removed, so we are free to acquire the write lock.
      infos(blockId) = newBlockInfo
      lockForWriting(blockId)
      true
  }
}
6. releaseAllLocksForTask: 释放给定的task所占用的所有Block的锁
/**
 * Releases every read and write lock held by the given task attempt.
 *
 * The task's entries are removed from `readLocksByTask` and `writeLocksByTask`. For each
 * write-locked block, the writer is reset to `BlockInfo.NO_WRITER`. For each read-locked block,
 * `readerCount` is decreased by the number of read locks the task held on it. All threads
 * waiting on this object's monitor are notified afterwards.
 *
 * @param taskAttemptId the task attempt whose locks are released
 * @return ids of the blocks whose locks were released: write-locked blocks first, then
 *         read-locked blocks
 */
def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
  val released = mutable.ArrayBuffer[BlockId]()
  val heldReadLocks =
    readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
  val heldWriteLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)

  // Clear write ownership on every block this task was writing.
  heldWriteLocks.foreach { writtenId =>
    infos.get(writtenId).foreach { blockInfo =>
      assert(blockInfo.writerTask == taskAttemptId)
      blockInfo.writerTask = BlockInfo.NO_WRITER
    }
    released += writtenId
  }

  // Each multiset entry carries how many read locks the task held on that block.
  for (entry <- heldReadLocks.entrySet().iterator().asScala) {
    val readId = entry.getElement
    val heldCount = entry.getCount
    released += readId
    get(readId).foreach { blockInfo =>
      blockInfo.readerCount -= heldCount
      assert(blockInfo.readerCount >= 0)
    }
  }

  notifyAll()
  released
}
7. size: 返回infos的大小(即所管理的BlockInfo的数量)
8. entries: 遍历infos,返回迭代器
9. removeBlock: 移除BlockId对应的BlockInfo.
/**
 * Removes the BlockInfo registered for `blockId`.
 *
 * Only the task attempt currently holding the block's write lock may remove it. On success:
 *  - the entry is dropped from `infos`;
 *  - the BlockInfo's reader count and writer are reset;
 *  - the write-lock binding for the current task attempt is removed;
 *  - all threads waiting on this object's monitor are notified.
 *
 * @param blockId the block to remove
 * @throws IllegalArgumentException if no BlockInfo is registered for `blockId`
 * @throws IllegalStateException    if the current task attempt does not hold the write lock
 */
def removeBlock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
  val blockInfo = infos.getOrElse(blockId,
    throw new IllegalArgumentException(
      s"Task $currentTaskAttemptId called remove() on non-existent block $blockId"))
  // Only the writer of the block is allowed to remove it.
  if (blockInfo.writerTask != currentTaskAttemptId) {
    throw new IllegalStateException(
      s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
  }
  infos.remove(blockId)
  // Reset lock state on the now-detached BlockInfo (presumably for anyone still holding a
  // reference to it — confirm with callers).
  blockInfo.readerCount = 0
  blockInfo.writerTask = BlockInfo.NO_WRITER
  writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
  notifyAll()
}
10. clear: 清除BlockInfoManager中的所有信息, 并通知所有在BlockInfoManager管理的锁上等待的线程。