Hadoop 3.1.3 Study Notes 3


    This note covers how Hadoop selects source and target nodes during the reconstruction process that follows data loss.

    Study Notes 2 walked through the reconstruction flow for lost data; its core method is the reconstruct method of StripedBlockReconstructor.

    As we saw in the previous note, readMinimumSources selects a number of datanodes equal to the number of data blocks to serve as reconstruction sources; the reconstructTargets method then performs the actual reconstruction.

    private void reconstructTargets(int toReconstructLen) throws IOException {
      ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);

      int[] erasedIndices = stripedWriter.getRealTargetIndices();
      ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);

      long start = System.nanoTime();
      getDecoder().decode(inputs, erasedIndices, outputs);
      long end = System.nanoTime();
      this.getDatanode().getMetrics().incrECDecodingTime(end - start);

      stripedWriter.updateRealTargetBuffers(toReconstructLen);
    }

    Here inputs are the data sources described above, erasedIndices identifies the lost blocks, and outputs receives the reconstructed data. Evidently the smallest unit of one "reconstruction" pass is a ByteBuffer. Decoding goes through the decode interface, which dispatches to a different doDecode method depending on the selected codec; for the LRC code, for example:

    protected void doDecode(ByteBufferDecodingState decodingState) throws IOException {
      CoderUtil.resetOutputBuffers(decodingState.outputs, decodingState.decodeLength);
      prepareDecoding(decodingState.inputs, decodingState.erasedIndexes);

      ByteBuffer[] realInputs = new ByteBuffer[numRealInputUnits];
      for (int i = 0; i < numRealInputUnits; i++) {
        realInputs[i] = decodingState.inputs[validIndexes[i]];
      }
      LRCUtil.encodeData(gfTables, realInputs, decodingState.outputs);
    }
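    To make the decode contract concrete, here is a minimal standalone sketch. It uses the stock RS raw coder from hadoop-common rather than the LRC coder discussed here (the LRC coder is a custom extension, so treat the choice of codec, buffer size, and payload as illustrative assumptions). Note how an erased unit is represented: its slot in the inputs array is null and its index appears in erasedIndexes, which is exactly the shape reconstructTargets hands to decode:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.erasurecode.CodecUtil;
    import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
    import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    public class RsDecodeDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ErasureCoderOptions options = new ErasureCoderOptions(3, 2); // k=3, m=2
        RawErasureEncoder encoder = CodecUtil.createRawEncoder(
            conf, ErasureCodeConstants.RS_CODEC_NAME, options);
        RawErasureDecoder decoder = CodecUtil.createRawDecoder(
            conf, ErasureCodeConstants.RS_CODEC_NAME, options);

        int cellSize = 1024;                       // arbitrary cell size
        ByteBuffer[] data = new ByteBuffer[3];
        for (int i = 0; i < data.length; i++) {
          data[i] = ByteBuffer.allocate(cellSize);
          while (data[i].hasRemaining()) {
            data[i].put((byte) (i + 1));           // arbitrary payload
          }
          data[i].flip();
        }
        ByteBuffer[] parity = new ByteBuffer[2];
        for (int i = 0; i < parity.length; i++) {
          parity[i] = ByteBuffer.allocate(cellSize);
        }
        encoder.encode(data, parity);

        // Rewind so every surviving buffer is readable from position 0.
        for (ByteBuffer b : data) { b.rewind(); }
        for (ByteBuffer b : parity) { b.rewind(); }

        // Simulate losing data unit 1: null in inputs, listed in erasedIndexes.
        ByteBuffer[] inputs = { data[0], null, data[2], parity[0], parity[1] };
        int[] erasedIndexes = { 1 };
        ByteBuffer[] outputs = { ByteBuffer.allocate(cellSize) };
        decoder.decode(inputs, erasedIndexes, outputs);
        // outputs[0] now holds the bytes data[1] held before the loss.
      }
    }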

    In doDecode above, the prepareDecoding method is responsible for picking the actual "data sources". For LRC specifically, when a block is lost it can be recovered from the other blocks in its local group; the selection logic is the following method:

    private <T> void prepareDecoding(T[] inputs, int[] erasedIndexes)
        throws IOException {
      int[] tmpValidIndexes = CoderUtil.getValidIndexes(inputs);
      // Initialize the number of input units for global recover use
      this.numRealInputUnits = getNumDataUnits();
      int k = getNumDataUnits();
      int l = getNumLocalParityUnits();
      int r = getNumParityUnits();
      int[] tmpRealValidIndexes = new int[getNumDataUnits()];
      // Verify if we need to recover locally or globally
      // when erasedIndexes.length = 1 we only need l of units to recover <==> local recover
      if (erasedIndexes.length == 1) {
        if (erasedIndexes[0] < k + l) {
          // We only need half of data units to recover data
          this.numRealInputUnits = k / l;
          // Create a candidate
          int[] localIndexes = new int[this.numRealInputUnits + 1];
          if (erasedIndexes[0] < k / 2 || erasedIndexes[0] == k) {
            this.localXFlag = true;
            // Generate a candidate list for local X indexes
            for (int j = 0; j < this.numRealInputUnits; j++) {
              localIndexes[j] = j;
            }
            localIndexes[this.numRealInputUnits] = k;
          } // end if the first erased index is in local X part.
          else {
            this.localYFlag = true;
            // Generate a candidate list for local Y indexes
            for (int j = 0; j < this.numRealInputUnits; j++) {
              localIndexes[j] = j + k / 2;
            }
            localIndexes[this.numRealInputUnits] = k + 1;
          }
          // Select the local indexes from the candidate list
          tmpRealValidIndexes = new int[this.numRealInputUnits];
          int cur = 0;
          for (int j = 0; j < localIndexes.length; j++) {
            if (localIndexes[j] != erasedIndexes[0]) {
              tmpRealValidIndexes[cur++] = localIndexes[j];
            }
          }
        } // end if erasedIndexes[0] < getNumDataUnits() + 2
        else {
          this.numRealInputUnits = getNumDataUnits();
          tmpRealValidIndexes = tmpValidIndexes;
        }
      } // end if erasedIndexes.length == 1
      else if (erasedIndexes.length < r + l) {
        this.numRealInputUnits = getNumDataUnits();
        int erasedFlag = 0;
        if (erasedIndexes[0] < k / 2 || erasedIndexes[0] == k) {
          // X region has at least one erased unit
          erasedFlag = 0;
        } else if (erasedIndexes[0] < k || erasedIndexes[0] == k + 1) {
          // Y region has at least one erased unit
          erasedFlag = 1;
        } else {
          erasedFlag = 2; // All erased units are in the global parity region
        }
        tmpRealValidIndexes = getGlobalValidIndexes(tmpValidIndexes,
            this.numRealInputUnits, erasedFlag);
      } // end if erasedIndexes.length < getNumParityUnits()
      else {
        if (erasedIndexesInLocal(erasedIndexes)) {
          throw new HadoopIllegalArgumentException(
              "Too many erased in a local part, data not recoverable");
        } else {
          this.numRealInputUnits = getNumDataUnits();
          tmpRealValidIndexes = tmpValidIndexes;
        }
      }
      if (Arrays.equals(this.cachedErasedIndexes, erasedIndexes) &&
          Arrays.equals(this.validIndexes, tmpRealValidIndexes)) {
        return; // Optimization. Nothing to do
      }
      this.cachedErasedIndexes =
          Arrays.copyOf(erasedIndexes, erasedIndexes.length);
      this.validIndexes =
          Arrays.copyOf(tmpRealValidIndexes, tmpRealValidIndexes.length);
      processErasures(erasedIndexes);
    }
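    To see what the single-erasure branch above actually selects, the following self-contained sketch replays it for a hypothetical LRC layout with k = 6 data units, l = 2 local parities (the X parity at index 6 covering units 0..2, the Y parity at index 7 covering units 3..5) and r = 3 global parities at indexes 8..10. The class and variable names are illustrative only, not from the Hadoop source:

    public class LrcLocalRepairDemo {
      public static void main(String[] args) {
        int k = 6, l = 2;
        int erased = 1;                        // lost data unit in the X half
        int numRealInputUnits = k / l;         // 3 surviving units suffice
        int[] candidates = new int[numRealInputUnits + 1];
        if (erased < k / 2 || erased == k) {   // X half: units 0..k/2-1, parity k
          for (int j = 0; j < numRealInputUnits; j++) {
            candidates[j] = j;
          }
          candidates[numRealInputUnits] = k;   // X local parity
        } else {                               // Y half: units k/2..k-1, parity k+1
          for (int j = 0; j < numRealInputUnits; j++) {
            candidates[j] = j + k / 2;
          }
          candidates[numRealInputUnits] = k + 1;
        }
        // Keep every candidate except the erased unit itself.
        int[] valid = new int[numRealInputUnits];
        int cur = 0;
        for (int idx : candidates) {
          if (idx != erased) {
            valid[cur++] = idx;
          }
        }
        System.out.println(java.util.Arrays.toString(valid)); // [0, 2, 6]
      }
    }

    Losing unit 1 lands in the X half, so the candidate list is {0, 1, 2, 6} and the surviving sources come out as [0, 2, 6]: three reads instead of six.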

    The remaining parts follow the usual LRC encoding/decoding procedure and are not repeated here. One point deserves emphasis: this LRC implementation does not actually reduce the read volume, because k ByteBuffers (as many as there are data blocks) are still read even when a local repair needs only k/l of them; with k = 6 and l = 2, a single-block repair should read 3 buffers, not 6. This needs to be fixed later.

    Next, how the repaired data gets placed on the designated datanodes.

    After reconstructTargets, the transferData2Targets method of stripedWriter is invoked to ship the repaired data to the designated datanodes:

    int transferData2Targets() {
      int nSuccess = 0;
      for (int i = 0; i < targets.length; i++) {
        if (targetsStatus[i]) {
          boolean success = false;
          try {
            writers[i].transferData2Target(packetBuf);
            nSuccess++;
            success = true;
          } catch (IOException e) {
            LOG.warn(e.getMessage());
          }
          targetsStatus[i] = success;
        }
      }
      return nSuccess;
    }

    Looking at this method: transferData2Target evidently sends a packetBuf worth of data to the target node (we will return to it later). Whether a target is written to at all depends on targetsStatus, the per-target status array, which is populated in stripedWriter's initTargetStreams:

    int initTargetStreams() {
      int nSuccess = 0;
      for (short i = 0; i < targets.length; i++) {
        try {
          writers[i] = createWriter(i);
          nSuccess++;
          targetsStatus[i] = true;
        } catch (Throwable e) {
          LOG.warn(e.getMessage());
        }
      }
      return nSuccess;
    }

    As this method shows, whether each entry of targetsStatus ends up true or false depends on createWriter, i.e., on whether a write stream can be established:

    private StripedBlockWriter createWriter(short index) throws IOException {
      return new StripedBlockWriter(this, datanode, conf,
          reconstructor.getBlock(targetIndices[index]), targets[index],
          targetStorageTypes[index], targetStorageIds[index]);
    }

    Let us look at the init method called from the StripedBlockWriter constructor:

    private void init() throws IOException {
      Socket socket = null;
      DataOutputStream out = null;
      DataInputStream in = null;
      boolean success = false;
      try {
        InetSocketAddress targetAddr =
            stripedWriter.getSocketAddress4Transfer(target);
        socket = datanode.newSocket();
        NetUtils.connect(socket, targetAddr,
            datanode.getDnConf().getSocketTimeout());
        socket.setTcpNoDelay(
            datanode.getDnConf().getDataTransferServerTcpNoDelay());
        socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());

        Token<BlockTokenIdentifier> blockToken =
            datanode.getBlockAccessToken(block,
                EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
                new StorageType[]{storageType}, new String[]{storageId});

        long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
        OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
        InputStream unbufIn = NetUtils.getInputStream(socket);
        DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
        IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
            socket, unbufOut, unbufIn, keyFactory, blockToken, target);

        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;

        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            DFSUtilClient.getSmallBufferSize(conf)));
        in = new DataInputStream(unbufIn);

        DatanodeInfo source = new DatanodeInfoBuilder()
            .setNodeID(datanode.getDatanodeId()).build();
        new Sender(out).writeBlock(block, storageType, blockToken, "",
            new DatanodeInfo[]{target}, new StorageType[]{storageType},
            source, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
            stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(),
            false, false, null, storageId, new String[]{storageId});

        targetSocket = socket;
        targetOutputStream = out;
        targetInputStream = in;
        success = true;
      } finally {
        if (!success) {
          IOUtils.closeStream(out);
          IOUtils.closeStream(in);
          IOUtils.closeStream(socket);
        }
      }
    }

    Clearly this method establishes a socket to the target datanode. If the connection cannot be made, an exception is thrown and the corresponding entry of targetsStatus is never set to true (a boolean array defaults to false). So where do the target datanodes come from in the first place?

    The targets of stripedWriter come from stripedReconInfo and are ultimately decided by the chooseTarget method of BlockPlacementPolicy.
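    chooseTarget itself is a topic for a later note. As a rough mental model only, it walks the cluster topology and returns datanodes that do not already hold a replica and that respect rack placement. The toy chooser below (entirely hypothetical, none of these names exist in Hadoop) captures that shape:

    import java.util.*;

    public class ToyPlacement {
      static final class Node {
        final String name; final String rack;
        Node(String name, String rack) { this.name = name; this.rack = rack; }
      }

      // Pick up to `needed` targets that hold no replica yet, preferring
      // racks not already used, loosely mirroring the rack awareness of
      // the real placement policy.
      static List<Node> chooseTargets(List<Node> cluster, Set<Node> existing,
                                      int needed) {
        Set<String> usedRacks = new HashSet<>();
        for (Node n : existing) {
          usedRacks.add(n.rack);
        }
        List<Node> chosen = new ArrayList<>();
        for (Node n : cluster) {
          if (chosen.size() == needed) {
            break;
          }
          if (!existing.contains(n) && !usedRacks.contains(n.rack)) {
            chosen.add(n);
            usedRacks.add(n.rack);
          }
        }
        return chosen;
      }

      public static void main(String[] args) {
        Node a = new Node("dn1", "r1"), b = new Node("dn2", "r1");
        Node c = new Node("dn3", "r2"), d = new Node("dn4", "r3");
        List<Node> cluster = Arrays.asList(a, b, c, d);
        Set<Node> existing = new HashSet<>(Collections.singletonList(a));
        for (Node n : chooseTargets(cluster, existing, 2)) {
          System.out.println(n.name + " @ " + n.rack);  // dn3 @ r2, dn4 @ r3
        }
      }
    }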
