要求:将温度低于32℃的数据输出到侧输出流,温度大于等于32℃的数据输出到主流
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.util.Try

/** A single sensor reading.
  *
  * @param id          sensor identifier (first comma-separated field of an input line)
  * @param timestamp   event timestamp; it is multiplied by 1000 before being handed to Flink,
  *                    so it is presumably expressed in seconds (confirm against the producer)
  * @param temperature measured temperature; compared against 32.0 by [[FreeZingAlert]]
  */
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/** Job that splits sensor readings into a main stream and an alert side output. */
object SideOutput {

  /** Parses one `id,timestamp,temperature` line.
    *
    * Fields are trimmed; any fields after the third are ignored.
    *
    * @param line raw text line as received from the socket
    * @return the parsed reading, or `None` when the line has fewer than three fields or the
    *         timestamp / temperature is not numeric
    */
  private def parseReading(line: String): Option[SensorReading] =
    Try {
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    }.toOption

  /** Builds and runs the pipeline: socket source -> parse -> event-time watermarks ->
    * [[FreeZingAlert]] -> print the main stream and the alert side output.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("hadoop102", 7777)

    val dataStream = stream
      // A malformed line must not fail the whole job: report it on stderr and skip it.
      .flatMap { (line: String) =>
        val parsed = parseReading(line)
        if (parsed.isEmpty) Console.err.println(s"Skipping malformed sensor record: $line")
        parsed.toList
      }
      // Watermarks tolerate events arriving up to one second out of order.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
        })

    val processedStream = dataStream.process(new FreeZingAlert())

    processedStream.print("processed data")
    // Read the side output through the same tag instance the process function writes to.
    processedStream.getSideOutput(FreeZingAlert.AlertOutputTag).print("alter data")

    env.execute("test")
  }
}

/** Holds the side-output tag shared by the job wiring and the process function. */
object FreeZingAlert {

  /** Tag identifying the alert side output. It is defined once here so that the producer
    * ([[FreeZingAlert.processElement]]) and the consumer (`getSideOutput`) cannot drift apart.
    */
  val AlertOutputTag: OutputTag[String] = new OutputTag[String]("freezing alert")
}

/** Freezing alert: readings with a temperature below 32.0 produce an alert message on the
  * side output; all other readings are forwarded unchanged on the main output.
  *
  * NOTE(review): the unit of the 32.0 threshold is not established by the code (the original
  * comment said 32F) -- confirm with the data producer.
  */
class FreeZingAlert() extends ProcessFunction[SensorReading, SensorReading] {

  /** Side-output tag used for alert messages; same tag as `FreeZingAlert.AlertOutputTag`. */
  lazy val alertOutput: OutputTag[String] = FreeZingAlert.AlertOutputTag

  /** Routes one reading to either the alert side output or the main output.
    *
    * @param i         the incoming sensor reading
    * @param context   Flink context, used to emit to the side output
    * @param collector collector for the main output
    */
  override def processElement(
      i: SensorReading,
      context: ProcessFunction[SensorReading, SensorReading]#Context,
      collector: Collector[SensorReading]): Unit =
    if (i.temperature < 32.0) {
      context.output(alertOutput, "freezing alter for " + i.id)
    } else {
      // Main stream
      collector.collect(i)
    }
}