Flink的窗口算子 WindowOperator的实现原理

    技术2022-07-12  70

    窗口算子WindowOperator是窗口机制的底层实现,它几乎会牵扯到所有窗口相关的知识点,因此相对复杂。本文将以由面及点的方式来分析WindowOperator的实现。首先,我们来看一下对于最常见的时间窗口(包含处理时间和事件时间)其执行示意图:

    上图中,左侧从左往右为事件流的方向。方框代表事件,事件流中夹杂着的竖直虚线代表水印,Flink通过水印分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事件流中注入水印。元素在streaming dataflow引擎中流动到WindowOperator时,会被分为两拨,分别是普通事件和水印。

    如果是普通的事件,则会调用processElement方法(上图虚线框中的三个圆圈中的一个)进行处理,在processElement方法中,首先会利用窗口分配器为当前接收到的元素分配窗口,接着会调用触发器的onElement方法进行逐元素触发。对于时间相关的触发器,通常会注册事件时间或者处理时间定时器,这些定时器会被存储在WindowOperator的处理时间定时器队列和水印定时器队列中(见图中虚线框中上下两个圆柱体),如果触发的结果是FIRE,则对窗口进行计算。

    如果是水印(事件时间场景),则方法processWatermark将会被调用,它将会处理水印定时器队列中的定时器。如果时间戳满足条件,则利用触发器的onEventTime方法进行处理。

    而对于处理时间的场景,WindowOperator将自身实现为一个基于处理时间的触发器,以触发trigger方法来消费处理时间定时器队列中的定时器满足条件则会调用窗口触发器的onProcessingTime,根据触发结果判断是否对窗口进行计算。

    以上是WindowOperator的常规流程最简单的表述,事实上其逻辑要复杂得多。我们首先分解掉几个内部核心对象,上图中我们看到有两个队列:分别是水印定时器队列和处理时间定时器队列。这里的定时器是什么?它有什么作用呢?接下来我们就来看看它的定义——WindowOperator的内部类Timer。Timer是所有时间窗口执行的基础,它其实是一个上下文对象,封装了三个属性:

    timestamp:触发器触发的时间戳; key:当前元素所归属的分组的键; window:当前元素所属窗口;

    在我们讲解窗口触发器时,我们曾提及过触发器上下文对象,它作为process系列方法参数。在WindowOperator内部我们终于看到了对该上下文对象接口的实现——Context,它主要提供了三种类型的方法:

    提供状态存储与访问; 定时器的注册与删除; 窗口触发器process系列方法的包装;

    在注册定时器时,会新建定时器对象并将其加入到定时器队列中。等到时间相关的处理方法(processWatermark和trigger)被触发调用,则会从定时器队列中消费定时器对象并调用窗口触发器,然后根据触发结果来判断是否触动窗口的计算。我们选择事件时间的处理方法processWatermark进行分析(处理时间的处理方法trigger跟其类似):

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    public void  processWatermark(Watermark mark) throws Exception {

        //定义一个标识,表示是否仍有定时器满足触发条件  

        boolean fire;  

        do {

            //从水印定时器队列中查找队首的一个定时器,注意此处并不是出队(注意跟remove方法的区别)     

            Timer<k, w=""> timer = watermarkTimersQueue.peek();     

            //如果定时器存在,且其时间戳戳不大于水印的时间戳

            //(注意理解条件是:不大于,水印用于表示小于该时间戳的元素都已到达,所以所有不大于水印的触发时间戳都该被触发)

            if (timer != null && timer.timestamp <= mark.getTimestamp()) {

                //置标识为真,表示找到满足触发条件的定时器        

                fire = true;        

                //将该元素从队首出队

                watermarkTimers.remove(timer);        

                watermarkTimersQueue.remove();

                //构建新的上下文        

                context.key = timer.key;        

                context.window = timer.window;        

                setKeyContext(timer.key);        

                //窗口所使用的状态存储类型为可追加的状态存储

                AppendingState<in, acc=""> windowState;        

                MergingWindowSet<w> mergingWindows = null;        

                //如果分配器是合并分配器(比如会话窗口)

                if (windowAssigner instanceof MergingWindowAssigner) {

                    //获得合并窗口帮助类MergingWindowSet的实例           

                    mergingWindows = getMergingWindowSet();           

                    //获得当前窗口对应的状态窗口(状态窗口对应着状态后端存储的命名空间)

                    W stateWindow = mergingWindows.getStateWindow(context.window);           

                    //如果没有对应的状态窗口,则跳过本次循环

                    if (stateWindow == null) {                             

                        continue;           

                    }

                    //获得当前窗口对应的状态表示           

                    windowState = getPartitionedState(stateWindow,

                        windowSerializer, windowStateDescriptor);        

                } else {

                    //如果不是合并分配器,则直接获取窗口对应的状态表示           

                    windowState = getPartitionedState(context.window,

                        windowSerializer, windowStateDescriptor);        

                }

                //从窗口状态表示中获得窗口中所有的元素        

                ACC contents = windowState.get();        

                if (contents == null) {           

                    // if we have no state, there is nothing to do           

                    continue;        

                }

                //通过上下文对象调用窗口触发器的事件时间处理方法并获得触发结果对象

                TriggerResult triggerResult = context.onEventTime(timer.timestamp);        

                //如果触发的结果是FIRE(触动窗口计算),则调用fire方法进行窗口计算

                if (triggerResult.isFire()) {           

                    fire(context.window, contents);        

                }

                //而如果触动的结果是清理窗口,或者事件时间等于窗口的清理时间(通常为窗口的maxTimestamp属性)        

                if (triggerResult.isPurge() ||

                    (windowAssigner.isEventTime()

                        && isCleanupTime(context.window, timer.timestamp))) {

                    //清理窗口及元素           

                    cleanup(context.window, windowState, mergingWindows);        

                }     

            } else {

                //队列中没有符合条件的定时器,置标识为否,终止循环        

                fire = false;     

            }  

        } while (fire);  

        //向下游发射水印

        output.emitWatermark(mark);  

        //将当前算子的水印属性用新水印的时间戳覆盖

        this.currentWatermark = mark.getTimestamp();

    }</w></in,></k,>

    以上方法虽然冗长但流程还算清晰,其中的fire方法用于对窗口进行计算,它会调用内部窗口函数(即InternalWindowFunction,它包装了WindowFunction)的apply方法。

    而isCleanupTime和cleanup这对方法主要涉及到窗口的清理。如果当前窗口是时间窗口,且窗口的时间到达了清理时间,则会进行清理窗口清理。那么清理时间如何判断呢?Flink是通过窗口的最大时间戳属性结合允许延迟的时间联合计算的:

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    private long  cleanupTime(W window) {

        //清理时间被预置为窗口的最大时间戳加上允许的延迟事件  

        long cleanupTime = window.maxTimestamp() + allowedLateness;

        //如果窗口为非时间窗口(其maxTimestamp属性值为Long.MAX_VALUE),则其加上允许延迟的时间,

        //会造成Long溢出,从而会变成负数,导致cleanupTime < window.maxTimestamp 条件成立,

        //则直接将清理时间设置为Long.MAX_VALUE  

        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;

    }

    求出清理时间后会与定时器注册的时间进行对比,如果两者相等则布尔条件为真,否则为假:

    ?

    1

    2

    3

    4

    protected final  boolean  isCleanupTime(W window, long time) {  

        long cleanupTime = cleanupTime(window);  

        return  cleanupTime == time;

    }

    下面我们来看一下清理方法主要做了哪些事情:

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    private void  cleanup(W window,              

        AppendingState<in, acc=""> windowState,              

        MergingWindowSet<w> mergingWindows) throws Exception {

        //清空窗口对应的状态后端的状态  

        windowState.clear();

        //如果支持窗口合并,则清空窗口合并集合中对应当前窗口的记录  

        if (mergingWindows != null) {  

            mergingWindows.retireWindow(window);  

        }

        //清空上下文对象状态  

        context.clear();

    }</w></in,>

    关于窗口清理,其实三大处理方法(processElement\/processWatermark\/trigger)都会进行判断,如果满足条件则清理。而真正注册清理定时器的逻辑在processElement中,它会调用registerCleanupTimer方法:

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    protected void  registerCleanupTimer(W window) {

        //这里注册的时间即为计算过了的清理时间  

        long cleanupTime = cleanupTime(window);

        //根据不同的时间分类调用不同的注册方法  

        if (windowAssigner.isEventTime()) {     

            context.registerEventTimeTimer(cleanupTime);  

        } else {     

            context.registerProcessingTimeTimer(cleanupTime);  

        }

    }

    从上面的代码段可知:清理定时器跟普通定时器是一样的。

    如果没有延迟,对于事件时间和处理时间而言,也许它们的窗口清理不一定是由清理定时器触发。因为在事件时间触发器和处理时间触发器中,它们注册的定时器对应的时间点就是窗口的最大时间戳。由于这些定时器在队列中一般排在清理定时器之前,所以这些定时器会优先于清理定时器得到执行(优先触发窗口的清理)。而这里的registerCleanupTimer方法,是一般化的清理机制,针对所有类型的窗口都适用,并确保窗口一定会得到清理。而对于刚刚提到的这种情况,重复的“清理”定时器并不会产生负作用。

    WindowOperator还有一个继承者:EvictingWindowOperator,该算子在常规的窗口算子上支持了元素驱逐器(见上图中大虚线框内部的小虚线长方形)。EvictingWindowOperator特别的地方主要在于其fire的实现——在进行窗口计算之前会预先对符合驱逐条件的元素进行剔除,具体实现见如下代码:

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    private void  fire(W window, Iterable<streamrecord<in>> contents) throws Exception {  

        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());  

        //计算要驱逐的元素个数  

        int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);  

        FluentIterable<in> projectedContents = FluentIterable     

            .from(contents)     

            .skip(toEvict)     

            .transform(new Function<streamrecord<in>, IN>() {        

                @Override        

                public IN apply(StreamRecord<in> input) {           

                    return input.getValue();        

                }     

            });  

        userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);

    }</in></streamrecord<in></in></streamrecord<in>

    在最终调用窗口计算的apply方法之前,会先计算要驱逐的元素个数,然后跳过这些元素并且跳过的都是从首个元素开始的连续个元素(这一点在之前我们分析窗口元素驱逐器是也曾提及过)。

    这里采用了Guava类库的FluentIterable帮助类,它扩展了Iterable接口并提供了非常丰富的扩展API。

    Processed: 0.009, SQL: 9