Flink(process-KeyedProcessFunction和flatmap-RichFlatMapFunction)

    技术2024-10-29  19

    需求:同一传感器相邻两次温度相差不能超过10,如果超过10就报警。下面有两种方法实现:process 和 flatmap。

    process-KeyedProcessFunction

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector

    /**
     * One sensor reading.
     *
     * @param id          sensor identifier; the stream is keyed by this field
     * @param timestamp   event time of the reading (presumably epoch seconds, since it is
     *                    multiplied by 1000 before being handed to Flink -- confirm with the producer)
     * @param temperature measured temperature
     */
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    /** Entry point: reads sensor lines from a socket and alerts on large temperature jumps. */
    object ProcessFunctionTest {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val stream = env.socketTextStream("hadoop102", 7777)

        // Each line is expected to look like "id,timestamp,temperature"; a line with fewer
        // fields or non-numeric values makes the map function throw.
        val dataStream = stream
          .map { data =>
            val dataArray = data.split(",")
            SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
          }
          .assignTimestampsAndWatermarks(
            // Tolerate events arriving up to one second out of order.
            new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
              override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
            })

        val processedStream = dataStream
          .keyBy(_.id)
          .process(new TempChangeAlert(10.0))

        dataStream.print("input data")
        processedStream.print("processed data")

        env.execute()
      }
    }

    /**
     * Emits `(sensorId, previousTemperature, currentTemperature)` whenever two consecutive
     * readings of the same key differ by more than `threshold`.
     *
     * The first reading seen for a key never raises an alert: there is nothing to compare it
     * with, so it only seeds the state.
     *
     * @param threshold maximum tolerated absolute difference between consecutive readings
     */
    class TempChangeAlert(threshold: Double)
      extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] {

      // Keyed state holding the previous temperature. It is typed as the boxed java.lang.Double
      // on purpose: ValueState.value() returns null before the first update, and a Scala Double
      // would silently turn that null into 0.0, producing a bogus alert on the first reading.
      lazy val lastTempState: ValueState[java.lang.Double] =
        getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Double]("lastTemp", classOf[java.lang.Double]))

      override def processElement(
          i: SensorReading,
          context: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context,
          collector: Collector[(String, Double, Double)]): Unit = {
        // Compare only when a previous temperature actually exists for this key.
        Option(lastTempState.value()).foreach { previous =>
          val lastTemp: Double = previous
          val diff = math.abs(i.temperature - lastTemp)
          if (diff > threshold) {
            collector.collect((i.id, lastTemp, i.temperature))
          }
        }
        // Always remember the current reading for the next comparison.
        lastTempState.update(i.temperature)
      }
    }

    flatmap-RichFlatMapFunction

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector

    /**
     * One sensor reading.
     *
     * @param id          sensor identifier; the stream is keyed by this field
     * @param timestamp   event time of the reading (presumably epoch seconds, since it is
     *                    multiplied by 1000 before being handed to Flink -- confirm with the producer)
     * @param temperature measured temperature
     */
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    /** Entry point: reads sensor lines from a socket and alerts on large temperature jumps. */
    object ProcessFunctionTest {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val stream = env.socketTextStream("hadoop102", 7777)

        // Each line is expected to look like "id,timestamp,temperature"; a line with fewer
        // fields or non-numeric values makes the map function throw.
        val dataStream = stream
          .map { data =>
            val dataArray = data.split(",")
            SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
          }
          .assignTimestampsAndWatermarks(
            // Tolerate events arriving up to one second out of order.
            new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
              override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
            })

        val processedStream = dataStream
          .keyBy(_.id)
          .flatMap(new TempChangeAlert(10.0))

        dataStream.print("input data")
        processedStream.print("processed data")

        env.execute()
      }
    }

    /**
     * Emits `(sensorId, previousTemperature, currentTemperature)` whenever two consecutive
     * readings of the same key differ by more than `threshold`.
     *
     * The first reading seen for a key never raises an alert: there is nothing to compare it
     * with, so it only seeds the state.
     *
     * @param threshold maximum tolerated absolute difference between consecutive readings
     */
    class TempChangeAlert(threshold: Double)
      extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

      // Keyed state holding the previous temperature, assigned in open() (a lazy val would work
      // as well). It is typed as the boxed java.lang.Double on purpose: ValueState.value()
      // returns null before the first update, and a Scala Double would silently turn that null
      // into 0.0, producing a bogus alert on the first reading.
      private var lastTempState: ValueState[java.lang.Double] = _

      /** Obtains the state handle once the runtime context is available. */
      override def open(parameters: Configuration): Unit = {
        lastTempState = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Double]("lastTemp", classOf[java.lang.Double]))
      }

      override def flatMap(i: SensorReading, collector: Collector[(String, Double, Double)]): Unit = {
        // Compare only when a previous temperature actually exists for this key.
        Option(lastTempState.value()).foreach { previous =>
          val lastTemp: Double = previous
          val diff = math.abs(i.temperature - lastTemp)
          if (diff > threshold) {
            collector.collect((i.id, lastTemp, i.temperature))
          }
        }
        // Always remember the current reading for the next comparison.
        lastTempState.update(i.temperature)
      }
    }
    Processed: 0.012, SQL: 9