flink 三种方式实现wordcount

    技术2022-07-31  71

    sum

    在对 DataStream 调用 keyBy 后,使用 sum 聚合函数进行累加

    package com.stanley.wordcount import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ /** * Created by admin on 2020/7/2. */ object SumWordCount { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //并行度设置为1 env.setParallelism(1) //读取文本流数据 val inputDataStream:DataStream[String] = env.socketTextStream("node1",9999) val outputDataStream:DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" ")) .map((_, 1)) .keyBy(0) .sum(1) outputDataStream.print("sum_wordcount") env.execute("wc test") } }

    ProcessFunction

    调用最底层的 ProcessFunction,将 count 保存为一个 Keyed State

    package com.stanley.wordcount import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector /** * Created by admin on 2020/7/2. */ object ProcessWordCount { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputDataStream:DataStream[String] = env.socketTextStream("node1",9999) val outputDataStraem:DataStream[(String,Int)] = inputDataStream .flatMap(_.split(" ")) .map((_, 1)) .keyBy(0) //调用新建的MyProcessFunction .process(new MyProcessFunction) outputDataStraem.print("process_wordcount") env.execute("wc test") } } class MyProcessFunction extends KeyedProcessFunction[Tuple,(String,Int),(String,Int)]{ //创建一个countState private var countState:ValueState[Int] = _ override def open(parameters: Configuration): Unit = { //初始化countState countState = getRuntimeContext.getState[Int](new ValueStateDescriptor[Int]("count",classOf[Int])) } override def processElement(i: (String, Int), context: KeyedProcessFunction[Tuple, (String, Int), (String, Int)]#Context, collector: Collector[(String, Int)]): Unit = { //取出count var count = countState.value() count+=1 //更新countState countState.update(count) collector.collect((i._1,count)) } }

    RichMapFunction

    RichMapFunction和ProcessFunction一样都是实现了AbstractRichFunction,所以同样拥有生命周期方法和运行时上下文,以及keyed state

    package com.stanley.wordcount import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector /** * Created by admin on 2020/7/2. */ object RichWordCount { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputDataStream:DataStream[String] = env.socketTextStream("node1",9999) val outputDataStraem:DataStream[(String,Int)] = inputDataStream .flatMap(_.split(" ")) .map((_, 1)) .keyBy(0) //调用MyRichMapFunction .map(new MyRichMapFunction) outputDataStraem.print("rich_wordcount") env.execute("wc test") } } class MyRichMapFunction extends RichMapFunction[(String,Int),(String,Int)]{ private var countState:ValueState[Int] = _ override def open(parameters: Configuration): Unit = { countState = getRuntimeContext.getState[Int](new ValueStateDescriptor[Int]("count",classOf[Int])) } override def map(in: (String,Int)): (String, Int) = { var count = countState.value() count+=1 countState.update(count) (in._1,count) } }

    总结

    sum 方法适合在逻辑比较简单的计算中使用;ProcessFunction 和 RichMapFunction 在实际应用中可以将状态保存到状态后端,出现故障时通过 checkpoint 恢复。

    Processed: 0.010, SQL: 9