
Metrics

Flink exposes a metric system that allows gathering and exposing metrics to external systems.

Registering metrics

You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object on which you can create and register new metrics.

Metric types

Flink supports Counters, Gauges, Histograms and Meters.

Counter

A Counter is used to count something. The current value can be incremented or decremented using inc()/inc(long n) or dec()/dec(long n). You can create and register a Counter by calling counter(String name) on a MetricGroup.

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public Integer map(String value) throws Exception {
    this.counter.inc();
    return value.length();
  }
}

Alternatively, you can use your own Counter implementation:

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }
}
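
CustomCounter is not a class shipped with Flink; a minimal sketch of such an implementation of the org.apache.flink.metrics.Counter interface could look like this:

import org.apache.flink.metrics.Counter;

// Minimal sketch of a custom counter backed by a plain long field.
public class CustomCounter implements Counter {
  private long count;

  @Override
  public void inc() {
    count++;
  }

  @Override
  public void inc(long n) {
    count += n;
  }

  @Override
  public void dec() {
    count--;
  }

  @Override
  public void dec(long n) {
    count -= n;
  }

  @Override
  public long getCount() {
    return count;
  }
}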

Gauge

A Gauge provides a value of any type on demand. In order to use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface. There is no restriction for the type of the returned value. You can register a gauge by calling gauge(String name, Gauge gauge) on a MetricGroup.

public class MyMapper extends RichMapFunction<String, Integer> {
  private int valueToExpose;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }
}
The same gauge can be registered in Scala using ScalaGauge:

class MyMapper extends RichMapFunction[String, Int] {
  val valueToExpose = 5

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }
  ...
}

Note that reporters will turn the exposed object into a String, which means that a meaningful toString() implementation is required.
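
For example, a gauge exposing a non-numeric value could override toString() on the exposed type. MyStats below is a hypothetical value class, shown only to illustrate the point:

// Hypothetical value type exposed by a Gauge; reporters will report its toString() result.
public class MyStats {
  private final long processed;
  private final long failed;

  public MyStats(long processed, long failed) {
    this.processed = processed;
    this.failed = failed;
  }

  @Override
  public String toString() {
    return "processed=" + processed + ",failed=" + failed;
  }
}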

Histogram

A Histogram measures the distribution of long values. You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Integer map(Long value) throws Exception {
    this.histogram.update(value);
    return value.intValue();
  }
}
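
MyHistogram in the snippet above is not a class provided by Flink; a minimal sketch implementing the org.apache.flink.metrics.Histogram interface might look like the following. It keeps every reported value in memory and is not thread-safe, so it is only meant to illustrate the interface, not for production use:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;

// Hypothetical histogram that keeps all reported values in memory (illustration only).
public class MyHistogram implements Histogram {
  private final List<Long> values = new ArrayList<>();

  @Override
  public void update(long value) {
    values.add(value);
  }

  @Override
  public long getCount() {
    return values.size();
  }

  @Override
  public HistogramStatistics getStatistics() {
    final List<Long> snapshot = new ArrayList<>(values);
    Collections.sort(snapshot);

    return new HistogramStatistics() {
      @Override
      public double getQuantile(double quantile) {
        if (snapshot.isEmpty()) {
          return 0.0;
        }
        int index = (int) Math.ceil(quantile * snapshot.size()) - 1;
        return snapshot.get(Math.max(0, Math.min(index, snapshot.size() - 1)));
      }

      @Override
      public long[] getValues() {
        long[] result = new long[snapshot.size()];
        for (int i = 0; i < snapshot.size(); i++) {
          result[i] = snapshot.get(i);
        }
        return result;
      }

      @Override
      public int size() {
        return snapshot.size();
      }

      @Override
      public double getMean() {
        double sum = 0;
        for (long v : snapshot) {
          sum += v;
        }
        return snapshot.isEmpty() ? 0.0 : sum / snapshot.size();
      }

      @Override
      public double getStdDev() {
        double mean = getMean();
        double squaredDeviations = 0;
        for (long v : snapshot) {
          squaredDeviations += (v - mean) * (v - mean);
        }
        return snapshot.isEmpty() ? 0.0 : Math.sqrt(squaredDeviations / snapshot.size());
      }

      @Override
      public long getMax() {
        return snapshot.isEmpty() ? 0 : snapshot.get(snapshot.size() - 1);
      }

      @Override
      public long getMin() {
        return snapshot.isEmpty() ? 0 : snapshot.get(0);
      }
    };
  }
}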

Flink does not provide a default implementation for Histogram, but offers a wrapper that allows usage of Codahale/DropWizard histograms. To use this wrapper, add the following dependency to your pom.xml:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.3.2</version>
</dependency>

You can then register a Codahale/DropWizard histogram like this:

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram histogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
  }
}

Meter

A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method. The occurrence of multiple events at the same time can be registered with the markEvent(long n) method. You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Integer map(Long value) throws Exception {
    this.meter.markEvent();
    return value.intValue();
  }
}
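
MyMeter in the snippet above is not a class provided by Flink; a minimal sketch implementing the org.apache.flink.metrics.Meter interface might look like this (as a simplifying assumption it reports the average rate over the whole lifetime of the metric rather than over a recent time window):

import org.apache.flink.metrics.Meter;

// Hypothetical meter reporting the average rate since creation (illustration only).
public class MyMeter implements Meter {
  private final long startTime = System.currentTimeMillis();
  private long count;

  @Override
  public void markEvent() {
    count++;
  }

  @Override
  public void markEvent(long n) {
    count += n;
  }

  @Override
  public double getRate() {
    double elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000.0;
    return elapsedSeconds <= 0 ? 0.0 : count / elapsedSeconds;
  }

  @Override
  public long getCount() {
    return count;
  }
}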

Flink offers a wrapper that allows usage of Codahale/DropWizard meters. To use this wrapper, add the following dependency to your pom.xml:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.3.2</version>
</dependency>

You can then register a Codahale/DropWizard meter like this:

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Meter meter;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();

    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(meter));
  }
}

Scope

Every metric is assigned an identifier under which it will be reported, based on three components: the user-provided name when registering the metric, an optional user-defined scope, and a system-provided scope. For example, if A.B is the system scope, C.D the user scope and E the name, then the identifier for the metric will be A.B.C.D.E.

You can configure which delimiter to use for the identifier (default: .) by setting the metrics.scope.delimiter key in conf/flink-conf.yaml.
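
For example, to use an underscore instead (a hypothetical configuration), which would yield identifiers like A_B_C_D_E:

metrics.scope.delimiter: _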

User Scope

You can define a user scope by calling either MetricGroup#addGroup(String name) or MetricGroup#addGroup(int name).

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

System Scope

The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.

Which context information should be included can be configured by setting the following keys in conf/flink-conf.yaml. Each of these keys expects a format string that may contain constants (e.g. “taskmanager”) and variables (e.g. “<task_id>”) which will be replaced at runtime.

  • metrics.scope.jm
    • Default: <host>.jobmanager
    • Applied to all metrics that were scoped to a job manager.
  • metrics.scope.jm.job
    • Default: <host>.jobmanager.<job_name>
    • Applied to all metrics that were scoped to a job manager and job.
  • metrics.scope.tm
    • Default: <host>.taskmanager.<tm_id>
    • Applied to all metrics that were scoped to a task manager.
  • metrics.scope.tm.job
    • Default: <host>.taskmanager.<tm_id>.<job_name>
    • Applied to all metrics that were scoped to a task manager and job.
  • metrics.scope.task
    • Default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    • Applied to all metrics that were scoped to a task.
  • metrics.scope.operator
    • Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    • Applied to all metrics that were scoped to an operator.

There are no restrictions on the number or order of variables. Variables are case sensitive.

The default scope for operator metrics will result in an identifier akin to localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric.

If you also want to include the task name but omit the task manager information you can specify the following format:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.

Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g. <job_id>) or to assign unique names to jobs and operators.
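
For example, a format that includes the job id (a hypothetical configuration) keeps identifiers distinct across concurrent runs of the same job:

metrics.scope.operator: <host>.<job_id>.<operator_name>.<subtask_index>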

List of all Variables

  • JobManager: <host>
  • TaskManager: <host>, <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_name>, <subtask_index>

Reporter

Metrics can be exposed to an external system by configuring one or several reporters in conf/flink-conf.yaml. These reporters will be instantiated on each job and task manager when they are started.

  • metrics.reporters: The list of named reporters.
  • metrics.reporter.<name>.<config>: Generic setting <config> for the reporter named <name>.
  • metrics.reporter.<name>.class: The reporter class to use for the reporter named <name>.
  • metrics.reporter.<name>.interval: The reporter interval to use for the reporter named <name>.
  • metrics.reporter.<name>.scope.delimiter: The delimiter to use for the identifier (defaults to the value of metrics.scope.delimiter) for the reporter named <name>.

All reporters must at least have the class property; some allow specifying a reporting interval. Below, we will list more settings specific to each reporter.

Example reporter configuration that specifies multiple reporters:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

Important: The jar containing the reporter must be accessible when Flink is started by placing it in the /lib folder.

You can write your own Reporter by implementing the org.apache.flink.metrics.reporter.MetricReporter interface. If the Reporter should send out reports regularly you have to implement the Scheduled interface as well.
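
For illustration, here is a minimal sketch of such a reporter that prints counter and gauge values to standard output (StdoutReporter is a hypothetical name; the remaining metric types and error handling are omitted):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

// Hypothetical reporter that periodically prints counters and gauges to stdout.
public class StdoutReporter implements MetricReporter, Scheduled {

  private final Map<String, Metric> metrics = new ConcurrentHashMap<>();

  @Override
  public void open(MetricConfig config) {
    // Reporter-specific settings (metrics.reporter.<name>.*) can be read here, e.g. config.getString(...).
  }

  @Override
  public void close() {
    metrics.clear();
  }

  @Override
  public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    metrics.put(group.getMetricIdentifier(metricName), metric);
  }

  @Override
  public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
    metrics.remove(group.getMetricIdentifier(metricName));
  }

  @Override
  public void report() {
    // Called regularly at the configured interval because the reporter implements Scheduled.
    for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
      Metric metric = entry.getValue();
      if (metric instanceof Counter) {
        System.out.println(entry.getKey() + ": " + ((Counter) metric).getCount());
      } else if (metric instanceof Gauge) {
        System.out.println(entry.getKey() + ": " + ((Gauge<?>) metric).getValue());
      }
    }
  }
}

Such a reporter would then be configured via metrics.reporter.<name>.class like the reporters shown above, with its jar placed in the /lib folder.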

The following sections list the supported reporters.

JMX (org.apache.flink.metrics.jmx.JMXReporter)

You don’t have to include an additional dependency since the JMX reporter is available by default but not activated.

Parameters:

  • port - (optional) the port on which JMX listens for connections. This can also be a port range. When a range is specified the actual port is shown in the relevant job or task manager log. If this setting is set, Flink will start an extra JMX connector for the given port/range. Metrics are always available on the default local JMX interface.

Example configuration:

metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

Metrics exposed through JMX are identified by a domain and a list of key-properties, which together form the object name.

The domain always begins with org.apache.flink followed by a generalized metric identifier. In contrast to the usual identifier it is not affected by scope-formats, does not contain any variables and is constant across jobs. An example for such a domain would be org.apache.flink.job.task.numBytesOut.

The key-property list contains the values for all variables, regardless of configured scope formats, that are associated with a given metric. An example for such a list would be host=localhost,job_name=MyJob,task_name=MyTask.

The domain thus identifies a metric class, while the key-property list identifies one (or multiple) instances of that metric.

Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)

In order to use this reporter you must copy /opt/flink-metrics-ganglia-1.3.2.jar into the /lib folder of your Flink distribution.

Parameters:

  • host - the gmond host address configured under udp_recv_channel.bind in gmond.conf
  • port - the gmond port configured under udp_recv_channel.port in gmond.conf
  • tmax - soft limit for how long an old metric should be retained
  • dmax - hard limit for how long an old metric should be retained
  • ttl - time-to-live for transmitted UDP packets
  • addressingMode - UDP addressing mode to use (UNICAST/MULTICAST)

Example configuration:

metrics.reporters: gang
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)

In order to use this reporter you must copy /opt/flink-metrics-graphite-1.3.2.jar into the /lib folder of your Flink distribution.

Parameters:

  • host - the Graphite server host
  • port - the Graphite server port
  • protocol - protocol to use (TCP/UDP)

Example configuration:

metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

StatsD (org.apache.flink.metrics.statsd.StatsDReporter)

In order to use this reporter you must copy /opt/flink-metrics-statsd-1.3.2.jar into the /lib folder of your Flink distribution.

Parameters:

  • host - the StatsD server host
  • port - the StatsD server port

Example configuration:

metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)

In order to use this reporter you must copy /opt/flink-metrics-datadog-1.3.2.jar into the /lib folder of your Flink distribution.

Note that any variables in Flink metrics, such as <host>, <job_name>, <tm_id>, <subtask_index>, <task_name>, and <operator_name>, will be sent to Datadog as tags. Tags will look like host:localhost and job_name:myjobname.

Parameters:

  • apikey - the Datadog API key
  • tags - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by commas only

Example configuration:

metrics.reporters: dghttp
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod

System metrics

By default Flink gathers several metrics that provide deep insights into its current state. This section is a reference of all these metrics.

The tables below generally feature 4 columns:

  • The “Scope” column describes which scope format is used to generate the system scope. For example, if the cell contains “Operator” then the scope format for “metrics.scope.operator” is used. If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, like for both job- and taskmanagers.

  • The (optional) “Infix” column describes which infix is appended to the system scope.

  • The “Metrics” column lists the names of all metrics that are registered for the given scope and infix.

  • The “Description” column provides information as to what a given metric is measuring.

Note that all dots in the infix/metric name columns are still subject to the “metrics.scope.delimiter” setting.

Thus, in order to infer the metric identifier:

  1. Take the scope-format based on the “Scope” column
  2. Append the value in the “Infix” column if present, and account for the “metrics.scope.delimiter” setting
  3. Append the metric name.

For example, with the default scope formats and delimiter, the CPU load of a TaskManager would be reported under an identifier akin to localhost.taskmanager.1234.Status.JVM.CPU.Load.

CPU:

Scope: Job-/TaskManager
Infix: Status.JVM.CPU
Metrics:
  • Load - The recent CPU usage of the JVM.
  • Time - The CPU time used by the JVM.

Memory:

Scope: Job-/TaskManager
Infix: Status.JVM.Memory
Metrics:
  • Heap.Used - The amount of heap memory currently used.
  • Heap.Committed - The amount of heap memory guaranteed to be available to the JVM.
  • Heap.Max - The maximum amount of heap memory that can be used for memory management.
  • NonHeap.Used - The amount of non-heap memory currently used.
  • NonHeap.Committed - The amount of non-heap memory guaranteed to be available to the JVM.
  • NonHeap.Max - The maximum amount of non-heap memory that can be used for memory management.
  • Direct.Count - The number of buffers in the direct buffer pool.
  • Direct.MemoryUsed - The amount of memory used by the JVM for the direct buffer pool.
  • Direct.TotalCapacity - The total capacity of all buffers in the direct buffer pool.
  • Mapped.Count - The number of buffers in the mapped buffer pool.
  • Mapped.MemoryUsed - The amount of memory used by the JVM for the mapped buffer pool.
  • Mapped.TotalCapacity - The total capacity of all buffers in the mapped buffer pool.

Threads:

Scope: Job-/TaskManager
Infix: Status.JVM.Threads
Metrics:
  • Count - The total number of live threads.

GarbageCollection:

Scope: Job-/TaskManager
Infix: Status.JVM.GarbageCollector
Metrics:
  • <GarbageCollector>.Count - The total number of collections that have occurred.
  • <GarbageCollector>.Time - The total time spent performing garbage collection.

ClassLoader:

Scope: Job-/TaskManager
Infix: Status.JVM.ClassLoader
Metrics:
  • ClassesLoaded - The total number of classes loaded since the start of the JVM.
  • ClassesUnloaded - The total number of classes unloaded since the start of the JVM.

Network:

Scope: TaskManager
Infix: Status.Network
Metrics:
  • AvailableMemorySegments - The number of unused memory segments.
  • TotalMemorySegments - The number of allocated memory segments.

Scope: Task
Infix: buffers
Metrics:
  • inputQueueLength - The number of queued input buffers.
  • outputQueueLength - The number of queued output buffers.
  • inPoolUsage - An estimate of the input buffers usage.
  • outPoolUsage - An estimate of the output buffers usage.

Scope: Task
Infix: Network.<Input|Output>.<gate> (only available if the taskmanager.net.detailed-metrics config option is set)
Metrics:
  • totalQueueLen - Total number of queued buffers in all input/output channels.
  • minQueueLen - Minimum number of queued buffers in all input/output channels.
  • maxQueueLen - Maximum number of queued buffers in all input/output channels.
  • avgQueueLen - Average number of queued buffers in all input/output channels.

Cluster:

Scope: JobManager
Metrics:
  • numRegisteredTaskManagers - The number of registered taskmanagers.
  • numRunningJobs - The number of running jobs.
  • taskSlotsAvailable - The number of available task slots.
  • taskSlotsTotal - The total number of task slots.

Availability:

Scope: Job (only available on JobManager)
Metrics:
  • restartingTime - The time it took to restart the job, or how long the current restart has been in progress.
  • uptime - The time that the job has been running without interruption. Returns -1 for completed jobs.
  • downtime - For jobs currently in a failing/recovering situation, the time elapsed during this outage. Returns 0 for running jobs and -1 for completed jobs.
  • fullRestarts - The total number of full restarts since this job was submitted.

Checkpointing:

Scope: Job (only available on JobManager)
Metrics:
  • lastCheckpointDuration - The time it took to complete the last checkpoint.
  • lastCheckpointSize - The total size of the last checkpoint.
  • lastCheckpointExternalPath - The path where the last external checkpoint was stored.
  • lastCheckpointRestoreTimestamp - Timestamp when the last checkpoint was restored at the coordinator.
  • lastCheckpointAlignmentBuffered - The number of buffered bytes during alignment over all subtasks for the last checkpoint.
  • numberOfInProgressCheckpoints - The number of in progress checkpoints.
  • numberOfCompletedCheckpoints - The number of successfully completed checkpoints.
  • numberOfFailedCheckpoints - The number of failed checkpoints.
  • totalNumberOfCheckpoints - The number of total checkpoints (in progress, completed, failed).

Scope: Task
Metrics:
  • checkpointAlignmentTime - The time in nanoseconds that the last barrier alignment took to complete, or how long the current alignment has taken so far.

IO:

Scope: Task
Metrics:
  • currentLowWatermark - The lowest watermark this task has received.
  • numBytesInLocal - The total number of bytes this task has read from a local source.
  • numBytesInLocalPerSecond - The number of bytes this task reads from a local source per second.
  • numBytesInRemote - The total number of bytes this task has read from a remote source.
  • numBytesInRemotePerSecond - The number of bytes this task reads from a remote source per second.
  • numBytesOut - The total number of bytes this task has emitted.
  • numBytesOutPerSecond - The number of bytes this task emits per second.

Scope: Task/Operator
Metrics:
  • numRecordsIn - The total number of records this operator/task has received.
  • numRecordsInPerSecond - The number of records this operator/task receives per second.
  • numRecordsOut - The total number of records this operator/task has emitted.
  • numRecordsOutPerSecond - The number of records this operator/task sends per second.

Scope: Operator
Metrics:
  • latency - The latency distributions from all incoming sources.
  • numSplitsProcessed - The total number of InputSplits this data source has processed (if the operator is a data source).

Connectors:

Kafka Connectors

Scope: Operator
Metrics:
  • commitsSucceeded - Kafka offset commit success count if Kafka commit is turned on and checkpointing is enabled.
  • commitsFailed - Kafka offset commit failure count if Kafka commit is turned on and checkpointing is enabled.

Latency tracking

Flink allows tracking the latency of records traveling through the system. To enable latency tracking, a latencyTrackingInterval (in milliseconds) has to be set to a positive value in the ExecutionConfig.
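
For example, a minimal sketch of enabling latency tracking on a streaming job (assuming the job is built via a StreamExecutionEnvironment):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Emit a LatencyMarker from every source every 1000 ms (tracking is only enabled for positive values).
env.getConfig().setLatencyTrackingInterval(1000L);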

At the latencyTrackingInterval, the sources will periodically emit a special record, called a LatencyMarker. The marker contains a timestamp from the time when the record has been emitted at the sources. Latency markers cannot overtake regular user records; thus, if records are queuing up in front of an operator, the queuing time is added to the latency tracked by the marker.

Note that the latency markers do not account for the time user records spend inside operators, since the markers bypass them. In particular, the markers do not account for the time records spend in, for example, window buffers. Only if operators are unable to accept new records, and records therefore queue up, will the latency measured by the markers reflect that.

All intermediate operators keep a list of the last n latencies from each source to compute a latency distribution. The sink operators keep a list per source and per parallel source instance to allow detecting latency issues caused by individual machines.

Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.

Dashboard integration

Metrics that were gathered for each task or operator can also be visualized in the Dashboard. On the main page for a job, select the Metrics tab. After selecting one of the tasks in the top graph you can select metrics to display using the Add Metric drop-down menu.

  • Task metrics are listed as <subtask_index>.<metric_name>.
  • Operator metrics are listed as <subtask_index>.<operator_name>.<metric_name>.

Each metric will be visualized as a separate graph, with the x-axis representing time and the y-axis the measured value. All graphs are automatically updated every 10 seconds, and continue to do so when navigating to another page.

There is no limit on the number of visualized metrics; however, only numeric metrics can be visualized.
