Apache Kafka SQL Connector

Scan Source: Unbounded | Sink: Streaming Append Mode

The Kafka connector allows for reading data from and writing data into Kafka topics.

Dependencies

Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11. The universal Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. For most users the universal Kafka connector is the most appropriate. However, for Kafka versions 0.11.x and 0.10.x, we recommend using the dedicated 0.11 and 0.10 connectors, respectively. For details on Kafka compatibility, please refer to the official Kafka documentation.

Kafka Version: universal
Maven dependency: flink-connector-kafka_2.11
SQL Client JAR: Only available for stable releases

The Kafka connectors are not currently part of the binary distribution. See how to link with them for cluster execution here.

How to create a Kafka table

The example below shows how to create a Kafka table:

CREATE TABLE kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'scan.startup.mode' = 'earliest-offset'
)

Connector Options

connector
 required | default: (none) | type: String
 Specify what connector to use; for Kafka use 'kafka'.

topic
 required for sinks, optional for sources (use 'topic-pattern' instead if not set) | default: (none) | type: String
 Topic name(s) to read data from when the table is used as a source. A topic list is also supported for sources by separating topics with a semicolon, e.g. 'topic-1;topic-2'. Note that only one of 'topic-pattern' and 'topic' can be specified for sources. When the table is used as a sink, the topic name is the topic to write data to. Topic lists are not supported for sinks.

topic-pattern
 optional | default: (none) | type: String
 The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note that only one of 'topic-pattern' and 'topic' can be specified for sources.

properties.bootstrap.servers
 required | default: (none) | type: String
 Comma-separated list of Kafka brokers.

properties.group.id
 required by source, optional for sink | default: (none) | type: String
 The id of the consumer group for the Kafka source; optional for the Kafka sink.

format
 required | default: (none) | type: String
 The format used to deserialize and serialize Kafka messages. The supported formats are 'csv', 'json', 'avro', 'debezium-json' and 'canal-json'. Please refer to the Formats page for more details and more format options.

scan.startup.mode
 optional | default: group-offsets | type: String
 Startup mode for the Kafka consumer. Valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See Start Reading Position below for more details.

scan.startup.specific-offsets
 optional | default: (none) | type: String
 Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'.

scan.startup.timestamp-millis
 optional | default: (none) | type: Long
 Start from the specified epoch timestamp (milliseconds); used in case of 'timestamp' startup mode.

scan.topic-partition-discovery.interval
 optional | default: (none) | type: Duration
 Interval for the consumer to periodically discover dynamically created Kafka topics and partitions.

sink.partitioner
 optional | default: (none) | type: String
 Output partitioning from Flink's partitions into Kafka's partitions. Valid values are:
  • fixed: each Flink partition ends up in at most one Kafka partition.
  • round-robin: a Flink partition is distributed to Kafka partitions round-robin.
  • Custom FlinkKafkaPartitioner subclass: e.g. 'org.mycompany.MyPartitioner'.

sink.semantic
 optional | default: at-least-once | type: String
 Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. See Consistency guarantees for more details.

Features

Topic and Partition Discovery

The config options topic and topic-pattern specify the topics or topic pattern to consume for a source. The config option topic accepts a topic list with a semicolon separator like 'topic-1;topic-2'. The config option topic-pattern uses a regular expression to discover the matching topics. For example, if topic-pattern is test-topic-[0-9], then all topics with names that match the specified regular expression (starting with test-topic- and ending with a single digit) will be subscribed by the consumer when the job starts running.

To allow the consumer to discover dynamically created topics after the job has started running, set a non-negative value for scan.topic-partition-discovery.interval. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.
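
For example, a source table that subscribes to every topic matching test-topic-[0-9] and checks for newly created matching topics every 10 seconds could be declared roughly as follows (a minimal sketch; the schema, the broker address and the 10-second interval are illustrative assumptions, not prescribed values):

CREATE TABLE kafkaPatternTable (
 user_id BIGINT,
 behavior STRING
) WITH (
 'connector' = 'kafka',
 -- subscribe to all topics whose names match the pattern
 'topic-pattern' = 'test-topic-[0-9]',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 -- periodically look for new matching topics/partitions (illustrative interval)
 'scan.topic-partition-discovery.interval' = '10s'
)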

Please refer to Kafka DataStream Connector documentation for more about topic and partition discovery.

Note that topic lists and topic patterns only work for sources. For sinks, Flink currently only supports a single topic.

Start Reading Position

The config option scan.startup.mode specifies the startup mode for Kafka consumer. The valid enumerations are:

  • group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
  • earliest-offset: start from the earliest offset possible.
  • latest-offset: start from the latest offset.
  • timestamp: start from user-supplied timestamp for each partition.
  • specific-offsets: start from user-supplied specific offsets for each partition.

The default option value is group-offsets, which means consuming from the last committed offsets in ZK / Kafka brokers.

If timestamp is specified, another config option scan.startup.timestamp-millis is required to specify a specific startup timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.

If specific-offsets is specified, another config option scan.startup.specific-offsets is required to specify specific startup offsets for each partition, e.g. an option value partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300 for partition 1.
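
For example, the WITH clause of a source table that starts reading from explicit per-partition offsets could look like the following sketch (the schema, topic and broker address are the same illustrative values used earlier; the offsets come from the example option value above):

CREATE TABLE kafkaOffsetTable (
 user_id BIGINT,
 behavior STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 -- start from offset 42 in partition 0 and offset 300 in partition 1
 'scan.startup.mode' = 'specific-offsets',
 'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
)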

Changelog Source

Flink natively supports Kafka as a changelog source. If the messages in a Kafka topic are change events captured from other databases using CDC tools, then you can use a CDC format to interpret the messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. Flink provides two CDC formats, debezium-json and canal-json, to interpret change events captured by Debezium and Canal, respectively. A changelog source is very useful in many cases, such as synchronizing incremental data from databases to other systems, auditing logs, maintaining materialized views on databases, temporal joins against the changing history of a database table, and so on. See more about how to use the CDC formats in debezium-json and canal-json.
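
For example, a table backed by a topic that a Debezium-based CDC pipeline writes to could be declared roughly as follows (a minimal sketch; the table schema and the topic name are illustrative assumptions):

CREATE TABLE productsChangelog (
 id BIGINT,
 name STRING,
 weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 -- topic populated by a Debezium CDC pipeline (illustrative name)
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 -- interpret each message as an INSERT/UPDATE/DELETE change event
 'format' = 'debezium-json'
)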

Sink Partitioning

The config option sink.partitioner specifies output partitioning from Flink’s partitions into Kafka’s partitions. By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The round-robin partitioner is useful to avoid an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
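
For example, a sink table that spreads writes across Kafka partitions round-robin could be declared roughly as follows (a sketch; the schema and topic name are illustrative assumptions):

CREATE TABLE kafkaSinkTable (
 user_id BIGINT,
 behavior STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_out',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'csv',
 -- distribute rows from each Flink partition across Kafka partitions round-robin
 'sink.partitioner' = 'round-robin'
)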

Consistency guarantees

By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.

With Flink’s checkpointing enabled, the Kafka connector can provide exactly-once delivery guarantees.

Besides enabling Flink’s checkpointing, you can also choose among three different modes of operation by passing the appropriate sink.semantic option (see the example after this list):

  • NONE: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
  • AT_LEAST_ONCE (default setting): This guarantees that no records will be lost (although they can be duplicated).
  • EXACTLY_ONCE: Kafka transactions will be used to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget about setting the desired isolation.level (read_committed or read_uncommitted - the latter one is the default value) for any application consuming records from Kafka.
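
For example, a sink table configured for exactly-once delivery could be declared roughly as follows (a sketch; the schema and topic name are illustrative assumptions, and checkpointing must be enabled on the job for the guarantee to hold):

CREATE TABLE kafkaExactlyOnceSink (
 user_id BIGINT,
 behavior STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_out',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'csv',
 -- use Kafka transactions; downstream consumers should set isolation.level=read_committed
 'sink.semantic' = 'exactly-once'
)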

Please refer to Kafka documentation for more caveats about delivery guarantees.

Data Type Mapping

Kafka stores message keys and values as bytes, so Kafka doesn’t have schemas or data types. Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.
