Avro Format

Format: Serialization Schema Format: Deserialization Schema

Apache Avro format 允许基于 Avro schema 读取和写入 Avro 数据。目前,Avro schema 从 table schema 推导而来。

依赖

为了设置 Avro format,下表提供了使用自动化构建工具(例如 Maven 或 SBT)和带有 SQL JAR 捆绑包的 SQL Client 的项目依赖信息。

您可以从 这里 下载 flink-avro,除此之外还需要额外的 Hadoop 依赖 用于集群执行。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.11.2</version>
</dependency>

如何使用 Avro format 创建表

这是使用 Kafka 连接器和 Avro format 创建表的示例。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'avro'
)

Format 参数

参数 是否必选 默认值 类型 描述
format
必要 (none) String 指定使用什么 format,这里应该是 'avro'
avro.codec
可选 (none) String 仅用于 filesystem,avro 压缩编解码器。默认不压缩。目前支持:deflate、snappy、bzip2、xz。

数据类型映射

目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。

Flink SQL 类型 Avro 类型 Avro 逻辑类型
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY bytes
DECIMAL fixed decimal
TINYINT int
SMALLINT int
INT int
BIGINT long
FLOAT float
DOUBLE double
DATE int date
TIME int time-millis
TIMESTAMP long timestamp-millis
ARRAY array
MAP
(key 必须是 string/char/varchar 类型)
map
MULTISET
(元素必须是 string/char/varchar 类型)
map
ROW record

除了上面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。

您可以参考 Avro 规范 获取更多有关 Avro 类型的信息。