
Fault Tolerance

Flink’s fault tolerance mechanism recovers programs in the presence of failures and continues to execute them. Such failures include machine hardware failures, network failures, transient program failures, etc.

Streaming Fault Tolerance

Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a persistent (or durable) source that can be asked for prior records again (Apache Kafka is a good example of such a source).

The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see Working with State) consistently to provide exactly-once processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured state backend.
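For instance, a minimal sketch of pointing checkpoints at a file system backend instead of the default JobManager memory might look as follows (the HDFS path is only a placeholder; check the state backend documentation for the class names bundled with your version):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keep checkpoint data in a durable file system rather than in JobManager memory
// (placeholder path; point it at your own HDFS or local checkpoint directory)
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));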

The docs on streaming fault tolerance describe in detail the technique behind Flink’s streaming fault tolerance mechanism.

To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other parameters for checkpointing include:

  • Number of retries: The setNumberOfExecutionRetries() method defines how many times the job is restarted after a failure. When checkpointing is activated but this value is not explicitly set, the job is restarted infinitely often.

  • exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency applications (consistently a few milliseconds).

  • number of concurrent checkpoints: By default, the system does not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpointing at the expense of making progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example, because their functions call external services that need some time to respond) but that still want to checkpoint very frequently (every few hundred milliseconds) to re-process very little upon failure.

  • checkpoint timeout: The time after which a checkpoint-in-progress is aborted if it has not completed by then.

Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)


Fault Tolerance Guarantees of Data Sources and Sinks

Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the snapshotting mechanism. The following table lists the state update guarantees of Flink coupled with the bundled connectors.

Please read the documentation of each connector to understand the details of the fault tolerance guarantees.

Source                | Guarantees                                    | Notes
Apache Kafka          | exactly once                                  | Use the appropriate Kafka connector for your version
AWS Kinesis Streams   | exactly once                                  |
RabbitMQ              | at most once (v 0.10) / exactly once (v 1.0)  |
Twitter Streaming API | at most once                                  |
Collections           | exactly once                                  |
Files                 | exactly once                                  |
Sockets               | at most once                                  |
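For illustration, wiring in a checkpointed Kafka source looks roughly like the sketch below. The connector class (FlinkKafkaConsumer09), the topic name, and the broker properties are placeholders; use whatever matches your Kafka and connector version.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.setProperty("group.id", "my-consumer-group");       // placeholder consumer group

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // the Kafka source then stores its read offsets in each checkpoint

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer09<String>("my-topic", new SimpleStringSchema(), props));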

To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs to take part in the checkpointing mechanism. The following table lists the delivery guarantees (assuming exactly-once state updates) of Flink coupled with bundled sinks:

Sink                | Guarantees                    | Notes
HDFS rolling sink   | exactly once                  | Implementation depends on Hadoop version
Elasticsearch       | at least once                 |
Kafka producer      | at least once                 |
Cassandra sink      | at least once / exactly once  | exactly once only for idempotent updates
AWS Kinesis Streams | at least once                 |
File sinks          | at least once                 |
Socket sinks        | at least once                 |
Standard output     | at least once                 |
Redis sink          | at least once                 |
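As a sketch only, attaching the HDFS rolling sink to a stream could look like the following (RollingSink comes from the filesystem connector; the class name and output path are assumptions for this version, so consult the connector documentation):

// write the stream to HDFS; together with checkpointing this yields exactly-once output
// (placeholder path; adjust to your cluster)
stream.addSink(new RollingSink<String>("hdfs://namenode:40010/flink/output"));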


Restart Strategies

Flink supports different restart strategies, which control how jobs are restarted in case of a failure. The cluster can be started with a default restart strategy that is used whenever no job-specific restart strategy has been defined. If a job is submitted with a restart strategy, that strategy overrides the cluster's default setting.

The default restart strategy is set via Flink's configuration file flink-conf.yaml. The configuration parameter restart-strategy defines which strategy is used. By default, the no-restart strategy is used. See the following list of available restart strategies to learn which values are supported.

Each restart strategy comes with its own set of parameters which control its behaviour. These values are also set in the configuration file. The description of each restart strategy contains more information about the respective configuration values.

Restart Strategy | Value for restart-strategy
Fixed delay      | fixed-delay
Failure rate     | failure-rate
No restart       | none
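For example, making the fixed delay strategy the cluster-wide default could look like the following entries in flink-conf.yaml (the concrete values are only an illustration; the per-strategy parameters are described in the sections below):

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s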

Apart from defining a default restart strategy, it is possible to define a specific restart strategy for each Flink job. This restart strategy is set programmatically by calling the setRestartStrategy method on the ExecutionEnvironment. Note that this also works for the StreamExecutionEnvironment.

The following example shows how to set a fixed delay restart strategy for a job. In case of a failure, the system tries to restart the job 3 times and waits 10 seconds between successive restart attempts.

Java:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts 
  Time.of(10, TimeUnit.SECONDS) // delay
));

Scala:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts 
  Time.of(10, TimeUnit.SECONDS) // delay
))


Fixed Delay Restart Strategy

The fixed delay restart strategy attempts to restart the job a given number of times. If the maximum number of attempts is exceeded, the job eventually fails. In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.

This strategy is enabled as the default by setting the following configuration parameter in flink-conf.yaml.

restart-strategy: fixed-delay

Configuration Parameter               | Description                                     | Default Value
restart-strategy.fixed-delay.attempts | Number of restart attempts                      | 1
restart-strategy.fixed-delay.delay    | Delay between two consecutive restart attempts  | akka.ask.timeout

For example:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

The fixed delay restart strategy can also be set programmatically:

Java:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts 
  Time.of(10, TimeUnit.SECONDS) // delay
));

Scala:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts 
  Time.of(10, TimeUnit.SECONDS) // delay
))

Restart Attempts

The number of times that Flink retries the execution before the job is declared as failed is configurable via the restart-strategy.fixed-delay.attempts parameter.

The default value is 1.

Retry Delays

Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay.

Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted.

The default value is the value of akka.ask.timeout.


Failure Rate Restart Strategy

The failure rate restart strategy restarts the job after a failure, but when the failure rate (failures per time interval) is exceeded, the job eventually fails. In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.

This strategy is enabled as the default by setting the following configuration parameter in flink-conf.yaml.

restart-strategy: failure-rate

Configuration Parameter                                 | Description                                                                  | Default Value
restart-strategy.failure-rate.max-failures-per-interval | Maximum number of restarts in the given time interval before failing a job  | 1
restart-strategy.failure-rate.failure-rate-interval     | Time interval for measuring the failure rate                                 | 1 minute
restart-strategy.failure-rate.delay                     | Delay between two consecutive restart attempts                               | akka.ask.timeout

For example:

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

The failure rate restart strategy can also be set programmatically:

Java:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
));

Scala:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
))


No Restart Strategy

The job fails directly and no restart is attempted.

restart-strategy: none

The no restart strategy can also be set programmatically:

Java:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

Scala:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
