Source code for pyflink.datastream.window

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import math
from abc import ABC, abstractmethod
from enum import Enum
from io import BytesIO
from typing import TypeVar, Generic, Iterable, Collection, Any, cast, Optional

from pyflink.common import Time, Types
from pyflink.common.constants import MAX_LONG_VALUE, MIN_LONG_VALUE
from pyflink.common.serializer import TypeSerializer
from pyflink.datastream.functions import RuntimeContext, InternalWindowFunction, ReduceFunction
from pyflink.datastream.output_tag import OutputTag
from pyflink.datastream.state import StateDescriptor, ReducingStateDescriptor, \
    ValueStateDescriptor, State, ReducingState
from pyflink.metrics import MetricGroup

__all__ = ['Window',
           'TimeWindow',
           'CountWindow',
           'GlobalWindow',
           'WindowAssigner',
           'TumblingProcessingTimeWindows',
           'TumblingEventTimeWindows',
           'SlidingProcessingTimeWindows',
           'SlidingEventTimeWindows',
           'ProcessingTimeSessionWindows',
           'EventTimeSessionWindows',
           'DynamicProcessingTimeSessionWindows',
           'DynamicEventTimeSessionWindows',
           'GlobalWindows',
           'MergingWindowAssigner',
           'CountTumblingWindowAssigner',
           'CountSlidingWindowAssigner',
           'TriggerResult',
           'Trigger',
           'EventTimeTrigger',
           'ProcessingTimeTrigger',
           'ContinuousEventTimeTrigger',
           'ContinuousProcessingTimeTrigger',
           'NeverTrigger',
           'PurgingTrigger',
           'CountTrigger',
           'TimeWindowSerializer',
           'CountWindowSerializer',
           'GlobalWindowSerializer',
           'SessionWindowTimeGapExtractor']


def long_to_int_with_bit_mixing(x: int) -> int:
    x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9
    x = (x ^ (x >> 27)) * 0x94d049bb133111eb
    x = x ^ (x >> 31)
    return x


def mod_inverse(x: int) -> int:
    inverse = x * x * x
    inverse *= 2 - x * inverse
    inverse *= 2 - x * inverse
    inverse *= 2 - x * inverse
    return inverse


class Window(ABC):
    """
    Window is a grouping of elements into finite buckets. Windows have a maximum timestamp
    which means that, at some point, all elements that go into one window will have arrived.
    """

    @abstractmethod
    def max_timestamp(self) -> int:
        pass


[docs]class TimeWindow(Window): """ Window that represents a time interval from start (inclusive) to end (exclusive). """ def __init__(self, start: int, end: int): super(TimeWindow, self).__init__() self.start = start self.end = end def max_timestamp(self) -> int: return self.end - 1 def intersects(self, other: 'TimeWindow') -> bool: """ Returns True if this window intersects the given window. """ return self.start <= other.end and self.end >= other.start def cover(self, other: 'TimeWindow') -> 'TimeWindow': """ Returns the minimal window covers both this window and the given window. """ return TimeWindow(min(self.start, other.start), max(self.end, other.end)) @staticmethod def get_window_start_with_offset(timestamp: int, offset: int, window_size: int): """ Method to get the window start for a timestamp. :param timestamp: epoch millisecond to get the window start. :param offset: The offset which window start would be shifted by. :param window_size: The size of the generated windows. :return: window start """ return timestamp - (timestamp - offset + window_size) % window_size @staticmethod def merge_windows(windows: Iterable['TimeWindow'], callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None: """ Merge overlapping :class`TimeWindow`. """ sorted_windows = list(windows) sorted_windows.sort() merged = [] current_merge = None current_merge_set = set() for candidate in sorted_windows: if current_merge is None: current_merge = candidate current_merge_set.add(candidate) elif current_merge.intersects(candidate): current_merge = current_merge.cover(candidate) current_merge_set.add(candidate) else: merged.append((current_merge, current_merge_set)) current_merge = candidate current_merge_set = set() current_merge_set.add(candidate) if current_merge is not None: merged.append((current_merge, current_merge_set)) for merge_key, merge_set in merged: if len(merge_set) > 1: callback.merge(merge_set, merge_key) def __hash__(self): return self.start + mod_inverse((self.end << 1) + 1) def __eq__(self, other): return self.__class__ == other.__class__ and self.end == other.end \ and self.start == other.start def __lt__(self, other: 'TimeWindow'): if not isinstance(other, TimeWindow): raise Exception("Does not support comparison with non-TimeWindow %s" % other) return self.start == other.start and self.end < other.end or self.start < other.start def __le__(self, other: 'TimeWindow'): return self.__eq__(other) and self.__lt__(other) def __repr__(self): return "TimeWindow(start={}, end={})".format(self.start, self.end)
[docs]class CountWindow(Window): """ A Window that represents a count window. For each count window, we will assign a unique id. Thus this CountWindow can act as namespace part in state. We can attach data to each different CountWindow. """ def __init__(self, id: int): super(CountWindow, self).__init__() self.id = id def max_timestamp(self) -> int: return MAX_LONG_VALUE def __hash__(self): return long_to_int_with_bit_mixing(self.id) def __eq__(self, other): return self.__class__ == other.__class__ and self.id == other.id def __repr__(self): return "CountWindow(id={})".format(self.id)
[docs]class GlobalWindow(Window): """ The default window into which all data is placed GlobalWindows. """ def __init__(self): super(GlobalWindow, self).__init__() @staticmethod def get() -> 'GlobalWindow': return GlobalWindow() def max_timestamp(self) -> int: return MAX_LONG_VALUE def __eq__(self, other): return self.__class__ == other.__class__ def __hash__(self): return 0 def __repr__(self): return "GlobalWindow"
class TimeWindowSerializer(TypeSerializer[TimeWindow]): """ The serializer used to write the TimeWindow type. """ def __init__(self): self._underlying_coder = None def serialize(self, element: TimeWindow, stream: BytesIO) -> None: if self._underlying_coder is None: self._underlying_coder = self._get_coder().get_impl() bytes_data = self._underlying_coder.encode(element) stream.write(bytes_data) def deserialize(self, stream: BytesIO) -> TimeWindow: if self._underlying_coder is None: self._underlying_coder = self._get_coder().get_impl() bytes_data: bytes = stream.read(16) return self._underlying_coder.decode(bytes_data) def _get_coder(self): from pyflink.fn_execution import coders return coders.TimeWindowCoder() class CountWindowSerializer(TypeSerializer[CountWindow]): def __init__(self): self._underlying_coder = None def serialize(self, element: CountWindow, stream: BytesIO) -> None: if self._underlying_coder is None: self._underlying_coder = self._get_coder().get_impl() bytes_data = self._underlying_coder.encode(element) stream.write(bytes_data) def deserialize(self, stream: BytesIO) -> CountWindow: if self._underlying_coder is None: self._underlying_coder = self._get_coder().get_impl() bytes_data = stream.read(8) return self._underlying_coder.decode(bytes_data) def _get_coder(self): from pyflink.fn_execution import coders return coders.CountWindowCoder() class GlobalWindowSerializer(TypeSerializer[GlobalWindow]): """ A TypeSerializer for GlobalWindow. """ def __init__(self): self._underlying_coder = None def serialize(self, element: GlobalWindow, stream: BytesIO) -> None: if self._underlying_coder is None: self._underlying_coder = self._get_coder().get_impl() bytes_data = self._underlying_coder.encode(element) stream.write(bytes_data) def deserialize(self, stream: BytesIO) -> GlobalWindow: if self._underlying_coder is None: self._underlying_coder = self._get_coder().get_impl() bytes_data = stream.read(8) return self._underlying_coder.decode(bytes_data) def _get_coder(self): from pyflink.fn_execution import coders return coders.GlobalWindowCoder() T = TypeVar('T') W = TypeVar('W') W2 = TypeVar('W2') IN = TypeVar('IN') OUT = TypeVar('OUT') KEY = TypeVar('KEY')
[docs]class TriggerResult(Enum): """ Result type for trigger methods. This determines what happens with the window, for example whether the window function should be called, or the window should be discarded. If a :class:`Trigger` returns TriggerResult.FIRE or TriggerResult.FIRE_AND_PURGE but the window does not contain any data the window function will not be invoked, i.e. no data will be produced for the window. - CONTINUE: No action is taken on the window. - FIRE_AND_PURGE: Evaluates the window function and emits the 'window result'. - FIRE: On FIRE, the window is evaluated and results are emitted. The window is not purged though, all elements are retained. - PURGE: All elements in the window are cleared and the window is discarded, without evaluating the window function or emitting any elements. """ CONTINUE = (False, False) FIRE_AND_PURGE = (True, True) FIRE = (True, False) PURGE = (False, True) def is_fire(self) -> bool: return self.value[0] def is_purge(self) -> bool: return self.value[1]
class Trigger(ABC, Generic[T, W]): """ A Trigger determines when a pane of a window should be evaluated to emit the results for that part of the window. A pane is the bucket of elements that have the same key (assigned by the KeySelector) and same Window. An element can be in multiple panes if it was assigned to multiple windows by the WindowAssigner. These panes all have their own instance of the Trigger. Triggers must not maintain state internally since they can be re-created or reused for different keys. All necessary state should be persisted using the state abstraction available on the TriggerContext. When used with a MergingWindowAssigner the Trigger must return true from :func:`can_merge` and :func:`on_merge` most be properly implemented. """ class TriggerContext(ABC): """ A context object that is given to :class:`Trigger` methods to allow them to register timer callbacks and deal with state. """ @abstractmethod def get_current_processing_time(self) -> int: """ :return: The current processing time. """ pass @abstractmethod def get_metric_group(self) -> MetricGroup: """ Returns the metric group for this :class:`Trigger`. This is the same metric group that would be returned from :func:`~pyflink.datasteam.functions.RuntimeContext.get_metric_group` in a user function. :return: The metric group. """ pass @abstractmethod def get_current_watermark(self) -> int: """ :return: The current watermark time. """ pass @abstractmethod def register_processing_time_timer(self, time: int) -> None: """ Register a system time callback. When the current system time passes the specified time :func:`~Trigger.on_processing_time` is called with the time specified here. :param time: The time at which to invoke :func:`~Trigger.on_processing_time`. """ pass @abstractmethod def register_event_time_timer(self, time: int) -> None: """ Register an event-time callback. When the current watermark passes the specified time :func:`~Trigger.on_event_time` is called with the time specified here. :param time: The watermark at which to invoke :func:`~Trigger.on_event_time`. """ pass @abstractmethod def delete_processing_time_timer(self, time: int) -> None: """ Delete the processing time trigger for the given time. """ pass @abstractmethod def delete_event_time_timer(self, time: int) -> None: """ Delete the event-time trigger for the given time. """ pass @abstractmethod def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State: """ Retrieves a :class:`State` object that can be used to interact with fault-tolerant state that is scoped to the window and key of the current trigger invocation. :param state_descriptor: The StateDescriptor that contains the name and type of the state that is being accessed. :return: The partitioned state object. """ pass class OnMergeContext(TriggerContext): """ Extension of :class:`TriggerContext` that is given to :func:`~Trigger.on_merge`. """ @abstractmethod def merge_partitioned_state(self, state_descriptor: StateDescriptor) -> None: pass @abstractmethod def on_element(self, element: T, timestamp: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult: """ Called for every element that gets added to a pane. The result of this will determine whether the pane is evaluated to emit results. :param element: The element that arrived. :param timestamp: The timestamp of the element that arrived. :param window: The window to which the element is being added. :param ctx: A context object that can be used to register timer callbacks. """ pass @abstractmethod def on_processing_time(self, time: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult: """ Called when a processing-time timer that was set using the trigger context fires. :param time: The timestamp at which the timer fired. :param window: The window for which the timer fired. :param ctx: A context object that can be used to register timer callbacks. """ pass @abstractmethod def on_event_time(self, time: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult: """ Called when an event-time timer that was set using the trigger context fires. :param time: The timestamp at which the timer fired. :param window: The window for which the timer fired. :param ctx: A context object that can be used to register timer callbacks. """ pass def can_merge(self) -> bool: """ .. note:: If this returns true you must properly implement :func:`~Trigger.on_merge` :return: True if this trigger supports merging of trigger state and can therefore be used with a MergingWindowAssigner. """ return False @abstractmethod def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None: """ Called when several windows have been merged into one window by the :class:`WindowAssigner`. :param window: The new window that results from the merge. :param ctx: A context object that can be used to register timer callbacks and access state. """ pass @abstractmethod def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None: """ Clears any state that the trigger might still hold for the given window. This is called when a window is purged. Timers set using :func:`~TriggerContext.register_event_time_timer` and :func:`~TriggerContext.register_processing_time_timer` should be deleted here as well as state acquired using :func:`~TriggerContext.get_partitioned_state`. """ pass class WindowAssigner(ABC, Generic[T, W]): """ A :class:`WindowAssigner` assigns zero or more :class:`Window` to an element. In a window operation, elements are grouped by their key (if available) and by the windows to which it was assigned. The set of elements with the same key and window is called a pane. When a :class:`Trigger` decides that a certain pane should fire the WindowFunction is applied to produce output elements for that pane. """ class WindowAssignerContext(ABC): """ A context provided to the :class:`WindowAssigner` that allows it to query the current processing time. """ @abstractmethod def get_current_processing_time(self) -> int: """ :return: The current processing time. """ pass @abstractmethod def get_runtime_context(self) -> RuntimeContext: """ :return: The current runtime context. """ pass @abstractmethod def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[W]: """ :param element: The element to which windows should be assigned. :param timestamp: The timestamp of the element. :param context: The :class:`WindowAssignerContext` in which the assigner operates. :return: A collection of windows that should be assigned to the element. """ pass @abstractmethod def get_default_trigger(self, env) -> Trigger[T, W]: """ :param env: The StreamExecutionEnvironment used to compile the DataStream job. :return: The default trigger associated with this :class:`WindowAssigner`. """ pass @abstractmethod def get_window_serializer(self) -> TypeSerializer[W]: """ :return: A TypeSerializer for serializing windows that are assigned by this WindowAssigner. """ pass @abstractmethod def is_event_time(self) -> bool: """ :return: True if elements are assigned to windows based on event time, false otherwise. """ pass
[docs]class MergingWindowAssigner(WindowAssigner[T, W]): """ A `WindowAssigner` that can merge windows. """ class MergeCallback(ABC, Generic[W2]): """ Callback to be used in :func:`~MergingWindowAssigner.merge_windows` for specifying which windows should be merged. """ @abstractmethod def merge(self, to_be_merged: Iterable[W2], merge_result: W2) -> None: """ Specifies that the given windows should be merged into the result window. :param to_be_merged: The list of windows that should be merged into one window. :param merge_result: The resulting merged window. """ pass @abstractmethod def merge_windows(self, windows: Iterable[W], callback: 'MergingWindowAssigner.MergeCallback[W]') -> None: """ Determines which windows (if any) should be merged. :param windows: The window candidates. :param callback: A callback that can be invoked to signal which windows should be merged. """ pass
class WindowOperationDescriptor(object): def __init__(self, assigner: WindowAssigner, trigger: Trigger, allowed_lateness: int, late_data_output_tag: Optional[OutputTag], window_state_descriptor: StateDescriptor, window_serializer: TypeSerializer, internal_window_function: InternalWindowFunction): self.assigner = assigner self.trigger = trigger self.allowed_lateness = allowed_lateness self.late_data_output_tag = late_data_output_tag self.window_state_descriptor = window_state_descriptor self.internal_window_function = internal_window_function self.window_serializer = window_serializer def generate_op_name(self): return type(self.assigner).__name__ def generate_op_desc(self, windowed_stream_type, func_desc): return "%s(%s, %s, %s)" % ( windowed_stream_type, self.assigner, type(self.trigger).__name__, func_desc)
[docs]class SessionWindowTimeGapExtractor(ABC): """ A SessionWindowTimeGapExtractor extracts session time gaps for Dynamic Session Window Assigners. """ @abstractmethod def extract(self, element: Any) -> int: """ Extracts the session time gap. :param element The input element. :return The session time gap in milliseconds. """ pass
[docs]class EventTimeTrigger(Trigger[T, TimeWindow]): """ A Trigger that fires once the watermark passes the end of the window to which a pane belongs. """ def on_element(self, element: T, timestamp: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: if window.max_timestamp() <= ctx.get_current_watermark(): return TriggerResult.FIRE else: ctx.register_event_time_timer(window.max_timestamp()) # No action is taken on the window. return TriggerResult.CONTINUE def on_processing_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: # No action is taken on the window. return TriggerResult.CONTINUE def on_event_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: if time == window.max_timestamp(): return TriggerResult.FIRE else: # No action is taken on the window. return TriggerResult.CONTINUE def can_merge(self) -> bool: return True def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None: window_max_timestamp = window.max_timestamp() if window_max_timestamp > ctx.get_current_watermark(): ctx.register_event_time_timer(window_max_timestamp) def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None: ctx.delete_event_time_timer(window.max_timestamp()) @staticmethod def create() -> 'EventTimeTrigger': return EventTimeTrigger()
[docs]class ContinuousEventTimeTrigger(Trigger[T, TimeWindow]): """ A Trigger that continuously fires based on a given time interval. This fires based Watermarks. """ def __init__(self, interval: int): self.interval = interval self.state_desc = ReducingStateDescriptor("fire-time", Min, Types.LONG()) self.fire_timestamp_state = None @staticmethod def of(interval: Time) -> 'ContinuousEventTimeTrigger': return ContinuousEventTimeTrigger(interval.to_milliseconds()) def on_element(self, element: T, timestamp: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: if window.max_timestamp() <= ctx.get_current_watermark(): # if the watermark is already past the window fire immediately return TriggerResult.FIRE else: ctx.register_event_time_timer(window.max_timestamp()) fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)) if fire_timestamp_state.get() is None: self.register_next_fire_timestamp(timestamp - (timestamp % self.interval), window, ctx, fire_timestamp_state) return TriggerResult.CONTINUE def on_processing_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.CONTINUE def on_event_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: if time == window.max_timestamp(): return TriggerResult.FIRE fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)) fire_timestamp = fire_timestamp_state.get() if fire_timestamp is not None and fire_timestamp == time: fire_timestamp_state.clear() self.register_next_fire_timestamp(time, window, ctx, fire_timestamp_state) return TriggerResult.FIRE return TriggerResult.CONTINUE def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None: ctx.merge_partitioned_state(self.state_desc) next_fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)).get() if next_fire_timestamp is not None: ctx.register_event_time_timer(next_fire_timestamp) def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None: fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)) timestamp = fire_timestamp.get() if timestamp is not None: ctx.delete_event_time_timer(timestamp) fire_timestamp.clear() def can_merge(self) -> bool: return True def register_next_fire_timestamp(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext', fire_timestamp_state: ReducingState): next_fire_timestamp = min(time + self.interval, window.max_timestamp()) fire_timestamp_state.add(next_fire_timestamp) ctx.register_event_time_timer(next_fire_timestamp)
class Min(ReduceFunction): def reduce(self, value1, value2): return min(value1, value2)
[docs]class ProcessingTimeTrigger(Trigger[T, TimeWindow]): """ A Trigger that fires once the current system time passes the end of the window to which a pane belongs. """ def on_element(self, element: T, timestamp: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: ctx.register_processing_time_timer(window.max_timestamp()) return TriggerResult.CONTINUE def on_processing_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.FIRE def on_event_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.CONTINUE def can_merge(self) -> bool: return True def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None: window_max_timestamp = window.max_timestamp() if window_max_timestamp > ctx.get_current_processing_time(): ctx.register_processing_time_timer(window_max_timestamp) def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None: ctx.delete_processing_time_timer(window.max_timestamp()) @staticmethod def create() -> 'ProcessingTimeTrigger': return ProcessingTimeTrigger()
[docs]class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]): """ A Trigger that continuously fires based on a given time interval as measured by the clock of the machine on which the job is running. """ def __init__(self, interval: int): self.interval = interval self.state_desc = ReducingStateDescriptor("fire-time", Min, Types.LONG()) self.fire_timestamp_state = None @staticmethod def of(interval: Time) -> 'ContinuousProcessingTimeTrigger': return ContinuousProcessingTimeTrigger(interval.to_milliseconds()) def on_element(self, element: T, timestamp: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)) timestamp = ctx.get_current_processing_time() if fire_timestamp_state.get() is None: self.register_next_fire_timestamp(timestamp - (timestamp % self.interval), window, ctx, fire_timestamp_state) return TriggerResult.CONTINUE def on_processing_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)) if fire_timestamp_state.get() == time: fire_timestamp_state.clear() self.register_next_fire_timestamp(time, window, ctx, fire_timestamp_state) return TriggerResult.FIRE return TriggerResult.CONTINUE def on_event_time(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.CONTINUE def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None: # States for old windows will lose after the call. ctx.merge_partitioned_state(self.state_desc) # Register timer for this new window. next_fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)).get() if next_fire_timestamp is not None: ctx.register_processing_time_timer(next_fire_timestamp) def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None: fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)) timestamp = fire_timestamp_state.get() if timestamp is not None: ctx.delete_processing_time_timer(timestamp) fire_timestamp_state.clear() def can_merge(self) -> bool: return True def register_next_fire_timestamp(self, time: int, window: TimeWindow, ctx: 'Trigger.TriggerContext', fire_timestamp_state: ReducingState): next_fire_timestamp = min(time + self.interval, window.max_timestamp()) fire_timestamp_state.add(next_fire_timestamp) ctx.register_processing_time_timer(next_fire_timestamp)
[docs]class PurgingTrigger(Trigger[T, Window]): """ A trigger that can turn any Trigger into a purging Trigger. When the nested trigger fires, this will return a FIRE_AND_PURGE TriggerResult. """ def __init__(self, nested_trigger: Trigger[T, Window]): self.nested_trigger = nested_trigger @staticmethod def of(nested_trigger: Trigger[T, Window]) -> 'PurgingTrigger': return PurgingTrigger(nested_trigger) def on_element(self, element: T, timestamp: int, window: Window, ctx: 'Trigger.TriggerContext') -> TriggerResult: trigger_result = self.nested_trigger.on_element(element, timestamp, window, ctx) if trigger_result.is_fire() is True: return TriggerResult.FIRE_AND_PURGE else: return trigger_result def on_event_time(self, time: int, window: Window, ctx: 'Trigger.TriggerContext') -> TriggerResult: trigger_result = self.nested_trigger.on_event_time(time, window, ctx) if trigger_result.is_fire() is True: return TriggerResult.FIRE_AND_PURGE else: return trigger_result def on_processing_time(self, time: int, window: Window, ctx: 'Trigger.TriggerContext') -> TriggerResult: trigger_result = self.nested_trigger.on_processing_time(time, window, ctx) if trigger_result.is_fire() is True: return TriggerResult.FIRE_AND_PURGE else: return trigger_result def clear(self, window: Window, ctx: 'Trigger.TriggerContext') -> None: self.nested_trigger.clear(window, ctx) def can_merge(self) -> bool: return self.nested_trigger.can_merge() def on_merge(self, window: Window, ctx: 'Trigger.OnMergeContext') -> None: self.nested_trigger.on_merge(window, ctx)
[docs]class CountTrigger(Trigger[T, CountWindow]): """ A Trigger that fires once the count of elements in a pane reaches the given count. """ def __init__(self, window_size: int): self._window_size = window_size self._count_state_descriptor = ReducingStateDescriptor( "count", lambda a, b: a + b, Types.LONG()) @staticmethod def of(window_size: int) -> 'CountTrigger': return CountTrigger(window_size) def on_element(self, element: T, timestamp: int, window: CountWindow, ctx: Trigger.TriggerContext) -> TriggerResult: count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor)) count_state.add(1) if count_state.get() >= self._window_size: # On FIRE, the window is evaluated and results are emitted. The window is not purged # though, all elements are retained. count_state.clear() return TriggerResult.FIRE else: # No action is taken on the window. return TriggerResult.CONTINUE def on_processing_time(self, time: int, window: CountWindow, ctx: Trigger.TriggerContext) -> TriggerResult: # No action is taken on the window. return TriggerResult.CONTINUE def on_event_time(self, time: int, window: CountWindow, ctx: Trigger.TriggerContext) -> TriggerResult: # No action is taken on the window. return TriggerResult.CONTINUE def can_merge(self) -> bool: return True def on_merge(self, window: CountWindow, ctx: Trigger.OnMergeContext) -> None: ctx.merge_partitioned_state(self._count_state_descriptor) def clear(self, window: CountWindow, ctx: Trigger.TriggerContext) -> None: count_state = ctx.get_partitioned_state(self._count_state_descriptor) count_state.clear()
[docs]class NeverTrigger(Trigger[T, GlobalWindow]): """ A trigger that never fires, as default Trigger for GlobalWindows. """ def on_element(self, element: T, timestamp: int, window: GlobalWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.CONTINUE def on_processing_time(self, time: int, window: GlobalWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.CONTINUE def on_event_time(self, time: int, window: GlobalWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.CONTINUE def on_merge(self, window: GlobalWindow, ctx: 'Trigger.OnMergeContext') -> None: pass def clear(self, window: GlobalWindow, ctx: 'Trigger.TriggerContext') -> None: pass
[docs]class CountTumblingWindowAssigner(WindowAssigner[T, CountWindow]): """ A WindowAssigner that windows elements into fixed-size windows based on the count number of the elements. Windows cannot overlap. """ def __init__(self, window_size: int): """ :param window_size: The size of the windows in number of elements. """ self._window_size = window_size self._count_descriptor = ValueStateDescriptor('tumble-count-assigner', Types.LONG()) @staticmethod def of(window_size: int) -> 'CountTumblingWindowAssigner': return CountTumblingWindowAssigner(window_size) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[CountWindow]: count_state = context.get_runtime_context().get_state(self._count_descriptor) count_value = count_state.value() if count_value is None: current_count = 0 else: current_count = count_value count_state.update(current_count + 1) return [CountWindow(current_count // self._window_size)] def get_default_trigger(self, env) -> Trigger[T, CountWindow]: return CountTrigger(self._window_size) def get_window_serializer(self) -> TypeSerializer[CountWindow]: return CountWindowSerializer() def is_event_time(self) -> bool: return False def __repr__(self) -> str: return "CountTumblingWindowAssigner(%s)" % self._window_size
[docs]class CountSlidingWindowAssigner(WindowAssigner[T, CountWindow]): """ A WindowAssigner that windows elements into sliding windows based on the count number of the elements. Windows can possibly overlap. """ def __init__(self, window_size: int, window_slide: int): """ :param window_size: The size of the windows in number of elements. :param window_slide: The slide interval in number of elements. """ self._window_size = window_size self._window_slide = window_slide self._count_descriptor = ValueStateDescriptor('slide-count-assigner', Types.LONG()) @staticmethod def of(window_size: int, window_slide: int) -> 'CountSlidingWindowAssigner': return CountSlidingWindowAssigner(window_size, window_slide) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[CountWindow]: count_state = context.get_runtime_context().get_state(self._count_descriptor) count_value = count_state.value() if count_value is None: current_count = 0 else: current_count = count_value count_state.update(current_count + 1) last_id = current_count // self._window_slide last_start = last_id * self._window_slide last_end = last_start + self._window_size - 1 windows = [] while last_id >= 0 and last_start <= current_count <= last_end: if last_start <= current_count <= last_end: windows.append(CountWindow(last_id)) last_id -= 1 last_start -= self._window_slide last_end -= self._window_slide return windows def get_default_trigger(self, env) -> Trigger[T, CountWindow]: return CountTrigger(self._window_size) def get_window_serializer(self) -> TypeSerializer[CountWindow]: return CountWindowSerializer() def is_event_time(self) -> bool: return False def __repr__(self): return "CountSlidingWindowAssigner(%s, %s)" % (self._window_size, self._window_slide)
[docs]class TumblingProcessingTimeWindows(WindowAssigner[T, TimeWindow]): """ A WindowAssigner that windows elements into windows based on the current system time of the machine the operation is running on. Windows cannot overlap. For example, in order to window into windows of 1 minute, every 10 seconds: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10))) """ def __init__(self, size: int, offset: int): if abs(offset) >= size: raise Exception("TumblingProcessingTimeWindows parameters must satisfy " "abs(offset) < size") self._size = size self._offset = offset @staticmethod def of(size: Time, offset: Time = None) -> 'TumblingProcessingTimeWindows': """ Creates a new :class:`TumblingProcessingTimeWindows` :class:`WindowAssigner` that assigns elements to time windows based on the element timestamp and offset. For example, if you want window a stream by hour, but window begins at the 15th minutes of each hour, you can use of(Time.hours(1), Time.minutes(15)), then you will get time windows start at 0:15:00,1:15:00,2:15:00,etc. Rather than that, if you are living in somewhere which is not using UTC±00:00 time, such as China which is using UTC+08:00, and you want a time window with size of one day, and window begins at every 00:00:00 of local time, you may use of(Time.days(1), Time.hours(-8)). The parameter of offset is Time.hours(-8) since UTC+08:00 is 8 hours earlier than UTC time. :param size The size of the generated windows. :param offset The offset which window start would be shifted by. :return The time policy. """ if offset is None: return TumblingProcessingTimeWindows(size.to_milliseconds(), 0) else: return TumblingProcessingTimeWindows(size.to_milliseconds(), offset.to_milliseconds()) def assign_windows(self, element: T, timestamp: int, context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]: current_processing_time = context.get_current_processing_time() start = TimeWindow.get_window_start_with_offset(current_processing_time, self._offset, self._size) return [TimeWindow(start, start + self._size)] def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return ProcessingTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return False def __repr__(self): return "TumblingProcessingTimeWindows(%s, %s)" % (self._size, self._offset)
[docs]class TumblingEventTimeWindows(WindowAssigner[T, TimeWindow]): """ A WindowAssigner that windows elements into windows based on the timestamp of the elements. Windows cannot overlap. For example, in order to window into windows of 1 minute: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(TumblingEventTimeWindows.of(Time.minutes(1))) """ def __init__(self, size: int, offset: int): if abs(offset) >= size: raise Exception("TumblingEventTimeWindows parameters must satisfy abs(offset) < size") self._size = size self._offset = offset @staticmethod def of(size: Time, offset: Time = None) -> 'TumblingEventTimeWindows': """ Creates a new TumblingEventTimeWindows WindowAssigner that assigns elements to time windows based on the element timestamp, offset and a staggering offset, depending on the staggering policy. :param size The size of the generated windows. :param offset The globalOffset which window start would be shifted by. """ if offset is None: return TumblingEventTimeWindows(size.to_milliseconds(), 0) else: return TumblingEventTimeWindows(size.to_milliseconds(), offset.to_milliseconds()) def assign_windows(self, element: T, timestamp: int, context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]: if timestamp > MIN_LONG_VALUE: start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size) return [TimeWindow(start, start + self._size)] else: raise Exception("Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', " + "or did you forget to call " + "'data_stream.assign_timestamps_and_watermarks(...)'?") def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return EventTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return True def __repr__(self): return "TumblingEventTimeWindows(%s, %s)" % (self._size, self._offset)
[docs]class SlidingProcessingTimeWindows(WindowAssigner[T, TimeWindow]): """ A WindowAssigner that windows elements into sliding windows based on the current system time of the machine the operation is running on. Windows can possibly overlap. For example, in order to window into windows of 1 minute, every 10 seconds: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10))) """ def __init__(self, size: int, slide: int, offset: int): if abs(offset) >= slide or size <= 0: raise Exception("SlidingProcessingTimeWindows parameters must satisfy " + "abs(offset) < slide and size > 0") self._size = size self._slide = slide self._offset = offset self._pane_size = math.gcd(size, slide) @staticmethod def of(size: Time, slide: Time, offset: Time = None) -> 'SlidingProcessingTimeWindows': """ Creates a new :class:`SlidingProcessingTimeWindows` :class:`WindowAssigner` that assigns elements to time windows based on the element timestamp and offset. For example, if you want window a stream by hour,but window begins at the 15th minutes of each hour, you can use of(Time.hours(1),Time.minutes(15)),then you will get time windows start at 0:15:00,1:15:00,2:15:00,etc. Rather than that, if you are living in somewhere which is not using UTC±00:00 time, such as China which is using UTC+08:00, and you want a time window with size of one day, and window begins at every 00:00:00 of local time,you may use of(Time.days(1),Time.hours(-8)). The parameter of offset is Time.hours(-8) since UTC+08:00 is 8 hours earlier than UTC time. :param size The size of the generated windows. :param slide The slide interval of the generated windows. :param offset The offset which window start would be shifted by. :return The time policy. """ if offset is None: return SlidingProcessingTimeWindows(size.to_milliseconds(), slide.to_milliseconds(), 0) else: return SlidingProcessingTimeWindows(size.to_milliseconds(), slide.to_milliseconds(), offset.to_milliseconds()) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]: current_processing_time = context.get_current_processing_time() last_start = TimeWindow.get_window_start_with_offset( current_processing_time, self._offset, self._slide) windows = [TimeWindow(start, start + self._size) for start in range(last_start, current_processing_time - self._size, -self._slide)] return windows def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return ProcessingTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return False def __repr__(self) -> str: return "SlidingProcessingTimeWindows(%s, %s, %s)" % (self._size, self._slide, self._offset)
[docs]class SlidingEventTimeWindows(WindowAssigner[T, TimeWindow]): """ A WindowAssigner that windows elements into sliding windows based on the timestamp of the elements. Windows can possibly overlap. For example, in order to window into windows of 1 minute, every 10 seconds: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))) """ def __init__(self, size: int, slide: int, offset: int): if abs(offset) >= slide or size <= 0: raise Exception("SlidingEventTimeWindows parameters must satisfy " + "abs(offset) < slide and size > 0") self._size = size self._slide = slide self._offset = offset self._pane_size = math.gcd(size, slide) @staticmethod def of(size: Time, slide: Time, offset: Time = None) -> 'SlidingEventTimeWindows': """ Creates a new :class:`SlidingEventTimeWindows` :class:`WindowAssigner` that assigns elements to time windows based on the element timestamp and offset. For example, if you want window a stream by hour,but window begins at the 15th minutes of each hour, you can use of(Time.hours(1),Time.minutes(15)),then you will get time windows start at 0:15:00,1:15:00,2:15:00,etc. Rather than that, if you are living in somewhere which is not using UTC±00:00 time, such as China which is using UTC+08:00, and you want a time window with size of one day, and window begins at every 00:00:00 of local time,you may use of(Time.days(1),Time.hours(-8)). The parameter of offset is Time.hours(-8) since UTC+08:00 is 8 hours earlier than UTC time. :param size The size of the generated windows. :param slide The slide interval of the generated windows. :param offset The offset which window start would be shifted by. :return The time policy. """ if offset is None: return SlidingEventTimeWindows(size.to_milliseconds(), slide.to_milliseconds(), 0) else: return SlidingEventTimeWindows(size.to_milliseconds(), slide.to_milliseconds(), offset.to_milliseconds()) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]: if timestamp > MIN_LONG_VALUE: last_start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._slide) windows = [TimeWindow(start, start + self._size) for start in range(last_start, timestamp - self._size, -self._slide)] return windows else: raise Exception("Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', " "or did you forget to call " + "'data_stream.assign_timestamps_and_watermarks(...)'?") def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return EventTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return True def __repr__(self) -> str: return "SlidingEventTimeWindows(%s, %s, %s)" % (self._size, self._slide, self._offset)
[docs]class ProcessingTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]): """ A WindowAssigner that windows elements into sessions based on the current processing time. Windows cannot overlap. For example, the processing interval is set to 1 minutes: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(ProcessingTimeSessionWindows.with_gap(Time.minutes(1))) """ def __init__(self, session_gap: int): if session_gap <= 0: raise Exception("ProcessingTimeSessionWindows parameters must satisfy 0 < size") self._session_gap = session_gap @staticmethod def with_gap(size: Time) -> 'ProcessingTimeSessionWindows': """ Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on the element timestamp. :param size: The session timeout, i.e. the time gap between sessions :return: The policy. """ return ProcessingTimeSessionWindows(size.to_milliseconds()) @staticmethod def with_dynamic_gap( extractor: SessionWindowTimeGapExtractor) -> 'DynamicProcessingTimeSessionWindows': """ Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on the element timestamp. :param extractor: The extractor to use to extract the time gap from the input elements. :return: The policy. """ return DynamicProcessingTimeSessionWindows(extractor) def merge_windows(self, windows: Iterable[TimeWindow], callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None: TimeWindow.merge_windows(windows, callback) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]: timestamp = context.get_current_processing_time() return [TimeWindow(timestamp, timestamp + self._session_gap)] def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return ProcessingTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return False def __repr__(self): return "ProcessingTimeSessionWindows(%s)" % self._session_gap
[docs]class EventTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]): """ A :class:`WindowAssigner` that windows elements into sessions based on the timestamp of the elements. Windows cannot overlap. For example, Set the timestamp of the element to 1 minutes: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(EventTimeSessionWindows.with_gap(Time.minutes(1))) """ def __init__(self, session_gap: int): if session_gap <= 0: raise Exception("EventTimeSessionWindows parameters must satisfy 0 < size") self._session_gap = session_gap @staticmethod def with_gap(size: Time) -> 'EventTimeSessionWindows': """ Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on the element timestamp. :param size: The session timeout, i.e. the time gap between sessions. :return: The policy. """ return EventTimeSessionWindows(size.to_milliseconds()) @staticmethod def with_dynamic_gap( extractor: SessionWindowTimeGapExtractor) -> 'DynamicEventTimeSessionWindows': """ Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on the element timestamp. :param extractor: The extractor to use to extract the time gap from the input elements. :return: The policy. """ return DynamicEventTimeSessionWindows(extractor) def merge_windows(self, windows: Iterable[TimeWindow], callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None: TimeWindow.merge_windows(windows, callback) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]: return [TimeWindow(timestamp, timestamp + self._session_gap)] def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return EventTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return True def __repr__(self): return "EventTimeSessionWindows(%s)" % self._session_gap
[docs]class DynamicProcessingTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]): """ A WindowAssigner that windows elements into sessions based on the current processing time. Windows cannot overlap. For example, in order to window into windows with a dynamic time gap: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(DynamicProcessingTimeSessionWindows.with_dynamic_gap(extractor)) """ def __init__(self, session_window_time_gap_extractor: SessionWindowTimeGapExtractor): self._session_gap = 0 self._session_window_time_gap_extractor = session_window_time_gap_extractor @staticmethod def with_dynamic_gap( extractor: SessionWindowTimeGapExtractor) -> 'DynamicProcessingTimeSessionWindows': """ Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on the element timestamp. :param extractor: The extractor to use to extract the time gap from the input elements. :return: The policy. """ return DynamicProcessingTimeSessionWindows(extractor) def merge_windows(self, windows: Iterable[TimeWindow], callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None: TimeWindow.merge_windows(windows, callback) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]: timestamp = context.get_current_processing_time() self._session_gap = self._session_window_time_gap_extractor.extract(element) if self._session_gap <= 0: raise Exception("Dynamic session time gap must satisfy 0 < gap") return [TimeWindow(timestamp, timestamp + self._session_gap)] def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return ProcessingTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return False def __repr__(self): return "DynamicProcessingTimeSessionWindows(%s)" % self._session_gap
[docs]class DynamicEventTimeSessionWindows(MergingWindowAssigner[T, TimeWindow]): """ A :class:`WindowAssigner` that windows elements into sessions based on the timestamp of the elements. Windows cannot overlap. For example, in order to window into windows with a dynamic time gap: :: >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\ ... .window(DynamicEventTimeSessionWindows.with_dynamic_gap(extractor)) """ def __init__(self, session_window_time_gap_extractor: SessionWindowTimeGapExtractor): self._session_gap = 0 self._session_window_time_gap_extractor = session_window_time_gap_extractor @staticmethod def with_dynamic_gap( extractor: SessionWindowTimeGapExtractor) -> 'DynamicEventTimeSessionWindows': """ Creates a new SessionWindows WindowAssigner that assigns elements to sessions based on the element timestamp. :param extractor: The extractor to use to extract the time gap from the input elements. :return: The policy. """ return DynamicEventTimeSessionWindows(extractor) def merge_windows(self, windows: Iterable[TimeWindow], callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None: TimeWindow.merge_windows(windows, callback) def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[TimeWindow]: self._session_gap = self._session_window_time_gap_extractor.extract(element) if self._session_gap <= 0: raise Exception("Dynamic session time gap must satisfy 0 < gap") return [TimeWindow(timestamp, timestamp + self._session_gap)] def get_default_trigger(self, env) -> Trigger[T, TimeWindow]: return EventTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return True def __repr__(self): return "DynamicEventTimeSessionWindows(%s)" % self._session_gap
[docs]class GlobalWindows(WindowAssigner[T, GlobalWindow]): """ A WindowAssigner that assigns all elements to the same GlobalWindow. """ def __init__(self) -> None: super().__init__() def assign_windows(self, element: T, timestamp: int, context: 'WindowAssigner.WindowAssignerContext') -> Collection[GlobalWindow]: return [GlobalWindow.get()] @staticmethod def create() -> 'GlobalWindows': """ Creates a new GlobalWindows WindowAssigner that assigns all elements to the same GlobalWindow. """ return GlobalWindows() def get_default_trigger(self, env) -> Trigger[T, GlobalWindow]: return NeverTrigger() def get_window_serializer(self) -> TypeSerializer[GlobalWindow]: return GlobalWindowSerializer() def is_event_time(self) -> bool: return False def __repr__(self) -> str: return "GlobalWindows()"