The Table API and SQL are integrated in a joint API. The central concept of this API is a Table which serves as input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a Table, how to query a Table, and how to emit a Table.
Blink treats batch jobs as a special case of streaming. As such, the conversion between Table and DataSet is not supported, and batch jobs are not translated into DataSet programs but into DataStream programs, the same as streaming jobs.
The Blink planner does not support BatchTableSource; use a bounded StreamTableSource instead.
The Blink planner only supports the new Catalog and does not support the deprecated ExternalCatalog.
The implementations of FilterableTableSource for the old planner and the Blink planner are incompatible. The old planner will push down PlannerExpressions into FilterableTableSource, while the Blink planner will push down Expressions.
String-based key-value config options (please see the documentation about Configuration for details) are only used for the Blink planner.
The implementation (CalciteConfig) of PlannerConfig differs between the two planners.
The Blink planner will optimize multiple-sinks into one DAG (supported only on TableEnvironment, not on StreamTableEnvironment). The old planner will always optimize each sink into a new DAG, where all DAGs are independent of each other.
The old planner does not support catalog statistics now, while the Blink planner does.
Structure of Table API and SQL Programs
All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of Table API and SQL programs.
Note: Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the Integration with DataStream and DataSet API section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
The TableEnvironment is a central concept of the Table API and SQL integration. It is responsible for:
Registering a Table in the internal catalog
Registering an external catalog
Executing SQL queries
Registering a user-defined (scalar, table, or aggregation) function
Converting a DataStream or DataSet into a Table
Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
A TableEnvironment is created by calling the static BatchTableEnvironment.create() or StreamTableEnvironment.create() method with a StreamExecutionEnvironment or an ExecutionEnvironment and an optional TableConfig. The TableConfig can be used to configure the TableEnvironment or to customize the query optimization and translation process (see Query Optimization).
Make sure to choose the specific planner BatchTableEnvironment/StreamTableEnvironment that matches your programming language.
If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.
Note: If there is only one planner jar in the /lib directory, you can use useAnyPlanner (use_any_planner for Python) to create specific EnvironmentSettings.
A TableEnvironment maintains a map of catalogs of tables which are created with an identifier. Each
identifier consists of 3 parts: catalog name, database name and object name. If a catalog or database is not
specified, the current default value will be used (see examples in the Table identifier expanding section).
Tables can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an
existing Table object, usually the result of a Table API or SQL query. TABLES describe
external data, such as a file, database table, or message queue.
Temporary vs Permanent tables.
Tables may either be temporary, and tied to the lifecycle of a single Flink session, or permanent,
and visible across multiple Flink sessions and clusters.
Permanent tables require a catalog (such as Hive Metastore)
to maintain metadata about the table. Once a permanent table is created, it is visible to any Flink
session that is connected to the catalog and will continue to exist until the table is explicitly dropped.
On the other hand, temporary tables are always stored in memory and only exist for the duration of
the Flink session they are created within. These tables are not visible to other sessions. They are
not bound to any catalog or database but can be created in the namespace of one. Temporary tables
are not dropped if their corresponding database is removed.
It is possible to register a temporary table with the same identifier as an existing permanent
table. The temporary table shadows the permanent one and makes the permanent table inaccessible as
long as the temporary one exists. All queries with that identifier will be executed against the temporary table.
This might be useful for experimentation. It allows running exactly the same query first against a
temporary table that e.g. has just a subset of data, or the data is obfuscated. Once verified that
the query is correct it can be run against the real production table.
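The shadowing rule can be illustrated with a toy lookup model. The sketch below is not Flink code: `ToyCatalogManager` and all of its methods are hypothetical names, and it only assumes that temporary objects are checked before permanent ones and that dropping a database leaves temporary objects in place.

```python
class ToyCatalogManager:
    """Toy model of identifier resolution; not part of the Flink API."""

    def __init__(self):
        self.permanent = {}   # (catalog, database, name) -> table payload
        self.temporary = {}   # same key space, lives only in this session

    def create_table(self, identifier, table):
        self.permanent[identifier] = table

    def create_temporary_table(self, identifier, table):
        self.temporary[identifier] = table

    def drop_temporary_table(self, identifier):
        self.temporary.pop(identifier, None)

    def drop_database(self, catalog, database):
        # Only permanent objects are removed together with their database.
        self.permanent = {
            key: value for key, value in self.permanent.items()
            if key[:2] != (catalog, database)
        }

    def resolve(self, identifier):
        # Temporary objects take precedence, which is what shadowing means.
        if identifier in self.temporary:
            return self.temporary[identifier]
        return self.permanent[identifier]


manager = ToyCatalogManager()
orders = ("prod_catalog", "sales", "orders")
manager.create_table(orders, "production data")
manager.create_temporary_table(orders, "obfuscated sample")

shadowed = manager.resolve(orders)      # the temporary table wins
manager.drop_temporary_table(orders)
restored = manager.resolve(orders)      # the permanent table is visible again
```

The same query text (here, the same identifier) resolves to the sample first and to the production table once the temporary table is dropped, which is the experimentation workflow described above.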
Create a Table
A Table API object corresponds to a VIEW (virtual table) in SQL terms. It encapsulates a logical
query plan. It can be created in a catalog as follows:
Note: Table objects are similar to VIEWs from relational database
systems, i.e., the query that defines the Table is not optimized but will be inlined when another
query references the registered Table. If multiple queries reference the same registered Table,
it will be inlined for each referencing query and executed multiple times, i.e., the result of the
registered Table will not be shared.
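The consequence of inlining can be made concrete with a small model. This is a sketch in plain Python, not Flink code: `ToyView` is a hypothetical class that re-runs its defining query on every reference, which is the behavior the note describes.

```python
class ToyView:
    """Toy stand-in for a registered Table; not part of the Flink API."""

    def __init__(self, query):
        self.query = query        # callable that computes the rows
        self.executions = 0       # how often the defining query has run

    def rows(self):
        # The defining query is re-run on every reference: no shared result.
        self.executions += 1
        return self.query()


source = [("alice", 3), ("bob", 7), ("alice", 5)]
big_orders = ToyView(lambda: [row for row in source if row[1] > 4])

# Two independent queries reference the same view.
names = [name for name, _ in big_orders.rows()]
total = sum(amount for _, amount in big_orders.rows())
```

After both queries, `big_orders.executions` is 2: the filter ran once per referencing query rather than once overall.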
It is also possible to create a TABLE as known from relational databases from a connector declaration.
The connector describes the external system that stores the data of a table. Storage systems such as Apache Kafka or a regular file system can be declared here.
Expanding Table identifiers
Tables are always registered with a 3 part identifier consisting of catalog, database, and
table name. The first two parts are optional and if they are not provided the set default values will
be used. Identifiers follow SQL requirements which means that they can be escaped with a backtick character (`).
Additionally all SQL reserved keywords must be escaped.
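The expansion rule can be sketched as a small function. This is an illustration only, not Flink's parser: `split_identifier` and `expand_identifier` are hypothetical helpers, and the sketch assumes that backticks simply toggle escaping and that a missing catalog or database is filled in from the current defaults.

```python
def split_identifier(path):
    """Split on dots that are outside backticks; the backticks are removed."""
    parts, current, escaped = [], [], False
    for char in path:
        if char == "`":
            escaped = not escaped
        elif char == "." and not escaped:
            parts.append("".join(current))
            current = []
        else:
            current.append(char)
    parts.append("".join(current))
    return parts


def expand_identifier(path, current_catalog, current_database):
    """Return a full (catalog, database, object) triple for a 1- to 3-part path."""
    parts = split_identifier(path)
    if len(parts) == 1:
        return (current_catalog, current_database, parts[0])
    if len(parts) == 2:
        return (current_catalog, parts[0], parts[1])
    if len(parts) == 3:
        return tuple(parts)
    raise ValueError("expected at most three parts: " + path)


# A name containing a dot must be escaped to stay a single part.
plain = expand_identifier("orders", "my_catalog", "my_db")
escaped = expand_identifier("`example.View`", "my_catalog", "my_db")
qualified = expand_identifier("other_catalog.other_db.orders", "my_catalog", "my_db")
```

Without the backticks, `example.View` would be read as database `example` and object `View`.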
Query a Table
The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language.
The API is based on the Table class which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new Table object, which represents the result of applying the relational operation on the input Table. Some relational operations are composed of multiple method calls such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table, and select(...) the projection on the grouping of table.
The Table API document describes all Table API operations that are supported on streaming and batch tables.
The following example shows a simple Table API aggregation query:
Note: The Scala Table API uses Scala Symbols, which start with a single tick (') to reference the attributes of a Table. The Table API uses Scala implicits. Make sure to import org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ in order to use Scala implicit conversions.
A Table is emitted by writing it to a TableSink. A TableSink is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
A batch Table can only be written to a BatchTableSink, while a streaming Table requires either an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.
Please see the documentation about Table Sources & Sinks for details about available sinks and instructions for how to implement a custom TableSink.
The Table.insertInto(String tableName) method emits the Table to a registered TableSink. The method looks up the TableSink from the catalog by the name and validates that the schema of the Table is identical to the schema of the TableSink.
The behavior of translating and executing a query is different for the two planners.
Table API and SQL queries are translated into DataStream or DataSet programs depending on whether their input is a streaming or batch input. A query is internally represented as a logical query plan and is translated in two phases:
Optimization of the logical plan
Translation into a DataStream or DataSet program
A Table API or SQL query is translated when:
a Table is emitted to a TableSink, i.e., when Table.insertInto() is called.
a SQL update query is specified, i.e., when TableEnvironment.sqlUpdate() is called.
On streaming, both planners can integrate with the DataStream API. Only the old planner can integrate with the DataSet API; the Blink planner on batch cannot be combined with either.
Note: The DataSet API discussed below is only relevant for the old planner on batch.
Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. For instance, it is possible to query an external table (for example from a RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with meta data, and then further process the data with either the DataStream or DataSet API (and any of the libraries built on top of these APIs, such as CEP or Gelly). Inversely, a Table API or SQL query can also be applied on the result of a DataStream or DataSet program.
This interaction can be achieved by converting a DataStream or DataSet into a Table and vice versa. In this section, we describe how these conversions are done.
Implicit Conversion for Scala
The Scala Table API features implicit conversions for the DataSet, DataStream, and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.
Create a View from a DataStream or DataSet
A DataStream or DataSet can be registered in a TableEnvironment as a View. The schema of the resulting view depends on the data type of the registered DataStream or DataSet. Please check the section about mapping of data types to table schema for details.
Note: Views created from a DataStream or DataSet can be registered as temporary views only.
A Table can be converted into a DataStream or DataSet. In this way, custom DataStream or DataSet programs can be run on the result of a Table API or SQL query.
When converting a Table into a DataStream or DataSet, you need to specify the data type of the resulting DataStream or DataSet, i.e., the data type into which the rows of the Table are to be converted. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options:
Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access.
Case Class: fields are mapped by position, no support for null values, type-safe access.
Tuple: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
Atomic Type: Table must have a single field, no support for null values, type-safe access.
Convert a Table into a DataStream
A Table that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query’s input streams. Hence, the DataStream into which such a dynamic query is converted needs to encode the updates of the table.
There are two modes to convert a Table into a DataStream:
Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
Note: A detailed discussion about dynamic tables and their properties is given in the Dynamic Tables document.
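To make the retract encoding concrete, the following sketch models a running count per key in plain Python. It is not Flink code: `count_per_key_retract` and `materialize` are hypothetical helpers, and the (flag, row) pairs only mimic the idea of a boolean flag that marks a change as INSERT (`True`) or DELETE (`False`).

```python
def count_per_key_retract(keys):
    """Yield (flag, row) pairs: True = INSERT (add), False = DELETE (retract)."""
    counts = {}
    for key in keys:
        previous = counts.get(key)
        if previous is not None:
            # Withdraw the previously emitted result for this key.
            yield (False, (key, previous))
        counts[key] = (previous or 0) + 1
        yield (True, (key, counts[key]))


def materialize(changes):
    """Replay the change stream to rebuild the current table contents."""
    table = []
    for is_insert, row in changes:
        if is_insert:
            table.append(row)
        else:
            table.remove(row)
    return sorted(table)


changes = list(count_per_key_retract(["a", "b", "a"]))
final_table = materialize(changes)
```

The second `"a"` record produces two messages: a retraction of `("a", 1)` followed by an insertion of `("a", 2)`. An aggregation like this cannot be emitted in append mode, because an earlier result is updated.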
Flink’s DataStream and DataSet APIs support very diverse types. Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink’s Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.
The mapping of a data type to a table schema can happen in two ways: based on the field positions or based on the field names.
Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types with a defined field order as well as atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section). Fields can be projected out but can’t be renamed using an alias as.
When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used or f0 for atomic types.
Name-based mapping can be used for any data type including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be possibly renamed using an alias as. Fields can be reordered and projected out.
If no field names are specified, the default field names and field order of the composite type are used or f0 for atomic types.
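The choice between the two mapping modes can be sketched as follows. This is a simplified model, not Flink's implementation: `map_fields` is a hypothetical helper, and it assumes that a specification switches to name-based mapping as soon as it uses an alias or references an existing input field name.

```python
def map_fields(input_names, spec):
    """Return a list of (source_index, output_name) pairs.

    Items of spec are either "name" or ("source_name", "alias").
    """
    referenced = [item if isinstance(item, str) else item[0] for item in spec]
    has_alias = any(not isinstance(item, str) for item in spec)
    name_based = has_alias or any(name in input_names for name in referenced)

    if not name_based:
        # Position-based: the i-th new name labels the i-th input field.
        if len(spec) > len(input_names):
            raise ValueError("more names than input fields")
        return [(index, name) for index, name in enumerate(spec)]

    # Name-based: fields can be reordered, projected out, and renamed.
    mapping = []
    for item in spec:
        source, output = (item, item) if isinstance(item, str) else item
        if source not in input_names:
            raise ValueError("unknown field: " + source)
        mapping.append((input_names.index(source), output))
    return mapping


tuple_fields = ["f0", "f1"]                                   # default names of a Flink Java tuple
by_position = map_fields(tuple_fields, ["id", "name"])         # new names, same order
by_name = map_fields(tuple_fields, ["f1", ("f0", "id")])       # reorder and rename
```

Because `"id"` and `"name"` do not occur in the input type, the first call is treated as position-based; the second call references `f0` and `f1` and is therefore name-based.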
Flink treats primitives (Integer, Double, String) or generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream or DataSet of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type and the name of the attribute can be specified.
Tuples (Scala and Java) and Case Classes (Scala only)
Flink supports Scala’s built-in tuples and provides its own tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0, f1, … for Flink Tuples and _1, _2, … for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (as).
POJO (Java and Scala)
Flink supports POJOs as composite types. The rules for what determines a POJO are documented here.
When converting a POJO DataStream or DataSet into a Table without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by positions. Fields can be renamed using an alias (with the as keyword), reordered, and projected.
The Row data type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream or DataSet into a Table. The row type supports mapping of fields by position and by name. Fields can be renamed by providing names for all fields (mapping based on position) or selected individually for projection/ordering/renaming (mapping based on name).
Apache Flink leverages Apache Calcite to optimize and translate queries. The optimizations currently performed include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. The old planner does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of Tables in the FROM clause and/or order of join predicates in the WHERE clause).
It is possible to tweak the set of optimization rules which are applied in different phases by providing a CalciteConfig object. This can be created via a builder by calling CalciteConfig.createBuilder() and is provided to the TableEnvironment by calling tableEnv.getConfig.setPlannerConfig(calciteConfig).
Apache Flink leverages and extends Apache Calcite to perform sophisticated query optimization.
This includes a series of rule and cost-based optimizations such as:
Subquery decorrelation based on Apache Calcite
Sub-plan deduplication to avoid duplicate computation
Special subquery rewriting, including two parts:
Converts IN and EXISTS into left semi-joins
Converts NOT IN and NOT EXISTS into left anti-joins
Optional join reordering
Enabled via table.optimizer.join-reorder-enabled
Note: IN/EXISTS/NOT IN/NOT EXISTS are currently only supported in conjunctive conditions in subquery rewriting.
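The idea behind the subquery rewriting can be shown on in-memory rows. The sketch below is plain Python, not planner code: `filter_in`, `left_semi_join`, and `left_anti_join` are hypothetical helpers, and the data is assumed to contain no NULL values (SQL's NOT IN has special NULL semantics that this toy model ignores).

```python
def filter_in(left, right_keys, key):
    """WHERE left.key IN (subquery), evaluated row by row."""
    return [row for row in left if row[key] in right_keys]


def left_semi_join(left, right, key):
    """Keep each left row once if at least one right row matches on key."""
    matching = {row[key] for row in right}
    return [row for row in left if row[key] in matching]


def left_anti_join(left, right, key):
    """Keep each left row that has no matching right row on key."""
    matching = {row[key] for row in right}
    return [row for row in left if row[key] not in matching]


orders = [
    {"id": 1, "customer": "a"},
    {"id": 2, "customer": "b"},
    {"id": 3, "customer": "c"},
]
# "a" appears twice; a regular inner join would duplicate order 1.
vips = [{"customer": "a"}, {"customer": "c"}, {"customer": "a"}]

in_result = filter_in(orders, [v["customer"] for v in vips], "customer")
semi_result = left_semi_join(orders, vips, "customer")
anti_result = left_anti_join(orders, vips, "customer")
```

On this data the IN filter and the semi-join return the same rows, and the anti-join returns exactly the remaining ones, which is why the two forms are interchangeable here.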
The optimizer makes intelligent decisions, based not only on the plan but also on rich statistics available from the data sources and fine-grained costs for each operator, such as I/O, CPU, network, and memory.
Advanced users may provide custom optimizations via a CalciteConfig object that can be provided to the table environment by calling TableEnvironment#getConfig#setPlannerConfig.
Explaining a Table
The Table API provides a mechanism to explain the logical and optimized query plans to compute a Table.
This is done through the TableEnvironment.explain(table) method or TableEnvironment.explain() method. explain(table) returns the plan of a given Table. explain() returns the result of a multiple-sinks plan and is mainly used for the Blink planner. It returns a String describing three plans:
the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
the optimized logical query plan, and
the physical execution plan.
The following code shows an example and the corresponding output for a given Table using explain(table):
The following code shows an example and the corresponding output for a multiple-sinks plan using explain():