This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Fault Tolerance

Flink's fault tolerance mechanism recovers programs in the presence of failures and continues to execute them. Such failures include machine hardware failures, network failures, transient program failures, etc.

Batch Processing Fault Tolerance (DataSet API)

Fault tolerance for programs in the DataSet API works by retrying failed executions. The number of times that Flink retries the execution before declaring the job failed is configurable via the execution retries parameter. A value of 0 effectively deactivates fault tolerance.

To activate fault tolerance, set the execution retries to a value larger than zero. A common choice is three.

This example shows how to configure the execution retries for a Flink DataSet program.

Java:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setNumberOfExecutionRetries(3); // retry a failed execution up to 3 times

Scala:

val env = ExecutionEnvironment.getExecutionEnvironment
env.setNumberOfExecutionRetries(3) // retry a failed execution up to 3 times

You can also define the default number of execution retries in flink-conf.yaml:

execution-retries.default: 3

Retry Delays

Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay.

Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted.
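To make the interplay between the retry count and the retry delay concrete, the following is a minimal sketch in plain Java. It is a conceptual model only, not Flink's implementation: the class RetrySketch, the method runWithRetries, and the sample job are hypothetical names introduced for illustration, and the sketch assumes that the retry count excludes the initial attempt.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Hypothetical model of "execution retries" with a retry delay:
     * runs the job once, then re-executes it up to maxRetries times,
     * waiting delayMillis before each re-execution.
     */
    public static <T> T runWithRetries(Callable<T> job, int maxRetries, long delayMillis)
            throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        Exception lastFailure = null;
        // One initial attempt plus up to maxRetries re-executions.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                // The retry delay: give external systems time to time out.
                Thread.sleep(delayMillis);
            }
            try {
                return job.call();
            } catch (Exception e) {
                lastFailure = e; // remember the failure; retry if attempts remain
            }
        }
        // Every attempt failed, so the job is declared failed.
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical job that fails twice with a transient error, then succeeds.
        String result = runWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure " + calls[0]);
            }
            return "done";
        }, 3, 100L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With maxRetries set to 0 the loop body runs exactly once and the first failure is rethrown, which mirrors the statement that a value of 0 deactivates fault tolerance.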

You can set the retry delay for each program as follows (the sample shows the DataStream API; the DataSet API works similarly):

Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay

Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay

You can also define the default value for the retry delay in the flink-conf.yaml:

execution-retries.delay: 10 s
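
For reference, a flink-conf.yaml that sets both defaults might look like the fragment below. It only combines the two keys shown on this page with the same example values. The comments are explanatory additions, and the values are illustrative rather than recommended.

```yaml
# Default number of times a failed execution is retried.
execution-retries.default: 3
# Default delay before a failed execution is re-executed.
execution-retries.delay: 10 s
```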
