Flink Barrier Handling, Explained End to End

Tech · 2024-03-20

In earlier posts we covered the full Flink checkpoint barrier flow and the full flow of how Flink consumes messages.

Types

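Flink handles barriers in one of two ways: with barrier alignment or without it. Each mode has its own class. In the Flink versions this code comes from, BarrierBuffer aligns barriers (exactly-once), while BarrierTracker only tracks them without alignment (at-least-once). We use BarrierBuffer, the aligning variant, as our example.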
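The non-aligning idea can be shown with a minimal sketch. SimpleBarrierTracker is a hypothetical class, not Flink's BarrierTracker. It never blocks a channel. It only counts barriers per checkpoint ID and fires once every channel has delivered one.

Because records behind a barrier keep flowing into the operator before the snapshot is taken, they may be processed again after a restore. That is why this mode only gives at-least-once guarantees.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of barrier handling WITHOUT alignment
 * (in the spirit of Flink's BarrierTracker, but not its actual code).
 */
class SimpleBarrierTracker {
    private final int totalNumberOfInputChannels;
    // checkpoint ID -> number of barriers seen so far for that checkpoint
    private final Map<Long, Integer> barrierCounts = new HashMap<>();

    SimpleBarrierTracker(int totalNumberOfInputChannels) {
        this.totalNumberOfInputChannels = totalNumberOfInputChannels;
    }

    /** Returns true if this barrier completes the checkpoint, i.e. it should be triggered now. */
    boolean onBarrier(long checkpointId) {
        int count = barrierCounts.merge(checkpointId, 1, Integer::sum);
        if (count == totalNumberOfInputChannels) {
            barrierCounts.remove(checkpointId);
            return true;
        }
        return false;
    }

    /** Without alignment, no channel is ever held back. */
    boolean isBlocked(int channelIndex) {
        return false;
    }
}
```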
Walkthrough

The key is the getNextNonBlocked method:

```java
// Fetches data from the ResultSubpartition and handles barriers
@Override
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        Optional<BufferOrEvent> next;
        // currentBuffered is non-null only after a barrier block has been released; otherwise it is null
        if (currentBuffered == null) {
            // No buffered data to replay, so read straight from the InputGate:
            // its InputChannels fetch the data from the ResultSubpartitions
            next = inputGate.getNextBufferOrEvent();
        }
        else {
            // After the barrier block is released, replay the data cached by the CachedBufferBlocker;
            // next holds a value until that data runs out
            next = Optional.ofNullable(currentBuffered.getNext());
            if (!next.isPresent()) {
                // the buffered data has been fully consumed
                completeBufferedSequence();
                return getNextNonBlocked();
            }
        }

        if (!next.isPresent()) {
            if (!endOfStream) {
                // end of input stream. stream continues with the buffered data
                endOfStream = true;
                releaseBlocksAndResetBarriers();
                return getNextNonBlocked();
            }
            else {
                // final end of both input and buffered data
                return null;
            }
        }

        // Once all barriers are aligned, the data in bufferBlocker's ArrayDeque<BufferOrEvent> currentBuffers is consumed first
        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // if the channel is blocked, we just store the BufferOrEvent
            // (barrier alignment: buffer the data)
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        // handle the barrier
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // process barriers only if there is a chance of the checkpoint completing
                // Every operator except the trigger (source) tasks takes its checkpoint here: only a barrier
                // consumed through processInput has actually passed through the upstream operator
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
        }
        else {
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                processEndOfPartition();
            }
            return bufferOrEvent;
        }
    }
}
```

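As long as no barrier alignment has just completed, currentBuffered == null. currentBuffered holds the buffered data that is replayed after an alignment completes.

In the normal case, each buffer read from the InputGate is data and is consumed as usual, following the message-consumption flow from the earlier post. When a CheckpointBarrier arrives instead, processBarrier takes over.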
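The loop above can be modeled in a few dozen lines. The following is a single-threaded toy. It assumes a hypothetical Event record and an in-memory queue standing in for the InputGate; none of it is Flink's API.

It keeps the same control flow as the original loop:

- replay buffered data first;
- hold back data from blocked channels;
- hand ordinary records to the operator;
- route barriers to processBarrier.

Cancellation markers, end-of-partition events and the buffer size limit are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical single-threaded toy model of BarrierBuffer's aligned barrier handling (not Flink code). */
class ToyBarrierBuffer {
    /** Hypothetical input element: a data record or a barrier for a checkpoint ID. */
    record Event(int channel, boolean isBarrier, long checkpointId, String payload) {
        static Event data(int channel, String payload) { return new Event(channel, false, -1L, payload); }
        static Event barrier(int channel, long checkpointId) { return new Event(channel, true, checkpointId, null); }
    }

    private final ArrayDeque<Event> input;                               // stands in for the InputGate
    private final boolean[] blockedChannels;
    private final ArrayDeque<Event> bufferBlocker = new ArrayDeque<>();  // data held back during alignment
    private ArrayDeque<Event> currentBuffered;                           // non-null only while replaying
    private long currentCheckpointId = -1L;
    private int numBarriersReceived;
    private boolean endOfStream;
    final List<Long> triggeredCheckpoints = new ArrayList<>();          // stands in for notifyCheckpoint calls

    ToyBarrierBuffer(int numChannels, List<Event> arrivalOrder) {
        this.blockedChannels = new boolean[numChannels];
        this.input = new ArrayDeque<>(arrivalOrder);
    }

    /** Returns the next record the operator may process, or empty once everything is consumed. */
    Optional<Event> getNextNonBlocked() {
        while (true) {
            Event next;
            if (currentBuffered == null) {
                next = input.poll();                      // normal path: read from the "InputGate"
            } else {
                next = currentBuffered.poll();            // replay held-back data first
                if (next == null) {                       // replay finished
                    currentBuffered = null;
                    continue;
                }
            }
            if (next == null) {                           // input exhausted
                if (!endOfStream) {
                    endOfStream = true;
                    releaseBlocksAndResetBarriers();      // still feed back whatever was held back
                    continue;
                }
                return Optional.empty();
            }
            if (blockedChannels[next.channel()]) {
                bufferBlocker.add(next);                  // this channel already delivered its barrier
            } else if (!next.isBarrier()) {
                return Optional.of(next);                 // ordinary record: hand it to the operator
            } else if (!endOfStream) {
                processBarrier(next);
            }
        }
    }

    private void processBarrier(Event barrier) {
        long id = barrier.checkpointId();
        if (id > currentCheckpointId) {
            if (numBarriersReceived > 0) {
                releaseBlocksAndResetBarriers();          // a newer checkpoint subsumes the unfinished one
            }
            currentCheckpointId = id;                     // like beginNewAlignment
        } else if (id < currentCheckpointId || numBarriersReceived == 0) {
            return;                                       // stale or already-finished checkpoint: ignore
        }
        blockedChannels[barrier.channel()] = true;        // like onBarrier
        numBarriersReceived++;
        if (numBarriersReceived == blockedChannels.length) {
            releaseBlocksAndResetBarriers();
            triggeredCheckpoints.add(id);
        }
    }

    private void releaseBlocksAndResetBarriers() {
        Arrays.fill(blockedChannels, false);
        ArrayDeque<Event> replay = new ArrayDeque<>(bufferBlocker);   // newly held-back data first
        if (currentBuffered != null) {
            replay.addAll(currentBuffered);                            // then older, undrained data
        }
        currentBuffered = replay.isEmpty() ? null : replay;
        bufferBlocker.clear();
        numBarriersReceived = 0;
    }
}
```

Take two channels with the arrival order a (ch0), barrier 1 (ch0), b (ch0), c (ch1), barrier 1 (ch1), d (ch1). The operator sees a, c, b, d. Record b waits in bufferBlocker until channel 1's barrier completes the alignment and checkpoint 1 is triggered.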

```java
// An operator may start its own checkpoint only after it has received the barrier with the same ID
// from every input channel. If some channels are processed faster, the data behind the barrier on
// those channels must be buffered. A closed input channel will not send any more barriers.
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            // trigger the checkpoint
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }

    // -- general code path for multiple input channels --

    // from the second barrier of an alignment onward
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already in progress and was not canceled

        if (barrierId == currentCheckpointId) {
            // regular case
            // block the channel at channelIndex, i.e. blockedChannels[channelIndex] = true
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // we did not complete the current checkpoint, another started before
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                inputGate.getOwningTaskName(),
                barrierId,
                currentCheckpointId);

            // let the task know we are not completing this
            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

            // abort the current checkpoint
            releaseBlocksAndResetBarriers();

            // begin the new checkpoint
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                inputGate.getOwningTaskName(),
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }

        releaseBlocksAndResetBarriers();
        // Once all barriers have arrived, notifyCheckpoint() is called, which in turn hands the
        // checkpoint to the StreamTask, just as for the upstream operators
        notifyCheckpoint(receivedBarrier);
    }
}
```

numBarriersReceived starts at 0. So the first barrier of a new checkpoint, one whose ID is greater than currentCheckpointId, takes the beginNewAlignment branch:
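The multi-channel branches of processBarrier boil down to a small decision table. The sketch below restates them as a pure function. BarrierAction and BarrierDecision are hypothetical names introduced for illustration. The function assumes currentCheckpointId starts below any real checkpoint ID.

```java
/** Hypothetical outcome of the multi-channel branches of processBarrier (not Flink code). */
enum BarrierAction { CONTINUE_ALIGNMENT, SUBSUME_AND_BEGIN_NEW, BEGIN_NEW_ALIGNMENT, IGNORE }

final class BarrierDecision {
    private BarrierDecision() {}

    static BarrierAction classify(long barrierId, long currentCheckpointId, int numBarriersReceived) {
        if (numBarriersReceived > 0) {
            // an alignment is in progress and has not been canceled
            if (barrierId == currentCheckpointId) {
                return BarrierAction.CONTINUE_ALIGNMENT;     // onBarrier(channelIndex)
            }
            if (barrierId > currentCheckpointId) {
                return BarrierAction.SUBSUME_AND_BEGIN_NEW;  // notifyAbort, release blocks, beginNewAlignment
            }
            return BarrierAction.IGNORE;                     // trailing barrier of an older checkpoint
        }
        if (barrierId > currentCheckpointId) {
            return BarrierAction.BEGIN_NEW_ALIGNMENT;        // first barrier of a new checkpoint
        }
        return BarrierAction.IGNORE;                         // canceled or subsumed checkpoint
    }
}
```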

```java
private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
    currentCheckpointId = checkpointId;
    // numBarriersReceived++, and mark the channel at channelIndex as blocked
    onBarrier(channelIndex);

    startOfAlignmentTimestamp = System.nanoTime();

    if (LOG.isDebugEnabled()) {
        LOG.debug("{}: Starting stream alignment for checkpoint {}.",
            inputGate.getOwningTaskName(), checkpointId);
    }
}
```

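When the barriers with the same ID arrive on the other channels, barrierId == currentCheckpointId holds, and each of those channels is blocked via onBarrier. Once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, notifyCheckpoint is triggered. It reports the bytes buffered during alignment and the alignment time. (A full walkthrough of the checkpoint process will come in a later post.)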
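The bookkeeping behind onBarrier and the completion check can be sketched as follows. AlignmentTracker is a hypothetical class. It assumes all barriers belong to the same checkpoint, since ID comparison is processBarrier's job.

The sketch works like this:

- It blocks a channel on its first barrier.
- It treats a second barrier on an already blocked channel as an error. This is a choice made for the sketch.
- It declares the alignment complete once numBarriersReceived + numClosedChannels reaches the channel count.
- It measures the alignment time with System.nanoTime(), much like startOfAlignmentTimestamp in beginNewAlignment.

What Flink does when a channel closes in the middle of an alignment is not modeled.

```java
import java.util.Arrays;

/** Hypothetical sketch of BarrierBuffer-style alignment bookkeeping (not Flink code). */
class AlignmentTracker {
    private final boolean[] blockedChannels;
    private int numBarriersReceived;
    private int numClosedChannels;
    private long startOfAlignmentTimestamp;
    private long latestAlignmentDurationNanos;

    AlignmentTracker(int totalNumberOfInputChannels) {
        this.blockedChannels = new boolean[totalNumberOfInputChannels];
    }

    /** A finished (closed) channel never delivers another barrier; assumed to close outside an alignment. */
    void closeChannel() {
        numClosedChannels++;
    }

    /** Records the barrier from channelIndex; returns true when the checkpoint may be triggered. */
    boolean onBarrier(int channelIndex) {
        if (blockedChannels[channelIndex]) {
            // this sketch treats a repeated barrier on the same channel as a corrupt stream
            throw new IllegalStateException("Repeated barrier on input " + channelIndex);
        }
        if (numBarriersReceived == 0) {
            startOfAlignmentTimestamp = System.nanoTime();   // first barrier: alignment starts
        }
        blockedChannels[channelIndex] = true;                // hold back further data from this channel
        numBarriersReceived++;

        if (numBarriersReceived + numClosedChannels == blockedChannels.length) {
            Arrays.fill(blockedChannels, false);             // release all blocks
            numBarriersReceived = 0;                         // the next barrier starts a new alignment
            latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
            return true;
        }
        return false;
    }

    boolean isBlocked(int channelIndex) {
        return blockedChannels[channelIndex];
    }

    long latestAlignmentDurationNanos() {
        return latestAlignmentDurationNanos;
    }
}
```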

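Suppose the barrier on some other channel is delayed, i.e. numBarriersReceived + numClosedChannels < totalNumberOfInputChannels. Then the data arriving on channels that have already received the barrier goes into bufferBlocker. bufferBlocker (a CachedBufferBlocker) stores it in an ArrayDeque named currentBuffers. So by default, bufferBlocker.currentBuffers can grow without bound.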
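Because nothing caps currentBuffers by default, a long-delayed barrier can pile up a lot of memory. The natural hook for a cap is the checkSizeLimit() call made right after bufferBlocker.add(...) in getNextNonBlocked.

The sketch below shows one way such a cap could work. SketchBufferBlocker and its maxBufferedBytes parameter are hypothetical, and a value <= 0 means unlimited. add returns false when the caller should abort the current alignment instead of buffering further.

```java
import java.util.ArrayDeque;

/** Hypothetical in-memory blocker with an optional byte limit (not Flink's CachedBufferBlocker). */
class SketchBufferBlocker {
    private final ArrayDeque<byte[]> currentBuffers = new ArrayDeque<>();
    private final long maxBufferedBytes;   // <= 0 means no limit: the deque may grow without bound
    private long bytesBlocked;

    SketchBufferBlocker(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /** Stores a buffer from a blocked channel; returns false if the limit is now exceeded. */
    boolean add(byte[] buffer) {
        currentBuffers.add(buffer);
        bytesBlocked += buffer.length;
        return maxBufferedBytes <= 0 || bytesBlocked <= maxBufferedBytes;
    }

    long bytesBlocked() {
        return bytesBlocked;
    }

    int size() {
        return currentBuffers.size();
    }
}
```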

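When numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, releaseBlocksAndResetBarriers() runs first and notifyCheckpoint() second. The main job of releaseBlocksAndResetBarriers is to make sure the data buffered during the alignment is consumed next, before anything new.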

```java
/**
 * Releases the blocks on all channels and resets the barrier count.
 * Makes sure the just written data is the next to be consumed.
 */
// Hands the BufferOrEvents cached in bufferBlocker over to currentBuffered
private void releaseBlocksAndResetBarriers() throws IOException {
    LOG.debug("{}: End of stream alignment, feeding buffered data back.",
        inputGate.getOwningTaskName());

    for (int i = 0; i < blockedChannels.length; i++) {
        blockedChannels[i] = false;
    }

    if (currentBuffered == null) {
        // common case: no more buffered data
        currentBuffered = bufferBlocker.rollOverReusingResources();
        if (currentBuffered != null) {
            currentBuffered.open();
        }
    }
    else {
        // uncommon case: buffered data pending
        // push back the pending data, if we have any
        LOG.debug("{}: Checkpoint skipped via buffered data:" +
                "Pushing back current alignment buffers and feeding back new alignment data first.",
            inputGate.getOwningTaskName());

        // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
        BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
        if (bufferedNow != null) {
            bufferedNow.open();
            queuedBuffered.addFirst(currentBuffered);
            numQueuedBytes += currentBuffered.size();
            currentBuffered = bufferedNow;
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("{}: Size of buffered data: {} bytes",
            inputGate.getOwningTaskName(),
            currentBuffered == null ? 0L : currentBuffered.size());
    }

    // the next barrier that comes must assume it is the first
    numBarriersReceived = 0;

    if (startOfAlignmentTimestamp > 0) {
        latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
        startOfAlignmentTimestamp = 0;
    }
}
```
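The two branches of releaseBlocksAndResetBarriers can be modeled with a current sequence plus a deque of pushed-back ones. In this hypothetical ReplayQueue, an Iterator stands in for BufferOrEventSequence.

- Common case: the freshly blocked data simply becomes currentBuffered.
- Uncommon case: an undrained sequence is pushed to the front of queuedBuffered, so the new alignment's data is replayed first.

next() moves on to the queued sequence once the current one is exhausted, playing the role of completeBufferedSequence.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of currentBuffered + queuedBuffered (not Flink code). */
class ReplayQueue<T> {
    private Iterator<T> currentBuffered;                                       // sequence being replayed
    private final ArrayDeque<Iterator<T>> queuedBuffered = new ArrayDeque<>(); // pushed-back sequences

    /** Like releaseBlocksAndResetBarriers: freshly blocked data becomes the next to be consumed. */
    void rollOver(List<T> newlyBlocked) {
        if (newlyBlocked.isEmpty()) {
            return;                                   // nothing was buffered during this alignment
        }
        if (currentBuffered != null) {
            queuedBuffered.addFirst(currentBuffered); // uncommon case: push back the undrained sequence
        }
        currentBuffered = newlyBlocked.iterator();
    }

    /** Returns the next replayed element, or null when there is nothing left to replay. */
    T next() {
        while (currentBuffered != null) {
            if (currentBuffered.hasNext()) {
                return currentBuffered.next();
            }
            currentBuffered = queuedBuffered.pollFirst(); // current sequence drained: move to the queued one
        }
        return null;
    }
}
```

Suppose we roll over [a1, a2], read a1, then roll over [b1]. The replay yields b1 first, then the remaining a2.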

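After releaseBlocksAndResetBarriers has run, currentBuffered != null, provided anything was buffered during the alignment. The next iteration of the loop then takes this branch: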

```java
// After the barrier block is released, next holds the buffered data (non-empty)
next = Optional.ofNullable(currentBuffered.getNext());
```

It then consumes that data directly:

```java
// Once all barriers are aligned, the data in bufferBlocker's ArrayDeque<BufferOrEvent> currentBuffers is consumed first
BufferOrEvent bufferOrEvent = next.get();
if (isBlocked(bufferOrEvent.getChannelIndex())) {
    // if the channel is blocked, we just store the BufferOrEvent
    // (barrier alignment: buffer the data)
    bufferBlocker.add(bufferOrEvent);
    checkSizeLimit();
}
else if (bufferOrEvent.isBuffer()) {
    return bufferOrEvent;
}
```

It keeps consuming the buffered data until it is used up. During this time, nothing new is read from the inputGate:

```java
next = Optional.ofNullable(currentBuffered.getNext());
if (!next.isPresent()) {
    // the buffered data has been fully consumed
    completeBufferedSequence();
    return getNextNonBlocked();
}
```

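Once the buffered data is drained, currentBuffered is set back to null, unless an older, pushed-back sequence is still waiting in queuedBuffered and gets replayed next. We are then back where we started the first time through: the loop reads from the inputGate again, and the cycle repeats.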

Summary
