public abstract class FileMergingSnapshotManagerBase extends Object implements FileMergingSnapshotManager
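All Implemented Interfaces: FileMergingSnapshotManager
Nested classes/interfaces inherited from interface FileMergingSnapshotManager: FileMergingSnapshotManager.SubtaskKey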
Modifier and Type | Field and Description |
---|---|
protected Path | checkpointDir |
protected FileSystem | fs: The FileSystem that this manager works on. |
protected Executor | ioExecutor: The executor for I/O operations in this manager. |
protected Path | managedExclusiveStateDir: Private state files are merged across subtasks, so there is only one directory for merged files per job within a TaskManager. |
protected PhysicalFile.PhysicalFileDeleter | physicalFileDeleter |
protected Path | sharedStateDir |
protected boolean | shouldSyncAfterClosingLogicalFile: File-system dependent value. |
protected Path | taskOwnedStateDir |
protected int | writeBufferSize: The buffer size for writing files to the file system. |
Constructor and Description |
---|
FileMergingSnapshotManagerBase(String id, Executor ioExecutor) |
Modifier and Type | Method and Description |
---|---|
void | close() |
FileMergingCheckpointStateOutputStream | createCheckpointStateOutputStream(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope): Create a new FileMergingCheckpointStateOutputStream. |
protected LogicalFile | createLogicalFile(PhysicalFile physicalFile, long startOffset, long length, FileMergingSnapshotManager.SubtaskKey subtaskKey): Create a logical file on a physical file. |
protected PhysicalFile | createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope): Create a physical file in the right location (managed directory), as determined by the scope of this checkpoint and the current subtask. |
protected void | deletePhysicalFile(Path filePath): Delete a physical file at the given file path. |
protected Path | generatePhysicalFilePath(Path dirPath): Generate a file path for a physical file. |
Path | getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope): Get the managed directory of the file-merging snapshot manager, created in FileMergingSnapshotManager.initFileSystem(FileSystem, Path, Path, Path, int) or FileMergingSnapshotManager.registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey). |
protected abstract PhysicalFile | getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope): Get a reused physical file or create one. |
void | initFileSystem(FileSystem fileSystem, Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir, int writeBufferSize): Initialize the file system, recording the checkpoint paths the manager should work with. |
void | registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey): Register a subtask and create the managed directory for shared states. |
protected abstract void | returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile): Try to return an existing physical file to the manager for next reuse. |
protected final Executor ioExecutor
protected FileSystem fs
The FileSystem that this manager works on.
protected Path checkpointDir
protected Path sharedStateDir
protected Path taskOwnedStateDir
protected int writeBufferSize
protected boolean shouldSyncAfterClosingLogicalFile
protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter
protected Path managedExclusiveStateDir
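The ioExecutor and physicalFileDeleter fields suggest that physical-file deletion can be handed off to the I/O executor instead of running on the caller's thread. The sketch below illustrates that idea only; `AsyncDeleteSketch` and its `FileDeleter` interface are hypothetical, are not Flink's `PhysicalFile.PhysicalFileDeleter`, and use `java.nio.file` in place of Flink's `FileSystem`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executor;

// Hypothetical sketch: a deleter that schedules the actual file deletion on an
// I/O executor. FileDeleter is illustrative and not Flink's PhysicalFileDeleter.
final class AsyncDeleteSketch {
    @FunctionalInterface
    interface FileDeleter {
        void delete(Path filePath) throws IOException;
    }

    private final Executor ioExecutor;

    AsyncDeleteSketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    /** Returns a deleter whose calls only enqueue the deletion on the I/O executor. */
    FileDeleter asyncDeleter() {
        return filePath -> ioExecutor.execute(() -> {
            try {
                Files.deleteIfExists(filePath);
            } catch (IOException e) {
                // A real manager would log this; the sketch only reports it.
                System.err.println("Failed to delete " + filePath + ": " + e);
            }
        });
    }
}
```

Passing `Runnable::run` as the executor runs deletions synchronously, which is convenient in tests; a thread pool would make them truly asynchronous.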
public void initFileSystem(FileSystem fileSystem, Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir, int writeBufferSize) throws IllegalArgumentException
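Description copied from interface: FileMergingSnapshotManager
Initialize the file system, recording the checkpoint paths the manager should work with.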
The layout of the checkpoint directory:

/user-defined-checkpoint-dir
    /{job-id} (checkpointBaseDir)
        |
        + --shared/
            |
            + --subtask-1/
                + -- merged shared state files
            + --subtask-2/
                + -- merged shared state files
        + --taskowned/
            + -- merged private state files
        + --chk-1/
        + --chk-2/
        + --chk-3/
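To make the layout concrete, the following sketch resolves the managed directories it shows: one `shared/<subtask>` directory per subtask, a single `taskowned` directory for merged private state, and `chk-N` directories per checkpoint. `CheckpointLayoutSketch` is a hypothetical helper, not part of Flink; `java.nio.file.Path` stands in for Flink's `Path` and the nested `Scope` enum stands in for `CheckpointedStateScope`.

```java
import java.nio.file.Path;

// Hypothetical helper mirroring the directory layout above; java.nio.file.Path
// stands in for Flink's Path and Scope stands in for CheckpointedStateScope.
final class CheckpointLayoutSketch {
    enum Scope { SHARED, EXCLUSIVE }

    private final Path checkpointBaseDir; // /user-defined-checkpoint-dir/{job-id}

    CheckpointLayoutSketch(Path checkpointBaseDir) {
        this.checkpointBaseDir = checkpointBaseDir;
    }

    /** Managed directory that holds merged files for a subtask in the given scope. */
    Path managedDir(String subtaskId, Scope scope) {
        if (scope == Scope.SHARED) {
            // shared/<subtask>/: one directory of merged shared state files per subtask
            return checkpointBaseDir.resolve("shared").resolve(subtaskId);
        }
        // taskowned/: a single directory of merged private state files
        return checkpointBaseDir.resolve("taskowned");
    }

    /** Per-checkpoint directory such as chk-3. */
    Path checkpointDir(long checkpointId) {
        return checkpointBaseDir.resolve("chk-" + checkpointId);
    }
}
```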
Directories are initialized in this method rather than in the constructor because the FileMergingSnapshotManager belongs to the TaskStateManager, which is created when a task is received, whereas the base checkpoint directories are created by FsCheckpointStorageAccess when the state backend is initialized for each subtask. Once those checkpoint directories exist, this method initializes the managed subdirectories.
Note: This method may be called several times. The implementation must be idempotent and must throw IllegalArgumentException if any of the path parameters changes across calls.
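The idempotency contract in the note can be sketched as follows. `IdempotentInitSketch` is hypothetical and not Flink's implementation; it keeps only the three path parameters (the `fileSystem` and `writeBufferSize` arguments are omitted) and uses `java.nio.file.Path` instead of Flink's `Path`.

```java
import java.nio.file.Path;
import java.util.Objects;

// Hypothetical sketch of the contract in the note above, not Flink's code:
// the first call records the paths, repeated calls with identical paths are
// no-ops, and a call with any changed path throws IllegalArgumentException.
final class IdempotentInitSketch {
    private Path checkpointBaseDir;
    private Path sharedStateDir;
    private Path taskOwnedStateDir;
    private int initializations; // number of calls that actually initialized state

    synchronized void initFileSystem(Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir) {
        Objects.requireNonNull(checkpointBaseDir, "checkpointBaseDir");
        Objects.requireNonNull(sharedStateDir, "sharedStateDir");
        Objects.requireNonNull(taskOwnedStateDir, "taskOwnedStateDir");

        if (this.checkpointBaseDir != null) {
            // Already initialized: identical paths are accepted, changed paths are rejected.
            if (!this.checkpointBaseDir.equals(checkpointBaseDir)
                    || !this.sharedStateDir.equals(sharedStateDir)
                    || !this.taskOwnedStateDir.equals(taskOwnedStateDir)) {
                throw new IllegalArgumentException("Checkpoint paths changed across initFileSystem calls");
            }
            return;
        }
        this.checkpointBaseDir = checkpointBaseDir;
        this.sharedStateDir = sharedStateDir;
        this.taskOwnedStateDir = taskOwnedStateDir;
        initializations++; // managed subdirectories would be created here, exactly once
    }

    synchronized int initializations() {
        return initializations;
    }
}
```

All arguments are validated before any field is assigned, so a rejected call never leaves the object partially initialized.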
Specified by: initFileSystem in interface FileMergingSnapshotManager
Parameters:
fileSystem - The file system to write to.
checkpointBaseDir - The base directory for checkpoints.
sharedStateDir - The directory for shared checkpoint data.
taskOwnedStateDir - The directory for state that is owned and released by the TaskManagers rather than by the master.
writeBufferSize - The buffer size for writing files to the file system.
Throws:
IllegalArgumentException - thrown if the three paths are not the same across calls.
public void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey)
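Description copied from interface: FileMergingSnapshotManager
Register a subtask and create the managed directory for shared states.
Specified by: registerSubtaskForSharedStates in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying a subtask.
See Also: for layout information.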
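Registering a subtask creates its managed shared-state directory. A minimal sketch of that behavior, assuming string subtask keys and `java.nio.file` in place of Flink's `FileSystem`, is shown below; `SharedDirRegistry` and `getManagedSharedDir` are hypothetical names, and repeated registration of the same subtask simply returns the existing directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: each registered subtask gets its own managed directory
// under the shared-state directory; registering the same subtask again reuses it.
final class SharedDirRegistry {
    private final Path sharedStateDir;
    private final Map<String, Path> managedSharedDirs = new ConcurrentHashMap<>();

    SharedDirRegistry(Path sharedStateDir) {
        this.sharedStateDir = sharedStateDir;
    }

    Path registerSubtaskForSharedStates(String subtaskKey) throws IOException {
        Path known = managedSharedDirs.get(subtaskKey);
        if (known != null) {
            return known; // already registered
        }
        Path dir = sharedStateDir.resolve(subtaskKey);
        Files.createDirectories(dir); // does not fail if the directory already exists
        Path raced = managedSharedDirs.putIfAbsent(subtaskKey, dir);
        return raced != null ? raced : dir;
    }

    Path getManagedSharedDir(String subtaskKey) {
        return managedSharedDirs.get(subtaskKey);
    }
}
```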
protected LogicalFile createLogicalFile(@Nonnull PhysicalFile physicalFile, long startOffset, long length, @Nonnull FileMergingSnapshotManager.SubtaskKey subtaskKey)
Create a logical file on a physical file.
Parameters:
physicalFile - the underlying physical file.
startOffset - the offset in the physical file at which the logical file starts.
length - the length of the logical file.
subtaskKey - the ID of the subtask that the logical file belongs to.
@Nonnull protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope) throws IOException
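Create a physical file in the right location (managed directory), as determined by the scope of this checkpoint and the current subtask.
Parameters:
subtaskKey - the SubtaskKey of the current subtask.
scope - the scope of the checkpoint.
Throws:
IOException - if anything goes wrong with the file system.
public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope)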
Description copied from interface: FileMergingSnapshotManager
Create a new FileMergingCheckpointStateOutputStream. Depending on the file-merging strategy, the streams returned by multiple calls to this function may share the same underlying physical file, with each stream writing to its own segment of that file.
Specified by: createCheckpointStateOutputStream in interface FileMergingSnapshotManager
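The idea that several streams share one physical file, each owning a segment described by a logical file (start offset plus length, as in createLogicalFile), can be modeled in a few lines. This is a hypothetical in-memory sketch: `SegmentSketch`, `writeSegment`, and `read` are illustrative names, and a `ByteArrayOutputStream` stands in for the physical file on the file system.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical model of file merging: consecutive state "streams" append to one
// shared physical file (an in-memory buffer here), and each finished stream is
// described by a logical file, i.e. a start offset and length in that file.
final class SegmentSketch {
    /** Stand-in for a logical file: a byte range inside the physical file. */
    static final class LogicalFile {
        final long startOffset;
        final long length;

        LogicalFile(long startOffset, long length) {
            this.startOffset = startOffset;
            this.length = length;
        }
    }

    private final ByteArrayOutputStream physicalFile = new ByteArrayOutputStream();

    /** Appends one stream's bytes as the next segment and describes it as a logical file. */
    LogicalFile writeSegment(byte[] stateBytes) {
        long startOffset = physicalFile.size(); // the segment starts at the current end of the file
        physicalFile.write(stateBytes, 0, stateBytes.length);
        return new LogicalFile(startOffset, stateBytes.length);
    }

    /** Reads a logical file back by slicing its byte range out of the physical file. */
    byte[] read(LogicalFile logicalFile) {
        int from = (int) logicalFile.startOffset;
        int to = from + (int) logicalFile.length;
        return Arrays.copyOfRange(physicalFile.toByteArray(), from, to);
    }
}
```

Writing `"abc"` and then `"defgh"` yields logical files at offsets 0 and 3 with lengths 3 and 5, both backed by the same physical file.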
Parameters:
subtaskKey - The subtask key identifying the subtask.
checkpointId - ID of the checkpoint.
scope - The state's scope, whether it is exclusive or shared.
protected Path generatePhysicalFilePath(Path dirPath)
Generate a file path for a physical file.
Parameters:
dirPath - the parent directory path for the physical file.
protected final void deletePhysicalFile(Path filePath)
Delete a physical file at the given file path.
Parameters:
filePath - the file path to delete.
@Nonnull protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) throws IOException
Get a reused physical file or create one. This is the basic logic of file reuse: whenever a physical file is needed, this method is called with the information required to acquire one. A file is not reused until it has been written and returned to the reuse pool by calling returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey, long, PhysicalFile).
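The acquire/return cycle described above can be sketched as a simple pool. Everything here is hypothetical: `ReusePoolSketch` and its nested `PhysicalFile` are stand-ins for Flink's types, subtask keys and scopes are plain strings, and `checkpointId` is accepted but unused, so this sketch reuses files across checkpoints; an actual strategy might instead restrict reuse, for example to a single checkpoint.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reuse pool illustrating the acquire/return contract described above.
// Subtask keys and scopes are plain strings; PhysicalFile is a stand-in class.
// checkpointId is accepted but unused: this sketch reuses files across checkpoints.
final class ReusePoolSketch {
    static final class PhysicalFile {
        final String name;
        final String scope;

        PhysicalFile(String name, String scope) {
            this.name = name;
            this.scope = scope;
        }
    }

    // Files available for reuse, keyed by (subtask, scope).
    private final Map<String, Deque<PhysicalFile>> reusable = new HashMap<>();
    private int createdCount;

    private static String poolKey(String subtaskKey, String scope) {
        return subtaskKey + "/" + scope;
    }

    synchronized PhysicalFile getOrCreatePhysicalFileForCheckpoint(
            String subtaskKey, long checkpointId, String scope) {
        Deque<PhysicalFile> files = reusable.get(poolKey(subtaskKey, scope));
        if (files != null && !files.isEmpty()) {
            return files.pollFirst(); // reuse a file that was returned earlier
        }
        createdCount++; // nothing to reuse yet: create a new physical file
        return new PhysicalFile("physical-file-" + createdCount, scope);
    }

    synchronized void returnPhysicalFileForNextReuse(
            String subtaskKey, long checkpointId, PhysicalFile physicalFile) {
        // Only after this call can the file be handed out again.
        reusable.computeIfAbsent(poolKey(subtaskKey, physicalFile.scope), k -> new ArrayDeque<>())
                .addLast(physicalFile);
    }

    synchronized int createdCount() {
        return createdCount;
    }
}
```

Two acquisitions without an intervening return yield two distinct files; once a file is returned, the next request for the same subtask and scope receives it again.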
Parameters:
subtaskKey - the subtask key of the caller.
checkpointId - the checkpoint ID.
scope - the checkpoint scope.
Throws:
IOException - thrown if anything goes wrong with the file system.
protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException
Try to return an existing physical file to the manager for next reuse. For the basic logic of file reuse, see getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey, long, CheckpointedStateScope).
Parameters:
subtaskKey - the subtask key of the caller.
checkpointId - the checkpoint in which this physical file was requested.
physicalFile - the physical file being returned.
Throws:
IOException - thrown if anything goes wrong with the file system.
See Also: getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope)
public Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope scope)
Description copied from interface: FileMergingSnapshotManager
Get the managed directory of the file-merging snapshot manager, created in FileMergingSnapshotManager.initFileSystem(FileSystem, Path, Path, Path, int) or FileMergingSnapshotManager.registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey).
Specified by: getManagedDir in interface FileMergingSnapshotManager
Parameters:
subtaskKey - the subtask key identifying the subtask.
scope - the checkpoint scope.
public void close() throws IOException
Specified by: close in interface Closeable
Specified by: close in interface AutoCloseable
Throws:
IOException
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.