在看flink Window 机制的相关博客时,看到有个同学的需求很有趣,
"如何让一个DataStream中的某个字段与21天前的该字段做比较?"
该同学使用的是一个大小为 21 天、每天滑动一次的滑动窗口(window)。
解决方案:在其 TimeWindow 的基础上进行修改,挖空中间不需要的 19 天(21 天的窗口去掉首尾各 1 天),得到一个“门”字形的 TimeWindow,
这样窗口中只剩下第一天和最后一天这两个“门脚”的数据,可用于后续操作,例如比较。
具体通过自定义 Evictor 来实现,
使用时只需在窗口上应用该 GantryTimeEvictor 即可:
keyedStream.window(SlidingEventTimeWindows.of(Time.days(21), Time.days(1))).evictor(GantryTimeEvictor.of(Time.days(1)));
代码如下:
package com.run; import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; import java.util.Iterator; /** * @Author MrBlack * @Date 2019/12/17 017 下午 6:43 **/ public class GantryTimeEvictor implements Evictor<Object, TimeWindow> { // 双脚宽度 private final long gantryFootSize; // 是否在应用窗口后驱除 private final boolean doEvictAfter; public GantryTimeEvictor(long gantryFootSize) { this.gantryFootSize = gantryFootSize; this.doEvictAfter = false; } public GantryTimeEvictor(long gantryFootSize, boolean doEvictAfter) { this.gantryFootSize = gantryFootSize; this.doEvictAfter = doEvictAfter; } /** * 在应用窗口前驱除不需要的元素 * * @param elements 当前在窗口中的元素 * @param size 窗口中元素数量 * @param timeWindow 当前窗口 * @param ctx Evictor上下文 */ @Override public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow timeWindow, EvictorContext ctx) { if (!this.doEvictAfter) { this.evict(elements, size, timeWindow, ctx); } } /** * 在应用窗口后驱除不需要的元素 */ @Override public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow timeWindow, EvictorContext ctx) { if (this.doEvictAfter) { this.evict(elements, size, timeWindow, ctx); } } private void evict(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow timeWindow, EvictorContext ctx) { if (this.hasTimestamp(elements)) { long evictCutBegin = timeWindow.getStart() + this.gantryFootSize; long evictCutEnd = timeWindow.getEnd() - this.gantryFootSize; Iterator iterator = elements.iterator(); while (iterator.hasNext()) { TimestampedValue<Object> record = (TimestampedValue) iterator.next(); if (record.getTimestamp() >= evictCutBegin && record.getTimestamp() <= evictCutEnd) { iterator.remove(); } } } } /** * 判断元素是否有时间戳 * * @param elements * @return */ private boolean hasTimestamp(Iterable<TimestampedValue<Object>> 
elements) { Iterator<TimestampedValue<Object>> it = elements.iterator(); return it.hasNext() && it.next().hasTimestamp(); } public static GantryTimeEvictor of(Time gantryFootSize) { return new GantryTimeEvictor(gantryFootSize.toMilliseconds()); } public static GantryTimeEvictor of(Time gantryFootSize, boolean doEvictAfter) { return new GantryTimeEvictor(gantryFootSize.toMilliseconds(), doEvictAfter); } }