@PublicEvolving public interface TableEnvironment
A TableEnvironment is the entry point and central context for creating Table and SQL API programs. It is unified both on a language level for all JVM-based languages (i.e. there is no distinction between Scala and Java API) and for bounded and unbounded data processing.
A table environment is responsible for, among other things, registering and retrieving Tables and other meta objects from a catalog.
The path in methods such as createTemporaryView(String, Table) should be a proper SQL identifier. The syntax is [[catalog-name.]database-name.]object-name, where the catalog name and the database name are optional. For path resolution, see useCatalog(String) and useDatabase(String). All keywords or other special characters need to be escaped.
Example: `cat.1`.`db`.`Table` resolves to an object named 'Table' (table is a reserved keyword, thus must be escaped) in a catalog named 'cat.1' and database named 'db'.
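To make the escaping rule concrete, here is a minimal sketch of how such a path string could be split into its parts. The class SqlPathSketch and its parse method are hypothetical illustrations, not Flink's implementation. Treating a doubled backtick inside a quoted identifier as a literal backtick is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink code: splits a [[catalog.]database.]object path
// into its identifiers, honoring backtick-quoted parts.
final class SqlPathSketch {

    static List<String> parse(String path) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '`') {
                if (quoted && i + 1 < path.length() && path.charAt(i + 1) == '`') {
                    // Assumption: "``" inside a quoted identifier is a literal backtick.
                    current.append('`');
                    i++;
                } else {
                    quoted = !quoted; // opening or closing quote
                }
            } else if (c == '.' && !quoted) {
                parts.add(current.toString()); // an unquoted dot separates parts
                current.setLength(0);
            } else {
                current.append(c); // includes dots inside quotes, e.g. `cat.1`
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unterminated quoted identifier: " + path);
        }
        parts.add(current.toString());
        if (parts.size() > 3) {
            throw new IllegalArgumentException("Expected at most catalog.database.object: " + path);
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(parse("`cat.1`.`db`.`Table`")); // prints [cat.1, db, Table]
    }
}
```

The dot inside `cat.1` stays part of the catalog name because it appears between backticks. Only unquoted dots separate the parts.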
Note: This environment is meant for pure table programs. If you would like to convert from or to other Flink APIs, it might be necessary to use one of the available language-specific table environments in the corresponding bridging modules.

Modifier and Type | Method and Description |
---|---|
ConnectTableDescriptor | connect(ConnectorDescriptor connectorDescriptor): Creates a table source and/or table sink from a descriptor. |
static TableEnvironment | create(EnvironmentSettings settings): Creates a table environment that is the entry point and central context for creating Table and SQL API programs. |
void | createTemporaryView(String path, Table view): Registers a Table API object as a temporary view similar to SQL temporary views. |
boolean | dropTemporaryTable(String path): Drops a temporary table registered in the given path. |
boolean | dropTemporaryView(String path): Drops a temporary view registered in the given path. |
JobExecutionResult | execute(String jobName): Triggers the program execution. |
String | explain(boolean extended): Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the multiple-sinks plan. |
String | explain(Table table): Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table. |
String | explain(Table table, boolean extended): Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table. |
Table | from(String path): Reads a registered table and returns the resulting Table. |
Table | fromTableSource(TableSource<?> source): Creates a table from a table source. |
Optional<Catalog> | getCatalog(String catalogName): Gets a registered Catalog by name. |
String[] | getCompletionHints(String statement, int position): Deprecated. Will be removed in the next release. |
TableConfig | getConfig(): Returns the table config that defines the runtime behavior of the Table API. |
String | getCurrentCatalog(): Gets the current default catalog name of the current session. |
String | getCurrentDatabase(): Gets the current default database name of the running session. |
void | insertInto(String targetPath, Table table): Instructs the environment to write the content of a Table API object into a table. |
void | insertInto(Table table, String sinkPath, String... sinkPathContinued): Deprecated. Use insertInto(String, Table) instead. |
String[] | listCatalogs(): Gets the names of all catalogs registered in this environment. |
String[] | listDatabases(): Gets the names of all databases registered in the current catalog. |
String[] | listFunctions(): Gets the names of all functions in this environment. |
String[] | listModules(): Gets an array of names of all modules in this environment in the loaded order. |
String[] | listTables(): Gets the names of all tables available in the current namespace (the current database of the current catalog). |
String[] | listTemporaryTables(): Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog). |
String[] | listTemporaryViews(): Gets the names of all temporary views available in the current namespace (the current database of the current catalog). |
String[] | listUserDefinedFunctions(): Gets the names of all user-defined functions registered in this environment. |
void | loadModule(String moduleName, Module module): Loads a Module under a unique name. |
void | registerCatalog(String catalogName, Catalog catalog): Registers a Catalog under a unique name. |
void | registerFunction(String name, ScalarFunction function): Registers a ScalarFunction under a unique name. |
void | registerTable(String name, Table table): Deprecated. Use createTemporaryView(String, Table) instead. |
void | registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink): Deprecated. Use connect(ConnectorDescriptor) instead. |
void | registerTableSink(String name, TableSink<?> configuredSink): Deprecated. Use connect(ConnectorDescriptor) instead. |
void | registerTableSource(String name, TableSource<?> tableSource): Deprecated. Use connect(ConnectorDescriptor) instead. |
Table | scan(String... tablePath): Deprecated. Use from(String) instead. |
Table | sqlQuery(String query): Evaluates a SQL query on registered tables and retrieves the result as a Table. |
void | sqlUpdate(String stmt): Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement. NOTE: Currently only SQL INSERT statements and CREATE TABLE statements are supported. |
void | unloadModule(String moduleName): Unloads a Module with given name. |
void | useCatalog(String catalogName): Sets the current catalog to the given value. |
void | useDatabase(String databaseName): Sets the current default database. |

static TableEnvironment create(EnvironmentSettings settings)
Creates a table environment that is the entry point and central context for creating Table and SQL API programs. The class description above applies here as well, including the note on the language-specific table environments in the bridging modules.
Parameters:
settings - The environment settings used to instantiate the TableEnvironment.

Table fromTableSource(TableSource<?> source)
Creates a table from a table source.
Parameters:
source - table source used as table

void registerCatalog(String catalogName, Catalog catalog)
Registers a Catalog under a unique name.
Parameters:
catalogName - The name under which the catalog will be registered.
catalog - The catalog to register.

Optional<Catalog> getCatalog(String catalogName)
Gets a registered Catalog by name.
Parameters:
catalogName - The name to look up the Catalog.

void loadModule(String moduleName, Module module)
Loads a Module under a unique name. Modules will be kept in the loaded order. A ValidationException is thrown when there is already a module with the same name.
Parameters:
moduleName - name of the Module
module - the module instance

void unloadModule(String moduleName)
Unloads a Module with given name. A ValidationException is thrown when there is no module with the given name.
Parameters:
moduleName - name of the Module
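The loaded-order and uniqueness rules of loadModule and unloadModule can be modeled with an insertion-ordered map. The following is a hypothetical sketch, not Flink's module management. ModuleRegistrySketch is an invented name, modules are represented by an arbitrary type parameter, and IllegalStateException stands in for Flink's ValidationException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the documented module bookkeeping, not Flink's implementation.
// IllegalStateException stands in for Flink's ValidationException.
final class ModuleRegistrySketch<M> {
    // LinkedHashMap iterates in insertion order, i.e. the "loaded order".
    private final Map<String, M> modules = new LinkedHashMap<>();

    void loadModule(String moduleName, M module) {
        if (modules.containsKey(moduleName)) {
            throw new IllegalStateException("A module named '" + moduleName + "' is already loaded");
        }
        modules.put(moduleName, module);
    }

    void unloadModule(String moduleName) {
        if (!modules.containsKey(moduleName)) {
            throw new IllegalStateException("No module named '" + moduleName + "' is loaded");
        }
        modules.remove(moduleName);
    }

    // Names in the order in which the modules were loaded.
    String[] listModules() {
        return modules.keySet().toArray(new String[0]);
    }

    public static void main(String[] args) {
        ModuleRegistrySketch<String> registry = new ModuleRegistrySketch<>();
        registry.loadModule("core", "core module");
        registry.loadModule("custom", "custom module");
        System.out.println(String.join(", ", registry.listModules())); // prints core, custom
    }
}
```

The module names "core" and "custom" in the usage are placeholders chosen for illustration.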

void registerFunction(String name, ScalarFunction function)
Registers a ScalarFunction under a unique name. Replaces already existing user-defined functions under this name.

@Deprecated void registerTable(String name, Table table)
Deprecated. Use createTemporaryView(String, Table) instead.
Registers a Table under a unique name in the TableEnvironment's catalog. Registered tables can be referenced in SQL queries.
Temporary objects can shadow permanent ones. If a permanent object exists in a given path, it will be inaccessible in the current session. To make the permanent object available again, you can drop the corresponding temporary object.
Parameters:
name - The name under which the table will be registered.
table - The table to register.

void createTemporaryView(String path, Table view)
Registers a Table API object as a temporary view similar to SQL temporary views.
Temporary objects can shadow permanent ones. If a permanent object exists in a given path, it will be inaccessible in the current session. To make the permanent object available again, you can drop the corresponding temporary object.
Parameters:
path - The path under which the view will be registered. See also the TableEnvironment class description for the format of the path.
view - The view to register.

@Deprecated void registerTableSource(String name, TableSource<?> tableSource)
Deprecated. Use connect(ConnectorDescriptor) instead.
Registers a TableSource in this TableEnvironment's catalog. Registered tables can be referenced in SQL queries.
Temporary objects can shadow permanent ones. If a permanent object exists in a given path, it will be inaccessible in the current session. To make the permanent object available again, you can drop the corresponding temporary object.
Parameters:
name - The name under which the TableSource is registered.
tableSource - The TableSource to register.

@Deprecated void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink)
Deprecated. Use connect(ConnectorDescriptor) instead.
Registers a TableSink with the given field names and types in this TableEnvironment's catalog. Registered sink tables can be referenced in SQL DML statements.
Temporary objects can shadow permanent ones. If a permanent object exists in a given path, it will be inaccessible in the current session. To make the permanent object available again, you can drop the corresponding temporary object.

@Deprecated void registerTableSink(String name, TableSink<?> configuredSink)
Deprecated. Use connect(ConnectorDescriptor) instead.
Registers a TableSink with already configured field names and field types in this TableEnvironment's catalog. Registered sink tables can be referenced in SQL DML statements.
Temporary objects can shadow permanent ones. If a permanent object exists in a given path, it will be inaccessible in the current session. To make the permanent object available again, you can drop the corresponding temporary object.

@Deprecated Table scan(String... tablePath)
Deprecated. Use from(String) instead.
Scans a registered table and returns the resulting Table.
A table to scan must be registered in the TableEnvironment. It can be either directly registered or be an external member of a Catalog.
See the documentation of useDatabase(String) or useCatalog(String) for the rules on the path resolution.
Examples:
Scanning a directly registered table.

    Table tab = tableEnv.scan("tableName");

Scanning a table from a registered catalog.

    Table tab = tableEnv.scan("catalogName", "dbName", "tableName");

Parameters:
tablePath - The path of the table to scan.
Returns:
The resulting Table.
See also: useCatalog(String), useDatabase(String)

Table from(String path)
Reads a registered table and returns the resulting Table.
A table to scan must be registered in the TableEnvironment.
See the documentation of useDatabase(String) or useCatalog(String) for the rules on the path resolution.
Examples:
Reading a table from the default catalog and database.

    Table tab = tableEnv.from("tableName");

Reading a table from a registered catalog.

    Table tab = tableEnv.from("catalogName.dbName.tableName");

Reading a table from a registered catalog with escaping. Table is a reserved keyword, and dots in, for example, a database name must also be escaped.

    Table tab = tableEnv.from("catalogName.`db.Name`.`Table`");

Parameters:
path - The path of a table API object to scan.
See also: useCatalog(String), useDatabase(String)

@Deprecated void insertInto(Table table, String sinkPath, String... sinkPathContinued)
Deprecated. Use insertInto(String, Table) instead.
Writes the Table to a TableSink that was registered under the specified name.
See the documentation of useDatabase(String) or useCatalog(String) for the rules on the path resolution.
Parameters:
table - The Table to write to the sink.
sinkPath - The first part of the path of the registered TableSink to which the Table is written. This ensures that at least the name of the TableSink is provided.
sinkPathContinued - The remaining part of the path of the registered TableSink to which the Table is written.

void insertInto(String targetPath, Table table)
Instructs the environment to write the content of a Table API object into a table.
See the documentation of useDatabase(String) or useCatalog(String) for the rules on the path resolution.
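To migrate from the deprecated varargs overloads, scan(String...) and insertInto(Table, String, String...), to the single-string forms, the separate path parts must be joined into one escaped SQL identifier. A minimal sketch of such a helper follows. PathEscaping and toEscapedPath are hypothetical names, and the sketch assumes that a literal backtick inside a quoted identifier is written as two backticks.

```java
import java.util.StringJoiner;

// Hypothetical helper, not Flink code: joins path parts into one escaped identifier
// of the form `catalog`.`database`.`object`.
final class PathEscaping {

    static String toEscapedPath(String first, String... rest) {
        StringJoiner joiner = new StringJoiner(".");
        joiner.add(quote(first));
        for (String part : rest) {
            joiner.add(quote(part));
        }
        return joiner.toString();
    }

    // Quoting every part keeps keywords (e.g. Table) and embedded dots intact.
    // Assumption: a literal backtick inside a quoted identifier is written as "``".
    private static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    public static void main(String[] args) {
        // prints `catalogName`.`db.Name`.`Table`
        System.out.println(toEscapedPath("catalogName", "db.Name", "Table"));
    }
}
```

Quoting every part, not only the ones that need it, keeps the helper simple. The resulting string is intended to follow the path format given in the class description.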

ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor)
Creates a table source and/or table sink from a descriptor.
Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.
The following example shows how to read from a connector using a JSON format and register a table source as "MyTable":

    tableEnv
        .connect(
            new ExternalSystemXYZ() // placeholder for a concrete connector descriptor
                .version("0.11"))
        .withFormat(
            new Json()
                .jsonSchema("{...}")
                .failOnMissingField(false))
        .withSchema(
            new Schema()
                .field("user-name", "VARCHAR").from("u_name")
                .field("count", "DECIMAL"))
        .registerSource("MyTable");

Parameters:
connectorDescriptor - connector descriptor describing the external system

String[] listCatalogs()

String[] listModules()

String[] listDatabases()

String[] listTables()
See also: listTemporaryTables(), listTemporaryViews()

String[] listTemporaryTables()
See also: listTables()

String[] listTemporaryViews()
See also: listTables()

String[] listUserDefinedFunctions()

String[] listFunctions()

boolean dropTemporaryTable(String path)
Drops a temporary table registered in the given path. If a permanent table with a given path exists, it will be used from now on for any queries that reference this path.

boolean dropTemporaryView(String path)
Drops a temporary view registered in the given path. If a permanent table or view with a given path exists, it will be used from now on for any queries that reference this path.
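The shadowing rule for temporary objects, and the effect of dropping them, can be illustrated with a two-map lookup. This is a hypothetical model, not how Flink's catalog handling is implemented: ShadowingSketch and its methods are invented for illustration. It also assumes that the boolean result of a drop reports whether a temporary object was actually removed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical model, not Flink code: a temporary object shadows a permanent one
// registered under the same path until the temporary object is dropped.
final class ShadowingSketch<T> {
    private final Map<String, T> permanent = new HashMap<>();
    private final Map<String, T> temporary = new HashMap<>();

    void createPermanent(String path, T object) {
        permanent.put(path, Objects.requireNonNull(object));
    }

    void createTemporary(String path, T object) {
        temporary.put(path, Objects.requireNonNull(object));
    }

    // The temporary object wins; the permanent one is visible only when no temporary exists.
    Optional<T> lookup(String path) {
        T temp = temporary.get(path);
        return temp != null ? Optional.of(temp) : Optional.ofNullable(permanent.get(path));
    }

    // Assumed semantics: true if a temporary object existed under the path and was removed.
    boolean dropTemporary(String path) {
        return temporary.remove(path) != null;
    }

    public static void main(String[] args) {
        ShadowingSketch<String> catalog = new ShadowingSketch<>();
        catalog.createPermanent("db.orders", "permanent table");
        catalog.createTemporary("db.orders", "temporary view");
        System.out.println(catalog.lookup("db.orders").orElse("none")); // prints temporary view
        catalog.dropTemporary("db.orders");
        System.out.println(catalog.lookup("db.orders").orElse("none")); // prints permanent table
    }
}
```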
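
String explain(Table table)
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table.
Parameters:
table - The table for which the AST and execution plan will be returned.

String explain(Table table, boolean extended)
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table.
Parameters:
table - The table for which the AST and execution plan will be returned.
extended - if the plan should contain additional properties, such as estimated cost and traits

String explain(boolean extended)
Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the multiple-sinks plan.
Parameters:
extended - if the plan should contain additional properties, such as estimated cost and traits

@Deprecated String[] getCompletionHints(String statement, int position)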
Deprecated. Will be removed in the next release.
Parameters:
statement - Partial or slightly incorrect SQL statement
position - cursor position

Table sqlQuery(String query)
Evaluates a SQL query on registered tables and retrieves the result as a Table.
All tables referenced by the query must be registered in the TableEnvironment. A Table is automatically registered when its Table#toString() method is called, for example when it is embedded into a String. Hence, SQL queries can directly reference a Table as follows:

    Table table = ...;
    // No explicit registration is needed: toString() registers the table
    // and returns the name under which it can be referenced.
    String tableName = table.toString();
    tEnv.sqlQuery("SELECT * FROM " + tableName);

Parameters:
query - The SQL query to evaluate.

void sqlUpdate(String stmt)
Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
NOTE: Currently only SQL INSERT statements and CREATE TABLE statements are supported.
All tables referenced by the query must be registered in the TableEnvironment. A Table is automatically registered when its Table#toString() method is called, for example when it is embedded into a String. Hence, SQL queries can directly reference a Table as follows:

    // register the configured table sink into which the result is inserted
    tEnv.registerTableSink("sinkTable", configuredSink);
    Table sourceTable = ...;
    // no explicit registration of sourceTable is needed: toString() registers it
    String tableName = sourceTable.toString();
    tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM " + tableName);

A DDL statement can also be executed to create a table. For example, the following DDL statement creates a CSV table named `tbl1` in the current catalog:

    create table tbl1(
      a int,
      b bigint,
      c varchar
    ) with (
      'connector.type' = 'filesystem',
      'format.type' = 'csv',
      'connector.path' = 'xxx'
    )

SQL statements can be executed directly as follows. This code snippet creates a job that reads data from a Kafka source into a CSV sink:

    String sinkDDL = "create table sinkTable( a int, b varchar ) with ( 'connector.type' = 'filesystem', 'format.type' = 'csv', 'connector.path' = 'xxx' )";
    String sourceDDL = "create table sourceTable( a int, b varchar ) with ( 'connector.type' = 'kafka', 'update-mode' = 'append', 'connector.topic' = 'xxx', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', ... )";
    String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
    tEnv.sqlUpdate(sourceDDL);
    tEnv.sqlUpdate(sinkDDL);
    tEnv.sqlUpdate(query);
    tEnv.execute("MyJob");

Parameters:
stmt - The SQL statement to evaluate.

String getCurrentCatalog()
See also: useCatalog(String)

void useCatalog(String catalogName)
Sets the current catalog to the given value. See also useDatabase(String).
This is used during the resolution of object paths. Both the catalog and the database are optional when referencing catalog objects such as tables and views. The algorithm looks for requested objects in the following paths, in this order:

    [current-catalog].[current-database].[requested-path]
    [current-catalog].[requested-path]
    [requested-path]

Example:
Given the following structure, with the default catalog set to default_catalog and the default database set to default_database:

    root:
     |- default_catalog
         |- default_database
             |- tab1
         |- db1
             |- tab1
     |- cat1
         |- db1
             |- tab1

The following table describes resolved paths:
Requested path | Resolved path |
---|---|
tab1 | default_catalog.default_database.tab1 |
db1.tab1 | default_catalog.db1.tab1 |
cat1.db1.tab1 | cat1.db1.tab1 |

Parameters:
catalogName - The name of the catalog to set as the current default catalog.
See also: useDatabase(String)
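The lookup order described for useCatalog(String) can be expressed as a small function that tries the three candidate paths in turn. This is a hypothetical sketch, not Flink's resolver. PathResolutionSketch is an invented name, and a set of fully qualified names stands in for the registered catalogs. The sketch also assumes that only a candidate with exactly three parts (catalog, database, object) can name an object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the documented lookup order, not Flink's resolver.
final class PathResolutionSketch {

    static Optional<String> resolve(String currentCatalog, String currentDatabase,
                                    List<String> requestedPath, Set<String> existingObjects) {
        // Candidates in the documented order:
        // [current-catalog].[current-database].[requested-path],
        // [current-catalog].[requested-path], [requested-path]
        List<List<String>> candidates = new ArrayList<>();
        candidates.add(concat(Arrays.asList(currentCatalog, currentDatabase), requestedPath));
        candidates.add(concat(Arrays.asList(currentCatalog), requestedPath));
        candidates.add(requestedPath);
        for (List<String> candidate : candidates) {
            // Assumption: only catalog.database.object (three parts) can name an object.
            if (candidate.size() != 3) {
                continue;
            }
            // Joining with '.' is unambiguous here only because the names contain no dots.
            String fullPath = String.join(".", candidate);
            if (existingObjects.contains(fullPath)) {
                return Optional.of(fullPath);
            }
        }
        return Optional.empty();
    }

    private static List<String> concat(List<String> prefix, List<String> path) {
        List<String> result = new ArrayList<>(prefix);
        result.addAll(path);
        return result;
    }

    public static void main(String[] args) {
        // The example structure from the useCatalog(String) description.
        Set<String> objects = new HashSet<>(Arrays.asList(
                "default_catalog.default_database.tab1",
                "default_catalog.db1.tab1",
                "cat1.db1.tab1"));
        for (String requested : Arrays.asList("tab1", "db1.tab1", "cat1.db1.tab1")) {
            List<String> parts = Arrays.asList(requested.split("\\."));
            System.out.println(requested + " -> "
                    + resolve("default_catalog", "default_database", parts, objects).orElse("<not found>"));
        }
    }
}
```

With the example structure, this produces the same resolved paths as the table above.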

String getCurrentDatabase()
See also: useDatabase(String)

void useDatabase(String databaseName)
Sets the current default database.
This is used during the resolution of object paths. The lookup order and the resolution example are the same as described for useCatalog(String).
Parameters:
databaseName - The name of the database to set as the current database.
See also: useCatalog(String)

TableConfig getConfig()

JobExecutionResult execute(String jobName) throws Exception
Triggers the program execution.
The program execution will be logged and displayed with the provided name.
NOTE: It is highly advised to set all parameters in the TableConfig at the very beginning of the program. It is undefined which configuration values will be used for the execution if queries are mixed with config changes; this depends on the characteristics of the particular parameter. For some of them, the value from the point in time of query construction (e.g. the currentCatalog) will be used. On the other hand, some values might be evaluated according to the state at the time when this method is called (e.g. timeZone).
Once the execution finishes, any previously defined DMLs will be cleared, no matter whether the execution succeeds or not. Therefore, if you want to retry in case of failures, you have to re-define the DMLs, e.g. by calling sqlUpdate(String), before you call this method again.
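The retry rule above follows from the buffer being cleared regardless of the outcome. The following hypothetical sketch shows the effect. DmlBufferSketch is an invented class, and its Consumer runner stands in for the actual job submission. The buffer is cleared in a finally block, so after a failed run nothing is left to retry until the statements are registered again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model, not Flink code: buffered DML statements are cleared after
// execute(), whether the run succeeds or fails.
final class DmlBufferSketch {
    private final List<String> pendingDml = new ArrayList<>();

    void sqlUpdate(String stmt) {
        pendingDml.add(stmt);
    }

    // The runner stands in for submitting the job; it receives a copy of the buffer.
    void execute(Consumer<List<String>> runner) {
        try {
            runner.accept(new ArrayList<>(pendingDml));
        } finally {
            pendingDml.clear(); // cleared on success and on failure
        }
    }

    int pendingCount() {
        return pendingDml.size();
    }

    public static void main(String[] args) {
        DmlBufferSketch env = new DmlBufferSketch();
        env.sqlUpdate("INSERT INTO sinkTable SELECT * FROM sourceTable");
        try {
            env.execute(stmts -> { throw new RuntimeException("simulated failure"); });
        } catch (RuntimeException e) {
            // A retry has to call sqlUpdate(...) again, because the buffer is now empty.
            System.out.println("pending after failure: " + env.pendingCount()); // prints 0
        }
    }
}
```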
Parameters:
jobName - Desired name of the job
Throws:
Exception - which occurs during job execution.

Copyright © 2014–2020 The Apache Software Foundation. All rights reserved.