This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

JDBC Connector #

This connector provides a sink that writes data to a JDBC database.

To use it, add the following dependency to your project (along with your JDBC driver):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently NOT part of the binary distribution. See the documentation on linking with them for cluster execution.

JdbcSink.sink #

The JDBC sink provides an at-least-once guarantee. In practice, exactly-once results can still be achieved by writing upsert SQL statements or idempotent SQL updates. The sink is configured as follows (see also the JdbcSink javadoc):

JdbcSink.sink(
        sqlDmlStatement,                       // mandatory
        jdbcStatementBuilder,                  // mandatory
        jdbcExecutionOptions,                  // optional
        jdbcConnectionOptions                  // mandatory
);
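To make the at-least-once remark concrete, the following JDK-only sketch models what happens when a record is delivered twice after a failure. It is a simplified illustration, not Flink code: the `UpsertReplaySketch` class, the `books` table, and the in-memory "tables" are hypothetical, and the SQL string uses PostgreSQL's `ON CONFLICT` syntax as one example of an upsert.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UpsertReplaySketch {

    // PostgreSQL-style upsert; the table and column names are illustrative.
    static final String UPSERT_SQL =
            "INSERT INTO books (id, title) VALUES (?, ?) "
                    + "ON CONFLICT (id) DO UPDATE SET title = EXCLUDED.title";

    /** Models a plain INSERT into a table without a unique key: every delivery adds a row. */
    static List<Map.Entry<Long, String>> applyInserts(List<Map.Entry<Long, String>> deliveries) {
        return new ArrayList<>(deliveries);
    }

    /** Models an upsert keyed on id: a redelivered record overwrites its own row. */
    static Map<Long, String> applyUpserts(List<Map.Entry<Long, String>> deliveries) {
        Map<Long, String> table = new LinkedHashMap<>();
        for (Map.Entry<Long, String> delivery : deliveries) {
            table.put(delivery.getKey(), delivery.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        Map.Entry<Long, String> first = new SimpleEntry<>(101L, "Streaming Systems");
        Map.Entry<Long, String> second = new SimpleEntry<>(102L, "Kafka: The Definitive Guide");
        // After a failure the second record is delivered again (at-least-once).
        List<Map.Entry<Long, String>> deliveries = Arrays.asList(first, second, second);

        System.out.println("rows after plain inserts: " + applyInserts(deliveries).size());
        System.out.println("rows after upserts:       " + applyUpserts(deliveries).size());
    }
}
```

With the plain insert model the replayed record shows up as an extra row, while the keyed upsert leaves one row per id. A statement such as `UPSERT_SQL` could be passed as the SQL DML statement of the sink, assuming the target database supports that syntax.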

SQL DML statement and JDBC statement builder #

The sink builds one JDBC prepared statement from a user-provided SQL string, e.g.:

INSERT INTO some_table (field1, field2) VALUES (?, ?)

It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:

(preparedStatement, someRecord) -> { ... update here the preparedStatement with values from someRecord ... }
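The interaction described above can be modelled with the JDK alone. The sketch below is an illustration under stated assumptions, not the connector's implementation: `RecordBinder` is a hypothetical stand-in for the user-provided function, `recordingStatement` builds a fake `PreparedStatement` that only records the calls made on it, and queuing each record with `addBatch()` is an assumption about how a batching sink would typically use the statement.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StatementBuilderSketch {

    /** Hypothetical stand-in for the user-provided function; not a Flink type. */
    @FunctionalInterface
    interface RecordBinder<T> {
        void accept(PreparedStatement statement, T record) throws SQLException;
    }

    /** Creates a PreparedStatement that only records calls, so no database is needed. */
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
                StatementBuilderSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    calls.add(method.getName() + (args == null ? "[]" : Arrays.toString(args)));
                    return null; // enough for the void setters and addBatch() used here
                });
    }

    /** Binds every record to the same statement and queues one batch entry per record. */
    static <T> void bindAll(PreparedStatement statement, RecordBinder<T> binder, List<T> records)
            throws SQLException {
        for (T record : records) {
            binder.accept(statement, record);
            statement.addBatch();
        }
    }

    public static void main(String[] args) throws SQLException {
        List<String> calls = new ArrayList<>();
        PreparedStatement statement = recordingStatement(calls);
        RecordBinder<String[]> binder = (ps, row) -> {
            ps.setString(1, row[0]);
            ps.setString(2, row[1]);
        };
        bindAll(statement, binder, Arrays.asList(new String[] {"a", "b"}, new String[] {"c", "d"}));
        calls.forEach(System.out::println);
    }
}
```

The point of the sketch is that a single statement object is reused: the function only sets parameters for the current record and does not create, execute, or close the statement itself.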

JDBC execution options #

The SQL DML statements are executed in batches, which can optionally be configured with a JdbcExecutionOptions instance (see also the JdbcExecutionOptions javadoc):

JdbcExecutionOptions.builder()
        .withBatchIntervalMs(200)             // optional: default = 0, meaning no time-based execution is done
        .withBatchSize(1000)                  // optional: default = 5000 values
        .withMaxRetries(5)                    // optional: default = 3
        .build()

A JDBC batch is executed as soon as one of the following conditions is true:

  • the configured batch interval has elapsed
  • the maximum batch size is reached
  • a Flink checkpoint has started
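The three triggers listed above can be sketched as a small JDK-only model. This is a simplified illustration, not the connector's code: the `BatchFlushSketch` class and its method names are hypothetical, time is passed in explicitly instead of using a real timer, and the interval is measured from the last flush, which may differ from how the sink schedules its timer.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchFlushSketch {
    private final int batchSize;
    private final long batchIntervalMs; // 0 disables time-based flushing
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> executedBatches = new ArrayList<>();
    private long lastFlushMs;

    BatchFlushSketch(int batchSize, long batchIntervalMs, long nowMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Trigger: the maximum batch size is reached. */
    void add(String record, long nowMs) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush(nowMs);
        }
    }

    /** Trigger: the configured batch interval has elapsed. */
    void onTimer(long nowMs) {
        if (batchIntervalMs > 0 && nowMs - lastFlushMs >= batchIntervalMs) {
            flush(nowMs);
        }
    }

    /** Trigger: a checkpoint has started, so nothing may stay buffered. */
    void onCheckpoint(long nowMs) {
        flush(nowMs);
    }

    List<List<String>> executedBatches() {
        return executedBatches;
    }

    private void flush(long nowMs) {
        if (!pending.isEmpty()) {
            executedBatches.add(new ArrayList<>(pending));
            pending.clear();
        }
        lastFlushMs = nowMs;
    }

    public static void main(String[] args) {
        BatchFlushSketch sink = new BatchFlushSketch(2, 200, 0);
        sink.add("a", 0);
        sink.add("b", 10);     // size trigger
        sink.add("c", 20);
        sink.onTimer(100);     // too early, nothing happens
        sink.onTimer(250);     // interval trigger
        sink.add("d", 260);
        sink.onCheckpoint(270); // checkpoint trigger
        System.out.println(sink.executedBatches());
    }
}
```

In this model a small batch size or a short interval lowers latency at the cost of more round trips to the database, which is the trade-off the execution options expose.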

JDBC connection parameters #

The connection to the database is configured with a JdbcConnectionOptions instance. See the JdbcConnectionOptions javadoc for details.

Full example #

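import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkExample {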

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }
        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        // explicit type instead of `var`, so the example also compiles on Java 8
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("someUser")
                                .withPassword("somePassword")
                                .build()
                ));
                
        env.execute();
    }
}

JdbcSink.exactlyOnceSink #

Since version 1.13, the Flink JDBC sink supports an exactly-once mode. The implementation relies on the JDBC driver's support for the XA standard.

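To use it, create a sink with the exactlyOnceSink() method in the same way as above, and additionally provide exactly-once options (a JdbcExactlyOnceOptions instance) and a supplier of a driver-specific XA DataSource: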

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.exactlyOnceSink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                JdbcExecutionOptions.builder().build(),
                // no JdbcConnectionOptions here: the connection comes from the XA DataSource below
                JdbcExactlyOnceOptions.defaults(),
                () -> {
                    // create a driver-specific XA DataSource
                    EmbeddedXADataSource ds = new EmbeddedXADataSource();
                    ds.setDatabaseName("my_db");
                    return ds;
                }));
env.execute();

Please refer to the JdbcXaSinkFunction documentation for more details.
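
For readers unfamiliar with XA, the following JDK-only sketch shows the order of calls in a two-phase commit against a single `XAResource`. It is a generic illustration of the standard, not the sink's implementation: `XaTwoPhaseSketch`, `SimpleXid`, and the recording resource are hypothetical helpers, and how the sink ties these phases to checkpoints is not covered here.

```java
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

final class XaTwoPhaseSketch {

    /** Minimal Xid; real code would derive the ids from something stable and unique. */
    static final class SimpleXid implements Xid {
        private final byte[] globalId;
        private final byte[] branchId;

        SimpleXid(String globalId, String branchId) {
            this.globalId = globalId.getBytes(StandardCharsets.UTF_8);
            this.branchId = branchId.getBytes(StandardCharsets.UTF_8);
        }

        @Override public int getFormatId() { return 1; }
        @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
        @Override public byte[] getBranchQualifier() { return branchId.clone(); }
    }

    /** Creates an XAResource that only records method names, so no database is needed. */
    static XAResource recordingResource(List<String> calls) {
        return (XAResource) Proxy.newProxyInstance(
                XaTwoPhaseSketch.class.getClassLoader(),
                new Class<?>[] {XAResource.class},
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    // prepare() returns an int vote; the other methods used here are void
                    return "prepare".equals(method.getName())
                            ? Integer.valueOf(XAResource.XA_OK)
                            : null;
                });
    }

    /** Runs the work inside one XA transaction: start, end, prepare, then commit. */
    static void runTransaction(XAResource xa, Xid xid, Runnable work) throws XAException {
        xa.start(xid, XAResource.TMNOFLAGS);
        try {
            work.run(); // e.g. execute the batched statements on the XA connection
        } catch (RuntimeException e) {
            xa.end(xid, XAResource.TMFAIL);
            xa.rollback(xid);
            throw e;
        }
        xa.end(xid, XAResource.TMSUCCESS);
        // Phase 1: the resource votes; phase 2: commit only if it voted XA_OK.
        if (xa.prepare(xid) == XAResource.XA_OK) {
            xa.commit(xid, false);
        }
    }

    public static void main(String[] args) throws XAException {
        List<String> calls = new ArrayList<>();
        runTransaction(recordingResource(calls), new SimpleXid("job-1", "subtask-0"), () -> { });
        System.out.println(calls);
    }
}
```

Because a prepared transaction survives a crash of the client, the commit in phase 2 can be retried later, which is what makes an exactly-once sink possible on top of XA.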