API Migration Guides

As mentioned in the State documentation, Flink has two types of state: keyed and non-keyed state (also called operator state). Both types are available to operators and to user-defined functions. This document guides you through migrating your Flink 1.1 function code to Flink 1.2. It also presents an important internal change introduced in Flink 1.2: the deprecation of the aligned window operators from Flink 1.1 (see Aligned Processing Time Window Operators).

The migration process will serve two goals:

  1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,

  2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its Flink 1.1 predecessor.

After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 simply by taking a savepoint with your Flink 1.1 job and giving it to your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its Flink 1.1 predecessor left off.

Example User Functions

As running examples for the remainder of this document we will use the CountMapper and the BufferingSink functions. The first is an example of a function with keyed state, while the second has non-keyed state. The code for the aforementioned two functions in Flink 1.1 is presented below:

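import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {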

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }
}

import java.util.ArrayList;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
        Checkpointed<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private ArrayList<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public ArrayList<Tuple2<String, Integer>> snapshotState(
            long checkpointId, long checkpointTimestamp) throws Exception {
        return bufferedElements;
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        bufferedElements.addAll(state);
    }
}

The CountMapper is a RichFlatMapFunction which assumes a grouped-by-key input stream of the form (word, 1). The function keeps a counter for each incoming key (ValueState<Integer> counter). Whenever the number of occurrences of a word reaches the user-provided threshold (numberElements), it emits a tuple containing the word itself and the number of occurrences, and resets the counter to 0.

The BufferingSink is a SinkFunction that receives elements (potentially the output of the CountMapper) and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. This is a common way to avoid many expensive calls to a database or an external storage system. To do the buffering in a fault-tolerant manner, the buffered elements are kept in a list (bufferedElements) which is periodically checkpointed.
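To make the counting semantics of CountMapper concrete, here is a plain-Java sketch of what it computes, with no Flink classes involved. It assumes a HashMap can stand in for the per-key ValueState<Integer> that Flink scopes to the current key. The names CountMapperSimulation and process() are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for CountMapper (no Flink involved). The HashMap
// plays the role of the per-key ValueState<Integer> that Flink would
// automatically scope to the key of the current element.
final class CountMapperSimulation {

    private final Map<String, Integer> counterPerKey = new HashMap<>();
    private final int numberElements;

    CountMapperSimulation(int numberElements) {
        this.numberElements = numberElements;
    }

    // Mirrors flatMap(): increment this word's counter, and once it reaches
    // numberElements, emit (word, count) and reset the counter to 0.
    List<Map.Entry<String, Integer>> process(String word) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        int count = counterPerKey.getOrDefault(word, 0) + 1;
        counterPerKey.put(word, count);
        if (count % numberElements == 0) {
            out.add(new AbstractMap.SimpleImmutableEntry<>(word, count));
            counterPerKey.put(word, 0);
        }
        return out;
    }

    public static void main(String[] args) {
        CountMapperSimulation mapper = new CountMapperSimulation(2);
        for (String word : new String[] {"a", "b", "a", "a", "b"}) {
            System.out.println(word + " -> " + mapper.process(word));
        }
        // Output, one per line: a -> [], b -> [], a -> [a=2], a -> [], b -> [b=2]
    }
}
```

Because the counter is reset after each emission, a word is emitted once every numberElements occurrences, always with the count equal to numberElements.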

State API Migration

To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions. After making these changes, you will be able to change the parallelism of your job (scale up or down), and the new version of your job is guaranteed to resume from where its predecessor left off.

Keyed State: Before delving into the details of the migration process, note that if your function has only keyed state, the exact same code from Flink 1.1 also works in Flink 1.2, with full support for the new features and full backwards compatibility. Any changes would be purely for better code organization, which is a matter of style.

With the above said, the rest of this section focuses on the non-keyed state.

Rescaling and new state abstractions

The first modification is the transition from the old Checkpointed<T extends Serializable> state interface to the new ones. In Flink 1.2, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface, which is semantically closer to the old Checkpointed one.

In both cases, the non-keyed state is expected to be a List of serializable objects that are independent of each other and therefore eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be repartitioned. For example, if with parallelism 1 the checkpointed state of the BufferingSink contains the elements (test1, 2) and (test2, 2), then when the parallelism is increased to 2, (test1, 2) may end up in task 0, while (test2, 2) goes to task 1.
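The redistribution idea can be sketched in plain Java. The helper below, redistribute() (a hypothetical name), treats the checkpointed state as one flat list of independent entries and deals them out to the new parallel tasks in contiguous, near-equal chunks. The chunking strategy is an assumption made for illustration; Flink's actual assignment of entries to tasks may differ, but the unit of redistribution, a single list entry, is the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListStateRedistribution {

    // Hypothetical helper: splits the union of all checkpointed list entries
    // into newParallelism contiguous chunks of near-equal size. Each entry is
    // moved as a whole; entries are never split or merged.
    static <T> List<List<T>> redistribute(List<T> allEntries, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<T>> perTask = new ArrayList<>();
        int base = allEntries.size() / newParallelism;
        int remainder = allEntries.size() % newParallelism;
        int start = 0;
        for (int task = 0; task < newParallelism; task++) {
            // the first `remainder` tasks receive one extra entry
            int length = base + (task < remainder ? 1 : 0);
            perTask.add(new ArrayList<>(allEntries.subList(start, start + length)));
            start += length;
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<String> checkpointed = Arrays.asList("(test1, 2)", "(test2, 2)");
        // scale up from parallelism 1 to 2
        System.out.println(redistribute(checkpointed, 2)); // [[(test1, 2)], [(test2, 2)]]
    }
}
```

Scaling down works the same way in this sketch: the entries of all old tasks are first combined into one list, so redistribute(allEntries, 1) hands every entry to task 0.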

More details on the principles behind rescaling of both keyed state and non-keyed state can be found in the State documentation.

ListCheckpointed

The ListCheckpointed interface requires the implementation of two methods:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

Their semantics are the same as those of their counterparts in the old Checkpointed interface. The only difference is that snapshotState() now returns a list of objects to checkpoint, as stated earlier, and restoreState() has to handle this list upon recovery. If the state is not re-partitionable, you can always return a Collections.singletonList(MY_STATE) from snapshotState(). The updated code for BufferingSink is included below:

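import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSinkListCheckpointed implements
        SinkFunction<Tuple2<String, Integer>>,
        ListCheckpointed<Tuple2<String, Integer>>,
        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    // With ListCheckpointed, Flink stores the list returned by snapshotState(),
    // so no ListState field is needed here.
    private List<Tuple2<String, Integer>> bufferedElements;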

    public BufferingSinkListCheckpointed(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        this.bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public List<Tuple2<String, Integer>> snapshotState(
            long checkpointId, long timestamp) throws Exception {
        return this.bufferedElements;
    }

    @Override
    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
        if (!state.isEmpty()) {
            this.bufferedElements.addAll(state);
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

As shown in the code, the updated function also implements the CheckpointedRestoring interface. This is for backwards compatibility reasons and more details will be explained at the end of this section.
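The ListCheckpointed contract can also be exercised without a Flink cluster. The sketch below declares a hypothetical interface, ListSnapshotting, that copies the two ListCheckpointed method signatures quoted earlier (it is not Flink's interface). It uses that interface for state that cannot be split, a single running total, returned as Collections.singletonList(). After rescaling, a task may receive zero, one, or several entries, so restoreState() here sums whatever it gets. Summing is a design choice that fits a total; other kinds of state need their own merge rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical interface mirroring the two ListCheckpointed signatures;
// this is NOT Flink's interface, only a stand-in for the sketch.
interface ListSnapshotting<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

// A function whose whole state is one running total, i.e. state that
// cannot be meaningfully split, so it is snapshotted as a one-element list.
final class RunningTotal implements ListSnapshotting<Long> {

    private long total;

    void add(long value) {
        total += value;
    }

    long total() {
        return total;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(total);
    }

    // A task may get no entry, its own entry, or the entries of several
    // former tasks after rescaling, so all received partial totals are summed.
    @Override
    public void restoreState(List<Long> state) {
        for (Long partialTotal : state) {
            total += partialTotal;
        }
    }

    public static void main(String[] args) {
        RunningTotal a = new RunningTotal();
        RunningTotal b = new RunningTotal();
        a.add(3);
        b.add(4);
        // simulate scaling down from two tasks to one
        List<Long> union = new ArrayList<>(a.snapshotState(1L, 0L));
        union.addAll(b.snapshotState(1L, 0L));
        RunningTotal merged = new RunningTotal();
        merged.restoreState(union);
        System.out.println(merged.total()); // 7
    }
}
```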

CheckpointedFunction

The CheckpointedFunction interface also requires the implementation of two methods:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

As in Flink 1.1, snapshotState() is called whenever a checkpoint is performed. However, initializeState() (the counterpart of restoreState()) is now called every time the user-defined function is initialized, not only when recovering from a failure. Consequently, initializeState() is not only where the different types of state are initialized, but also where the state recovery logic lives. An implementation of the CheckpointedFunction interface for BufferingSink is presented below.

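import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {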

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
            .getSerializableListState("buffered-elements");

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

The initializeState() method takes a FunctionInitializationContext as its argument. This context is used to initialize the non-keyed state “container”, a ListState in which the non-keyed state objects are stored upon checkpointing:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

After initializing the container, we use the isRestored() method of the context to check whether we are recovering after a failure. If so, the restore logic is applied.
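The call pattern described above can be mimicked without Flink. In the sketch below, MockInitContext is a hypothetical stand-in for FunctionInitializationContext that exposes only an isRestored() flag and the previously checkpointed entries; it is not Flink's API. The sketch shows that initializeState() does its setup on every start, while only the isRestored() branch copies recovered entries into the buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FunctionInitializationContext (NOT Flink's API):
// it only tells whether this start is a recovery and what was checkpointed.
final class MockInitContext {
    private final boolean restored;
    private final List<String> checkpointedEntries;

    MockInitContext(boolean restored, List<String> checkpointedEntries) {
        this.restored = restored;
        this.checkpointedEntries = checkpointedEntries;
    }

    boolean isRestored() {
        return restored;
    }

    List<String> checkpointedEntries() {
        return checkpointedEntries;
    }
}

final class BufferingFunctionSketch {
    final List<String> bufferedElements = new ArrayList<>();

    // In Flink the runtime calls initializeState() on every start of the
    // function; recovery happens only when the context reports a restore.
    void initializeState(MockInitContext context) {
        if (context.isRestored()) {
            bufferedElements.addAll(context.checkpointedEntries());
        }
    }

    public static void main(String[] args) {
        BufferingFunctionSketch fresh = new BufferingFunctionSketch();
        fresh.initializeState(new MockInitContext(false, Collections.<String>emptyList()));

        BufferingFunctionSketch recovered = new BufferingFunctionSketch();
        recovered.initializeState(new MockInitContext(true, Arrays.asList("(test1, 2)")));

        System.out.println(fresh.bufferedElements);     // []
        System.out.println(recovered.bufferedElements); // [(test1, 2)]
    }
}
```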

As shown in the code of the modified BufferingSink, the ListState obtained during state initialization is kept in a class variable for later use in snapshotState(). There, the ListState is first cleared of all objects included in the previous checkpoint and then filled with the new ones we want to checkpoint.
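To see why snapshotState() clears the ListState before refilling it, consider the following sketch. MockListState is a hypothetical in-memory imitation of the add()/clear()/get() shape of ListState, not Flink's class. Without the clear() call, the entries from the previous checkpoint remain and the current buffer is stored again next to them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory imitation of ListState's add/clear/get shape.
final class MockListState<T> {
    private final List<T> entries = new ArrayList<>();

    void add(T value) { entries.add(value); }

    void clear() { entries.clear(); }

    Iterable<T> get() { return entries; }

    int size() { return entries.size(); }
}

final class SnapshotSketch {

    // Same pattern as BufferingSink.snapshotState(): optionally drop the
    // entries of the previous checkpoint, then copy in the current buffer.
    static <T> void snapshot(MockListState<T> state, List<T> buffer, boolean clearFirst) {
        if (clearFirst) {
            state.clear();
        }
        for (T element : buffer) {
            state.add(element);
        }
    }

    public static void main(String[] args) {
        List<String> buffer = Arrays.asList("a", "b");
        MockListState<String> correct = new MockListState<>();
        MockListState<String> withoutClear = new MockListState<>();
        for (int checkpoint = 0; checkpoint < 2; checkpoint++) {
            snapshot(correct, buffer, true);
            snapshot(withoutClear, buffer, false);
        }
        System.out.println(correct.size() + " vs " + withoutClear.size()); // 2 vs 4
    }
}
```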

As a side note, keyed state can also be initialized in the initializeState() method, using the FunctionInitializationContext passed as its argument instead of the RuntimeContext, as was done in Flink 1.1. If the CheckpointedFunction interface were used in the CountMapper example, the old open() method could be removed, and the new snapshotState() and initializeState() methods would look like this:

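import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
        implements CheckpointedFunction {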

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // all managed, nothing to do.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        counter = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }
}

Notice that the snapshotState() method is empty as Flink itself takes care of snapshotting managed keyed state upon checkpointing.

So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2. The remaining question is: “Can I make sure that my modified (Flink 1.2) job will start from where my already running Flink 1.1 job stopped?”

The answer is yes, and the way to do it is pretty straightforward. For keyed state, you do not need to do anything: Flink takes care of restoring the state from Flink 1.1. For non-keyed state, your new function has to implement the CheckpointedRestoring interface, as shown in the code above. This interface has a single method, the familiar restoreState() from the old Checkpointed interface of Flink 1.1. As shown in the modified code of the BufferingSink, this restoreState() method is identical to its predecessor.

Aligned Processing Time Window Operators

In Flink 1.1, and only when operating in processing time with no specified evictor or trigger, calling timeWindow() on a keyed stream would instantiate a special type of WindowOperator. This could be either an AggregatingProcessingTimeWindowOperator or an AccumulatingProcessingTimeWindowOperator. Both of these operators are referred to as aligned window operators because they assume that their input elements arrive in order. This assumption holds in processing time, since each element is assigned as its timestamp the wall-clock time at which it arrives at the window operator. These operators were restricted to the memory state backend and used optimized data structures for storing the per-window elements, which exploited the in-order arrival of input elements.
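The following sketch illustrates why in-order arrival allows simpler bookkeeping. It is a hypothetical single-key tumbling-window sum in plain Java, not the AggregatingProcessingTimeWindowOperator. Because timestamps never decrease, an element's window is always the current one or a later one. A single open accumulator therefore suffices, and the previous window can be emitted as soon as an element for a later window arrives. Window starts are computed as timestamp - (timestamp % size), assuming non-negative timestamps and no window offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key tumbling-window sum that relies on in-order
// (non-decreasing) timestamps, as processing-time input guarantees.
final class InOrderTumblingSum {

    private final long windowSize;
    private long currentWindowStart = -1; // -1 means "no open window yet"
    private long currentSum;

    // Closed windows as {windowStart, sum} pairs, in emission order.
    final List<long[]> emitted = new ArrayList<>();

    InOrderTumblingSum(long windowSize) {
        this.windowSize = windowSize;
    }

    void add(long timestamp, long value) {
        long windowStart = timestamp - (timestamp % windowSize);
        if (windowStart < currentWindowStart) {
            throw new IllegalStateException("out-of-order timestamp: " + timestamp);
        }
        if (windowStart != currentWindowStart) {
            // A later window has started, so the open one can never change again.
            closeCurrentWindow();
            currentWindowStart = windowStart;
        }
        currentSum += value;
    }

    // Call once at the end of input to emit the last open window.
    void finish() {
        closeCurrentWindow();
    }

    private void closeCurrentWindow() {
        if (currentWindowStart >= 0) {
            emitted.add(new long[] {currentWindowStart, currentSum});
        }
        currentSum = 0;
    }

    public static void main(String[] args) {
        InOrderTumblingSum sum = new InOrderTumblingSum(1000L);
        sum.add(100L, 1L);
        sum.add(900L, 2L);
        sum.add(1000L, 5L); // first element of [1000, 2000) closes [0, 1000)
        sum.add(2500L, 1L);
        sum.finish();
        for (long[] result : sum.emitted) {
            System.out.println("window " + result[0] + " sum " + result[1]);
        }
        // window 0 sum 3, window 1000 sum 5, window 2000 sum 1
    }
}
```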

In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic WindowOperator. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format that is compatible with the generic WindowOperator, and resume execution using the generic WindowOperator.

Note: Although deprecated, you can still use the aligned window operators in Flink 1.2 through special WindowAssigners introduced for exactly this purpose. These assigners are the SlidingAlignedProcessingTimeWindows and the TumblingAlignedProcessingTimeWindows assigners, for sliding and tumbling windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to resume execution from a Flink 1.1 savepoint while using these operators.

Attention: The aligned window operators provide no rescaling capabilities and no backwards compatibility with Flink 1.1.

The code to use the aligned window operators in Flink 1.2 is presented below:

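In Java:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// `source` is a DataStream<Tuple2<String, Integer>>; `yourFunction` is a placeholder
// for your own WindowFunction, here assumed to emit Tuple2<String, Integer>.

// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(yourFunction);

// for sliding windows
DataStream<Tuple2<String, Integer>> window2 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(yourFunction);

In Scala:

// for tumbling windows
val window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(yourFunction)

// for sliding windows
val window2 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(yourFunction)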
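For reference, the windows an element falls into under the sizes used in these examples (1000 ms tumbling windows; 1 s windows sliding by 100 ms) can be computed with the usual start-of-window arithmetic. The sketch below reproduces that arithmetic in plain Java rather than calling the Flink assigners. It assumes non-negative millisecond timestamps, zero offset, and half-open windows [start, start + size). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class AlignedWindowMath {

    // Start of the tumbling window [start, start + size) that contains t.
    static long tumblingWindowStart(long t, long size) {
        return t - (t % size);
    }

    // Starts of all sliding windows [start, start + size) that contain t,
    // from the latest to the earliest.
    static List<Long> slidingWindowStarts(long t, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = t - (t % slide); start > t - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long t = 12_345L;
        System.out.println(tumblingWindowStart(t, 1000L)); // 12000
        // [12300, 12200, 12100, 12000, 11900, 11800, 11700, 11600, 11500, 11400]
        System.out.println(slidingWindowStarts(t, 1000L, 100L));
    }
}
```

With a 1 s size and a 100 ms slide, every element belongs to size / slide = 10 overlapping windows, which is why sliding windows cost more state than tumbling ones.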