public class RocksDBStateBackend extends AbstractStateBackend
A state backend that stores its state in RocksDB. This state backend can store very large state that exceeds memory and spills to disk.
All key/value state (including windows) is stored in the key/value index of RocksDB. To protect against machine failures, checkpoints take a snapshot of the RocksDB database and persist that snapshot in a file system (by default) or in another configurable state backend.
The behavior of the RocksDB instances can be parameterized by setting RocksDB Options using the methods setPredefinedOptions(PredefinedOptions) and setOptions(OptionsFactory).
Fields inherited from class AbstractStateBackend: FS_STATE_BACKEND_NAME, MEMORY_STATE_BACKEND_NAME, ROCKSDB_STATE_BACKEND_NAME
Constructor | Description |
---|---|
RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) | Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. |
RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) | Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. |
RocksDBStateBackend(String checkpointDataUri) | Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) | Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(URI checkpointDataUri) | Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) | Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI. |
Modifier and Type | Method | Description |
---|---|---|
<K> AbstractKeyedStateBackend<K> | createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) | Creates a new AbstractKeyedStateBackend that is responsible for holding keyed state and checkpointing it. |
OperatorStateBackend | createOperatorStateBackend(Environment env, String operatorIdentifier) | Creates a new OperatorStateBackend that can be used for storing operator state. |
CheckpointStreamFactory | createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) | Creates a CheckpointStreamFactory that can be used to create streams that should end up in a savepoint. |
CheckpointStreamFactory | createStreamFactory(JobID jobId, String operatorIdentifier) | Creates a CheckpointStreamFactory that can be used to create streams that should end up in a checkpoint. |
org.rocksdb.ColumnFamilyOptions | getColumnOptions() | Gets the RocksDB ColumnFamilyOptions to be used for all RocksDB instances. |
org.rocksdb.DBOptions | getDbOptions() | Gets the RocksDB DBOptions to be used for all RocksDB instances. |
String[] | getDbStoragePaths() | |
OptionsFactory | getOptions() | Gets the options factory that lazily creates the RocksDB options. |
PredefinedOptions | getPredefinedOptions() | Gets the currently set predefined options for RocksDB. |
void | setDbStoragePath(String path) | Sets the path where the RocksDB local database files should be stored on the local file system. |
void | setDbStoragePaths(String... paths) | Sets the paths across which the local RocksDB database files are distributed on the local file system. |
void | setOptions(OptionsFactory optionsFactory) | Sets Options for the RocksDB instances. |
void | setPredefinedOptions(PredefinedOptions options) | Sets the predefined options for RocksDB. |
String | toString() | |
Methods inherited from class AbstractStateBackend: loadStateBackendFromConfig, loadStateBackendFromConfigOrCreateDefault
public RocksDBStateBackend(String checkpointDataUri) throws IOException

Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.

A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
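As a hypothetical, JDK-only illustration of this contract (and of the IOException the constructors throw when no file system can be found for the URI's scheme), the sketch below checks a checkpoint URI up front. CheckpointUriCheck, its list of known schemes, and the hadoopConfigAvailable flag are assumptions made for illustration; Flink's own validation may differ.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper (not part of Flink) sketching the checks implied by the
// constructor contract: the URI must name a resolvable file system scheme, and
// HDFS/S3 URIs need a host unless a Hadoop configuration describes the file system.
final class CheckpointUriCheck {

    // Assumption: the schemes for which a file system implementation is installed.
    private static final List<String> KNOWN_SCHEMES = Arrays.asList("file", "hdfs", "s3");

    static URI validate(String checkpointDataUri, boolean hadoopConfigAvailable) throws IOException {
        final URI uri;
        try {
            uri = new URI(checkpointDataUri);
        } catch (URISyntaxException e) {
            throw new IOException("Malformed checkpoint URI: " + checkpointDataUri, e);
        }

        String scheme = uri.getScheme();
        if (scheme == null || !KNOWN_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT))) {
            // Documented contract: no file system for the scheme -> IOException.
            throw new IOException("No file system found for scheme '" + scheme + "' in " + checkpointDataUri);
        }

        boolean distributed = scheme.equalsIgnoreCase("hdfs") || scheme.equalsIgnoreCase("s3");
        if (distributed && uri.getAuthority() == null && !hadoopConfigAvailable) {
            // e.g. "hdfs:///flink/checkpoints" relies on the Hadoop config for the host.
            throw new IOException("URI has no host and no Hadoop configuration is available: " + checkpointDataUri);
        }
        return uri;
    }
}
```

For example, "hdfs://namenode:8020/flink/checkpoints" passes on its own, while "hdfs:///flink/checkpoints" passes only when a Hadoop configuration is assumed to be available.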
Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException

Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.

A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.
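With enableIncrementalCheckpointing set to true, a checkpoint can, as I understand it, reuse RocksDB's immutable SST files that an earlier checkpoint already uploaded and copy only the new ones. The model below is a simplified, hypothetical sketch of that selection step; IncrementalUploadPlanner and the file names are invented, and a real backend must also track which checkpoints still reference each shared file.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model, not Flink's implementation: a RocksDB snapshot is treated
// as a set of immutable SST file names.
final class IncrementalUploadPlanner {

    // Returns the files that must be copied to the checkpoint storage.
    static Set<String> filesToUpload(Set<String> snapshotFiles,
                                     Set<String> previouslyUploaded,
                                     boolean enableIncrementalCheckpointing) {
        Set<String> result = new LinkedHashSet<>(snapshotFiles);
        if (enableIncrementalCheckpointing) {
            // SST files never change once written, so an uploaded file can be referenced again.
            result.removeAll(previouslyUploaded);
        }
        // A full checkpoint copies every file of the snapshot.
        return Collections.unmodifiableSet(result);
    }
}
```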
Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(URI checkpointDataUri) throws IOException

Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.

A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.

Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException

Creates a new RocksDBStateBackend that stores its checkpoint data in the file system and location defined by the given URI.

A state backend that stores checkpoints in HDFS or S3 must specify the file system host and port in the URI, or have the Hadoop configuration that describes the file system (host / high-availability group / possibly credentials) either referenced from the Flink config, or included in the classpath.

Parameters:
checkpointDataUri - The URI describing the file system and path to the checkpoint data directory.
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.
Throws:
IOException - Thrown if no file system can be found for the scheme in the URI.

public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend)

Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. Typically, one would supply a file system or database state backend here, where the snapshots from RocksDB would be stored.

The snapshots of the RocksDB state will be stored using the given backend's checkpoint stream.

Parameters:
checkpointStreamBackend - The backend to store the checkpoint data streams.

public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing)

Creates a new RocksDBStateBackend that uses the given state backend to store its checkpoint data streams. Typically, one would supply a file system or database state backend here, where the snapshots from RocksDB would be stored.

The snapshots of the RocksDB state will be stored using the given backend's checkpoint stream.

Parameters:
checkpointStreamBackend - The backend to store the checkpoint data streams.
enableIncrementalCheckpointing - True if incremental checkpointing is enabled.

public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException

Description copied from interface: StateBackend
Creates a CheckpointStreamFactory that can be used to create streams that should end up in a checkpoint.

Specified by:
createStreamFactory in interface StateBackend
Specified by:
createStreamFactory in class AbstractStateBackend
Parameters:
jobId - The JobID of the job for which we are creating checkpoint streams.
operatorIdentifier - An identifier of the operator for which we create streams.
Throws:
IOException

public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException

Description copied from interface: StateBackend
Creates a CheckpointStreamFactory that can be used to create streams that should end up in a savepoint.

This is only called if the triggered checkpoint is a savepoint. Commonly this will return the same factory as for regular checkpoints, possibly slightly adjusted.
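A hypothetical sketch of "the same factory, slightly adjusted": savepoint streams reuse the checkpoint settings but honor the optional targetLocation. StreamTargets, the per-job subdirectory, and the fallback to the checkpoint location when targetLocation is null are assumptions for illustration, not documented Flink behavior.

```java
// Hypothetical helper; the directory layout below is an assumption for illustration.
final class StreamTargets {

    // Regular checkpoint streams: a per-job directory below the configured base URI.
    static String checkpointTarget(String checkpointBaseUri, String jobId) {
        String base = checkpointBaseUri.endsWith("/")
                ? checkpointBaseUri.substring(0, checkpointBaseUri.length() - 1)
                : checkpointBaseUri;
        return base + "/" + jobId;
    }

    // Savepoint streams: same settings, but an explicit targetLocation wins when given.
    static String savepointTarget(String checkpointBaseUri, String jobId, String targetLocation) {
        return targetLocation != null ? targetLocation : checkpointTarget(checkpointBaseUri, jobId);
    }
}
```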

Specified by:
createSavepointStreamFactory in interface StateBackend
Specified by:
createSavepointStreamFactory in class AbstractStateBackend
Parameters:
jobId - The JobID of the job for which we are creating checkpoint streams.
operatorIdentifier - An identifier of the operator for which we create streams.
targetLocation - An optional custom location for the savepoint stream.
Throws:
IOException - Failures during stream creation are forwarded.

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws IOException

Description copied from interface: StateBackend
Creates a new AbstractKeyedStateBackend that is responsible for holding keyed state and checkpointing it.

Keyed state is state in which each value is bound to a key.
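The numberOfKeyGroups and keyGroupRange arguments of createKeyedStateBackend determine which keys a backend instance is responsible for. The sketch below reproduces, to the best of my knowledge, the range arithmetic Flink uses to split key groups into contiguous, nearly equal ranges per parallel instance. KeyGroupMath is a hypothetical helper, and keyGroupFor simplifies key hashing (Flink additionally applies a murmur hash to hashCode()).

```java
// Hypothetical helper mirroring key-group range arithmetic; not Flink code.
final class KeyGroupMath {

    // Inclusive [start, end] key-group range owned by one parallel instance.
    static int[] rangeForOperator(int numberOfKeyGroups, int parallelism, int operatorIndex) {
        int start = (operatorIndex * numberOfKeyGroups + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * numberOfKeyGroups - 1) / parallelism;
        return new int[] {start, end};
    }

    // Inverse mapping: the parallel instance that owns a given key group.
    static int operatorForKeyGroup(int numberOfKeyGroups, int parallelism, int keyGroup) {
        return keyGroup * parallelism / numberOfKeyGroups;
    }

    // Simplified key -> key group mapping (assumption: Flink also murmur-hashes hashCode()).
    static int keyGroupFor(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }
}
```

With 128 key groups and a parallelism of 3, the ranges come out as [0, 42], [43, 85], and [86, 127], so every key group has exactly one owner.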

Specified by:
createKeyedStateBackend in interface StateBackend
Specified by:
createKeyedStateBackend in class AbstractStateBackend
Type Parameters:
K - The type of the keys by which the state is organized.
Throws:
IOException

public void setDbStoragePath(String path)

Sets the path where the RocksDB local database files should be stored on the local file system.

Passing null to this function restores the default behavior, where the configured temp directories will be used.

Parameters:
path - The path where the local RocksDB database files are stored.

public void setDbStoragePaths(String... paths)

Sets the paths across which the local RocksDB database files are distributed on the local file system.

Each distinct state will be stored in one path, but when the state backend creates multiple states, they will store their files on different paths.

Passing null to this function restores the default behavior, where the configured temp directories will be used.
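A minimal sketch of the directory behavior described for setDbStoragePath and setDbStoragePaths: null falls back to the temp directories, each RocksDB instance uses one directory, and successive instances rotate across all of them. RoundRobinDirs and the round-robin policy are assumptions for illustration; the documentation only promises that multiple states store their files on different paths.

```java
// Hypothetical sketch, not Flink code: picks the local directory for each new RocksDB instance.
final class RoundRobinDirs {
    private final String[] paths;
    private int next;

    // configuredPaths == null restores the default: use the temp directories.
    // startIndex lets a caller randomize which directory is used first.
    RoundRobinDirs(String[] configuredPaths, String[] tempDirs, int startIndex) {
        String[] chosen = configuredPaths != null ? configuredPaths : tempDirs;
        if (chosen == null || chosen.length == 0) {
            throw new IllegalArgumentException("at least one directory is required");
        }
        this.paths = chosen.clone();
        this.next = Math.floorMod(startIndex, paths.length);
    }

    // Returns the directory for the next RocksDB instance; not thread-safe.
    String nextPath() {
        String selected = paths[next];
        next = (next + 1) % paths.length;
        return selected;
    }
}
```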
Parameters:
paths - The paths across which the local RocksDB database files will be spread.

public String[] getDbStoragePaths()

public void setPredefinedOptions(PredefinedOptions options)

Sets the predefined options for RocksDB.

If a user-defined options factory is set (via setOptions(OptionsFactory)), then the options from the factory are applied on top of the predefined options specified here.
Parameters:
options - The options to set (must not be null).

public PredefinedOptions getPredefinedOptions()

Gets the currently set predefined options for RocksDB. The default options (if nothing was set via setPredefinedOptions(PredefinedOptions)) are PredefinedOptions.DEFAULT.

If a user-defined options factory is set (via setOptions(OptionsFactory)), then the options from the factory are applied on top of the predefined options.
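The layering can be modeled with plain maps: the predefined profile supplies the starting settings, and the user factory (if any) receives them and may override or extend them. OptionsLayering and the option keys are hypothetical stand-ins for the real org.rocksdb.DBOptions and ColumnFamilyOptions objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model: RocksDB settings are represented as a plain key/value map.
final class OptionsLayering {

    // Starts from the predefined profile, then lets the user factory (if any)
    // override or extend those settings. The factory works on a copy, so the
    // profile itself is never modified.
    static Map<String, String> effectiveOptions(Map<String, String> predefinedProfile,
                                                UnaryOperator<Map<String, String>> userFactory) {
        Map<String, String> current = new LinkedHashMap<>(predefinedProfile);
        return userFactory == null ? current : userFactory.apply(current);
    }
}
```

With no factory the result equals the predefined profile; with a factory, any key the factory sets wins over the profile's value.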
public void setOptions(OptionsFactory optionsFactory)

Sets Options for the RocksDB instances. Because the options are not serializable and hold native code references, they must be specified through a factory.

The options created by the factory here are applied on top of the predefined options profile selected via setPredefinedOptions(PredefinedOptions). If the predefined options profile is the default (PredefinedOptions.DEFAULT), then the factory fully controls the RocksDB options.
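The reason for the factory indirection can be sketched with plain Java serialization: only serializable settings travel with the job, and the non-serializable, native-backed object is created lazily where it is used. NativeOptionsHandle and LazyOptionsFactory are hypothetical stand-ins; Flink's real OptionsFactory interface has its own methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a native options handle that cannot be serialized.
final class NativeOptionsHandle {
    final int backgroundThreads;
    NativeOptionsHandle(int backgroundThreads) { this.backgroundThreads = backgroundThreads; }
}

// Hypothetical factory: only plain, serializable settings travel with the job.
// The native handle is created lazily on first use and is never serialized.
final class LazyOptionsFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int backgroundThreads;
    private transient NativeOptionsHandle cached; // skipped by Java serialization

    LazyOptionsFactory(int backgroundThreads) { this.backgroundThreads = backgroundThreads; }

    NativeOptionsHandle get() {
        if (cached == null) {
            cached = new NativeOptionsHandle(backgroundThreads);
        }
        return cached;
    }

    // Serializes and deserializes the factory, as shipping it to a worker would.
    static LazyOptionsFactory roundTrip(LazyOptionsFactory factory) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(factory);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyOptionsFactory) in.readObject();
        }
    }
}
```

Because the cached handle is transient, the factory serializes cleanly even after it has been used, and the receiving side builds its own handle from the shipped settings.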
Parameters:
optionsFactory - The options factory that lazily creates the RocksDB options.

public OptionsFactory getOptions()

Gets the options factory that lazily creates the RocksDB options.

public org.rocksdb.DBOptions getDbOptions()

Gets the RocksDB DBOptions to be used for all RocksDB instances.

public org.rocksdb.ColumnFamilyOptions getColumnOptions()

Gets the RocksDB ColumnFamilyOptions to be used for all RocksDB instances.

public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception

Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.

Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
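Because operator state belongs to parallel instances rather than to keys, it has to be redistributed when the parallelism changes. The sketch below shows one simple policy: pool all list elements and split them evenly. EvenSplit and the choice of policy are assumptions for illustration rather than a description of this backend's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: redistributes per-instance list state to a new parallelism.
final class EvenSplit {

    static <T> List<List<T>> redistribute(List<List<T>> oldInstanceStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        // Pool the elements of every old instance, preserving order.
        List<T> all = new ArrayList<>();
        for (List<T> state : oldInstanceStates) {
            all.addAll(state);
        }
        // Deal out contiguous chunks whose sizes differ by at most one.
        List<List<T>> result = new ArrayList<>(newParallelism);
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }
}
```

Scaling two instances holding [a, b, c] and [d, e] out to three instances yields [a, b], [c, d], and [e].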

Specified by:
createOperatorStateBackend in interface StateBackend
Specified by:
createOperatorStateBackend in class AbstractStateBackend
Parameters:
env - The runtime environment of the executing task.
operatorIdentifier - The identifier of the operator whose state should be stored.
Throws:
Exception - This method may forward all exceptions that occur while instantiating the backend.

Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.