Using the Flink Table API and Implementing a UDF

    Technology  2022-07-11

    This article implements word count with the Table API, using a custom UDF to split each line into words.

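    // Imports assume the Flink 1.10 package layout (an assumption; the original omits them).
    import org.apache.commons.lang3.StringUtils
    import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.functions.TableFunction
    import org.apache.flink.types.Row

    object TestUDFByWordCount {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tableEnv = StreamTableEnvironment.create(env, settings)

        // Source: one text line per record, exposed as a single column named 'line
        val stream = env.socketTextStream("flink101", 8888)
        val table = tableEnv.fromDataStream(stream, 'line)

        // Split words with the Table API, using a custom UDF
        val my_func = new MyFlatMapFunction
        // Pass each line to the UDF for splitting, then rename the output columns
        val re = table.flatMap(my_func('line)).as('word, 'word_c)
          .groupBy('word) // group and aggregate
          .select('word, 'word_c.sum as 'cnt)

        // Keep only add messages (flag == true); retraction messages are dropped
        tableEnv.toRetractStream[Row](re).filter(_._1).print()
        tableEnv.execute("TestUDFByWordCount")
      }

      // Custom UDF
      class MyFlatMapFunction extends TableFunction[Row] {
        // Declare the function's result type: the word and the count 1
        override def getResultType: TypeInformation[Row] = Types.ROW(Types.STRING, Types.INT)

        // Function body
        def eval(str: String): Unit = {
          if (StringUtils.isNotEmpty(str)) {
            str.trim.split(" ").foreach { word =>
              val row = new Row(2) // Row needs its arity; it has no no-arg constructor
              row.setField(0, word)
              row.setField(1, Int.box(1)) // box explicitly for the Java Object parameter
              collect(row)
            }
          }
        }
      }
    }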
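To make the `toRetractStream` filter above easier to follow, here is a minimal plain-Scala sketch of the messages such a grouped sum produces. It is only a model under stated assumptions and does not call Flink: `RetractSketch`, `splitLine`, and `retractStream` are hypothetical names introduced for illustration, and message ordering in a real job may differ under parallelism.

```scala
// Plain-Scala model of the word-count pipeline; no Flink dependency.
// All names here are hypothetical and exist only for illustration.
object RetractSketch {
  // Mirrors the role of MyFlatMapFunction.eval: one (word, 1) pair per token.
  def splitLine(line: String): Seq[(String, Int)] =
    if (line == null || line.trim.isEmpty) Seq.empty
    else line.trim.split("\\s+").toSeq.map(word => (word, 1))

  // Models a retract stream over a grouped sum: when a word's count changes,
  // the old result is retracted (flag false) before the new one is added (flag true).
  def retractStream(lines: Seq[String]): Seq[(Boolean, (String, Int))] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    lines.flatMap(splitLine).flatMap { case (word, one) =>
      val previous = counts.get(word)
      val updated = previous.getOrElse(0) + one
      counts(word) = updated
      val add = (true, (word, updated))
      // First occurrence: only an add message. Later ones: retract, then add.
      previous.map(old => (false, (word, old))).toList :+ add
    }
  }

  def main(args: Array[String]): Unit = {
    val messages = retractStream(Seq("a b", "a"))
    // Same idea as .filter(_._1) in the job: keep add messages only.
    messages.filter(_._1).foreach(println)
  }
}
```

With the input lines "a b" and "a", the model emits an add for (a,1) and (b,1), then a retraction of (a,1) followed by an add of (a,2). Filtering on the flag therefore prints every intermediate count for a word, not only its final value.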