Every function and operator in Flink can be stateful (see working with state for details). Stateful functions store data across the processing of individual elements/events, making state a critical building block for any type of more elaborate operation.
In order to make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams to give the application the same semantics as a failure-free execution.
The documentation on streaming fault tolerance describe in detail the technique behind Flink’s streaming fault tolerance mechanism.
Flink’s checkpointing mechanism interacts with durable storage for streams and state. In general, it requires:
By default, checkpointing is disabled. To enable checkpointing, call
enableCheckpointing(n) on the
StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Other parameters for checkpointing include:
exactly-once vs. at-least-once: You can optionally pass a mode to the
enableCheckpointing(n) method to choose between the two guarantee levels.
Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
checkpoint timeout: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.
minimum time between checkpoints: To make sure that the streaming application makes a certain amount of progress between checkpoints, one can define how much time needs to pass between checkpoints. If this value is set for example to 5000, the next checkpoint will be started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. Note that this implies that the checkpoint interval will never be smaller than this parameter.
It is often easier to configure applications by defining the “time between checkpoints” then the checkpoint interval, because the “time between checkpoints” is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow).
Note that this value also implies that the number of concurrent checkpoints is one.
number of concurrent checkpoints: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.
This option cannot be used when a minimum time between checkpoints is defined.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000 ms env.enableCheckpointing(1000); // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig.setCheckpointTimeout(60000) // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state consistently to provide exactly once processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured State Backend.
By default state will be kept in memory, and checkpoints will be stored in-memory at the master node (the JobManager). For proper persistence of large state,
Flink supports various forms of storing and checkpointing state in so called State Backends, which can be set via
See state backends for more details on the available state backends and options for job-wide and cluster-wide configuration.
Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing:
env.enableCheckpointing(interval, force = true).
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
Flink supports different restart strategies which control how the jobs are restarted in case of a failure. For more information, see Restart Strategies.