Spark Source Code Reading - Storage System 4 (Disk Storage: DiskStore)

    Technology · 2023-12-03

    DiskStore is responsible for storing Blocks to disk, and it relies on the services of DiskBlockManager. In Spark 1.x, BlockStore provided a unified interface over the disk store DiskStore, the memory store MemoryStore, and the Tachyon store TachyonStore, all of which were concrete implementations of it. Starting with Spark 2.0.0, TachyonStore was removed along with the unified interface; DiskStore and MemoryStore are now implemented independently. This article covers the implementation of DiskStore.

    We start by looking at DiskStore's properties, then step through the role DiskStore plays and its relationship with DiskBlockManager.

    - conf: the SparkConf.
    - diskManager: the DiskBlockManager.
    - minMemoryMapBytes: the threshold that decides whether a Block on disk is read directly or via FileChannel's memory-mapped method.

    What is FileChannel's memory-mapped method? In Java NIO, FileChannel's map method provides a fast read/write technique: it maps all or part of the data in the file the channel is connected to directly into an in-memory buffer. This buffer is the image; modifications made to it are reflected back in the underlying file. The buffer is called a MappedByteBuffer, and because it is a memory image of the file, access to it is particularly fast.
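The memory-mapping technique described above can be seen in a minimal standalone Java sketch (plain JDK code, not Spark's implementation): the file's contents are read through the mapped buffer without any explicit read() calls.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapDemo {
    // Map an entire file into memory and decode it as UTF-8.
    public static String readAll(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            // The MappedByteBuffer is a window onto the file's pages;
            // reads go through the OS page cache, not explicit I/O calls.
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("mmap-demo", ".bin");
        Files.write(p, "hello disk store".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(p));  // prints "hello disk store"
        Files.deleteIfExists(p);
    }
}
```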

    The Implementation of DiskStore

    1. getSize

    This method returns the size of the Block corresponding to the given BlockId.

    def getSize(blockId: BlockId): Long = blockSizes.get(blockId)

    2. contains

    This method checks whether the local disk storage path contains the Block file for the given BlockId.

    def contains(blockId: BlockId): Boolean = {
      val file = diskManager.getFile(blockId.name)
      file.exists()
    }

    3. remove

    This method deletes the Block file corresponding to the given BlockId.

    def remove(blockId: BlockId): Boolean = {
      blockSizes.remove(blockId)
      val file = diskManager.getFile(blockId.name)
      if (file.exists()) {
        val ret = file.delete()
        if (!ret) {
          logWarning(s"Error deleting ${file.getPath()}")
        }
        ret
      } else {
        false
      }
    }

    4. put

    This method writes the Block corresponding to the given BlockId to disk.

    def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
      if (contains(blockId)) {
        // If the Block file for the given BlockId already exists, throw; otherwise continue
        throw new IllegalStateException(s"Block $blockId is already present in the disk store")
      }
      logDebug(s"Attempting to put block $blockId")
      val startTimeNs = System.nanoTime()
      val file = diskManager.getFile(blockId)  // Get the Block file for this BlockId via DiskBlockManager.getFile
      val out = new CountingWritableChannel(openForWrite(file))  // Open the file output channel
      var threwException: Boolean = true
      try {
        writeFunc(out)  // Invoke writeFunc to write the Block file
        blockSizes.put(blockId, out.getCount)
        threwException = false
      } finally {
        try {
          out.close()
        } catch {
          case ioe: IOException =>
            if (!threwException) {
              threwException = true
              throw ioe
            }
        } finally {
          if (threwException) {
            remove(blockId)  // If writing or closing failed, delete the partial Block file
          }
        }
      }
      logDebug(s"Block ${file.getName} stored as ${Utils.bytesToString(file.length())} file" +
        s" on disk in ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
    }
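The CountingWritableChannel used above lets put record the block's size via out.getCount after writeFunc finishes. The idea is a thin byte-counting wrapper around a WritableByteChannel; the following Java sketch is illustrative (the class name and details are mine, not Spark's exact implementation):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Illustrative sketch: wraps a WritableByteChannel and tracks how many
// bytes pass through, so the caller can record the block size afterwards.
public class CountingChannel implements WritableByteChannel {
    private final WritableByteChannel inner;
    private long count = 0;

    public CountingChannel(WritableByteChannel inner) {
        this.inner = inner;
    }

    public long getCount() {
        return count;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        int written = inner.write(src);
        count += written;  // accumulate only what was actually written
        return written;
    }

    @Override
    public boolean isOpen() {
        return inner.isOpen();
    }

    @Override
    public void close() throws IOException {
        inner.close();
    }
}
```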

    5. putBytes

    This method writes the Block corresponding to the given BlockId to disk; the Block's contents are already wrapped in a ChunkedByteBuffer.

    def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
      put(blockId) { channel =>  // Call put with a callback that performs the write
        bytes.writeFully(channel)  // Uses the tryWithSafeFinally method from the Utils utility class
      }
    }

    6. getBytes

    This method reads the Block corresponding to the given BlockId and returns it wrapped as BlockData (a DiskBlockData, or an EncryptedBlockData for encrypted blocks). (This has changed somewhat across versions.)

    def getBytes(blockId: BlockId): BlockData = {
      getBytes(diskManager.getFile(blockId.name), getSize(blockId))
    }

    def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match {
      case Some(key) =>
        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
        // and provides InputStream / FileRegion implementations for reading the data.
        new EncryptedBlockData(f, blockSize, conf, key)
      case _ =>
        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, f, blockSize)
    }
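DiskBlockData (not shown here) is where minMemoryMapBytes comes into play: small blocks are read directly into a buffer, while larger ones are memory-mapped. A simplified Java sketch of that decision follows; the class and method names are illustrative, though the 2 MB threshold matches Spark's default for spark.storage.memoryMapThreshold.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DiskReadSketch {
    // Illustrative threshold; Spark's spark.storage.memoryMapThreshold defaults to 2 MB.
    static final long MIN_MEMORY_MAP_BYTES = 2L * 1024 * 1024;

    // Read a block file either directly (small files) or via mmap (large files).
    public static ByteBuffer readBlock(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = ch.size();
            if (size < MIN_MEMORY_MAP_BYTES) {
                // Small block: copy it into a heap buffer with plain reads,
                // avoiding the per-mapping overhead of mmap.
                ByteBuffer buf = ByteBuffer.allocate((int) size);
                while (buf.hasRemaining() && ch.read(buf) >= 0) { }
                buf.flip();
                return buf;
            } else {
                // Large block: map it into memory; no copy into the Java heap.
                return ch.map(FileChannel.MapMode.READ_ONLY, 0, size);
            }
        }
    }
}
```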
