Source code for pyflink.datastream.functions

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import abc
from abc import ABC
from typing import Union, Any, Dict

from py4j.java_gateway import JavaObject

from pyflink.datastream.time_domain import TimeDomain
from pyflink.java_gateway import get_gateway

__all__ = [
    'RuntimeContext',
    'MapFunction',
    'CoMapFunction',
    'FlatMapFunction',
    'CoFlatMapFunction',
    'ReduceFunction',
    'KeySelector',
    'FilterFunction',
    'Partitioner',
    'SourceFunction',
    'SinkFunction',
    'ProcessFunction',
    'KeyedProcessFunction']


[docs]class RuntimeContext(object): """ A RuntimeContext contains information about the context in which functions are executed. Each parallel instance of the function will have a context through which it can access static contextual information (such as the current parallelism). """ def __init__(self, task_name: str, task_name_with_subtasks: str, number_of_parallel_subtasks: int, max_number_of_parallel_subtasks: int, index_of_this_subtask: int, attempt_number: int, job_parameters: Dict[str, str]): self._task_name = task_name self._task_name_with_subtasks = task_name_with_subtasks self._number_of_parallel_subtasks = number_of_parallel_subtasks self._max_number_of_parallel_subtasks = max_number_of_parallel_subtasks self._index_of_this_subtask = index_of_this_subtask self._attempt_number = attempt_number self._job_parameters = job_parameters
[docs] def get_task_name(self) -> str: """ Returns the name of the task in which the UDF runs, as assigned during plan construction. """ return self._task_name
[docs] def get_number_of_parallel_subtasks(self) -> int: """ Gets the parallelism with which the parallel task runs. """ return self._number_of_parallel_subtasks
[docs] def get_max_number_of_parallel_subtasks(self) -> int: """ Gets the number of max-parallelism with which the parallel task runs. """ return self._max_number_of_parallel_subtasks
[docs] def get_index_of_this_subtask(self) -> int: """ Gets the number of this parallel subtask. The numbering starts from 0 and goes up to parallelism-1 (parallelism as returned by :func:`~RuntimeContext.get_number_of_parallel_subtasks`). """ return self._index_of_this_subtask
[docs] def get_attempt_number(self) -> int: """ Gets the attempt number of this parallel subtask. First attempt is numbered 0. """ return self._attempt_number
[docs] def get_task_name_with_subtasks(self) -> str: """ Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)", where 3 would be (:func:`~RuntimeContext.get_index_of_this_subtask` + 1), and 6 would be :func:`~RuntimeContext.get_number_of_parallel_subtasks`. """ return self._task_name_with_subtasks
[docs] def get_job_parameter(self, key: str, default_value: str): """ Gets the global job parameter value associated with the given key as a string. """ return self._job_parameters[key] if key in self._job_parameters else default_value
class Function(abc.ABC): """ The base class for all user-defined functions. """ def open(self, runtime_context: RuntimeContext): pass def close(self): pass
[docs]class MapFunction(Function): """ Base class for Map functions. Map functions take elements and transform them, element wise. A Map function always produces a single result element for each input element. Typical applications are parsing elements, converting data types, or projecting out fields. Operations that produce multiple result elements from a single input element can be implemented using the FlatMapFunction. The basic syntax for using a MapFunction is as follows: :: >>> ds = ... >>> new_ds = ds.map(MyMapFunction()) """
[docs] @abc.abstractmethod def map(self, value): """ The mapping method. Takes an element from the input data and transforms it into exactly one element. :param value: The input value. :return: The transformed value. """ pass
[docs]class CoMapFunction(Function): """ A CoMapFunction implements a map() transformation over two connected streams. The same instance of the transformation function is used to transform both of the connected streams. That way, the stream transformations can share state. The basic syntax for using a CoMapFunction is as follows: :: >>> ds1 = ... >>> ds2 = ... >>> new_ds = ds1.connect(ds2).map(MyCoMapFunction()) """
[docs] @abc.abstractmethod def map1(self, value): """ This method is called for each element in the first of the connected streams. :param value: The stream element :return: The resulting element """ pass
[docs] @abc.abstractmethod def map2(self, value): """ This method is called for each element in the second of the connected streams. :param value: The stream element :return: The resulting element """ pass
[docs]class FlatMapFunction(Function): """ Base class for flatMap functions. FlatMap functions take elements and transform them, into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists and arrays. Operations that produce multiple strictly one result element per input element can also use the MapFunction. The basic syntax for using a MapFUnction is as follows: :: >>> ds = ... >>> new_ds = ds.flat_map(MyFlatMapFunction()) """
[docs] @abc.abstractmethod def flat_map(self, value): """ The core mthod of the FlatMapFunction. Takes an element from the input data and transforms it into zero, one, or more elements. A basic implementation of flat map is as follows: :: >>> class MyFlatMapFunction(FlatMapFunction): >>> def flat_map(self, value): >>> for i in range(value): >>> yield i :param value: The input value. :return: A generator """ pass
[docs]class CoFlatMapFunction(Function): """ A CoFlatMapFunction implements a flat-map transformation over two connected streams. The same instance of the transformation function is used to transform both of the connected streams. That way, the stream transformations can share state. An example for the use of connected streams would be to apply rules that change over time onto elements of a stream. One of the connected streams has the rules, the other stream the elements to apply the rules to. The operation on the connected stream maintains the current set of rules in the state. It may receive either a rule update (from the first stream) and update the state, or a data element (from the second stream) and apply the rules in the state to the element. The result of applying the rules would be emitted. The basic syntax for using a CoFlatMapFunction is as follows: :: >>> ds1 = ... >>> ds2 = ... >>> class MyCoFlatMapFunction(CoFlatMapFunction): >>> def flat_map1(self, value): >>> for i in range(value): >>> yield i >>> def flat_map2(self, value): >>> for i in range(value): >>> yield i >>> new_ds = ds1.connect(ds2).flat_map(MyCoFlatMapFunction()) """
[docs] @abc.abstractmethod def flat_map1(self, value): """ This method is called for each element in the first of the connected streams. :param value: The input value. :return: A generator """ pass
[docs] @abc.abstractmethod def flat_map2(self, value): """ This method is called for each element in the second of the connected streams. :param value: The input value. :return: A generator """ pass
[docs]class ReduceFunction(Function): """ Base interface for Reduce functions. Reduce functions combine groups of elements to a single value, by taking always two elements and combining them into one. Reduce functions may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced individually. The basic syntax for using a ReduceFunction is as follows: :: >>> ds = ... >>> new_ds = ds.key_by(lambda x: x[1]).reduce(MyReduceFunction()) """
[docs] @abc.abstractmethod def reduce(self, value1, value2): """ The core method of ReduceFunction, combining two values into one value of the same type. The reduce function is consecutively applied to all values of a group until only a single value remains. :param value1: The first value to combine. :param value2: The second value to combine. :return: The combined value of both input values. """ pass
[docs]class KeySelector(Function): """ The KeySelector allows to use deterministic objects for operations such as reduce, reduceGroup, join coGroup, etc. If invoked multiple times on the same object, the returned key must be the same. The extractor takes an object an returns the deterministic key for that object. """
[docs] @abc.abstractmethod def get_key(self, value): """ User-defined function that deterministically extracts the key from an object. :param value: The object to get the key from. :return: The extracted key. """ pass
[docs]class FilterFunction(Function): """ A filter function is a predicate applied individually to each record. The predicate decides whether to keep the element, or to discard it. The basic syntax for using a FilterFunction is as follows: :: >>> ds = ... >>> result = ds.filter(MyFilterFunction()) Note that the system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption can lead to incorrect results. """
[docs] @abc.abstractmethod def filter(self, value): """ The filter function that evaluates the predicate. :param value: The value to be filtered. :return: True for values that should be retained, false for values to be filtered out. """ pass
[docs]class Partitioner(Function): """ Function to implement a custom partition assignment for keys. """
[docs] @abc.abstractmethod def partition(self, key: Any, num_partitions: int) -> int: """ Computes the partition for the given key. :param key: The key. :param num_partitions: The number of partitions to partition into. :return: The partition index. """ pass
class FunctionWrapper(Function): """ A basic wrapper class for user defined function. """ def __init__(self, func): self._func = func class MapFunctionWrapper(FunctionWrapper): """ A wrapper class for MapFunction. It's used for wrapping up user defined function in a MapFunction when user does not implement a MapFunction but directly pass a function object or a lambda function to map() function. """ def __init__(self, func): """ The constructor of MapFunctionWrapper. :param func: user defined function object. """ super(MapFunctionWrapper, self).__init__(func) def map(self, value): """ A delegated map function to invoke user defined function. :param value: The input value. :return: the return value of user defined map function. """ return self._func(value) class FlatMapFunctionWrapper(FunctionWrapper): """ A wrapper class for FlatMapFunction. It's used for wrapping up user defined function in a FlatMapFunction when user does not implement a FlatMapFunction but directly pass a function object or a lambda function to flat_map() function. """ def __init__(self, func): """ The constructor of MapFunctionWrapper. :param func: user defined function object. """ super(FlatMapFunctionWrapper, self).__init__(func) def flat_map(self, value): """ A delegated flat_map function to invoke user defined function. :param value: The input value. :return: the return value of user defined flat_map function. """ return self._func(value) class FilterFunctionWrapper(FunctionWrapper): """ A wrapper class for FilterFunction. It's used for wrapping up user defined function in a FilterFunction when user does not implement a FilterFunction but directly pass a function object or a lambda function to filter() function. """ def __init__(self, func): super(FilterFunctionWrapper, self).__init__(func) def filter(self, value): return self._func(value) class ReduceFunctionWrapper(FunctionWrapper): """ A wrapper class for ReduceFunction. It's used for wrapping up user defined function in a ReduceFunction when user does not implement a ReduceFunction but directly pass a function object or a lambda function to reduce() function. """ def __init__(self, func): """ The constructor of ReduceFunctionWrapper. :param func: user defined function object. """ super(ReduceFunctionWrapper, self).__init__(func) def reduce(self, value1, value2): """ A delegated reduce function to invoke user defined function. :param value1: The first value to combine. :param value2: The second value to combine. :return: The combined value of both input values. """ return self._func(value1, value2) class KeySelectorFunctionWrapper(FunctionWrapper): """ A wrapper class for KeySelector. It's used for wrapping up user defined function in a KeySelector when user does not implement a KeySelector but directly pass a function object or a lambda function to key_by() function. """ def __init__(self, func): """ The constructor of MapFunctionWrapper. :param func: user defined function object. """ super(KeySelectorFunctionWrapper, self).__init__(func) def get_key(self, value): """ A delegated get_key function to invoke user defined function. :param value: The input value. :return: the return value of user defined get_key function. """ return self._func(value) class PartitionerFunctionWrapper(FunctionWrapper): """ A wrapper class for Partitioner. It's used for wrapping up user defined function in a Partitioner when user does not implement a Partitioner but directly pass a function object or a lambda function to partition_custom() function. """ def __init__(self, func): """ The constructor of PartitionerFunctionWrapper. :param func: user defined function object. """ super(PartitionerFunctionWrapper, self).__init__(func) def partition(self, key: Any, num_partitions: int) -> int: """ A delegated partition function to invoke user defined function. :param key: The key. :param num_partitions: The number of partitions to partition into. :return: The partition index. """ return self._func(key, num_partitions) def _get_python_env(): """ An util function to get a python user defined function execution environment. """ gateway = get_gateway() exec_type = gateway.jvm.org.apache.flink.table.functions.python.PythonEnv.ExecType.PROCESS return gateway.jvm.org.apache.flink.table.functions.python.PythonEnv(exec_type) class JavaFunctionWrapper(object): """ A wrapper class that maintains a Function implemented in Java. """ def __init__(self, j_function: Union[str, JavaObject]): if isinstance(j_function, str): j_func_class = get_gateway().jvm.__getattr__(j_function) j_function = j_func_class() self._j_function = j_function def get_java_function(self): return self._j_function
[docs]class SourceFunction(JavaFunctionWrapper): """ Base class for all stream data source in Flink. """ def __init__(self, source_func: Union[str, JavaObject]): """ Constructor of SinkFunction. :param source_func: The java SourceFunction object. """ super(SourceFunction, self).__init__(source_func)
[docs]class SinkFunction(JavaFunctionWrapper): """ The base class for SinkFunctions. """ def __init__(self, sink_func: Union[str, JavaObject]): """ Constructor of SinkFunction. :param sink_func: The java SinkFunction object or the full name of the SinkFunction class. """ super(SinkFunction, self).__init__(sink_func)
[docs]class TimerService(abc.ABC): """ Interface for working with time and timers. """
[docs] @abc.abstractmethod def current_processing_time(self): """ Returns the current processing time. """ pass
[docs] @abc.abstractmethod def current_watermark(self): """ Returns the current event-time watermark. """ pass
[docs] @abc.abstractmethod def register_processing_time_timer(self, time: int): """ Registers a timer to be fired when processing time passes the given time. Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed context, such as in an operation on KeyedStream then that context will so be active when you receive the timer notification. :param time: The processing time of the timer to be registered. """ pass
[docs] @abc.abstractmethod def register_event_time_timer(self, time: int): """ Registers a timer tobe fired when the event time watermark passes the given time. Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed context, such as in an operation on KeyedStream then that context will so be active when you receive the timer notification. :param time: The event time of the timer to be registered. """ pass
[docs] def delete_processing_time_timer(self, time: int): """ Deletes the processing-time timer with the given trigger time. This method has only an effect if such a timer was previously registered and did not already expire. Timers can internally be scoped to keys and/or windows. When you delete a timer, it is removed from the current keyed context. :param time: The given trigger time of timer to be deleted. """ pass
[docs] def delete_event_time_timer(self, time: int): """ Deletes the event-time timer with the given trigger time. This method has only an effect if such a timer was previously registered and did not already expire. Timers can internally be scoped to keys and/or windows. When you delete a timer, it is removed from the current keyed context. :param time: The given trigger time of timer to be deleted. """ pass
[docs]class ProcessFunction(Function): """ A function that process elements of a stream. For every element in the input stream process_element(value, ctx, out) is invoked. This can produce zero or more elements as output. Implementations can also query the time and set timers through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This can again produce zero or more elements as output and register further timers. Note that access to keyed state and timers (which are also scoped to a key) is only available if the ProcessFunction is applied on a KeyedStream. """
[docs] class Context(abc.ABC): """ Information available in an invocation of process_element(value, ctx, out) or on_timer(value, ctx, out). """
[docs] @abc.abstractmethod def timer_service(self) -> TimerService: """ A Timer service for querying time and registering timers. """ pass
[docs] @abc.abstractmethod def timestamp(self) -> int: """ Timestamp of the element currently being processed or timestamp of a firing timer. This might be None, for example if the time characteristic of your program is set to TimeCharacteristic.ProcessTime. """ pass
[docs] @abc.abstractmethod def process_element(self, value, ctx: 'ProcessFunction.Context'): """ Process one element from the input stream. This function can output zero or more elements using the Collector parameter and also update internal state or set timers using the Context parameter. :param value: The input value. :param ctx: A Context that allows querying the timestamp of the element and getting a TimerService for registering timers and querying the time. The context is only valid during the invocation of this method, do not store it. """ pass
class KeyedProcessFunction(Function, ABC): """ A keyed function processes elements of a stream. For every element in the input stream, process_element() is invoked. This can produce zero or more elements as output. Implementations can also query the time and set timers through the provided Context. For firing timers on_timer() will be invoked. This can again produce zero or more elements as output and register further timers. Note that access to keyed state and timers (which are also scoped to a key) is only available if the KeyedProcessFunction is applied on a KeyedStream. """ """ A function that process elements of a stream. For every element in the input stream process_element(value, ctx, out) is invoked. This can produce zero or more elements as output. Implementations can also query the time and set timers through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This can again produce zero or more elements as output and register further timers. Note that access to keyed state and timers (which are also scoped to a key) is only available if the ProcessFunction is applied on a KeyedStream. """ class Context(ABC): @abc.abstractmethod def get_current_key(self): pass @abc.abstractmethod def timer_service(self) -> TimerService: """ A Timer service for querying time and registering timers. """ pass @abc.abstractmethod def timestamp(self) -> int: """ Timestamp of the element currently being processed or timestamp of a firing timer. This might be None, for example if the time characteristic of your program is set to TimeCharacteristic.ProcessTime. """ pass class OnTimerContext(Context): @abc.abstractmethod def time_domain(self) -> TimeDomain: """ The TimeDomain of the firing timer. :return: The TimeDomain of current fired timer. """ pass @abc.abstractmethod def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): """ Process one element from the input stream. This function can output zero or more elements using the Collector parameter and also update internal state or set timers using the Context parameter. :param value: The input value. :param ctx: A Context that allows querying the timestamp of the element and getting a TimerService for registering timers and querying the time. The context is only valid during the invocation of this method, do not store it. """ pass def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): """ Called when a timer set using TimerService fires. :param timestamp: The timestamp of the firing timer. :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer, querying the TimeDomain of the firing timer and getting a TimerService for registering timers and querying the time. The context is only valid during the invocation of this method, do not store it. """ pass