flink(8) 状态 checkpoint 状态一致性

    技术2022-07-11  83

    flink中的状态

     

    由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态

    可以认为状态就是一个本地变量,可以背任务的业务逻辑访问

    什么是状态?状态是针对每个算子而言,在每个并行任务中用于计算结果的数据

    可以看作是一个本地变量:一般放在本地内存:flink会统一进行数据类型的管理,方便进行读写传输以及容错保证

    flink会进行状态管理,包括状态一致性,故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑

    flink 有两种类型的状态 1.算子状态(OperatorState)2.键控状态(Keyed State)

    两种类型的区别在于作用域 operator state 针对当前任务所有输入的数据可见,当前任务输入的所有数据都可以访问同一份状态

    keyed state 状态只针对当前key的数据可见 对每个key维护和管理一份状态实例

    算子状态(OperatiorState)

    算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态

    状态对于同一子任务而言时共享的

    算子状态不能由相同或不同算子的另一个子任务访问

    算子状态数据结构

    列表状态(List state)

    将状态表示为一组数据的列表

    联合列表状态(Union list state)

    也将状态表示为数据的列表。它与常规列表的状态的区别在于,在发生故障时,或者从保存点启动应用程序是如何恢复

    广播状态 (Broadcast state)

    如果一个算子有多项任务,而他的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态

     

    键控状态(Keyed State)

    键控状态是根据输入数据流中定义的键(Key)来维护和访问的

    Flink为每个Key维护一个状态实例,并将具有相同键所有的数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态

    当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key

     

    键控状态数据结构

    值状态(Value state)

     将状态表示为单个的值

    列表状态(List state)

    将状态表示为一组数据的列表

    映射状态(Map state)

    将状态表示为一组key value

    聚合状态(Reducing state&Aggregating state)

    将状态表示为一个用于聚合操作的列表

    代码调用 各种类型的 state

    object StateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val inputDataStream: DataStream[String] = env.socketTextStream("127.0.0.1", 9436) val dataStream = inputDataStream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) env.execute("state test job") } } //在keyedProcessFunction 中使用各种状态 class MyProcessor extends KeyedProcessFunction[String,SensorReading,Int]{ //valuestate liststate mapstate 参数都是name 以及 里面的类型声明 // lazy val myState:ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("mystate",classOf[Int])) lazy val myListState:ListState[String] = getRuntimeContext.getListState(new ListStateDescriptor[String]("myliststate",classOf[String])) lazy val myMapState:MapState[String,Double] = getRuntimeContext.getState(new MapStateDescriptor[String,Double]("mymapstate",classOf[String],classOf[Double])) // reducingState 跟其他的state不同的是 需要传一个reduce函数 这里传了一个匿名类 其实也可以 自定义一个函数类实现 lazy val myReducingState:ReducingState[SensorReading] = getRuntimeContext.getReducingState(new ReducingStateDescriptor[SensorReading]("myreducing_state",new ReduceFunction[SensorReading] { override def reduce(t: SensorReading, t1: SensorReading): SensorReading = { SensorReading(t.id,t1.timestamp.max(t.timestamp),t.temperature.min(t1.temperature)) } },classOf[SensorReading])) var myState:ValueState[Int] =_ override def open(parameters: Configuration): Unit = { myState= getRuntimeContext.getState(new ValueStateDescriptor[Int]("mystate",classOf[Int])) } override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, Int]#Context, collector: Collector[Int]): Unit = { myState.value() myState.update(1) myListState.add("a") val flag: Boolean = myMapState.contains("a") myMapState.put("a",1) myReducingState.add(i) //此处 会直接聚合进状态 } }

     

    状态后端(State Backends)

    每传入一条数据,有状态的算子任务都会读取和更新状态

    由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问

    状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端

    状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储

     flink 提供了三种状态后端

    MemoryStateBackend

    内存级的状态后端,会将键控状态作为内存中的对象进行管理,将他们存储在taskmanager的jvm堆上,而将checkpoint 存储在jobmanager的内存中。特点:快速、低延迟、但不稳定

    FsStateBackend

    将checkpoint存到远程的持久化文件系统(FileSystem)上,而对于本地状态,跟memorystatebackend 一样 也会存在taskmanager堆上 特点:同时拥有内存记得本地访问速度,和更好的容错保证

    RocksDBStateBackend

    将所有状态序列化后,存入本地的RocksDB中存储。

     

    代码层面设置状态后端

    env.setStateBackend(new MemoryStateBackend()) env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints")) //RocksDB的这种方式 需要加单独的依赖 env.setStateBackend(new RocksDBStateBackend("",true))

    除此之外  conf/flink-conf.yaml 也可以设置状态后端

    设置 state.backend 指定 jobmanager 或者 filesystem  或者rocksdb

    然后通过state.checkpoints.dir指定checkpoint的位置

     

    使用keyed state ,必须在keyby之后的操作中使用(基于KeyedStrem)

    状态编程实例 通过状态 实现判断两个连续的数据(同key的)如果差距在10.0以上就输出

    object StateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val inputDataStream: DataStream[String] = env.socketTextStream("127.0.0.1", 9436) val dataStream = inputDataStream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) val warningStream :DataStream[(String,Double,Double)] = dataStream .keyBy("id") .map(new TempChangeWarning(10.0)) //使用flatmapwithstate方法 也就是自带state方法(就是自定义rich版本的简写版) 有一点 返回的值 要是(List,Option[T])这种 // Option[T] 是一个类型为 T 的可选值的容器: 如果值存在, Option[T] 就是一个 Some[T] ,如果不存在, Option[T] 就是对象 None 。 val warningStream2 :DataStream[(String,Double,Double)] = dataStream .keyBy("id") .flatMapWithState[(String,Double,Double),Double]({ case (inputData :SensorReading,None)=>(List.empty,Some(inputData.temperature)) case (inputData:SensorReading,lastTemp:Some[Double])=>{ val diff =(inputData.temperature-lastTemp.value).abs if(diff>10.0){ (List((inputData.id,lastTemp.get,inputData.temperature)),Some(inputData.temperature)) }else{ (List.empty,Some(inputData.temperature)) } } }) warningStream.print() env.execute("state test job") } } //自定义RichMapFunction class TempChangeWarning(threshold :Double) extends RichMapFunction[SensorReading,(String,Double,Double)]{ //定义状态变量,上一次的温度值 private var lastTempState:ValueState[Double] =_ override def open(parameters: Configuration): Unit ={ lastTempState = getRuntimeContext.getState[Double](new ValueStateDescriptor[Double]("last_temp",classOf[Double])) } override def map(t: SensorReading): (String, Double, Double) = { //从状态中取出上次的温度值 val lastTemp = lastTempState.value() //更新状态 lastTempState.update(t.temperature) //跟当前温度值计算差值 然后跟阈值比较,如果大于就报警 val diff = (t.temperature-lastTemp).abs if(diff > threshold){ (t.id,lastTemp,t.temperature) }else null } } //自定义RichFlatMapFunction,可以输出多个结果 class TempChangewaringWithFlatmap(threshold:Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{ lazy val lastTempState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last_temp",classOf[Double])) override def flatMap(in: SensorReading, collector: Collector[(String, Double, Double)]): Unit = { //从状态中取出上次的温度值 val lastTemp = lastTempState.value() //更新状态 lastTempState.update(in.temperature) //跟当前温度值计算差值 然后跟阈值比较,如果大于就报警 val diff = (in.temperature-lastTemp).abs if(diff > threshold){ collector.collect((in.id,lastTemp,in.temperature)) } } }

    可以认为所有的算子都可以有状态,如map/filter/flatmap本来是无状态的,但是可以通过实现RichFunction在其中自定义状态进行操作 reduce/aggregate/window本来就是有状态,是flink底层直接管理的,也可以实现RichFunction自定义状态

     

    operator state 代码实现

    class myMapper() extends RichMapFunction[SensorReading,Long] with ListCheckpointed[Long]{ var count:Long = 0L override def map(in: SensorReading): Long = { count +=1 count } override def snapshotState(l: Long, l1: Long): util.List[Long] = { val stateList = new util.ArrayList[Long]() stateList.add(count) stateList } override def restoreState(list: util.List[Long]): Unit = { list.forEach(state=>{ count +=state }) } }

    checkpoint 

    一致性检查点(checkpoint)

    flink 故障恢复机制的核心,就是应用状态的一致性检查点

    有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照) 这个时间点 应该是所有任务都恰好处理完一个相同的输入数据的时候

    从检查点恢复状态

    遇到故障之后 第一步重启应用

     

    第二步是从checkpoint中读取状态,将状态重置 从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同 第三步:开始消费并处理检查点到发生故障之间的所有数据 这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-oncw)的一致性,因此所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流都会被重置到检查点完成时的位置   注意有一点 checkpoint 保证的 只是 flink中 计算过程的exactly-once

    flink中的checkpoint 保存的是所有任务 状态的快照

    这个状态要求是所有任务都处理完同一个数据之后的状态

     

    flink 检查点实现算法 

    基于Chandy-Lamport算法的分布式快照

    将检查点的保存和数据处理分离开,不暂停整个应用

    检查点分界线(Chekpoint Barrier)

    Flink的检查点算法用到了一种称为分界线(barrier)的特殊数据格式,用来把一条流上数据按照不同的检查点分开

    分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中,而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中。

     算法中需要对不同上游发来的barrier进行对齐 checkpoint存储位置,由状态后端state backend 决定 jobmanager触发一个checkpoint操作 会把checkpoint中所有任务状态的拓扑结构保存下来   barrier和watermark类似,都可以看作一个插入数据流中的特殊数据结构 但是  barrier 在数据处理上跟watermark 是两套机制,完全没有关系     保存点Savepoints flink还提供了自定义镜像保存功能,就是保存点(savepoints) 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点 flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作 保存点是一个强大的功能,除了故障恢复外,保存点还可以用于用计划的手动备份 更新应用程序 版本迁移 暂停和重启应用等等   反压机制 spark 的反压是通过一个pid控制器(比例 积分微分来适当调节 而不是直接停掉)实现的   flink的反压 使用credit 实现反压 通过对buffer的剩余空间需求来调节信任度 上游给下游发送数据时 先发送通讯请求 下游根据收到的请求 分配信任度 如果上游的数据 超出 信任度分配的空间的话 只能先发送 信任度空间大小的 数据  此时 如果某一步处理数据 完全处理不了 此时 这里 没有剩余空间 从而 没有给上游的信任度 从而 上游也会积压 以此一步一步反方向推       checkpoint 配置(checkpoint 默认不开启) //checkpoint相关配置 //启用checkpoint 指定触发检查点间隔时间 env.enalbleCheckpointing(1000L) //其他配置 设置 checkpoint的状态一致性的语义 默认就是exactly-once env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //设置checkpoint 超时时间 env.getCheckpointConfig.setCheckpointTimeout(30000L) //设置同一时间 能够同时处理的checkpoint env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) //两次checkpint间至少要有多少时间处理数据 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L) //传入bool值 挂了之后 true 表示从checkpoint恢复 false表示从自定义的点恢复 env.getCheckpointConfig.setPreferCheckpointForRecovery(false) //允许checkpoint失败几次 可以为0 env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3) //重启策略的配置 //重启几次 间隔多久重启 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000L)) //某个时间间隔内失败多少次 第一个参数是最多几次 第二个参数是多长时间之内 第三个参数是 两次重启的间隔时间 这里的TimeUnit用的是java.util包下的 env.setRestartStrategy(RestartStrategies.failureRateRestart(5,Time.of(5,TimeUnit.MINUTES)),Time.of(10,TimeUnit.SECONDS))

     

    状态一致性

    有状态的流处理,内部每个算子都可以由自己的状态

    对于流处理内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。

    一条数据不应该丢失,也不应该重复计算

    在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。

     

    状态一致性分类

    at-most-once(最多一次)

    当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据。At-most-once语义的含义是最多处理一次事件。

    at-least-once(至少一次)

    在大多数的真实应用场景,我们希望不丢失事件。这种类型的保障称为at-least-once 意思是所有的事件都得到了处理,而一些事件还可能被处理多次。

    exactly-once(精确一次)

    恰好处理一次是最严格的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。

     

    flink内部使用了一种轻量级快照机制-检查点(checkpoint)来保证exaclty-once语义 

    但是目前看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的 而在真实应用中 流处理应用除了流处理器外 还包含了数据源(如kafka)和输出到持久化系统    所以还需要 保证端到端(end-to-end)的状态一致性

    端到端的一致性保证 意味着结果的正确性贯穿了整个流处理应用的始终,每一个组建都保证了它自己的一致性

    整个端到端的一致性级别取决于所有组件中一致性最弱的组件

     

    端到端exactly-once

    1.内部保证---checkpoint

    2.source端---可重设数据的读取位置

    3.sink端---从故障恢复时,数据不会重复写入外部系统 (幂等写入 事务写入)

     

    幂等写入(Idempotent Writes)

    所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行 就不起作用

    本质上就是用key 或唯一id 但是不同的存储系统不一定都能保证幂等

    还有一个点就是 幂等 实际上叫做 最终状态一致性 因为在数据恢复的过程中 如果实时在取还是会有数据不一致的情况 当然全部恢复之后 状态和没挂掉是一致的

    事务写入(Transactional Writes)

    应用程序中一系列严密的操作,所有操作必须成功完成,否则每个操作中所做的所有更改都会被撤销

    具有原子性:一个事务中的一系列操作要么全部成功,要么一个都不做

    实现思想:构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中

    实现方式

    1.预习日志

    2.两阶段提交

     

    预写日志(Write-Ahead-Log,WAL)

    把结果数据先当成状态保存,然后在收到checkpoint 完成的通知时,一次性写入sink系统

    简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定

    DataStreamAPI 提供了一个模版类:GenericWriteAheadSink,来实现这种事务性 

     

    但是有缺点:1是实际上流处理就变成批处理了(或者说 批写入) 2是如果外部系统不支持幂等写入的话 往外批量写的过程中 出了问题 会导致重复写 也就是只能保证at-least-once

     

    两阶段提交(Two-Pyase-Commit,2PC)

    对于每个checkpoint,sink任务回启动一个事务,并把接下来所有接收的数据添加到事务里

    然后将这些数据写入外部sink 系统,但不提交他们-这时只是“预提交”

    当它收到checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入

    这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口。

    2PC对外部sink系统的要求

    外部sink系统必须提供事务支持,或者sink任务必须能够模拟外部系统上的事务

    在checkpoint的间隔期间里,必须能够开启一个事务并接受数据写入

    在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些事件。如果这个时候sink系统关闭事务(如 超时了) ,那么未提交的数据就会丢失

    sink 任务必须能够在进程失败后恢复事务

    提交事务必须是幂等操作

     

    不同source和sink的一致性保证

     

    flink+kafka端到端的一致性保证

    内部---利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性

    source---kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

    sink---kafka producer 作为sink 采用两阶段提交sink,需要实现一个twoPhaseCommitSinkFunction(FlinkKafkaProducer已经实现)

     

     

    jobManager协调各个TaskManager进行checkpoint存储

    checkpoint保存在stateBackend中,默认stateBackend是内存级的,也可以改为文件级进行持久化保存

    当checkpoint启动时,jobManager会将检查点分界线(barrier)注入数据流 

    barrier会在算子间传递下去

    每个算子会对当前状态做个快照,保存到状态后端

    checkpoint机制可以保证内部状态一致性

    每个内部的transform任务遇到barrier时,都会把状态存到checkpoint里

    sink任务首先把数据写入外部kafka,这些数据都属于预提交的事务;遇到barrier时,把sink的状态保存到状态后端,并开启新的预提交事务 (遇到barrier 新开一个事务 不是 提交事务) 

    当所有算子任务的快照完成,也就是这次checkpoint完成时,jobManager会向所有任务发通知,确认这次checkpoint完成

    sink任务收到确认通知,正式提交之前的事务,kafka中未确认数据改为“已确认”

     

    调用flinkkafkaproducer时要使用2PC实现exactlyonce的话 除了要传之前调用正常的参数意外 还要传入一个Semantic的枚举类型 semantic里面的值时表示 语义的三种 NONE      AT_LEAST_ONCE      EXACTLY_ONCE 如果不传  默认使用 AT_LEAST_ONCE

    面试题

    解释下flink状态机制?

    flink内置的很多算子,包括源source 数据存储sink都是有状态的。在flink中,状态始终与特定算子相关联。flink会以checkpoint的形式对各个状态进行快照,用于保证故障恢复时的状态一致性。Flink通过状态后端来管理状态和checkpoint的存储,状态后端可以有不同的配置选择。

    Processed: 0.012, SQL: 9