This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Upsert Kafka SQL Connector

Scan Source: Unbounded
Sink: Streaming Upsert Mode

The Upsert Kafka connector allows reading data from and writing data to Kafka topics in an upsert fashion.

As a source, the upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.

As a sink, the upsert-kafka connector can consume a changelog stream. It writes INSERT/UPDATE_AFTER data as normal Kafka message values, and writes DELETE data as Kafka messages with null values (indicating a tombstone for the key). Flink guarantees message ordering on the primary key by partitioning data on the values of the primary key columns, so update and delete messages for the same key fall into the same partition.

Dependencies

To use the Upsert Kafka connector, the following dependencies are required, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

Upsert Kafka version: universal
Maven dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.7</version>
</dependency>
SQL Client JAR: Download

Full Example

The example below shows how to create and use an Upsert Kafka table:

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP(3),  -- precision 3, as expected for a column used in a WATERMARK definition
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

-- calculate the pv, uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

Attention: Make sure to define the primary key in the DDL.

Available Metadata

See the regular Kafka connector for a list of all available metadata fields.
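
As a minimal sketch, and assuming the upsert-kafka connector exposes the same metadata keys as the regular Kafka connector (for example 'timestamp', 'partition', and 'offset'), metadata columns could be declared as shown below. The table name, topic, and column names are hypothetical, and the bootstrap servers value is left elided as in the other examples on this page.

```sql
-- Hypothetical table illustrating metadata columns on an upsert-kafka table.
CREATE TABLE user_scores (
  -- Kafka record timestamp, exposed under a custom column name
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  -- read-only metadata is declared VIRTUAL so it is excluded when writing
  `partition` INT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `score` BIGINT,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_scores',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
```

The backticks around `partition` and `offset` are used because both words may collide with reserved keywords.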

Connector Options

Option Required Default Type Description
connector
required (none) String Specify which connector to use; for Upsert Kafka, use 'upsert-kafka'.
topic
required (none) String The Kafka topic name to read from and write to.
properties.bootstrap.servers
required (none) String Comma separated list of Kafka brokers.
properties.*
optional (none) String Sets and passes arbitrary Kafka configurations. Suffix names must match the configuration keys defined in the Kafka Configuration documentation. Flink removes the "properties." key prefix and passes the transformed keys and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. Some configurations cannot be set this way because Flink overrides them, e.g. 'key.deserializer' and 'value.deserializer'.
key.format
required (none) String The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Attention: Compared to the regular Kafka connector, the key fields are specified by the PRIMARY KEY syntax.
key.fields-prefix
optional (none) String Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fields-include' must be set to 'EXCEPT_KEY'.
value.format
required (none) String The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options.
value.fields-include
optional ALL Enum Possible values: [ALL, EXCEPT_KEY]. Defines how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema are included in the value format, which means that key columns appear in the data type for both the key and value format. With 'EXCEPT_KEY', the key columns are excluded from the value format.
sink.parallelism
optional (none) Integer Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is determined by the framework and is the same as that of the upstream chained operator.
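
To make the interplay of several options above more concrete, the following sketch combines 'key.fields-prefix', 'value.fields-include', 'sink.parallelism', and a pass-through 'properties.*' setting. The table, topic, and column names are hypothetical, and the parallelism value is an arbitrary choice for illustration.

```sql
-- Hypothetical table: the Kafka key and the Kafka value both contain a field
-- named 'id', so the key column is exposed with the prefix 'k_' in the schema.
CREATE TABLE orders_latest (
  `k_id` BIGINT,      -- key field 'id' (prefix is stripped inside the key format)
  `id` BIGINT,        -- value field 'id'
  `status` STRING,    -- value field 'status'
  PRIMARY KEY (`k_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders_latest',
  'properties.bootstrap.servers' = '...',
  -- forwarded to the Kafka client as 'allow.auto.create.topics'
  'properties.allow.auto.create.topics' = 'false',
  'key.format' = 'json',
  'key.fields-prefix' = 'k_',
  'value.format' = 'json',
  -- required when a key prefix is defined
  'value.fields-include' = 'EXCEPT_KEY',
  'sink.parallelism' = '2'
);
```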

Features

Key and Value Formats

See the regular Kafka connector for more explanation around key and value formats. However, note that this connector requires both a key and value format where the key fields are derived from the PRIMARY KEY constraint.

The following example shows how to specify and configure key and value formats. The format options are prefixed with either 'key' or 'value', followed by the format identifier (for example, 'key.json.ignore-parse-errors').

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  ...

  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',

  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'EXCEPT_KEY'
);

Primary Key Constraints

The Upsert Kafka connector always works in an upsert fashion and requires a primary key to be defined in the DDL. Assuming that records with the same key are ordered within the same partition, the primary key semantics on the changelog source mean that the materialized changelog is unique on the primary key. The primary key definition also controls which fields end up in the Kafka message key.
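
As an illustration of how the primary key determines the Kafka message key, the sketch below declares a composite primary key. The table and topic names are hypothetical. With 'value.fields-include' set to 'EXCEPT_KEY', the key would carry `user_id` and `item_id`, and the value would carry only `behavior`.

```sql
-- Hypothetical table with a composite primary key.
CREATE TABLE user_item_behavior (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  -- both columns are serialized into the Kafka message key using 'key.format'
  PRIMARY KEY (`user_id`, `item_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_item_behavior',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json',
  -- keep the key columns out of the message value
  'value.fields-include' = 'EXCEPT_KEY'
);
```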

Consistency Guarantees

By default, an Upsert Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.

This means that Flink may write duplicate records with the same key into the Kafka topic. However, because the connector works in upsert mode, the last record for a given key takes effect when the topic is read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes, just like the HBase sink.
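
The effect of duplicates can be sketched by reading the sink table back as a source. The query below reuses the pageviews_per_region table from the Full Example. The record sequence in the comments is purely illustrative (it is not captured output) and only shows why a repeated record does not change the materialized result.

```sql
-- Illustrative sequence of records in the topic for a single key:
--   key = 'EU', value = ('EU', 10, 4)
--   key = 'EU', value = ('EU', 10, 4)   <- duplicate written after a recovery
--   key = 'EU', value = ('EU', 12, 5)
-- Read back as an upsert source, the last record per key wins,
-- so the current row for 'EU' is ('EU', 12, 5).
SELECT user_region, pv, uv
FROM pageviews_per_region;
```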

Source Per-Partition Watermarks

Flink supports emitting per-partition watermarks for Upsert Kafka. Watermarks are generated inside the Kafka consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming shuffles: the output watermark of the source is the minimum watermark among the partitions it reads. If some partitions of the topic are idle, the watermark will not advance. You can alleviate this problem by setting the 'table.exec.source.idle-timeout' option in the table configuration.

Please refer to Kafka watermark strategies for more details.
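
A minimal sketch of the idle-timeout setting together with a watermark definition on an upsert-kafka table is shown below. The table, topic, and column names, the 30-second timeout, and the 5-second watermark delay are all hypothetical. The SET syntax is an assumption that depends on the Flink version and SQL Client in use, so check it against your release.

```sql
-- Assumed SQL Client syntax for this Flink version; newer releases quote
-- the key and value instead:
--   SET 'table.exec.source.idle-timeout' = '30 s';
SET table.exec.source.idle-timeout=30s;

-- Hypothetical upsert-kafka source with an event-time attribute.
CREATE TABLE account_balance (
  `user_id` BIGINT,
  `balance` BIGINT,
  `update_time` TIMESTAMP(3),
  -- watermarks are generated per Kafka partition and merged by taking the minimum
  WATERMARK FOR `update_time` AS `update_time` - INTERVAL '5' SECOND,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'account_balance',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
```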

Data Type Mapping

Upsert Kafka stores message keys and values as bytes, so it has no schema or data types of its own. Messages are serialized and deserialized by formats such as csv, json, and avro, so the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.
