Flink provides a Command-Line Interface (CLI)
bin/flink to run programs that
are packaged as JAR files and to control their execution. The CLI is part of any
Flink setup, available in local single node setups and in distributed setups.
It connects to the running JobManager specified in
A prerequisite for the commands listed in this section to work is to have a running Flink deployment like Kubernetes, YARN or any other option available. Feel free to start a Flink cluster locally to try the commands on your own machine.
Submitting a job means uploading the job’s JAR and related dependencies to the Flink cluster and
initiating the job execution. For the sake of this example, we select a long-running job like
examples/streaming/StateMachineExample.jar. Feel free to select any other JAR archive from the
examples/ folder or deploy your own job.
Submitting the job using
--detached will make the command return after the submission is done.
The output contains (besides other things) the ID of the newly submitted job.
Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>] Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>] Options for both the above setups: [--backend <file|rocks>] [--checkpoint-dir <filepath>] [--async-checkpoints <true|false>] [--incremental-checkpoints <true|false>] [--output <filepath> OR null for stdout] Using standalone source with error rate 0.000000 and sleep delay 1 millis Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20
The usage information printed lists job-related parameters that can be added to the end of the job
submission command if necessary. For the purpose of readability, we assume that the returned JobID is
stored in a variable
JOB_ID for the commands below:
There is another action called
run-application available to run the job in
Application Mode. This documentation does not address
this action individually as it works similarly to the
run action in terms of the CLI frontend.
You can monitor any running jobs using the
Waiting for response... ------------------ Running/Restarting Jobs ------------------- 30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING) -------------------------------------------------------------- No scheduled jobs.
Jobs that were submitted but not started, yet, would be listed under “Scheduled Jobs”.
Savepoints can be created to save the current state a job is in. All that’s needed is the JobID:
Triggering savepoint for job cca7bc1061d61cf15238e92312c2fc20. Waiting for response... Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab You can resume your program from this savepoint with the run command.
The savepoint folder is optional and needs to be specified if state.savepoints.dir isn’t set.
The path to the savepoint can be used later on to restart the Flink job.
savepoint action can be also used to remove savepoints.
--dispose with the corresponding
savepoint path needs to be added:
Disposing savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab'. Waiting for response... Savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab' disposed.
If you use custom state instances (for example custom reducing state or RocksDB state), you have to
specify the path to the program JAR with which the savepoint was triggered. Otherwise, you will run
Triggering the savepoint disposal through the
savepoint action does not only remove the data from
the storage but makes Flink clean up the savepoint-related metadata as well.
Another action for stopping a job is
stop. It is a more graceful way of stopping a running streaming
job as the
stop flows from source to sink. When the user requests to stop a job, all sources will
be requested to send the last checkpoint barrier that will trigger a savepoint, and after the successful
completion of that savepoint, they will finish by calling their
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint. Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
We have to use
--savepointPath to specify the savepoint folder if
state.savepoints.dir isn’t set.
--drain flag is specified, then a
MAX_WATERMARK will be emitted before the last checkpoint
barrier. This will make all registered event-time timers fire, thus flushing out any state that
is waiting for a specific watermark, e.g. windows. The job will keep running until all sources properly
shut down. This allows the job to finish processing all in-flight data.
Cancelling a job can be achieved through the
Cancelling job cca7bc1061d61cf15238e92312c2fc20. Cancelled job cca7bc1061d61cf15238e92312c2fc20.
The corresponding job’s state will be transitioned from
Cancelled. Any computations
will be stopped.
Starting a job from a savepoint can be achieved using the
Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>] Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>] Options for both the above setups: [--backend <file|rocks>] [--checkpoint-dir <filepath>] [--async-checkpoints <true|false>] [--incremental-checkpoints <true|false>] [--output <filepath> OR null for stdout] Using standalone source with error rate 0.000000 and sleep delay 1 millis Job has been submitted with JobID 97b20a0a8ffd5c1d656328b0cd6436a6
See how the command is equal to the initial run command except for the
--fromSavepoint parameter which is used to refer to the state of the
previously stopped job. A new JobID is
generated that can be used to maintain the job.
By default, we try to match the whole savepoint state to the job being submitted. If you want to
allow to skip savepoint state that cannot be restored with the new job you can set the
--allowNonRestoredState flag. You need to allow this if you removed an operator from your program
that was part of the program when the savepoint was triggered and you still want to use the savepoint.
This is useful if your program dropped an operator that was part of the savepoint.
Here’s an overview of actions supported by Flink’s CLI tool:
||This action executes jobs. It requires at least the jar containing the job. Flink- or job-related arguments can be passed if necessary.|
This action executes jobs in
Application Mode. Other than that, it requires the same parameters as the
||This action can be used to print an optimized execution graph of the passed job. Again, the jar containing the job needs to be passed.|
||This action lists all running or scheduled jobs.|
This action can be used to create or disposing savepoints for a given job. It might be
necessary to specify a savepoint directory besides the JobID, if the
parameter was not specified in
||This action can be used to cancel running jobs based on their JobID.|
This action combines the
A more fine-grained description of all actions and their parameters can be accessed through
or the usage information of each individual action
bin/flink <action> --help.
The Flink cluster can be also managed using the REST API. The commands
described in previous sections are a subset of what is offered by Flink’s REST endpoints. Therefore,
curl can be used to get even more out of Flink.
Flink is compatible with multiple cluster management frameworks like Kubernetes or YARN which are described in more detail in the Resource Provider section. Jobs can be submitted in different Deployment Modes. The parameterization of a job submission differs based on the underlying framework and Deployment Mode.
bin/flink offers a parameter
--target to handle the different options. In addition to that, jobs
have to be submitted using either
run (for Session
and Per-Job Mode) or
Application Mode). See the following summary of
./bin/flink run --target yarn-session: Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-per-job: Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run-application --target yarn-application: Submission spinning up Flink on YARN cluster in Application Mode
./bin/flink run --target kubernetes-session: Submission to an already running Flink on Kubernetes cluster
./bin/flink run-application --target kubernetes-application: Submission spinning up a Flink on Kubernetes cluster in Application Mode
./bin/flink run --target remote: Submission to an already running Flink on Mesos cluster
./bin/flink run --target local: Local submission using a MiniCluster in Session Mode
./bin/flink run --target remote: Submission to an already running Flink cluster
--target will overwrite the execution.target
specified in the
For more details on the commands and the available options, please refer to the Resource Provider-specific pages of the documentation.
Currently, users are able to submit a PyFlink job via the CLI. It does not require to specify the JAR file path or the entry main class, which is different from the Java job submission.
Note When submitting Python job via
flink run, Flink will run the command “python”. Please run the following command to confirm that the python executable in current environment points to a supported Python version of 3.5+.
The following commands show different PyFlink job submission use-cases:
--pyFileswill be added to the
PYTHONPATHand, therefore, available in the Python code.
--jarfilewill be uploaded to the cluster.
<jobmanagerHost>(adapt the command accordingly):
<ClusterId>, it requires a docker image with PyFlink installed, please refer to Enabling PyFlink in docker: