Flink自称是一个低延迟、高吞吐、有状态、容错的流式计算矿建。其中容错机制在之前的两篇介绍Checkpoint机制的文章中已经说明过了,本文主要了解下Flink是如何实现低延迟与高吞吐的。
低延迟和高吞吐其实是悖论。如果要求数据延迟低的话,那么数据肯定是来一条就处理一条,然后马上将数据发送给下游,这样延迟肯定是最低的。但是如果要提高吞吐量的话,不如先缓存一批数据,然后一次性将缓存的数据进行处理然后发送出去这样效率比较高。Flink的低延迟与高吞吐就是取了一个折衷,它设置了一个setBufferTimeout参数,用于控制上游往下游发送数据的频率:
这个参数默认是100ms,即每隔100ms会flush一次所有的channel,将当前Task中的数据发送给下游。
这个参数如果设置成-1,那么就会在Buffer满了或者Checkpoint触发时才会将数据发送到下游,此时能够获得最大的吞吐量。
这个参数如果设置成0,那么每条数据处理完毕之后都会立刻发送到下游,此时能够获得最低的延迟。
在之前的《Flink Job执行流程分析》文章中曾经分析过,Flink Job任务执行的最小单位是Task(一个Task是由多个chain在一起的subTask组成),不Chain在一起的Task之间交换数据流程如下图所示:
图中有几个名词先解释下:
ResultPartition(RP):用于存储Task产生的数据,由多个ResultSubpartition组成。这是为了区分发往不同接收者的数据,例如,在用于reduce或join的分区混洗的情况下。
ResultSubPartition(RS): RS的对应发送给下游不同的Task,有两种实现类:PipelinedSubpartition对应流处理。 BoundedBlockingSubpartition对应批处理。
InputGate:一个InputGate对应上游一个ResultSubPartition。接收方RP的逻辑等效项。
InputChannel: 一个InputChannel对应上游的一个ResultSubpartition。
ConnectiontionManager:每个TM中包含一个(在tasks之间共享)和一个MemoryManager (在tasks之间共享)。TM之间通过TCP连接来交互数据。需要注意的是,在Flink中,数据交换是发生在TM之间的,而不是task之间,在同一个TM中的不同task会复用同一个网络连接
先从输出开始看,这样解释起来清楚点。调试代码可以发现算子的out.collect()方法最终会跑到这儿,前一个类会讲序列化之后的数据发送到下游的各个Channel,后者只会发送到某一个Channel(例如KeyBy算子之后的操作)中:
这里暂时只先看后者(前者对应的应该是Broadcast流之类的):
//emit方法内部如下所示: @Override public void emit(T record) throws IOException, InterruptedException { // selectChannel()方法决定将数据发送到哪个Channel emit(record, channelSelector.selectChannel(record)); } protected void emit(T record, int targetChannel) throws IOException, InterruptedException { checkErroneous(); serializer.serializeRecord(record); // Make sure we don't hold onto the large intermediate serialization buffer for too long // 核心方法是这个 if (copyFromSerializerToTargetChannel(targetChannel)) { serializer.prune(); } }看下copyFromSerializerToTargetChannel()方法,这个方法就是将数据复制到目标Channel中:
/** * @param targetChannel * @return <tt>true</tt> if the intermediate serialization buffer should be pruned */ protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { // We should reset the initial position of the intermediate serialization buffer before // copying, so the serialization results can be copied to multiple target buffers. serializer.reset(); boolean pruneTriggered = false; // BufferBuilder内部有一个MemorySegment(MemorySegment是Flink管理内存的一种结构)用于存储数据, // 同时内部还有个BufferConsumer,可以用BufferConsumer来读取写入这个Buffer的数据 BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); // 1.1 将数据写入到Buffer中(一条记录很大的话,可能会写入到多个Buffer中) while (result.isFullBuffer()) { // 1.2 对写满的Buffer进行处理 finishBufferBuilder(bufferBuilder); // If this was a full record, we are done. Not breaking out of the loop at this point // will lead to another buffer request before breaking out (that would not be a // problem per se, but it can lead to stalls in the pipeline). // 1.4 record全部写入到Buffer中了,返回true标记下压缩这个Buffer(因为这个Buffer可能没写满) if (result.isFullRecord()) { pruneTriggered = true; emptyCurrentBufferBuilder(targetChannel); break; } // 1.3 一个Buffer不够写,申请一个新的Buffer继续写 bufferBuilder = requestNewBufferBuilder(targetChannel); result = serializer.copyToBufferBuilder(bufferBuilder); } checkState(!serializer.hasSerializedData(), "All data should be written at once"); if (flushAlways) { flushTargetPartition(targetChannel); } return pruneTriggered; }requestNewBufferBuilder()方法内部会通过ResultPartition中的bufferPool来申请BufferBuilder,bufferPool用来管理BufferBuilder,而BufferBuilder个人理解是Task处理输入输出时特有的概念,应该是为了方便一部分的内存管理。
RecordWriter内部会有一个单独的OutputFlusher线程,定期触发ResultPartition的flush()操作。ResultPartition内部会触发各个ResultSubPartition的flush()操作:
这里以流处理模式为例进行说明。此时ResultSubPartition对应的具体实现类为PipelinedSubPartition,它的flush()方法会立即通知下游的Task去消费数据:
下游Task有两种:一种是LocalInputChannel,上下游Task在同一个JVM中;另一种是CreditBasedSequenceNumberingViewReader,上下游Task在不同的TaskManager中,需要通过网络进行数据传输,同时它提供了credit based反压机制(反压机制后续单独写篇文章说明下):
下游Task通过调用createReadView()创建一个PipelinedSubpartitionView来消费数据。这个View是在程序刚开始运行的时候创建的,此时程序还没有消费数据。创建View的时候需要提供一个BufferAvailabilityListener对象,用于作为buffer中有数据可用时候的回调,上面所说的flush()方法内部就是通过这个listener去通知下游Task去消费数据。创建View的时候,代码堆栈信息如下:
PipelinedSubpartitionView内部提供了getNextBuffer()方法,内部调用parent.pollBuffer()方法来获取Buffer中的数据,然后讲数据发往下游的Channel中。
下游Task通过SingleInputGate中的pollNext()方法拉取Channel中的数据,再调用用户编写的方法对数据进行处理:
参考:
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
https://www.jianshu.com/p/5748df8428f9(Flink通信机制)
https://www.cnblogs.com/029zz010buct/p/11637463.html (Task之间交换数据示意图)
https://blog.csdn.net/lvwenyuan_1/article/details/103404591(Flink内存管理机制)
https://www.jianshu.com/p/8499ec3f261c(CreditBasedSequenceNumberingViewReader通信)
https://blog.csdn.net/ByteDanceTech/article/details/108722605(字节跳动对Flink的优化)