Apache Kafka Connector

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanism to provide exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka’s consumer group offset tracking, but tracks and checkpoints these offsets internally as well.

Please pick a package (Maven artifact id) and class name for your use case and environment. For most users, the FlinkKafkaConsumer08 (part of flink-connector-kafka) is appropriate.

Maven Dependency | Supported since | Consumer and Producer Class name | Kafka version | Notes
flink-connector-kafka-0.8_2.10 | 1.0.0 | FlinkKafkaConsumer08, FlinkKafkaProducer08 | 0.8.x | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.
flink-connector-kafka-0.9_2.10 | 1.0.0 | FlinkKafkaConsumer09, FlinkKafkaProducer09 | 0.9.x | Uses the new Consumer API of Kafka.
flink-connector-kafka-0.10_2.10 | 1.2.0 | FlinkKafkaConsumer010, FlinkKafkaProducer010 | 0.10.x | This connector supports Kafka messages with timestamps both for producing and consuming.

Then, import the connector in your Maven project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.2-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Kafka

  • Follow the instructions from Kafka’s quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file must be set to the machine’s IP address.

Kafka Consumer

Flink’s Kafka consumer is called FlinkKafkaConsumer08 (or FlinkKafkaConsumer09 for Kafka 0.9.x, or FlinkKafkaConsumer010 for Kafka 0.10.x). It provides access to one or more Kafka topics.

The constructor accepts the following arguments:

  1. The topic name / list of topic names
  2. A DeserializationSchema / KeyedDeserializationSchema for deserializing the data from Kafka
  3. Properties for the Kafka consumer. The following properties are required:
    • “bootstrap.servers” (comma separated list of Kafka brokers)
    • “zookeeper.connect” (comma separated list of Zookeeper servers) (only required for Kafka 0.8)
    • “group.id” the id of the consumer group

Example:

Java:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

Scala:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
stream.print()

The current FlinkKafkaConsumer implementation will establish a connection from the client (when calling the constructor) for querying the list of topics and partitions.

For this to work, the machine submitting the job to the Flink cluster needs to be able to reach the Kafka brokers. If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc.

The DeserializationSchema

The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The DeserializationSchema allows users to specify such a schema. The T deserialize(byte[] message) method gets called for each Kafka message, passing the value from Kafka.

It is usually helpful to start from the AbstractDeserializationSchema, which takes care of describing the produced Java/Scala type to Flink’s type system. Users that implement a vanilla DeserializationSchema need to implement the getProducedType(...) method themselves.

For accessing both the key and value of the Kafka message, the KeyedDeserializationSchema has the following deserialize method: T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset).
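
To illustrate the shape of the deserialize contract described above, the following minimal sketch decodes a UTF-8 message value into a String. It deliberately avoids Flink classes so that it runs on its own: ValueDecoder, Utf8StringDecoder and DecoderDemo are hypothetical names, and ValueDecoder only mirrors the T deserialize(byte[] message) method rather than being Flink’s DeserializationSchema. Passing null values through unchanged is likewise an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in that mirrors only the T deserialize(byte[] message) method.
// It is NOT Flink's DeserializationSchema (which also describes the produced type).
interface ValueDecoder<T> {
    T deserialize(byte[] message);
}

// Decodes the raw Kafka message value as UTF-8 text.
class Utf8StringDecoder implements ValueDecoder<String> {
    @Override
    public String deserialize(byte[] message) {
        // Assumption of this sketch: a null value is passed through unchanged.
        if (message == null) {
            return null;
        }
        return new String(message, StandardCharsets.UTF_8);
    }
}

class DecoderDemo {
    public static void main(String[] args) {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(new Utf8StringDecoder().deserialize(raw));
    }
}
```

In an actual job, the same decoding logic would sit inside a class that implements DeserializationSchema or extends AbstractDeserializationSchema.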

For convenience, Flink provides the following schemas:

  1. TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema), which creates a schema based on Flink’s TypeInformation. This is useful if the data is both written and read by Flink. This schema is a performant Flink-specific alternative to other generic serialization approaches.

  2. JsonDeserializationSchema (and JSONKeyValueDeserializationSchema), which turns the serialized JSON into an ObjectNode object, from which fields can be accessed using objectNode.get("field").as(Int/String/…)(). The KeyValue objectNode contains a “key” and “value” field which contain all fields, as well as an optional “metadata” field that exposes the offset/partition/topic for this message.

Kafka Consumers and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

Java:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

Also note that Flink can only restart the topology if enough processing slots are available. So if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper (Kafka 0.8) or to the Kafka brokers (Kafka 0.9+).

Kafka Consumers and Timestamp Extraction/Watermark Emission

In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka Consumer allows the specification of an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks.

You can specify your custom timestamp extractor/watermark emitter as described here, or use one of the predefined ones. After doing so, you can pass it to your consumer in the following way:

Java:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

// print() returns a sink, not a DataStream, so keep the stream in its own variable
DataStream<String> stream = env.addSource(myConsumer);
stream.print();

Scala:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

// print() returns a sink, not a DataStream, so keep the stream in its own value
val stream = env.addSource(myConsumer)
stream.print()

Internally, one instance of the assigner is executed per Kafka partition. When such an assigner is specified, the extractTimestamp(T element, long previousElementTimestamp) method is called for each record read from Kafka to assign a timestamp to the record. In addition, the Watermark getCurrentWatermark() method (for periodic assigners) or the Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) method (for punctuated assigners) is called to determine whether a new watermark should be emitted and with which timestamp.
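
As a rough illustration of how the two calls of a periodic assigner interact, the following Flink-free sketch tracks the highest timestamp seen so far and reports a watermark that trails it by a fixed bound. Everything here is an assumption made for illustration: the class name BoundedLatenessSketch, the maxOutOfOrdernessMs parameter, the choice of treating each Long record as its own event timestamp, and the use of a plain long in place of Flink’s Watermark type.

```java
// Hypothetical, Flink-free sketch of the logic a periodic watermark assigner might use.
// In a real job this logic would live in an AssignerWithPeriodicWatermarks implementation,
// and getCurrentWatermark() would return a Watermark object instead of a long.
class BoundedLatenessSketch {
    private final long maxOutOfOrdernessMs;          // assumed tolerance for late records
    private long maxSeenTimestamp = Long.MIN_VALUE;  // highest event timestamp observed so far

    BoundedLatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called once per record. Assumes the record value is its event time in milliseconds.
    long extractTimestamp(Long element, long previousElementTimestamp) {
        long timestamp = element;
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        return timestamp;
    }

    // Called periodically. The watermark trails the highest timestamp by the allowed bound.
    long getCurrentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no record seen yet
        }
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }
}
```

Because one assigner instance runs per Kafka partition, each partition would keep its own maxSeenTimestamp in a setup like this.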

Kafka Producer

The FlinkKafkaProducer08 writes data to a Kafka topic. The producer can specify a custom partitioner that assigns records to partitions.

Example:

Java:

// Kafka 0.8
stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
// Kafka 0.10
FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);

Scala:

// Kafka 0.8
stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
// Kafka 0.10
FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties)

You can also pass a custom Kafka producer configuration to the producer’s constructor. Please refer to the Apache Kafka documentation for details on how to configure Kafka Producers.

Similar to the consumer, the producer also supports an advanced serialization schema that serializes the key and value separately. It also allows overriding the target topic, so that one producer instance can send data to multiple topics.

The interface of the serialization schema is called KeyedSerializationSchema.

Note: By default, the number of retries is set to “0”. This means that the producer fails immediately on errors, including leader changes. The value is set to “0” by default to avoid duplicate messages in the target topic. For most production environments with frequent broker changes, we recommend setting the number of retries to a higher value.
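
A higher retry count can be expressed through the producer configuration. The sketch below builds such a configuration with plain java.util.Properties; "retries" and "bootstrap.servers" are standard Kafka producer settings, while the class name ProducerRetryConfig, the helper withRetries, and the value 3 are only illustrative assumptions.

```java
import java.util.Properties;

// Hypothetical helper that builds a Kafka producer configuration with retries enabled.
class ProducerRetryConfig {
    static Properties withRetries(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A value above 0 lets the producer retry on transient errors such as leader changes,
        // at the cost of possible duplicate messages in the target topic.
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        // The value 3 is an arbitrary example, not a recommendation from this document.
        Properties props = withRetries("localhost:9092", 3);
        System.out.println(props.getProperty("retries"));
    }
}
```

The resulting Properties object would then be handed to the producer as its custom configuration.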

There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery into a Kafka topic.

Since Apache Kafka 0.10, Kafka’s messages can carry timestamps, indicating the time the event has occurred (see “event time” in Apache Flink) or the time when the message has been written to the Kafka broker.

The FlinkKafkaConsumer010 will emit records with the timestamp attached, if the time characteristic in Flink is set to TimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)).

The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms as described above in “Kafka Consumers and Timestamp Extraction/Watermark Emission” using the assignTimestampsAndWatermarks method are applicable.

There is no need to define a timestamp extractor when using the timestamps from Kafka. The previousElementTimestamp argument of the extractTimestamp() method contains the timestamp carried by the Kafka message.

A timestamp extractor for a Kafka consumer would look like this:

public long extractTimestamp(Long element, long previousElementTimestamp) {
    return previousElementTimestamp;
}

The FlinkKafkaProducer010 only emits the record timestamp if setWriteTimestampToKafka(true) is set.

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);

Kafka Connector metrics

Flink’s Kafka connectors provide some metrics through Flink’s metrics system to analyze the behavior of the connector. The producers export Kafka’s internal metrics through Flink’s metric system for all supported versions. The consumers export all metrics starting from Kafka version 0.9. The Kafka documentation lists all exported metrics.

In addition to these metrics, all consumers expose the current-offsets and committed-offsets for each topic partition. The current-offsets metric refers to the current offset in the partition, that is, the offset of the last element that was retrieved and emitted successfully. The committed-offsets metric is the last committed offset.

The Kafka Consumers in Flink commit the offsets back to Zookeeper (Kafka 0.8) or the Kafka brokers (Kafka 0.9+). If checkpointing is disabled, offsets are committed periodically. With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they’ve created a checkpoint of their state. This provides users with at-least-once semantics for the offsets committed to Zookeeper or the broker. For offsets checkpointed to Flink, the system provides exactly-once guarantees.

The offsets committed to ZK or the broker can also be used to track the read progress of the Kafka consumer. The difference between the committed offset and the most recent offset in each partition is called the consumer lag. If the Flink topology is consuming the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind. For large production deployments we recommend monitoring that metric to avoid increasing latency.
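
The lag calculation described above can be sketched in a few lines. This example assumes the most recent and the committed offsets per partition have already been collected into two maps; how they are obtained is not shown. The class name ConsumerLagSketch, the method lagPerPartition, and the choice to treat a partition without a committed offset as committed at 0 are all assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that computes the consumer lag for each partition.
class ConsumerLagSketch {
    // lag = most recent offset in the partition - committed offset
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> latestOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestOffsets.entrySet()) {
            // Assumption: a partition without a committed offset counts as committed at 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            // Clamp at 0 in case the committed offset was read after the latest offset.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 100L);
        latest.put(1, 50L);
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 90L);
        System.out.println(lagPerPartition(latest, committed));
    }
}
```

A lag that keeps growing across successive measurements indicates that the topology is consuming more slowly than new data arrives.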