Metrics

PyFlink exposes a metric system that allows gathering and exposing metrics to external systems.

Registering metrics

You can access the metric system from a User-defined Function by calling function_context.get_metric_group() in the open method. The get_metric_group() method returns a MetricGroup object on which you can create and register new metrics.

Metric types

PyFlink supports Counters, Gauges, Distribution and Meters.

Counter

A Counter is used to count something. The current value can be in- or decremented using inc()/inc(n: int) or dec()/dec(n: int). You can create and register a Counter by calling counter(name: str) on a MetricGroup.

from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, i):
        self.counter.inc(i)
        return i

Gauge

A Gauge provides a value on demand. You can register a gauge by calling gauge(name: str, obj: Callable[[], int]) on a MetricGroup. The Callable object will be used to report the values. Gauge metrics are restricted to integer-only values.

from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.length = 0

    def open(self, function_context):
        function_context.get_metric_group().gauge("my_gauge", lambda : self.length)

    def eval(self, i):
        self.length = i
        return i - 1

Distribution

A metric that reports information(sum, count, min, max and mean) about the distribution of reported values. The value can be updated using update(n: int). You can register a distribution by calling distribution(name: str) on a MetricGroup. Distribution metrics are restricted to integer-only distributions.

from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.distribution = None

    def open(self, function_context):
        self.distribution = function_context.get_metric_group().distribution("my_distribution")

    def eval(self, i):
        self.distribution.update(i)
        return i - 1

Meter

A Meter measures an average throughput. An occurrence of an event can be registered with the mark_event() method. The occurrence of multiple events at the same time can be registered with mark_event(n: int) method. You can register a meter by calling meter(self, name: str, time_span_in_seconds: int = 60) on a MetricGroup. The default value of time_span_in_seconds is 60.

from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.meter = None

    def open(self, function_context):
        super().open(function_context)
        # an average rate of events per second over 120s, default is 60s.
        self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)

    def eval(self, i):
        self.meter.mark_event(i)
        return i - 1

Scope

You can refer to the Java metric document for more details on Scope definition.

User Scope

You can define a user scope by calling MetricGroup.add_group(key: str, value: str = None). If value is not None, creates a new key-value MetricGroup pair. The key group is added to this group’s sub-groups, while the value group is added to the key group’s sub-groups. In this case, the value group will be returned and a user variable will be defined.

function_context
    .get_metric_group()
    .add_group("my_metrics")
    .counter("my_counter")

function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter")

System Scope

You can refer to the Java metric document for more details on System Scope.

List of all Variables

You can refer to the Java metric document for more details on List of all Variables.

User Variables

You can define a user variable by calling MetricGroup.addGroup(key: str, value: str = None) and specifying the value parameter.

Important: User variables cannot be used in scope formats.

function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter")

You can refer to the Java metric document for more details on the following sections:

Back to top