This connector provides sinks that write data into an Apache Cassandra database.
To use this connector, add the following dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.11</artifactId>
  <version>1.4.2</version>
</dependency>
Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.
There are multiple ways to bring up a Cassandra instance on a local machine:
Flink’s Cassandra sinks are created by using the static CassandraSink.addSink(DataStream
The following configuration methods can be used:
A checkpoint committer stores additional information about completed checkpoints
in some resource. This information is used to prevent a full replay of the last
completed checkpoint in case of a failure.
You can use a CassandraCommitter to store this information in a separate table in Cassandra.
Note that this table will NOT be cleaned up by Flink.
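To illustrate what a checkpoint committer keeps track of, here is a minimal, self-contained sketch. It does not use Flink's classes: InMemoryCommitter and its method names are hypothetical, chosen only for this example, and an in-memory map stands in for the separate Cassandra table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a checkpoint committer; NOT Flink's actual class.
// It remembers, per sink subtask, the highest checkpoint ID whose data
// has already been written to the database.
final class InMemoryCommitter {
    // subtask index -> last committed checkpoint ID (stands in for the Cassandra table)
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    // Record that all data of the given checkpoint has been written.
    void commitCheckpoint(int subtask, long checkpointId) {
        lastCommitted.merge(subtask, checkpointId, Long::max);
    }

    // True if the checkpoint's data was already written and need not be replayed.
    boolean isCheckpointCommitted(int subtask, long checkpointId) {
        Long last = lastCommitted.get(subtask);
        return last != null && checkpointId <= last;
    }

    public static void main(String[] args) {
        InMemoryCommitter committer = new InMemoryCommitter();
        committer.commitCheckpoint(0, 7L);
        // After a failure, checkpoint 7 is offered again and can be skipped.
        System.out.println(committer.isCheckpointCommitted(0, 7L)); // true
        // Checkpoint 8 was never committed, so its data still has to be written.
        System.out.println(committer.isCheckpointCommitted(0, 8L)); // false
    }
}
```

Because the stored entries are never removed in this model either, it also shows why the backing table grows unless it is cleaned up by someone other than Flink.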
Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple times without changing the result) and checkpointing is enabled. In case of a failure, the failed checkpoint will be replayed completely.
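The difference between an idempotent and a non-idempotent write under replay can be sketched as follows. This is a simplified model, not connector code: a HashMap keyed by the primary key stands in for a Cassandra table, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a map keyed by primary key stands in for a Cassandra table.
final class IdempotentReplayDemo {

    // Idempotent: an upsert overwrites the row for the key with the given value.
    static void upsert(Map<String, Long> table, String word, long count) {
        table.put(word, count);
    }

    // Not idempotent: each application adds to the stored value.
    static void increment(Map<String, Long> table, String word, long delta) {
        table.merge(word, delta, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> upserted = new HashMap<>();
        Map<String, Long> incremented = new HashMap<>();
        // The same record is written twice, as happens when a checkpoint is replayed.
        for (int attempt = 0; attempt < 2; attempt++) {
            upsert(upserted, "flink", 3L);
            increment(incremented, "flink", 3L);
        }
        System.out.println(upserted);    // {flink=3}
        System.out.println(incremented); // {flink=6}
    }
}
```

Only the upsert leaves the table in the same state after the replay, which is the property the exactly-once guarantee relies on.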
Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program the replayed checkpoint may be completely different from the previous attempt, which may leave the database in an inconsistent state since part of the first attempt may already be written. The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt. Note that enabling this feature will have an adverse impact on latency.
Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.
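The following deliberately simplified sketch shows why buffering records until a checkpoint completes avoids partial writes and why it adds latency. It is an assumption-laden model rather than Flink's implementation: the class WriteAheadBufferDemo and all of its methods are hypothetical, and in-memory lists stand in for both the log and the Cassandra table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of write-ahead buffering; NOT Flink's implementation.
final class WriteAheadBufferDemo {
    private final List<String> pending = new ArrayList<>();  // records of the current checkpoint
    private final List<String> database = new ArrayList<>(); // stands in for the Cassandra table

    // Incoming records are only buffered, never written directly.
    void invoke(String record) {
        pending.add(record);
    }

    // Once the checkpoint is complete, the buffered records are written as one unit.
    void notifyCheckpointComplete() {
        database.addAll(pending);
        pending.clear();
    }

    // A failure before the checkpoint completes discards the buffer; nothing partial was written.
    void failBeforeCheckpoint() {
        pending.clear();
    }

    List<String> database() {
        return database;
    }

    public static void main(String[] args) {
        WriteAheadBufferDemo sink = new WriteAheadBufferDemo();
        sink.invoke("a");
        sink.invoke("b");
        sink.failBeforeCheckpoint();     // first attempt fails
        sink.invoke("c");                // a non-deterministic program replays different records
        sink.invoke("d");
        sink.notifyCheckpointComplete();
        System.out.println(sink.database()); // [c, d]
    }
}
```

In this model no record becomes visible in the database before its checkpoint completes, which is the source of the added latency.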
With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the C* instance.
More details are available in the checkpoints docs and the fault tolerance guarantee docs.
The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For the general use of these streaming data types, please refer to Supported Data Types. We show two implementations based on SocketWindowWordCount, for POJO and Tuple data types respectively.
In all these examples, we assume that the associated keyspace example and table wordcount have been created.
CREATE KEYSPACE IF NOT EXISTS example
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE IF NOT EXISTS example.wordcount (
    word text,
    count bigint,
    PRIMARY KEY(word)
);
When storing results of the Java/Scala Tuple data type to a Cassandra sink, you must set a CQL upsert statement (via setQuery('stmt')) to persist each record back to the database. With the upsert query cached as a PreparedStatement, each Tuple element is converted to a parameter of the statement.
For details about PreparedStatement and BoundStatement, please visit the DataStax Java Driver manual.
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<Tuple2<String, Long>> result = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    // do not accept empty words, since word is defined as the primary key in the C* table
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<String, Long>(word, 1L));
                    }
                }
            }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("127.0.0.1")
        .build();
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// get input data by connecting to the socket
val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

// parse the data, group it, window it, and aggregate the counts
val result: DataStream[(String, Long)] = text
  // split up the lines in pairs (2-tuples) containing: (word,1)
  .flatMap(_.toLowerCase.split("\\s"))
  .filter(_.nonEmpty)
  .map((_, 1L))
  // group by the tuple field "0" and sum up tuple field "1"
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

CassandraSink.addSink(result)
  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
  .setHost("127.0.0.1")
  .build()

result.print().setParallelism(1)
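The positional relationship between the tuple fields and the ? placeholders of the upsert query in the tuple examples above can be sketched without any Cassandra dependency. The helper below is hypothetical and only mimics positional binding; it is not the DataStax driver's API, and its naive placeholder count assumes the query contains no ? inside string literals.

```java
import java.util.Arrays;

// Hypothetical helper that mimics positional parameter binding; NOT the DataStax API.
final class TupleBindingDemo {

    // Counts the '?' placeholders in a CQL statement (naive: ignores string literals).
    static int placeholderCount(String query) {
        int count = 0;
        for (char c : query.toCharArray()) {
            if (c == '?') {
                count++;
            }
        }
        return count;
    }

    // Returns the tuple fields in positional order after checking that the arity matches.
    static Object[] bind(String query, Object... tupleFields) {
        int expected = placeholderCount(query);
        if (expected != tupleFields.length) {
            throw new IllegalArgumentException(
                "Query has " + expected + " placeholders but tuple has " + tupleFields.length + " fields");
        }
        return tupleFields.clone();
    }

    public static void main(String[] args) {
        String query = "INSERT INTO example.wordcount(word, count) values (?, ?);";
        // Field 0 of the tuple fills the first placeholder, field 1 the second.
        System.out.println(Arrays.toString(bind(query, "flink", 3L))); // [flink, 3]
    }
}
```

The sketch makes one practical point explicit: the number and order of the tuple fields have to line up with the placeholders in the query.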
The following example streams a POJO data type and stores the same POJO entity back to Cassandra. The POJO implementation needs to follow the DataStax Java Driver Manual to annotate the class, as each field of this entity is mapped to an associated column of the designated table using the DataStax Java Driver com.datastax.driver.mapping.Mapper class.
The mapping of each table column can be defined through annotations placed on a field declaration in the POJO class. For details of the mapping, please refer to the CQL documentation on Definition of Mapped Classes and CQL Data types.
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text
        .flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    // do not accept empty words, since word is defined as the primary key in the C* table
                    if (!word.isEmpty()) {
                        out.collect(new WordCount(word, 1L));
                    }
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordCount>() {
            @Override
            public WordCount reduce(WordCount a, WordCount b) {
                return new WordCount(a.getWord(), a.getCount() + b.getCount());
            }
        });

CassandraSink.addSink(result)
        .setHost("127.0.0.1")
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();
// The WordCount POJO, annotated for the DataStax object mapper
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    public WordCount() {}

    public WordCount(String word, long count) {
        this.setWord(word);
        this.setCount(count);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return getWord() + " : " + getCount();
    }
}
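To give an intuition for how annotations such as those on the WordCount class can drive a field-to-column mapping, here is a self-contained sketch using reflection. It is an illustration under stated assumptions, not the DataStax Mapper: DemoTable, DemoColumn, and insertStatementFor are hypothetical names, and the real mapper's behavior is not reproduced here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of annotation-driven mapping; NOT the DataStax Mapper.
final class AnnotationMappingDemo {

    // Stand-ins for the driver's table and column annotations.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoTable { String keyspace(); String name(); }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface DemoColumn { String name(); }

    @DemoTable(keyspace = "example", name = "wordcount")
    static class WordCount {
        @DemoColumn(name = "word") private String word = "";
        @DemoColumn(name = "count") private long count = 0;
    }

    // Derives an INSERT statement from the annotations found on the class and its fields.
    static String insertStatementFor(Class<?> type) {
        DemoTable table = type.getAnnotation(DemoTable.class);
        if (table == null) {
            throw new IllegalArgumentException(type.getName() + " is not annotated with @DemoTable");
        }
        List<String> columns = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            DemoColumn column = field.getAnnotation(DemoColumn.class);
            if (column != null) {
                columns.add(column.name());
            }
        }
        // Reflection does not guarantee field order, so sort for a stable result.
        Collections.sort(columns);
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table.keyspace() + "." + table.name()
                + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ");";
    }

    public static void main(String[] args) {
        System.out.println(insertStatementFor(WordCount.class));
        // INSERT INTO example.wordcount (count, word) VALUES (?, ?);
    }
}
```

With a mapping of this kind, the sink needs no explicit query: the keyspace, table, and column names all come from the annotated class.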