This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Confluent Avro Format

Format: Serialization Schema Format: Deserialization Schema

The Avro Schema Registry (avro-confluent) format allows you to read records that were serialized by the io.confluent.kafka.serializers.KafkaAvroSerializer and to write records that can in turn be read by the io.confluent.kafka.serializers.KafkaAvroDeserializer.

When reading (deserializing) a record with this format the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record while the reader schema is inferred from table schema.

When writing (serializing) a record with this format the Avro schema is inferred from the table schema and used to retrieve a schema id to be encoded with the data. The lookup is performed with in the configured Confluent Schema Registry under the subject given in avro-confluent.schema-registry.subject.

The Avro Schema Registry format can only be used in conjunction with Apache Kafka SQL connector.

Dependencies

In order to use the Avro Schema Registry format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client JAR
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>1.12.7</version>
</dependency>
Download

How to create a table with Avro-Confluent format

Here is an example to create a table using Kafka connector and Confluent Avro format.

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'user_behavior',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'avro-confluent.schema-registry.subject' = 'user_behavior'
)

Format Options

Option Required Default Type Description
format
required (none) String Specify what format to use, here should be 'avro-confluent'.
avro-confluent.basic-auth.credentials-source
optional (none) String Basic auth credentials source for Schema Registry
avro-confluent.basic-auth.user-info
optional (none) String Basic auth user info for schema registry
avro-confluent.bearer-auth.credentials-source
optional (none) String Bearer auth credentials source for Schema Registry
avro-confluent.bearer-auth.token
optional (none) String Bearer auth token for Schema Registry
avro-confluent.ssl.keystore.location
optional (none) String Location / File of SSL keystore
avro-confluent.ssl.keystore.password
optional (none) String Password for SSL keystore
avro-confluent.ssl.truststore.location
optional (none) String Location / File of SSL truststore
avro-confluent.ssl.truststore.password
optional (none) String Password for SSL truststore
avro-confluent.schema-registry.subject
optional (none) String The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, 'kafka' and 'upsert-kafka' connectors use '<topic_name>-value' or '<topic_name>-key' as the default subject name if this format is used as the value or key format. But for other connectors (e.g. 'filesystem'), the subject option is required when used as sink.
avro-confluent.schema-registry.url
required (none) String The URL of the Confluent Schema Registry to fetch/register schemas.

Data Type Mapping

Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and Avro writer schema during serialization. Explicitly defining an Avro schema is not supported yet. See the Apache Avro Format for the mapping between Avro and Flink DataTypes.

In addition to the types listed there, Flink supports reading/writing nullable types. Flink maps nullable types to Avro union(something, null), where something is the Avro type converted from Flink type.

You can refer to Avro Specification for more information about Avro types.