In addition to CPU and memory, many workloads also need some other resources, e.g. GPUs for deep learning. To support external resources, Flink provides an external resource framework. The framework supports requesting various types of resources from the underlying resource management systems (e.g., Kubernetes), and supplies information needed for using these resources to the operators. Different resource types can be supported. You can either leverage built-in plugins provided by Flink (currently only for GPU support), or implement your own plugins for custom resource types.
In general, the external resource framework does two things:
Set the corresponding fields of the resource requests (for requesting resources from the underlying system) with respect to your configuration.
Provide operators with the information needed for using the resources.
When deployed on resource management systems (Kubernetes / Yarn), the external resource framework will ensure that the allocated pod/container will contain the desired external resources. Currently, many resource management systems support external resources. For example, Kubernetes supports GPU, FPGA, etc. through its Device Plugin mechanism since v1.10, and Yarn supports GPU and FPGA resources since 2.10 and 3.1. External resources are not supported by Flink’s Mesos integration at the moment. In Standalone mode, the user has to ensure that the external resources are available.
The external resource framework will provide the corresponding information to operators. The external resource information, which contains the basic properties needed for using the resources, is generated by the configured external resource drivers.
To enable an external resource with the external resource framework, you need to:
Prepare the external resource plugin.
Set configurations for the external resource.
Get the external resource information from
RuntimeContext and use it in your operators.
You need to prepare the external resource plugin and put it into the
plugins/ folder of your Flink distribution, see
Flink Plugins. Apache Flink provides a first-party plugin for GPU resources. You can also
implement a plugin for your custom resource type.
First, you need to add resource names for all the external resource types to the external resource list (with the configuration key ‘external-resources’) with delimiter “;”, e.g. “external-resources: gpu;fpga” for two external resources “gpu” and “fpga”. Only the <resource_name> defined here will go into effect in the external resource framework.
For each external resource, you could configure the below options. The <resource_name> in all the below configuration options corresponds to the name listed in the external resource list:
external.<resource_name>.amount): This is the quantity of the external resource that should be requested from the external system.
Config key in Yarn (
external-resource.<resource_name>.yarn.config-key): optional. If configured, the external
resource framework will add this key to the resource profile of container requests for Yarn. The value will be set to the
Config key in Kubernetes (
external-resource.<resource_name>.kubernetes.config-key): optional. If configured,
external resource framework will add
resources.requests.<config-key> to the main
container spec of TaskExecutor and set the value to the value of
Driver Factory (
external-resource.<resource_name>.driver-factory.class): optional. Defines the factory class
name for the external resource identified by <resource_name>. If configured, the factory will be used to instantiate
drivers in the external resource framework.
Driver Parameters (
external-resource.<resource_name>.param.<param>): optional. The naming pattern of custom
config options for the external resource specified by <resource_name>. Only the configurations that follow this pattern
will be passed into the driver factory of that external resource.
An example configuration that specifies two external resources:
To use the resources, operators need to get the
ExternalResourceInfo set from the
wraps the information needed for using the resource, which can be retrieved with
getProperty. What properties are available
and how to access the resource with the properties depends on the specific plugin.
Operators can get the
ExternalResourceInfo set of a specific external resource from
getExternalResourceInfos(String resourceName). The
resourceName here should have the same value as the name configured in the
external resource list. It can be used as follows:
ExternalResourceInfo contains one or more properties with keys representing the different dimensions of the resource.
You could get all valid keys by
To implement a plugin for your custom resource type, you need to:
Add your own external resource driver by implementing the
Add a driver factory, which instantiates the driver, by implementing the
Add a service entry. Create a file
which contains the class name of your driver factory class (see the Java Service Loader docs for more details).
For example, to implement a plugin for external resource named “FPGA”, you need to implement
Create a file with name
and write the factory class name (e.g.
your.domain.FPGADriverFactory) to it.
Then, create a jar which includes
META-INF/services/ and all the external dependencies.
Make a directory in
plugins/ of your Flink distribution with an arbitrary name, e.g. “fpga”, and put the jar into this directory.
See Flink Plugin for more details.
Currently, Flink supports GPUs as external resources.
We provide a first-party plugin for GPU resources. The plugin leverages a discovery script to discover indexes of GPU devices, which can be accessed from the resource information via the property “index”. We provide a default discovery script that can be used to discover NVIDIA GPUs. You can also provide your custom script.
We provide an example which shows how to use the GPUs to do matrix-vector multiplication in Flink.
To make GPU resources accessible, certain prerequisites are needed depending on your environment:
For standalone mode, administrators should ensure the NVIDIA driver is installed and GPU resources are accessible on all the nodes in the cluster.
For Yarn deployment, administrators should configure the Yarn cluster to enable GPU scheduling. Notice the required Hadoop version is 2.10+ or 3.1+.
For Kubernetes deployment, administrators should make sure the NVIDIA GPU device plugin is installed. Notice the required version is 1.10+. At the moment, Kubernetes only supports NVIDIA GPU and AMD GPU. Flink only provides discovery script for NVIDIA GPUs, but you can provide a custom discovery script for AMD GPUs yourself, see Discovery script.
As mentioned in Enable external resources for your workload, you also need to do two things to enable GPU resources:
Configure the GPU resource.
Get the information of GPU resources, which contains the GPU index as property with key “index”, in operators.
For the GPU plugin, you need to specify the common external resource configurations:
external-resources: You need to append your resource name (e.g. gpu) for GPU resources to it.
external-resource.<resource_name>.amount: The amount of GPU devices per TaskManager.
external-resource.<resource_name>.yarn.config-key: For Yarn, the config key of GPU is
yarn.io/gpu. Notice that
Yarn only supports NVIDIA GPU at the moment.
external-resource.<resource_name>.kubernetes.config-key: For Kubernetes, the config key of GPU is
Currently, “nvidia” and “amd” are the two supported vendors. Notice that if you use AMD GPUs, you need to provide a discovery
script yourself, see Discovery script.
In addition, there are some specific configurations for the GPU plugin:
external-resource.<resource_name>.param.discovery-script.path: The path of the discovery script. It
can either be an absolute path, or a relative path to
FLINK_HOME when defined or current directory otherwise. If not
explicitly configured, the default script will be used.
external-resource.<resource_name>.param.discovery-script.args: The arguments passed to the discovery script. For the default
discovery script, see Default Script for the available parameters.
An example configuration for GPU resource:
GPUDriver leverages a discovery script to discover GPU resources and generate the GPU resource information.
We provide a default discovery script for NVIDIA GPU, located at
plugins/external-resource-gpu/nvidia-gpu-discovery.sh of your
Flink distribution. The script gets the indexes of visible GPU resources through the
nvidia-smi command. It tries to return
the required amount (specified by
external-resource.<resource_name>.amount) of GPU indexes in a list, and exit with non-zero if the amount cannot be satisfied.
For standalone mode, multiple TaskManagers might be co-located on the same machine, and each GPU device is visible to all the TaskManagers. The default discovery script supports a coordination mode, in which it leverages a coordination file to synchronize the allocation state of GPU devices and ensure each GPU device can only be used by one TaskManager process. The relevant arguments are:
--enable-coordination-mode: Enable the coordination mode.
--coordination-file filePath: The path of the coordination file used to synchronize the allocation state of GPU resources. The default path is
You can also provide a discovery script to address your custom requirements, e.g. discovering AMD GPU. Please make sure
the path of your custom script is accessible to Flink and configured (
The contract of the discovery script:
GPUDriver passes the amount (specified by
external-resource.<resource_name>.amount) as the first argument into the script.
The user-defined arguments in
external-resource.<resource_name>.param.discovery-script.args would be appended after it.
The script should return a list of the available GPU indexes, split by a comma. Whitespace only indexes will be ignored.
The script can also suggest that the discovery is not properly performed, by exiting with non-zero. In that case, no gpu information will be provided to operators.