This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
Important: Maven artifacts which depend on Scala are now suffixed with the Scala major version, e.g. "2.10" or "2.11". Please consult the migration guide on the project Wiki.

Apache Kafka Connector

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanism to provide exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka’s consumer group offset tracking, but tracks and checkpoints these offsets internally as well.

Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the FlinkKafkaConsumer08 (part of flink-connector-kafka) is appropriate.

Maven Dependency Supported since Consumer and
Producer Class name
Kafka version Notes
flink-connector-kafka 0.9.1, 0.10 FlinkKafkaConsumer082
FlinkKafkaProducer
0.8.x Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.
flink-connector-kafka-0.8_2.10 1.0.0 FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.
flink-connector-kafka-0.9_2.10 1.0.0 FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x Uses the new Consumer API Kafka.

Then, import the connector in your maven project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.0.3</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Kafka

  • Follow the instructions from Kafka’s quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • On 32 bit computers this problem may occur.
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file must be set to the machine’s IP address.

Kafka Consumer

Flink’s Kafka consumer is called FlinkKafkaConsumer08 (or 09). It provides access to one or more Kafka topics.

The constructor accepts the following arguments:

  1. The topic name / list of topic names
  2. A DeserializationSchema / KeyedDeserializationSchema for deserializing the data from Kafka
  3. Properties for the Kafka consumer. The following properties are required:
    • “bootstrap.servers” (comma separated list of Kafka brokers)
    • “zookeeper.connect” (comma separated list of Zookeeper servers) (only required for Kafka 0.8)
    • “group.id” the id of the consumer group

Example:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
	.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties))
	.print();
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
stream = env
    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
    .print
The DeserializationSchema

The FlinkKafkaConsumer08 needs to know how to turn the data in Kafka into Java objects. The DeserializationSchema allows users to specify such a schema. The T deserialize(byte[] message) method gets called for each Kafka message, passing the value from Kafka. For accessing both the key and value of the Kafka message, the KeyedDeserializationSchema has the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.

For convenience, Flink provides a TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) which creates a schema based on a Flink TypeInformation.

Kafka Consumers and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that where stored in the checkpoint.

The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

Kafka Producer

The FlinkKafkaProducer08 writes data to a Kafka topic. The producer can specify a custom partitioner that assigns records to partitions.

Example:

stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))

You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to the Apache Kafka documentation for details on how to configure Kafka Producers.

Note: By default, the number of retries is set to “0”. This means that the producer fails immediately on errors, including leader changes. The value is set to “0” by default to avoid duplicate messages in the target topic. For most production environments with frequent broker changes, we recommend setting the number of retries to a higher value.

There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery into a Kafka topic.