This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Windows

Windows on Keyed Data Streams

Flink offers a variety of methods for defining windows on a KeyedStream. All of these group elements per key, i.e., each window will contain elements with the same key value.

Basic Window Constructs

Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows for common use cases. Before defining your own windows, check whether one of the pre-defined windows below serves your use case.


Transformation Description (Java)
Tumbling time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see the section on time).

keyedStream.timeWindow(Time.seconds(5));

Sliding time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds). The notion of time is specified by the selected TimeCharacteristic (see the section on time).

keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));

Tumbling count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.

keyedStream.countWindow(1000);

Sliding count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements).

keyedStream.countWindow(1000, 100);


Transformation Description (Scala)
Tumbling time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see the section on time).

keyedStream.timeWindow(Time.seconds(5))

Sliding time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds). The notion of time is specified by the selected TimeCharacteristic (see the section on time).

keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))

Tumbling count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.

keyedStream.countWindow(1000)

Sliding count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements).

keyedStream.countWindow(1000, 100)
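To make the per-key semantics of a tumbling count window concrete, the following plain-Java sketch simulates what `keyedStream.countWindow(2)` followed by a sum would compute over a small in-memory list. It does not use Flink at all: the class `KeyedCountWindowSketch` and its method are hypothetical names for illustration, and in-order, single-threaded processing is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java simulation (not Flink code) of
 * keyedStream.countWindow(size) followed by a sum: elements are buffered
 * per key, and a key's buffer is summed and cleared once it holds `size` elements.
 */
class KeyedCountWindowSketch {

    /** Returns the per-window sums in the order the windows fire, formatted as "key=sum". */
    static List<String> tumblingCountSums(List<String> keys, List<Integer> values, int size) {
        Map<String, List<Integer>> buffers = new HashMap<>(); // one open window per key
        List<String> fired = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(values.get(i));
            if (buffer.size() == size) {
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                fired.add(key + "=" + sum);
                buffer.clear(); // tumbling: every element belongs to exactly one window
            }
        }
        return fired;
    }
}
```

For keys `a, b, a, b, a` with values `1, 10, 2, 20, 3` and a size of 2, the sketch returns `[a=3, b=30]`; the last element for key `a` (value 3) stays buffered because its window has not yet reached 2 elements.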

Advanced Window Constructs

The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, the window definition below holds the elements of the last 5 seconds and slides every second, but the window function is executed whenever 100 elements have been added to the window, and each time it is executed, only the last 10 elements are retained in the window:

// Java
keyedStream
    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));
// Scala
keyedStream
    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10))

The general recipe for building a custom window is to specify (1) a WindowAssigner, (2) a Trigger (optionally), and (3) an Evictor (optionally).

The WindowAssigner defines how incoming elements are assigned to windows. A window is a logical group of elements with a begin value and an end value, corresponding to a begin time and an end time. Elements whose timestamp (according to the notion of time described above) falls between these values are part of the window.

For example, the SlidingEventTimeWindows assigner in the code above defines windows of size 5 seconds with a slide of 1 second. Assume that time starts from 0 and is measured in milliseconds. Then the first six windows are [0, 5000), [1000, 6000), [2000, 7000), [3000, 8000), [4000, 9000), and [5000, 10000), where the end of each window is exclusive and consecutive windows overlap. Each incoming element is assigned to windows according to its timestamp. For example, an element with timestamp 2000 is assigned to the first three of these windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your own window types by extending the WindowAssigner class.
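The assignment in the example above can be checked with a short calculation. The sketch below is not Flink's implementation; it is a minimal stand-alone computation, assuming half-open windows [start, start + size) and, as in the text, that time starts at 0, so windows that would begin before 0 are skipped. `SlidingWindowMath` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's source) of the arithmetic behind a
 * sliding window assigner. Windows are half-open ranges [start, start + size);
 * following the text's assumption that time starts at 0, windows that
 * would start before 0 are skipped.
 */
class SlidingWindowMath {

    /** Returns the {start, end} pairs of all windows containing the timestamp, earliest first. */
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that contains the timestamp (timestamp >= 0 assumed).
        long lastStart = timestamp - (timestamp % slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size && start >= 0; start -= slide) {
            windows.add(0, new long[] {start, start + size});
        }
        return windows;
    }
}
```

`assign(2000, 5000, 1000)` yields [0, 5000), [1000, 6000), and [2000, 7000), matching the three windows named in the text, while a later timestamp such as 7500 falls into size / slide = 5 windows.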

Transformation Description (Java)
Global window
KeyedStream → WindowedStream

All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.

stream.window(GlobalWindows.create());
Tumbling time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. This assigner comes with a default trigger that fires for a window when a watermark with value higher than its end-value is received.

stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
Sliding time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. This assigner comes with a default trigger that fires for a window when a watermark with value higher than its end-value is received.

stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
Tumbling processing time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window. This assigner comes with a default trigger that fires for a window when the current processing time exceeds its end-value.

stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
Sliding processing time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on the current processing time. Windows "slide" by the provided value (1 second in the example), and hence overlap. This assigner comes with a default trigger that fires for a window when the current processing time exceeds its end-value.

stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));

Transformation Description (Scala)
Global window
KeyedStream → WindowedStream

All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.

stream.window(GlobalWindows.create)
Tumbling time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. This assigner comes with a default trigger that fires for a window when a watermark with value higher than its end-value is received.

stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
Sliding time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. This assigner comes with a default trigger that fires for a window when a watermark with value higher than its end-value is received.

stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
Tumbling processing time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window. This assigner comes with a default trigger that fires for a window when the current processing time exceeds its end-value.

stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
Sliding processing time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on the current processing time. Windows "slide" by the provided value (1 second in the example), and hence overlap. This assigner comes with a default trigger that fires for a window when the current processing time exceeds its end-value.

stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
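The tumbling assigners above place each element in exactly one window. A minimal sketch of that arithmetic is shown below, assuming non-negative millisecond times and windows aligned to multiples of the window size starting at 0; it is not Flink's source. The only difference between the event-time and processing-time variants is which time is fed in: the element's timestamp or the machine clock. `TumblingWindowMath` is a hypothetical name.

```java
/**
 * Hypothetical sketch (not Flink's source) of how a tumbling window
 * assigner picks the single window for an element. Event-time assigners
 * would pass the element's timestamp; processing-time assigners would
 * pass the current wall-clock time.
 */
class TumblingWindowMath {

    /** Start of the window of the given size that contains `time` (time >= 0 assumed). */
    static long windowStart(long time, long size) {
        return time - (time % size);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long time, long size) {
        return windowStart(time, size) + size;
    }

    /** Processing-time variant: the window is chosen by the machine clock, not by the element. */
    static long processingTimeWindowStart(long size) {
        return windowStart(System.currentTimeMillis(), size);
    }
}
```

For a 1-second window, an element with timestamp 1500 lands in [1000, 2000), while an element with timestamp 2000 starts the next window, [2000, 3000).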

The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated (“fires”) for each window. If no trigger is specified, the default trigger of the window type is used; this default trigger is part of the definition of the WindowAssigner. If the default triggers do not fit the application, Flink comes bundled with a set of other triggers, and you can write your own trigger by implementing the Trigger interface. Note that specifying a trigger overrides the default trigger of the window assigner.

Transformation Description (Java)
Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements of the fired window are then discarded.

windowedStream.trigger(ProcessingTimeTrigger.create());
Watermark trigger

A window is fired when a watermark with a value that exceeds the window's end-value has been received. The elements of the fired window are then discarded.

windowedStream.trigger(EventTimeTrigger.create());
Continuous processing time trigger

A window is fired periodically, at the given interval of processing time (every 5 seconds in the example), so intermediate results are emitted while the window is still open. The elements of the fired window are retained.

windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
Continuous watermark time trigger

A window is fired periodically, at the given interval of event time as indicated by watermarks (every 5 seconds in the example), so intermediate results are emitted while the window is still open. The elements of the fired window are retained.

windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
Count trigger

A window is fired each time a certain number of elements (1000 below) has been added to it. The elements of the fired window are retained.

windowedStream.trigger(CountTrigger.of(1000));
Purging trigger

Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.

windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
Delta trigger

A window is fired when the delta between a newly added element and the element that last caused the window to fire (initially, the first element in the window), as computed by a `DeltaFunction`, exceeds a threshold (5000.0 in the example).

windowedStream.trigger(DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldDataPoint, Double newDataPoint) {
        // The trigger compares this delta against the threshold.
        return newDataPoint - oldDataPoint;
    }
}));

Transformation Description (Scala)
Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements of the fired window are then discarded.

windowedStream.trigger(ProcessingTimeTrigger.create);
Watermark trigger

A window is fired when a watermark with a value that exceeds the window's end-value has been received. The elements of the fired window are then discarded.

windowedStream.trigger(EventTimeTrigger.create);
Continuous processing time trigger

A window is fired periodically, at the given interval of processing time (every 5 seconds in the example), so intermediate results are emitted while the window is still open. The elements of the fired window are retained.

windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
Continuous watermark time trigger

A window is fired periodically, at the given interval of event time as indicated by watermarks (every 5 seconds in the example), so intermediate results are emitted while the window is still open. The elements of the fired window are retained.

windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
Count trigger

A window is fired each time a certain number of elements (1000 below) has been added to it. The elements of the fired window are retained.

windowedStream.trigger(CountTrigger.of(1000));
Purging trigger

Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.

windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
Delta trigger

A window is fired when the delta between a newly added element and the element that last caused the window to fire (initially, the first element in the window), as computed by a `DeltaFunction`, exceeds a threshold (5000.0 in the example).

windowedStream.trigger(DeltaTrigger.of(5000.0, new DeltaFunction[Double] {
  // The trigger compares this delta against the threshold.
  override def getDelta(oldDataPoint: Double, newDataPoint: Double): Double = newDataPoint - oldDataPoint
}))
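The difference between a count trigger, which retains the window contents, and a purging trigger, which discards them after firing, can be sketched without Flink. The class below is a hypothetical plain-Java model: `Result` and `SimpleTrigger` are illustrative stand-ins rather than Flink's `Trigger` API, and resetting the counter after each firing is an assumption of the sketch.

```java
/**
 * Hypothetical plain-Java model (not Flink's Trigger API) of a count
 * trigger and a purging wrapper around any trigger.
 */
class TriggerSketch {

    /** Illustrative trigger decisions. */
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    /** Illustrative trigger: called once per element added to the window. */
    interface SimpleTrigger {
        Result onElement();
    }

    /** Fires every time `maxCount` elements have been added since the last firing. */
    static SimpleTrigger countTrigger(final long maxCount) {
        return new SimpleTrigger() {
            private long count = 0;

            @Override
            public Result onElement() {
                count++;
                if (count >= maxCount) {
                    count = 0; // start counting again; the window contents are retained
                    return Result.FIRE;
                }
                return Result.CONTINUE;
            }
        };
    }

    /** Wraps any trigger so that a FIRE also discards ("purges") the window's elements. */
    static SimpleTrigger purging(final SimpleTrigger inner) {
        return () -> {
            Result r = inner.onElement();
            return r == Result.FIRE ? Result.FIRE_AND_PURGE : r;
        };
    }
}
```

With `countTrigger(3)`, the third and sixth elements produce `FIRE`; wrapping `countTrigger(2)` in `purging(...)` turns the second element's `FIRE` into `FIRE_AND_PURGE`.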

After the trigger fires, and before the function (e.g., sum, count) is applied to the window contents, an optional Evictor removes some elements from the beginning of the window; the remaining elements are then passed on to the function. Flink comes bundled with a set of evictors, and you can write your own evictor by implementing the Evictor interface.

Transformation Description (Java)
Time evictor

Evict elements from the beginning of the window so that only the elements from end-value minus 1 second up to end-value are retained (the resulting window size is 1 second).

triggeredStream.evictor(TimeEvictor.of(Time.seconds(1)));
Count evictor

Retain 1000 elements from the end of the window backwards, evicting all others.

triggeredStream.evictor(CountEvictor.of(1000));
Delta evictor

Starting from the beginning of the window, evict elements until an element is found whose delta to the last element of the window, as computed by a DeltaFunction, is below a threshold (5000 in the example).

triggeredStream.evictor(DeltaEvictor.of(5000, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldDataPoint, Double newDataPoint) {
        return newDataPoint - oldDataPoint;
    }
}));

Transformation Description (Scala)
Time evictor

Evict elements from the beginning of the window so that only the elements from end-value minus 1 second up to end-value are retained (the resulting window size is 1 second).

triggeredStream.evictor(TimeEvictor.of(Time.seconds(1)));
Count evictor

Retain 1000 elements from the end of the window backwards, evicting all others.

triggeredStream.evictor(CountEvictor.of(1000));
Delta evictor

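Starting from the beginning of the window, evict elements until an element is found whose delta to the last element of the window, as computed by a DeltaFunction, is below a threshold (5000.0 in the example).

triggeredStream.evictor(DeltaEvictor.of(5000.0, new DeltaFunction[Double] {
  override def getDelta(oldDataPoint: Double, newDataPoint: Double): Double = newDataPoint - oldDataPoint
}))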
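The two element-based evictors can be expressed as a number of elements to drop from the front of the window buffer. This is a hypothetical plain-Java sketch, not Flink's `Evictor` interface; it assumes the buffer is ordered by arrival and that the delta function is called as `delta(element, lastElement)`.

```java
import java.util.List;
import java.util.function.ToDoubleBiFunction;

/**
 * Hypothetical sketch (not Flink's Evictor API) of how many elements a
 * count evictor and a delta evictor remove from the front of a window buffer.
 */
class EvictorSketch {

    /** Count evictor: keep the last `maxCount` elements and evict the rest. */
    static int countToEvict(int bufferSize, int maxCount) {
        return Math.max(0, bufferSize - maxCount);
    }

    /**
     * Delta evictor: starting from the beginning, evict elements until one
     * is found whose delta to the last element is below `threshold`.
     */
    static int deltaToEvict(List<Double> buffer, double threshold,
                            ToDoubleBiFunction<Double, Double> delta) {
        if (buffer.isEmpty()) {
            return 0;
        }
        double last = buffer.get(buffer.size() - 1);
        int toEvict = 0;
        for (double value : buffer) {
            if (delta.applyAsDouble(value, last) < threshold) {
                break; // first element close enough to the last one: stop evicting
            }
            toEvict++;
        }
        return toEvict;
    }
}
```

With a buffer of `1.0, 2.0, 9.5, 10.0`, a threshold of 5.0, and `(a, b) -> b - a` as the delta function, the first two elements are evicted because they differ from the last element by 9.0 and 8.0; eviction stops at 9.5, whose delta is 0.5.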

Recipes for Building Windows

The combination of window assigner, trigger, and evictor is very powerful and allows you to define many different kinds of windows. Flink’s basic window constructs are, in fact, syntactic sugar on top of the general mechanism. Below is how some common types of windows can be constructed using the general mechanism:

Window type Definition (shorthand, followed by the equivalent general definition)
Tumbling count window
stream.countWindow(1000)
stream.window(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of(1000)))
Sliding count window
stream.countWindow(1000, 100)
stream.window(GlobalWindows.create())
  .evictor(CountEvictor.of(1000))
  .trigger(CountTrigger.of(100))
Tumbling event time window (event time characteristic)
stream.timeWindow(Time.seconds(5))
stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .trigger(EventTimeTrigger.create())
Sliding event time window (event time characteristic)
stream.timeWindow(Time.seconds(5), Time.seconds(1))
stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
  .trigger(EventTimeTrigger.create())
Tumbling processing time window (processing time characteristic)
stream.timeWindow(Time.seconds(5))
stream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .trigger(ProcessingTimeTrigger.create())
Sliding processing time window (processing time characteristic)
stream.timeWindow(Time.seconds(5), Time.seconds(1))
stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create())
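To see why the sliding count window recipe works, the following plain-Java simulation replays it for a single key on a small input: one global window that never closes, a count trigger that fires every `slide` elements, and a count evictor that trims the window to the last `size` elements before the window function sees it. It is a hypothetical model, not Flink code, and physically removing evicted elements from the buffer is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java simulation (not Flink code) of the recipe
 *   window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide))
 * for a single key.
 */
class SlidingCountRecipe {

    /** Returns the contents passed to the window function at each firing. */
    static List<List<Integer>> run(List<Integer> input, int size, int slide) {
        List<Integer> window = new ArrayList<>();      // the single global window
        List<List<Integer>> firings = new ArrayList<>();
        int sinceLastFire = 0;                          // count trigger state
        for (int element : input) {
            window.add(element);
            sinceLastFire++;
            if (sinceLastFire >= slide) {               // count trigger fires every `slide` elements
                sinceLastFire = 0;
                while (window.size() > size) {          // count evictor keeps only the last `size`
                    window.remove(0);
                }
                firings.add(new ArrayList<>(window));   // what the window function would see
            }
        }
        return firings;
    }
}
```

With `size = 4` and `slide = 2`, the input 1 to 6 produces three firings: `[1, 2]`, `[1, 2, 3, 4]`, and `[3, 4, 5, 6]`. The early firings contain fewer than `size` elements because the global window has not yet filled up.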

Windows on Unkeyed Data Streams

You can also define windows on regular (non-keyed) data streams using the windowAll transformation. These windowed data streams have all the capabilities of keyed windowed data streams, but are evaluated in a single task (and hence on a single computing node). The syntax for defining triggers and evictors is exactly the same:

// Java
nonKeyedStream
    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));
// Scala
nonKeyedStream
    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10))

Basic window definitions are also available for windows on non-keyed streams:


Transformation Description (Java)
Tumbling time window all
DataStream → AllWindowedStream

Defines a window of 5 seconds that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.seconds(5));

Sliding time window all
DataStream → AllWindowedStream

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds). The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1));

Tumbling count window all
DataStream → AllWindowedStream

Defines a window of 1000 elements that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.

nonKeyedStream.countWindowAll(1000);

Sliding count window all
DataStream → AllWindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements).

nonKeyedStream.countWindowAll(1000, 100);


Transformation Description (Scala)
Tumbling time window all
DataStream → AllWindowedStream

Defines a window of 5 seconds that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.seconds(5));

Sliding time window all
DataStream → AllWindowedStream

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are grouped according to their timestamp in groups of 5 seconds' duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds). The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1));

Tumbling count window all
DataStream → AllWindowedStream

Defines a window of 1000 elements that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.

nonKeyedStream.countWindowAll(1000)

Sliding count window all
DataStream → AllWindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements).

nonKeyedStream.countWindowAll(1000, 100)
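Because a windowAll transformation has no key, all elements share one window buffer. The sketch below models `countWindowAll(size)` followed by a sum in plain Java; it is hypothetical, not Flink code, and it ignores parallelism, which is what windowAll gives up by running in a single task. `CountWindowAllSketch` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java sketch (not Flink code) of countWindowAll(size)
 * followed by a sum: one window buffer is shared by every element of the
 * stream, regardless of any key.
 */
class CountWindowAllSketch {

    /** Sums of consecutive tumbling groups of `size` elements across the whole stream. */
    static List<Integer> tumblingCountAllSums(List<Integer> values, int size) {
        List<Integer> sums = new ArrayList<>();
        int sum = 0;   // running sum of the single open window
        int count = 0; // elements in the single open window
        for (int v : values) {
            sum += v;
            count++;
            if (count == size) { // window is full: emit and start a new one
                sums.add(sum);
                sum = 0;
                count = 0;
            }
        }
        return sums;
    }
}
```

For the values `1, 10, 2, 20, 3` and a size of 2, the sketch returns `[11, 22]`, with the trailing 3 still buffered; keying the same values by `a, b, a, b, a` would instead form separate windows per key.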