[case49]聊聊flink的checkpoint配置
序
本文主要研究下flink的checkpoint配置
实例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000 ms env.enableCheckpointing(1000); // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000); // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure. env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
- 使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用于指定checkpoint的触发间隔(
单位milliseconds
),而CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE,也可以指定为CheckpointingMode.AT_LEAST_ONCE - 也可以通过StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(
大概几毫秒
)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以 - checkpointTimeout用于指定checkpoint执行的超时时间(
单位milliseconds
),超时没完成就会被abort掉 - minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
- maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(
大于1的值不起作用
) - enableExternalizedCheckpoints用于开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state
- failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行
flink-conf.yaml相关配置
#============================================================================== # Fault tolerance and checkpointing #============================================================================== # The backend that will be used to store operator state checkpoints if # checkpointing is enabled. # # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the # <class-name-of-factory>. # # state.backend: filesystem # Directory for checkpoints filesystem, when using any of the default bundled # state backends. # # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints # Default target directory for savepoints, optional. # # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints # Flag to enable/disable incremental checkpoints for backends that # support incremental checkpoints (like the RocksDB state backend). # # state.backend.incremental: false
- state.backend用于指定checkpoint state存储的backend,默认为none
- state.backend.async用于指定backend是否使用异步snapshot(
默认为true
),有些不支持async或者只支持async的state backend可能会忽略这个参数 - state.backend.fs.memory-threshold,默认为1024,用于指定存储于files的state大小阈值,如果小于该值则会存储在root checkpoint metadata file
- state.backend.incremental,默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置
- state.backend.local-recovery,默认为false
- state.checkpoints.dir,默认为none,用于指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见
- state.checkpoints.num-retained,默认为1,用于指定保留的已完成的checkpoints个数
- state.savepoints.dir,默认为none,用于指定savepoints的默认目录
- taskmanager.state.local.root-dirs,默认为none
小结
- 可以通过使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode)
- checkpoint的高级配置可以配置checkpointTimeout(
用于指定checkpoint执行的超时时间,单位milliseconds
),minPauseBetweenCheckpoints(用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint
),maxConcurrentCheckpoints(用于指定运行中的checkpoint最多可以有多少个,如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数大于1的值不起作用
),enableExternalizedCheckpoints(用于开启checkpoints的外部持久化,在job failed的时候externalized checkpoint state无法自动清理,但是在job canceled的时候可以配置是删除还是保留state
) - 在flink-conf.yaml里头也有checkpoint的相关配置,主要是state backend的配置,比如state.backend.async、state.backend.incremental、state.checkpoints.dir、state.savepoints.dir等
doc
相关推荐
暗夜之城 2020-05-07
silencehgt 2020-05-07
woshigzp 2020-01-21
daixiang 2020-01-21
WSNjiang 2020-01-21
jianghuchuanke 2019-11-17
louishao 2019-10-20
xiaoyutongxue 2019-08-16
eternityzzy 2013-05-21
aliushuo 2011-03-06
农村外出务工男 2019-06-21
ououlal 2012-11-13
xorxos 2018-11-08
亚恋之欣 2019-03-25
budding0 2016-05-06
aotou 2012-11-07
PostgreSql 2019-02-24
81901032 2019-02-25