State Backends

Data Stream API 编写的程序通常以各种形式保存状态:

  • 在 Window 触发之前要么收集元素、要么聚合
  • 转换函数可以使用 key/value 格式的状态接口来存储状态
  • 转换函数可以实现 CheckpointedFunction 接口,使其本地变量具有容错能力

另请参阅 Streaming API 指南中的 状态部分

在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend

可用的 State Backends

Flink 内置了以下这些开箱即用的 state backends :

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

如果不设置,默认使用 MemoryStateBackend。

MemoryStateBackend

MemoryStateBackend 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。

在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。

MemoryStateBackend 能配置异步快照。强烈建议使用异步快照来防止数据流阻塞,注意,异步快照默认是开启的。 用户可以在实例化 MemoryStateBackend 的时候,将相应布尔类型的构造参数设置为 false 来关闭异步快照(仅在 debug 的时候使用),例如:

    new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

MemoryStateBackend 的限制:

  • 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。
  • 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看配置参数)。
  • 聚合后的状态必须能够放进 JobManager 的内存中。

MemoryStateBackend 适用场景:

  • 本地开发和调试。
  • 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。

FsStateBackend

FsStateBackend 需要配置一个文件系统的 URL(类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。

FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。

FsStateBackend 默认使用异步快照来防止 CheckPoint 写状态时对数据处理造成阻塞。 用户可以在实例化 FsStateBackend 的时候,将相应布尔类型的构造参数设置为 false 来关闭异步快照,例如:

    new FsStateBackend(path, false);

FsStateBackend 适用场景:

  • 状态比较大、窗口比较长、key/value 状态比较大的 Job。
  • 所有高可用的场景。

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的 URL (类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。

RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 CheckPoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 CheckPoint 的元数据文件中)。

RocksDBStateBackend 只支持异步快照。

RocksDBStateBackend 的限制:

  • 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 重要信息: RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。

RocksDBStateBackend 的适用场景:

  • 状态非常大、窗口非常长、key/value 状态非常大的 Job。
  • 所有高可用的场景。

注意,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 FsStateBackend 相比,RocksDBStateBackend 允许存储非常大的状态。 然而,这也意味着使用 RocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。

RocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (见 这里)。

可以使用一些 RocksDB 的本地指标(metrics),但默认是关闭的。你能在 这里 找到关于 RocksDB 本地指标的文档。

设置 State Backend

如果没有明确指定,将使用 jobmanager 做为默认的 state backend。你能在 flink-conf.yaml 中为所有 Job 设置其他默认的 State Backend。 每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置,如下所示:

设置每个 Job 的 State Backend

StreamExecutionEnvironment 可以对每个 Job 的 State Backend 进行设置,如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))

如果你想在 IDE 中使用 RocksDBStateBackend,或者需要在作业中通过编程方式动态配置 RocksDBStateBackend,必须添加以下依赖到 Flink 项目中。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.11-SNAPSHOT</version>
    <scope>provided</scope>
</dependency>
注意: 由于 RocksDB 是 Flink 默认分发包的一部分,所以如果你没在代码中使用 RocksDB,则不需要添加此依赖。而且可以在 flink-conf.yaml 文件中通过 state.backend 配置 State Backend,以及更多的 checkpointingRocksDB 特定的 参数。

设置默认的(全局的) State Backend

flink-conf.yaml 可以通过键 state.backend 设置默认的 State Backend。

可选值包括 jobmanager (MemoryStateBackend)、filesystem (FsStateBackend)、rocksdb (RocksDBStateBackend), 或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名, 例如: RocksDBStateBackend 对应为 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

state.checkpoints.dir 选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录。 你能在 这里 找到关于 CheckPoint 目录结构的详细信息。

配置文件的部分示例如下所示:

# The backend that will be used to store operator state checkpoints

state.backend: filesystem


# Directory for storing checkpoints

state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

RocksDB State Backend Config Options

Key Default Type Description
state.backend.rocksdb.checkpoint.transfer.thread.num
1 Integer The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.
state.backend.rocksdb.localdir
(none) String The local directory (on the TaskManager) where RocksDB puts its files.
state.backend.rocksdb.memory.fixed-per-slot
(none) MemorySize The fixed total amount of memory, shared among all RocksDB instances per slot. This option overrides the 'state.backend.rocksdb.memory.managed' option when configured. If neither this option, nor the 'state.backend.rocksdb.memory.managed' optionare set, then each RocksDB column family state has its own memory caches (as controlled by the column family options).
state.backend.rocksdb.memory.high-prio-pool-ratio
0.1 Double The fraction of cache memory that is reserved for high-priority data like index, filter, and compression dictionary blocks. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.
state.backend.rocksdb.memory.managed
true Boolean If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.
state.backend.rocksdb.memory.write-buffer-ratio
0.5 Double The maximum amount of memory that write buffers may take, as a fraction of the total shared memory. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.
state.backend.rocksdb.options-factory
"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory" String The options factory class for RocksDB to create DBOptions and ColumnFamilyOptions. The default options factory is org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and it would read the configured options which provided in 'RocksDBConfigurableOptions'.
state.backend.rocksdb.predefined-options
"DEFAULT" String The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the OptionsFactory are applied on top of these predefined ones.
state.backend.rocksdb.timer-service.factory
"ROCKSDB" String This determines the factory for timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB .

Back to top