Flink自定义Window----自定义Evictor(驱逐者)

    技术2022-07-12  76

    在看Flink Window机制的相关博客时,看到有位同学的需求很有趣:

    "如何让一个DataStream中的某个字段与21天前的该字段做比较?"

    该同学给定的是一个大小为21天、每天滑动一次的窗口(window)。

    解决方案:在其TimeWindow的基础上进行修改,挖空中间不需要的19天(21天减去首尾各1天),构造出一个"门"字形的TimeWindow,

    这样只剩下第一天和最后一天这两个"门脚"中的数据,以便进行后续操作(如比较)。

    具体通过自定义Evictor实现。

    这样只需要应用该GantryTimeEvictor 即可

    keyedStream
        .window(SlidingEventTimeWindows.of(Time.days(21), Time.days(1)))
        .evictor(GantryTimeEvictor.of(Time.days(1)));

    代码如下:

    package com.run;   import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;   import java.util.Iterator;   /**  * @Author MrBlack  * @Date 2019/12/17 017 下午 6:43  **/ public class GantryTimeEvictor implements Evictor<Object, TimeWindow> {       // 双脚宽度     private final long gantryFootSize;     // 是否在应用窗口后驱除     private final boolean doEvictAfter;       public GantryTimeEvictor(long gantryFootSize) {         this.gantryFootSize = gantryFootSize;         this.doEvictAfter = false;     }       public GantryTimeEvictor(long gantryFootSize, boolean doEvictAfter) {         this.gantryFootSize = gantryFootSize;         this.doEvictAfter = doEvictAfter;     }       /**      * 在应用窗口前驱除不需要的元素      *      * @param elements   当前在窗口中的元素      * @param size       窗口中元素数量      * @param timeWindow 当前窗口      * @param ctx        Evictor上下文      */     @Override     public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow timeWindow, EvictorContext ctx) {         if (!this.doEvictAfter) {             this.evict(elements, size, timeWindow, ctx);         }     }       /**      * 在应用窗口后驱除不需要的元素      */     @Override     public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow timeWindow, EvictorContext ctx) {         if (this.doEvictAfter) {             this.evict(elements, size, timeWindow, ctx);         }     }       private void evict(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow timeWindow, EvictorContext ctx) {         if (this.hasTimestamp(elements)) {             long evictCutBegin = timeWindow.getStart() + this.gantryFootSize;             long evictCutEnd = timeWindow.getEnd() - this.gantryFootSize;             Iterator iterator = elements.iterator();             while (iterator.hasNext()) {  
               TimestampedValue<Object> record = (TimestampedValue) iterator.next();                 if (record.getTimestamp() >= evictCutBegin && record.getTimestamp() <= evictCutEnd) {                     iterator.remove();                 }             }         }     }       /**      * 判断元素是否有时间戳      *      * @param elements      * @return      */     private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {         Iterator<TimestampedValue<Object>> it = elements.iterator();         return it.hasNext() && it.next().hasTimestamp();     }       public static GantryTimeEvictor of(Time gantryFootSize) {         return new GantryTimeEvictor(gantryFootSize.toMilliseconds());     }       public static GantryTimeEvictor of(Time gantryFootSize, boolean doEvictAfter) {         return new GantryTimeEvictor(gantryFootSize.toMilliseconds(), doEvictAfter);     } }  

    Processed: 0.009, SQL: 9