@PublicEvolving public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend
Each checkpoint stores all of its files in a subdirectory that includes the
checkpoint number, such as hdfs://namenode:port/flink-checkpoints/chk-17/.
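The directory layout described above can be illustrated with plain JDK classes. This is a minimal sketch, not Flink's implementation: the class `CheckpointDirSketch`, the helper `checkpointDir`, and the port `8020` are hypothetical and chosen only for illustration.

```java
import java.net.URI;

final class CheckpointDirSketch {

    // Hypothetical helper: derives the per-checkpoint subdirectory from a base directory.
    static URI checkpointDir(URI baseDir, long checkpointId) {
        String base = baseDir.toString();
        // URI.resolve replaces the last path segment unless the base ends with '/'.
        URI normalized = base.endsWith("/") ? baseDir : URI.create(base + "/");
        return normalized.resolve("chk-" + checkpointId + "/");
    }

    public static void main(String[] args) {
        // Host and port are placeholders, not values taken from the documentation.
        URI base = URI.create("hdfs://namenode:8020/flink-checkpoints");
        System.out.println(checkpointDir(base, 17));
        // prints hdfs://namenode:8020/flink-checkpoints/chk-17/
    }
}
```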
Working state is kept on the TaskManager heap. If a TaskManager executes multiple tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) then the aggregate state of all tasks needs to fit into that TaskManager's memory.
This state backend stores small state chunks directly with the metadata, to avoid creating
many small files. The threshold for that is configurable. When increasing this threshold, the
size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
unless the threshold (see getMinFileSizeThreshold()) is increased significantly.
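To make the trade-off concrete, the following sketch estimates how many bytes end up inside the checkpoint metadata for a given threshold. It is an illustration under stated assumptions: `ThresholdSketch` and its methods are hypothetical, the chunk sizes are made up, and the threshold is treated as inclusive, which this page does not settle (it says both "below" and "up to").

```java
import java.util.Arrays;
import java.util.List;

final class ThresholdSketch {

    // Assumption: a chunk whose size is at most the threshold is kept inline with the metadata.
    static boolean storedInline(long chunkBytes, int thresholdBytes) {
        return chunkBytes <= thresholdBytes;
    }

    // Sums the bytes that the inline chunks add to the checkpoint metadata.
    static long inlineBytes(List<Long> chunkSizes, int thresholdBytes) {
        long total = 0;
        for (long size : chunkSizes) {
            if (storedInline(size, thresholdBytes)) {
                total += size;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> chunks = Arrays.asList(200L, 900L, 5_000L, 40_000L);
        System.out.println(inlineBytes(chunks, 1024));   // 1100: only the two small chunks are inline
        System.out.println(inlineBytes(chunks, 65_536)); // 46100: every chunk is now inline
    }
}
```

Raising the threshold from 1 KB to 64 KB in this made-up example moves all four chunks into the metadata, which is the growth the JobManager heap has to absorb for every retained checkpoint.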
Checkpoints from this state backend are as persistent and available as the file system they are written to. If the file system is a persistent distributed file system, this state backend supports highly available setups. The backend additionally supports savepoints and externalized checkpoints.
As for all state backends, this backend can either be configured within the application (by creating the backend with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
If the state backend was specified in the application, it may pick up additional configuration
parameters from the Flink configuration. For example, if the backend is configured in the application
without a default savepoint directory, it will pick up a default savepoint directory specified in the
Flink configuration of the running job/cluster. That behavior is implemented via the
configure(Configuration) method.
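The fallback behavior can be pictured as a "copy with defaults" step. The sketch below uses only JDK types and is not Flink code: `ConfigureSketch` is a hypothetical stand-in for the backend, a `Map` stands in for the Flink `Configuration`, and the key mirrors Flink's `state.savepoints.dir` option purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigureSketch {

    final String checkpointDir;
    final String savepointDir; // null means "not specified in the application"

    ConfigureSketch(String checkpointDir, String savepointDir) {
        this.checkpointDir = checkpointDir;
        this.savepointDir = savepointDir;
    }

    // Returns a copy; values set in the application win, missing ones come from the config.
    ConfigureSketch configure(Map<String, String> config) {
        String resolved = savepointDir != null ? savepointDir : config.get("state.savepoints.dir");
        return new ConfigureSketch(checkpointDir, resolved);
    }

    public static void main(String[] args) {
        Map<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put("state.savepoints.dir", "hdfs:///flink-savepoints");

        ConfigureSketch fromApp = new ConfigureSketch("hdfs:///flink-checkpoints", null);
        System.out.println(fromApp.configure(clusterConfig).savepointDir);
        // prints hdfs:///flink-savepoints
    }
}
```

Returning a copy rather than mutating the original matches the description of configure(Configuration) as creating a copy of the backend.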
Constructor | Description |
---|---|
FsStateBackend(Path checkpointDataUri) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(String checkpointDataUri) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDataUri, URI defaultSavepointDirectory) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
FsStateBackend(URI checkpointDirectory, URI defaultSavepointDirectory, int fileStateSizeThreshold, TernaryBoolean asynchronousSnapshots) | Creates a new state backend that stores its checkpoint data in the file system and location defined by the given URI. |
Modifier and Type | Method | Description |
---|---|---|
FsStateBackend | configure(Configuration config) | Creates a copy of this state backend that uses the values defined in the configuration for fields that were not specified in this state backend. |
CheckpointStorage | createCheckpointStorage(JobID jobId) | Creates a storage for checkpoints for the given job. |
<K> AbstractKeyedStateBackend<K> | createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup) | Creates a new AbstractKeyedStateBackend that is responsible for holding keyed state and checkpointing it. |
OperatorStateBackend | createOperatorStateBackend(Environment env, String operatorIdentifier) | Creates a new OperatorStateBackend that can be used for storing operator state. |
Path | getBasePath() | Deprecated. Deprecated in favor of getCheckpointPath(). |
Path | getCheckpointPath() | Gets the base directory where all the checkpoints are stored. |
int | getMinFileSizeThreshold() | Gets the threshold below which state is stored as part of the metadata, rather than in files. |
boolean | isUsingAsynchronousSnapshots() | Gets whether the key/value data structures are asynchronously snapshotted. |
String | toString() | |
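Methods inherited from class AbstractFileStateBackend: getSavepointPath, resolveCheckpoint
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface StateBackend: createKeyedStateBackend, createKeyedStateBackend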
public FsStateBackend(String checkpointDataUri)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
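Whether a given URI carries its own authority can be checked with the JDK's `java.net.URI` before the backend is constructed. This is a sketch only: `UriCheckSketch` and `namesHostExplicitly` are hypothetical names, the host and port are placeholders, and the check says nothing about whether a Hadoop configuration is available on the classpath.

```java
import java.net.URI;

final class UriCheckSketch {

    // True if the URI names both a scheme and an authority (host and, optionally, port).
    static boolean namesHostExplicitly(URI uri) {
        return uri.getScheme() != null && uri.getAuthority() != null;
    }

    public static void main(String[] args) {
        URI withAuthority = URI.create("hdfs://namenode:8020/flink-checkpoints");
        URI withoutAuthority = URI.create("hdfs:///flink-checkpoints");

        System.out.println(namesHostExplicitly(withAuthority));    // true
        System.out.println(namesHostExplicitly(withoutAuthority)); // false: host must come from the Hadoop configuration
    }
}
```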
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - Switch to enable asynchronous snapshots.
public FsStateBackend(Path checkpointDataUri)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - Switch to enable asynchronous snapshots.
public FsStateBackend(URI checkpointDataUri)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
public FsStateBackend(URI checkpointDataUri, @Nullable URI defaultSavepointDirectory)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
defaultSavepointDirectory - The default directory to store savepoints to. May be null.
public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
asynchronousSnapshots - Switch to enable asynchronous snapshots.
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files.
public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDataUri - The URI describing the filesystem (scheme and optionally authority), and the path to the checkpoint data directory.
fileStateSizeThreshold - State up to this size will be stored as part of the metadata, rather than in files (-1 for default value).
asynchronousSnapshots - Switch to enable asynchronous snapshots.
public FsStateBackend(URI checkpointDirectory, @Nullable URI defaultSavepointDirectory, int fileStateSizeThreshold, TernaryBoolean asynchronousSnapshots)
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') must be accessible via FileSystem.get(URI).
For a state backend targeting HDFS, this means that the URI must either specify the authority (host and port), or that the Hadoop configuration that describes that information must be in the classpath.
checkpointDirectory - The path to write checkpoint metadata to.
defaultSavepointDirectory - The path to write savepoints to. If null, the value from the runtime configuration will be used, or savepoint target locations need to be passed when triggering a savepoint.
fileStateSizeThreshold - State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured.
asynchronousSnapshots - Flag to switch between synchronous and asynchronous snapshot mode. If UNDEFINED, the value configured in the runtime configuration will be used.
@Deprecated public Path getBasePath()
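Deprecated. Deprecated in favor of getCheckpointPath().
@Nonnull public Path getCheckpointPath()
Gets the base directory where all the checkpoints are stored.
Overrides: getCheckpointPath in class AbstractFileStateBackend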
public int getMinFileSizeThreshold()
Gets the threshold below which state is stored as part of the metadata, rather than in files.
If not explicitly configured, this is the default value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.
public boolean isUsingAsynchronousSnapshots()
Gets whether the key/value data structures are asynchronously snapshotted.
If not explicitly configured, this is the default value of CheckpointingOptions.ASYNC_SNAPSHOTS.
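The "explicit value, otherwise default" rule for this flag is a three-state resolution. The sketch below shows the idea with a hypothetical enum `Tri` standing in for Flink's TernaryBoolean; the class and method names are illustrative and are not part of the Flink API.

```java
final class TriStateSketch {

    // Hypothetical stand-in for a ternary flag: set to true, set to false, or left undefined.
    enum Tri { TRUE, FALSE, UNDEFINED }

    // An explicit TRUE or FALSE wins; UNDEFINED falls back to the supplied default.
    static boolean resolve(Tri explicit, boolean fallback) {
        switch (explicit) {
            case TRUE:
                return true;
            case FALSE:
                return false;
            default:
                return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Tri.UNDEFINED, true)); // true: taken from the fallback
        System.out.println(resolve(Tri.FALSE, true));     // false: the explicit setting wins
    }
}
```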
public FsStateBackend configure(Configuration config)
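Creates a copy of this state backend that uses the values defined in the configuration for fields that were not specified in this state backend.
Specified by: configure in interface ConfigurableStateBackend
config - the configuration
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException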
Description copied from interface: StateBackend
Creates a storage for checkpoints for the given job.
Specified by: createCheckpointStorage in interface StateBackend
jobId - The job to store checkpoint data for.
IOException - Thrown if the checkpoint storage cannot be initialized.
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup)
Description copied from interface: StateBackend
Creates a new AbstractKeyedStateBackend that is responsible for holding keyed state and checkpointing it.
Keyed State is state where each value is bound to a key.
Specified by: createKeyedStateBackend in interface StateBackend
Specified by: createKeyedStateBackend in class AbstractStateBackend
K - The type of the keys by which the state is organized.
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier)
Description copied from interface: StateBackend
Creates a new OperatorStateBackend that can be used for storing operator state.
Operator state is state that is associated with parallel operator (or function) instances, rather than with keys.
Specified by: createOperatorStateBackend in interface StateBackend
Specified by: createOperatorStateBackend in class AbstractStateBackend
env - The runtime environment of the executing task.
operatorIdentifier - The identifier of the operator whose state should be stored.
Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.