大数据计算引擎之Flink Flink状态管理和容错

原文地址:大数据计算引擎之Flink Flink状态管理和容错

有状态计算

在Flink架构体系中,有状态计算可以说是Flink非常重要的特征之一。有状态计算是指在程序计算过程中,在Flink程序内部,存储计算产生的中间结果,并提供给Functions 或 孙子计算结果使用。如图所示:
大数据计算引擎之Flink Flink状态管理和容错
状态数据可以维系在本地存储中,这里的存储可以是 Flink 的堆内存或者堆外内存,也可以借助第三方的存储介质,例如:Flink中已经实现的RocksDB,当然用户也可以自己实现相应的缓存系统去存储状态信息,以完成更加复杂的计算逻辑。和状态计算不同的是,无状态计算不会存储计算过程中产生的结果,也不会将结果用于下一步计算过程中,程序只会在当前的计算流程中实行计算,计算完成就输出结果,然后下一条数据接入,然后处理。
无状态计算实现的复杂度相对较低,实现起来比较容易,但是无法完成提到的比较复杂的业务场景,例如:

  • [ ] 用户想实现CEP(复杂事件处理),获取符合某一特定时间规则的事件,状态计算就可以将接入的事件进行存储,然后等待符合规则的事件触发;
  • [ ] 用户想要按照 minutes / hour / day 等进行聚合计算,求取当前最大值、均值等聚合指标,这就需要利用状态来维护当前计算过程中产生的结果,例如事件的总数、总和以及最大,最小值等;
  • [ ] 用户想在 Srteam 上实现机器学习的模型训练,状态计算可以帮助用户维护当前版本模型使用的参数;
  • [ ] 用户想使用历史的数据进行计算,状态计算可以帮助用户对数据进行缓存,使用户可以直接从状态中获取相应的历史数据。

Flink 状态及应用

状态类型

在 Flink 中根据数据集是否根据 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-Keyed State) 两种类型。

Keyed State
表示和key相关的一种state ,只能用于 KeyedStream 类型数据集对应的Functions和Operators之上。Keyed State 是 Operator State 的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应一个 Operator 和 Key 的组合。 Keyed State 可以通过 Key Group 进行管理,主要用于当算子并行度发生变化时,自动重新分布 Keyed State 数据。

Operator State
与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的 Key 无关,每个算子实例中持有所有数据元素中的一部分状态数据。 Operator State 支持当算子实例并行度发生变化时自动重新分配状态数据。

同时在Flink中 Keyed State 和 Operator State 均具有两种形式,其中一种为托管状态(Managered State)形式,由Flink Runtime 中控制和管理状态数据,并将状态数据转换称为内存Hash tables 或 Recks DB 的对象存储,然后将这些状态数据通过内部接口持久化到 Checkpoints 中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发 Checkpoints 过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成 bytes 数据存储在 Checkpoints 中,当从 Checkpoints 恢复任务时,算子自己在反序列化出状态的数据结构。
Notes: Flink中推荐用户使用 Managered State 管理状态数据,主要原因是:Manager State 能够更好的支持状态数据的重平衡以及更加完善的内存管理。

Managered Keyed State

Flink 有以下Managered Keyed State 类型可以使用,每种状态都有相应的的使用场景,用户可以根据实际需求选择使用。

  • [ ] ValueState[T]: 与 Key 对应单个值的状态,例如统计 user_id 对应的交易次数,每次用户交易都会在 count 状态值上进行更新。 ValueState 对应的更新方法是 update(T) , 取值是 T value() ;
  • [ ] ListState[T]: 与 Key 对应元素列表的状态,状态中存放元素的 List 列表。例如定义 ListValue存储用户经常访问的 IP 地址。在 ListState 中添加元素使用 add(T) , addAll(List[T]) 两个方法。获取元素使用 Iterable<T> get() 方法,更新元素使用 update(List[T])方法;
  • [ ] ReducingState[T]: 定义与 Key 相关的数据元素单个聚合值的状态,用户存储经过指定 ReduceFunction 计算之后的指标,因此,ReduceState 需要指定ReduceFunction 完成状态数据的聚合。ReducingState 添加元素使用 add(T)方法,获取元素使用 T get() ;
  • [ ] AggregeateState[IN,OUT]: 定义 与key相关的数据元素单个聚合值的状态,用于维护数据经过指定 AggregateFunction 计算之后的指标。和ReducingState相比,AggregeateState 的输入输出类型不一定相同,但ReducingState 输入/出 类型必须保持一致。和ListState相似,AggregatingState 需要指定AggregateFunction完成状态数据的聚合操作。AggregatringState添加元素使用 add(IN) 方法, 获取元素使用 OUT get() 方法;
  • [ ] MapState<UK, UV>:这会保留一个映射列表。您可以将键值对放入状态并检索Iterable所有当前存储的映射。使用put(UK, UV)或 添加映射putAll(Map[UK,UV])(Map<UK, UV>)。可以使用来检索与用户键关联的值get(UK)。对于映射,键和值可迭代视图可以使用被检索entries()keys()values()分别。

Stateful Function定义
示例:
在RichFlatMapFunction 中定义 ValueState,已完成最小值的获取:

inputStream.keyBy(_._1).flatMap(
      // (String,Long,Int) 输入类型
      // (String,Long,Long) 输出类型
      new RichFlatMapFunction[(Int,Long) , (Int,Long,Long)] {
        private var leastValueState:ValueState[Long] = _
        // 定义状态名称
        private var leastValueStateDesc:ValueStateDescriptor[Long] = _
        override def open(parameters: Configuration): Unit = {
          // 指定状态类型
          leastValueStateDesc = new ValueStateDescriptor[Long]("leastValueState" , classOf[Long])
          // 通过 getRuntimeContext.getState 拿到状态
          leastValueState = getRuntimeContext.getState(leastValueStateDesc)
        }
        override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
          // 通过 value 拿到最小值
          val leastValue: Long = leastValueState.value()

          // 如果前一个指标大于最小值,则直接输出数据元素和最小值
          if ( leastValue != 0L && value._2 > leastValue){
            out.collect((value._1 , value._2 , leastValue))
          }else{
            // 如果当前指标小于最小值,则更新状态中的最小值
            leastValueState.update(value._2)
            // 将当前数据中的指标作为最小值输出
            out.collect(value._1 , value._2 , value._2)
          }
        }
      }).print()

State生命周期
对于任何类型 Keyed State 都可以设定状态生命周期(TTL),以确保能够在规定时间内即时清理状态数据。状态生命周期功能可通过 StateTtlConfig 配置然后将 StateTtlConfig 配置传入StateDescriptor 中的 enableTimeToLive 方法中即可。Keyed State 配置实例如下所示:

val config: StateTtlConfig = StateTtlConfig
            // 指定TTL时长为 5s
            .newBuilder(Time.seconds(5))
            // 指定TTL 刷新只对创建和写入操作有效
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            // 指定状态可见性不返回过期数据
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
          leastValueStateDesc.enableTimeToLive(config)

在StateTtlConfig中除了通过 newBuilder() 方法中设定过期时间的参数是必须的之外,其他的参数都是可选的或使用默认值。其中 setUpdateType方法中传入的类型有两种:

  1. StateTtlConfig.UpdateType.onCreateAndWrite 仅在创建和写入时更新 TTL ;
  2. StateTtlConfig.UpdateType.OnReadAndWriter 仅在读与写操作都更新 TTL ;
    需要注意的是,过期的状态数据根据UpdateType参数进行配置,只有被写入或者读取的是时间才会更新TTL,也就是说如果某个状态指标一直不被使用活着更新,则永远不会触发对该状态数据的清理操作,这种情况可能会导致系统中的状态数据越来越大。

另外,可以通过 setStateVisibility 方法设定状态的可见性,根据过期数据是否被清理来确定是否返回状态数据:

  1. StateTtlConfig.StateVisibility.NeverReturnExpired: 状态数据过期就不会返回(默认)
  2. StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: 状态数据即使过期但没有被清理依然返回

Scala DataStream API中使用状态

直接上代码片段:

inputStream.keyBy(_._1)
      // 指定输入参数类型和状态参数类型
      .mapWithState((in:(Int,Long) , count : Option[Int]) =>
        // 判断count 类型是否非空
        count match {
          // 输出 key , count 并在原来 count 数据上累加
          case Some(c) => ((in._1 , c) , Some(c + in._2))
            // 如果状态为空,则将指标填入
          case None => ((in._1 , 0) , Some(in._2))
        }
      )

Manager Operator State

Operator State 是一种 non-keyed-state ,与并行的操作算子实例相关联,例如在 Kafka Connector 中,每个 Kafka 消费端算子实例都对应到 Kafka 的一个分区中,维护Topic分区和 Offsets 偏移量作为算子的 Operator State. 在Flink中可以实现 CheckpointedFunction 或者 ListCheckpoint<T extends Serializable>两个接口来定义操作 Managered Operator State 的函数。

通过 CheckpointedFunction 接口操作Operator State
CheckpointedFunction 接口定义如图:

@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}

在每个独立的算子中,Managered Operator State 都是以 List 形式存储的,算子和算子之间的状态数据相互独立,List存储比较适合于状态数据的重新分布,Flink目前支持Manager Operator State 两种重要分布策略,分别是 Event-split Redistribution 和 Union Redistribution。

  • [ ] Event-split Redistribution: 每个算子实例中含有部分元素的List列表,整个状态数据是所有List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度相同数量的List列表,每个 task 实例中有一个 List,其可以为空或者含有多个元素。
  • [ ] Union Redistribution: 每个算子实例中含有所有状态元素的List 列表,当触发 restore/redistribution 动作时,每个算子可以获取到完整的状态元素列表。

Checkpoints 和 Savepoints

状态管理器

Querable State

相关推荐