The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of
all (acyclic) streaming applications:
events (stream elements)
state (fault-tolerant, consistent, only on keyed streams)
timers (event time and processing time, only on keyed streams)
The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events
by being invoked for each event received in the input stream(s).
For fault-tolerant state, the ProcessFunction gives access to Flink’s keyed state, accessible via the
RuntimeContext, similar to the way other stateful functions can access keyed state.
The timers allow applications to react to changes in processing time and in event time.
Every call to the function processElement(...) gets a Context object which gives access to the element’s
event-time timestamp and to the TimerService. The TimerService can be used to register callbacks for future
event-time or processing-time instants. When a timer’s time is reached, the onTimer(...) method is
called. During that call, all state is again scoped to the key with which the timer was created, allowing
timers to manipulate keyed state.
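To make the timer mechanics concrete, here is a minimal plain-Java model of event-time timers on a keyed stream. It is not the Flink API, and the class KeyedTimerModel and its methods are hypothetical. A HashMap stands in for keyed state and a priority queue stands in for the TimerService. A timer fires once the watermark reaches its timestamp, and the callback reads the state of the key that registered it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Plain-Java model (not the Flink API) of keyed event-time timers:
 * timers are registered per key, fire once the watermark reaches their
 * timestamp, and the callback sees the state of the key that registered it.
 */
final class KeyedTimerModel {
    // One registered timer: the key it belongs to and the time it should fire at.
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Timers ordered by timestamp, like an event-time timer queue.
    private final PriorityQueue<Timer> timers =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    // Per-key counter, standing in for keyed state.
    private final Map<String, Integer> state = new HashMap<>();
    // Log of timer callbacks as "key@timestamp=stateValue".
    final List<String> fired = new ArrayList<>();

    /** Models processElement: update the key's state and schedule a callback. */
    void processElement(String key, long eventTime, long delay) {
        state.merge(key, 1, Integer::sum);
        timers.add(new Timer(key, eventTime + delay));
    }

    /** Models a watermark arriving: fire every timer whose time has been reached. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            onTimer(t.key, t.timestamp);
        }
    }

    /** Models onTimer: state access is scoped to the key of the firing timer. */
    private void onTimer(String key, long timestamp) {
        fired.add(key + "@" + timestamp + "=" + state.get(key));
    }
}
```

With timers registered at 150 (key a), 170 (key b) and 180 (key a), advancing the watermark to 170 fires only the first two, and each callback sees its own key's count.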
Note: If you want to access keyed state and timers, you have
to apply the ProcessFunction on a keyed stream, that is, call process(...) on the stream returned by keyBy(...).
To realize low-level operations on two inputs, applications can use CoProcessFunction. This
function is bound to two different inputs and gets individual calls to processElement1(...) and
processElement2(...) for records from the two different inputs.
Implementing a low-level join typically follows this pattern:
Create a state object for one input (or both)
Update the state upon receiving elements from its input
Upon receiving elements from the other input, probe the state and produce the joined result
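The three steps above can be sketched in plain Java. This is a simplified model, not a CoProcessFunction: the class CustomerTradeJoin is hypothetical, a HashMap stands in for keyed state, and the method names only mirror processElement1(...) and processElement2(...).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the low-level join pattern: elements of input 1 are kept
 * in per-key state, and elements of input 2 probe that state.
 * Not the Flink API; the customer/trade types are hypothetical.
 */
final class CustomerTradeJoin {
    // State for input 1: latest customer name per customer id.
    private final Map<String, String> customerState = new HashMap<>();
    // Joined results emitted so far (stands in for the Collector).
    final List<String> out = new ArrayList<>();

    /** Input 1: update the state with the newest customer record. */
    void processElement1(String customerId, String customerName) {
        customerState.put(customerId, customerName);
    }

    /** Input 2: probe the customer state and emit a joined result if present. */
    void processElement2(String customerId, String tradeId) {
        String name = customerState.get(customerId);
        if (name != null) {
            out.add(tradeId + " by " + name);
        }
        // Without a match the trade is simply dropped in this model;
        // a real job might buffer it in state instead.
    }
}
```

A trade that arrives before its customer record produces no output here, which is exactly the incompleteness that motivates the timer-based approach.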
For example, you might be joining customer data to financial trades,
while keeping state for the customer data. If you care about having
complete and deterministic joins in the face of out-of-order events,
you can use a timer to evaluate and emit the join for a trade when the
watermark for the customer data stream has passed the time of that trade.
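A sketch of that timer-driven variant, again as a plain-Java model under stated assumptions: the class WatermarkGatedJoin is hypothetical, customer records are kept as versions ordered by event time, and onCustomerWatermark(...) plays the role of an event-time timer firing. Each buffered trade is joined with the customer version valid at the trade's timestamp, so customer updates with later timestamps cannot change the result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model of a timer-driven join: trades wait in state until the
 * customer stream's watermark passes their timestamp, then are joined with
 * the customer version valid at that time. Not Flink code.
 */
final class WatermarkGatedJoin {
    static final class Trade {
        final String customerId;
        final String tradeId;
        final long timestamp;

        Trade(String customerId, String tradeId, long timestamp) {
            this.customerId = customerId;
            this.tradeId = tradeId;
            this.timestamp = timestamp;
        }
    }

    // Customer versions per id, ordered by event time.
    private final Map<String, TreeMap<Long, String>> customers = new HashMap<>();
    // Trades buffered until the customer watermark passes them.
    private final List<Trade> pendingTrades = new ArrayList<>();
    final List<String> out = new ArrayList<>();

    void onCustomer(String customerId, String name, long timestamp) {
        customers.computeIfAbsent(customerId, k -> new TreeMap<>()).put(timestamp, name);
    }

    void onTrade(String customerId, String tradeId, long timestamp) {
        pendingTrades.add(new Trade(customerId, tradeId, timestamp));
    }

    /** Acts like an event-time timer firing: join every trade the watermark has passed. */
    void onCustomerWatermark(long watermark) {
        Iterator<Trade> it = pendingTrades.iterator();
        while (it.hasNext()) {
            Trade t = it.next();
            if (t.timestamp <= watermark) {
                TreeMap<Long, String> versions = customers.get(t.customerId);
                // Latest customer version at or before the trade's timestamp.
                Map.Entry<Long, String> match =
                    versions == null ? null : versions.floorEntry(t.timestamp);
                out.add(t.tradeId + " by " + (match == null ? "<unknown>" : match.getValue()));
                it.remove();
            }
        }
    }
}
```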
The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key:
The count, key, and last-modification timestamp are stored in a ValueState, which is implicitly scoped by key.
For each record, the ProcessFunction increments the counter and sets the last-modification timestamp
The function also schedules a callback one minute into the future (in event time)
Upon each callback, it checks the callback’s event-time timestamp against the last-modification time of the stored count
and emits the key/count if the timestamp equals the last-modification time plus one minute (i.e., no further update occurred during that minute)
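The steps above can be simulated outside Flink as a rough sketch. In this plain-Java model (not Flink code; the class names are hypothetical), a HashMap stands in for the key-scoped ValueState and a TreeSet of (timestamp, key) pairs stands in for the event-time TimerService, which also keeps at most one timer per key and timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**
 * Plain-Java simulation of the count-with-timeout logic. A HashMap models
 * keyed ValueState and a TreeSet models the event-time timer queue.
 */
final class CountWithTimeout {
    static final long TIMEOUT = 60_000L; // one minute in event time

    // What the ValueState holds for each key.
    static final class CountWithTimestamp {
        long count;
        long lastModified;
    }

    private final Map<String, CountWithTimestamp> state = new HashMap<>();
    // Pending timers as (timestamp, key); the set keeps one per key and timestamp.
    private final TreeSet<Map.Entry<Long, String>> timers = new TreeSet<>(
        Map.Entry.<Long, String>comparingByKey()
            .thenComparing(Map.Entry.<Long, String>comparingByValue()));
    final List<String> out = new ArrayList<>();

    /** Increment the count, record the event time, and schedule a timer one minute later. */
    void processElement(String key, long eventTime) {
        CountWithTimestamp current = state.computeIfAbsent(key, k -> new CountWithTimestamp());
        current.count++;
        current.lastModified = eventTime;
        timers.add(Map.entry(current.lastModified + TIMEOUT, key));
    }

    /** Fire all timers whose timestamp the watermark has reached, in timestamp order. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().getKey() <= watermark) {
            Map.Entry<Long, String> timer = timers.pollFirst();
            onTimer(timer.getKey(), timer.getValue());
        }
    }

    private void onTimer(long timestamp, String key) {
        CountWithTimestamp result = state.get(key);
        // Emit only if this is the timer of the latest update for the key.
        if (timestamp == result.lastModified + TIMEOUT) {
            out.add(key + "=" + result.count);
        }
    }
}
```

For key a with updates at 0 and 30,000 ms, the timer at 60,000 ms is ignored because the count was modified afterwards; only the timer at 90,000 ms emits a=2.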
Note: This simple example could have been implemented with
session windows. We use ProcessFunction here to illustrate the basic pattern it provides.
NOTE: Before Flink 1.4.0, when called from a processing-time timer, the ProcessFunction.onTimer() method set
the current processing time as the event-time timestamp. This behavior is subtle and may go unnoticed by users, but it is
harmful because processing-time timestamps are nondeterministic and not aligned with watermarks. Moreover, user logic
that depends on this incorrect timestamp is very likely to be faulty. We therefore decided to fix it. After upgrading to 1.4.0, Flink jobs
that rely on this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
KeyedProcessFunction, as an extension of ProcessFunction, gives access to the key of timers in its onTimer(...) method, through the getCurrentKey() method of its OnTimerContext.
Every timer registered at the TimerService via registerEventTimeTimer() or
registerProcessingTimeTimer() will be stored on the Java heap and enqueued for execution. There is,
however, a maximum of one timer per key and timestamp at millisecond resolution, and thus, in the
worst case, every key may have a timer for each upcoming millisecond. Even if you do not do any
processing for outdated timers in onTimer, this may put a significant burden on the Flink runtime.
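The deduplication rule can be illustrated with a small plain-Java model. It is hypothetical and does not reproduce Flink's internal timer queue: a set of timestamps per key silently ignores a second registration for the same key and millisecond, while every distinct millisecond adds another timer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/**
 * Plain-Java model of timer deduplication: at most one timer is kept per key
 * and millisecond timestamp. Not Flink code; it only mirrors the rule above.
 */
final class TimerDedup {
    private final Map<String, TreeSet<Long>> timersPerKey = new HashMap<>();

    /** Registers a timer; returns false if one already exists for this key and timestamp. */
    boolean register(String key, long timestamp) {
        return timersPerKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Total number of timers held across all keys. */
    int size() {
        return timersPerKey.values().stream().mapToInt(TreeSet::size).sum();
    }
}
```

Registering 1,000 consecutive milliseconds for one key yields 1,000 timers, which is the worst case described above.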
Since there is only one timer per key and timestamp, however, you may coalesce timers by reducing the
timer resolution. For a timer resolution of 1 second (event or processing time), for example, you
can round down the target time to full seconds and therefore allow the timer to fire at most 1
second earlier but not later than with millisecond accuracy. As a result, there would be at most
one timer per key and second.
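The rounding itself is simple arithmetic. The hypothetical helper below (plain Java, not a Flink API) rounds a target time down to a given resolution; in a real function the result would then be passed to registerProcessingTimeTimer() or registerEventTimeTimer(). It uses Math.floorDiv so that negative timestamps are also rounded down rather than toward zero.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helpers illustrating timer coalescing by reduced resolution. */
final class TimerCoalescing {
    /** Rounds targetTime down to a multiple of resolutionMs (e.g. 1000 for full seconds). */
    static long coalesce(long targetTime, long resolutionMs) {
        // floorDiv rounds toward negative infinity, so the result is never later than targetTime.
        return Math.floorDiv(targetTime, resolutionMs) * resolutionMs;
    }

    /** Number of distinct timers one key would need for the given target times. */
    static int distinctTimers(long[] targetTimes, long resolutionMs) {
        Set<Long> timers = new HashSet<>();
        for (long t : targetTimes) {
            timers.add(coalesce(t, resolutionMs));
        }
        return timers.size();
    }
}
```

With a resolution of 1,000 ms, 1,000 target times between 5,000 and 5,999 ms collapse into a single timer at 5,000 ms, whereas millisecond resolution would keep all 1,000.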
Since event-time timers only fire when a watermark arrives, you may also schedule and coalesce
these timers with the next watermark by basing them on the current one, i.e. registering them for the current watermark plus one millisecond.
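A plain-Java model of this watermark-based coalescing, assuming a single key. The class is hypothetical; in a Flink function the equivalent would be passing ctx.timerService().currentWatermark() + 1 to registerEventTimeTimer(). Every timer registered while the watermark is W gets timestamp W + 1, so all of them collapse into one timer that fires as soon as the watermark advances.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * Plain-Java model of coalescing event-time timers with the next watermark,
 * for a single key. Not Flink code.
 */
final class WatermarkCoalescing {
    private long currentWatermark = Long.MIN_VALUE; // no watermark seen yet
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Registers a timer for "the next watermark" and returns its timestamp. */
    long registerForNextWatermark() {
        long coalescedTime = currentWatermark + 1;
        timers.add(coalescedTime); // repeated registrations collapse into one entry
        return coalescedTime;
    }

    int pendingTimers() {
        return timers.size();
    }

    /** Advances the watermark and returns how many timers fired. */
    int advanceWatermark(long watermark) {
        currentWatermark = watermark;
        NavigableSet<Long> due = timers.headSet(watermark, true);
        int fired = due.size();
        due.clear(); // removes the fired timers from the backing set
        return fired;
    }
}
```

Fifty registrations between watermark 1,000 and the next watermark leave a single pending timer at 1,001, which fires once when the watermark moves on.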