Amazon Simple Storage Service (Amazon S3) provides cloud object storage for a variety of use cases. You can use S3 with Flink for reading and writing data as well as in conjunction with the streaming state backends.
You can use S3 objects like regular files by specifying paths in the following format: s3://<your-bucket>/<endpoint>
The endpoint can be either a single file or a directory.
You can use S3 paths anywhere that Flink expects a FileSystem URI, including your high availability setup or the RocksDBStateBackend.
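To make the path layout concrete, here is a minimal sketch that splits an S3 path into its parts, assuming the s3://<your-bucket>/<endpoint> layout. The class S3PathParts and the bucket and file names are hypothetical and used only for illustration. The sketch relies on the standard java.net.URI class rather than any Flink API.

```java
import java.net.URI;

// Hypothetical helper, not part of Flink: splits a path of the assumed form
// s3://<your-bucket>/<endpoint> into scheme, bucket, and endpoint.
class S3PathParts {
    final String scheme;
    final String bucket;
    final String endpoint;

    S3PathParts(String path) {
        URI uri = URI.create(path);
        this.scheme = uri.getScheme();
        // The bucket name sits in the authority position of the URI.
        this.bucket = uri.getAuthority();
        // Drop the leading slash; an empty endpoint means the bucket root.
        String rawPath = uri.getPath();
        this.endpoint = (rawPath == null || rawPath.isEmpty()) ? "" : rawPath.substring(1);
    }

    public static void main(String[] args) {
        // The endpoint may name a single file or a directory.
        S3PathParts file = new S3PathParts("s3://my-bucket/input/events.csv");
        S3PathParts dir = new S3PathParts("s3://my-bucket/input/");
        System.out.println(file.bucket + " -> " + file.endpoint);
        System.out.println(dir.bucket + " -> " + dir.endpoint);
    }
}
```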
For most use cases, you may use one of our shaded flink-s3-fs-hadoop or flink-s3-fs-presto S3 filesystem wrappers, which are self-contained and easy to set up.
In some cases, however, such as using S3 as YARN’s resource storage directory, it may be necessary to set up a specific Hadoop S3 FileSystem implementation.
Flink provides two file systems to talk to Amazon S3: flink-s3-fs-presto and flink-s3-fs-hadoop.
Both implementations are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them.
flink-s3-fs-presto, registered under the scheme “s3://” and “s3p://”, is based on code from the Presto project.
You can configure it the same way you can configure the Presto file system, by adding the configurations to your flink-conf.yaml. Presto is the recommended file system for checkpointing to S3.
flink-s3-fs-hadoop, registered under “s3://” and “s3a://”, is based on code from the Hadoop project.
The file system can be configured exactly like Hadoop’s s3a, by adding the configurations to your flink-conf.yaml. Shaded Hadoop is the only S3 file system with support for the StreamingFileSink.
Both flink-s3-fs-hadoop and flink-s3-fs-presto register default FileSystem wrappers for URIs with the s3:// scheme. flink-s3-fs-hadoop also registers for s3a:// and flink-s3-fs-presto also registers for s3p://, so you can use these schemes to run both file systems at the same time.
For example, a job may use the StreamingFileSink, which only supports Hadoop, but use Presto for checkpointing.
In this case, it is advised to explicitly use “s3a://” as the scheme for the sink (Hadoop) and “s3p://” for checkpointing (Presto).
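The scheme registrations described above can be summarized in a small lookup. The following is only an illustrative sketch: the class S3SchemeGuide and its method are hypothetical and do not reflect how Flink resolves file systems internally. It simply shows why the explicit “s3a://” and “s3p://” schemes remove the ambiguity of “s3://” when both file systems are installed.

```java
import java.net.URI;

// Illustrative only: mirrors the scheme registrations described in the text.
// This is not Flink's actual file system lookup code.
class S3SchemeGuide {
    static String implementationFor(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Path has no scheme: " + path);
        }
        switch (scheme) {
            case "s3a":
                return "flink-s3-fs-hadoop";
            case "s3p":
                return "flink-s3-fs-presto";
            case "s3":
                // Both file systems register this scheme, so it does not
                // identify a single implementation when both are installed.
                return "flink-s3-fs-hadoop or flink-s3-fs-presto";
            default:
                throw new IllegalArgumentException("Not an S3 scheme: " + scheme);
        }
    }

    public static void main(String[] args) {
        // Hypothetical bucket and paths: sink on Hadoop, checkpoints on Presto.
        System.out.println(implementationFor("s3a://my-bucket/sink-output/"));
        System.out.println(implementationFor("s3p://my-bucket/checkpoints/"));
    }
}
```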
To use either flink-s3-fs-hadoop or flink-s3-fs-presto, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink.
After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets.
The recommended way of setting up credentials on AWS is via Identity and Access Management (IAM). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide, in particular the material on IAM roles.
If you set this up correctly, you can manage access to S3 within AWS and don’t need to distribute any access keys to Flink.
Access to S3 can also be granted via your access key and secret key pair. Please note that this approach has been discouraged since the introduction of IAM roles.
You need to configure both s3.access-key and s3.secret-key in Flink’s flink-conf.yaml.
The bundled S3 file systems (flink-s3-fs-presto and flink-s3-fs-hadoop) support entropy injection. Entropy injection is a technique to improve the scalability of AWS S3 buckets by adding some random characters near the beginning of the key.
If entropy injection is activated, a configured substring in the path is replaced with random characters. For example, in the path s3://my-bucket/checkpoints/_entropy_/dashboard-job/, the _entropy_ segment would be replaced by a short run of random characters.
This only happens when the file creation passes the option to inject entropy.
Otherwise, the entropy key substring is removed from the file path entirely. See FileSystem.create(Path, WriteOption) for details.
To enable entropy injection, configure the entropy key and the entropy length parameters.
s3.entropy.key: _entropy_
s3.entropy.length: 4 (default)
s3.entropy.key defines the string in paths that is replaced by the random characters. Paths that do not contain the entropy key are left unchanged.
If a file system operation does not pass the “inject entropy” write option, the entropy key substring is simply removed.
s3.entropy.length defines the number of random alphanumeric characters used for entropy.
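The path rewriting described above can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Flink's implementation: the class EntropyInjectionSketch, its resolve method, and the lowercase alphanumeric alphabet are all hypothetical. It covers the three cases from the text: a path without the entropy key is left unchanged, the key is replaced with random characters when entropy is injected, and the key is simply removed otherwise.

```java
import java.util.Random;

// Illustrative sketch only; this is not Flink's implementation.
class EntropyInjectionSketch {
    // Assumed alphabet for the random characters (an assumption of this sketch).
    private static final String ALPHANUMERIC = "abcdefghijklmnopqrstuvwxyz0123456789";

    // Rewrites the first occurrence of entropyKey in path.
    static String resolve(String path, String entropyKey, int entropyLength,
                          boolean injectEntropy, Random random) {
        int start = path.indexOf(entropyKey);
        if (start < 0) {
            // Paths that do not contain the entropy key are left unchanged.
            return path;
        }
        StringBuilder result = new StringBuilder(path.length());
        result.append(path, 0, start);
        if (injectEntropy) {
            // Replace the key with entropyLength random alphanumeric characters.
            for (int i = 0; i < entropyLength; i++) {
                result.append(ALPHANUMERIC.charAt(random.nextInt(ALPHANUMERIC.length())));
            }
        }
        // Without injection, nothing is appended, so the key is simply removed.
        // This sketch does not normalize the double slash that removal leaves behind.
        result.append(path, start + entropyKey.length(), path.length());
        return result.toString();
    }

    public static void main(String[] args) {
        String path = "s3://my-bucket/checkpoints/_entropy_/dashboard-job/";
        Random random = new Random();
        System.out.println(resolve(path, "_entropy_", 4, true, random));
        System.out.println(resolve(path, "_entropy_", 4, false, random));
    }
}
```

Passing the Random instance in as a parameter keeps the sketch easy to exercise with a fixed seed.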