This Getting Started section guides you through setting up a fully functional Flink Cluster on YARN.
Apache Hadoop YARN is a resource provider popular with many data processing frameworks. Flink services are submitted to YARN’s ResourceManager, which spawns containers on machines managed by YARN NodeManagers. Flink deploys its JobManager and TaskManager instances into such containers.
Flink can dynamically allocate and de-allocate TaskManager resources depending on the number of processing slots required by the job(s) running on the JobManager.
This Getting Started section assumes a functional YARN environment, starting from version 2.4.1. YARN environments are provided most conveniently through services such as Amazon EMR, Google Cloud DataProc or products like Cloudera. Manually setting up a YARN environment locally or on a cluster is not recommended for following through this Getting Started tutorial.
yarn top. It should show no error messages.
HADOOP_CLASSPATHenvironment variable is set up (it can be checked by running
echo $HADOOP_CLASSPATH). If not, set it up using
Once you’ve made sure that the
HADOOP_CLASSPATH environment variable is set, you can launch a Flink on YARN session, and submit an example job:
Congratulations! You have successfully run a Flink application by deploying Flink on YARN.
For production use, we recommend deploying Flink Applications in the Per-Job or Application Mode, as these modes provide a better isolation for the Applications.
Application Mode will launch a Flink cluster on YARN, where the main() method of the application jar gets executed on the JobManager in YARN.
The cluster will shut down as soon as the application has finished. You can manually stop the cluster using
yarn application -kill <ApplicationId> or by cancelling the Flink job.
Once an Application Mode cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint.
Note that cancelling your job on an Application Cluster will stop the cluster.
To unlock the full potential of the application mode, consider using it with the
yarn.provided.lib.dirs configuration option
and pre-upload your application jar to a location accessible by all nodes in your cluster. In this case, the
command could look like:
The above will allow the job submission to be extra lightweight as the needed Flink jars and the application jar are going to be picked up by the specified remote locations rather than be shipped to the cluster by the client.
The Per-Job Cluster mode will launch a Flink cluster on YARN, then run the provided application jar locally and finally submit the JobGraph to the JobManager on YARN. If you pass the
--detached argument, the client will stop once the submission is accepted.
The YARN cluster will stop once the job has stopped.
Once a Per-Job Cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint.
Note that cancelling your job on an Per-Job Cluster will stop the cluster.
We describe deployment with the Session Mode in the Getting Started guide at the top of the page.
The Session Mode has two operation modes:
yarn-session.shclient submits the Flink cluster to YARN, but the client keeps running, tracking the state of the cluster. If the cluster fails, the client will show the error. If the client gets terminated, it will signal the cluster to shut down as well.
yarn-session.shclient submits the Flink cluster to YARN, then the client returns. Another invocation of the client, or YARN tools is needed to stop the Flink cluster.
The session mode will create a hidden YARN properties file in
/tmp/.yarn-properties-<username>, which will be picked up for cluster discovery by the command line interface when submitting a job.
You can also manually specifiy the target YARN cluster in the command line interface when submitting a Flink job. Here’s an example:
./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY ./examples/streaming/TopSpeedWindowing.jar
You can re-attach to a YARN session using the following command:
./bin/yarn-session.sh -id application_XXXX_YY
Besides passing configuration via the
conf/flink-conf.yaml file, you can also pass any configuration at submission time to the
./bin/yarn-session.sh client using
The YARN session client also has a few “shortcut arguments” for commonly used settings. They can be listed with
The YARN-specific configurations are listed on the configuration page.
The following configuration parameters are managed by Flink on YARN, as they might get overwritten by the framework at runtime:
jobmanager.rpc.address(dynamically set to the address of the JobManager container by Flink on YARN)
io.tmp.dirs(If not set, Flink sets the temporary directories defined by YARN)
high-availability.cluster-id(automatically generated ID to distinguish multiple clusters in the HA service)
If you need to pass additional Hadoop configuration files to Flink, you can do so via the
HADOOP_CONF_DIR environment variable, which accepts a directory name containing Hadoop configuration files. By default, all required Hadoop configuration files are loaded from the classpath via the
HADOOP_CLASSPATH environment variable.
A JobManager running on YARN will request additional TaskManagers, if it can not run all submitted jobs with the existing resources. In particular when running in Session Mode, the JobManager will, if needed, allocate additional TaskManagers as additional jobs are submitted. Unused TaskManagers are freed up again after a timeout.
The memory configurations for JobManager and TaskManager processes will be respected by the YARN implementation. The number of reported VCores is by default equal to the number of configured slots per TaskManager. The yarn.containers.vcores allows overwriting the number of vcores with a custom value. In order for this parameter to work you should enable CPU scheduling in your YARN cluster.
Failed containers (including the JobManager) are replaced by YARN. The maximum number of JobManager container restarts is configured via yarn.application-attempts (default 1). The YARN Application will fail once all attempts are exhausted.
High-Availability on YARN is achieved through a combination of YARN and a high availability service.
Once a HA service is configured, it will persist JobManager metadata and perform leader elections.
YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink’s yarn.application-attempts configuration will default 2. This value is limited by YARN’s yarn.resourcemanager.am.max-attempts, which also defaults to 2.
Note that Flink is managing the
high-availability.cluster-id configuration parameter when deploying on YARN.
Flink sets it per default to the YARN application id.
You should not overwrite this parameter when deploying an HA cluster on YARN.
The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper).
Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
Flink on YARN is compiled against Hadoop 2.4.1, and all Hadoop versions
>= 2.4.1 are supported, including Hadoop 3.x.
For providing Flink with the required Hadoop dependencies, we recommend setting the
HADOOP_CLASSPATH environment variable already introduced in the Getting Started / Preparation section.
If that is not possible, the dependencies can also be put into the
lib/ folder of Flink.
Flink also offers pre-bundled Hadoop fat jars for placing them in the
lib/ folder, on the Downloads / Additional Components section of the website. These pre-bundled fat jars are shaded to avoid dependency conflicts with common libraries. The Flink community is not testing the YARN integration against these pre-bundled jars.
Some YARN clusters use firewalls for controlling the network traffic between the cluster and the rest of the network. In those setups, Flink jobs can only be submitted to a YARN session from within the cluster’s network (behind the firewall). If this is not feasible for production use, Flink allows to configure a port range for its REST endpoint, used for the client-cluster communication. With this range configured, users can also submit jobs to Flink crossing the firewall.
The configuration parameter for specifying the REST endpoint port is rest.bind-port. This configuration option accepts single ports (for example: “50010”), ranges (“50000-50025”), or a combination of both.
By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the yarn.per-job-cluster.include-user-jar parameter.
When setting this to
DISABLED Flink will include the jar in the user classpath instead.
The user-jars position in the classpath can be controlled by setting the parameter to one of the following:
ORDER: (default) Adds the jar to the system classpath based on the lexicographic order.
FIRST: Adds the jar to the beginning of the system classpath.
LAST: Adds the jar to the end of the system classpath.