Programs written in the Data Stream API often hold state in various forms:
Functions can implement the CheckpointedFunction interface to make their local variables fault tolerant.
See also the state section in the streaming API guide.
When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and to recover consistently. How the state is represented internally, and how and where it is persisted upon checkpoints, depend on the chosen State Backend.
Out of the box, Flink bundles three state backends: the MemoryStateBackend, the FsStateBackend, and the RocksDBStateBackend.
If nothing else is configured, the system will use the MemoryStateBackend.
The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.
Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.
The MemoryStateBackend can be configured to use asynchronous snapshots, and this is currently enabled by default. We strongly encourage asynchronous snapshots to avoid blocking pipelines. To disable this feature (which should only be done for debugging), users can instantiate a MemoryStateBackend with the corresponding boolean constructor flag set to false, e.g. new MemoryStateBackend(MAX_MEM_STATE_SIZE, false), where MAX_MEM_STATE_SIZE is a placeholder for the maximum state size in bytes.
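The constructor flag applies to a single job. For a cluster-wide setting, a minimal flink-conf.yaml sketch follows; it assumes a Flink 1.x release that supports the state.backend.async key, which is not mentioned in this section, so verify it against your version's configuration reference before relying on it.

```yaml
# Use the MemoryStateBackend (also the default when nothing is configured)
state.backend: jobmanager

# Assumption: cluster-wide switch for asynchronous snapshots in Flink 1.x;
# setting it to false is only advisable for debugging
state.backend.async: false
```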
Limitations of the MemoryStateBackend:
The MemoryStateBackend is encouraged for:
The FsStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The FsStateBackend uses asynchronous snapshots by default to avoid blocking the processing pipeline while writing state checkpoints. To disable this feature, users can instantiate an FsStateBackend with the corresponding boolean constructor flag set to false, e.g. new FsStateBackend("file:///data/flink/checkpoints", false).
The FsStateBackend is encouraged for:
The RocksDBStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
The RocksDBStateBackend holds in-flight data in a RocksDB database that is (by default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database is checkpointed into the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The RocksDBStateBackend always performs asynchronous snapshots.
Limitations of the RocksDBStateBackend:
The RocksDBStateBackend is encouraged for:
Note that the amount of state you can keep is limited only by the available disk space. This allows keeping very large state, compared to the FsStateBackend, which keeps state in memory. The trade-off is a lower maximum throughput: all reads and writes to this backend have to go through de-/serialization to retrieve and store the state objects, which is more expensive than working directly with the on-heap representation as the heap-based backends do.
RocksDBStateBackend is currently the only backend that offers incremental checkpoints (see here).
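A minimal flink-conf.yaml sketch that selects the RocksDBStateBackend cluster-wide and turns on incremental checkpoints. The state.backend.incremental key is an assumption not named in this section (it exists in Flink 1.x releases, but check your version), and the checkpoint URL simply reuses the example address given earlier.

```yaml
# Select the RocksDBStateBackend for all jobs on the cluster
state.backend: rocksdb

# Where checkpoint data is written (example URL from above)
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

# Assumption: enable incremental checkpoints for RocksDB
state.backend.incremental: true
```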
Certain RocksDB native metrics are available but disabled by default; you can find full documentation here.
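As a sketch of switching individual native metrics on in flink-conf.yaml: the two key names below are assumptions taken from Flink 1.x's RocksDB native metric options and are not listed in this section, so confirm them in the full metrics documentation. Enabling native metrics may also cost some performance.

```yaml
# Assumption: expose RocksDB's estimated number of keys per column family
state.backend.rocksdb.metrics.estimate-num-keys: true

# Assumption: expose the number of currently running compactions
state.backend.rocksdb.metrics.num-running-compactions: true
```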
If nothing is specified, the default state backend is jobmanager (the MemoryStateBackend). If you wish to establish a different default for all jobs on your cluster, define a new default state backend in flink-conf.yaml. The default state backend can be overridden on a per-job basis.
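The per-job state backend is set on the StreamExecutionEnvironment of the job, for example with env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints")), where env is the job's StreamExecutionEnvironment.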
If you want to use the RocksDBStateBackend in your IDE or configure it programmatically in your Flink job, you will have to add the flink-statebackend-rocksdb dependency (matching your Flink version) to your Flink project.
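This dependency is not needed if your job does not use any RocksDB code and you configure the state backend via state.backend and further checkpointing and RocksDB-specific parameters in your flink-conf.yaml.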
A default state backend can be configured in the flink-conf.yaml, using the configuration key state.backend.
Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend), or the fully qualified name of a class that implements the state backend factory StateBackendFactory, such as org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory for RocksDBStateBackend.
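Because the entry accepts either a shorthand name or a factory class name, the two forms below should select the same backend. This is a sketch: a real flink-conf.yaml should contain only one state.backend line, so the second form is commented out.

```yaml
# Shorthand name for the RocksDBStateBackend
state.backend: rocksdb

# Equivalent: the fully qualified StateBackendFactory class name
# state.backend: org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
```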
The state.checkpoints.dir option defines the directory to which all backends write checkpoint data and metadata files.
You can find more details about the checkpoint directory structure here.
A sample section in the configuration file could look as follows:
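For instance, the following sketch makes the FsStateBackend the cluster default and writes checkpoints to HDFS. It reuses the example URL from earlier; the namenode address is illustrative.

```yaml
# The backend that will be used to store operator state checkpoints
state.backend: filesystem

# Directory for storing checkpoint data and metadata files
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
```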
| Default | Type | Description |
|---|---|---|
| 1 | Integer | The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend. |
| (none) | String | The local directory (on the TaskManager) where RocksDB puts its files. |
| (none) | MemorySize | The fixed total amount of memory, shared among all RocksDB instances per slot. This option overrides the 'state.backend.rocksdb.memory.managed' option when configured. If neither this option nor the 'state.backend.rocksdb.memory.managed' option is set, each RocksDB column family state has its own memory caches (as controlled by the column family options). |
| 0.1 | Double | The fraction of cache memory that is reserved for high-priority data like index, filter, and compression dictionary blocks. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' is configured. |
| true | Boolean | If set, the RocksDB state backend automatically configures itself to use the managed memory budget of the task slot and divides the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB are capped. |
| 0.5 | Double | The maximum amount of memory that write buffers may take, as a fraction of the total shared memory. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' is configured. |
| "org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory" | String | The options factory class for RocksDB to create DBOptions and ColumnFamilyOptions. The default options factory is org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, which reads the options configured in 'RocksDBConfigurableOptions'. |
| "DEFAULT" | String | The predefined settings for RocksDB DBOptions and ColumnFamilyOptions provided by the Flink community. Currently supported predefined options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM, and FLASH_SSD_OPTIMIZED. Note that user-customized options and options from the OptionsFactory are applied on top of these predefined ones. |
| "ROCKSDB" | String | This determines the factory for the timer service state implementation. Options are either HEAP (heap-based) or ROCKSDB for an implementation based on RocksDB. |
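The two memory options that the table descriptions refer to by name can be combined as sketched below. Managed memory (the default) lets RocksDB size itself from the slot's budget, while a fixed per-slot size overrides it. The 512mb value is an arbitrary example, not a recommendation.

```yaml
state.backend: rocksdb

# Default: derive RocksDB's memory (write buffers, indexes, block caches)
# from the task slot's managed memory budget
state.backend.rocksdb.memory.managed: true

# Alternative: a fixed total per slot, shared by all RocksDB instances in it;
# when set, this overrides state.backend.rocksdb.memory.managed
# state.backend.rocksdb.memory.fixed-per-slot: 512mb
```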