Scan Source: Unbounded
Sink: Streaming Append Mode
The Kafka connector allows for reading data from and writing data into Kafka topics.
Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11.
This universal Kafka connector attempts to track the latest version of the Kafka client.
The version of the client it uses may change between Flink releases.
Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
For most users the universal Kafka connector is the most appropriate.
However, for Kafka versions 0.11.x and 0.10.x, we recommend using the dedicated 0.11 and 0.10 connectors, respectively.
For details on Kafka compatibility, please refer to the official Kafka documentation.
| Kafka Version | Maven dependency | SQL Client JAR |
|---|---|---|
| | | Only available for stable releases |
The Kafka connectors are not currently part of the binary distribution. See the Flink documentation for how to link with them for cluster execution.
The example below shows how to create a Kafka table:
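As a minimal sketch, the Python helper below assembles such a Flink SQL CREATE TABLE statement as a string, using the option keys described in the options table. The helper name kafka_table_ddl, the table name, the columns, the topic, the broker address and the consumer group are all hypothetical, and the checks only cover rules stated on this page.

```python
from typing import Dict


def kafka_table_ddl(table: str, columns: Dict[str, str], options: Dict[str, str]) -> str:
    """Build a Flink SQL CREATE TABLE ... WITH (...) statement for a Kafka table."""
    # Sources may subscribe via 'topic' or 'topic-pattern', but not both.
    if "topic" in options and "topic-pattern" in options:
        raise ValueError("only one of 'topic' and 'topic-pattern' can be specified")
    # Options marked as required in the options table.
    for key in ("connector", "properties.bootstrap.servers", "format"):
        if key not in options:
            raise ValueError(f"missing required option '{key}'")
    cols = ",\n".join(f"  `{name}` {sql_type}" for name, sql_type in columns.items())
    # Single quotes inside SQL string literals are escaped by doubling them.
    opts = ",\n".join(
        "  '{}' = '{}'".format(k, v.replace("'", "''")) for k, v in options.items()
    )
    return f"CREATE TABLE {table} (\n{cols}\n) WITH (\n{opts}\n)"


# Hypothetical table, topic, broker address and consumer group.
ddl = kafka_table_ddl(
    "KafkaTable",
    {"user_id": "BIGINT", "item_id": "BIGINT", "behavior": "STRING", "ts": "TIMESTAMP(3)"},
    {
        "connector": "kafka",
        "topic": "user_behavior",
        "properties.bootstrap.servers": "localhost:9092",
        "properties.group.id": "testGroup",
        "scan.startup.mode": "earliest-offset",
        "format": "csv",
    },
)
print(ddl)
```

The printed statement can be submitted through the SQL Client or a TableEnvironment; building it from a dict simply keeps the WITH options in one place.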
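| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify what connector to use; for Kafka use 'kafka'. |
| topic | required for sink | (none) | String | Topic name(s) to read data from when the table is used as a source. A topic list is also supported for sources by separating topics with semicolons, like 'topic-1;topic-2'. When the table is used as a sink, this is the single topic to write data to. |
| topic-pattern | optional | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note: only one of 'topic-pattern' and 'topic' can be specified for sources. |
| properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
| properties.group.id | required for source | (none) | String | The ID of the consumer group for the Kafka source; optional for the Kafka sink. |
| format | required | (none) | String | The format used to deserialize and serialize Kafka messages. Supported formats include 'csv', 'json', 'avro', 'debezium-json' and 'canal-json'; see the Formats pages for details. |
| scan.startup.mode | optional | group-offsets | String | Startup mode for the Kafka consumer. Valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. |
| scan.startup.specific-offsets | optional | (none) | String | Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
| scan.startup.timestamp-millis | optional | (none) | Long | Start from the specified epoch timestamp (milliseconds), used in case of 'timestamp' startup mode. |
| scan.topic-partition-discovery.interval | optional | (none) | Duration | Interval for the consumer to periodically discover dynamically created Kafka topics and partitions. |
| sink.partitioner | optional | (none) | String | Output partitioning from Flink's partitions into Kafka's partitions. Valid values are 'fixed' (each Flink partition ends up in at most one Kafka partition), 'round-robin' (a Flink partition is distributed to Kafka partitions in round-robin fashion), or the fully qualified class name of a custom FlinkKafkaPartitioner subclass. |
| sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. |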
The config options topic and topic-pattern specify the topics or topic pattern to consume for the source. The config option topic can accept a topic list using a semicolon separator, like 'topic-1;topic-2'.
The config option topic-pattern uses a regular expression to discover the matched topics. For example, if topic-pattern is 'test-topic-[0-9]', then all topics with names that match the specified regular expression (starting with test-topic- and ending with a single digit) will be subscribed by the consumer when the job starts running.
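A minimal sketch of the two subscription styles, assuming (as the 'test-topic-[0-9]' example suggests) that a pattern must match the whole topic name rather than just a prefix. The helpers parse_topic_list and matching_topics are hypothetical illustrations, not Flink APIs.

```python
import re
from typing import List


def parse_topic_list(value: str) -> List[str]:
    """Split a 'topic' option value such as 'topic-1;topic-2' into topic names."""
    return [name.strip() for name in value.split(";") if name.strip()]


def matching_topics(pattern: str, topics: List[str]) -> List[str]:
    """Return the topics whose entire name matches the 'topic-pattern' regex."""
    compiled = re.compile(pattern)
    # fullmatch() anchors the regex at both ends of the topic name.
    return [t for t in topics if compiled.fullmatch(t)]


print(parse_topic_list("topic-1;topic-2"))  # ['topic-1', 'topic-2']
print(matching_topics(r"test-topic-[0-9]",
                      ["test-topic-1", "test-topic-12", "other-topic-3", "test-topic-7"]))
# ['test-topic-1', 'test-topic-7']
```

Note that 'test-topic-12' is not matched: with whole-name matching, the pattern allows exactly one trailing digit.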
To allow the consumer to discover dynamically created topics after the job has started running, set a non-negative value for scan.topic-partition-discovery.interval. This allows the consumer to discover partitions of new topics whose names also match the specified pattern.
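The periodic discovery described above can be pictured as repeated rounds of pattern matching. This is a simplified sketch: discover_new_topics is a hypothetical helper, the broker's topic listing is passed in as a plain list, and discovery of new partitions for already-subscribed topics is left out.

```python
import re
from typing import Iterable, List, Set


def discover_new_topics(pattern: str, subscribed: Set[str], listed: Iterable[str]) -> List[str]:
    """One discovery round: topics that match the pattern and are not yet subscribed."""
    compiled = re.compile(pattern)
    new_topics = sorted(t for t in listed if compiled.fullmatch(t) and t not in subscribed)
    subscribed.update(new_topics)  # remembered for the next round
    return new_topics


subscribed: Set[str] = set()
# Round at job start, then a later round after 'test-topic-5' has been created.
first = discover_new_topics(r"test-topic-[0-9]", subscribed, ["test-topic-1", "logs"])
second = discover_new_topics(r"test-topic-[0-9]", subscribed,
                             ["test-topic-1", "test-topic-5", "logs"])
print(first, second)  # ['test-topic-1'] ['test-topic-5']
```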
Please refer to Kafka DataStream Connector documentation for more about topic and partition discovery.
Note that topic lists and topic patterns only work for sources. For sinks, Flink currently supports only a single topic.
The config option scan.startup.mode specifies the startup mode for the Kafka consumer. The valid enumerations are:
group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
earliest-offset: start from the earliest offset possible.
latest-offset: start from the latest offset.
timestamp: start from user-supplied timestamp for each partition.
specific-offsets: start from user-supplied specific offsets for each partition.
The default option value is group-offsets, which indicates consuming from the last committed offsets in ZK / Kafka brokers.
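The startup-mode rules on this page, including the default and the companion options that the timestamp and specific-offsets modes require, can be checked with a small validator. resolve_startup_mode is a hypothetical helper operating on a plain dict of table options; it is a sketch of the documented rules, not Flink's own validation code.

```python
from typing import Dict

VALID_STARTUP_MODES = {
    "group-offsets", "earliest-offset", "latest-offset", "timestamp", "specific-offsets",
}

# Only these two modes need an extra option.
REQUIRED_COMPANION = {
    "timestamp": "scan.startup.timestamp-millis",
    "specific-offsets": "scan.startup.specific-offsets",
}


def resolve_startup_mode(options: Dict[str, str]) -> str:
    """Return the effective scan.startup.mode, checking its companion option."""
    mode = options.get("scan.startup.mode", "group-offsets")  # documented default
    if mode not in VALID_STARTUP_MODES:
        raise ValueError(f"invalid scan.startup.mode: {mode!r}")
    companion = REQUIRED_COMPANION.get(mode)
    if companion is not None and companion not in options:
        raise ValueError(f"mode '{mode}' requires option '{companion}'")
    return mode


print(resolve_startup_mode({}))  # group-offsets
print(resolve_startup_mode({"scan.startup.mode": "earliest-offset"}))  # earliest-offset
```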
If timestamp is specified, another config option, scan.startup.timestamp-millis, is required to specify a startup timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
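Since scan.startup.timestamp-millis expects an epoch timestamp in milliseconds, here is a minimal Python sketch for computing one from a timezone-aware datetime. to_epoch_millis is a hypothetical helper name, and the chosen date is only an example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Milliseconds since 1970-01-01 00:00:00.000 UTC."""
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time ambiguity")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


options = {
    "scan.startup.mode": "timestamp",
    "scan.startup.timestamp-millis": str(to_epoch_millis(datetime(2020, 1, 1, tzinfo=timezone.utc))),
}
print(options["scan.startup.timestamp-millis"])  # 1577836800000
```

Rejecting naive datetimes is a deliberate choice: a naive value would otherwise be interpreted in whatever local time zone the client runs in.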
If specific-offsets is specified, another config option, scan.startup.specific-offsets, is required to specify startup offsets for each partition. For example, the option value partition:0,offset:42;partition:1,offset:300 indicates offset 42 for partition 0 and offset 300 for partition 1.
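To make the scan.startup.specific-offsets value format concrete, here is a hypothetical parser written as a sketch. It is not the parser Flink uses, and its strictness (any malformed entry raises ValueError) is an assumption.

```python
from typing import Dict


def parse_specific_offsets(value: str) -> Dict[int, int]:
    """Parse 'partition:0,offset:42;partition:1,offset:300' into {0: 42, 1: 300}."""
    offsets: Dict[int, int] = {}
    for entry in value.split(";"):
        try:
            fields = {}
            for part in entry.split(","):
                key, val = part.split(":", 1)  # ValueError if ':' is missing
                fields[key.strip()] = val.strip()
            offsets[int(fields["partition"])] = int(fields["offset"])
        except (KeyError, ValueError) as exc:
            raise ValueError(f"malformed partition entry: {entry!r}") from exc
    return offsets


print(parse_specific_offsets("partition:0,offset:42;partition:1,offset:300"))  # {0: 42, 1: 300}
```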
Flink natively supports Kafka as a changelog source. If the messages in a Kafka topic are change events captured from other databases using CDC tools, you can use a CDC format to interpret them as INSERT/UPDATE/DELETE messages in the Flink SQL system. Flink provides two CDC formats, debezium-json and canal-json, to interpret change events captured by Debezium and Canal. The changelog source is useful in many cases, such as synchronizing incremental data from databases to other systems, auditing logs, materialized views on databases, and temporal joins against the change history of a database table. See the debezium-json and canal-json format pages for more about how to use the CDC formats.
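To illustrate how one change event becomes INSERT/UPDATE/DELETE rows, here is a simplified sketch for a Debezium-style message. It assumes a flat JSON message with before, after and op fields (Debezium's op codes c, r, u and d) and no schema envelope; the +I/-U/+U/-D labels mirror Flink's row kinds. debezium_to_changelog is a hypothetical helper, not the debezium-json format implementation.

```python
import json
from typing import List, Optional, Tuple


def debezium_to_changelog(message: bytes) -> List[Tuple[str, Optional[dict]]]:
    """Map one flat Debezium-style JSON change event to (row_kind, row) pairs."""
    event = json.loads(message)
    op, before, after = event["op"], event.get("before"), event.get("after")
    if op in ("c", "r"):  # create, or a row read during the initial snapshot
        return [("+I", after)]
    if op == "u":  # update: retract the old row, then emit the new one
        return [("-U", before), ("+U", after)]
    if op == "d":  # delete: retract the old row
        return [("-D", before)]
    raise ValueError(f"unknown op code: {op!r}")


update = b'{"before": {"id": 1, "qty": 5}, "after": {"id": 1, "qty": 7}, "op": "u"}'
print(debezium_to_changelog(update))
# [('-U', {'id': 1, 'qty': 5}), ('+U', {'id': 1, 'qty': 7})]
```

Splitting an update into a retraction and a new row is what lets downstream aggregates and materialized views stay correct when upstream rows change.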
The config option sink.partitioner specifies output partitioning from Flink’s partitions into Kafka’s partitions.
By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
To distribute the writes to more partitions or to control the routing of rows into partitions, a custom sink partitioner can be provided. The round-robin partitioner is useful to avoid unbalanced partitioning. However, it causes many network connections between all the Flink instances and all the Kafka brokers.
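The difference between the default fixed assignment and round-robin distribution can be simulated in a few lines. The sketch assumes that a fixed assignment maps sink subtask i to partition i modulo the partition count, and models round-robin as a simple cycle over all partitions; both helpers are hypothetical.

```python
from itertools import cycle
from typing import Dict


def fixed_partition(subtask_index: int, num_partitions: int) -> int:
    """Fixed-style: each sink subtask always writes to the same single partition."""
    return subtask_index % num_partitions


def round_robin_counts(num_records: int, num_partitions: int) -> Dict[int, int]:
    """Round-robin-style: records from one subtask are spread over all partitions."""
    counts = {p: 0 for p in range(num_partitions)}
    partitions = cycle(range(num_partitions))
    for _ in range(num_records):
        counts[next(partitions)] += 1
    return counts


# Sink parallelism 2, topic with 4 partitions.
print(sorted({fixed_partition(i, 4) for i in range(2)}))  # [0, 1]: partitions 2 and 3 stay idle
print(round_robin_counts(8, 4))  # {0: 2, 1: 2, 2: 2, 3: 2}
```

The trade-off follows directly: with round-robin every subtask talks to every partition, which balances load but multiplies the number of producer-to-broker connections.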
By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.
With Flink’s checkpointing enabled, the Kafka connector can provide exactly-once delivery guarantees.
Besides enabling Flink’s checkpointing, you can choose among three modes of operation by passing the appropriate sink.semantic option:
none: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
at-least-once(default setting): This guarantees that no records will be lost (although they can be duplicated).
exactly-once: Kafka transactions will be used to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget to set the desired isolation.level (read_committed or read_uncommitted, the latter being the default value) for any application consuming records from Kafka.
Please refer to Kafka documentation for more caveats about delivery guarantees.
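The interplay between sink.semantic and checkpointing described above can be summarized as a small decision function. effective_guarantee is a hypothetical helper, and it assumes, as the text implies, that no delivery guarantee should be relied on when checkpointing is disabled.

```python
VALID_SEMANTICS = ("none", "at-least-once", "exactly-once")


def effective_guarantee(semantic: str = "at-least-once", checkpointing_enabled: bool = True) -> str:
    """Delivery guarantee a Kafka sink can offer under the rules described on this page."""
    if semantic not in VALID_SEMANTICS:
        raise ValueError(f"invalid sink.semantic: {semantic!r}")
    if not checkpointing_enabled:
        # Both at-least-once and exactly-once rely on Flink's checkpointing.
        return "none"
    return semantic


print(effective_guarantee())  # at-least-once
print(effective_guarantee("exactly-once"))  # exactly-once
print(effective_guarantee("exactly-once", checkpointing_enabled=False))  # none
```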
Kafka stores message keys and values as bytes, so Kafka itself has no schema or data types. Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.