External Resource Framework

In addition to CPU and memory, many workloads also need some other resources, e.g. GPUs for deep learning. To support external resources, Flink provides an external resource framework. The framework supports requesting various types of resources from the underlying resource management systems (e.g., Kubernetes), and supplies information needed for using these resources to the operators. Different resource types can be supported. You can either leverage built-in plugins provided by Flink (currently only for GPU support), or implement your own plugins for custom resource types.

What the external resource framework does

In general, the external resource framework does two things:

  • Set the corresponding fields of the resource requests (for requesting resources from the underlying system) with respect to your configuration.

  • Provide operators with the information needed for using the resources.

When deployed on resource management systems (Kubernetes / Yarn), the external resource framework will ensure that the allocated pod/container will contain the desired external resources. Currently, many resource management systems support external resources. For example, Kubernetes supports GPU, FPGA, etc. through its Device Plugin mechanism since v1.10, and Yarn supports GPU and FPGA resources since 2.10 and 3.1. External resources are not supported by Flink’s Mesos integration at the moment. In Standalone mode, the user has to ensure that the external resources are available.

The external resource framework will provide the corresponding information to operators. The external resource information, which contains the basic properties needed for using the resources, is generated by the configured external resource drivers.

Enable the external resource framework for your workload

To enable an external resource with the external resource framework, you need to:

  • Prepare the external resource plugin.

  • Set configurations for the external resource.

  • Get the external resource information from RuntimeContext and use it in your operators.

Prepare plugins

You need to prepare the external resource plugin and put it into the plugins/ folder of your Flink distribution, see Flink Plugins. Apache Flink provides a first-party plugin for GPU resources. You can also implement a plugin for your custom resource type.

Configurations

First, you need to add resource names for all the external resource types to the external resource list (with the configuration key ‘external-resources’) with delimiter “;”, e.g. “external-resources: gpu;fpga” for two external resources “gpu” and “fpga”. Only the <resource_name> defined here will go into effect in the external resource framework.

For each external resource, you could configure the below options. The <resource_name> in all the below configuration options corresponds to the name listed in the external resource list:

  • Amount (external.<resource_name>.amount): This is the quantity of the external resource that should be requested from the external system.

  • Config key in Yarn (external-resource.<resource_name>.yarn.config-key): optional. If configured, the external resource framework will add this key to the resource profile of container requests for Yarn. The value will be set to the value of external-resource.<resource_name>.amount.

  • Config key in Kubernetes (external-resource.<resource_name>.kubernetes.config-key): optional. If configured, external resource framework will add resources.limits.<config-key> and resources.requests.<config-key> to the main container spec of TaskManager and set the value to the value of external-resource.<resource_name>.amount.

  • Driver Factory (external-resource.<resource_name>.driver-factory.class): optional. Defines the factory class name for the external resource identified by <resource_name>. If configured, the factory will be used to instantiate drivers in the external resource framework. If not configured, the requested resource will still exist in the TaskManager as long as the relevant options are configured. However, the operator will not get any information of the resource from RuntimeContext in that case.

  • Driver Parameters (external-resource.<resource_name>.param.<param>): optional. The naming pattern of custom config options for the external resource specified by <resource_name>. Only the configurations that follow this pattern will be passed into the driver factory of that external resource.

An example configuration that specifies two external resources:

external-resources: gpu;fpga # Define two external resources, "gpu" and "fpga".

external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory # Define the driver factory class of gpu resource.
external-resource.gpu.amount: 2 # Define the amount of gpu resource per TaskManager.
external-resource.gpu.param.discovery-script.args: --enable-coordination # Define the custom param discovery-script.args which will be passed into the gpu driver.

external-resource.fpga.driver-factory.class: org.apache.flink.externalresource.fpga.FPGADriverFactory # Define the driver factory class of fpga resource.
external-resource.fpga.amount: 1 # Define the amount of fpga resource per TaskManager.
external-resource.fpga.yarn.config-key: yarn.io/fpga # Define the corresponding config key of fpga in Yarn.

Use the resources

To use the resources, operators need to get the ExternalResourceInfo set from the RuntimeContext. ExternalResourceInfo wraps the information needed for using the resource, which can be retrieved with getProperty. What properties are available and how to access the resource with the properties depends on the specific plugin.

Operators can get the ExternalResourceInfo set of a specific external resource from RuntimeContext or FunctionContext by getExternalResourceInfos(String resourceName). The resourceName here should have the same value as the name configured in the external resource list. It can be used as follows:

public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
    private static final String RESOURCE_NAME = "foo";

    @Override
    public String map(String value) {
        Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
        List<String> addresses = new ArrayList<>();
        externalResourceInfos.iterator().forEachRemaining(externalResourceInfo ->
            addresses.add(externalResourceInfo.getProperty("address").get()));
        // map function with addresses.
        // ...
    }
}
class ExternalResourceMapFunction extends RichMapFunction[(String, String)] {
    var RESOURCE_NAME = "foo"

    override def map(value: String): String = {
        val externalResourceInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME)
        val addresses = new util.ArrayList[String]
        externalResourceInfos.asScala.foreach(
        externalResourceInfo => addresses.add(externalResourceInfo.getProperty("address").get()))

        // map function with addresses.
        // ...
    }
}

Each ExternalResourceInfo contains one or more properties with keys representing the different dimensions of the resource. You could get all valid keys by ExternalResourceInfo#getKeys.

Note: Currently, the information returned by RuntimeContext#getExternalResourceInfos is available to all the operators.

Implement a plugin for your custom resource type

To implement a plugin for your custom resource type, you need to:

  • Add your own external resource driver by implementing the org.apache.flink.api.common.externalresource.ExternalResourceDriver interface.

  • Add a driver factory, which instantiates the driver, by implementing the org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory.

  • Add a service entry. Create a file META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory which contains the class name of your driver factory class (see the Java Service Loader docs for more details).

For example, to implement a plugin for external resource named “FPGA”, you need to implement FPGADriver and FPGADriverFactory first:

public class FPGADriver implements ExternalResourceDriver {
	@Override
	public Set<FPGAInfo> retrieveResourceInfo(long amount) {
		// return the information set of "FPGA"
	}
}

public class FPGADriverFactory implements ExternalResourceDriverFactory {
	@Override
	public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
		return new FPGADriver();
	}
}

// Also implement FPGAInfo which contains basic properties of "FPGA" resource.
public class FPGAInfo implements ExternalResourceInfo {
	@Override
	public Optional<String> getProperty(String key) {
		// return the property with the given key.
	}

	@Override
	public Collection<String> getKeys() {
		// return all property keys.
	}
}
class FPGADriver extends ExternalResourceDriver {
  override def retrieveResourceInfo(amount: Long): Set[FPGAInfo] = {
    // return the information set of "FPGA"
  }
}

class FPGADriverFactory extends ExternalResourceDriverFactory {
  override def createExternalResourceDriver(config: Configuration): ExternalResourceDriver = {
    new FPGADriver()
  }
}

// Also implement FPGAInfo which contains basic properties of "FPGA" resource.
class FPGAInfo extends ExternalResourceInfo {
  override def getProperty(key: String): Option[String] = {
    // return the property with the given key.
  }

  override def getKeys(): util.Collection[String] = {
    // return all property keys.
  }
}

Create a file with name org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory in META-INF/services/ and write the factory class name (e.g. your.domain.FPGADriverFactory) to it.

Then, create a jar which includes FPGADriver, FPGADriverFactory, META-INF/services/ and all the external dependencies. Make a directory in plugins/ of your Flink distribution with an arbitrary name, e.g. “fpga”, and put the jar into this directory. See Flink Plugin for more details.

Note: External resources are shared by all operators running on the same machine. The community might add external resource isolation in a future release.

Existing supported external resource plugins

Currently, Flink supports GPUs as external resources.

Plugin for GPU resources

We provide a first-party plugin for GPU resources. The plugin leverages a discovery script to discover indexes of GPU devices, which can be accessed from the resource information via the property “index”. We provide a default discovery script that can be used to discover NVIDIA GPUs. You can also provide your custom script.

We provide an example which shows how to use the GPUs to do matrix-vector multiplication in Flink.

Note: Currently, for all the operators, RuntimeContext#getExternalResourceInfos returns the same set of resource information. That means, the same set of GPU devices are always accessible to all the operators running in the same TaskManager. There is no operator level isolation at the moment.

Pre-requisites

To make GPU resources accessible, certain prerequisites are needed depending on your environment:

  • For standalone mode, administrators should ensure the NVIDIA driver is installed and GPU resources are accessible on all the nodes in the cluster.

  • For Yarn deployment, administrators should configure the Yarn cluster to enable GPU scheduling. Notice the required Hadoop version is 2.10+ or 3.1+.

  • For Kubernetes deployment, administrators should make sure the NVIDIA GPU device plugin is installed. Notice the required version is 1.10+. At the moment, Kubernetes only supports NVIDIA GPU and AMD GPU. Flink only provides discovery script for NVIDIA GPUs, but you can provide a custom discovery script for AMD GPUs yourself, see Discovery script.

Enable GPU resources for your workload

As mentioned in Enable external resources for your workload, you also need to do two things to enable GPU resources:

  • Configure the GPU resource.

  • Get the information of GPU resources, which contains the GPU index as property with key “index”, in operators.

Configurations

For the GPU plugin, you need to specify the common external resource configurations:

  • external-resources: You need to append your resource name (e.g. gpu) for GPU resources to it.

  • external-resource.<resource_name>.amount: The amount of GPU devices per TaskManager.

  • external-resource.<resource_name>.yarn.config-key: For Yarn, the config key of GPU is yarn.io/gpu. Notice that Yarn only supports NVIDIA GPU at the moment.

  • external-resource.<resource_name>.kubernetes.config-key: For Kubernetes, the config key of GPU is <vendor>.com/gpu. Currently, “nvidia” and “amd” are the two supported vendors. Notice that if you use AMD GPUs, you need to provide a discovery script yourself, see Discovery script.

  • external-resource..driver-factory.class: Should be set to org.apache.flink.externalresource.gpu.GPUDriverFactory.

In addition, there are some specific configurations for the GPU plugin:

  • external-resource.<resource_name>.param.discovery-script.path: The path of the discovery script. It can either be an absolute path, or a relative path to FLINK_HOME when defined or current directory otherwise. If not explicitly configured, the default script will be used.

  • external-resource.<resource_name>.param.discovery-script.args: The arguments passed to the discovery script. For the default discovery script, see Default Script for the available parameters.

An example configuration for GPU resource:

external-resources: gpu
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory # Define the driver factory class of gpu resource.
external-resource.gpu.amount: 2 # Define the amount of gpu resource per TaskManager.
external-resource.gpu.param.discovery-script.path: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
external-resource.gpu.param.discovery-script.args: --enable-coordination # Define the custom param "discovery-script.args" which will be passed into the gpu driver.

external-resource.gpu.yarn.config-key: yarn.io/gpu # for Yarn

external-resource.gpu.kubernetes.config-key: nvidia.com/gpu # for Kubernetes

Discovery script

The GPUDriver leverages a discovery script to discover GPU resources and generate the GPU resource information.

Default Script

We provide a default discovery script for NVIDIA GPU, located at plugins/external-resource-gpu/nvidia-gpu-discovery.sh of your Flink distribution. The script gets the indexes of visible GPU resources through the nvidia-smi command. It tries to return the required amount (specified by external-resource.<resource_name>.amount) of GPU indexes in a list, and exit with non-zero if the amount cannot be satisfied.

For standalone mode, multiple TaskManagers might be co-located on the same machine, and each GPU device is visible to all the TaskManagers. The default discovery script supports a coordination mode, in which it leverages a coordination file to synchronize the allocation state of GPU devices and ensure each GPU device can only be used by one TaskManager process. The relevant arguments are:

  • --enable-coordination-mode: Enable the coordination mode. By default the coordination mode is disabled.

  • --coordination-file filePath: The path of the coordination file used to synchronize the allocation state of GPU resources. The default path is /var/tmp/flink-gpu-coordination.

Note: The coordination mode only ensures that a GPU device is not shared by multiple TaskManagers of the same Flink cluster. Please be aware that another Flink cluster (with a different coordination file) or a non-Flink application can still use the same GPU devices.

Custom Script

You can also provide a discovery script to address your custom requirements, e.g. discovering AMD GPU. Please make sure the path of your custom script is accessible to Flink and configured (external-resource.<resource_name>.param.discovery-script.path) correctly. The contract of the discovery script:

  • GPUDriver passes the amount (specified by external-resource.<resource_name>.amount) as the first argument into the script. The user-defined arguments in external-resource.<resource_name>.param.discovery-script.args would be appended after it.

  • The script should return a list of the available GPU indexes, split by a comma. Whitespace only indexes will be ignored.

  • The script can also suggest that the discovery is not properly performed, by exiting with non-zero. In that case, no gpu information will be provided to operators.