This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
This document is a short introduction to the PyFlink Table API, intended to help new users quickly understand its basic usage.
For advanced usage, please refer to the other documents in this User Guide.
All Table API and SQL programs, both batch and streaming, follow the same pattern. The following code example shows the common structure of Table API and SQL programs.
Table is a core component of the Python Table API. A Table is a logical representation of the intermediate result of a Table API Job.
A Table is always bound to a specific TableEnvironment. It is not possible to combine tables from different TableEnvironments in the same query, e.g., to join or union them.
Create using a List Object
You can create a Table from a list object:
You can also create the Table with specified column names:
By default the table schema is extracted from the data automatically.
If the automatically generated table schema isn’t satisfactory, you can specify it manually:
Create using a Connector
You can create a Table using connector DDL:
Create using a Catalog
A TableEnvironment maintains a map of catalogs of tables, each of which is created with an identifier.
The tables in a catalog may either be temporary, and tied to the lifecycle of a single Flink session, or permanent, and visible across multiple Flink sessions.
The tables and views created via SQL DDL, e.g. “create table …” and “create view …”, are also stored in a catalog.
You can directly access the tables in a catalog via SQL.
If you want to use tables from a catalog with the Table API, you can use the “from_path” method to create the Table API objects:
The Table object offers many methods for applying relational operations.
These methods return new Table objects representing the result of applying the relational operations on the input Table.
These relational operations may be composed of multiple method calls, such as table.group_by(...).select(...).
The Table API documentation describes all Table API operations that are supported on streaming and batch tables.
The following example shows a simple Table API aggregation query:
Write SQL Queries
Flink’s SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as Strings.
The SQL documentation describes Flink’s SQL support for streaming and batch tables.
The following example shows a simple SQL aggregation query:
In fact, the output of the print sink is the change log of the result table.
A change log message has the format “{subtask id}> {message type}{content of the row}”.
For example, “2> +I(4,11)” means this message comes from the 2nd subtask, and “+I” means it is an insert message. “(4, 11)” is the content of the message.
In addition, “-U” marks a retract record (i.e., update-before), indicating that this message should be deleted or retracted from the sink.
“+U” marks an update record (i.e., update-after), indicating that this message should be updated or inserted by the sink.
So, replaying the change logs above in order yields the final result.
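That replay rule can be sketched as a tiny interpreter in plain Python (the helper function and the sample messages below are illustrative, not part of Flink):

```python
def replay_change_log(messages):
    """Replay (kind, row) messages into the final table state, keyed by row[0]."""
    state = {}
    for kind, row in messages:
        if kind in ('+I', '+U'):    # insert / update-after: upsert the row
            state[row[0]] = row
        elif kind in ('-U', '-D'):  # update-before / delete: retract the row
            state.pop(row[0], None)
    return sorted(state.values())

# illustrative change log: key 4 is inserted with sum 11,
# then retracted and re-inserted with sum 21
log = [('+I', (2, 15)), ('+I', (4, 11)), ('-U', (4, 11)), ('+U', (4, 21))]
print(replay_change_log(log))  # → [(2, 15), (4, 21)]
```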
Mix the Table API and SQL
The Table objects used in the Table API and the tables used in SQL can be freely converted to each other.
The following example shows how to use a Table object in SQL:
And the following example shows how to use SQL tables in the Table API:
Note that “to_pandas” triggers materialization of the table and collects its content into the memory of the client, so it is good practice to limit the number of rows collected via Table.limit.
Also note that “to_pandas” is not supported by the old Flink planner, and not all data types can be emitted to pandas DataFrames.
Emit Results to One Sink Table
You can call the “execute_insert” method to emit the data in a Table object to a sink table:
This could also be done using SQL:
Emit Results to Multiple Sink Tables
You can use a StatementSet to emit the Tables to multiple sink tables in one job:
Explain Tables
The Table API provides a mechanism to explain the logical and optimized query plans used to compute a Table.
This is done through the Table.explain() or StatementSet.explain() methods. Table.explain() returns the plan of a Table. StatementSet.explain() returns the plan for multiple sinks. These methods return a string describing three things:
the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
the optimized logical query plan, and
the physical execution plan.
TableEnvironment.explain_sql() and TableEnvironment.execute_sql() support executing an EXPLAIN statement to get the plans. Please refer to the EXPLAIN page for more details.
The following code shows how to use the Table.explain() method:
The following code shows how to use the StatementSet.explain() method: