BlockManagerMaster的作用是对存在于Executor或Driver上的BlockManager进行统一管理。Executor与Driver关于BlockManager的交互都依赖于BlockManagerMaster,比如Executor需要向Driver发送注册BlockManager、更新Executor上Block的最新信息、询问所需Block目前所在位置,以及在Executor运行结束时将此Executor移除等消息。
在Spark执行环境一节中介绍过,Driver上的BlockManagerMaster会实例化并且注册BlockManagerMasterEndpoint。无论是Driver还是Executor,它们的BlockManagerMaster的driverEndpoint属性都将持有BlockManagerMasterEndpoint的RpcEndpointRef。无论是Driver还是Executor,每个BlockManager都拥有自己的BlockManagerSlaveEndpoint,且BlockManager的slaveEndpoint属性保存着各自BlockManagerSlaveEndpoint的RpcEndpointRef。BlockManagerMaster负责发送消息,BlockManagerMasterEndpoint负责消息的接收与处理,BlockManagerSlaveEndpoint则接收BlockManagerMasterEndpoint下发的命令。
BlockManagerMaster负责发送各种与存储体系相关的消息(包括事件消息和状态消息),这些消息的类型有:
- RemoveExecutor(移除Executor)
- RegisterBlockManager(注册BlockManager)
- UpdateBlockInfo(更新Block信息)
- GetLocations(获取Block的位置)
- GetLocationsMultipleBlockIds(获取多个Block的位置)
- GetPeers(获取其它BlockManager的BlockManagerId)
- GetExecutorEndpointRef(获取Executor的EndpointRef引用)
- RemoveBlock(移除Block)
- RemoveRdd(移除Rdd Block)
- RemoveShuffle(移除Shuffle Block)
- RemoveBroadcast(移除Broadcast Block)
- GetMemoryStatus(获取指定的BlockManager的内存状态)
- GetStorageStatus(获取存储状态)
- GetMatchingBlockIds(获取匹配过滤条件的Block)
- HasCachedBlocks(指定的Executor上是否有缓存的Block)
- StopBlockManagerMaster(停止BlockManagerMaster)

可以看到,BlockManagerMaster能够发送的消息类型多种多样。为了更容易理解,下面以RegisterBlockManager为例:
/**
 * Registers a BlockManager by sending a RegisterBlockManager message through
 * `driverEndpoint` and returning the BlockManagerId contained in the reply.
 *
 * @param id                identifier of the BlockManager being registered
 * @param localDirs         local directories passed along in the registration message
 * @param maxOnHeapMemSize  on-heap memory size passed along in the registration message
 * @param maxOffHeapMemSize off-heap memory size passed along in the registration message
 * @param slaveEndpoint     RpcEndpointRef passed along in the registration message
 * @return the BlockManagerId received as the reply to the registration request
 */
def registerBlockManager(
    id: BlockManagerId,
    localDirs: Array[String],
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $id")
  val request =
    RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
  // Send the request through driverEndpoint with askSync and use the id it returns.
  val registeredId = driverEndpoint.askSync[BlockManagerId](request)
  logInfo(s"Registered BlockManager $registeredId")
  registeredId
}

BlockManagerMasterEndpoint接收Driver或Executor上BlockManagerMaster发送的消息,对所有的BlockManager进行统一管理。BlockManagerMasterEndpoint定义了一些管理BlockManager的属性。
- blockManagerInfo:BlockManagerId与BlockManagerInfo之间映射关系的缓存
- blockManagerIdByExecutor:Executor ID与BlockManagerId之间映射关系的缓存
- blockLocations:BlockId与存储了此BlockId对应Block的BlockManager的BlockManagerId之间的一对多关系缓存
- topologyMapper:对集群所有节点的拓扑结构的映射

BlockManagerMasterEndpoint重写了特质RpcEndpoint的receiveAndReply方法,用于接收BlockManager相关的消息:
/**
 * Handles incoming messages that expect an answer: each case delegates to the matching
 * helper of this endpoint and sends the outcome back through `context.reply`.
 *
 * @param context call context used to send the reply for the message being handled
 * @return a partial function defined for the message types listed below
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
    val registeredId = register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
    context.reply(registeredId)

  case update @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    val updated = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
    context.reply(updated)
    // SPARK-30594: do not post `SparkListenerBlockUpdated` when updateBlockInfo returns
    // false, since the block info would be updated again later.
    if (updated) {
      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(update)))
    }

  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  case GetLocationsAndStatus(blockId, requesterHost) =>
    context.reply(getLocationsAndStatus(blockId, requesterHost))

  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  case GetExecutorEndpointRef(executorId) =>
    context.reply(getExecutorEndpointRef(executorId))

  case GetMemoryStatus =>
    context.reply(memoryStatus)

  case GetStorageStatus =>
    context.reply(storageStatus)

  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  case IsExecutorAlive(executorId) =>
    val alive = blockManagerIdByExecutor.contains(executorId)
    context.reply(alive)

  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  case RemoveBlock(blockId) =>
    // The removal is issued first; the reply is always `true`.
    removeBlockFromWorkers(blockId)
    context.reply(true)

  case RemoveExecutor(execId) =>
    // The removal is issued first; the reply is always `true`.
    removeExecutor(execId)
    context.reply(true)

  case StopBlockManagerMaster =>
    // Reply before stopping, in this order.
    context.reply(true)
    stop()
}

上述case匹配的消息与上文 讲述BlockManagerMaster的职责时 提到的消息类型一一对应。 根据上述代码清单,在接收到发过来的消息后,将调用register方法。
/**
 * Registers a BlockManager with this endpoint and returns its BlockManagerId enriched with
 * the topology information looked up for its host.
 *
 * If the id is not yet present in `blockManagerInfo`, any id already recorded for the same
 * executor is removed first, then the new id is stored in `blockManagerIdByExecutor` and a
 * new BlockManagerInfo is stored in `blockManagerInfo`. A SparkListenerBlockManagerAdded
 * event is posted in every case, including when the id was already known.
 *
 * @param idWithoutTopologyInfo id supplied by the caller, without topology information
 * @param localDirs             local directories recorded for the executor
 * @param maxOnHeapMemSize      on-heap memory size reported by the BlockManager
 * @param maxOffHeapMemSize     off-heap memory size reported by the BlockManager
 * @param slaveEndpoint         RpcEndpointRef stored in the new BlockManagerInfo
 * @return the BlockManagerId including topology information
 */
private def register(
    idWithoutTopologyInfo: BlockManagerId,
    localDirs: Array[String],
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  // The incoming id is not expected to contain topology information; it is looked up here
  // and the caller gets back a more complete BlockManagerId.
  val id = BlockManagerId(
    idWithoutTopologyInfo.executorId,
    idWithoutTopologyInfo.host,
    idWithoutTopologyInfo.port,
    topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))

  // Timestamp reported in the listener event; the BlockManagerInfo below reads the clock again.
  val time = System.currentTimeMillis()
  val totalMemSize = maxOnHeapMemSize + maxOffHeapMemSize

  executorIdToLocalDirs.put(id.executorId, localDirs)

  if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
      // A block manager of the same executor already exists, so remove it (assumed dead).
      logError("Got two different block manager registrations on same executor - "
        + s" will replace old one $oldId with new one $id")
      // Drops the previous registration recorded for this executor.
      removeExecutor(id.executorId)
    }

    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(totalMemSize), id))

    // Record the executorId -> BlockManagerId association for the new id.
    blockManagerIdByExecutor(id.executorId) = id

    val externalShuffleServiceBlockStatus =
      if (externalShuffleServiceRddFetchEnabled) {
        Some(blockStatusByShuffleService.getOrElseUpdate(
          externalShuffleServiceIdOnHost(id), new JHashMap[BlockId, BlockStatus]))
      } else {
        None
      }

    blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
      maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint, externalShuffleServiceBlockStatus)
  }

  // Post a SparkListenerBlockManagerAdded event to listenerBus.
  // NOTE(review): the article's original note says a listener thread eventually invokes
  // onBlockManagerAdded on every SparkListener for monitoring; that path is not shown here.
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, totalMemSize,
    Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
  id
}

其中removeExecutor方法的实现如下
/**
 * Removes the BlockManager recorded for the given executor, if there is one.
 *
 * Looks the executor up in `blockManagerIdByExecutor` and hands the BlockManagerId found
 * there to `removeBlockManager`; does nothing beyond logging when no entry exists.
 *
 * @param execId id of the executor whose BlockManager should be removed
 */
private def removeExecutor(execId: String): Unit = {
  logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
  blockManagerIdByExecutor.get(execId) match {
    case Some(blockManagerId) => removeBlockManager(blockManagerId)
    case None => // no BlockManager recorded for this executor
  }
}

BlockManagerSlaveEndpoint用于接收BlockManagerMasterEndpoint的命令并执行相应的操作。BlockManagerSlaveEndpoint也重写了RpcEndpoint的receiveAndReply方法。
/**
 * Handles incoming messages that expect an answer: each case delegates to the matching
 * helper and sends the outcome back through `context.reply`.
 *
 * NOTE(review): the surrounding text introduces this listing as
 * BlockManagerSlaveEndpoint.receiveAndReply, but it is identical to the
 * BlockManagerMasterEndpoint.receiveAndReply listing earlier in this document and calls
 * master-side helpers (register, updateBlockInfo, removeExecutor). This is probably a paste
 * error; confirm against the Spark source before relying on it.
 *
 * @param context call context used to send the reply for the message being handled
 * @return a partial function defined for the message types listed below
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
    val registeredId = register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
    context.reply(registeredId)

  case update @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    val updated = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
    context.reply(updated)
    // SPARK-30594: do not post `SparkListenerBlockUpdated` when updateBlockInfo returns
    // false, since the block info would be updated again later.
    if (updated) {
      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(update)))
    }

  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  case GetLocationsAndStatus(blockId, requesterHost) =>
    context.reply(getLocationsAndStatus(blockId, requesterHost))

  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  case GetExecutorEndpointRef(executorId) =>
    context.reply(getExecutorEndpointRef(executorId))

  case GetMemoryStatus =>
    context.reply(memoryStatus)

  case GetStorageStatus =>
    context.reply(storageStatus)

  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  case IsExecutorAlive(executorId) =>
    val alive = blockManagerIdByExecutor.contains(executorId)
    context.reply(alive)

  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  case RemoveBlock(blockId) =>
    // The removal is issued first; the reply is always `true`.
    removeBlockFromWorkers(blockId)
    context.reply(true)

  case RemoveExecutor(execId) =>
    // The removal is issued first; the reply is always `true`.
    removeExecutor(execId)
    context.reply(true)

  case StopBlockManagerMaster =>
    // Reply before stopping, in this order.
    context.reply(true)
    stop()
}