本文使用 Flink Table API 实现 WordCount,并通过自定义 UDF(TableFunction)完成单词切割。
/**
 * Streaming word count built on the Flink Table API, using a user-defined
 * table function to split each incoming line into words.
 */
object TestUDFByWordCount {

  /**
   * Reads text lines from the socket `flink101:8888`, splits them into
   * `(word, 1)` rows with [[MyFlatMapFunction]], sums the ones per word and
   * prints the resulting retract stream, keeping only records whose flag is
   * `true`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Source: one record per text line, exposed as a single column named 'line.
    val stream = env.socketTextStream("flink101", 8888)
    val table = tableEnv.fromDataStream(stream, 'line)

    // Custom UDF that performs the word splitting inside the Table API.
    val splitWords = new MyFlatMapFunction

    // Feed every line into the UDF, rename the two produced columns, then
    // group by word and sum the per-word ones.
    val wordCounts = table
      .flatMap(splitWords('line)).as('word, 'word_c)
      .groupBy('word)
      .select('word, 'word_c.sum as 'cnt)

    // The first tuple element is the retract-stream flag; only records with
    // a `true` flag are printed.
    tableEnv.toRetractStream[Row](wordCounts).filter(_._1).print()

    // NOTE(review): on newer Flink versions a job that ends in a DataStream
    // sink may need env.execute(...) instead — confirm against the Flink
    // version this project targets.
    tableEnv.execute("TestUDFByWordCount")
  }

  /**
   * Table function that splits a line on single spaces and emits one
   * `(word, 1)` row per non-empty token.
   */
  class MyFlatMapFunction extends TableFunction[Row] {

    /** Result rows have two fields: the word (STRING) and the constant 1 (INT). */
    override def getResultType: TypeInformation[Row] = Types.ROW(Types.STRING, Types.INT)

    /**
     * Emits a `(word, 1)` row for every non-empty space-separated token of `str`.
     * Null or empty input emits nothing.
     *
     * @param str the input line; may be null or empty
     */
    def eval(str: String): Unit = {
      if (StringUtils.isNotEmpty(str)) {
        str.trim
          .split(" ")
          // Consecutive spaces (or a whitespace-only line) produce empty
          // tokens; drop them so they are not counted as words.
          .filter(_.nonEmpty)
          .foreach { word =>
            // Row needs its arity up front: two fields, word and count.
            val row = new Row(2)
            row.setField(0, word)
            row.setField(1, Int.box(1))
            collect(row)
          }
      }
    }
  }
}
转载请注明原文地址:https://ipadbbs.8miu.com/read-13835.html