Source code for pyflink.datastream.state

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABC, abstractmethod

from typing import TypeVar, Generic, Iterable, List, Iterator, Dict, Tuple

from pyflink.common.typeinfo import TypeInformation, Types, PickledBytesTypeInfo

__all__ = [
    'ValueStateDescriptor',
    'ValueState',
    'ListStateDescriptor',
    'ListState',
    'MapStateDescriptor',
    'MapState',
    'ReducingStateDescriptor',
    'ReducingState',
    'AggregatingStateDescriptor',
    'AggregatingState'
]

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
IN = TypeVar('IN')
OUT = TypeVar('OUT')


class State(ABC):
    """
    Interface that different types of partitioned state must implement.
    """

    @abstractmethod
    def clear(self) -> None:
        """
        Removes the value mapped under the current key.
        """
        pass


[docs]class ValueState(State, Generic[T]): """ :class:`State` interface for partitioned single-value state. The value can be retrieved or updated. The state is accessed and modified by user functions, and checkpointed consistently by the system as part of the distributed snapshots. """
[docs] @abstractmethod def value(self) -> T: """ Returns the current value for the state. When the state is not partitioned the returned value is the same for all inputs in a given operator instance. If state partitioning is applied, the value returned depends on the current operator input, as the operator maintains an independent state for each partition. """ pass
[docs] @abstractmethod def update(self, value: T) -> None: """ Updates the operator state accessible by :func:`value` to the given value. The next time :func:`value` is called (for the same state partition) the returned state will represent the updated value. When a partitioned state is updated with null, the state for the current key will be removed and the default value is returned on the next access. """ pass
class AppendingState(State, Generic[IN, OUT]): """ Base interface for partitioned state taht supports adding elements and inspecting the current state. Elements can either be kept in a buffer (list-like) or aggregated into one value. This state is accessed and modified by user functions, and checkpointed consistently by the system as part of the distributed snapshots. The state is only accessible by functions applied on a KeyedStream. The key is automatically supplied by the system, so the function always sees the value mapped to the key of the current element. That way, the system can handle stream and state partitioning consistently together. """ @abstractmethod def get(self) -> OUT: """ Returns the elements under the current key. """ pass @abstractmethod def add(self, value: IN) -> None: """ Adding the given value to the tail of this list state. """ pass class MergingState(AppendingState[IN, OUT]): """ Extension of AppendingState that allows merging of state. That is, two instance of MergingState can be combined into a single instance that contains all the information of the two merged states. """ pass
[docs]class ReducingState(MergingState[T, T]): """ :class:`State` interface for reducing state. Elements can be added to the state, they will be combined using a reduce function. The current state can be inspected. The state is accessed and modified by user functions, and checkpointed consistently by the system as part of the distributed snapshots. The state is only accessible by functions applied on a KeyedStream. The key is automatically supplied by the system, so the function always sees the value mapped to the key of the current element. That way, the system can handle stream and state partitioning consistently together. """ pass
[docs]class AggregatingState(MergingState[IN, OUT]): """ :class:`State` interface for aggregating state, based on an :class:`~pyflink.datastream.functions.AggregateFunction`. Elements that are added to this type of state will be eagerly pre-aggregated using a given AggregateFunction. The state holds internally always the accumulator type of the AggregateFunction. When accessing the result of the state, the function's :func:`~pyflink.datastream.functions.AggregateFunction.get_result` method. The state is accessed and modified by user functions, and checkpointed consistently by the system as part of the distributed snapshots. The state is only accessible by functions applied on a KeyedStream. The key is automatically supplied by the system, so the function always sees the value mapped to the key of the current element. That way, the system can handle stream and state partitioning consistently together. """ pass
[docs]class ListState(MergingState[T, Iterable[T]]): """ :class:`State` interface for partitioned list state in Operations. The state is accessed and modified by user functions, and checkpointed consistently by the system as part of the distributed snapshots. Currently only keyed list state is supported. When it is a keyed list state, the state key is automatically supplied by the system, so the user function always sees the value mapped to the key of the current element. That way, the system can handle stream and state partitioning consistently together. """
[docs] @abstractmethod def update(self, values: List[T]) -> None: """ Updating existing values to to the given list of values. """ pass
[docs] @abstractmethod def add_all(self, values: List[T]) -> None: """ Adding the given values to the tail of this list state. """ pass
def __iter__(self) -> Iterator[T]: return iter(self.get())
[docs]class MapState(State, Generic[K, V]): """ :class:`State` interface for partitioned key-value state. The key-value pair can be added, updated and retrieved. The state is accessed and modified by user functions, and checkpointed consistently by the system as part of the distributed snapshots. The state key is automatically supplied by the system, so the function always sees the value mapped to the key of the current element. That way, the system can handle stream and state partitioning consistently together. """
[docs] @abstractmethod def get(self, key: K) -> V: """ Returns the current value associated with the given key. """ pass
[docs] @abstractmethod def put(self, key: K, value: V) -> None: """ Associates a new value with the given key. """ pass
[docs] @abstractmethod def put_all(self, dict_value: Dict[K, V]) -> None: """ Copies all of the mappings from the given map into the state. """ pass
[docs] @abstractmethod def remove(self, key: K) -> None: """ Deletes the mapping of the given key. """ pass
[docs] @abstractmethod def contains(self, key: K) -> bool: """ Returns whether there exists the given mapping. """ pass
[docs] @abstractmethod def items(self) -> Iterable[Tuple[K, V]]: """ Returns all the mappings in the state. """ pass
[docs] @abstractmethod def keys(self) -> Iterable[K]: """ Returns all the keys in the state. """ pass
[docs] @abstractmethod def values(self) -> Iterable[V]: """ Returns all the values in the state. """ pass
[docs] @abstractmethod def is_empty(self) -> bool: """ Returns true if this state contains no key-value mappings, otherwise false. """ pass
def __getitem__(self, key: K) -> V: return self.get(key) def __setitem__(self, key: K, value: V) -> None: self.put(key, value) def __delitem__(self, key: K) -> None: self.remove(key) def __contains__(self, key: K) -> bool: return self.contains(key) def __iter__(self) -> Iterator[K]: return iter(self.keys())
class StateDescriptor(ABC): """ Base class for state descriptors. A StateDescriptor is used for creating partitioned State in stateful operations. """ def __init__(self, name: str, type_info: TypeInformation): """ Constructor for StateDescriptor. :param name: The name of the state :param type_info: The type information of the value. """ self.name = name self.type_info = type_info def get_name(self) -> str: """ Get the name of the state. :return: The name of the state. """ return self.name
[docs]class ValueStateDescriptor(StateDescriptor): """ StateDescriptor for ValueState. This can be used to create partitioned value state using RuntimeContext.get_state(ValueStateDescriptor). """ def __init__(self, name: str, value_type_info: TypeInformation): """ Constructor of the ValueStateDescriptor. :param name: The name of the state. :param value_type_info: the type information of the state. """ if not isinstance(value_type_info, PickledBytesTypeInfo): raise ValueError("The type information of the value could only be PickledBytesTypeInfo " "(created via Types.PICKLED_BYTE_ARRAY()) currently, got %s." % type(value_type_info)) super(ValueStateDescriptor, self).__init__(name, value_type_info)
[docs]class ListStateDescriptor(StateDescriptor): """ StateDescriptor for ListState. This can be used to create state where the type is a list that can be appended and iterated over. """ def __init__(self, name: str, elem_type_info: TypeInformation): """ Constructor of the ListStateDescriptor. :param name: The name of the state. :param elem_type_info: the type information of the state element. """ if not isinstance(elem_type_info, PickledBytesTypeInfo): raise ValueError("The type information of the element could only be " "PickledBytesTypeInfo (created via Types.PICKLED_BYTE_ARRAY()) " "currently, got %s" % type(elem_type_info)) super(ListStateDescriptor, self).__init__(name, Types.LIST(elem_type_info))
[docs]class MapStateDescriptor(StateDescriptor): """ StateDescriptor for MapState. This can be used to create state where the type is a map that can be updated and iterated over. """ def __init__(self, name: str, key_type_info: TypeInformation, value_type_info: TypeInformation): """ Constructor of the MapStateDescriptor. :param name: The name of the state. :param key_type_info: The type information of the key. :param value_type_info: the type information of the value. """ if not isinstance(key_type_info, PickledBytesTypeInfo): raise ValueError("The type information of the key could only be PickledBytesTypeInfo " "(created via Types.PICKLED_BYTE_ARRAY()) currently, got %s" % type(key_type_info)) if not isinstance(value_type_info, PickledBytesTypeInfo): raise ValueError("The type information of the value could only be PickledBytesTypeInfo " "(created via Types.PICKLED_BYTE_ARRAY()) currently, got %s" % type(value_type_info)) super(MapStateDescriptor, self).__init__(name, Types.MAP(key_type_info, value_type_info))
[docs]class ReducingStateDescriptor(StateDescriptor): """ StateDescriptor for ReducingState. This can be used to create partitioned reducing state using RuntimeContext.get_reducing_state(ReducingStateDescriptor). """ def __init__(self, name: str, reduce_function, type_info: TypeInformation): """ Constructor of the ReducingStateDescriptor. :param name: The name of the state. :param reduce_function: The ReduceFunction used to aggregate the state. :param type_info: The type of the values in the state. """ super(ReducingStateDescriptor, self).__init__(name, type_info) from pyflink.datastream.functions import ReduceFunction, ReduceFunctionWrapper if not isinstance(reduce_function, ReduceFunction): if callable(reduce_function): reduce_function = ReduceFunctionWrapper(reduce_function) # type: ignore else: raise TypeError("The input must be a ReduceFunction or a callable function!") if not isinstance(type_info, PickledBytesTypeInfo): raise ValueError("The type information of the state could only be PickledBytesTypeInfo " "(created via Types.PICKLED_BYTE_ARRAY()) currently, got %s" % type(type_info)) self._reduce_function = reduce_function
[docs] def get_reduce_function(self): return self._reduce_function
[docs]class AggregatingStateDescriptor(StateDescriptor): """ A StateDescriptor for AggregatingState. The type internally stored in the state is the type of the Accumulator of the :func:`~pyflink.datastream.functions.AggregateFunction`. """ def __init__(self, name: str, agg_function, state_type_info): super(AggregatingStateDescriptor, self).__init__(name, state_type_info) from pyflink.datastream.functions import AggregateFunction if not isinstance(agg_function, AggregateFunction): raise TypeError("The input must be a pyflink.datastream.functions.AggregateFunction!") if not isinstance(state_type_info, PickledBytesTypeInfo): raise ValueError("The type information of the state could only be PickledBytesTypeInfo " "(created via Types.PICKLED_BYTE_ARRAY()) currently, got %s" % type(state_type_info)) self._agg_function = agg_function
[docs] def get_agg_function(self): return self._agg_function