业务需求:每隔5秒钟,统计通话数量,假设数据延迟最大3秒。
代码实现
/**
 * Counts calls in 5-second tumbling event-time windows using the Flink Table API.
 *
 * Input lines are read from a socket as comma-separated text and parsed into `Log`
 * records (declared elsewhere). Watermarks tolerate up to 3 seconds of out-of-order
 * data, so a window only fires once the watermark has passed its end.
 */
object TestWindowByTableAPI {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Windows are driven by the timestamp carried in each record (event time).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val stream = env.socketTextStream("flink101", 8888)
      .map { line =>
        // Expects six comma-separated fields; the last two must parse as Long.
        val fields = line.split(",")
        Log(fields(0).trim, fields(1).trim, fields(2).trim, fields(3).trim,
          fields(4).trim.toLong, fields(5).trim.toLong)
      }
      // Introduce watermarks so window firing is delayed by up to 3 seconds.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Log](Time.seconds(3)) {
          // NOTE(review): assumes callTime is epoch milliseconds — confirm against the producer.
          override def extractTimestamp(element: Log): Long = element.callTime
        })

    // Create a dynamic table from the stream and mark `callTime` as the event-time attribute.
    val table = tableEnv.fromDataStream(stream, 'sid, 'callOut, 'callIn, 'callType, 'callTime.rowtime)

    // Tumbling window, first notation (string expressions).
    val result = table.window(Tumble.over("5.second").on("callTime").as("window"))
      .groupBy('window, 'sid) // must group by both the window and sid
      .select('sid, 'window.start, 'window.end, 'sid.count) // aggregate per window and sid

    // Emit the aggregation. Without a consumer the Table query is never translated into
    // the job graph and the windowed count would silently never run.
    tableEnv.toAppendStream[org.apache.flink.types.Row](result).print()

    // The remaining window definitions only illustrate alternative notations;
    // their results are intentionally discarded.
    // Tumbling window, second notation (Scala expression DSL).
    table.window(Tumble over 5.second on 'callTime as 'window)
    // Sliding window, notation one.
    table.window(Slide over 10.second every 5.second on 'callTime as 'window)
    // Sliding window, notation two.
    table.window(Slide.over("10.second").every("5.second").on("callTime").as("window"))

    env.execute("TestWindowByTableAPI")
  }
}
转载请注明原文地址:https://ipadbbs.8miu.com/read-14711.html