Flink Table API: Windowed Group Aggregation

    Tech, 2022-07-11

    Business requirement: every 5 seconds, count the number of calls, assuming data can arrive at most 3 seconds late.
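To see what the 3-second delay means for when a 5-second window produces its count, here is a plain-Scala simulation with no Flink dependency. It assumes, as Flink's BoundedOutOfOrdernessTimestampExtractor does, that the watermark is the largest timestamp seen so far minus the allowed delay, and that a window [start, start + 5000) fires once the watermark reaches its last millisecond. `TumblingWatermarkSim`, `run` and the sample timestamps are hypothetical; as a simplification the watermark is advanced after every event instead of on Flink's periodic watermark interval.

```scala
import scala.collection.mutable

object TumblingWatermarkSim {
  val WindowSize = 5000L // 5-second tumbling window (ms)
  val MaxDelay   = 3000L // assumed maximum out-of-orderness (ms)

  // Start of the tumbling window containing timestamp ts (offset 0, ts >= 0).
  def windowStart(ts: Long): Long = ts - (ts % WindowSize)

  // Replays event timestamps in arrival order. Returns the fired windows as
  // (windowStart, count) in firing order, plus the timestamps dropped as late.
  // Simplification: the watermark advances after every event.
  def run(arrivals: Seq[Long]): (List[(Long, Int)], List[Long]) = {
    val open    = mutable.TreeMap.empty[Long, Int] // windowStart -> count
    val fired   = mutable.ListBuffer.empty[(Long, Int)]
    val dropped = mutable.ListBuffer.empty[Long]
    var watermark = Long.MinValue

    for (ts <- arrivals) {
      val start = windowStart(ts)
      // The event is late if the watermark already covers its window's last ms.
      if (start + WindowSize - 1 <= watermark) dropped += ts
      else open(start) = open.getOrElse(start, 0) + 1

      // Watermark = highest timestamp seen so far minus the allowed delay.
      watermark = math.max(watermark, ts - MaxDelay)

      // Fire (and remove) every window whose last millisecond is now covered.
      val ready = open.keys.filter(s => s + WindowSize - 1 <= watermark).toList
      for (s <- ready) {
        fired += ((s, open(s)))
        open.remove(s)
      }
    }
    (fired.toList, dropped.toList)
  }

  def main(args: Array[String]): Unit = {
    val (fired, dropped) = run(Seq(1000L, 4000L, 6000L, 3000L, 8000L, 2000L, 13000L))
    println(s"fired=$fired dropped=$dropped")
  }
}
```

With these arrivals, `main` prints `fired=List((0,3), (5000,2)) dropped=List(2000)`. The event at 3000 arrives after the one at 6000 but is still counted, because the watermark (3000) has not yet passed 4999. The window [0, 5000) only fires when the event at 8000 lifts the watermark to 5000, so the event at 2000 that follows is late and discarded, which mirrors the default behaviour of dropping late rows.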

    Implementation

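    // Assumes Flink 1.11 with the Blink planner and the Scala APIs
    // (setStreamTimeCharacteristic and useBlinkPlanner are deprecated in later releases).
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    // Call record; this definition is assumed from the fields used below.
    case class Log(sid: String, callOut: String, callIn: String, callType: String,
                   callTime: Long, duration: Long)

    object TestWindowByTableAPI {
      // Every 5 seconds, count calls; data may be up to 3 s late, so a watermark is needed
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tableEnv = StreamTableEnvironment.create(env, settings)

        val stream = env.socketTextStream("flink101", 8888)
          .map(line => {
            val arr = line.split(",")
            Log(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          })
          // Watermark trails the largest callTime by 3 s, so windows fire later
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Log](Time.seconds(3)) {
            override def extractTimestamp(element: Log): Long = element.callTime
          })

        // Create a dynamic table from the stream; callTime becomes the event-time (rowtime) attribute
        val table = tableEnv.fromDataStream(stream, 'sid, 'callOut, 'callIn, 'callType, 'callTime.rowtime)

        // Tumbling window, syntax 1 (string expressions)
        val result = table.window(Tumble.over("5.second").on("callTime").as("window"))
          .groupBy('window, 'sid) // the window alias must appear in groupBy; sid makes the count per station
          .select('sid, 'window.start, 'window.end, 'sid.count) // aggregation

        // Tumbling window, syntax 2 (Scala expression DSL)
        table.window(Tumble over 5.second on 'callTime as 'window)

        // Sliding window, syntax 1
        table.window(Slide over 10.second every 5.second on 'callTime as 'window)
        // Sliding window, syntax 2
        table.window(Slide.over("10.second").every("5.second").on("callTime").as("window"))

        // Group-window results are append-only; without a sink, execute() has no operators to run
        tableEnv.toAppendStream[Row](result).print()

        env.execute("TestWindowByTableAPI")
      }
    }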
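With `Slide over 10.second every 5.second`, each event belongs to size / slide = 2 overlapping windows. The plain-Scala sketch below computes those windows for a single timestamp, following the assignment rule of Flink's sliding event-time windows with zero offset: start from the latest slide boundary at or before the timestamp, then step back by the slide while the window still contains it. `SlidingAssignSim` and `windowsFor` are hypothetical names, and non-negative timestamps are assumed (Scala's `%` yields negative remainders for negative values).

```scala
object SlidingAssignSim {
  // Returns the [start, end) sliding windows containing timestamp ts, latest first.
  // size and slide are in ms; window offset is 0; ts is assumed non-negative.
  def windowsFor(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - (ts % slide) // latest slide boundary <= ts
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > ts - size) // window [start, start + size) still covers ts
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Slide over 10 s every 5 s: the event at 12 s lands in two windows
    println(windowsFor(12000L, 10000L, 5000L))
  }
}
```

`windowsFor(12000L, 10000L, 5000L)` returns `List((10000,20000), (5000,15000))`. Setting the size equal to the slide collapses this to the tumbling case, where each event lands in exactly one window.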