JDBC Connector #

This connector provides a sink that writes data to a JDBC database.

To use it, add the following dependency to your project (along with your JDBC driver):


Note that the streaming connectors are currently NOT part of the binary distribution. See the documentation on linking with them for cluster execution.

JdbcSink.sink #

The JDBC sink provides an at-least-once guarantee. In practice, however, exactly-once results can be achieved by crafting upsert SQL statements or idempotent SQL updates. The sink is configured as follows (see also the JdbcSink javadoc):

JdbcSink.sink(
        sqlDmlStatement,                       // mandatory
        jdbcStatementBuilder,                  // mandatory
        jdbcExecutionOptions,                  // optional
        jdbcConnectionOptions                  // mandatory
);
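To illustrate why an upsert turns the at-least-once guarantee into an effectively exactly-once result, the following minimal sketch delivers the same batch twice, as can happen when a job restarts after a failure. It is an illustration only: the class and method names are hypothetical, no Flink or JDBC API is involved, a list stands in for a table written with plain inserts, and a keyed map stands in for a table with a primary key written with upserts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: in-memory stand-ins for database tables.
final class ReplayDemo {

    // Plain INSERT semantics: every delivery appends a new row.
    static List<String> appendAll(List<String> table, List<String[]> batch) {
        for (String[] row : batch) {
            table.add(row[0] + "=" + row[1]);
        }
        return table;
    }

    // Upsert semantics: a delivery overwrites the row that has the same key.
    static Map<String, String> upsertAll(Map<String, String> table, List<String[]> batch) {
        for (String[] row : batch) {
            table.put(row[0], row[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> batch = List.of(
                new String[] {"101", "Stream Processing with Apache Flink"},
                new String[] {"102", "Streaming Systems"});

        List<String> appended = new ArrayList<>();
        Map<String, String> upserted = new HashMap<>();

        // Deliver the same batch twice, as at-least-once delivery may do.
        for (int delivery = 0; delivery < 2; delivery++) {
            appendAll(appended, batch);
            upsertAll(upserted, batch);
        }

        System.out.println("rows after plain inserts: " + appended.size()); // 4
        System.out.println("rows after upserts: " + upserted.size());       // 2
    }
}
```

Against a real database the same effect would come from the dialect's own upsert syntax, for example INSERT ... ON CONFLICT in PostgreSQL or INSERT ... ON DUPLICATE KEY UPDATE in MySQL.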

SQL DML statement and JDBC statement builder #

The sink builds one JDBC prepared statement from a user-provided SQL string, e.g.:

INSERT INTO some_table (field1, field2) VALUES (?, ?)

It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:

(preparedStatement, someRecord) -> { ... update here the preparedStatement with values from someRecord ... }
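The shape of such a function can be sketched without a database. In the example below, StatementBuilder, Row and recordingStatement are hypothetical names introduced for illustration (they are not connector classes), and a java.lang.reflect.Proxy stands in for a real PreparedStatement so that the setter calls can be inspected.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; StatementBuilder and Row are illustrative names, not connector classes.
final class StatementBuilderDemo {

    // Same shape as the user-provided function: (prepared statement, value) -> set parameters.
    @FunctionalInterface
    interface StatementBuilder<T> {
        void accept(PreparedStatement statement, T value) throws SQLException;
    }

    static final class Row {
        final int field1;
        final String field2;

        Row(int field1, String field2) {
            this.field1 = field1;
            this.field2 = field2;
        }
    }

    // Binds the two '?' placeholders of the INSERT; JDBC parameter indices start at 1.
    static final StatementBuilder<Row> BUILDER = (statement, row) -> {
        statement.setInt(1, row.field1);
        statement.setString(2, row.field2);
    };

    // A PreparedStatement stand-in that records calls instead of talking to a database.
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
                StatementBuilderDemo.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, methodArgs) -> {
                    calls.add(method.getName() + Arrays.toString(methodArgs));
                    return null; // only void setters are invoked in this sketch
                });
    }

    // Applies BUILDER to one value and returns the recorded setter calls.
    static List<String> bind(Row row) {
        List<String> calls = new ArrayList<>();
        try {
            BUILDER.accept(recordingStatement(calls), row);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(bind(new Row(1, "a")));
    }
}
```

The function only sets parameters; in the sink described above, executing the statement is left to the sink itself.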

JDBC execution options #

The SQL DML statements are executed in batches, which can optionally be configured with a JdbcExecutionOptions instance (see also the JdbcExecutionOptions javadoc):

JdbcExecutionOptions.builder()
        .withBatchIntervalMs(200)             // optional: default = 0, meaning no time-based execution is done
        .withBatchSize(1000)                  // optional: default = 5000 values
        .withMaxRetries(5)                    // optional: default = 3
        .build()

A JDBC batch is executed as soon as one of the following conditions is true:

  • the configured batch interval has elapsed
  • the maximum batch size is reached
  • a Flink checkpoint has started
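The three triggers listed above can be sketched as a small buffer. This is a simplified, hypothetical model rather than the connector's implementation: BatchBuffer and its methods are illustrative names, time is passed in explicitly so the behaviour is deterministic, and "executing" a batch just moves it into a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the three flush triggers; not the connector's implementation.
final class BatchBuffer<T> {
    private final int maxBatchSize;
    private final long batchIntervalMs; // 0 disables the time-based trigger
    private final List<T> pending = new ArrayList<>();
    private final List<List<T>> executed = new ArrayList<>();
    private long lastFlushMs;

    BatchBuffer(int maxBatchSize, long batchIntervalMs, long nowMs) {
        this.maxBatchSize = maxBatchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = nowMs;
    }

    // Trigger: the maximum batch size is reached.
    void add(T value, long nowMs) {
        pending.add(value);
        if (pending.size() >= maxBatchSize) {
            flush(nowMs);
        }
    }

    // Trigger: the batch interval has elapsed (assumed to be called periodically by a timer).
    void onTimer(long nowMs) {
        if (batchIntervalMs > 0 && nowMs - lastFlushMs >= batchIntervalMs) {
            flush(nowMs);
        }
    }

    // Trigger: a checkpoint has started, so nothing may stay buffered.
    void onCheckpoint(long nowMs) {
        flush(nowMs);
    }

    List<List<T>> executedBatches() {
        return executed;
    }

    private void flush(long nowMs) {
        lastFlushMs = nowMs;
        if (!pending.isEmpty()) {
            executed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer = new BatchBuffer<>(2, 200, 0);
        buffer.add("a", 10);
        buffer.add("b", 20);      // size trigger
        buffer.add("c", 30);
        buffer.onTimer(220);      // interval trigger
        buffer.add("d", 230);
        buffer.onCheckpoint(231); // checkpoint trigger
        System.out.println(buffer.executedBatches()); // [[a, b], [c], [d]]
    }
}
```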

JDBC connection parameters #

The connection to the database is configured with a JdbcConnectionOptions instance. See the JdbcConnectionOptions javadoc for details.

Full example #

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkExample {

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }
        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        (statement, book) -> {
                            // bind the four '?' placeholders, in order, from one Book
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                // placeholder connection settings; replace with those of your database
                                .withUrl("jdbc:postgresql://localhost:5432/postgres")
                                .withDriverName("org.postgresql.Driver")
                                .build()
                ));

        env.execute();
    }
}

JdbcSink.exactlyOnceSink #

Since 1.13, the Flink JDBC sink supports exactly-once mode. The implementation relies on the JDBC driver's support for the XA standard.

Attention: In 1.13, the Flink JDBC sink does not support exactly-once mode with MySQL or other databases that do not support multiple XA transactions per connection. We will improve the support in FLINK-22239.
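The following sketch suggests why multiple XA transactions per connection matter. It rests on an assumption that this page does not spell out: that the sink prepares the current transaction when a checkpoint is taken and commits it only once that checkpoint is known to have completed, while new records already go into the next transaction. XaConnectionSketch is a hypothetical in-memory model, not Flink's implementation and not the javax.transaction.xa API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the XA transactions on one connection; not Flink's implementation.
final class XaConnectionSketch {
    enum State { ACTIVE, PREPARED, COMMITTED }

    private final List<State> transactions = new ArrayList<>();

    // Starts a new transaction for the records that arrive before the next checkpoint.
    int start() {
        transactions.add(State.ACTIVE);
        return transactions.size() - 1;
    }

    // First phase of two-phase commit: assumed to happen when a checkpoint is taken.
    void prepare(int xid) {
        expect(xid, State.ACTIVE);
        transactions.set(xid, State.PREPARED);
    }

    // Second phase: assumed to happen once the checkpoint has completed.
    void commit(int xid) {
        expect(xid, State.PREPARED);
        transactions.set(xid, State.COMMITTED);
    }

    // Number of transactions that are started or prepared but not yet committed.
    long openTransactions() {
        return transactions.stream().filter(state -> state != State.COMMITTED).count();
    }

    private void expect(int xid, State expected) {
        if (transactions.get(xid) != expected) {
            throw new IllegalStateException("transaction " + xid + " is not " + expected);
        }
    }

    public static void main(String[] args) {
        XaConnectionSketch connection = new XaConnectionSketch();
        int first = connection.start();
        connection.prepare(first);                         // checkpoint taken
        int second = connection.start();                   // new records keep arriving
        System.out.println(connection.openTransactions()); // 2: one prepared, one active
        connection.commit(first);                          // checkpoint completed
        System.out.println(connection.openTransactions()); // 1: only the second remains
    }
}
```

Under this assumption, a prepared transaction and a newly started one coexist on the same connection between a checkpoint and its completion, which a database limited to one XA transaction per connection cannot accommodate.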

To use it, create a sink using the exactlyOnceSink() method as above and additionally provide:

  • exactly-once options
  • execution options
  • an XA DataSource Supplier

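StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...) // the records to write, with id, title, author, price and qty fields
        .addSink(JdbcSink.exactlyOnceSink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                // execution options; retries are disabled because retrying a batch
                // inside an XA transaction could write duplicates
                JdbcExecutionOptions.builder().withMaxRetries(0).build(),
                // exactly-once options
                JdbcExactlyOnceOptions.defaults(),
                () -> {
                    // create a driver-specific XA DataSource and configure it for your database
                    EmbeddedXADataSource ds = new EmbeddedXADataSource();
                    return ds;
                }));
env.execute();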

Please refer to the JdbcXaSinkFunction documentation for more details.