In this section you will learn about the APIs that Flink provides for working
with event time timestamps and watermarks. For an introduction to event
time, processing time, and ingestion time, please refer to the
introduction to event time.
In order to work with event time, Flink needs to know the events
timestamps, meaning each element in the stream needs to have its event
timestamp assigned. This is usually done by accessing/extracting the
timestamp from some field in the element by using a TimestampAssigner.
Timestamp assignment goes hand-in-hand with generating watermarks, which tell
the system about progress in event time. You can configure this by specifying a
The Flink API expects a WatermarkStrategy that contains both a
TimestampAssigner and WatermarkGenerator. A number of common strategies
are available out of the box as static methods on WatermarkStrategy, but
users can also build their own strategies when required.
Here is the interface for completeness’ sake:
As mentioned, you usually don’t implement this interface yourself but use the
static helper methods on WatermarkStrategy for common watermark strategies or
to bundle together a custom TimestampAssigner with a WatermarkGenerator.
For example, to use bounded-out-of-orderness watermarks and a lambda function as a
timestamp assigner you use this:
(Using Scala Lambdas here currently doesn’t work because Scala is stupid and it’s hard to support this. #fus)
Specifying a TimestampAssigner is optional and in most cases you don’t
actually want to specify one. For example, when using Kafka or Kinesis you
would get timestamps directly from the Kafka/Kinesis records.
Attention: Both timestamps and watermarks
are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
Using Watermark Strategies
There are two places in Flink applications where a WatermarkStrategy can be
used: 1) directly on sources and 2) after non-source operation.
The first option is preferable, because it allows sources to exploit knowledge
about shards/partitions/splits in the watermarking logic. Sources can usually
then track watermarks at a finer level and the overall watermark produced by a
source will be more accurate. Specifying a WatermarkStrategy directly on the
source usually means you have to use a source specific interface/ Refer to
Watermark Strategies and the Kafka
Connector for how this works on
a Kafka Connector and for more details about how per-partition watermarking
The second option (setting a WatermarkStrategy after arbitrary operations)
should only be used if you cannot set a strategy directly on the source:
Using a WatermarkStrategy this way takes a stream and produce a new stream
with timestamped elements and watermarks. If the original stream had timestamps
and/or watermarks already, the timestamp assigner overwrites them.
Dealing With Idle Sources
If one of the input splits/partitions/shards does not carry events for a while
this means that the WatermarkGenerator also does not get any new information
on which to base a watermark. We call this an idle input or an idle source.
This is a problem because it can happen that some of your partitions do still
carry events. In that case, the watermark will be held back, because it is
computed as the minimum over all the different parallel watermarks.
To deal with this, you can use a WatermarkStrategy that will detect idleness
and mark an input as idle. WatermarkStrategy provides a convenience helper
A TimestampAssigner is a simple function that extracts a field from an event, we therefore don’t need to look at them in detail. A WatermarkGenerator, on the other hand, is a bit more complicated to write and we will look at how you can do that in the next two sections. This is the WatermarkGenerator interface:
There are two different styles of watermark generation: periodic and
A periodic generator usually observes to the incoming events via onEvent()
and then emits a watermark when the framework calls onPeriodicEmit().
A puncutated generator will look at events in onEvent() and wait for special
marker events or punctuations that carry watermark information in the
stream. When it sees one of these events it emits a watermark immediately.
Usually, punctuated generators don’t emit a watermark from onPeriodicEmit().
We will look at how to implement generators for each style next.
Writing a Periodic WatermarkGenerator
A periodic generator observes stream events and generates
watermarks periodically (possibly depending on the stream elements, or purely
based on processing time).
The interval (every n milliseconds) in which the watermark will be generated
is defined via ExecutionConfig.setAutoWatermarkInterval(...). The
generators’s onPeriodicEmit() method will be called each time, and a new
watermark will be emitted if the returned watermark is non-null and larger than
the previous watermark.
Here we show two simple examples of watermark generators that use periodic
watermark generation. Note that Flink ships with
BoundedOutOfOrdernessWatermarks, which is a WatermarkGenerator that works
similarly to the BoundedOutOfOrdernessGenerator shown below. You can read
about using that here.
Writing a Punctuated WatermarkGenerator
A punctuated watermark generator will observe the stream of
events and emit a watermark whenever it sees a special element that carries
This is how you can implement a punctuated generator that emits a watermark
whenever an event indicates that it carries a certain marker:
Note: It is possible to
generate a watermark on every single event. However, because each watermark
causes some computation downstream, an excessive number of watermarks degrades
Watermark Strategies and the Kafka Connector
When using Apache Kafka as a data source, each Kafka
partition may have a simple event time pattern (ascending timestamps or bounded
out-of-orderness). However, when consuming streams from Kafka, multiple
partitions often get consumed in parallel, interleaving the events from the
partitions and destroying the per-partition patterns (this is inherent in how
Kafka’s consumer clients work).
In that case, you can use Flink’s Kafka-partition-aware watermark generation.
Using that feature, watermarks are generated inside the Kafka consumer, per
Kafka partition, and the per-partition watermarks are merged in the same way as
watermarks are merged on stream shuffles.
For example, if event timestamps are strictly ascending per Kafka partition,
generating per-partition watermarks with the ascending timestamps watermark
will result in perfect overall watermarks. Note, that we don’t provide a
TimestampAssigner in the example, the timestamps of the Kafka records
themselves will be used instead.
The illustrations below show how to use the per-Kafka-partition watermark
generation, and how watermarks propagate through the streaming dataflow in that
How Operators Process Watermarks
As a general rule, operators are required to completely process a given
watermark before forwarding it downstream. For example, WindowOperator will
first evaluate all windows that should be fired, and only after producing all of
the output triggered by the watermark will the watermark itself be sent
downstream. In other words, all elements produced due to occurrence of a
watermark will be emitted before the watermark.
The same rule applies to TwoInputStreamOperator. However, in this case the
current watermark of the operator is defined as the minimum of both of its
The details of this behavior are defined by the implementations of the
The Deprecated AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
Prior to introducing the current abstraction of WatermarkStrategy,
TimestampAssigner, and WatermarkGenerator, Flink used
AssignerWithPeriodicWatermarks and AssignerWithPeriodicWatermarks. You will
still see them in the API but it is recommended to use the new interfaces
because they offer a clearer separation of concerns and also unify periodic and
punctuated styles of watermark generation.