pom.xml dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

// A small Kafka utility class so the consumer setup does not have to be repeated everywhere
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object MyKafkaUtil {
  val prop = new Properties()
  prop.setProperty("bootstrap.servers", "192.168.199.100:9092")
  prop.setProperty("group.id", "flinkdemo")

  def getConsumer(topic: String): FlinkKafkaConsumer011[String] = {
    val myKafkaConsumer: FlinkKafkaConsumer011[String] =
      new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }
}

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object WordCountStream {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = env.socketTextStream("192.168.199.100", 9999)

    // The incoming stream may contain empty strings (e.g. extra spaces), so filter(_.nonEmpty) drops them
    import org.apache.flink.api.scala._
    val word2CountDataStream: DataStream[(String, Int)] = dataStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    word2CountDataStream.print()
    env.execute()
  }
}

Flink uses checkpoints to record the state of how far the data has been processed.
The JobManager coordinates the TaskManagers to take checkpoints. Checkpoints are stored in a StateBackend; the default StateBackend is memory-level, but it can be switched to a file-level backend for durable, persistent storage.
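As an illustration, here is a minimal sketch of enabling periodic checkpoints and switching to a file-level backend; the checkpoint interval and the HDFS path are placeholders chosen for this example, not values from the text above.

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointConfigDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Take a checkpoint every 5 seconds with exactly-once guarantees
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    // Persist checkpoint data to a file system instead of keeping it in memory
    // (the HDFS address is a placeholder, adjust it to your cluster)
    env.setStateBackend(new FsStateBackend("hdfs://192.168.199.100:9000/flink/checkpoints"))

    // ... build and execute the job as usual
  }
}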
The execution process is effectively a two-phase commit: as each operator finishes its part it performs a "pre-commit", and only once the sink operation has completed is the final "commit" issued; if execution fails, the pre-committed work is discarded.
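The Kafka 0.11 producer from the connector above is a built-in sink that follows this protocol when configured with Semantic.EXACTLY_ONCE: records written between checkpoints are pre-committed as a Kafka transaction and only finally committed when the checkpoint completes. A minimal sketch, reusing the broker address from the earlier examples; the output topic "flink-out" and the helper object are hypothetical.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceKafkaSink {
  def create(): FlinkKafkaProducer011[String] = {
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "192.168.199.100:9092")
    // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
    producerProps.setProperty("transaction.timeout.ms", "60000")

    new FlinkKafkaProducer011[String](
      "flink-out",                                                 // hypothetical output topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      producerProps,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)                 // pre-commit on checkpoint, commit on completion
  }
}

// Usage, e.g. in WordCountStream above:
// word2CountDataStream.map(_.toString()).addSink(ExactlyOnceKafkaSink.create())

With EXACTLY_ONCE, a downstream consumer reading with isolation.level=read_committed only sees the records once the checkpoint that pre-committed them has completed.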
If the job goes down, it has to be recovered from the StateBackend, and only the operations whose commits were confirmed can be restored.
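The settings that control this recovery can be sketched as follows; the restart count, delay, and the choice to retain checkpoints on cancellation are illustrative values, not prescribed by the text above.

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object RecoveryConfigDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)

    // Keep the checkpoint data when the job is cancelled, so the job can also be
    // resumed manually later, e.g. with: flink run -s <checkpointPath> ...
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // After a failure, restart up to 3 times with 10 seconds between attempts;
    // each restart restores operator state from the last completed checkpoint
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
  }
}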
