@Public public abstract class FileSystem extends Object
Flink implements and supports some file system types directly (for example the default machine-local file system). Other file system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (such as HDFS).
The purpose of this abstraction is not to give user programs an abstraction with extreme flexibility and control across all possible file systems. That mission would be a folly, as the differences in characteristics of even the most common file systems are already quite large. It is expected that user programs that need specialized functionality of certain file systems in their functions, operations, sources, or sinks instantiate the specialized file system adapters directly.
The FileSystem's output streams are used to persistently store data, both for results of streaming applications and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined.
The LocalFileSystem does not provide any durability guarantees for crashes of both hardware and operating system, while replicated distributed file systems (like HDFS) typically guarantee durability in the presence of at most n - 1 concurrent node failures, where n is the replication factor.
Updates to the file's parent directory (such that the file shows up when listing the directory contents) are not required to be complete for the data in the file stream to be considered persistent. This relaxation is important for file systems where updates to directory contents are only eventually consistent.
The FSDataOutputStream has to guarantee data persistence for the written bytes once the call to FSDataOutputStream.close() returns.
Whether data has hit non-volatile storage on the storage nodes depends on the specific guarantees of the particular file system.
The metadata updates to the file's parent directory are not required to have reached a consistent state. It is permissible that some machines see the file when listing the parent directory's contents while others do not, as long as access to the file by its absolute path is possible on all nodes.
The above implies specifically that data may still be in the OS cache when considered persistent from the local file system's perspective. Crashes that cause the OS cache to lose data are considered fatal to the local machine and are not covered by the local file system's guarantees as defined by Flink.
That means that computed results, checkpoints, and savepoints that are written only to the local filesystem are not guaranteed to be recoverable from the local machine's failure, making local file systems unsuitable for production setups.
To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to the same file path more than once.
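The write-once discipline described above can be illustrated with a small sketch. It uses java.nio rather than Flink's FileSystem API, and the names WriteOnceSketch and writeOnce are hypothetical. The only point is that a second write to the same path is rejected instead of replacing data that readers may already have seen.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not part of Flink.
final class WriteOnceSketch {

    /**
     * Writes data to a path that must not exist yet.
     * Returns true if the file was created, false if the path was already taken.
     */
    static boolean writeOnce(Path path, String data) {
        try {
            // CREATE_NEW makes the call fail if the file is already present,
            // so existing contents are never overwritten.
            Files.write(path, data.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        } catch (IOException e) {
            // Unchecked only to keep the sketch short.
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller that gets false back would pick a fresh path (for example one that includes an attempt number) rather than retry on the same one.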
Implementations of FileSystem must be thread-safe: The same instance of FileSystem is frequently shared across multiple threads in Flink and must be able to concurrently create input/output streams and list file metadata.
The FSDataInputStream and FSDataOutputStream implementations are strictly not thread-safe. Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees about the visibility of operations across threads (many operations do not create memory fences).
When application code obtains a FileSystem (via get(URI) or via Path.getFileSystem()), the FileSystem instantiates a safety net for that FileSystem. The safety net ensures that all streams created from the FileSystem are closed when the application task finishes (or is canceled or failed). That way, the task's threads do not leak connections.
Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via getUnguardedFileSystem(URI).
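The idea behind the safety net can be sketched as a registry of open streams that is closed as a whole when the task ends. This is a minimal illustration under that assumption, not Flink's actual safety-net classes; StreamSafetyNetSketch and its methods are hypothetical names.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical registry for illustration; not Flink's implementation.
final class StreamSafetyNetSketch implements Closeable {

    // Identity-based set: two distinct streams are never treated as equal.
    private final Set<Closeable> openStreams =
            Collections.newSetFromMap(new IdentityHashMap<>());
    private boolean closed;

    /** Tracks a stream so that it is closed when the task ends. */
    synchronized <T extends Closeable> T register(T stream) {
        if (closed) {
            closeQuietly(stream);
            throw new IllegalStateException("Safety net is already closed");
        }
        openStreams.add(stream);
        return stream;
    }

    /** Stops tracking a stream that the task closed by itself. */
    synchronized void unregister(Closeable stream) {
        openStreams.remove(stream);
    }

    synchronized int openCount() {
        return openStreams.size();
    }

    /** Called once when the task finishes, is canceled, or fails. */
    @Override
    public synchronized void close() {
        closed = true;
        for (Closeable stream : openStreams) {
            closeQuietly(stream);
        }
        openStreams.clear();
    }

    private static void closeQuietly(Closeable stream) {
        try {
            stream.close();
        } catch (IOException ignored) {
            // Best effort: one failing stream must not prevent closing the others.
        }
    }
}
```

In this sketch a guarded file system would call register on every stream it hands out, and the task's shutdown path would call close on the registry.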
See Also: FSDataInputStream, FSDataOutputStream
Modifier and Type | Class and Description |
---|---|
static class | FileSystem.WriteMode: The possible write modes. |
Constructor and Description |
---|
FileSystem() |
Modifier and Type | Method and Description |
---|---|
FSDataOutputStream | create(Path f, boolean overwrite): Deprecated. Use create(Path, WriteMode) instead. |
abstract FSDataOutputStream | create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize): Deprecated. Deprecated because not well supported across types of file systems. Control the behavior of specific file systems via configurations instead. |
abstract FSDataOutputStream | create(Path f, FileSystem.WriteMode overwriteMode): Opens an FSDataOutputStream to a new file at the given path. |
abstract boolean | delete(Path f, boolean recursive): Delete a file. |
boolean | exists(Path f): Checks whether the given path exists. |
static FileSystem | get(URI uri): Returns a reference to the FileSystem instance for accessing the file system identified by the given URI. |
long | getDefaultBlockSize(): Return the number of bytes that large input files should optimally be split into to minimize I/O time. |
abstract BlockLocation[] | getFileBlockLocations(FileStatus file, long start, long len): Return an array containing hostnames, offset and size of portions of the given file. |
abstract FileStatus | getFileStatus(Path f): Return a file status object that represents the path. |
abstract Path | getHomeDirectory(): Returns the path of the user's home directory in this file system. |
abstract FileSystemKind | getKind(): Gets a description of the characteristics of this file system. |
static FileSystem | getLocalFileSystem(): Returns a reference to the FileSystem instance for accessing the local file system. |
static FileSystem | getUnguardedFileSystem(URI uri) |
abstract URI | getUri(): Returns a URI whose scheme and authority identify this file system. |
abstract Path | getWorkingDirectory(): Returns the path of the file system's current working directory. |
abstract void | initialize(URI name): Called after a new FileSystem instance is constructed. |
boolean | initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory): Initializes output directories on distributed file systems according to the given write mode. |
boolean | initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory): Initializes output directories on local file systems according to the given write mode. |
abstract boolean | isDistributedFS(): Returns true if this is a distributed file system. |
static boolean | isFlinkSupportedScheme(String scheme): Returns a boolean indicating whether a scheme has built-in Flink support. |
abstract FileStatus[] | listStatus(Path f): List the statuses of the files/directories in the given path if the path is a directory. |
abstract boolean | mkdirs(Path f): Make the given file and all non-existent parents into directories. |
abstract FSDataInputStream | open(Path f): Opens an FSDataInputStream at the indicated Path. |
abstract FSDataInputStream | open(Path f, int bufferSize): Opens an FSDataInputStream at the indicated Path. |
abstract boolean | rename(Path src, Path dst): Renames the file/directory src to dst. |
static void | setDefaultScheme(Configuration config): Sets the default filesystem scheme based on the user-specified configuration parameter fs.default-scheme. |
public static FileSystem getLocalFileSystem()
Returns a reference to the FileSystem instance for accessing the local file system.
Returns:
a reference to the FileSystem instance for accessing the local file system.
public static void setDefaultScheme(Configuration config) throws IOException
Sets the default filesystem scheme based on the user-specified configuration parameter fs.default-scheme. By default this is set to file:/// (see ConfigConstants.FILESYSTEM_SCHEME and ConfigConstants.DEFAULT_FILESYSTEM_SCHEME), and the local filesystem is used.
As an example, if set to hdfs://localhost:9000/, then an HDFS deployment with the namenode being on the local node and listening to port 9000 is going to be used. In this case, a file path specified as /user/USERNAME/in.txt is going to be transformed into hdfs://localhost:9000/user/USERNAME/in.txt.
Parameters:
config - the configuration from where to fetch the parameter.
Throws:
IOException
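The path transformation in the HDFS example above can be sketched with java.net.URI. This is an illustration of the described behavior, not Flink's implementation; DefaultSchemeSketch and qualify are hypothetical names, and the sketch assumes absolute paths only.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for illustration; not Flink's implementation.
final class DefaultSchemeSketch {

    /**
     * Qualifies a path that has no scheme with the scheme and authority
     * of the configured default, e.g. hdfs://localhost:9000/.
     * Paths that already carry a scheme are returned unchanged.
     */
    static URI qualify(URI defaultScheme, String path) {
        URI uri = URI.create(path);
        if (uri.getScheme() != null) {
            return uri; // already fully qualified
        }
        try {
            // Take scheme and authority from the default, the path from the input.
            return new URI(defaultScheme.getScheme(), defaultScheme.getAuthority(),
                    uri.getPath(), null, null);
        } catch (URISyntaxException e) {
            // Raised for example when a relative path is combined with an authority.
            throw new IllegalArgumentException("Cannot qualify path: " + path, e);
        }
    }
}
```

With the default hdfs://localhost:9000/, calling qualify on /user/USERNAME/in.txt yields hdfs://localhost:9000/user/USERNAME/in.txt, matching the example in the text.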
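@Internal public static FileSystem getUnguardedFileSystem(URI uri) throws IOException
Throws:
IOException
public static FileSystem get(URI uri) throws IOException
Returns a reference to the FileSystem instance for accessing the file system identified by the given URI.
Parameters:
uri - the URI identifying the file system
Returns:
a reference to the FileSystem instance for accessing the file system identified by the given URI.
Throws:
IOException - thrown if a reference to the file system instance could not be obtained
public static boolean isFlinkSupportedScheme(String scheme)
Returns a boolean indicating whether a scheme has built-in Flink support.
Parameters:
scheme - a file system scheme
public abstract Path getWorkingDirectory()
Returns the path of the file system's current working directory.
public abstract Path getHomeDirectory()
Returns the path of the user's home directory in this file system.
public abstract URI getUri()
Returns a URI whose scheme and authority identify this file system.
public abstract void initialize(URI name) throws IOException
Called after a new FileSystem instance is constructed.
Parameters:
name - a URI whose authority section names the host, port, etc. for this file system
Throws:
IOException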
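public abstract FileStatus getFileStatus(Path f) throws IOException
Return a file status object that represents the path.
Parameters:
f - The path we want information from
Throws:
FileNotFoundException - when the path does not exist
IOException - see specific implementation
public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException
Return an array containing hostnames, offset and size of portions of the given file.
Throws:
IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
Opens an FSDataInputStream at the indicated Path.
Parameters:
f - the file name to open
bufferSize - the size of the buffer to be used.
Throws:
IOException
public abstract FSDataInputStream open(Path f) throws IOException
Opens an FSDataInputStream at the indicated Path.
Parameters:
f - the file to open
Throws:
IOException
public long getDefaultBlockSize()
Return the number of bytes that large input files should optimally be split into to minimize I/O time.
public abstract FileStatus[] listStatus(Path f) throws IOException
List the statuses of the files/directories in the given path if the path is a directory.
Parameters:
f - given path
Throws:
IOException
public boolean exists(Path f) throws IOException
Checks whether the given path exists.
Parameters:
f - source file
Throws:
IOException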
public abstract boolean delete(Path f, boolean recursive) throws IOException
Delete a file.
Parameters:
f - the path to delete
recursive - if the path is a directory and recursive is set to true, the directory is deleted; otherwise an exception is thrown. For a file, recursive can be set to either true or false.
Returns:
true if delete is successful, false otherwise
Throws:
IOException
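The role of the recursive flag can be sketched on the local file system with java.io.File. This is an illustration and not Flink's code; RecursiveDeleteSketch is a hypothetical name. It also makes one assumption that the text above leaves open: a non-recursive delete is rejected only when the directory is non-empty.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper for illustration; not Flink's implementation.
final class RecursiveDeleteSketch {

    /**
     * Deletes a file or directory.
     * Returns true on success and false if the path does not exist or cannot be removed.
     * Assumption: a non-empty directory with recursive == false is an error.
     */
    static boolean delete(File f, boolean recursive) {
        if (!f.exists()) {
            return false;
        }
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children == null) {
                // Unchecked only to keep the sketch short.
                throw new UncheckedIOException(new IOException("Cannot list " + f));
            }
            if (children.length > 0 && !recursive) {
                throw new UncheckedIOException(
                        new IOException("Directory " + f + " is not empty"));
            }
            for (File child : children) {
                // Remove the contents first; the directory itself goes last.
                if (!delete(child, true)) {
                    return false;
                }
            }
        }
        return f.delete();
    }
}
```

For a plain file the flag has no effect in this sketch, which mirrors the note that recursive can be either true or false for files.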
public abstract boolean mkdirs(Path f) throws IOException
Make the given file and all non-existent parents into directories.
Parameters:
f - the directory/directories to be created
Returns:
true if at least one new directory has been created, false otherwise
Throws:
IOException - thrown if an I/O error occurs while creating the directory
@Deprecated public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException
This method is deprecated, because most of its parameters are ignored by most file systems. To control for example the replication factor and block size in the Hadoop Distributed File System, make sure that the respective Hadoop configuration file is either linked from the Flink configuration, or in the classpath of either Flink or the user code.
Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an error will be thrown.
bufferSize - the size of the buffer to be used.
replication - required block replication for the file.
blockSize - the size of the file blocks
Throws:
IOException - Thrown, if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
@Deprecated public FSDataOutputStream create(Path f, boolean overwrite) throws IOException
Deprecated. Use create(Path, WriteMode) instead.
Parameters:
f - the file name to open
overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an error will be thrown.
Throws:
IOException - Thrown, if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
public abstract FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException
Opens an FSDataOutputStream to a new file at the given path.
If the file already exists, the behavior depends on the given WriteMode. If the mode is set to FileSystem.WriteMode.NO_OVERWRITE, then this method fails with an exception.
Parameters:
f - The file path to write to
overwriteMode - The action to take if a file or directory already exists at the given path.
Throws:
IOException - Thrown, if the stream could not be opened because of an I/O error, or because a file already exists at that path and the write mode indicates to not overwrite the file.
public abstract boolean rename(Path src, Path dst) throws IOException
Renames the file/directory src to dst.
Parameters:
src - the file/directory to rename
dst - the new name of the file/directory
Returns:
true if the renaming was successful, false otherwise
Throws:
IOException
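public abstract boolean isDistributedFS()
Returns true if this is a distributed file system.
public abstract FileSystemKind getKind()
Gets a description of the characteristics of this file system.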
public boolean initOutPathLocalFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Initializes output directories on local file systems according to the given write mode.
Files contained in an existing directory are not deleted, because multiple instances of a DataSinkTask might call this function at the same time and hence might perform concurrent delete operations on the file system (possibly deleting output files of concurrently running tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create operations would be difficult.
Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True, to initialize a directory at the given path, false to prepare space for a file.
Throws:
IOException - Thrown, if any of the file system access operations failed.
public boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) throws IOException
Initializes output directories on distributed file systems according to the given write mode.
Parameters:
outPath - Output path that should be prepared.
writeMode - Write mode to consider.
createDirectory - True, to initialize a directory at the given path, false otherwise.
Throws:
IOException - Thrown, if any of the file system access operations failed.
Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.