Google Cloud Storage
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Google Cloud Storage #

Google Cloud storage (GCS) provides cloud storage for a variety of use cases. You can use it for reading and writing data, and for checkpoint storage when using FileSystemCheckpointStorage) with the streaming state backends.

You can use GCS objects like regular files by specifying paths in the following format:

gs://<your-bucket>/<endpoint>

The endpoint can either be a single file or a directory, for example:

// Read from GSC bucket
env.readTextFile("gs://<bucket>/<endpoint>");

// Write to GCS bucket
stream.writeAsText("gs://<bucket>/<endpoint>");

// Use GCS as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>");

Libraries #

You must include the following jars in Flink’s lib directory to connect Flink with gcs:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-shaded-hadoop2-uber</artifactId>
  <version>${flink.shared_hadoop_latest_version}</version>
</dependency>

<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>gcs-connector</artifactId>
  <version>hadoop2-2.2.0</version>
</dependency>

We have tested with flink-shared-hadoop2-uber version >= 2.8.3-1.8.3. You can track the latest version of the gcs-connector hadoop 2.

Authentication to access GCS #

Most operations on GCS require authentication. Please see the documentation on Google Cloud Storage authentication for more information.

You can use the following method for authentication

  • Configure via core-site.xml You would need to add the following properties to core-site.xml

    <configuration>
      <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
      </property>
      <property>
        <name>google.cloud.auth.service.account.json.keyfile</name>
        <value><PATH TO GOOGLE AUTHENTICATION JSON></value>
      </property>
    </configuration>
    

    You would need to add the following to flink-conf.yaml

    flinkConfiguration:
      fs.hdfs.hadoopconf: <DIRECTORY PATH WHERE core-site.xml IS SAVED>
    
  • You can provide the necessary key via the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Back to top