This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
Important: Maven artifacts which depend on Scala are now suffixed with the Scala major version, e.g. "2.10" or "2.11". Please consult the migration guide on the project Wiki.

RabbitMQ Connector

License of the RabbitMQ Connector

Flink’s RabbitMQ connector defines a Maven dependency on the “RabbitMQ AMQP Java Client”, licensed under the Mozilla Public License v1.1 (MPL 1.1).

Flink itself neither reuses source code from the “RabbitMQ AMQP Java Client” nor packages binaries from the “RabbitMQ AMQP Java Client”.

Users that create and publish derivative work based on Flink’s RabbitMQ connector (thereby re-distributing the “RabbitMQ AMQP Java Client”) must be aware that this may be subject to conditions declared in the Mozilla Public License v1.1 (MPL 1.1).

RabbitMQ Connector

This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq_2.10</artifactId>
  <version>1.1.5</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.

Installing RabbitMQ

Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.

RabbitMQ Source

A class which provides an interface for receiving data from RabbitMQ.

The followings have to be provided for the RMQSource(…) constructor in order:

  • RMQConnectionConfig.
  • queueName: The RabbitMQ queue name.
  • usesCorrelationId: true when correlation ids should be used, false otherwise (default is false).
  • deserializationSchema: Deserialization schema to turn messages into Java objects.

This source can be operated in three different modes:

  1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with unique correlation IDs.
  2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism (correlation id is not set).
  3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.

Correlation ids are a RabbitMQ application feature. You have to set it in the message properties when injecting messages into RabbitMQ. If you set usesCorrelationId to true and do not supply unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore messages with non-unique correlation ids. If you set usesCorrelationId to false, then you don’t have to supply correlation ids.

Example:

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build();
DataStream<String> streamWithoutCorrelationIds = env
	.addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()))
	.print

DataStream<String> streamWithCorrelationIds = env
	.addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()))
	.print
val connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build()
streamWithoutCorrelationIds = env
    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
    .print

streamWithCorrelationIds = env
    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
    .print

RabbitMQ Sink

A class providing an interface for sending data to RabbitMQ.

The followings have to be provided for the RMQSink(…) constructor in order:

  1. RMQConnectionConfig
  2. The queue name
  3. Serialization schema

Example:

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
val connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build()
stream.addSink(new RMQSink[String](connectionConfig, "hello", new StringToByteSerializer))

More about RabbitMQ can be found here.