Flink中Time机制总结

    技术2026-01-21  7

    最近做的实验涉及到Flnik的时间戳机制,系统的总结一下Flink的time机制

    一、Flink中timestamp和watermark使用

    在数据源中指定时间戳和水位线

    @Override public void run(SourceContext<MyType> ctx) throws Exception { while (/* condition */) { MyType next = getNext(); //指定时间戳 ctx.collectWithTimestamp(next, next.getEventTimestamp()); if (next.hasWatermarkTime()) { //指定水位线 ctx.emitWatermark(new Watermark(next.getWatermarkTime())); } } }

    在数据流中执行(运行时指定,越靠近数据源越好)

    //周期性生成watermark source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Integer, Long>>() { @Nullable @Override /*不是每次记录来了都会调用,默认是200ms,可以通过 env.getConfig.setAutiWatermarkInterval(5000)设置,参数是自定义时间长度*/ public Watermark getCurrentWatermark() { return null; } @Override //生成时间戳 public long extractTimestamp(Tuple3<String, Integer, Long> element, long previousElementTimestamp) { return 0; } }); source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<>() { @Override public long extractTimestamp(Object element, long previousElementTimestamp) { return 0; } @Nullable @Override //每次都会调用,检查是否能够生成watermark public Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) { return null; } });

    二、ProcessFunction中对time的应用

    这个底层接口可以实现:

    获取记录的时间戳和当前的processtime获取当前算子的时间注册定时器timer,写回调逻辑按时触发 source.process(new ProcessFunction<Tuple3<String, Integer, Long>, Object>() { @Override //处理逻辑;通过ctx参数还能得到TimerService类,注册事件时间定时器和处理时间定时器 public void processElement(Tuple3<String, Integer, Long> value, Context ctx, Collector<Object> out) throws Exception { //参数时间:当事件时间(或者处理时间)超过给定参数,触发timer ctx.timerService().registerEventTimeTimer(1000); ctx.timerService().registerProcessingTimeTimer(1000); } @Override //回调逻辑 public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception { super.onTimer(timestamp, ctx, out); } })
    Processed: 0.013, SQL: 9