Configuration

For single-node setups Flink is ready to go out of the box and you don’t need to change the default configuration to get started.

The out-of-the-box configuration uses your default Java installation. To override the Java runtime, set the environment variable JAVA_HOME or the configuration key env.java.home in conf/flink-conf.yaml.

This page lists the most common options that are typically needed to set up a well-performing (distributed) installation. A full list of all available configuration parameters follows in the Full Reference section below.

All configuration is done in conf/flink-conf.yaml, which is expected to be a flat collection of YAML key-value pairs in the format key: value.
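
As a rough illustration of the flat key: value format, a minimal conf/flink-conf.yaml might look like the following. The keys all appear on this page; the values and the Java path are placeholder assumptions, not recommendations.

```yaml
# conf/flink-conf.yaml: one "key: value" pair per line, no nesting.
# The Java path below is a hypothetical example location.
env.java.home: /usr/lib/jvm/java-8-openjdk
jobmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
```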

The system and run scripts parse the config at startup time. Changes to the configuration file require restarting the Flink JobManager and TaskManagers.

The configuration files for the TaskManagers can differ; Flink does not assume uniform machines in the cluster.

Common Options

Key Default Type Description
jobmanager.heap.size
"1024m" String JVM heap size for the JobManager.
taskmanager.memory.total-flink.size
(none) String Total Flink Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory.
parallelism.default
1 Integer Default parallelism for jobs.
taskmanager.numberOfTaskSlots
1 Integer The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
state.backend
(none) String The state backend to be used to store and checkpoint state.
state.checkpoints.dir
(none) String The default directory used for storing the data files and metadata of checkpoints in a Flink-supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e., all TaskManagers and JobManagers).
state.savepoints.dir
(none) String The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).
high-availability
"NONE" String Defines the high-availability mode used for cluster execution. To enable high availability, set this mode to "ZOOKEEPER" or specify the fully qualified name (FQN) of a factory class.
high-availability.storageDir
(none) String File system path (URI) where Flink persists metadata in high-availability setups.
security.ssl.internal.enabled
false Boolean Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).
security.ssl.rest.enabled
false Boolean Turns on SSL for external communication via the REST endpoints.
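
A hedged sketch of how the common options above might be combined for a small cluster. The keys are taken from the table; every value and path is an illustrative assumption to adapt to your own machines.

```yaml
# Sketch of a small distributed setup; all values are illustrative assumptions.
jobmanager.heap.size: 2048m
taskmanager.memory.total-flink.size: 4096m

# Assumes TaskManager machines with 4 physical CPU cores,
# so one slot per core.
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

# Checkpoint and savepoint directories must be reachable from all
# TaskManagers and JobManagers. The HDFS paths are hypothetical.
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
```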

Full Reference

HDFS

Note: These keys are deprecated and it is recommended to configure the Hadoop path with the environment variable HADOOP_CONF_DIR instead.

See also how to configure Hadoop.

These parameters configure the default HDFS used by Flink. Setups that do not specify an HDFS configuration have to specify the full path to HDFS files (hdfs://address:port/path/to/files). Files will also be written with default HDFS parameters (block size, replication factor).

  • fs.hdfs.hadoopconf: The absolute path to the Hadoop File System’s (HDFS) configuration directory (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (hdfs:///path/to/files, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like hdfs://address:port/path/to/files. This option also causes file writers to pick up the HDFS’s default values for block sizes and replication factors. Flink will look for the “core-site.xml” and “hdfs-site.xml” files in the specified directory.

  • fs.hdfs.hdfsdefault: The absolute path of Hadoop’s own configuration file “hdfs-default.xml” (DEFAULT: null).

  • fs.hdfs.hdfssite: The absolute path of Hadoop’s own configuration file “hdfs-site.xml” (DEFAULT: null).
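
For illustration only, the first of these keys could be set as follows. The directory is a hypothetical location; as the note above says, the HADOOP_CONF_DIR environment variable is the recommended alternative to these deprecated keys.

```yaml
# Deprecated key, shown only to illustrate the description above.
# /etc/hadoop/conf is a hypothetical directory that would contain
# core-site.xml and hdfs-site.xml.
fs.hdfs.hadoopconf: /etc/hadoop/conf
```

With this directory set, programs could refer to files with short URIs such as hdfs:///path/to/files instead of hdfs://address:port/path/to/files.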

Core

Key Default Type Description
classloader.parent-first-patterns.additional
(none) String A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to "classloader.parent-first-patterns.default".
classloader.parent-first-patterns.default
"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback" String A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern, we recommend using "classloader.parent-first-patterns.additional" instead.
classloader.resolve-order
"child-first" String Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).
io.tmp.dirs
'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone. String Directories for temporary files, separated by ",", "|", or the system's java.io.File.pathSeparator.
parallelism.default
1 Integer Default parallelism for jobs.
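
A minimal sketch of the core options in use. The package prefixes and directories are hypothetical names chosen for illustration.

```yaml
# Keep the default: resolve classes from the user code jar first.
classloader.resolve-order: child-first

# Hypothetical prefixes that should always be resolved through the parent
# ClassLoader. They are appended to classloader.parent-first-patterns.default,
# which is left unmodified.
classloader.parent-first-patterns.additional: com.example.shared.;org.example.metrics.

# Hypothetical temporary directories, separated by ",".
io.tmp.dirs: /data1/flink-tmp,/data2/flink-tmp
```

Because a pattern is a simple prefix match on the fully qualified class name, ending a prefix with a dot keeps it from matching unrelated packages that merely start with the same characters.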

Execution

Key Default Type Description
execution.attached
false Boolean Specifies if the pipeline is submitted in attached or detached mode.
execution.shutdown-on-attached-exit
false Boolean If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
execution.target
(none) String The deployment target for the execution, e.g. "local" for local execution.
Key Default Type Description
execution.savepoint.ignore-unclaimed-state
false Boolean Allows skipping savepoint state that cannot be restored. Enable this if you removed an operator from your pipeline after the savepoint was triggered.
execution.savepoint.path
(none) String Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).

JobManager

Key Default Type Description
jobmanager.archive.fs.dir
(none) String Directory for the JobManager to store the archives of completed jobs.
jobmanager.execution.attempts-history-size
16 Integer The maximum number of prior execution attempts kept in history.
jobmanager.execution.failover-strategy
"full" String This option specifies how the job computation recovers from task failures. Accepted values are:
  • 'full': Restarts all tasks to recover the job.
  • 'region': Restarts all tasks that could be affected by the task failure. More details can be found here.
jobmanager.heap.size
"1024m" String JVM heap size for the JobManager.
jobmanager.rpc.address
(none) String The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobmanager.rpc.port
6123 Integer The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobstore.cache-size
52428800 Long The job store cache size in bytes which is used to keep completed jobs in memory.
jobstore.expiration-time
3600 Long The time in seconds after which a completed job expires and is purged from the job store.
jobstore.max-capacity
2147483647 Integer The max number of completed jobs that can be kept in the job store.
slot.idle.timeout
50000 Long The timeout in milliseconds for an idle slot in the Slot Pool.
slot.request.timeout
300000 Long The timeout in milliseconds for requesting a slot from the Slot Pool.
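
As a sketch, a simple standalone setup with a single JobManager might set the options above like this. The hostname and archive path are hypothetical.

```yaml
# Hypothetical hostname of the single JobManager. Only interpreted in
# setups without a leader-election service such as ZooKeeper.
jobmanager.rpc.address: jobmanager-host
jobmanager.rpc.port: 6123

# Restart only the tasks that could be affected by a task failure,
# instead of all tasks ("full").
jobmanager.execution.failover-strategy: region

# Hypothetical location for the archives of completed jobs.
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs

# Purge completed jobs from the job store after two hours (value in seconds).
jobstore.expiration-time: 7200
```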

Restart Strategies

Configuration options to control Flink’s restart behaviour in case of job failures.

Key Default Type Description
restart-strategy
(none) String Defines the restart strategy to use in case of job failures.
Accepted values are:
  • none, off, disable: No restart strategy.
  • fixeddelay, fixed-delay: Fixed delay restart strategy. More details can be found here.
  • failurerate, failure-rate: Failure rate restart strategy. More details can be found here.
If checkpointing is disabled, the default value is none. If checkpointing is enabled, the default value is fixed-delay with Integer.MAX_VALUE restart attempts and '1 s' delay.

Fixed Delay Restart Strategy

Key Default Type Description
restart-strategy.fixed-delay.attempts
1 Integer The number of times that Flink retries the execution before the job is declared as failed if restart-strategy has been set to fixed-delay.
restart-strategy.fixed-delay.delay
"1 s" String Delay between two consecutive restart attempts if restart-strategy has been set to fixed-delay. Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted. It can be specified using notation: "1 min", "20 s"

Failure Rate Restart Strategy

Key Default Type Description
restart-strategy.failure-rate.delay
"1 s" String Delay between two consecutive restart attempts if restart-strategy has been set to failure-rate. It can be specified using notation: "1 min", "20 s"
restart-strategy.failure-rate.failure-rate-interval
"1 min" String Time interval for measuring failure rate if restart-strategy has been set to failure-rate. It can be specified using notation: "1 min", "20 s"
restart-strategy.failure-rate.max-failures-per-interval
1 Integer Maximum number of restarts in given time interval before failing a job if restart-strategy has been set to failure-rate.
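
The two strategies could be configured roughly as follows. Only one restart-strategy value can be active at a time, so the fixed-delay variant is shown commented out. All numbers are illustrative assumptions.

```yaml
# Variant 1: failure-rate. Allow at most 3 restarts per 5 minute interval
# before the job is failed, waiting 10 seconds between restart attempts.
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

# Variant 2: fixed-delay. Retry the execution 3 times, waiting 10 seconds
# between attempts, before the job is declared failed.
# restart-strategy: fixed-delay
# restart-strategy.fixed-delay.attempts: 3
# restart-strategy.fixed-delay.delay: 10 s
```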

TaskManager

Key Default Type Description
task.cancellation.interval
30000 Long Time interval between two successive task cancellation attempts in milliseconds.
task.cancellation.timeout
180000 Long Timeout in milliseconds after which a task cancellation times out and leads to a fatal TaskManager error. A value of 0 deactivates the watch dog.
task.cancellation.timers.timeout
7500 Long Time in milliseconds to wait for the timers to finish all pending timer threads when the stream task is cancelled.
task.checkpoint.alignment.max-size
-1 Long The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped). A value of -1 indicates that there is no limit.
taskmanager.debug.memory.log
false Boolean Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
taskmanager.debug.memory.log-interval
5000 Long The interval (in ms) for the log thread to log the current memory usage.
taskmanager.exit-on-fatal-akka-error
false Boolean Whether the quarantine monitor for task managers shall be started. The quarantine monitor shuts down the actor system if it detects that it has quarantined another actor system or if it has been quarantined by another actor system.
taskmanager.host
(none) String The address of the network interface that the TaskManager binds to. This option can be used to define explicitly a binding address. Because different TaskManagers need different values for this option, usually it is specified in an additional non-shared TaskManager-specific config file.
taskmanager.jvm-exit-on-oom
false Boolean Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
taskmanager.network.bind-policy
"ip" String The automatic address binding policy used by the TaskManager if "taskmanager.host" is not set. The value should be one of the following:
  • "name" - uses hostname as binding address
  • "ip" - uses host's ip address as binding address
taskmanager.numberOfTaskSlots
1 Integer The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.registration.initial-backoff
"500 ms" String The initial registration backoff between two consecutive registration attempts. The backoff is doubled for each new registration attempt until it reaches the maximum registration backoff.
taskmanager.registration.max-backoff
"30 s" String The maximum registration backoff between two consecutive registration attempts. The max registration backoff requires a time unit specifier (ms/s/min/h/d).
taskmanager.registration.refused-backoff
"10 s" String The backoff after a registration has been refused by the job manager before retrying to connect.
taskmanager.registration.timeout
"5 min" String Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.
taskmanager.rpc.port
"0" String The task manager’s IPC port. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.

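A short sketch of a few of the TaskManager options above. The port range matches the example in the taskmanager.rpc.port description; the other values are assumptions or restated defaults.

```yaml
# A port range avoids collisions when several TaskManagers
# run on the same machine.
taskmanager.rpc.port: 50100-50200

# Use the hostname rather than the IP address as binding address
# when taskmanager.host is not set.
taskmanager.network.bind-policy: name

# Defaults, restated: the backoff starts at 500 ms and doubles with each
# registration attempt (500 ms, 1 s, 2 s, ...) until it reaches 30 s.
taskmanager.registration.initial-backoff: 500 ms
taskmanager.registration.max-backoff: 30 s

# Terminate the TaskManager if it has not registered within 10 minutes.
taskmanager.registration.timeout: 10 min
```
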
For batch jobs, Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.heap.size minus memory used for network buffers) for its managed memory. Managed memory helps Flink run the batch operators efficiently. It prevents OutOfMemoryErrors because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it uses disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

The default fraction for managed memory can be adjusted using the taskmanager.memory.fraction parameter. An absolute value may be set using taskmanager.memory.size (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.

Key Default Type Description
taskmanager.memory.framework.heap.size
"128m" String Framework Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for TaskExecutor framework, which will not be allocated to task slots.
taskmanager.memory.jvm-metaspace.size
"192m" String JVM Metaspace Size for the TaskExecutors.
taskmanager.memory.jvm-overhead.fraction
0.1 Float Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, I/O direct memory, compile cache, etc. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.jvm-overhead.max
"1g" String Max JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, I/O direct memory, compile cache, etc. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.jvm-overhead.min
"128m" String Min JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, I/O direct memory, compile cache, etc. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.managed.fraction
0.5 Float Fraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not explicitly specified.
taskmanager.memory.managed.off-heap.fraction
-1.0 Float Fraction of Managed Memory that Off-Heap Managed Memory takes, if Off-Heap Managed Memory size is not explicitly specified. If the fraction is not explicitly specified (or configured with negative values), it will be derived from the legacy config option 'taskmanager.memory.off-heap', to use either all on-heap memory or all off-heap memory for Managed Memory.
taskmanager.memory.managed.off-heap.size
(none) String Off-Heap Managed Memory size for TaskExecutors. This is the part of Managed Memory that is off-heap, while the remaining is on-heap. If unspecified, it will be derived to make up the configured fraction of the Managed Memory size.
taskmanager.memory.managed.size
(none) String Managed Memory size for TaskExecutors. This is the size of memory managed by the memory manager, including both On-Heap Managed Memory and Off-Heap Managed Memory, reserved for sorting, hash tables, caching of intermediate results and state backends. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.
taskmanager.memory.segment-size
"32kb" String Size of memory buffers used by the network stack and the memory manager.
taskmanager.memory.shuffle.fraction
0.1 Float Fraction of Total Flink Memory to be used as Shuffle Memory. Shuffle Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Shuffle Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Shuffle Memory can be explicitly specified by setting the min/max size to the same value.
taskmanager.memory.shuffle.max
"1g" String Max Shuffle Memory size for TaskExecutors. Shuffle Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Shuffle Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Shuffle Memory can be explicitly specified by setting the min/max to the same value.
taskmanager.memory.shuffle.min
"64m" String Min Shuffle Memory size for TaskExecutors. Shuffle Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Shuffle Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Shuffle Memory can be explicitly specified by setting the min/max to the same value.
taskmanager.memory.task.heap.size
(none) String Task Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for user code. If not specified, it will be derived as Total Flink Memory minus Framework Heap Memory, Task Off-Heap Memory, (On-Heap and Off-Heap) Managed Memory and Shuffle Memory.
taskmanager.memory.task.off-heap.size
"0b" String Task Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory or native memory) reserved for user code.
taskmanager.memory.total-flink.size
(none) String Total Flink Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory.
taskmanager.memory.total-process.size
(none) String Total Process Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On containerized setups, this should be set to the container memory.
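
To make the relationships between these options concrete, here is a sketch for a containerized TaskExecutor. It assumes a container with 4096m of memory; the remaining values are the defaults from the table, repeated only for readability.

```yaml
# Assumption: the container is given 4096m of memory.
taskmanager.memory.total-process.size: 4096m

# Defaults from the table above, restated for readability.
taskmanager.memory.jvm-metaspace.size: 192m
taskmanager.memory.jvm-overhead.fraction: 0.1
taskmanager.memory.managed.fraction: 0.5
taskmanager.memory.shuffle.fraction: 0.1
taskmanager.memory.framework.heap.size: 128m
```

Reading the descriptions literally, JVM Overhead would come to about 0.1 * 4096m = 410m, which lies within the 128m to 1g bounds. That would leave roughly 4096m - 192m - 410m = 3494m of Total Flink Memory, of which about 1747m would be Managed Memory and about 349m Shuffle Memory. Subtracting the 128m of Framework Heap Memory as well leaves roughly 1270m of Task Heap Memory. These figures are approximate; the exact rounding may differ.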

Distributed Coordination

Key Default Type Description
cluster.evenly-spread-out-slots
false Boolean Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available TaskExecutors.
cluster.registration.error-delay
10000 Long The pause made after a registration attempt caused an exception (other than timeout) in milliseconds.
cluster.registration.initial-timeout
100 Long Initial registration timeout between cluster components in milliseconds.
cluster.registration.max-timeout
30000 Long Maximum registration timeout between cluster components in milliseconds.
cluster.registration.refused-registration-delay
30000 Long The pause made after the registration attempt was refused in milliseconds.
cluster.services.shutdown-timeout
30000 Long The shutdown timeout for cluster services like executors in milliseconds.

Distributed Coordination (via Akka)

Key Default Type Description
akka.ask.timeout
"10 s" String Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).
akka.client-socket-worker-pool.pool-size-factor
1.0 Double The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.
akka.client-socket-worker-pool.pool-size-max
2 Integer Max number of threads to cap factor-based number to.
akka.client-socket-worker-pool.pool-size-min
1 Integer Min number of threads to cap factor-based number to.
akka.client.timeout
"60 s" String Timeout for all blocking calls on the client side.
akka.fork-join-executor.parallelism-factor
2.0 Double The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values.
akka.fork-join-executor.parallelism-max
64 Integer Max number of threads to cap factor-based parallelism number to.
akka.fork-join-executor.parallelism-min
8 Integer Min number of threads to cap factor-based parallelism number to.
akka.framesize
"10485760b" String Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.
akka.jvm-exit-on-fatal-error
true Boolean Exit JVM on fatal Akka errors.
akka.log.lifecycle.events
false Boolean Turns on Akka’s remote logging of events. Set this value to 'true' when debugging.
akka.lookup.timeout
"10 s" String Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d).
akka.retry-gate-closed-for
50 Long Milliseconds a gate should be closed for after a remote connection was disconnected.
akka.server-socket-worker-pool.pool-size-factor
1.0 Double The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.
akka.server-socket-worker-pool.pool-size-max
2 Integer Max number of threads to cap factor-based number to.
akka.server-socket-worker-pool.pool-size-min
1 Integer Min number of threads to cap factor-based number to.
akka.ssl.enabled
true Boolean Turns on SSL for Akka’s remote communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true.
akka.startup-timeout
(none) String Timeout after which the startup of a remote component is considered to have failed.
akka.tcp.timeout
"20 s" String Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value.
akka.throughput
15 Integer Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness.
akka.transport.heartbeat.interval
"1000 s" String Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d).
akka.transport.heartbeat.pause
"6000 s" String Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d).
akka.transport.threshold
300.0 Double Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value.
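
A sketch of the adjustments that the descriptions above suggest when timeouts or oversized messages cause failures. The concrete numbers are assumptions.

```yaml
# Raise the ask timeout if Flink fails due to timeouts on slow machines
# or a congested network. A time-unit specifier is required.
akka.ask.timeout: 30 s

# Allow messages of up to 20 * 1024 * 1024 bytes between the JobManager
# and the TaskManagers. A size-unit specifier is required.
akka.framesize: 20971520b

# Give outbound connections more time on a slow network.
akka.tcp.timeout: 40 s
```

As a worked example of the thread pool formula, take the akka.fork-join-executor defaults (factor 2.0, minimum 8, maximum 64). On a machine with 16 available processors, ceil(16 * 2.0) = 32 lies within the bounds, so the pool would have 32 threads. On a machine with 2 processors, ceil(2 * 2.0) = 4 falls below the minimum, so the pool would be raised to 8 threads.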

REST

Key Default Type Description
rest.address
(none) String The address that should be used by clients to connect to the server.
rest.await-leader-timeout
30000 Long The time in ms that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint.
rest.bind-address
(none) String The address that the server binds itself to.
rest.bind-port
"8081" String The port that the server binds itself to. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple REST servers are running on the same machine.
rest.client.max-content-length
104857600 Integer The maximum content length in bytes that the client will handle.
rest.connection-timeout
15000 Long The maximum time in ms for the client to establish a TCP connection.
rest.idleness-timeout
300000 Long The maximum time in ms for a connection to stay idle before failing.
rest.port
8081 Integer The port that the client connects to. If rest.bind-port has not been specified, then the REST server will bind to this port.
rest.retry.delay
3000 Long The time in ms that the client waits between retries (See also `rest.retry.max-attempts`).
rest.retry.max-attempts
20 Integer The number of retries the client will attempt if a retryable operation fails.
rest.server.max-content-length
104857600 Integer The maximum content length in bytes that the server will handle.
rest.server.numThreads
4 Integer The number of threads for the asynchronous processing of requests.
rest.server.thread-priority
5 Integer Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.
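
A minimal sketch of the client-side and server-side REST options together. The hostname and bind address are hypothetical.

```yaml
# Client side: where clients connect. The hostname is hypothetical.
rest.address: flink-rest.example.com
rest.port: 8081

# Server side: a port range avoids collisions when several REST servers
# run on the same machine. rest.port should then point at the port the
# server ends up binding to.
rest.bind-port: 8081-8090
# Assumption: bind to all network interfaces.
rest.bind-address: 0.0.0.0

# Client retries: wait 5 seconds between attempts, up to 10 attempts.
rest.retry.delay: 5000
rest.retry.max-attempts: 10
```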

Blob Server

Key Default Type Description
blob.client.connect.timeout
0 Integer The connection timeout in milliseconds for the blob client.
blob.client.socket.timeout
300000 Integer The socket timeout in milliseconds for the blob client.
blob.fetch.backlog
1000 Integer The config parameter defining the backlog of BLOB fetches on the JobManager.
blob.fetch.num-concurrent
50 Integer The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
blob.fetch.retries
5 Integer The config parameter defining the number of retries for failed BLOB fetches.
blob.offload.minsize
1048576 Integer The minimum size for messages to be offloaded to the BlobServer.
blob.server.port
"0" String The config parameter defining the server port of the blob service.
blob.service.cleanup.interval
3600 Long Cleanup interval of the blob caches at the task managers (in seconds).
blob.service.ssl.enabled
true Boolean Flag to override ssl support for the blob service transport.
blob.storage.directory
(none) String The config parameter defining the storage directory to be used by the blob server.
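
A small sketch of blob server settings. The port and directory are hypothetical values, not recommendations.

```yaml
# Hypothetical fixed port instead of the default 0.
blob.server.port: 6124

# Hypothetical local directory for the blob server's storage.
blob.storage.directory: /var/lib/flink/blobs

# Retry failed BLOB fetches up to 10 times instead of the default 5.
blob.fetch.retries: 10

# Clean up the blob caches at the task managers every 30 minutes
# (value in seconds).
blob.service.cleanup.interval: 1800
```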

Heartbeat Manager

Key Default Type Description
heartbeat.interval
10000 Long Time interval for requesting heartbeat from sender side.
heartbeat.timeout
50000 Long Timeout for requesting and receiving heartbeat for both sender and receiver sides.

SSL Settings

Key Default Type Description
security.ssl.algorithms
"TLS_RSA_WITH_AES_128_CBC_SHA" String The comma-separated list of standard SSL algorithms to be supported. Read more here
security.ssl.internal.close-notify-flush-timeout
-1 Integer The timeout (in ms) for flushing the `close_notify` that was triggered by closing a channel. If the `close_notify` was not flushed in the given timeout the channel will be closed forcibly. (-1 = use system default)
security.ssl.internal.enabled
false Boolean Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).
security.ssl.internal.handshake-timeout
-1 Integer The timeout (in ms) during SSL handshake. (-1 = use system default)
security.ssl.internal.key-password
(none) String The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore
(none) String The Java keystore file with SSL Key and Certificate, to be used for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.keystore-password
(none) String The secret to decrypt the keystore file for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.session-cache-size
-1 Integer The size of the cache used for storing SSL session objects. According to https://github.com/netty/netty/issues/832, you should always set this to an appropriate number to not run into a bug with stalling IO threads during garbage collection. (-1 = use system default).
security.ssl.internal.session-timeout
-1 Integer The timeout (in ms) for the cached SSL session objects. (-1 = use system default)
security.ssl.internal.truststore
(none) String The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.internal.truststore-password
(none) String The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).
security.ssl.key-password
(none) String The secret to decrypt the server key in the keystore.
security.ssl.keystore
(none) String The Java keystore file to be used by the flink endpoint for its SSL Key and Certificate.
security.ssl.keystore-password
(none) String The secret to decrypt the keystore file.
security.ssl.protocol
"TLSv1.2" String The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.
security.ssl.provider
"JDK" String The SSL engine provider to use for the ssl transport:
  • JDK: default Java-based SSL engine
  • OPENSSL: openSSL-based SSL engine using system libraries
OPENSSL is based on netty-tcnative and comes in two flavours:
  • dynamically linked: This will use your system's openSSL libraries (if compatible) and requires opt/flink-shaded-netty-tcnative-dynamic-*.jar to be copied to lib/
  • statically linked: Due to potential licensing issues with openSSL (see LEGAL-393), we cannot ship pre-built libraries. However, you can build the required library yourself and put it into lib/:
    git clone https://github.com/apache/flink-shaded.git && cd flink-shaded && mvn clean package -Pinclude-netty-tcnative-static -pl flink-shaded-netty-tcnative-static
security.ssl.rest.authentication-enabled
false Boolean Turns on mutual SSL authentication for external communication via the REST endpoints.
security.ssl.rest.enabled
false Boolean Turns on SSL for external communication via the REST endpoints.
security.ssl.rest.key-password
(none) String The secret to decrypt the key in the keystore for Flink's external REST endpoints.
security.ssl.rest.keystore
(none) String The Java keystore file with SSL Key and Certificate, to be used by Flink's external REST endpoints.
security.ssl.rest.keystore-password
(none) String The secret to decrypt the keystore file for Flink's external REST endpoints.
security.ssl.rest.truststore
(none) String The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints.
security.ssl.rest.truststore-password
(none) String The password to decrypt the truststore for Flink's external REST endpoints.
security.ssl.truststore
(none) String The truststore file containing the public CA certificates to be used by flink endpoints to verify the peer’s certificate.
security.ssl.truststore-password
(none) String The secret to decrypt the truststore.
security.ssl.verify-hostname
true Boolean Flag to enable peer’s hostname verification during ssl handshake.
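
The following fragment is a sketch of how the internal and REST SSL options from the table above might be combined in conf/flink-conf.yaml. All file paths and passwords are hypothetical placeholders, and the choice to use separate keystores for internal and REST traffic is an assumption made for illustration.

```yaml
# Internal communication (rpc, data transport, blob server).
# Paths and passwords below are placeholders, not real values.
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.key-password: internal_key_password
security.ssl.internal.truststore: /path/to/internal.truststore
security.ssl.internal.truststore-password: internal_trust_password

# External REST endpoints, without mutual authentication.
security.ssl.rest.enabled: true
security.ssl.rest.authentication-enabled: false
security.ssl.rest.keystore: /path/to/rest.keystore
security.ssl.rest.keystore-password: rest_store_password
security.ssl.rest.key-password: rest_key_password
security.ssl.rest.truststore: /path/to/rest.truststore
security.ssl.rest.truststore-password: rest_trust_password
```

Keeping the internal and REST key material separate means a certificate handed out to REST clients cannot be used to join the cluster's internal communication.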

Netty Shuffle Environment

Key Default Type Description
taskmanager.data.port
0 Integer The task manager’s port used for data exchange operations.
taskmanager.data.ssl.enabled
true Boolean Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true
taskmanager.network.detailed-metrics
false Boolean Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.
taskmanager.network.memory.buffers-per-channel
2 Integer Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be set to at least 2 for good performance: one buffer for receiving in-flight data in the subpartition and one buffer for parallel serialization.
taskmanager.network.memory.floating-buffers-per-gate
8 Integer Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.
taskmanager.network.request-backoff.initial
100 Integer Minimum backoff in milliseconds for partition requests of input channels.
taskmanager.network.request-backoff.max
10000 Integer Maximum backoff in milliseconds for partition requests of input channels.
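
As a rough illustration of the tuning advice above, a cluster with high round trip times between nodes might raise the number of floating buffers while leaving the exclusive buffers at their default. The concrete numbers are assumptions chosen for the example, not measured recommendations.

```yaml
# Keep two exclusive buffers per channel (the documented minimum for good performance).
taskmanager.network.memory.buffers-per-channel: 2
# Illustrative increase over the default of 8 for high-latency links or large clusters.
taskmanager.network.memory.floating-buffers-per-gate: 16
# Partition request backoff in milliseconds (defaults shown explicitly).
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 10000
```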

Network Communication (via Netty)

These parameters allow for advanced tuning. The default values are sufficient when running concurrent high-throughput jobs on a large cluster.

Key Default Type Description
taskmanager.network.netty.client.connectTimeoutSec
120 Integer The Netty client connection timeout.
taskmanager.network.netty.client.numThreads
-1 Integer The number of Netty client threads.
taskmanager.network.netty.num-arenas
-1 Integer The number of Netty arenas.
taskmanager.network.netty.sendReceiveBufferSize
0 Integer The Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.
taskmanager.network.netty.server.backlog
0 Integer The Netty server connection backlog.
taskmanager.network.netty.server.numThreads
-1 Integer The number of Netty server threads.
taskmanager.network.netty.transport
"nio" String The Netty transport type, either "nio" or "epoll"

Web Frontend

Key Default Type Description
web.access-control-allow-origin
"*" String Access-Control-Allow-Origin header for all responses from the web-frontend.
web.address
(none) String Address for runtime monitor web-frontend server.
web.backpressure.cleanup-interval
600000 Integer Time, in milliseconds, after which cached stats are cleaned up if not accessed.
web.backpressure.delay-between-samples
50 Integer Delay between samples to determine back pressure in milliseconds.
web.backpressure.num-samples
100 Integer Number of samples to take to determine back pressure.
web.backpressure.refresh-interval
60000 Integer Time, in milliseconds, after which available stats are deprecated and need to be refreshed (by resampling).
web.checkpoints.history
10 Integer Number of checkpoints to remember for recent history.
web.history
5 Integer Number of archived jobs for the JobManager.
web.log.path
(none) String Path to the log file (may be in /log for standalone but under log directory when using YARN).
web.refresh-interval
3000 Long Refresh interval for the web-frontend in milliseconds.
web.ssl.enabled
true Boolean Flag indicating whether to override SSL support for the JobManager Web UI.
web.submit.enable
true Boolean Flag indicating whether jobs can be uploaded and run from the web-frontend.
web.timeout
10000 Long Timeout for asynchronous operations by the web monitor in milliseconds.
web.tmpdir
System.getProperty("java.io.tmpdir") String Flink web directory which is used by the webmonitor.
web.upload.dir
(none) String Directory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.

File Systems

Key Default Type Description
fs.default-scheme
(none) String The default filesystem scheme, used for paths that do not declare a scheme explicitly. May contain an authority, e.g. host:port in case of an HDFS NameNode.
fs.output.always-create-directory
false Boolean File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to "true", writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to "false", the writer will create the file directly at the output path, without creating a containing directory.
fs.overwrite-files
false Boolean Specifies whether file output writers should overwrite existing files by default. Set to "true" to overwrite by default, "false" otherwise.
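
A short sketch of the file system options in conf/flink-conf.yaml is shown below. The NameNode host and port are hypothetical; the scheme with authority follows the "host:port" form described for fs.default-scheme.

```yaml
# Paths without an explicit scheme are resolved against this HDFS NameNode
# (hypothetical host and port).
fs.default-scheme: hdfs://namenode.example.com:9000/
# Always write into a directory, even with a single parallel writer.
fs.output.always-create-directory: true
# Do not overwrite existing output files by default.
fs.overwrite-files: false
```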

Compiler/Optimizer

Key Default Type Description
compiler.delimited-informat.max-line-samples
10 Integer The maximum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.max-sample-len
2097152 Integer The maximal length of a line sample that the compiler takes for delimited inputs. If the length of a single sample exceeds this value (possible because of misconfiguration of the parser), the sampling aborts. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.min-line-samples
2 Integer The minimum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.

Runtime Algorithms

Key Default Type Description
taskmanager.runtime.hashjoin-bloom-filters
false Boolean Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles.
taskmanager.runtime.max-fan
128 Integer The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small.
taskmanager.runtime.sort-spilling-threshold
0.8 Float A sort operation starts spilling when this fraction of its memory budget is full.

Resource Manager

The configuration keys in this section are independent of the used resource management framework (YARN, Mesos, Standalone, …).

Key Default Type Description
containerized.heap-cutoff-min
600 Integer Minimum amount of heap memory to remove in containers, as a safety margin.
containerized.heap-cutoff-ratio
0.25 Float Percentage of heap space to remove from containers (YARN / Mesos), to compensate for other JVM memory usage.
local.number-resourcemanager
1 Integer The number of resource managers to start.
resourcemanager.job.timeout
"5 minutes" String Timeout for jobs which don't have a job manager as leader assigned.
resourcemanager.rpc.port
0 Integer Defines the network port to connect to for communication with the resource manager. By default, this is the port of the JobManager, because the same ActorSystem is used. It's not possible to use this configuration key to define port ranges.
resourcemanager.standalone.start-up-time
-1 Long Time in milliseconds of the start-up period of a standalone cluster. During this time, the resource manager of the standalone cluster expects new task executors to be registered, and will not fail slot requests that cannot be satisfied by any currently registered slots. After this time, it will immediately fail pending and newly incoming requests that cannot be satisfied by registered slots. If not set, 'slotmanager.request-timeout' will be used by default.
resourcemanager.taskmanager-timeout
30000 Long The timeout for an idle task manager to be released.

Shuffle Service

Key Default Type Description
shuffle-service-factory.class
"org.apache.flink.runtime.io.network.NettyShuffleServiceFactory" String The full class name of the shuffle service factory implementation to be used by the cluster. The default implementation uses Netty for network communication and local memory as well as disk space to store results on a TaskExecutor.

YARN

Key Default Type Description
yarn.application-attempt-failures-validity-interval
10000 Long Time window in milliseconds which defines the number of application attempt failures when restarting the AM. Failures which fall outside of this window are not being considered. Set this value to -1 in order to count globally. See here for more information.
yarn.application-attempts
(none) String Number of ApplicationMaster restarts. Note that the entire Flink cluster will restart and the YARN Client will lose the connection. Also, the JobManager address will change and you’ll need to set the JM host:port manually. It is recommended to leave this option at 1.
yarn.application-master.port
"0" String With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
yarn.application.id
(none) String The YARN application id of the running yarn cluster. This is the YARN cluster where the pipeline is going to be executed.
yarn.application.name
(none) String A custom name for your YARN application.
yarn.application.node-label
(none) String Specify YARN node label for the YARN application.
yarn.application.priority
-1 Integer A non-negative integer indicating the priority for submitting a Flink YARN application. It will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority. If priority is negative or set to '-1'(default), Flink will unset yarn priority setting and use cluster default priority. Please refer to YARN's official documentation for specific settings required to enable priority scheduling for the targeted YARN version.
yarn.application.queue
(none) String The YARN queue on which to put the current pipeline.
yarn.application.type
(none) String A custom type for your YARN application.
yarn.appmaster.rpc.address
(none) String The hostname or address where the application master RPC system is listening.
yarn.appmaster.rpc.port
-1 Integer The port where the application master RPC system is listening.
yarn.appmaster.vcores
1 Integer The number of virtual cores (vcores) used by YARN application master.
yarn.containers.vcores
-1 Integer The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
yarn.flink-dist-jar
(none) String The location of the Flink dist jar.
yarn.heartbeat.container-request-interval
500 Integer Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:
  • The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.
  • The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn.
If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See this link for more information.
yarn.heartbeat.interval
5 Integer Time between heartbeats with the ResourceManager in seconds.
yarn.maximum-failed-containers
(none) String Maximum number of containers the system is going to reallocate in case of a failure.
yarn.per-job-cluster.include-user-jar
"ORDER" String Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER").
yarn.properties-file.location
(none) String When a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users).
yarn.ship-directories
(none) List<String> A semicolon-separated list of directories to be shipped to the YARN cluster.
yarn.tags
(none) String A comma-separated list of tags to apply to the Flink YARN application.
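
To make a few of the YARN options above concrete, the sketch below restricts the Application Master RPC port to a range (useful behind a restrictive firewall, as described for yarn.application-master.port) and sets some descriptive metadata. The application name, queue name, tags, and port range are hypothetical values chosen for illustration.

```yaml
# Allow the AM / JobManager RPC port only within a firewall-approved range
# (hypothetical range; the default "0" lets the OS pick a port).
yarn.application-master.port: 50100-50200
# Descriptive metadata (hypothetical names).
yarn.application.name: example-flink-session
yarn.application.queue: analytics
yarn.tags: flink,example
# Explicit vcores per container; requires CPU scheduling to be enabled on the cluster.
yarn.containers.vcores: 4
# Increase this if too many containers are allocated on the ResourceManager (milliseconds).
yarn.heartbeat.container-request-interval: 1000
```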

Mesos

Key Default Type Description
mesos.failover-timeout
604800 Integer The failover timeout in seconds for the Mesos scheduler, after which running tasks are automatically shut down.
mesos.master
(none) String The Mesos master URL. The value should be in one of the following forms:
  • host:port
  • zk://host1:port1,host2:port2,.../path
  • zk://username:password@host1:port1,host2:port2,.../path
  • file:///path/to/file
mesos.resourcemanager.artifactserver.port
0 Integer The config parameter defining the Mesos artifact server port to use. Setting the port to 0 will let the OS choose an available port.
mesos.resourcemanager.artifactserver.ssl.enabled
true Boolean Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to be set to true to enable encryption.
mesos.resourcemanager.declined-offer-refuse-duration
5000 Long Amount of time to ask the Mesos master to not resend a declined resource offer again. This ensures a declined resource offer isn't resent immediately after being declined.
mesos.resourcemanager.framework.name
"Flink" String Mesos framework name
mesos.resourcemanager.framework.principal
(none) String Mesos framework principal
mesos.resourcemanager.framework.role
"*" String Mesos framework role definition
mesos.resourcemanager.framework.secret
(none) String Mesos framework secret
mesos.resourcemanager.framework.user
(none) String Mesos framework user
mesos.resourcemanager.tasks.port-assignments
(none) String Comma-separated list of configuration keys which represent a configurable port. All port keys will dynamically get a port assigned through Mesos.
mesos.resourcemanager.unused-offer-expiration
120000 Long Amount of time to wait for unused expired offers before declining them. This ensures your scheduler will not hoard useless offers.
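
A minimal sketch of the Mesos scheduler options is given below. It uses the ZooKeeper form of the master URL listed for mesos.master; the host names, ZooKeeper path, and principal are hypothetical.

```yaml
# Mesos master discovered through ZooKeeper (hypothetical hosts and path).
mesos.master: zk://zk1.example.com:2181,zk2.example.com:2181/mesos
# Framework identity as seen by Mesos (hypothetical principal).
mesos.resourcemanager.framework.name: Flink
mesos.resourcemanager.framework.principal: flink-principal
mesos.resourcemanager.framework.role: "*"
# Let the OS choose the artifact server port.
mesos.resourcemanager.artifactserver.port: 0
```

The role value is quoted here because a bare `*` has a special meaning in YAML.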

Mesos TaskManager

Key Default Type Description
mesos.constraints.hard.hostattribute
(none) String Constraints for task placement on Mesos based on agent attributes. Takes a comma-separated list of key:value pairs corresponding to the attributes exposed by the target mesos agents. Example: az:eu-west-1a,series:t2
mesos.resourcemanager.tasks.bootstrap-cmd
(none) String A command which is executed before the TaskManager is started.
mesos.resourcemanager.tasks.container.docker.force-pull-image
false Boolean Instruct the docker containerizer to forcefully pull the image rather than reuse a cached version.
mesos.resourcemanager.tasks.container.docker.parameters
(none) String Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of "key=value" pairs. The "value" may contain '='.
mesos.resourcemanager.tasks.container.image.name
(none) String Image name to use for the container.
mesos.resourcemanager.tasks.container.type
"mesos" String Type of the containerization used: “mesos” or “docker”.
mesos.resourcemanager.tasks.container.volumes
(none) String A comma separated list of [host_path:]container_path[:RO|RW]. This allows for mounting additional volumes into your container.
mesos.resourcemanager.tasks.cpus
0.0 Double CPUs to assign to the Mesos workers.
mesos.resourcemanager.tasks.disk
0 Integer Disk space to assign to the Mesos workers in MB.
mesos.resourcemanager.tasks.gpus
0 Integer GPUs to assign to the Mesos workers.
mesos.resourcemanager.tasks.hostname
(none) String Optional value to define the TaskManager’s hostname. The pattern _TASK_ is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. _TASK_.flink-service.mesos) for name lookups.
mesos.resourcemanager.tasks.mem
1024 Integer Memory to assign to the Mesos workers in MB.
mesos.resourcemanager.tasks.taskmanager-cmd
"$FLINK_HOME/bin/mesos-taskmanager.sh" String
mesos.resourcemanager.tasks.uris
(none) String A comma separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers.
taskmanager.numberOfTaskSlots
1 Integer The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
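
The TaskManager-related Mesos options could be combined as in the following sketch, which runs workers in Docker containers and pins them to agents with particular attributes. The image name and resource sizes are assumptions for illustration; the constraint string reuses the example from the table.

```yaml
# Run TaskManagers with the docker containerizer (hypothetical image name).
mesos.resourcemanager.tasks.container.type: docker
mesos.resourcemanager.tasks.container.image.name: example/flink:latest
mesos.resourcemanager.tasks.container.docker.force-pull-image: false
# Resources per Mesos worker (illustrative sizes; memory in MB).
mesos.resourcemanager.tasks.cpus: 2.0
mesos.resourcemanager.tasks.mem: 4096
# Only place workers on agents exposing these attributes.
mesos.constraints.hard.hostattribute: az:eu-west-1a,series:t2
# One slot per assigned CPU in this example.
taskmanager.numberOfTaskSlots: 2
```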

High Availability (HA)

Key Default Type Description
high-availability
"NONE" String Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
high-availability.cluster-id
"/default" String The ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.
high-availability.jobmanager.port
"0" String Optional port (range) used by the job manager in high-availability mode.
high-availability.storageDir
(none) String File system path (URI) where Flink persists metadata in high-availability setups.

ZooKeeper-based HA Mode

Key Default Type Description
high-availability.zookeeper.client.acl
"open" String Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).
high-availability.zookeeper.client.connection-timeout
15000 Integer Defines the connection timeout for ZooKeeper in ms.
high-availability.zookeeper.client.max-retry-attempts
3 Integer Defines the number of connection retries before the client gives up.
high-availability.zookeeper.client.retry-wait
5000 Integer Defines the pause between consecutive retries in ms.
high-availability.zookeeper.client.session-timeout
60000 Integer Defines the session timeout for the ZooKeeper session in ms.
high-availability.zookeeper.path.checkpoint-counter
"/checkpoint-counter" String ZooKeeper root path (ZNode) for checkpoint counters.
high-availability.zookeeper.path.checkpoints
"/checkpoints" String ZooKeeper root path (ZNode) for completed checkpoints.
high-availability.zookeeper.path.jobgraphs
"/jobgraphs" String ZooKeeper root path (ZNode) for job graphs.
high-availability.zookeeper.path.latch
"/leaderlatch" String Defines the znode of the leader latch which is used to elect the leader.
high-availability.zookeeper.path.leader
"/leader" String Defines the znode of the leader which contains the URL to the leader and the current leader session ID.
high-availability.zookeeper.path.mesos-workers
"/mesos-workers" String The ZooKeeper root path for persisting the Mesos worker information.
high-availability.zookeeper.path.root
"/flink" String The root path under which Flink stores its entries in ZooKeeper.
high-availability.zookeeper.path.running-registry
"/running_job_registry/" String
high-availability.zookeeper.quorum
(none) String The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
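
Putting the general HA options and the ZooKeeper-specific options together, a standalone cluster might be configured roughly as follows. The quorum hosts, storage path, and cluster ID are hypothetical; the remaining values are the documented defaults written out explicitly.

```yaml
# Enable ZooKeeper-based high availability.
high-availability: ZOOKEEPER
# Durable storage for HA metadata (hypothetical HDFS path).
high-availability.storageDir: hdfs:///flink/ha/
# ZooKeeper quorum (hypothetical hosts).
high-availability.zookeeper.quorum: zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
# Root node and per-cluster ID, so several clusters can share one quorum.
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one
# Client timeouts in milliseconds (defaults shown).
high-availability.zookeeper.client.session-timeout: 60000
high-availability.zookeeper.client.connection-timeout: 15000
```

Giving each standalone cluster its own cluster ID keeps their ZooKeeper entries separate under the shared root path.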

ZooKeeper Security

Key Default Type Description
zookeeper.sasl.disable
false Boolean
zookeeper.sasl.login-context-name
"Client" String
zookeeper.sasl.service-name
"zookeeper" String

Kerberos-based Security

Key Default Type Description
security.kerberos.login.contexts
(none) String A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
security.kerberos.login.keytab
(none) String Absolute path to a Kerberos keytab file that contains the user credentials.
security.kerberos.login.principal
(none) String Kerberos principal name associated with the keytab.
security.kerberos.login.use-ticket-cache
true Boolean Indicates whether to read from your Kerberos ticket cache.
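
A keytab-based Kerberos setup might look like the sketch below. The keytab path and principal are hypothetical placeholders, and the login contexts reuse the `Client,KafkaClient` example from the table.

```yaml
# Use a keytab instead of the ticket cache (hypothetical path and principal).
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user
# Provide the credentials to ZooKeeper and Kafka clients.
security.kerberos.login.contexts: Client,KafkaClient
```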

Environment

Key Default Type Description
env.hadoop.conf.dir
(none) String Path to hadoop configuration directory. It is required to read HDFS and/or YARN configuration. You can also set it via environment variable.
env.java.opts
(none) String Java options to start the JVM of all Flink processes with.
env.java.opts.historyserver
(none) String Java options to start the JVM of the HistoryServer with.
env.java.opts.jobmanager
(none) String Java options to start the JVM of the JobManager with.
env.java.opts.taskmanager
(none) String Java options to start the JVM of the TaskManager with.
env.log.dir
(none) String Defines the directory where the Flink logs are saved. It has to be an absolute path. (Defaults to the log directory under Flink’s home)
env.log.max
5 Integer The maximum number of old log files to keep.
env.ssh.opts
(none) String Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).
env.yarn.conf.dir
(none) String Path to yarn configuration directory. It is required to run flink on YARN. You can also set it via environment variable.
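
The environment options are plain key value pairs like the rest of conf/flink-conf.yaml. The fragment below is a sketch with hypothetical directories and an illustrative JVM flag; which JVM options are appropriate depends on the Java version in use.

```yaml
# JVM options for all Flink processes (illustrative flag).
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError
# Absolute log directory and number of rotated log files to keep (hypothetical path).
env.log.dir: /var/log/flink
env.log.max: 10
# Hadoop and YARN configuration directories (hypothetical paths).
env.hadoop.conf.dir: /etc/hadoop/conf
env.yarn.conf.dir: /etc/hadoop/conf
```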

Pipeline

Key Default Type Description
pipeline.classpaths
(none) List<String> A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.
pipeline.jars
(none) List<String> A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.

Checkpointing

Key Default Type Description
state.backend
(none) String The state backend to be used to store and checkpoint state.
state.backend.async
true Boolean Option whether the state backend should use an asynchronous snapshot method where possible and configurable. Some state backends may not support asynchronous snapshots, or only support asynchronous snapshots, and ignore this option.
state.backend.fs.memory-threshold
1024 Integer The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.
state.backend.fs.write-buffer-size
4096 Integer The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option 'state.backend.fs.memory-threshold'.
state.backend.incremental
false Boolean Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Some state backends may not support incremental checkpoints and ignore this option.
state.backend.local-recovery
false Boolean This option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignores this option.
state.checkpoints.dir
(none) String The default directory used for storing the data files and metadata of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers).
state.checkpoints.num-retained
1 Integer The maximum number of completed checkpoints to retain.
state.savepoints.dir
(none) String The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).
taskmanager.state.local.root-dirs
(none) String The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignores this option.
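
The checkpointing options above could be combined as in this sketch. The directories are hypothetical, and the value `rocksdb` for state.backend is assumed here as the shortcut name for RocksDBStateBackend; the table itself does not list the accepted values.

```yaml
# State backend (assumed shortcut name) with incremental checkpoints.
state.backend: rocksdb
state.backend.incremental: true
# Checkpoint and savepoint locations, reachable from all TaskManagers and JobManagers
# (hypothetical paths).
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
# Keep the three most recent completed checkpoints.
state.checkpoints.num-retained: 3
# Local recovery for keyed state, with a local root directory (hypothetical path).
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /data/flink/local-state
```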

RocksDB State Backend

Key Default Type Description
state.backend.rocksdb.checkpoint.transfer.thread.num
1 Integer The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.
state.backend.rocksdb.localdir
(none) String The local directory (on the TaskManager) where RocksDB puts its files.
state.backend.rocksdb.options-factory
"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory" String The options factory class for RocksDB to create DBOptions and ColumnFamilyOptions. The default options factory is org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and it reads the configured options which are provided in 'RocksDBConfigurableOptions'.
state.backend.rocksdb.predefined-options
"DEFAULT" String The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by the Flink community. Currently supported predefined options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the OptionsFactory are applied on top of these predefined ones.
state.backend.rocksdb.timer-service.factory
"HEAP" String This determines the factory for timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.
state.backend.rocksdb.ttl.compaction.filter.enabled
false Boolean This determines if the compaction filter to clean up state with TTL is enabled for the backend. Note: The user can still decide in the state TTL configuration in the state descriptor whether the filter is active for a particular state or not.
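
For a TaskManager with local SSD storage, the RocksDB backend options might be set as in the following sketch. The local directory and thread count are assumptions for illustration; the predefined option and timer factory names are taken from the table above.

```yaml
# Local working directory for RocksDB files on the TaskManager (hypothetical path).
state.backend.rocksdb.localdir: /data/flink/rocksdb
# Use the SSD-oriented predefined option set.
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
# Store timers in RocksDB instead of on the heap.
state.backend.rocksdb.timer-service.factory: ROCKSDB
# Transfer checkpoint files with several threads per stateful operator (illustrative).
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
# Enable the compaction filter for state with TTL.
state.backend.rocksdb.ttl.compaction.filter.enabled: true
```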

RocksDB Configurable Options

Specific RocksDB configurable options, provided by Flink, used to create a corresponding ConfigurableOptionsFactory. The created factory is used as the default OptionsFactory in RocksDBStateBackend unless the user defines an OptionsFactory and sets it via RocksDBStateBackend.setOptions(optionsFactory).

Key Default Type Description
state.backend.rocksdb.block.blocksize
(none) String The approximate size (in bytes) of user data packed per block. RocksDB has default blocksize as '4KB'.
state.backend.rocksdb.block.cache-size
(none) String The amount of the cache for data blocks in RocksDB. RocksDB has default block-cache size as '8MB'.
state.backend.rocksdb.compaction.level.max-size-level-base
(none) String The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '10MB'.
state.backend.rocksdb.compaction.level.target-file-size-base
(none) String The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '2MB'.
state.backend.rocksdb.compaction.level.use-dynamic-size
(none) String If true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. RocksDB has default configuration as 'false'. For more information, please refer to RocksDB's doc.
state.backend.rocksdb.compaction.style
(none) String The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO or UNIVERSAL, and RocksDB chooses 'LEVEL' as default style.
state.backend.rocksdb.files.open
(none) String The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '5000'.
state.backend.rocksdb.thread.num
(none) String The maximum number of concurrent background flush and compaction jobs (per TaskManager). RocksDB has default configuration as '1'.
state.backend.rocksdb.writebuffer.count
(none) String Tne maximum number of write buffers that are built up in memory. RocksDB has default configuration as '2'.
state.backend.rocksdb.writebuffer.number-to-merge
(none) String The minimum number of write buffers that will be merged together before writing to storage. RocksDB has default configuration as '1'.
state.backend.rocksdb.writebuffer.size
(none) String The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. RocksDB has default writebuffer size as '4MB'.
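
As a rough illustration of how these keys might appear in conf/flink-conf.yaml, the fragment below sets a handful of them. The values are placeholders chosen for the example, not tuning recommendations, and the lowercase size notation (for instance 64mb) is an assumption about the accepted format.

```yaml
# conf/flink-conf.yaml (illustrative values only, not tuning advice)
state.backend.rocksdb.compaction.style: LEVEL
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.writebuffer.count: 4
# More background threads for flushes and compactions (per TaskManager).
state.backend.rocksdb.thread.num: 4
```

Keys that are left unset keep the RocksDB defaults quoted in the table.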

Queryable State

Key Default Type Description
queryable-state.client.network-threads
0 Integer Number of network (Netty's event loop) threads for the queryable state client.
queryable-state.enable
false Boolean Whether the queryable state proxy and server should be enabled where possible and configurable.
queryable-state.proxy.network-threads
0 Integer Number of network (Netty's event loop) threads for the queryable state proxy.
queryable-state.proxy.ports
"9069" String The port range of the queryable state proxy. The specified range can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234".
queryable-state.proxy.query-threads
0 Integer Number of query threads for the queryable state proxy. Uses the number of slots if set to 0.
queryable-state.server.network-threads
0 Integer Number of network (Netty's event loop) threads for the queryable state server.
queryable-state.server.ports
"9067" String The port range of the queryable state server. The specified range can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234".
queryable-state.server.query-threads
0 Integer Number of query threads for the queryable state server. Uses the number of slots if set to 0.
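
A minimal sketch of enabling queryable state with explicit port ranges might look as follows. The port numbers are taken from the range examples in the table and are otherwise arbitrary.

```yaml
# conf/flink-conf.yaml (example ports, adjust to your environment)
queryable-state.enable: true
# Port ranges let several TaskManagers on one machine each find a free port.
queryable-state.proxy.ports: 50100-50200
queryable-state.server.ports: 50300-50400
# 0 falls back to the number of slots, as described in the table.
queryable-state.proxy.query-threads: 0
```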

Metrics

Key Default Type Description
metrics.fetcher.update-interval
10000 Long Update interval for the metric fetcher used by the web UI in milliseconds. Decrease this value for faster updating metrics. Increase this value if the metric fetcher causes too much load. Setting this value to 0 disables the metric fetching completely.
metrics.internal.query-service.port
"0" String The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”), or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. By default, Flink picks a random port.
metrics.internal.query-service.thread-priority
1 Integer The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning: increasing this value may bring the main Flink components down.
metrics.latency.granularity
"operator" String Defines the granularity of latency metrics. Accepted values are:
  • single - Track latency without differentiating between sources and subtasks.
  • operator - Track latency while differentiating between sources, but not subtasks.
  • subtask - Track latency while differentiating between sources and subtasks.
metrics.latency.history-size
128 Integer Defines the number of measured latencies to maintain at each operator.
metrics.latency.interval
0 Long Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.
metrics.reporter.<name>.<parameter>
(none) String Configures the parameter <parameter> for the reporter named <name>.
metrics.reporter.<name>.class
(none) String The reporter class to use for the reporter named <name>.
metrics.reporter.<name>.interval
(none) String The reporter interval to use for the reporter named <name>.
metrics.reporters
(none) String An optional list of reporter names. If configured, only reporters whose name matches any of the names in the list will be started. Otherwise, all reporters that could be found in the configuration will be started.
metrics.scope.delimiter
"." String Delimiter used to assemble the metric identifier.
metrics.scope.jm
"<host>.jobmanager" String Defines the scope format string that is applied to all metrics scoped to a JobManager.
metrics.scope.jm.job
"<host>.jobmanager.<job_name>" String Defines the scope format string that is applied to all metrics scoped to a job on a JobManager.
metrics.scope.operator
"<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>" String Defines the scope format string that is applied to all metrics scoped to an operator.
metrics.scope.task
"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>" String Defines the scope format string that is applied to all metrics scoped to a task.
metrics.scope.tm
"<host>.taskmanager.<tm_id>" String Defines the scope format string that is applied to all metrics scoped to a TaskManager.
metrics.scope.tm.job
"<host>.taskmanager.<tm_id>.<job_name>" String Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager.
metrics.system-resource
false Boolean Flag indicating whether Flink should report system resource metrics such as the machine's CPU, memory, or network usage.
metrics.system-resource-probing-interval
5000 Long Interval, in milliseconds, between probes of system resource metrics. Only takes effect when 'metrics.system-resource' is enabled.
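
To make the <name> and <parameter> placeholders more concrete, the sketch below configures a single reporter called my_log. The reporter name is hypothetical, and the class org.apache.flink.metrics.slf4j.Slf4jReporter and the "60 SECONDS" interval notation do not appear on this page; they are assumed here from Flink's bundled reporters.

```yaml
# conf/flink-conf.yaml (reporter name "my_log" is a made-up example)
metrics.reporters: my_log
metrics.reporter.my_log.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.my_log.interval: 60 SECONDS

# Shorter TaskManager scope than the default "<host>.taskmanager.<tm_id>".
metrics.scope.tm: <host>.tm.<tm_id>

# Emit latency markers every 30 s, tracked at the coarsest granularity.
metrics.latency.interval: 30000
metrics.latency.granularity: single
```

Because metrics.reporters is set, only my_log would be started even if other reporters were defined elsewhere in the file.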

RocksDB Native Metrics

Certain RocksDB native metrics may be forwarded to Flink’s metrics reporter. All native metrics are scoped to operators and then further broken down by column family; values are reported as unsigned longs.

Note: Enabling native metrics may degrade performance, so enable them selectively.

Key Default Type Description
state.backend.rocksdb.metrics.actual-delayed-write-rate
false Boolean Monitor the current actual delayed write rate. 0 means no delay.
state.backend.rocksdb.metrics.background-errors
false Boolean Monitor the number of background errors in RocksDB.
state.backend.rocksdb.metrics.column-family-as-variable
false Boolean Whether to expose the column family as a variable.
state.backend.rocksdb.metrics.compaction-pending
false Boolean Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise.
state.backend.rocksdb.metrics.cur-size-active-mem-table
false Boolean Monitor the approximate size of the active memtable in bytes.
state.backend.rocksdb.metrics.cur-size-all-mem-tables
false Boolean Monitor the approximate size of the active and unflushed immutable memtables in bytes.
state.backend.rocksdb.metrics.estimate-live-data-size
false Boolean Estimate of the amount of live data in bytes.
state.backend.rocksdb.metrics.estimate-num-keys
false Boolean Estimate the number of keys in RocksDB.
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes
false Boolean Estimated total number of bytes that compaction needs to rewrite to get all levels down to under their target size. Only valid for level-based compaction.
state.backend.rocksdb.metrics.estimate-table-readers-mem
false Boolean Estimate the memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks) in bytes.
state.backend.rocksdb.metrics.mem-table-flush-pending
false Boolean Monitor the number of pending memtable flushes in RocksDB.
state.backend.rocksdb.metrics.num-deletes-active-mem-table
false Boolean Monitor the total number of delete entries in the active memtable.
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables
false Boolean Monitor the total number of delete entries in the unflushed immutable memtables.
state.backend.rocksdb.metrics.num-entries-active-mem-table
false Boolean Monitor the total number of entries in the active memtable.
state.backend.rocksdb.metrics.num-entries-imm-mem-tables
false Boolean Monitor the total number of entries in the unflushed immutable memtables.
state.backend.rocksdb.metrics.num-immutable-mem-table
false Boolean Monitor the number of immutable memtables in RocksDB.
state.backend.rocksdb.metrics.num-live-versions
false Boolean Monitor number of live versions. Version is an internal data structure. See RocksDB file version_set.h for details. More live versions often mean more SST files are held from being deleted, by iterators or unfinished compactions.
state.backend.rocksdb.metrics.num-running-compactions
false Boolean Monitor the number of currently running compactions.
state.backend.rocksdb.metrics.num-running-flushes
false Boolean Monitor the number of currently running flushes.
state.backend.rocksdb.metrics.num-snapshots
false Boolean Monitor the number of unreleased snapshots of the database.
state.backend.rocksdb.metrics.size-all-mem-tables
false Boolean Monitor the approximate size of the active, unflushed immutable, and pinned immutable memtables in bytes.
state.backend.rocksdb.metrics.total-sst-files-size
false Boolean Monitor the total size (bytes) of all SST files. WARNING: may slow down online queries if there are too many files.
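
Since every native metric is off by default, a cautious setup would switch on only the few that are needed. The selection below is an arbitrary example, not a recommended set.

```yaml
# conf/flink-conf.yaml (enable only what you actually monitor)
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.num-running-compactions: true
```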

History Server

You have to configure jobmanager.archive.fs.dir in order to archive terminated jobs and add it to the list of monitored directories via historyserver.archive.fs.dir if you want to display them via the HistoryServer’s web frontend.

  • jobmanager.archive.fs.dir: Directory to upload information about terminated jobs to. You have to add this directory to the list of monitored directories of the history server via historyserver.archive.fs.dir.

Key Default Type Description
historyserver.archive.clean-expired-jobs
false Boolean Whether the HistoryServer should clean up jobs that are no longer present in `historyserver.archive.fs.dir`.
historyserver.archive.fs.dir
(none) String Comma separated list of directories to fetch archived jobs from. The history server will monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a directory via `jobmanager.archive.fs.dir`.
historyserver.archive.fs.refresh-interval
10000 Long Interval in milliseconds for refreshing the archived job directories.
historyserver.web.address
(none) String Address of the HistoryServer's web interface.
historyserver.web.port
8082 Integer Port of the HistoryServer's web interface.
historyserver.web.refresh-interval
10000 Long The refresh interval for the HistoryServer web-frontend in milliseconds.
historyserver.web.ssl.enabled
false Boolean Enable HTTPS access to the HistoryServer web frontend. This is applicable only when the global SSL flag security.ssl.enabled is set to true.
historyserver.web.tmpdir
(none) String This configuration parameter allows defining the Flink web directory to be used by the history server web interface. The web interface will copy its static files into the directory.
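
The interplay between the two archive directories can be sketched as below: the JobManager writes to one directory and the HistoryServer monitors the same one. The hdfs:///completed-jobs/ path is a hypothetical location chosen for the example.

```yaml
# conf/flink-conf.yaml (the archive path is a placeholder)
# JobManager side: where terminated jobs are archived.
jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# HistoryServer side: directories to monitor (comma separated).
historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
historyserver.web.port: 8082
```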

Python

Key Default Type Description
python.fn-execution.bundle.size
1000 Integer The maximum number of elements to include in a bundle for Python user-defined function execution. The elements are processed asynchronously. One bundle of elements is processed before the next bundle is processed. A larger value can improve the throughput, but at the cost of more memory usage and higher latency.
python.fn-execution.bundle.time
1000 Long Sets the waiting timeout (in milliseconds) before processing a bundle for Python user-defined function execution. The timeout defines how long the elements of a bundle will be buffered before being processed. Lower timeouts lead to lower tail latencies, but may affect throughput.
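
As an illustration of the trade-off described in the table, a latency-sensitive job might use smaller bundles and a shorter timeout than the defaults. The numbers below are arbitrary examples.

```yaml
# conf/flink-conf.yaml (example values favoring latency over throughput)
python.fn-execution.bundle.size: 500
python.fn-execution.bundle.time: 200
```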

Legacy

  • mode: Execution mode of Flink. Possible values are legacy and new. In order to start the legacy components, you have to specify legacy (DEFAULT: new).

Background

Configuring the Network Buffers

If you ever see the Exception java.io.IOException: Insufficient number of network buffers, you need to adapt the amount of memory used for network buffers in order for your program to run on your task managers.

Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the application. A sufficient number of network buffers is critical to achieve a good throughput.

Since Flink 1.3, you may follow the idiom "more is better" without any penalty on the latency (we prevent excessive buffering in each outgoing and incoming channel, i.e. buffer bloat, by limiting the actual number of buffers used by each channel).

In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks.

Note: Since Flink 1.5, network buffers will always be allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of taskmanager.memory.off-heap. This way, we can pass these buffers directly to the underlying network stack layers.

Setting Memory Fractions

Previously, the number of network buffers had to be set manually, which was quite error-prone (see below). Since Flink 1.3, it is possible to define a fraction of memory to be used for network buffers with the following configuration parameters:

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
  • taskmanager.network.memory.min: Minimum memory size for network buffers (DEFAULT: 64MB),
  • taskmanager.network.memory.max: Maximum memory size for network buffers (DEFAULT: 1GB), and
  • taskmanager.memory.segment-size: Size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32KB).
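
The fragment below spells out the defaults listed above together with a worked calculation in the comments. The 4096 MB figure is a hypothetical amount of memory subject to the fraction, and the arithmetic assumes that the computed size is simply bounded by the min and max values.

```yaml
# conf/flink-conf.yaml (defaults written out explicitly)
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.memory.segment-size: 32kb

# Worked example, assuming 4096 MB is subject to the fraction:
#   0.1 * 4096 MB = 409.6 MB, which lies between min (64 MB) and max (1 GB)
#   409.6 MB / 32 KB per buffer = 13107 buffers (rounded down)
```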

Setting the Number of Network Buffers Directly

Note: This way of configuring the amount of memory used for network buffers is deprecated. Please consider using the method above by defining a fraction of memory to use.

The required number of buffers on a task manager is total-degree-of-parallelism (number of targets) * intra-node-parallelism (number of sources in one task manager) * n with n being a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time. Since the intra-node-parallelism is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to

#slots-per-TM^2 * #TMs * 4

where #slots-per-TM is the number of slots per TaskManager and #TMs is the total number of TaskManagers.

To support, for example, a cluster of 20 8-slot machines, you should use roughly 5000 network buffers for optimal throughput.

Each network buffer has a default size of 32 KiBytes. In the example above, the system would thus allocate roughly 160 MiBytes for network buffers.

The number and size of network buffers can be configured with the following parameters:

  • taskmanager.network.numberOfBuffers, and
  • taskmanager.memory.segment-size.
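
For the 20-machine, 8-slot example, the rule of thumb above works out as shown in the comments below. This is only a sketch of the deprecated approach, and the lowercase 32kb notation is an assumption about the accepted size format.

```yaml
# conf/flink-conf.yaml (deprecated way of sizing network buffers)
# 20 TaskManagers with 8 slots each:
#   #slots-per-TM^2 * #TMs * 4 = 8^2 * 20 * 4 = 5120 buffers
#   5120 buffers * 32 KiB = 163840 KiB = 160 MiB
taskmanager.network.numberOfBuffers: 5120
taskmanager.memory.segment-size: 32kb
```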

Configuring Temporary I/O Directories

Although Flink aims to process as much data in main memory as possible, it is not uncommon for a job to process more data than fits in the available memory. Flink’s runtime is designed to write temporary data to disk to handle these situations.

The io.tmp.dirs parameter specifies a list of directories into which Flink writes temporary files. The paths of the directories need to be separated by ‘:’ (colon character). Flink will concurrently write (or read) one temporary file to (from) each configured directory. This way, temporary I/O can be evenly distributed over multiple independent I/O devices such as hard disks to improve performance. To leverage fast I/O devices (e.g., SSD, RAID, NAS), it is possible to specify a directory multiple times.

If the io.tmp.dirs parameter is not explicitly specified, Flink writes temporary data to the temporary directory of the operating system, such as /tmp in Linux systems.
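
A configuration that spreads temporary files over two disks might look like the following. The mount points are hypothetical.

```yaml
# conf/flink-conf.yaml (directory paths are placeholders)
# One temporary file is written to each listed directory concurrently.
io.tmp.dirs: /mnt/disk1/flink-tmp:/mnt/disk2/flink-tmp

# Listing a fast device twice sends it a larger share of the I/O, e.g.:
#   io.tmp.dirs: /mnt/ssd/flink-tmp:/mnt/ssd/flink-tmp:/mnt/disk1/flink-tmp
```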

Configuring TaskManager Processing Slots

Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.

Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good default for taskmanager.numberOfTaskSlots.

When starting a Flink application, users can supply the default number of slots to use for that job. The corresponding command-line option is -p (for parallelism). In addition, the number of slots can be set in the programming APIs for the whole application and for individual operators.
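
As a small example of the recommendation above, a TaskManager on a machine assumed to have 8 CPU cores could be configured as follows. The parallelism.default key comes from the Common Options table on this page; the value is illustrative and can still be overridden per job with -p.

```yaml
# conf/flink-conf.yaml (assuming a machine with 8 CPU cores)
taskmanager.numberOfTaskSlots: 8
# Used when a job does not specify its own parallelism.
parallelism.default: 8
```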

Configuring Runtime Environment Variables

To set custom environment variables in the ApplicationMaster and the TaskManagers, use configuration keys with the prefixes containerized.master.env. and containerized.taskmanager.env.

  • containerized.master.env.: Prefix for passing custom environment variables to Flink’s master process. For example, to pass LD_LIBRARY_PATH as an environment variable to the AppMaster, set containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in flink-conf.yaml.
  • containerized.taskmanager.env.: Similar to the above, this configuration prefix allows setting custom environment variables for the workers (TaskManagers).
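
Putting both prefixes together, the LD_LIBRARY_PATH example above could be applied to the master and the workers like this. The variable name follows the prefix directly; the path is the one used in the example above.

```yaml
# conf/flink-conf.yaml
# Environment variable for Flink's master process (AppMaster).
containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
# The same variable for the workers (TaskManagers).
containerized.taskmanager.env.LD_LIBRARY_PATH: "/usr/lib/native"
```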
