DiskBlockManager是存储体系的成员之一,它负责为逻辑的Block与数据写入磁盘的位置之间建立逻辑的映射关系。对于不了解的事务,我们应该先看看它定义了哪些属性。DiskBlockManager的属性如下。
conf:即sparkconf。deleteFilesOnStop:停止DiskBlockManager的时候是否删除本地目录的布尔类型标记。当不指定外部的ShuffleClient(即spark.shuffle.service.enable属性为false)或当前实例是Driver时,此属性为true。localDirs:本地目录的数组。localDirs 就 DiskBlockManager 管理的本地目录数组。localDirs 是通过调用 createLocalDirs方法创建的本地目录数组,其实质是调用了Utils工具类的 getConfiguredLocalDirs方法获取本地路径(getConfigguredLocalDirs方法默认获取 spark.local.dir 属性或者系统属性 java.io.tmpdir指定的目录,目录可能有多个),并在每个路径下创建以 blockmgr- 为前缀,UUID为后缀的随机字符串的子目录。
//org.apache.spark.storage.DiskBlockManager private def createLocalDirs(conf: SparkConf): Array[File] = { Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => try { val localDir = Utils.createDirectory(rootDir, "blockmgr") logInfo(s"Created local directory at $localDir") Some(localDir) } catch { case e: IOException => logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) None } } } subDirsPerLocalDir:磁盘存储DiskStore的本地子目录的数量。可以通过spark.diskStore.subDirectories属性配置,默认为64.subDirs:DiskStore的本地子目录的二维数组。shutdownHook:此属性的作用时初始化DiskBlockManager时,调用addShutdownHook方法(见代码清单6-19),为DiskBlockManager设置号关闭钩子。有了对localDirs的初步了解,我们来看DiskBlockManager的文件目录结构图。
图中一级目录名称都采用了简单的示意表示,例如Blockmgr-UUID-0、Blockmgr-UUID-1、Blockmgr-UUID-2,代表每个文件夹名称由Blockmgr- 和 UUID 生成的随机串缓存,且此随机串不相同,这里使用N代表subDirsPerLocalDir属性的大小。每个二级目录下都N个二级目录(这里的N即subDirsPerLocalDir属性的大小),有些二级目录下可能暂时还没有Block文件。
根据指定的filename获取文件。
//org.apache.spark.storage.DiskBlockManager def getFile(filename: String): File = { val hash = Utils.nonNegativeHash(filename) // 调用nonNegativeHash获取文件名的非负hash值 val dirId = hash % localDirs.length // 用取余方法获得选中的一级目录 val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // 获取余数作为选中的二级目录 val subDir = subDirs(dirId).synchronized { // 获取二级目录,若不存在则创建 val old = subDirs(dirId)(subDirId) if (old != null) { old } else { val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) if (!newDir.exists() && !newDir.mkdir()) { throw new IOException(s"Failed to create local dir in $newDir.") } subDirs(dirId)(subDirId) = newDir newDir // 返回耳机目录下的文件 } }此方法用于检查本地localDirs目录中是否含有BlockId对应的文件
def containsBlock(blockId: BlockId): Boolean = { getFile(blockId.name).exists() }此方法用于获取本地localDirs目录中的所有文件。
def getAllFiles(): Seq[File] = { subDirs.flatMap { dir => dir.synchronized { dir.clone() // 同步加克隆,保证线程安全 } }.filter(_ != null).flatMap { dir => val files = dir.listFiles() if (files != null) files else Seq.empty } }此方法用于获取本地localDirs目录中所有Block的BlockId。
def getAllBlocks(): Seq[BlockId] = { getAllFiles().map(f => BlockId(f.getName)) }此方法用于为中间结果创建唯一的BlockId和文件,此文件用于保存本地Block数据。
def createTempLocalBlock(): (TempLocalBlockId, File) = { var blockId = new TempLocalBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempLocalBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) }此方法创建唯一的BlockId和文件,用来存储Shuffle的中间结果(即map任务的输出)。
def createTempShuffleBlock(): (TempShuffleBlockId, File) = { var blockId = new TempShuffleBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) }此方法用于正常停止DiskBlockManager。
private[spark] def stop() { // Remove the shutdown hook. It causes memory leaks if we leave it around. try { ShutdownHookManager.removeShutdownHook(shutdownHook) // 移除钩子 } catch { case e: Exception => logError(s"Exception while removing shutdown hook.", e) } doStop() }从上述代码可以看出,stop实际执行的方法是doStop,其实现如下。
private def doStop(): Unit = { if (deleteFilesOnStop) { localDirs.foreach { localDir => // 遍历一级目录 if (localDir.isDirectory() && localDir.exists()) { try { if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) { Utils.deleteRecursively(localDir) // 调用deleteRecursively递归删除一级目录下的所有内容 } } catch { case e: Exception => logError(s"Exception while deleting local spark dir: $localDir", e) } } } } }