Important: Maven artifacts which depend on Scala are now suffixed with the Scala major version, e.g. "2.10" or "2.11". Please consult the migration guide on the project Wiki.

Cluster Execution

Flink programs can run distributed on clusters of many machines. There are two ways to send a program to a cluster for execution:

Command Line Interface

The command line interface lets you submit packaged programs (JARs) to a cluster (or single machine setup).

Please refer to the Command Line Interface documentation for details.

Remote Environment

The remote environment lets you execute Flink Java programs on a cluster directly. The remote environment points to the cluster on which you want to execute the program.

Maven Dependency

If you are developing your program as a Maven project, you have to add the flink-clients module using this dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>1.0.3</version>
</dependency>

Example

The following illustrates the use of the RemoteEnvironment:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment
        .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    env.execute();
}

Note that the program contains custom user code and hence requires a JAR file with the classes of the code attached. The constructor of the remote environment takes the path(s) to the JAR file(s).

Linking with modules not contained in the binary distribution

The binary distribution contains jar packages in the lib folder that are automatically provided to the classpath of your distrbuted programs. Almost all of Flink classes are located there with a few exceptions, for example the streaming connectors and some freshly added modules. To run code depending on these modules you need to make them accessible during runtime, for which we suggest two options:

  1. Either copy the required jar files to the lib folder onto all of your TaskManagers. Note that you have to restart your TaskManagers after this.
  2. Or package them with your code.

The latter version is recommended as it respects the classloader management in Flink.

Packaging dependencies with your usercode with Maven

To provide these dependencies not included by Flink we suggest two options with Maven.

  1. The maven assembly plugin builds a so-called uber-jar (executable jar) containing all your dependencies. The assembly configuration is straight-forward, but the resulting jar might become bulky. See maven-assembly-plugin for further information.
  2. The maven unpack plugin unpacks the relevant parts of the dependencies and then packages it with your code.

Using the latter approach in order to bundle the Kafka connector, flink-connector-kafka you would need to add the classes from both the connector and the Kafka API itself. Add the following to your plugins section.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>2.9</version>
    <executions>
        <execution>
            <id>unpack</id>
            <!-- executed just before the package phase -->
            <phase>prepare-package</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <!-- For Flink connector classes -->
                    <artifactItem>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka</artifactId>
                        <version>1.0.3</version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>org/apache/flink/**</includes>
                    </artifactItem>
                    <!-- For Kafka API classes -->
                    <artifactItem>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_<YOUR_SCALA_VERSION></artifactId>
                        <version><YOUR_KAFKA_VERSION></version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>kafka/**</includes>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>

Now when running mvn clean package the produced jar includes the required dependencies.