There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in their specific documentations. The following is a consolidated list of API changes and links to details for migration when upgrading to Flink 1.3.
This would be relevant mostly for users implementing custom
TypeSerializers for their state.
Since Flink 1.3, two additional methods have been added that are related to serializer compatibility across savepoint restores. Please see Handling serializer upgrades and compatibility for further details on how to implement these methods.
ProcessFunctionis always a
In Flink 1.2,
ProcessFunction and its rich variant
RichProcessFunction was introduced.
Since Flink 1.3,
RichProcessFunction was removed and
ProcessFunction is now always a
RichFunction with access to
the lifecycle methods and runtime context.
The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API. Please visit the CEP Migration docs for details.
In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are now clean of specific logger dependencies.
Example and quickstart archtypes already have loggers specified and should not be affected.
For other custom projects, make sure to add logger dependencies. For example, in Maven’s
pom.xml, you can add:
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
As mentioned in the State documentation, Flink has two types of state: keyed and non-keyed state (also called operator state). Both types are available to both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the deprecation of the aligned window operators from Flink 1.1 (see Aligned Processing Time Window Operators).
The migration process will serve two goals:
allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its Flink 1.1 predecessor.
After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 simply by taking a savepoint with your Flink 1.1 job and giving it to your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its Flink 1.1 predecessor left off.
As running examples for the remainder of this document we will use the
CountMapper and the
functions. The first is an example of a function with keyed state, while
the second has non-keyed state. The code for the aforementioned two functions in Flink 1.1 is presented below:
CountMapper is a
RichFlatMapFunction which assumes a grouped-by-key input stream of the form
(word, 1). The function keeps a counter for each incoming key (
ValueState<Integer> counter) and if
the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
containing the word itself and the number of occurrences.
BufferingSink is a
SinkFunction that receives elements (potentially the output of the
and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
This is a common way to avoid many expensive calls to a database or an external storage system. To do the
buffering in a fault-tolerant manner, the buffered elements are kept in a list (
bufferedElements) which is
To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions. After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you are guaranteed that the new version of your job will start from where its predecessor left off.
Keyed State: Something to note before delving into the details of the migration process is that if your function has only keyed state, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support for the new features and full backwards compatibility. Changes could be made just for better code organization, but this is just a matter of style.
With the above said, the rest of this section focuses on the non-keyed state.
The first modification is the transition from the old
Checkpointed<T extends Serializable> state interface
to the new ones. In Flink 1.2, a stateful function can implement either the more general
interface, or the
ListCheckpointed<T extends Serializable> interface, which is semantically closer to the old
In both cases, the non-keyed state is expected to be a
List of serializable objects, independent from each other,
thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the
(test1, 2) and
(test2, 2), when increasing the parallelism to 2,
(test1, 2) may end up in task 0,
(test2, 2) will go to task 1.
More details on the principles behind rescaling of both keyed state and non-keyed state can be found in the State documentation.
ListCheckpointed interface requires the implementation of two methods:
Their semantics are the same as their counterparts in the old
Checkpointed interface. The only difference
is that now
snapshotState() should return a list of objects to checkpoint, as stated earlier, and
restoreState has to handle this list upon recovery. If the state is not re-partitionable, you can always
Collections.singletonList(MY_STATE) in the
snapshotState(). The updated code for
is included below:
As shown in the code, the updated function also implements the
CheckpointedRestoring interface. This is for backwards
compatibility reasons and more details will be explained at the end of this section.
CheckpointedFunction interface requires again the implementation of two methods:
As in Flink 1.1,
snapshotState() is called whenever a checkpoint is performed, but now
initializeState() (which is
the counterpart of the
restoreState()) is called every time the user-defined function is initialized, rather than only
in the case that we are recovering from a failure. Given this,
initializeState() is not only the place where different
types of state are initialized, but also where state recovery logic is included. An implementation of the
CheckpointedFunction interface for
BufferingSink is presented below.
initializeState takes as argument a
FunctionInitializationContext. This is used to initialize
the non-keyed state “container”. This is a container of type
ListState where the non-keyed state objects
are going to be stored upon checkpointing:
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
After initializing the container, we use the
isRestored() method of the context to check if we are
recovering after a failure. If this is
true, i.e. we are recovering, the restore logic is applied.
As shown in the code of the modified
ListState recovered during state
initialization is kept in a class variable for future use in
snapshotState(). There the
ListState is cleared
of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
As a side note, the keyed state can also be initialized in the
initializeState() method. This can be done
FunctionInitializationContext given as argument, instead of the
RuntimeContext, which is the case
for Flink 1.1. If the
CheckpointedFunction interface was to be used in the
open() method could be removed and the new
would look like this:
Notice that the
snapshotState() method is empty as Flink itself takes care of snapshotting managed keyed state
So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2. The question that remains is “Can I make sure that my modified (Flink 1.2) job will start from where my already running job from Flink 1.1 stopped?”.
The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing.
Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
CheckpointedRestoring interface, as shown in the code above. This has a single method, the
restoreState() from the old
Checkpointed interface from Flink 1.1. As shown in the modified code of
restoreState() method is identical to its predecessor.
In Flink 1.1, and only when operating on processing time with no specified evictor or trigger,
timeWindow() on a keyed stream would instantiate a special type of
WindowOperator. This could be
AggregatingProcessingTimeWindowOperator or an
AccumulatingProcessingTimeWindowOperator. Both of
these operators are referred to as aligned window operators as they assume their input elements arrive in
order. This is valid when operating in processing time, as elements get as timestamp the wall-clock time at
the moment they arrive at the window operator. These operators were restricted to using the memory state backend, and
had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.
In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
WindowOperator. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
that is compatible with the generic
WindowOperator, and resume execution using the generic
Note Although deprecated, you can still use the aligned window operators
in Flink 1.2 through special
WindowAssigners introduced for exactly this purpose. These assigners are the
SlidingAlignedProcessingTimeWindows and the
TumblingAlignedProcessingTimeWindows assigners, for sliding and tumbling
windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
resume execution from a Flink 1.1 savepoint while using these operators.
Attention The aligned window operators provide no rescaling capabilities and no backwards compatibility with Flink 1.1.
The code to use the aligned window operators in Flink 1.2 is presented below: