Apache Flink’s State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications.
To understand how best to interact with savepoints in a batch context, it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database.
A database can be thought of as one or more namespaces, each containing a collection of tables.
Those tables in turn contain columns whose values have some intrinsic relationship between them, such as being scoped under the same key.
A savepoint represents the state of a Flink job at a particular point in time; that job is made up of many operators.
Those operators contain various kinds of state: partitioned (keyed) state and non-partitioned (operator) state.
This job contains multiple operators along with various kinds of state.
When analyzing that state, we can first scope data by operator; each operator is named by setting its uid.
Within each operator we can look at the registered states.
CurrencyConverter has a broadcast state, which is a type of non-partitioned operator state.
In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row.
Contrast this with Summarize, which contains two keyed states.
Because both states are scoped under the same key we can safely assume there exists some relationship between the two values.
Therefore, keyed state is best understood as a single table per operator containing one key column along with n value columns, one for each registered state.
All of this means that the state for this job could be described using the following pseudo-SQL commands.
In general, the savepoint ↔ database relationship can be summarized as:
* A savepoint is a database
* An operator is a namespace named by its uid
* Each operator state represents a single table
    * Each element in an operator state represents a single row in that table
* Each operator containing keyed state has a single “keyed_state” table
    * Each keyed_state table has one key column mapping the key value of the operator
    * Each registered state represents a single column in the table
    * Each row in the table maps to a single key
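The keyed-state part of this mapping can be modeled in plain Java without any Flink dependency. The sketch below only illustrates the table view and is not the State Processor API: the class `KeyedStateTable`, the state names `total` and `count`, and the sample values are hypothetical. It joins two per-key states of one operator into `keyed_state` rows, with one row per key and one column per registered state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of the "keyed_state" table view; no Flink classes are involved.
final class KeyedStateTable {

    // One row of the table: the key column plus one column per registered state.
    static final class Row {
        final int key;
        final Double total; // hypothetical state "total"; null if unset for this key
        final Long count;   // hypothetical state "count"; null if unset for this key

        Row(int key, Double total, Long count) {
            this.key = key;
            this.total = total;
            this.count = count;
        }

        @Override
        public String toString() {
            return key + " | " + total + " | " + count;
        }
    }

    // Joins the per-key values of both states on the key, yielding one row per key.
    static List<Row> toRows(Map<Integer, Double> total, Map<Integer, Long> count) {
        TreeSet<Integer> keys = new TreeSet<>(total.keySet());
        keys.addAll(count.keySet());
        List<Row> rows = new ArrayList<>();
        for (int key : keys) {
            rows.add(new Row(key, total.get(key), count.get(key)));
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<Integer, Double> total = new TreeMap<>();
        total.put(1, 10.5);
        total.put(2, 3.0);
        Map<Integer, Long> count = new TreeMap<>();
        count.put(1, 4L);
        // Key 2 has no "count" value, so that column is null in its row.
        for (Row row : toRows(total, count)) {
            System.out.println(row);
        }
    }
}
```

A key that has a value in only one of the states still produces a row; the missing column is simply null, much like a nullable column in a relational table.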
Reading state begins by specifying the path to a valid savepoint or checkpoint along with the StateBackend that should be used to restore the data.
The compatibility guarantees for restoring state are identical to those when restoring a DataStream application.
When reading operator state, simply specify the operator uid, state name, and type information.
A custom TypeSerializer may also be specified if one was used in the StateDescriptor for the state.
When reading keyed state, users specify a KeyedStateReaderFunction to allow reading arbitrary columns and complex state types such as ListState, MapState, and AggregatingState.
This means if an operator contains a stateful process function such as:
Then it can be read by defining an output type and a corresponding KeyedStateReaderFunction.
Note: When using a KeyedStateReaderFunction, all state descriptors must be registered eagerly inside of open. Any attempt to call RuntimeContext#getState, RuntimeContext#getListState, or RuntimeContext#getMapState outside of open will result in a RuntimeException.
Writing New Savepoints
State writers are based around the abstraction of Savepoint, where one Savepoint may have many operators and the state for any particular operator is created using a BootstrapTransformation.
A BootstrapTransformation starts with a DataSet containing the values that are to be written into state.
The transformation may optionally be keyed, depending on whether you are writing keyed or operator state.
Finally, a bootstrap function is applied, depending on the transformation; Flink supplies KeyedStateBootstrapFunction for writing keyed state, StateBootstrapFunction for writing non-keyed state, and BroadcastStateBootstrapFunction for writing broadcast state.
The KeyedStateBootstrapFunction supports setting event time and processing time timers.
The timers will not fire inside the bootstrap function and only become active once restored within a DataStream application.
If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediately upon start.
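The restore-time behavior of processing time timers can be illustrated with a small plain-Java model. This is a sketch of the rule described above, not Flink's timer service: the class `RestoredTimers` and the method `dueOnRestore` are hypothetical names, and treating a timer whose timestamp equals the restore time as already due is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of processing time timers at restore; no Flink classes are involved.
final class RestoredTimers {

    // Returns the timers that would fire right away when the job starts at
    // restoreTimeMillis, i.e. those whose timestamp is not later than that time.
    static List<Long> dueOnRestore(List<Long> timerTimestamps, long restoreTimeMillis) {
        List<Long> due = new ArrayList<>();
        for (long timestamp : timerTimestamps) {
            if (timestamp <= restoreTimeMillis) {
                due.add(timestamp);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        // Timers registered while bootstrapping; none of them fire at that point.
        List<Long> timers = Arrays.asList(1_000L, 5_000L, 9_000L);
        // The savepoint is restored at t = 6000 ms, after the first two timestamps.
        System.out.println(dueOnRestore(timers, 6_000L));
    }
}
```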
Once one or more transformations have been created, they may be combined into a single Savepoint.
Savepoints are created using a state backend and a max parallelism; they may contain any number of operators.
Besides creating a savepoint from scratch, you can base one off an existing savepoint, such as when bootstrapping a single new operator for an existing job.
Note: When basing a new savepoint on existing state, the State Processor API makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other!
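Why deleting one savepoint can corrupt the other is easier to see in a small plain-Java model. The sketch below is an illustration under stated assumptions rather than Flink's implementation: `SharedStateModel`, the file names, and the in-memory `storage` set standing in for a file system are all hypothetical. The derived savepoint copies only the references to the existing operators' state files, so removing those files through the old savepoint leaves the new one pointing at data that no longer exists.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of two savepoints sharing state files; no Flink classes are involved.
final class SharedStateModel {

    // Stand-in for a file system: the set of state files that currently exist.
    final Set<String> storage = new HashSet<>();

    // A savepoint is modeled as metadata only: a list of references to state files.
    List<String> createSavepoint(String... stateFiles) {
        List<String> refs = new ArrayList<>();
        for (String file : stateFiles) {
            storage.add(file);
            refs.add(file);
        }
        return refs;
    }

    // Shallow copy: reuse the existing references and write only the new operator's file.
    List<String> deriveSavepoint(List<String> existing, String newOperatorFile) {
        List<String> refs = new ArrayList<>(existing);
        storage.add(newOperatorFile);
        refs.add(newOperatorFile);
        return refs;
    }

    // Deleting a savepoint removes every file it references, shared or not.
    void deleteSavepoint(List<String> refs) {
        storage.removeAll(refs);
    }

    // A savepoint can be restored only if all referenced files still exist.
    boolean isRestorable(List<String> refs) {
        return storage.containsAll(refs);
    }

    public static void main(String[] args) {
        SharedStateModel fs = new SharedStateModel();
        List<String> oldSavepoint = fs.createSavepoint("op-a.state", "op-b.state");
        List<String> newSavepoint = fs.deriveSavepoint(oldSavepoint, "op-c.state");
        System.out.println("before delete: " + fs.isRestorable(newSavepoint));
        fs.deleteSavepoint(oldSavepoint);
        System.out.println("after delete:  " + fs.isRestorable(newSavepoint));
    }
}
```

In this model the new savepoint is restorable before the old one is deleted and not restorable afterwards, which is the hazard the note describes.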