public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
An AbstractKeyedStateBackend that stores its state in RocksDB and serializes state to streams provided by a CheckpointStreamFactory upon checkpointing. This state backend can store very large state that exceeds memory and spills to disk. Except for snapshotting, this class should be accessed as if it were not thread-safe.

Modifier and Type | Field and Description |
---|---|
protected org.rocksdb.RocksDB | db - Our RocksDB database, used by the actual subclasses of AbstractRocksDBState to store state. |
Fields inherited from class AbstractKeyedStateBackend: cancelStreamRegistry, currentKey, keyGroupRange, keySerializer, keyValueStatesByName, kvStateRegistry, numberOfKeyGroups, userCodeClassLoader
Constructor and Description |
---|
RocksDBKeyedStateBackend(String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, org.rocksdb.DBOptions dbOptions, org.rocksdb.ColumnFamilyOptions columnFamilyOptions, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, boolean enableIncrementalCheckpointing) |
Modifier and Type | Method and Description |
---|---|
protected <N,T,ACC,R> InternalAggregatingState<N,T,R> | createAggregatingState(TypeSerializer<N> namespaceSerializer, AggregatingStateDescriptor<T,ACC,R> stateDesc) - Creates and returns a new AggregatingState. |
protected <N,T,ACC> InternalFoldingState<N,T,ACC> | createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc) - Creates and returns a new FoldingState. |
protected <N,T> InternalListState<N,T> | createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) - Creates and returns a new ListState. |
protected <N,UK,UV> InternalMapState<N,UK,UV> | createMapState(TypeSerializer<N> namespaceSerializer, MapStateDescriptor<UK,UV> stateDesc) - Creates and returns a new MapState. |
protected <N,T> InternalReducingState<N,T> | createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) - Creates and returns a new ReducingState. |
protected <N,T> InternalValueState<N,T> | createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) - Creates and returns a new ValueState. |
void | dispose() - Should only be called by one thread, and only after all accesses to the DB have happened. |
protected <N,S> org.rocksdb.ColumnFamilyHandle | getColumnFamily(StateDescriptor<?,S> descriptor, TypeSerializer<N> namespaceSerializer) - Creates a column family handle for use with a k/v state. |
File | getInstanceBasePath() - Only visible for testing, DO NOT USE. |
int | getKeyGroupPrefixBytes() |
void | notifyCheckpointComplete(long completedCheckpointId) - This method is called as a notification once a distributed checkpoint has been completed. |
int | numStateEntries() - Returns the total number of state entries across all keys/namespaces. |
void | restore(Collection<KeyedStateHandle> restoreState) - Restores state that was previously snapshotted from the provided parameters. |
RunnableFuture<KeyedStateHandle> | snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) - Triggers an asynchronous snapshot of the keyed state backend from RocksDB. |
boolean | supportsAsynchronousSnapshots() |
Methods inherited from class AbstractKeyedStateBackend: close, getCurrentKey, getCurrentKeyGroupIndex, getKeyGroupRange, getKeySerializer, getNumberOfKeyGroups, getOrCreateKeyedState, getPartitionedState, setCurrentKey
protected org.rocksdb.RocksDB db
Our RocksDB database, used by the actual subclasses of AbstractRocksDBState to store state. The different k/v states do not each have their own RocksDB instance. They all write to this instance, but each to its own column family.
public RocksDBKeyedStateBackend(String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, org.rocksdb.DBOptions dbOptions, org.rocksdb.ColumnFamilyOptions columnFamilyOptions, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, boolean enableIncrementalCheckpointing) throws IOException
Throws: IOException
public void dispose()
Should only be called by one thread, and only after all accesses to the DB have happened.
dispose in interface KeyedStateBackend<K>
dispose in class AbstractKeyedStateBackend<K>
public int getKeyGroupPrefixBytes()
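This page gives no description for getKeyGroupPrefixBytes(). The sketch below illustrates one plausible reading, under the assumption that each key written to RocksDB is prefixed with its key-group index and that one byte is used while every index fits into 0..127, two bytes otherwise. The class KeyGroupPrefixSketch and both of its methods are hypothetical and are not part of this API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class KeyGroupPrefixSketch {

    // Assumption: one byte is enough while all key-group indices fit into 0..127,
    // otherwise two bytes are used.
    public static int keyGroupPrefixBytes(int numberOfKeyGroups) {
        return numberOfKeyGroups > (Byte.MAX_VALUE + 1) ? 2 : 1;
    }

    // Writes the key-group index big-endian into prefixBytes bytes,
    // followed by the already serialized key.
    public static byte[] prefixedKey(int keyGroup, int prefixBytes, byte[] serializedKey) {
        ByteBuffer buf = ByteBuffer.allocate(prefixBytes + serializedKey.length);
        for (int i = prefixBytes - 1; i >= 0; i--) {
            buf.put((byte) (keyGroup >>> (i * 8)));
        }
        buf.put(serializedKey);
        return buf.array();
    }

    public static void main(String[] args) {
        int prefixBytes = keyGroupPrefixBytes(1024);
        byte[] key = prefixedKey(300, prefixBytes, new byte[] {7});
        // prints "2 [1, 44, 7]"
        System.out.println(prefixBytes + " " + Arrays.toString(key));
    }
}
```

With a big-endian prefix, byte-wise key ordering groups all entries of one key group together, which is presumably why a fixed-width prefix would be chosen.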
public RunnableFuture<KeyedStateHandle> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception
Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and is also stopped when the backend is closed through dispose(). For each backend, this method must always be called by the same thread.
checkpointId - The ID of the checkpoint.
timestamp - The timestamp of the checkpoint.
streamFactory - The factory that we can use for writing our state to streams.
checkpointOptions - Options for how to perform this checkpoint.
Throws: Exception
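Because snapshot(...) returns a java.util.concurrent.RunnableFuture, the caller decides where and when the snapshot work runs. The following sketch uses only the JDK to show that pattern: the future is created on the calling thread and then executed on a separate thread. The methods triggerSnapshot and runAsync are hypothetical stand-ins, and a String takes the place of KeyedStateHandle. This is an illustration of the RunnableFuture contract, not of how Flink schedules snapshots.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotFutureSketch {

    // Hypothetical stand-in for snapshot(...): the future is created by the
    // calling thread but does no work until something runs it.
    public static RunnableFuture<String> triggerSnapshot(long checkpointId) {
        return new FutureTask<>(() -> "handle-for-checkpoint-" + checkpointId);
    }

    // Runs the snapshot on a separate thread and waits for the resulting handle.
    public static String runAsync(RunnableFuture<String> snapshot) {
        ExecutorService asyncOps = Executors.newSingleThreadExecutor();
        try {
            asyncOps.execute(snapshot);
            return snapshot.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("snapshot failed", e);
        } finally {
            asyncOps.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "handle-for-checkpoint-42"
        System.out.println(runAsync(triggerSnapshot(42L)));

        // A snapshot that is no longer needed can be canceled before it runs.
        RunnableFuture<String> obsolete = triggerSnapshot(43L);
        obsolete.cancel(true);
        // prints "true"
        System.out.println(obsolete.isCancelled());
    }
}
```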
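public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
Description copied from interface: Snapshotable
Restores state that was previously snapshotted from the provided parameters.
restoreState - the old state to restore.
Throws: Exception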
public void notifyCheckpointComplete(long completedCheckpointId)
Description copied from interface: CheckpointListener
This method is called as a notification once a distributed checkpoint has been completed.
completedCheckpointId - The ID of the checkpoint that has been completed.
protected <N,S> org.rocksdb.ColumnFamilyHandle getColumnFamily(StateDescriptor<?,S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException
Creates a column family handle for use with a k/v state. This also checks whether the StateDescriptor for a state matches the one that we checkpointed, i.e. is already in the map of column families.
Throws: IOException, StateMigrationException
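The lookup described for getColumnFamily can be pictured as a get-or-create registry keyed by state name: an existing entry is reused if the requested state matches what was recorded, and a new entry is created otherwise. The sketch below is a simplified, JDK-only illustration under that assumption. Handle, Entry and the string-based state type are hypothetical stand-ins for org.rocksdb.ColumnFamilyHandle and StateDescriptor, and IllegalStateException stands in for StateMigrationException.

```java
import java.util.HashMap;
import java.util.Map;

public class ColumnFamilyRegistrySketch {

    // Hypothetical stand-in for org.rocksdb.ColumnFamilyHandle.
    public static final class Handle {
        final String name;
        Handle(String name) { this.name = name; }
    }

    // Hypothetical registry entry: the handle plus the state type recorded for it.
    static final class Entry {
        final Handle handle;
        final String stateType;
        Entry(Handle handle, String stateType) {
            this.handle = handle;
            this.stateType = stateType;
        }
    }

    // One shared map for the whole backend: state name -> column family entry.
    private final Map<String, Entry> columnFamilies = new HashMap<>();

    // Returns the existing handle for a state name, or registers a new one.
    // A request that does not match the recorded state type is rejected.
    public Handle getColumnFamily(String stateName, String stateType) {
        Entry existing = columnFamilies.get(stateName);
        if (existing != null) {
            if (!existing.stateType.equals(stateType)) {
                throw new IllegalStateException("State '" + stateName
                        + "' was registered as " + existing.stateType + ", not " + stateType);
            }
            return existing.handle;
        }
        Handle created = new Handle(stateName);
        columnFamilies.put(stateName, new Entry(created, stateType));
        return created;
    }

    public static void main(String[] args) {
        ColumnFamilyRegistrySketch registry = new ColumnFamilyRegistrySketch();
        Handle first = registry.getColumnFamily("counts", "VALUE");
        Handle second = registry.getColumnFamily("counts", "VALUE");
        // prints "true": the second request reuses the first handle
        System.out.println(first == second);
    }
}
```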
protected <N,T> InternalValueState<N,T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractKeyedStateBackend
Creates and returns a new ValueState.
createValueState in class AbstractKeyedStateBackend<K>
N - The type of the namespace.
T - The type of the value that the ValueState can store.
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws: Exception
protected <N,T> InternalListState<N,T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractKeyedStateBackend
Creates and returns a new ListState.
createListState in class AbstractKeyedStateBackend<K>
N - The type of the namespace.
T - The type of the values that the ListState can store.
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws: Exception
protected <N,T> InternalReducingState<N,T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception
Description copied from class: AbstractKeyedStateBackend
Creates and returns a new ReducingState.
createReducingState in class AbstractKeyedStateBackend<K>
N - The type of the namespace.
T - The type of the values that the ReducingState can store.
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws: Exception
protected <N,T,ACC,R> InternalAggregatingState<N,T,R> createAggregatingState(TypeSerializer<N> namespaceSerializer, AggregatingStateDescriptor<T,ACC,R> stateDesc) throws Exception
Description copied from class: AbstractKeyedStateBackend
Creates and returns a new AggregatingState.
createAggregatingState in class AbstractKeyedStateBackend<K>
N - The type of the namespace.
T - The type of the values added to the AggregatingState.
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws: Exception
protected <N,T,ACC> InternalFoldingState<N,T,ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T,ACC> stateDesc) throws Exception
Description copied from class: AbstractKeyedStateBackend
Creates and returns a new FoldingState.
createFoldingState in class AbstractKeyedStateBackend<K>
N - The type of the namespace.
T - Type of the values folded into the state.
ACC - Type of the value in the state.
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws: Exception
protected <N,UK,UV> InternalMapState<N,UK,UV> createMapState(TypeSerializer<N> namespaceSerializer, MapStateDescriptor<UK,UV> stateDesc) throws Exception
Description copied from class: AbstractKeyedStateBackend
Creates and returns a new MapState.
createMapState in class AbstractKeyedStateBackend<K>
N - The type of the namespace.
UK - Type of the keys in the state.
UV - Type of the values in the state.
namespaceSerializer - TypeSerializer for the state namespace.
stateDesc - The StateDescriptor that contains the name of the state.
Throws: Exception
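public File getInstanceBasePath()
Only visible for testing, DO NOT USE.
public boolean supportsAsynchronousSnapshots()
supportsAsynchronousSnapshots in class AbstractKeyedStateBackend<K>
@VisibleForTesting public int numStateEntries()
Description copied from class: AbstractKeyedStateBackend
Returns the total number of state entries across all keys/namespaces.
numStateEntries in class AbstractKeyedStateBackend<K>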
Copyright © 2014–2018 The Apache Software Foundation. All rights reserved.