pyflink.table package¶
Module contents¶
Important classes of Flink Table API:
pyflink.table.TableEnvironment
Main entry point for Table and SQL functionality.
pyflink.table.Table
The core component of the Table API. Use the methods of Table to transform data.
pyflink.table.TableConfig
A config to define the runtime behavior of the Table API. It is necessary when creating TableEnvironment.
pyflink.table.EnvironmentSettings
Defines all parameters that initialize a table environment.
pyflink.table.TableSource
Defines an external data source as a table.
pyflink.table.TableSink
Specifies how to emit a table to an external system or location.
pyflink.table.DataTypes
Defines a list of data types available.
pyflink.table.Row
A row in a Table.
pyflink.table.window
Helper classes for working with pyflink.table.window.GroupWindow (pyflink.table.window.Tumble, pyflink.table.window.Session, pyflink.table.window.Slide) and pyflink.table.window.OverWindow (pyflink.table.window.Over).
pyflink.table.descriptors
Helper classes that describe DDL information, such as how to connect to another system, the format of data, the schema of a table, the event time attribute in the schema, etc.
pyflink.table.catalog
Responsible for reading and writing metadata such as databases/tables/views/UDFs from a registered pyflink.table.catalog.Catalog.
pyflink.table.TableSchema
Represents a table’s structure with field names and data types.
pyflink.table.FunctionContext
Used to obtain global runtime information about the context in which the user-defined function is executed, such as the metric group and global job parameters.
pyflink.table.ScalarFunction
Base interface for user-defined scalar functions.
pyflink.table.StatementSet
Accepts DML statements or Tables that the planner can optimize together and submit as one job.
-
class pyflink.table.BatchTableEnvironment(j_tenv)[source]¶
Bases: pyflink.table.table_environment.TableEnvironment
-
connect(connector_descriptor)[source]¶
Creates a temporary table from a descriptor.
Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.
The following example shows how to read from a connector using a JSON format and register a temporary table as “MyTable”:
>>> table_env \
...     .connect(ExternalSystemXYZ()
...              .version("0.11")) \
...     .with_format(Json()
...                  .json_schema("{...}")
...                  .fail_on_missing_field(False)) \
...     .with_schema(Schema()
...                  .field("user-name", "VARCHAR")
...                  .from_origin_field("u_name")
...                  .field("count", "DECIMAL")) \
...     .create_temporary_table("MyTable")
- Parameters
connector_descriptor (pyflink.table.descriptors.ConnectorDescriptor) – Connector descriptor describing the external system.
- Returns
A BatchTableDescriptor or a StreamTableDescriptor (for the Blink planner) used to build the temporary table.
- Return type
pyflink.table.descriptors.BatchTableDescriptor or pyflink.table.descriptors.StreamTableDescriptor
Note
Deprecated in 1.11. Use execute_sql() to register a table instead.
-
static create(execution_environment=None, table_config=None, environment_settings=None)[source]¶
Creates a BatchTableEnvironment.
Example:
# create with ExecutionEnvironment.
>>> env = ExecutionEnvironment.get_execution_environment()
>>> table_env = BatchTableEnvironment.create(env)
# create with ExecutionEnvironment and TableConfig.
>>> table_config = TableConfig()
>>> table_config.set_null_check(False)
>>> table_env = BatchTableEnvironment.create(env, table_config)
# create with EnvironmentSettings.
>>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \
...     .use_blink_planner().build()
>>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings)
- Parameters
execution_environment (pyflink.dataset.ExecutionEnvironment) – The batch ExecutionEnvironment of the TableEnvironment.
table_config (pyflink.table.TableConfig) – The configuration of the TableEnvironment, optional.
environment_settings (pyflink.table.EnvironmentSettings) – The environment settings used to instantiate the TableEnvironment. It provides the interfaces about planner selection (Flink or Blink), optional.
- Returns
The BatchTableEnvironment created from the given ExecutionEnvironment and configuration.
- Return type
pyflink.table.BatchTableEnvironment
-
class pyflink.table.CsvTableSink(field_names, field_types, path, field_delimiter=',', num_files=-1, write_mode=None)[source]¶
Bases: pyflink.table.sinks.TableSink
A simple TableSink to emit data as CSV files.
Example:
>>> CsvTableSink(["a", "b"], [DataTypes.INT(), DataTypes.STRING()],
...              "/csv/file/path", "|", 1, WriteMode.OVERWRITE)
- Parameters
field_names – The list of field names.
field_types – The list of field data types.
path – The output path to write the Table to.
field_delimiter – The field delimiter.
num_files – The number of files to write to.
write_mode – The write mode that specifies whether existing files are overwritten or not; one of WriteMode.NO_OVERWRITE and WriteMode.OVERWRITE.
-
class pyflink.table.CsvTableSource(source_path, field_names, field_types)[source]¶
Bases: pyflink.table.sources.TableSource
A TableSource for simple CSV files with a (logically) unlimited number of fields.
- Parameters
source_path – The path to the CSV file.
field_names – The names of the table fields.
field_types – The types of the table fields.
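For illustration, a minimal sketch of wiring this source into a table environment (the path, field names and types are assumptions):
>>> source = CsvTableSource("/csv/file/path", ["a", "b"],
...                         [DataTypes.INT(), DataTypes.STRING()])
>>> table_env.register_table_source("csv_source", source)
>>> tab = table_env.scan("csv_source")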
-
class pyflink.table.DataTypes[source]¶
Bases: object
A DataType can be used to declare input and/or output types of operations. This class enumerates all supported data types of the Table & SQL API.
-
static ARRAY(element_type, nullable=True)[source]¶
Data type of an array of elements with the same subtype.
Compared to the SQL standard, the maximum cardinality of an array cannot be specified but is fixed at 2147483647(0x7fffffff). Also, any valid type is supported as a subtype.
- Parameters
element_type – DataType of each element in the array.
nullable – boolean, whether the type can be null (None) or not.
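For instance, declaring an array of integers, or a nullable array of strings, might look like this (both calls are illustrative):
>>> DataTypes.ARRAY(DataTypes.INT())
>>> DataTypes.ARRAY(DataTypes.STRING(), nullable=True)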
-
static BIGINT(nullable=True)[source]¶
Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static BINARY(length, nullable=True)[source]¶
Data type of a fixed-length binary string (= a sequence of bytes).
- Parameters
length – int, the number of bytes. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
BinaryType is not supported yet.
-
static BOOLEAN(nullable=True)[source]¶
Data type of a boolean with a (possibly) three-valued logic of TRUE, FALSE, UNKNOWN.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static BYTES(nullable=True)[source]¶
Data type of a variable-length binary string (= a sequence of bytes) with a defined maximum length. This is a shortcut for DataTypes.VARBINARY(2147483647).
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static CHAR(length, nullable=True)[source]¶
Data type of a fixed-length character string.
- Parameters
length – int, the string representation length. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
CharType is not supported yet.
-
static DATE(nullable=True)[source]¶
Data type of a date consisting of year-month-day with values ranging from 0000-01-01 to 9999-12-31.
Compared to the SQL standard, the range starts at year 0000.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static DAY(precision=2)[source]¶
Resolution in days.
- Parameters
precision – int, the number of digits of days. It must have a value between 1 and 6 (both inclusive), (default: 2).
- Returns
the specified Resolution.
-
static DECIMAL(precision, scale, nullable=True)[source]¶
Data type of a decimal number with fixed precision and scale.
- Parameters
precision – the number of digits in a number. It must have a value between 1 and 38 (both inclusive).
scale – the number of digits on the right side of the dot. It must have a value between 0 and precision (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The precision must be 38 and the scale must be 18 currently.
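Given that constraint, a typical declaration would be:
>>> DataTypes.DECIMAL(38, 18)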
-
static DOUBLE(nullable=True)[source]¶
Data type of an 8-byte double precision floating point number.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static FIELD(name, data_type, description=None)[source]¶
Field definition with field name, data type, and a description.
- Parameters
name – string, name of the field.
data_type – DataType of the field.
description – string, description of the field.
-
static FLOAT(nullable=True)[source]¶
Data type of a 4-byte single precision floating point number.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static INT(nullable=True)[source]¶
Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static INTERVAL(upper_resolution, lower_resolution=None)[source]¶
Data type of a temporal interval. There are two types of temporal intervals: day-time intervals with up to nanosecond granularity or year-month intervals with up to month granularity.
An interval of day-time consists of +days hours:minutes:seconds.fractional with values ranging from -999999 23:59:59.999999999 to +999999 23:59:59.999999999. The type must be parameterized to one of the following resolutions: interval of days, interval of days to hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds, or interval of seconds. The value representation is the same for all types of resolutions. For example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds format (with default precisions): +00 00:01:10.000000.
An interval of year-month consists of +years-months with values ranging from -9999-11 to +9999-11. The type must be parameterized to one of the following resolutions: interval of years, interval of years to months, or interval of months. The value representation is the same for all types of resolutions. For example, an interval of months of 50 is always represented in an interval-of-years-to-months format (with default year precision): +04-02.
Examples: INTERVAL(DAY(2), SECOND(9)) for a day-time interval or INTERVAL(YEAR(4), MONTH()) for a year-month interval.
- Parameters
upper_resolution – Resolution, the upper resolution of the interval.
lower_resolution – Resolution, the lower resolution of the interval.
Note
Currently, the upper_resolution must be MONTH for YearMonthIntervalType or SECOND for DayTimeIntervalType, and the lower_resolution must be None.
-
static MAP(key_type, value_type, nullable=True)[source]¶
Data type of an associative array that maps keys to values. A map cannot contain duplicate keys; each key can map to at most one value.
There is no restriction of key types; it is the responsibility of the user to ensure uniqueness. The map type is an extension to the SQL standard.
- Parameters
key_type – DataType of the keys in the map.
value_type – DataType of the values in the map.
nullable – boolean, whether the type can be null (None) or not.
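An illustrative declaration of a map from strings to big integers:
>>> DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT())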
-
static MULTISET(element_type, nullable=True)[source]¶
Data type of a multiset (= bag). Unlike a set, it allows for multiple instances of each of its elements with a common subtype. Each unique value is mapped to some multiplicity.
There is no restriction of element types; it is the responsibility of the user to ensure uniqueness.
- Parameters
element_type – DataType of each element in the multiset.
nullable – boolean, whether the type can be null (None) or not.
-
static NULL()[source]¶
Data type for representing untyped null (None) values. A null type has no other value except null (None); thus, it can be cast to any nullable type.
This type helps in representing unknown types in API calls that use a null (None) literal as well as bridging to formats such as JSON or Avro that define such a type as well.
The null type is an extension to the SQL standard.
Note
NullType is not supported yet.
-
static ROW(row_fields=[], nullable=True)[source]¶
Data type of a sequence of fields. A field consists of a field name, field type, and an optional description. The most specific type of a row of a table is a row type. In this case, each column of the row corresponds to the field of the row type that has the same ordinal position as the column.
Compared to the SQL standard, an optional field description simplifies the handling with complex structures.
- Parameters
row_fields – a list of row fields which can be created via DataTypes.FIELD().
nullable – boolean, whether the type can be null (None) or not.
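A sketch combining ROW() and FIELD() for a two-column row type:
>>> DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
...                DataTypes.FIELD("name", DataTypes.STRING())])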
-
static SECOND(precision=6)[source]¶
Resolution in seconds and (possibly) fractional seconds.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive), (default: 6).
- Returns
the specified Resolution.
Note
the precision must be 3 currently.
-
static SMALLINT(nullable=True)[source]¶
Data type of a 2-byte signed integer with values from -32,768 to 32,767.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static STRING(nullable=True)[source]¶
Data type of a variable-length character string with a defined maximum length. This is a shortcut for DataTypes.VARCHAR(2147483647).
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static TIME(precision=0, nullable=True)[source]¶
Data type of a time WITHOUT time zone.
An instance consists of hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 00:00:00.000000000 to 23:59:59.999999999.
Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The precision must be 0 currently.
-
static TIMESTAMP(precision=6, nullable=True)[source]¶
Data type of a timestamp WITHOUT time zone.
An instance consists of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999.
Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported.
This class does not store or represent a time-zone. Instead, it is a description of the date, as used for birthdays, combined with the local time as seen on a wall clock. It cannot represent an instant on the time-line without additional information such as an offset or time-zone.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). (default: 6)
nullable – boolean, whether the type can be null (None) or not.
Note
The precision must be 3 currently.
-
static TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision=6, nullable=True)[source]¶
Data type of a timestamp WITH LOCAL time zone.
An instance consists of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59.
Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported.
The value will be stored internally as a long value which stores all date and time fields, to a precision of nanoseconds, as well as the offset from UTC/Greenwich.
- Parameters
precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). (default: 6)
nullable – boolean, whether the type can be null (None) or not.
Note
LocalZonedTimestampType is currently only supported in blink planner and the precision must be 3.
-
static TINYINT(nullable=True)[source]¶
Data type of a 1-byte signed integer with values from -128 to 127.
- Parameters
nullable – boolean, whether the type can be null (None) or not.
-
static VARBINARY(length, nullable=True)[source]¶
Data type of a variable-length binary string (= a sequence of bytes).
- Parameters
length – int, the maximum number of bytes. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The length limit must be 0x7fffffff(2147483647) currently.
-
static VARCHAR(length, nullable=True)[source]¶
Data type of a variable-length character string.
- Parameters
length – int, the maximum string representation length. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).
nullable – boolean, whether the type can be null (None) or not.
Note
The length limit must be 0x7fffffff(2147483647) currently.
-
class pyflink.table.EnvironmentSettings(j_environment_settings)[source]¶
Bases: object
Defines all parameters that initialize a table environment. Those parameters are used only during instantiation of a TableEnvironment and cannot be changed afterwards.
Example:
>>> EnvironmentSettings.new_instance() \
...     .use_old_planner() \
...     .in_streaming_mode() \
...     .with_built_in_catalog_name("my_catalog") \
...     .with_built_in_database_name("my_database") \
...     .build()
-
class Builder[source]¶
Bases: object
A builder for EnvironmentSettings.
-
build()[source]¶
Returns an immutable instance of EnvironmentSettings.
- Returns
an immutable instance of EnvironmentSettings.
- Return type
EnvironmentSettings
-
in_batch_mode()[source]¶
Sets that the components should work in batch mode. Streaming mode by default.
- Returns
This object.
- Return type
Builder
-
in_streaming_mode()[source]¶
Sets that the components should work in streaming mode. Enabled by default.
- Returns
This object.
- Return type
Builder
-
use_any_planner()[source]¶
Does not set a planner requirement explicitly.
A planner will be discovered automatically, if there is only one planner available.
By default, use_old_planner() is enabled.
- Returns
This object.
- Return type
Builder
-
use_blink_planner()[source]¶
Sets the Blink planner as the required module. By default, use_old_planner() is enabled.
- Returns
This object.
- Return type
Builder
-
use_old_planner()[source]¶
Sets the old Flink planner as the required module.
This is the default behavior.
- Returns
This object.
- Return type
Builder
-
with_built_in_catalog_name(built_in_catalog_name)[source]¶
Specifies the name of the initial catalog to be created when instantiating a TableEnvironment. This catalog will be used to store all non-serializable objects such as tables and functions registered via e.g. register_table_sink() or register_java_function(). It will also be the initial value for the current catalog which can be altered via use_catalog().
Default: “default_catalog”.
- Parameters
built_in_catalog_name (str) – The specified built-in catalog name.
- Returns
This object.
- Return type
Builder
-
with_built_in_database_name(built_in_database_name)[source]¶
Specifies the name of the default database in the initial catalog to be created when instantiating a TableEnvironment. The database will be used to store all non-serializable objects such as tables and functions registered via e.g. register_table_sink() or register_java_function(). It will also be the initial value for the current database which can be altered via use_database().
Default: “default_database”.
- Parameters
built_in_database_name (str) – The specified built-in database name.
- Returns
This object.
- Return type
Builder
-
get_built_in_catalog_name()[source]¶
Gets the specified name of the initial catalog to be created when instantiating a TableEnvironment.
- Returns
The specified name of the initial catalog to be created.
- Return type
str
-
get_built_in_database_name()[source]¶
Gets the specified name of the default database in the initial catalog to be created when instantiating a TableEnvironment.
- Returns
The specified name of the default database in the initial catalog to be created.
- Return type
str
-
is_streaming_mode()[source]¶
Tells if the TableEnvironment should work in a batch or streaming mode.
- Returns
True if the TableEnvironment should work in a streaming mode, false otherwise.
- Return type
bool
-
class pyflink.table.ExplainDetail[source]¶
Bases: object
ExplainDetail defines the types of details for the explain result.
New in version 1.11.0.
-
CHANGELOG_MODE = 1¶
-
ESTIMATED_COST = 0¶
-
class pyflink.table.FunctionContext(base_metric_group)[source]¶
Bases: object
Used to obtain global runtime information about the context in which the user-defined function is executed. The information includes the metric group, and global job parameters, etc.
-
class pyflink.table.GroupWindowedTable(java_group_windowed_table, t_env)[source]¶
Bases: object
A table that has been windowed for GroupWindow.
-
group_by(fields)[source]¶
Groups the elements by a mandatory window and one or more optional grouping attributes. The window is specified by referring to its alias.
If no additional grouping attribute is specified and if the input is a streaming table, the aggregation will be performed by a single task, i.e., with parallelism 1.
Aggregations are performed per group and defined by a subsequent select() clause, similar to a SQL SELECT-GROUP-BY query.
Example:
>>> tab.window(group_window.alias("w")).group_by("w, key").select("key, value.avg")
- Parameters
fields (str) – Group keys.
- Returns
A window grouped table.
- Return type
pyflink.table.WindowGroupedTable
-
class pyflink.table.GroupedTable(java_table, t_env)[source]¶
Bases: object
A table that has been grouped on a set of grouping keys.
-
select(fields)[source]¶
Performs a selection operation on a grouped table. Similar to a SQL SELECT statement. The field expressions can contain complex expressions and aggregations.
Example:
>>> tab.group_by("key").select("key, value.avg + ' The average' as average")
- Parameters
fields (str) – Expression string that contains group keys and aggregate function calls.
- Returns
The result table.
- Return type
pyflink.table.Table
-
class pyflink.table.OverWindowedTable(java_over_windowed_table, t_env)[source]¶
Bases: object
A table that has been windowed for OverWindow.
Unlike group windows, which are specified in the GROUP BY clause, over windows do not collapse rows. Instead, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.
-
select(fields)[source]¶
Performs a selection operation on an over-windowed table. Similar to a SQL SELECT statement. The field expressions can contain complex expressions and aggregations.
Example:
>>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
- Parameters
fields (str) – Expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
class pyflink.table.ResultKind[source]¶
Bases: object
ResultKind defines the types of the result.
SUCCESS: The statement (e.g. DDL, USE) executes successfully, and the result only contains a simple “OK”.
SUCCESS_WITH_CONTENT: The statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important content.
New in version 1.11.0.
-
SUCCESS = 0¶
-
SUCCESS_WITH_CONTENT = 1¶
-
-
class pyflink.table.Row[source]¶
Bases: tuple
A row in a Table. The fields in it can be accessed:
- like attributes (row.key)
- like dictionary values (row[key])
key in row will search through row keys.
Row can be used to create a row object by using named arguments; the fields will be sorted by name. It is not allowed to omit a named argument to indicate that a value is None or missing; it should be explicitly set to None in that case.
>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)
>>> 'name' in row
True
>>> 'wrong_key' in row
False
Row can also be used to create another Row-like class, which can then be used to create Row objects, such as
>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> 'name' in Person
True
>>> 'wrong_key' in Person
False
>>> Person("Alice", 11)
Row(name='Alice', age=11)
-
as_dict(recursive=False)[source]¶
Returns the row as a dict.
Example:
>>> Row(name="Alice", age=11).as_dict() == {'name': 'Alice', 'age': 11}
True
>>> row = Row(key=1, value=Row(name='a', age=2))
>>> row.as_dict() == {'key': 1, 'value': Row(age=2, name='a')}
True
>>> row.as_dict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
True
- Parameters
recursive – turns the nested Rows into dicts (default: False).
-
class pyflink.table.ScalarFunction[source]¶
Bases: pyflink.table.udf.UserDefinedFunction
Base interface for user-defined scalar functions. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.
New in version 1.10.0.
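A minimal sketch of subclassing and registering one (the eval logic and registered name are illustrative):
>>> from pyflink.table.udf import udf
>>> class HashCode(ScalarFunction):
...     def eval(self, s):
...         return hash(s)
>>> table_env.register_function(
...     "hash_code", udf(HashCode(), [DataTypes.STRING()], DataTypes.BIGINT()))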
-
class pyflink.table.SqlDialect[source]¶
Bases: object
Enumeration of valid SQL compatibility modes.
In most cases, the built-in compatibility mode should be sufficient. For some features, e.g. the “INSERT INTO T PARTITION(a=’xxx’) …” grammar, you may need to switch to the Hive dialect.
We may introduce other SQL dialects in the future.
DEFAULT: Flink’s default SQL behavior.
HIVE: SQL dialect that allows some Apache Hive specific grammar.
Note: We might never support all of the Hive grammar. See the documentation for supported features.
-
DEFAULT = 0¶
-
HIVE = 1¶
-
-
class pyflink.table.StatementSet(_j_statement_set, t_env)[source]¶
Bases: object
A StatementSet accepts DML statements or Tables; the planner can optimize all added statements and Tables together and then submit them as one job.
Note
The added statements and Tables will be cleared when calling the execute method.
New in version 1.11.0.
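A usage sketch, assuming tables named “source”, “sink1” and “sink2” are already registered:
>>> statement_set = table_env.create_statement_set()
>>> statement_set.add_insert("sink1", table_env.from_path("source"))
>>> statement_set.add_insert_sql("INSERT INTO sink2 SELECT * FROM source")
>>> statement_set.execute()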
-
add_insert(target_path, table, overwrite=False)[source]¶
Adds a Table with the given sink table name to the set.
- Parameters
target_path (str) – The path of the registered TableSink to which the Table is written.
table (pyflink.table.Table) – The Table to add.
overwrite (bool) – The flag that indicates whether the insert should overwrite existing data or not.
- Returns
current StatementSet instance.
- Return type
pyflink.table.StatementSet
New in version 1.11.0.
-
add_insert_sql(stmt)[source]¶
Adds an insert statement to the set.
- Parameters
stmt (str) – The statement to be added.
- Returns
current StatementSet instance.
- Return type
pyflink.table.StatementSet
New in version 1.11.0.
-
execute()[source]¶
Executes all statements and Tables as a batch.
Note
The added statements and Tables will be cleared when executing this method.
- Returns
execution result.
New in version 1.11.0.
-
explain(*extra_details)[source]¶
Returns the AST and the execution plan of all statements and Tables.
- Parameters
extra_details (tuple[ExplainDetail] (variable-length arguments of ExplainDetail)) – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming
- Returns
A string describing the AST and the execution plan of all statements and Tables.
- Return type
str
New in version 1.11.0.
-
class pyflink.table.StreamTableEnvironment(j_tenv)[source]¶
Bases: pyflink.table.table_environment.TableEnvironment
-
connect(connector_descriptor)[source]¶
Creates a temporary table from a descriptor.
Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.
The following example shows how to read from a connector using a JSON format and register a temporary table as “MyTable”:
>>> table_env \
...     .connect(ExternalSystemXYZ()
...              .version("0.11")) \
...     .with_format(Json()
...                  .json_schema("{...}")
...                  .fail_on_missing_field(False)) \
...     .with_schema(Schema()
...                  .field("user-name", "VARCHAR")
...                  .from_origin_field("u_name")
...                  .field("count", "DECIMAL")) \
...     .create_temporary_table("MyTable")
- Parameters
connector_descriptor (pyflink.table.descriptors.ConnectorDescriptor) – Connector descriptor describing the external system.
- Returns
A StreamTableDescriptor used to build the temporary table.
- Return type
pyflink.table.descriptors.StreamTableDescriptor
Note
Deprecated in 1.11. Use execute_sql() to register a table instead.
-
static create(stream_execution_environment=None, table_config=None, environment_settings=None)[source]¶
Creates a StreamTableEnvironment.
Example:
# create with StreamExecutionEnvironment.
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> table_env = StreamTableEnvironment.create(env)
# create with StreamExecutionEnvironment and TableConfig.
>>> table_config = TableConfig()
>>> table_config.set_null_check(False)
>>> table_env = StreamTableEnvironment.create(env, table_config)
# create with StreamExecutionEnvironment and EnvironmentSettings.
>>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner() \
...     .build()
>>> table_env = StreamTableEnvironment.create(
...     env, environment_settings=environment_settings)
# create with EnvironmentSettings.
>>> table_env = StreamTableEnvironment.create(environment_settings=environment_settings)
- Parameters
stream_execution_environment (pyflink.datastream.StreamExecutionEnvironment) – The StreamExecutionEnvironment of the TableEnvironment.
table_config (pyflink.table.TableConfig) – The configuration of the TableEnvironment, optional.
environment_settings (pyflink.table.EnvironmentSettings) – The environment settings used to instantiate the TableEnvironment. It provides the interfaces about planner selection (Flink or Blink), optional.
- Returns
The StreamTableEnvironment created from the given StreamExecutionEnvironment and configuration.
- Return type
pyflink.table.StreamTableEnvironment
-
class pyflink.table.Table(j_table, t_env)[source]¶
Bases: object
A Table is the core component of the Table API. Similar to how the batch and streaming APIs have DataSet and DataStream, the Table API is built around Table.
Use the methods of Table to transform data.
Example:
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> t_env = StreamTableEnvironment.create(env)
>>> ...
>>> t_env.register_table_source("source", ...)
>>> t = t_env.scan("source")
>>> t.select(...)
>>> ...
>>> t_env.register_table_sink("result", ...)
>>> t.insert_into("result")
>>> t_env.execute("table_job")
Operations such as join(), select(), where() and group_by() take arguments in an expression string. Please refer to the documentation for the expression syntax.
-
add_columns(fields)[source]¶
Adds additional columns. Similar to a SQL SELECT statement. The field expressions can contain complex expressions, but cannot contain aggregations. It will throw an exception if the added fields already exist.
Example:
>>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
- Parameters
fields (str) – Column list string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
add_or_replace_columns(fields)[source]¶
Adds additional columns. Similar to a SQL SELECT statement. The field expressions can contain complex expressions, but cannot contain aggregations. Existing fields will be replaced if an added column’s name is the same as an existing column name. Moreover, if the added fields have duplicate field names, the last one is used.
Example:
>>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as b1")
- Parameters
fields (str) – Column list string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
alias(field, *fields)[source]¶
Renames the fields of the expression result. Use this to disambiguate fields before a join operation.
Example:
>>> tab.alias("a", "b")
- Parameters
field (str) – Field alias.
fields (str) – Additional field aliases.
- Returns
The result table.
- Return type
pyflink.table.Table
-
distinct()[source]¶
Removes duplicate values and returns only distinct (different) values.
Example:
>>> tab.select("key, value").distinct()
- Returns
The result table.
- Return type
pyflink.table.Table
-
drop_columns(fields)[source]¶
Drops existing columns. The field expressions should be field reference expressions.
Example:
>>> tab.drop_columns("a, b")
- Parameters
fields (str) – Column list string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
execute()[source]¶
Collects the contents of the current table to a local client.
Example:
>>> tab.execute()
- Returns
The content of the table.
New in version 1.11.0.
-
execute_insert(table_path, overwrite=False)[source]¶
Writes the Table to a TableSink that was registered under the specified name, and then executes the insert operation. For the path resolution algorithm see use_database().
Example:
>>> tab.execute_insert("sink")
- Parameters
table_path (str) – The path of the registered TableSink to which the Table is written.
overwrite (bool) – The flag that indicates whether the insert should overwrite existing data or not.
- Returns
The table result.
New in version 1.11.0.
-
explain(*extra_details)[source]¶
Returns the AST of this table and the execution plan.
- Parameters
extra_details (tuple[ExplainDetail] (variable-length arguments of ExplainDetail)) – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming
- Returns
A string describing the AST and the execution plan of this table.
- Return type
str
New in version 1.11.0.
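For instance, the plan with an extra detail might be printed as follows (the detail chosen here is illustrative):
>>> print(tab.explain(ExplainDetail.ESTIMATED_COST))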
-
fetch(fetch)[source]¶
Limits a sorted result to the first n rows. Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and thus must be preceded by it.
fetch() can be combined with a preceding offset() call to return n rows after skipping the first o rows.
Example:
Returns the first 3 records.
>>> tab.order_by("name.desc").fetch(3)
Skips the first 10 rows and returns the next 5 rows.
>>> tab.order_by("name.desc").offset(10).fetch(5)
- Parameters
fetch (int) – The number of records to return. Fetch must be >= 0.
- Returns
The result table.
- Return type
pyflink.table.Table
-
filter(predicate)[source]¶
Filters out elements that don’t pass the filter predicate. Similar to a SQL WHERE clause.
Example:
>>> tab.filter("name = 'Fred'")
- Parameters
predicate (str) – Predicate expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
full_outer_join(right, join_predicate)[source]¶
Joins two Tables. Similar to a SQL full outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.
Note
Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
Example:
>>> left.full_outer_join(right, "a = b").select("a, b, d")
- Parameters
right (pyflink.table.Table) – Right table.
join_predicate (str) – The join predicate expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
get_schema()[source]¶
Returns the TableSchema of this table.
- Returns
The schema of this table.
- Return type
pyflink.table.TableSchema
-
group_by(fields)[source]¶
Groups the elements on some grouping keys. Use this before a selection with aggregations to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
Example:
>>> tab.group_by("key").select("key, value.avg")
- Parameters
fields (str) – Group keys.
- Returns
The grouped table.
- Return type
pyflink.table.GroupedTable
-
insert_into(table_path)[source]¶
Writes the Table to a TableSink that was registered under the specified name. For the path resolution algorithm see use_database().
Example:
>>> tab.insert_into("sink")
Note
Deprecated in 1.11. Use execute_insert() for a single sink, or TableEnvironment.create_statement_set() for multiple sinks.
-
intersect(right)[source]¶
Intersects two Tables with duplicate records removed. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Similar to a SQL INTERSECT clause. The fields of the two intersect operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.intersect(right)
- Parameters
right (pyflink.table.Table) – Right table.
- Returns
The result table.
- Return type
pyflink.table.Table
-
intersect_all(right)[source]¶
Intersects two Tables. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Similar to a SQL INTERSECT ALL clause. The fields of the two intersect operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.intersect_all(right)
- Parameters
right (pyflink.table.Table) – Right table.
- Returns
The result table.
- Return type
pyflink.table.Table
-
join(right, join_predicate=None)[source]¶
Joins two Tables. Similar to a SQL join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary. You can use where and select clauses after a join to further specify the behaviour of the join.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.join(right).where("a = b && c > 3").select("a, b, d")
>>> left.join(right, "a = b")
- Parameters
right (pyflink.table.Table) – Right table.
join_predicate (str) – Optional, the join predicate expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
join_lateral(table_function_call, join_predicate=None)[source]¶
Joins this Table with a user-defined TableFunction. This join is similar to a SQL inner join but works with a table function. Each row of the table is joined with the rows produced by the table function.
Example:
>>> t_env.register_java_function("split", "java.table.function.class.name")
>>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
- Parameters
table_function_call (str) – An expression representing a table function call.
join_predicate (str) – Optional, the join predicate expression string; joins ON TRUE if not present.
- Returns
The result Table.
- Return type
pyflink.table.Table
-
left_outer_join(right, join_predicate=None)[source]¶
Joins two Tables. Similar to a SQL left outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.
Note
Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
Example:
>>> left.left_outer_join(right).select("a, b, d")
>>> left.left_outer_join(right, "a = b").select("a, b, d")
- Parameters
right (pyflink.table.Table) – Right table.
join_predicate (str) – Optional, the join predicate expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
left_outer_join_lateral(table_function_call, join_predicate=None)[source]¶
Joins this Table with a user-defined TableFunction. This join is similar to a SQL left outer join but works with a table function. Each row of the table is joined with all rows produced by the table function. If the join does not produce any row, the outer row is padded with nulls.
Example:
>>> t_env.register_java_function("split", "java.table.function.class.name")
>>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
- Parameters
table_function_call (str) – An expression representing a table function call.
join_predicate (str) – Optional, the join predicate expression string; joins ON TRUE if not present.
- Returns
The result Table.
- Return type
pyflink.table.Table
-
minus(right)[source]¶
Minus of two Tables with duplicate records removed. Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.minus(right)
- Parameters
right (pyflink.table.Table) – Right table.
- Returns
The result table.
- Return type
pyflink.table.Table
-
minus_all(right)[source]¶
Minus of two Tables. Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.minus_all(right)
- Parameters
right (pyflink.table.Table) – Right table.
- Returns
The result table.
- Return type
pyflink.table.Table
-
offset(offset)[source]¶
Limits a sorted result from an offset position. Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and thus must be preceded by it.
offset() can be combined with a subsequent fetch() call to return n rows after skipping the first o rows.
Example:
# skips the first 3 rows and returns all following rows.
>>> tab.order_by("name.desc").offset(3)
# skips the first 10 rows and returns the next 5 rows.
>>> tab.order_by("name.desc").offset(10).fetch(5)
- Parameters
offset (int) – Number of records to skip.
- Returns
The result table.
- Return type
pyflink.table.Table
-
order_by(fields)[source]¶
Sorts the given Table. Similar to SQL ORDER BY. The resulting Table is globally sorted across all parallel partitions.
Example:
>>> tab.order_by("name.desc")
- Parameters
fields (str) – Order fields expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
over_window(*over_windows)[source]¶
Defines over-windows on the records of a table.
An over-window defines for each record an interval of records over which aggregation functions can be computed.
Example:
>>> table.over_window(Over.partition_by("c").order_by("rowTime") \
...                   .preceding("10.seconds").alias("ow")) \
...     .select("c, b.count over ow, e.sum over ow")
Note
Computing over window aggregates on a streaming table is only a parallel operation if the window is partitioned. Otherwise, the whole stream will be processed by a single task, i.e., with parallelism 1.
Note
Over-windows for batch tables are currently not supported.
- Parameters
over_windows (pyflink.table.window.OverWindow) – over windows created from Over.
- Returns
An over-windowed table.
- Return type
pyflink.table.OverWindowedTable
-
rename_columns(fields)[source]¶
Renames existing columns. Similar to a field alias statement. The field expressions should be alias expressions, and only the existing fields can be renamed.
Example:
>>> tab.rename_columns("a as a1, b as b1")
- Parameters
fields (str) – Column list string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
right_outer_join(right, join_predicate)[source]¶
Joins two Tables. Similar to a SQL right outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.
Note
Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
Example:
>>> left.right_outer_join(right, "a = b").select("a, b, d")
- Parameters
right (pyflink.table.Table) – Right table.
join_predicate (str) – The join predicate expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
select(fields)[source]¶
Performs a selection operation. Similar to a SQL SELECT statement. The field expressions can contain complex expressions.
Example:
>>> tab.select("key, value + 'hello'")
- Parameters
fields (str) – Expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
to_pandas()[source]¶
Converts the table to a pandas DataFrame.
Example:
>>> pdf = pd.DataFrame(np.random.rand(1000, 2))
>>> table = table_env.from_pandas(pdf, ["a", "b"])
>>> table.filter("a > 0.5").to_pandas()
- Returns
the result pandas DataFrame.
New in version 1.11.0.
-
union(right)[source]¶
Unions two Tables with duplicate records removed. Similar to a SQL UNION clause. The fields of the two union operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.union(right)
- Parameters
right (pyflink.table.Table) – Right table.
- Returns
The result table.
- Return type
pyflink.table.Table
-
union_all(right)[source]¶
Unions two Tables. Similar to a SQL UNION ALL clause. The fields of the two union operations must fully overlap.
Note
Both tables must be bound to the same TableEnvironment.
Example:
>>> left.union_all(right)
- Parameters
right (pyflink.table.Table) – Right table.
- Returns
The result table.
- Return type
pyflink.table.Table
-
where(predicate)[source]¶
Filters out elements that don’t pass the filter predicate. Similar to a SQL WHERE clause.
Example:
>>> tab.where("name = 'Fred'")
- Parameters
predicate (str) – Predicate expression string.
- Returns
The result table.
- Return type
pyflink.table.Table
-
window(window)[source]¶
Defines a group window on the records of a table.
A group window groups the records of a table by assigning them to windows defined by a time or row interval.
For streaming tables of infinite size, grouping into windows is required to define finite groups on which group-based aggregates can be computed.
For batch tables of finite size, windowing essentially provides shortcuts for time-based groupBy.
Note
Computing windowed aggregates on a streaming table is only a parallel operation if additional grouping attributes are added to the group_by() clause. If the group_by() only references a GroupWindow alias, the streamed table will be processed by a single task, i.e., with parallelism 1.
Example:
>>> tab.window(Tumble.over("10.minutes").on("rowtime").alias("w")) \
...     .group_by("w") \
...     .select("a.sum as a, w.start as b, w.end as c, w.rowtime as d")
- Parameters
window (pyflink.table.window.GroupWindow) – A GroupWindow created from Tumble, Session or Slide.
- Returns
A group windowed table.
- Return type
pyflink.table.GroupWindowedTable
-
class pyflink.table.TableConfig(j_table_config=None)[source]¶
Bases: object
Configuration for the current TableEnvironment session to adjust Table & SQL API programs.
For common or important configuration options, this class provides getter and setter methods with detailed inline documentation.
For more advanced configuration, users can directly access the underlying key-value map via get_configuration(). Currently, key-value options are only supported for the Blink planner.
Note
Because options are read at different points in time when performing operations, it is recommended to set configuration options early, right after instantiating a table environment.
-
add_configuration(configuration)[source]¶
Adds the given key-value configuration to the underlying configuration. It overwrites existing keys.
- Parameters
configuration (Configuration) – Key-value configuration to be added.
-
get_configuration()[source]¶
Gives direct access to the underlying key-value map for advanced configuration.
- Returns
Entire key-value configuration.
- Return type
Configuration
-
get_decimal_context()[source]¶
Returns the current context for decimal division calculation, (precision=34, rounding_mode=HALF_EVEN) by default.
- Returns
the current context for decimal division calculation.
- Return type
(int, str)
-
get_local_timezone()[source]¶
Returns the local timezone id for timestamp with local time zone, either an abbreviation such as “PST”, a full name such as “America/Los_Angeles”, or a custom timezone_id such as “GMT-8:00”.
-
get_max_generated_code_length()[source]¶
Returns the current threshold where generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. Default is 64000.
-
get_max_idle_state_retention_time()[source]¶
State will be cleared and removed if it was not updated for the defined period of time.
- Returns
The maximum time until state which was not updated will be retained.
- Return type
int
-
get_min_idle_state_retention_time()[source]¶
State might be cleared and removed if it was not updated for the defined period of time.
- Returns
The minimum time until state which was not updated will be retained.
- Return type
int
-
get_null_check()[source]¶
A boolean value, “True” enables NULL check and “False” disables NULL check.
-
get_python_executable()[source]¶
Gets the path of the python interpreter which is used to execute the python udf workers. If no path has been specified, it will return None.
- Returns
The path of the python interpreter which is used to execute the python udf workers.
- Return type
str
New in version 1.10.0.
-
set_decimal_context(precision, rounding_mode)[source]¶
Sets the default context for decimal division calculation, (precision=34, rounding_mode=HALF_EVEN) by default.
The precision is the number of digits to be used for an operation. A value of 0 indicates that unlimited precision (as many digits as are required) will be used. Note that leading zeros (in the coefficient of a number) are never significant.
The rounding mode is the rounding algorithm to be used for an operation. It could be:
UP, DOWN, CEILING, FLOOR, HALF_UP, HALF_DOWN, HALF_EVEN, UNNECESSARY
The table below shows the results of rounding input to one digit with the given rounding mode:
Input  | UP  | DOWN | CEILING | FLOOR | HALF_UP | HALF_DOWN | HALF_EVEN | UNNECESSARY
5.5    | 6   | 5    | 6       | 5     | 6       | 5         | 6         | Exception
2.5    | 3   | 2    | 3       | 2     | 3       | 2         | 2         | Exception
1.6    | 2   | 1    | 2       | 1     | 2       | 2         | 2         | Exception
1.1    | 2   | 1    | 2       | 1     | 1       | 1         | 1         | Exception
1.0    | 1   | 1    | 1       | 1     | 1       | 1         | 1         | 1
-1.0   | -1  | -1   | -1      | -1    | -1      | -1        | -1        | -1
-1.1   | -2  | -1   | -1      | -2    | -1      | -1        | -1        | Exception
-1.6   | -2  | -1   | -1      | -2    | -2      | -2        | -2        | Exception
-2.5   | -3  | -2   | -2      | -3    | -3      | -2        | -2        | Exception
-5.5   | -6  | -5   | -5      | -6    | -6      | -5        | -6        | Exception
- Parameters
precision (int) – The precision of the decimal context.
rounding_mode (str) – The rounding mode of the decimal context.
-
set_idle_state_retention_time(min_time, max_time)[source]¶
Specifies a minimum and a maximum time interval for how long idle state, i.e., state which was not updated, will be retained.
State will never be cleared until it was idle for less than the minimum time and will never be kept if it was idle for more than the maximum time.
When new data arrives for previously cleaned-up state, the new data will be handled as if it was the first data. This can result in previous results being overwritten.
Set to 0 (zero) to never clean-up the state.
Example:
>>> table_config = TableConfig() \
...     .set_idle_state_retention_time(datetime.timedelta(days=1),
...                                    datetime.timedelta(days=3))
Note
Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of minTime and maxTime. The difference between minTime and maxTime must be at least 5 minutes.
- Parameters
min_time (datetime.timedelta) – The minimum time interval for which idle state is retained. Set to 0 (zero) to never clean-up the state.
max_time (datetime.timedelta) – The maximum time interval for which idle state is retained. Must be at least 5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
-
set_local_timezone(timezone_id)[source]¶
Sets the local timezone id for timestamp with local time zone.
- Parameters
timezone_id – The timezone id, either an abbreviation such as “PST”, a full name such as “America/Los_Angeles”, or a custom timezone_id such as “GMT-8:00”.
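For example:
>>> table_env.get_config().set_local_timezone("America/Los_Angeles")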
-
set_max_generated_code_length(max_generated_code_length)[source]¶
Sets the threshold where generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. Default is 64000.
-
set_null_check(null_check)[source]¶
Sets the NULL check. If enabled, all fields need to be checked for NULL first.
-
set_python_executable(python_exec)[source]¶
Sets the path of the python interpreter which is used to execute the python udf workers, e.g. “/usr/local/bin/python3”.
If the python UDF depends on a specific python version which does not exist in the cluster, the method pyflink.table.TableEnvironment.add_python_archive() can be used to upload a virtual environment. The path of the python interpreter contained in the uploaded environment can be specified via this method.
Example:
# command executed in shell
# assume that the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env
# python code
>>> table_env.add_python_archive("py_env.zip")
>>> table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
Note
Please make sure the uploaded python environment matches the platform that the cluster is running on and that the python version must be 3.5 or higher.
Note
The python udf worker depends on Apache Beam (version == 2.19.0). Please ensure that the specified environment meets the above requirements.
- Parameters
python_exec (str) – The path of python interpreter.
New in version 1.10.0.
-
set_sql_dialect(sql_dialect)[source]¶
Sets the current SQL dialect to parse a SQL query. Flink’s SQL behavior by default.
- Parameters
sql_dialect (SqlDialect) – The given SQL dialect.
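For instance, switching to the Hive dialect could look like:
>>> table_env.get_config().set_sql_dialect(SqlDialect.HIVE)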
-
class pyflink.table.TableEnvironment(j_tenv, serializer=PickleSerializer())[source]¶
Bases: object
A table environment is the base class, entry point, and central context for creating Table and SQL API programs.
It is unified for bounded and unbounded data processing.
A table environment is responsible for:
- Connecting to external systems.
- Registering and retrieving Table and other meta objects from a catalog.
- Executing SQL statements.
- Offering further configuration options.
The path in methods such as create_temporary_view() should be a proper SQL identifier. The syntax follows [[catalog-name.]database-name.]object-name, where the catalog name and database are optional. For path resolution see use_catalog() and use_database(). All keywords or other special characters need to be escaped.
Example: `cat.1`.`db`.`Table` resolves to an object named ‘Table’ (table is a reserved keyword, thus must be escaped) in a catalog named ‘cat.1’ and a database named ‘db’.
Note
This environment is meant for pure table programs. If you would like to convert from or to other Flink APIs, it might be necessary to use one of the available language-specific table environments in the corresponding bridging modules.
-
add_python_archive(archive_path, target_dir=None)[source]¶
Adds a python archive file. The file will be extracted to the working directory of the python UDF worker.
If the parameter “target_dir” is specified, the archive file will be extracted to a directory named ${target_dir}. Otherwise, the archive file will be extracted to a directory with the same name of the archive file.
If the python UDF depends on a specific python version which does not exist in the cluster, this method can be used to upload the virtual environment. Note that the path of the python interpreter contained in the uploaded environment should be specified via the method pyflink.table.TableConfig.set_python_executable().
The files uploaded via this method are also accessible in UDFs via relative paths.
Example:
# command executed in shell
# assume that the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env
# python code
>>> table_env.add_python_archive("py_env.zip")
>>> table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
# or
>>> table_env.add_python_archive("py_env.zip", "myenv")
>>> table_env.get_config().set_python_executable("myenv/py_env/bin/python")
# the files contained in the archive file can be accessed in UDF
>>> def my_udf():
...     with open("myenv/py_env/data/data.txt") as f:
...         ...
Note
Please make sure the uploaded python environment matches the platform that the cluster is running on and that the python version must be 3.5 or higher.
Note
Currently only zip-format is supported, e.g. zip, jar, whl, egg, etc. Other archive formats such as tar, tar.gz, 7z, rar, etc. are not supported.
- Parameters
archive_path (str) – The archive file path.
target_dir (str) – Optional, the target directory name that the archive file is extracted to.
New in version 1.10.0.
-
add_python_file(file_path)[source]¶
Adds a python dependency which could be python files, python packages or local directories. They will be added to the PYTHONPATH of the python UDF worker. Please make sure that these dependencies can be imported.
- Parameters
file_path (str) – The path of the python dependency.
New in version 1.10.0.
-
abstract connect(connector_descriptor)[source]¶
Creates a temporary table from a descriptor.
Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.
The following example shows how to read from a connector using a JSON format and register a temporary table as “MyTable”:
Example:
>>> table_env \
...     .connect(ExternalSystemXYZ()
...              .version("0.11")) \
...     .with_format(Json()
...                  .json_schema("{...}")
...                  .fail_on_missing_field(False)) \
...     .with_schema(Schema()
...                  .field("user-name", "VARCHAR")
...                  .from_origin_field("u_name")
...                  .field("count", "DECIMAL")) \
...     .create_temporary_table("MyTable")
- Parameters
connector_descriptor (pyflink.table.descriptors.ConnectorDescriptor) – Connector descriptor describing the external system.
- Returns
A ConnectTableDescriptor used to build the temporary table.
- Return type
pyflink.table.descriptors.ConnectTableDescriptor
Note
Deprecated in 1.11. Use execute_sql() to register a table instead.
-
create_statement_set
()[source]¶ Create a StatementSet instance which accepts DML statements or Tables, the planner can optimize all added statements and Tables together and then submit as one job.
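Example (a minimal sketch; source_table, sink1 and sink2 are hypothetical names of previously registered tables):
>>> statement_set = table_env.create_statement_set()
>>> # both INSERTs below are optimized together and submitted as one job
>>> statement_set.add_insert_sql("INSERT INTO sink1 SELECT * FROM source_table")
>>> statement_set.add_insert_sql("INSERT INTO sink2 SELECT * FROM source_table")
>>> statement_set.execute()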
- Returns
A StatementSet instance.
- Return type
pyflink.table.StatementSet
New in version 1.11.0.
-
create_temporary_view
(view_path, table)[source]¶ Registers a
Table
API object as a temporary view similar to SQL temporary views.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
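Example (a minimal sketch; example_view is a hypothetical view name):
>>> tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
>>> table_env.create_temporary_view("example_view", tab)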
- Parameters
view_path (str) – The path under which the view will be registered. See also the
TableEnvironment
class description for the format of the path.
table (pyflink.table.Table) – The view to register.
New in version 1.10.0.
-
drop_temporary_table
(table_path)[source]¶ Drops a temporary table registered in the given path.
If a permanent table with a given path exists, it will be used from now on for any queries that reference this path.
- Parameters
table_path (str) – The path of the registered temporary table.
- Returns
True if a table existed in the given path and was removed.
- Return type
bool
New in version 1.10.0.
-
drop_temporary_view
(view_path)[source]¶ Drops a temporary view registered in the given path.
If a permanent table or view with a given path exists, it will be used from now on for any queries that reference this path.
- Returns
True if a view existed in the given path and was removed.
- Return type
bool
New in version 1.10.0.
-
execute
(job_name)[source]¶ Triggers the program execution. The environment will execute all parts of the program.
The program execution will be logged and displayed with the provided name.
Note
It is highly advised to set all parameters in the
TableConfig
at the very beginning of the program. It is undefined which configuration values will be used for the execution if queries are mixed with config changes. It depends on the characteristics of the particular parameter. For some of them, the value from the point in time of query construction (e.g. the current catalog) will be used. On the other hand, some values might be evaluated according to the state from the time when this method is called (e.g. the timezone).
- Parameters
job_name (str) – Desired name of the job.
- Returns
The result of the job execution, containing elapsed time and accumulators.
Note
Deprecated in 1.11. Use
execute_sql()
for a single sink, use
create_statement_set()
for multiple sinks.
-
execute_sql
(stmt)[source]¶ Execute the given single statement, and return the execution result.
The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. For DML and DQL, this method returns a TableResult once the job has been submitted. For DDL and DCL statements, a TableResult is returned once the operation has finished.
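Example (a sketch; source_table and sink_table are hypothetical names of previously created tables):
>>> table_env.execute_sql("SHOW TABLES")  # finishes immediately
>>> table_result = table_env.execute_sql(
...     "INSERT INTO sink_table SELECT * FROM source_table")  # submits a job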
- Parameters
stmt (str) – The SQL statement to execute.
- Returns
The content for DQL/SHOW/DESCRIBE/EXPLAIN, the affected row count for DML (-1 means unknown), or a string message (“OK”) for other statements.
- Return type
pyflink.table.TableResult
New in version 1.11.0.
-
explain
(table=None, extended=False)[source]¶ Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given
Table
or multi-sinks plan.
- Parameters
table (pyflink.table.Table) – The table to be explained. If table is None, explain for multi-sinks plan, else for given table.
extended (bool) – If the plan should contain additional properties. e.g. estimated cost, traits
- Returns
A string describing the AST and the execution plan.
- Return type
str
Note
Deprecated in 1.11. Use
Table.explain()
instead.
-
explain_sql
(stmt, *extra_details)[source]¶ Returns the AST of the specified statement and the execution plan.
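Example (a sketch; my_table is a hypothetical name of a previously registered table):
>>> from pyflink.table import ExplainDetail
>>> print(table_env.explain_sql(
...     "SELECT * FROM my_table", ExplainDetail.ESTIMATED_COST))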
- Parameters
stmt (str) – The statement for which the AST and execution plan will be returned.
extra_details (tuple[ExplainDetail] (variable-length arguments of ExplainDetail)) – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming
- Returns
The AST and the execution plan of the statement.
- Return type
str
New in version 1.11.0.
-
from_elements
(elements, schema=None, verify_schema=True)[source]¶ Creates a table from a collection of elements. The element types must be acceptable atomic types or acceptable composite types. All elements must be of the same type. If the element types are composite types, the composite types must be strictly equal, and their subtypes must also be acceptable types. E.g. if the elements are tuples, the lengths of the tuples must be equal, and the element types of the tuples must be equal in order.
The built-in acceptable atomic element types are:
int, long, str, unicode, bool, float, bytearray, datetime.date, datetime.time, datetime.datetime, datetime.timedelta, decimal.Decimal
The built-in acceptable composite element types are:
list, tuple, dict, array,
Row
If the element type is a composite type, it will be unboxed. E.g. table_env.from_elements([(1, ‘Hi’), (2, ‘Hello’)]) will return a table like:
+----+-------+
| _1 | _2    |
+----+-------+
| 1  | Hi    |
| 2  | Hello |
+----+-------+
“_1” and “_2” are generated field names.
Example:
# use the second parameter to specify custom field names
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

# use the second parameter to specify a custom table schema
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
...                         DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                                        DataTypes.FIELD("b", DataTypes.STRING())]))

# use the third parameter to switch whether to verify the elements against the schema
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
...                         DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                                        DataTypes.FIELD("b", DataTypes.STRING())]),
...                         False)
- Parameters
elements (list) – The elements to create a table from.
schema (pyflink.table.types.DataType or list[str]) – The schema of the table.
verify_schema (bool) – Whether to verify the elements against the schema.
- Returns
The result table.
- Return type
-
from_pandas
(pdf, schema: Union[pyflink.table.types.RowType, List[str], Tuple[str], List[pyflink.table.types.DataType], Tuple[pyflink.table.types.DataType]] = None, splits_num: int = 1) → pyflink.table.table.Table[source]¶ Creates a table from a pandas DataFrame.
Example:
>>> pdf = pd.DataFrame(np.random.rand(1000, 2))

# use the second parameter to specify custom field names
>>> table_env.from_pandas(pdf, ["a", "b"])

# use the second parameter to specify custom field types
>>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

# use the second parameter to specify a custom table schema
>>> table_env.from_pandas(pdf,
...                       DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
...                                      DataTypes.FIELD("b", DataTypes.DOUBLE())]))
- Parameters
pdf – The pandas DataFrame.
schema – The schema of the converted table.
splits_num – The number of splits the given Pandas DataFrame will be split into. It determines the number of parallel source tasks. If not specified, it defaults to 1.
- Returns
The result table.
New in version 1.11.0.
-
from_path
(path)[source]¶ Reads a registered table and returns the resulting
Table
. A table to scan must be registered in the
TableEnvironment
. See the documentation of
use_database()
or use_catalog()
for the rules on the path resolution.
Examples:
Reading a table from default catalog and database.
>>> tab = table_env.from_path("tableName")
Reading a table from a registered catalog.
>>> tab = table_env.from_path("catalogName.dbName.tableName")
Reading a table from a registered catalog with escaping (Table is a reserved keyword; dots, e.g. in a database name, must also be escaped).
>>> tab = table_env.from_path("catalogName.`db.Name`.`Table`")
- Parameters
path (str) – The path of a table API object to scan.
- Returns
Either a table or virtual table (=view).
- Return type
New in version 1.10.0.
-
from_table_source
(table_source)[source]¶ Creates a table from a table source.
Example:
>>> csv_table_source = CsvTableSource(
...     csv_file_path, ['a', 'b'], [DataTypes.STRING(), DataTypes.BIGINT()])
>>> table_env.from_table_source(csv_table_source)
- Parameters
table_source (pyflink.table.TableSource) – The table source used as table.
- Returns
The result table.
- Return type
-
get_catalog
(catalog_name)[source]¶ Gets a registered
Catalog
by name.
- Parameters
catalog_name (str) – The name to look up the
Catalog
.
- Returns
The requested catalog, None if there is no registered catalog with given name.
- Return type
-
abstract
get_config
()[source]¶ Returns the table config to define the runtime behavior of the Table API.
- Returns
Current table config.
- Return type
-
get_current_catalog
()[source]¶ Gets the current default catalog name of the current session.
- Returns
The current default catalog name that is used for the path resolution.
- Return type
str
-
get_current_database
()[source]¶ Gets the current default database name of the running session.
- Returns
The name of the current database of the current catalog.
- Return type
str
-
insert_into
(target_path, table)[source]¶ Instructs to write the content of a
Table
API object into a table. See the documentation of
use_database()
or use_catalog()
for the rules on the path resolution.
Example:
>>> tab = table_env.scan("tableName") >>> table_env.insert_into("sink", tab)
- Parameters
target_path (str) – The path of the registered
TableSink
to which theTable
is written.
table (pyflink.table.Table) – The Table to write to the sink.
Changed in version 1.10.0: The signature is changed, e.g. the parameter table_path_continued was removed and the parameter target_path is moved before the parameter table.
Note
Deprecated in 1.11. Use
execute_insert()
for single sink, usecreate_statement_set()
for multiple sinks.
-
list_catalogs
()[source]¶ Gets the names of all catalogs registered in this environment.
- Returns
List of catalog names.
- Return type
list[str]
-
list_databases
()[source]¶ Gets the names of all databases in the current catalog.
- Returns
List of database names in the current catalog.
- Return type
list[str]
-
list_functions
()[source]¶ Gets the names of all functions in this environment.
- Returns
List of the names of all functions in this environment.
- Return type
list[str]
New in version 1.10.0.
-
list_modules
()[source]¶ Gets the names of all modules registered in this environment.
- Returns
List of module names.
- Return type
list[str]
New in version 1.10.0.
-
list_tables
()[source]¶ Gets the names of all tables and views in the current database of the current catalog. It returns both temporary and permanent tables and views.
- Returns
List of table and view names in the current database of the current catalog.
- Return type
list[str]
-
list_temporary_tables
()[source]¶ Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog).
- Returns
A list of the names of all registered temporary tables and views in the current database of the current catalog.
- Return type
list[str]
New in version 1.10.0.
-
list_temporary_views
()[source]¶ Gets the names of all temporary views available in the current namespace (the current database of the current catalog).
- Returns
A list of the names of all registered temporary views in the current database of the current catalog.
- Return type
list[str]
New in version 1.10.0.
-
list_user_defined_functions
()[source]¶ Gets the names of all user defined functions registered in this environment.
- Returns
List of the names of all user defined functions registered in this environment.
- Return type
list[str]
-
list_views
()[source]¶ Gets the names of all views in the current database of the current catalog. It returns both temporary and permanent views.
- Returns
List of view names in the current database of the current catalog.
- Return type
list[str]
New in version 1.11.0.
-
register_catalog
(catalog_name, catalog)[source]¶ Registers a
Catalog
under a unique name. All tables registered in theCatalog
can be accessed.
- Parameters
catalog_name (str) – The name under which the catalog will be registered.
catalog (pyflink.table.catalog.Catalog) – The catalog to register.
-
register_function
(name, function)[source]¶ Registers a python user-defined function under a unique name. Replaces already existing user-defined function under this name.
Example:
>>> table_env.register_function( ... "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT())) >>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], ... result_type=DataTypes.BIGINT()) ... def add(i, j): ... return i + j >>> table_env.register_function("add", add) >>> class SubtractOne(ScalarFunction): ... def eval(self, i): ... return i - 1 >>> table_env.register_function( ... "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))
- Parameters
name (str) – The name under which the function is registered.
function (pyflink.table.udf.UserDefinedFunctionWrapper) – The python user-defined function to register.
New in version 1.10.0.
-
register_java_function
(name, function_class_name)[source]¶ Registers a java user-defined function under a unique name. Replaces already existing user-defined functions under this name. The acceptable function types include ScalarFunction, TableFunction and AggregateFunction.
Example:
>>> table_env.register_java_function("func1", "java.user.defined.function.class.name")
- Parameters
name (str) – The name under which the function is registered.
function_class_name (str) – The java fully qualified class name of the function to register. The function must have a public no-argument constructor and can be found in the current Java classloader.
-
register_table
(name, table)[source]¶ Registers a
Table
under a unique name in the TableEnvironment’s catalog. Registered tables can be referenced in SQL queries.
Example:
>>> tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b']) >>> table_env.register_table("source", tab)
- Parameters
name (str) – The name under which the table will be registered.
table (pyflink.table.Table) – The table to register.
Note
Deprecated in 1.10. Use
create_temporary_view()
instead.
-
register_table_sink
(name, table_sink)[source]¶ Registers an external
TableSink
with given field names and types in thisTableEnvironment
’s catalog. Registered sink tables can be referenced in SQL DML statements.
Example:
>>> table_env.register_table_sink("sink",
...                               CsvTableSink(["a", "b"],
...                                            [DataTypes.INT(),
...                                             DataTypes.STRING()],
...                                            "./2.csv"))
- Parameters
name (str) – The name under which the table sink is registered.
table_sink (pyflink.table.TableSink) – The table sink to register.
Note
Deprecated in 1.10. Use
execute_sql()
instead.
-
register_table_source
(name, table_source)[source]¶ Registers an external
TableSource
in thisTableEnvironment
’s catalog. Registered tables can be referenced in SQL queries.
Example:
>>> table_env.register_table_source("source",
...                                 CsvTableSource("./1.csv",
...                                                ["a", "b"],
...                                                [DataTypes.INT(),
...                                                 DataTypes.STRING()]))
- Parameters
name (str) – The name under which the table source is registered.
table_source (pyflink.table.TableSource) – The table source to register.
Note
Deprecated in 1.10. Use
execute_sql()
instead.
-
scan
(*table_path)[source]¶ Scans a registered table and returns the resulting
Table
. A table to scan must be registered in the TableEnvironment. It can be either directly registered or be an external member of a
Catalog
. See the documentation of
use_database()
or use_catalog()
for the rules on the path resolution.
Examples:
Scanning a directly registered table
>>> tab = table_env.scan("tableName")
Scanning a table from a registered catalog
>>> tab = table_env.scan("catalogName", "dbName", "tableName")
- Parameters
table_path (str) – The path of the table to scan.
- Throws
Exception if no table is found using the given table path.
- Returns
The resulting table.
- Return type
Note
Deprecated in 1.10. Use
from_path()
instead.
-
set_python_requirements
(requirements_file_path, requirements_cache_dir=None)[source]¶ Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the python UDF worker.
For the dependencies which could not be accessed in the cluster, a directory which contains the installation packages of these dependencies could be specified using the parameter “requirements_cache_dir”. It will be uploaded to the cluster to support offline installation.
Example:
# commands executed in shell
$ echo numpy==1.16.5 > requirements.txt
$ pip download -d cached_dir -r requirements.txt --no-binary :all:

# python code
>>> table_env.set_python_requirements("requirements.txt", "cached_dir")
Note
Please make sure the installation packages match the platform of the cluster and the python version used. These packages will be installed using pip, so also make sure that pip (version >= 7.1.0) and setuptools (version >= 37.0.0) are available.
- Parameters
requirements_file_path (str) – The path of “requirements.txt” file.
requirements_cache_dir (str) – The path of the local directory which contains the installation packages.
New in version 1.10.0.
-
sql_query
(query)[source]¶ Evaluates a SQL query on registered tables and retrieves the result as a
Table
.All tables referenced by the query must be registered in the TableEnvironment.
A
Table
is automatically registered when its
__str__()
method is called, for example when it is embedded into a String.
Hence, SQL queries can directly reference a
Table
as follows:
>>> table = ...  # the table is not registered to the table environment
>>> table_env.sql_query("SELECT * FROM %s" % table)
- Parameters
query (str) – The sql query string.
- Returns
The result table.
- Return type
-
sql_update
(stmt)[source]¶ Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
Note
Currently only SQL INSERT statements and CREATE TABLE statements are supported.
All tables referenced by the query must be registered in the TableEnvironment. A
Table
is automatically registered when its
__str__()
method is called, for example when it is embedded into a String. Hence, SQL queries can directly reference a
Table
as follows:
# register the table sink into which the result is inserted.
>>> table_env.register_table_sink("sink_table", table_sink)
>>> source_table = ...  # source_table is not registered to the table environment
>>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
A DDL statement can also be executed to create/drop a table: For example, the below DDL statement would create a CSV table named tbl1 into the current catalog:
create table tbl1(
    a int,
    b bigint,
    c varchar
) with (
    'connector.type' = 'filesystem',
    'format.type' = 'csv',
    'connector.path' = 'xxx'
)
SQL statements can be executed directly as follows:
>>> source_ddl = \
... '''
... create table sourceTable(
...     a int,
...     b varchar
... ) with (
...     'connector.type' = 'kafka',
...     'update-mode' = 'append',
...     'connector.topic' = 'xxx',
...     'connector.properties.bootstrap.servers' = 'localhost:9092'
... )
... '''
>>> sink_ddl = \
... '''
... create table sinkTable(
...     a int,
...     b varchar
... ) with (
...     'connector.type' = 'filesystem',
...     'format.type' = 'csv',
...     'connector.path' = 'xxx'
... )
... '''
>>> query = "INSERT INTO sinkTable SELECT * FROM sourceTable"
>>> table_env.sql_update(source_ddl)
>>> table_env.sql_update(sink_ddl)
>>> table_env.sql_update(query)
>>> table_env.execute("MyJob")
- Parameters
stmt (str) – The SQL statement to evaluate.
Note
Deprecated in 1.11. Use
execute_sql()
for a single statement, use
create_statement_set()
for multiple DML statements.
-
use_catalog
(catalog_name)[source]¶ Sets the current catalog to the given value. It also sets the default database to the catalog’s default one. See also
use_database()
. This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in the following paths in that order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given structure with default catalog set to
default_catalog
and default database set todefault_database
.
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
+----------------+---------------------------------------+
| Requested path | Resolved path                         |
+----------------+---------------------------------------+
| tab1           | default_catalog.default_database.tab1 |
| db1.tab1       | default_catalog.db1.tab1              |
| cat1.db1.tab1  | cat1.db1.tab1                         |
+----------------+---------------------------------------+
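A usage sketch (assuming a catalog named cat1 has been registered, e.g. via register_catalog()):
>>> table_env.use_catalog("cat1")
>>> table_env.get_current_catalog()
'cat1'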
- Parameters
catalog_name (str) – The name of the catalog to set as the current default catalog.
- Throws
CatalogException
thrown if a catalog with given name could not be set as the default one.
-
use_database
(database_name)[source]¶ Sets the current default database. It has to exist in the current catalog. That path will be used as the default one when looking for unqualified object names.
This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in following paths in that order:
[current-catalog].[current-database].[requested-path]
[current-catalog].[requested-path]
[requested-path]
Example:
Given structure with default catalog set to
default_catalog
and default database set todefault_database
.
root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1
The following table describes resolved paths:
+----------------+---------------------------------------+
| Requested path | Resolved path                         |
+----------------+---------------------------------------+
| tab1           | default_catalog.default_database.tab1 |
| db1.tab1       | default_catalog.db1.tab1              |
| cat1.db1.tab1  | cat1.db1.tab1                         |
+----------------+---------------------------------------+
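A usage sketch (assuming a database named db1 exists in the current catalog):
>>> table_env.use_database("db1")
>>> table_env.get_current_database()
'db1'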
- Throws
CatalogException
thrown if the given catalog and database could not be set as the default ones.
- Parameters
database_name (str) – The name of the database to set as the current database.
-
class
pyflink.table.
TableResult
(j_table_result)[source]¶ Bases:
object
A
TableResult
is the representation of the statement execution result.
New in version 1.11.0.
-
get_job_client
()[source]¶ For DML and DQL statements, returns the JobClient that is associated with the submitted Flink job. For other statements (e.g. DDL, DCL), returns empty.
- Returns
The job client, optional.
- Return type
New in version 1.11.0.
-
get_result_kind
()[source]¶ Return the ResultKind which represents the result type.
For DDL operation and USE operation, the result kind is always SUCCESS. For other operations, the result kind is always SUCCESS_WITH_CONTENT.
- Returns
The result kind.
- Return type
New in version 1.11.0.
-
get_table_schema
()[source]¶ Get the schema of result.
The schema of DDL, USE, EXPLAIN:
+-------------+-------------+----------+
| column name | column type | comments |
+-------------+-------------+----------+
| result      | STRING      |          |
+-------------+-------------+----------+
The schema of SHOW:
+---------------+-------------+----------+
| column name   | column type | comments |
+---------------+-------------+----------+
| <object name> | STRING      |          |
+---------------+-------------+----------+
The column name of `SHOW CATALOGS` is "catalog name", the column name of `SHOW DATABASES` is "database name", the column name of `SHOW TABLES` is "table name", the column name of `SHOW VIEWS` is "view name", the column name of `SHOW FUNCTIONS` is "function name".
The schema of DESCRIBE:
+-----------------+-------------+-------------------------------------------------+
| column name     | column type | comments                                        |
+-----------------+-------------+-------------------------------------------------+
| name            | STRING      | field name                                      |
+-----------------+-------------+-------------------------------------------------+
| type            | STRING      | field type expressed as a String                |
+-----------------+-------------+-------------------------------------------------+
| null            | BOOLEAN     | field nullability: true if a field is nullable, |
|                 |             | else false                                      |
+-----------------+-------------+-------------------------------------------------+
| key             | BOOLEAN     | key constraint: 'PRI' for primary keys,         |
|                 |             | 'UNQ' for unique keys, else null                |
+-----------------+-------------+-------------------------------------------------+
| computed column | STRING      | computed column: string expression              |
|                 |             | if a field is computed column, else null        |
+-----------------+-------------+-------------------------------------------------+
| watermark       | STRING      | watermark: string expression if a field is      |
|                 |             | watermark, else null                            |
+-----------------+-------------+-------------------------------------------------+
The schema of INSERT (one column per sink):
+----------------------------+-------------+-----------------------+
| column name                | column type | comments              |
+----------------------------+-------------+-----------------------+
| (name of the insert table) | BIGINT      | the insert table name |
+----------------------------+-------------+-----------------------+
The schema of SELECT is the selected field names and types.
- Returns
The schema of result.
- Return type
New in version 1.11.0.
-
print
()[source]¶ Prints the result contents in tableau form to the client console.
For streaming mode, this method guarantees end-to-end exactly-once record delivery, which requires the checkpointing mechanism to be enabled. By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties (see ExecutionCheckpointingOptions) through TableConfig.get_configuration().
New in version 1.11.0.
-
-
class
pyflink.table.
TableSchema
(field_names=None, data_types=None, j_table_schema=None)[source]¶ Bases:
object
A table schema that represents a table’s structure with field names and data types.
-
class
Builder
[source]¶ Bases:
object
Builder for creating a
TableSchema
.-
build
()[source]¶ Returns a
TableSchema
instance.
- Returns
The
TableSchema
instance.
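A usage sketch (the field names id and data are hypothetical):
>>> schema = TableSchema.Builder() \
...     .field("id", DataTypes.INT()) \
...     .field("data", DataTypes.STRING()) \
...     .build()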
-
-
get_field_data_type
(field)[source]¶ Returns the specified data type for the given field index or field name.
- Parameters
field – The index of the field or the name of the field.
- Returns
The data type of the specified field.
-
get_field_data_types
()[source]¶ Returns all field data types as a list.
- Returns
A list of all field data types.
-
class
-
class
pyflink.table.
TableSink
(j_table_sink)[source]¶ Bases:
object
A
TableSink
specifies how to emit a table to an external system or location.
-
class
pyflink.table.
TableSource
(j_table_source)[source]¶ Bases:
object
Defines a table from an external system or location.
-
class
pyflink.table.
UserDefinedType
(nullable=True)[source]¶ Bases:
pyflink.table.types.DataType
User-defined type (UDT).
Note
WARN: Flink Internal Use Only
-
classmethod
java_udt
()[source]¶ The class name of the paired Java UDT (could be ‘’, if there is no corresponding one).
-
classmethod
-
class
pyflink.table.
WindowGroupedTable
(java_window_grouped_table, t_env)[source]¶ Bases:
object
A table that has been windowed and grouped for
GroupWindow
.-
select
(fields)[source]¶ Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. The field expressions can contain complex expressions and aggregations.
Example:
>>> window_grouped_table.select("key, window.start, value.avg as valavg")
- Parameters
fields (str) – Expression string.
- Returns
The result table.
- Return type
-
pyflink.table.window module¶
-
class
pyflink.table.window.
Tumble
[source]¶ Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups elements in 5 minutes intervals.
Example:
>>> Tumble.over("10.minutes").on("rowtime").alias("w")
-
classmethod
over
(size)[source]¶ Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups elements in 5 minutes intervals.
- Parameters
size – The size of the window as time or row-count interval.
- Returns
A partially defined tumbling window.
-
classmethod
-
class
pyflink.table.window.
Session
[source]¶ Helper class for creating a session window. The boundaries of session windows are defined by intervals of inactivity, i.e., a session window closes if no event appears for a defined gap period.
Example:
>>> Session.with_gap("10.minutes").on("rowtime").alias("w")
-
classmethod
with_gap
(gap)[source]¶ Creates a session window. The boundaries of session windows are defined by intervals of inactivity, i.e., a session window closes if no event appears for a defined gap period.
- Parameters
gap – Specifies how long (as interval of milliseconds) to wait for new data before closing the session window.
- Returns
A partially defined session window.
-
classmethod
-
class
pyflink.table.window.
Slide
[source]¶ Helper class for creating a sliding window. Sliding windows have a fixed size and slide by a specified slide interval. If the slide interval is smaller than the window size, sliding windows are overlapping. Thus, an element can be assigned to multiple windows.
For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive window evaluations.
Example:
>>> Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
-
classmethod
over
(size)[source]¶ Creates a sliding window. Sliding windows have a fixed size and slide by a specified slide interval. If the slide interval is smaller than the window size, sliding windows are overlapping. Thus, an element can be assigned to multiple windows.
For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive window evaluations.
- Parameters
size – The size of the window as time or row-count interval.
- Returns
A partially specified sliding window.
-
classmethod
-
class
pyflink.table.window.
Over
[source]¶ Helper class for creating an over window. Similar to SQL, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.
Over-windows for batch tables are currently not supported.
Example:
>>> Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
-
classmethod
order_by
(order_by)[source]¶ Specifies the time attribute on which rows are ordered.
For streaming tables, reference a rowtime or proctime time attribute here to specify the time mode.
- Parameters
order_by – Field reference.
- Returns
An over window with defined order.
-
classmethod
partition_by
(partition_by)[source]¶ Partitions the elements on some partition keys.
Each partition is individually sorted and aggregate functions are applied to each partition separately.
- Parameters
partition_by – List of field references.
- Returns
An over window with defined partitioning.
-
classmethod
-
class
pyflink.table.window.
GroupWindow
(java_window)[source]¶ A group window specification.
Group windows group rows based on time or row-count intervals and are therefore essentially a special type of groupBy. Just like groupBy, group windows allow computing aggregates on groups of elements.
Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping is required to apply aggregations on streaming tables.
For finite batch tables, group windows provide shortcuts for time-based groupBy.
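For illustration, a sketch of applying a group window to a table (the table name my_table and the field names rowtime, a and b are hypothetical):
>>> from pyflink.table.window import Tumble
>>> table = table_env.from_path("my_table")
>>> table.window(Tumble.over("10.minutes").on("rowtime").alias("w")) \
...      .group_by("w, a") \
...      .select("a, w.start, b.sum")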
pyflink.table.descriptors module¶
-
class
pyflink.table.descriptors.
Rowtime
[source]¶ Rowtime descriptor for describing an event time attribute in the schema.
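For illustration, a sketch of combining Rowtime with a Schema (the field names event_time and ts are hypothetical):
>>> schema = Schema() \
...     .field("event_time", DataTypes.TIMESTAMP(3)) \
...     .rowtime(Rowtime()
...              .timestamps_from_field("ts")
...              .watermarks_periodic_bounded(60000))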
-
timestamps_from_extractor
(extractor)[source]¶ Sets a custom timestamp extractor to be used for the rowtime attribute.
- Parameters
extractor – The java fully-qualified class name of the TimestampExtractor to extract the rowtime attribute from the physical type. The TimestampExtractor must have a public no-argument constructor and can be found in the current Java classloader.
- Returns
This rowtime descriptor.
-
timestamps_from_field
(field_name)[source]¶ Sets a built-in timestamp extractor that converts an existing LONG or TIMESTAMP field into the rowtime attribute.
- Parameters
field_name – The field to convert into a rowtime attribute.
- Returns
This rowtime descriptor.
-
timestamps_from_source
()[source]¶ Sets a built-in timestamp extractor that converts the assigned timestamps from a DataStream API record into the rowtime attribute and thus preserves the assigned timestamps from the source.
Note
This extractor only works in streaming environments.
- Returns
This rowtime descriptor.
-
watermarks_from_source
()[source]¶ Sets a built-in watermark strategy which indicates the watermarks should be preserved from the underlying DataStream API and thus preserves the assigned watermarks from the source.
- Returns
This rowtime descriptor.
-
watermarks_from_strategy
(strategy)[source]¶ Sets a custom watermark strategy to be used for the rowtime attribute.
- Parameters
strategy – The java fully-qualified class name of the WatermarkStrategy. The WatermarkStrategy must have a public no-argument constructor and can be found in the current Java classloader.
- Returns
This rowtime descriptor.
-
watermarks_periodic_ascending
()[source]¶ Sets a built-in watermark strategy for ascending rowtime attributes.
Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp are not late.
- Returns
This rowtime descriptor.
-
watermarks_periodic_bounded
(delay)[source]¶ Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
Emits watermarks which are the maximum observed timestamp minus the specified delay.
- Parameters
delay – Delay in milliseconds.
- Returns
This rowtime descriptor.
-
-
class
pyflink.table.descriptors.
Schema
(schema=None, fields=None, rowtime=None)[source]¶ Describes a schema of a table.
Note
Field names are matched by the exact name by default (case sensitive).
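Example (a minimal sketch; the field names are hypothetical):
>>> schema = Schema() \
...     .field("int_field", DataTypes.INT()) \
...     .field("string_field", DataTypes.STRING())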
-
field
(field_name, field_type)[source]¶ Adds a field with the field name and the data type or type string. Required. This method can be called multiple times. The call order of this method also defines the order of the fields in a row. Here is a document that introduces the type strings: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#type-strings
- Parameters
field_name – The field name.
field_type – The data type or type string of the field.
- Returns
This schema object.
-
fields
(fields)[source]¶ Adds a set of fields with the field name and the data type or type string stored in a list.
- Parameters
fields – Dict of fields with the field name and the data type or type string stored. E.g, [(‘int_field’, DataTypes.INT()), (‘string_field’, DataTypes.STRING())].
- Returns
This schema object.
New in version 1.11.0.
-
from_origin_field
(origin_field_name)[source]¶ Specifies the origin of the previously defined field. The origin field is defined by a connector or format.
E.g. field(“myString”, Types.STRING).from_origin_field(“CSV_MY_STRING”)
Note
Field names are matched by the exact name by default (case sensitive).
- Parameters
origin_field_name – The origin field name.
- Returns
This schema object.
-
proctime
()[source]¶ Specifies the previously defined field as a processing-time attribute.
E.g. field(“proctime”, Types.SQL_TIMESTAMP).proctime()
- Returns
This schema object.
-
-
class
pyflink.table.descriptors.
OldCsv
(schema=None, field_delimiter=None, line_delimiter=None, quote_character=None, comment_prefix=None, ignore_parse_errors=False, ignore_first_line=False)[source]¶ Format descriptor for comma-separated values (CSV).
Note
This descriptor describes Flink’s non-standard CSV table source/sink. In the future, the descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant Csv format in the dedicated flink-formats/flink-csv module instead when writing to Kafka. Use the old one for stream/batch filesystem operations for now.
Note
Deprecated: use the RFC-compliant Csv format instead when writing to Kafka.
-
comment_prefix
(prefix)[source]¶ Sets a prefix to indicate comments, null by default.
- Parameters
prefix – The prefix to indicate comments.
- Returns
This
OldCsv
object.
-
field
(field_name, field_type)[source]¶ Adds a format field with the field name and the data type or type string. Required. This method can be called multiple times. The call order of this method also defines the order of the fields in the format.
- Parameters
field_name – The field name.
field_type – The data type or type string of the field.
- Returns
This
OldCsv
object.
-
field_delimiter
(delimiter)[source]¶ Sets the field delimiter, “,” by default.
- Parameters
delimiter – The field delimiter.
- Returns
This
OldCsv
object.
-
ignore_first_line
()[source]¶ Ignores the first line. By default, the first line is not skipped.
- Returns
This
OldCsv
object.
-
ignore_parse_errors
()[source]¶ Skips records with parse errors instead of failing. By default, an exception is thrown.
- Returns
This
OldCsv
object.
-
line_delimiter
(delimiter)[source]¶ Sets the line delimiter, “\n” by default.
- Parameters
delimiter – The line delimiter.
- Returns
This
OldCsv
object.
-
-
class
pyflink.table.descriptors.
FileSystem
(path=None)[source]¶ Connector descriptor for a file system.
-
path
(path_str)[source]¶ Sets the path to a file or directory in a file system.
- Parameters
path_str – The path of a file or directory.
- Returns
This
FileSystem
object.
-
-
class
pyflink.table.descriptors.
Kafka
(version=None, topic=None, properties=None, start_from_earliest=False, start_from_latest=False, start_from_group_offsets=True, start_from_specific_offsets_dict=None, start_from_timestamp=None, sink_partitioner_fixed=None, sink_partitioner_round_robin=None, custom_partitioner_class_name=None)[source]¶ Connector descriptor for the Apache Kafka message queue.
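Example (a sketch; the topic, server address and group id are hypothetical):
>>> kafka = Kafka() \
...     .version("0.11") \
...     .topic("my_topic") \
...     .property("bootstrap.servers", "localhost:9092") \
...     .property("group.id", "my_group") \
...     .start_from_earliest()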
-
properties
(property_dict)[source]¶ Sets the configuration properties for the Kafka consumer. Resets previously set properties.
- Parameters
property_dict – The dict object contains configuration properties for the Kafka consumer. Both the keys and values should be strings.
- Returns
This object.
-
property
(key, value)[source]¶ Adds a configuration property for the Kafka consumer.
- Parameters
key – Property key string for the Kafka consumer.
value – Property value string for the Kafka consumer.
- Returns
This object.
-
sink_partitioner_custom
(partitioner_class_name)[source]¶ Configures how to partition records from Flink’s partitions into Kafka’s partitions.
This strategy allows for a custom partitioner by providing an implementation of
FlinkKafkaPartitioner
.- Parameters
partitioner_class_name – The java canonical class name of the FlinkKafkaPartitioner. The FlinkKafkaPartitioner must have a public no-argument constructor and can be found in the current Java classloader.
- Returns
This object.
-
sink_partitioner_fixed
()[source]¶ Configures how to partition records from Flink’s partitions into Kafka’s partitions.
This strategy ensures that each Flink partition ends up in one Kafka partition.
Note
One Kafka partition can contain multiple Flink partitions. Examples:
More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain the output of more than one Flink partition:
Flink Sinks        Kafka Partitions
    1    ---------------->    1
    2    --------------/
    3    -------------/
    4    ------------/
Fewer Flink partitions than Kafka partitions:
Flink Sinks        Kafka Partitions
    1    ---------------->    1
    2    ---------------->    2
                              3
                              4
                              5
- Returns
This object.
-
sink_partitioner_round_robin
()[source]¶ Configures how to partition records from Flink’s partitions into Kafka’s partitions.
This strategy ensures that records will be distributed to Kafka partitions in a round-robin fashion.
Note
This strategy is useful to avoid an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
- Returns
This object.
-
start_from_earliest
()[source]¶ Specifies the consumer to start reading from the earliest offset for all partitions. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.
- Returns
This object.
-
start_from_group_offsets
()[source]¶ Specifies the consumer to start reading from any committed group offsets found in Zookeeper / Kafka brokers. The “group.id” property must be set in the configuration properties. If no offset can be found for a partition, the behaviour in “auto.offset.reset” set in the configuration properties will be used for the partition.
This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.
- Returns
This object.
-
start_from_latest
()[source]¶ Specifies the consumer to start reading from the latest offset for all partitions. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.
- Returns
This object.
-
start_from_specific_offset
(partition, specific_offset)[source]¶ Configures to start reading partitions from specific offsets and specifies the given offset for the given partition.
see
pyflink.table.descriptors.Kafka.start_from_specific_offsets()
- Parameters
partition – Partition id.
specific_offset – Specified offset in given partition.
- Returns
This object.
-
start_from_specific_offsets
(specific_offsets_dict)[source]¶ Specifies the consumer to start reading partitions from specific offsets, set independently for each partition. The specified offset should be the offset of the next record that will be read from partitions. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
If the provided map of offsets contains entries whose partition is not subscribed by the consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided map of offsets, the consumer will fall back to the default group offset behaviour (see
pyflink.table.descriptors.Kafka.start_from_group_offsets()
) for that particular partition.
If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group offsets but still no group offset could be found for it, then the “auto.offset.reset” behaviour set in the configuration properties will be used for the partition.
This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.
- Parameters
specific_offsets_dict – Dict of specific offsets, where the key is an int-type partition id and the value is an int-type offset value.
- Returns
This object.
-
start_from_timestamp
(timestamp)[source]¶ Specifies the consumer to start reading partitions from a specified timestamp. The specified timestamp must be before the current timestamp. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
The consumer will look up the earliest offset whose timestamp is greater than or equal to the specified timestamp in Kafka. If there is no such offset, the consumer will use the latest offset to read data from Kafka.
This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.
- Parameters
timestamp – Timestamp for the startup offsets, as milliseconds from epoch.
- Returns
This object.
New in version 1.11.0.
-
-
class
pyflink.table.descriptors.
Elasticsearch
(version=None, hostname=None, port=None, protocol=None, index=None, document_type=None, key_delimiter=None, key_null_literal=None, failure_handler_fail=False, failure_handler_ignore=False, failure_handler_retry_rejected=False, custom_failure_handler_class_name=None, disable_flush_on_checkpoint=False, bulk_flush_max_actions=None, bulk_flush_max_size=None, bulk_flush_interval=None, bulk_flush_backoff_constant=False, bulk_flush_backoff_exponential=False, bulk_flush_backoff_max_retries=None, bulk_flush_backoff_delay=None, connection_max_retry_timeout=None, connection_path_prefix=None)[source]¶ Connector descriptor for the Elasticsearch search engine.
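Example (a sketch; the host, index and document type are hypothetical):
>>> elasticsearch = Elasticsearch() \
...     .version("6") \
...     .host("localhost", 9200, "http") \
...     .index("my_index") \
...     .document_type("my_type") \
...     .failure_handler_ignore()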
-
bulk_flush_backoff_constant
()[source]¶ Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
Sets a constant backoff type to use when flushing bulk requests.
- Returns
This object.
-
bulk_flush_backoff_delay
(delay)[source]¶ Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
Sets the amount of delay between each backoff attempt when flushing bulk requests (in milliseconds).
Make sure to enable backoff by selecting a strategy (
pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_constant()
orpyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_exponential()
).- Parameters
delay – Delay between each backoff attempt (in milliseconds).
- Returns
This object.
-
bulk_flush_backoff_exponential
()[source]¶ Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
Sets an exponential backoff type to use when flushing bulk requests.
- Returns
This object.
-
bulk_flush_backoff_max_retries
(max_retries)[source]¶ Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
Make sure to enable backoff by selecting a strategy (
pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_constant()
orpyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_exponential()
).- Parameters
max_retries – The maximum number of retries.
- Returns
This object.
-
bulk_flush_interval
(interval)[source]¶ Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
Sets the bulk flush interval (in milliseconds).
- Parameters
interval – Bulk flush interval (in milliseconds).
- Returns
This object.
-
bulk_flush_max_actions
(max_actions_num)[source]¶ Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
Sets the maximum number of actions to buffer for each bulk request.
- Parameters
max_actions_num – the maximum number of actions to buffer per bulk request.
- Returns
This object.
-
bulk_flush_max_size
(max_size)[source]¶ Configures how to buffer elements before sending them in bulk to the cluster for efficiency.
Sets the maximum size of buffered actions per bulk request (using the syntax of MemorySize).
- Parameters
max_size – The maximum size, e.g. “42 mb”. Only MB granularity is supported.
- Returns
This object.
-
connection_max_retry_timeout
(max_retry_timeout)[source]¶ Sets connection properties to be used during REST communication to Elasticsearch.
Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request.
- Parameters
max_retry_timeout – Maximum timeout (in milliseconds).
- Returns
This object.
-
connection_path_prefix
(path_prefix)[source]¶ Sets connection properties to be used during REST communication to Elasticsearch.
Adds a path prefix to every REST communication.
- Parameters
path_prefix – Prefix string to be added to every REST communication.
- Returns
This object.
-
disable_flush_on_checkpoint
()[source]¶ Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints.
Note
If flushing on checkpoint is disabled, an Elasticsearch sink does NOT provide any strong guarantees for at-least-once delivery of action requests.
- Returns
This object.
-
document_type
(document_type)[source]¶ Declares the Elasticsearch document type for every record. Required.
- Parameters
document_type – Elasticsearch document type.
- Returns
This object.
-
failure_handler_custom
(failure_handler_class_name)[source]¶ Configures a failure handling strategy in case a request to Elasticsearch fails.
This strategy allows for custom failure handling using a
ActionRequestFailureHandler
.- Parameters
failure_handler_class_name – The java fully-qualified class name of the ActionRequestFailureHandler.
- Returns
This object.
-
failure_handler_fail
()[source]¶ Configures a failure handling strategy in case a request to Elasticsearch fails.
This strategy throws an exception if a request fails and thus causes a job failure.
- Returns
This object.
-
failure_handler_ignore
()[source]¶ Configures a failure handling strategy in case a request to Elasticsearch fails.
This strategy ignores failures and drops the request.
- Returns
This object.
-
failure_handler_retry_rejected
()[source]¶ Configures a failure handling strategy in case a request to Elasticsearch fails.
This strategy re-adds requests that have failed due to queue capacity saturation.
- Returns
This object.
-
host
(hostname, port, protocol)[source]¶ Adds an Elasticsearch host to connect to. Required.
Multiple hosts can be declared by calling this method multiple times.
- Parameters
hostname – Connection hostname.
port – Connection port.
protocol – Connection protocol; e.g. “http”.
- Returns
This object.
-
index
(index)[source]¶ Declares the Elasticsearch index for every record. Required.
- Parameters
index – Elasticsearch index.
- Returns
This object.
-
key_delimiter
(key_delimiter)[source]¶ Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from multiple fields. Optional.
- Parameters
key_delimiter – Key delimiter; e.g., “$” would result in IDs “KEY1$KEY2$KEY3”.
- Returns
This object.
-
-
class
pyflink.table.descriptors.
HBase
(version=None, table_name=None, zookeeper_quorum=None, zookeeper_node_parent=None, write_buffer_flush_max_size=None, write_buffer_flush_max_rows=None, write_buffer_flush_interval=None)[source]¶ Connector descriptor for Apache HBase.
New in version 1.11.0.
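Example (a sketch; the table name and ZooKeeper quorum are hypothetical):
>>> hbase = HBase() \
...     .version("1.4.3") \
...     .table_name("my_namespace:my_table") \
...     .zookeeper_quorum("localhost:2181")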
-
table_name
(table_name)[source]¶ Sets the HBase table name. Required.
- Parameters
table_name – Name of HBase table. E.g., “testNamespace:testTable”, “testDefaultTable”
- Returns
This object.
New in version 1.11.0.
-
version
(version)[source]¶ Sets the Apache HBase version to be used. Required.
- Parameters
version – HBase version. E.g., “1.4.3”.
- Returns
This object.
New in version 1.11.0.
-
write_buffer_flush_interval
(interval)[source]¶ Sets an interval (in milliseconds) after which buffered requests are flushed. Defaults to not set, i.e. requests won’t be flushed based on a flush interval. Optional.
- Parameters
interval – Flush interval. The string should be in the format “{length value}{time unit label}”, e.g. “123ms”, “1 s”. If no time unit label is specified, it will be interpreted as milliseconds.
- Returns
This object.
New in version 1.11.0.
-
write_buffer_flush_max_rows
(write_buffer_flush_max_rows)[source]¶ Sets the threshold for flushing buffered requests based on the number of rows currently added. Defaults to not set, i.e. requests won’t be flushed based on the number of buffered rows. Optional.
- Parameters
write_buffer_flush_max_rows – The number of added rows at which to begin flushing the requests.
- Returns
This object.
New in version 1.11.0.
-
write_buffer_flush_max_size
(max_size)[source]¶ Sets the threshold for flushing buffered requests based on the memory byte size of the rows currently added.
- Parameters
max_size – the maximum size.
- Returns
This object.
New in version 1.11.0.
-
-
class
pyflink.table.descriptors.
Csv
(schema=None, field_delimiter=None, line_delimiter=None, quote_character=None, allow_comments=False, ignore_parse_errors=False, array_element_delimiter=None, escape_character=None, null_literal=None)[source]¶ Format descriptor for comma-separated values (CSV).
This descriptor aims to comply with RFC-4180 (“Common Format and MIME Type for Comma-Separated Values (CSV) Files”) proposed by the Internet Engineering Task Force (IETF).
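Example (a minimal sketch; the delimiter choices are arbitrary):
>>> csv = Csv() \
...     .field_delimiter("|") \
...     .line_delimiter("\n") \
...     .ignore_parse_errors() \
...     .derive_schema()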
Note
This descriptor does not describe Flink’s old non-standard CSV table source/sink. Currently, this descriptor can be used when writing to Kafka. The old one is still available as
OldCsv
for stream/batch filesystem operations.
-
allow_comments
()[source]¶ Ignores comment lines that start with ‘#’ (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows.
- Returns
This
Csv
object.
-
array_element_delimiter
(delimiter)[source]¶ Sets the array element delimiter string for separating array or row element values (“;” by default).
- Parameters
delimiter – The array element delimiter.
- Returns
This
Csv
object.
-
derive_schema
()[source]¶ Derives the format schema from the table’s schema. Required if no format schema is defined.
This allows for defining schema information only once.
The names, types, and fields’ order of the format are determined by the table’s schema. Time attributes are ignored if their origin is not a field. A “from” definition is interpreted as a field renaming in the format.
- Returns
This
Csv
object.
-
escape_character
(escape_character)[source]¶ Sets the escape character for escaping values (disabled by default).
- Parameters
escape_character – Escaping character (e.g. backslash).
- Returns
This
Csv
object.
-
field_delimiter
(delimiter)[source]¶ Sets the field delimiter character (‘,’ by default).
- Parameters
delimiter – The field delimiter character.
- Returns
This
Csv
object.
-
ignore_parse_errors
()[source]¶ Skips records with parse errors instead of failing. By default, an exception is thrown.
- Returns
This
Csv
object.
-
line_delimiter
(delimiter)[source]¶ Sets the line delimiter (“\n” by default; otherwise “\r” or “\r\n” are allowed).
- Parameters
delimiter – The line delimiter.
- Returns
This
Csv
object.
-
null_literal
(null_literal)[source]¶ Sets the null literal string that is interpreted as a null value (disabled by default).
- Parameters
null_literal – The null literal string.
- Returns
This
Csv
object.
-
-
class
pyflink.table.descriptors.
Avro
(record_class=None, avro_schema=None)[source]¶ Format descriptor for Apache Avro records.
-
class
pyflink.table.descriptors.
Json
(json_schema=None, schema=None, derive_schema=False)[source]¶ Format descriptor for JSON.
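Example (a minimal sketch; the JSON schema string is a hypothetical placeholder):
>>> json_format = Json() \
...     .fail_on_missing_field(False) \
...     .json_schema(
...         '{"type": "object", "properties": {"id": {"type": "integer"}}}')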
-
derive_schema
()[source]¶ Derives the format schema from the table’s schema.
This allows for defining schema information only once.
The names, types, and fields’ order of the format are determined by the table’s schema. Time attributes are ignored if their origin is not a field. A “from” definition is interpreted as a field renaming in the format.
- Returns
This object.
-
fail_on_missing_field
(fail_on_missing_field)[source]¶ Sets the flag that specifies whether to fail if a field is missing.
- Parameters
fail_on_missing_field – If set to
True
, the operation fails if there is a missing field. If set toFalse
, a missing field is set to null.- Returns
This object.
-
ignore_parse_errors
(ignore_parse_errors)[source]¶ Sets the flag that specifies whether to fail when parsing JSON fails.
- Parameters
ignore_parse_errors – If set to true, the operation will ignore parse errors. If set to false, the operation fails when parsing json fails.
- Returns
This object.
-
json_schema
(json_schema)[source]¶ Sets the JSON schema string with field names and the types according to the JSON schema specification: http://json-schema.org/specification.html
The schema might be nested.
- Parameters
json_schema – The JSON schema string.
- Returns
This object.
-
-
class
pyflink.table.descriptors.
ConnectTableDescriptor
(j_connect_table_descriptor)[source]¶ Common class for tables created with
pyflink.table.TableEnvironment.connect
.-
create_temporary_table
(path)[source]¶ Registers the table described by underlying properties in a given path.
There is no distinction between source and sink at the descriptor level anymore as this method does not perform actual class lookup. It only stores the underlying properties. The actual source/sink lookup is performed when the table is used.
Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.
Note
The schema must be explicitly defined.
- Parameters
path – The path under which to register the temporary table.
New in version 1.10.0.
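A short usage sketch: assuming descriptor is a fully configured ConnectTableDescriptor, the registered table can be looked up again through the table environment (from_path is the standard TableEnvironment lookup method; the table name is illustrative):
>>> descriptor.create_temporary_table("MyTemporaryTable")
>>> table = table_env.from_path("MyTemporaryTable")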
-
-
class
pyflink.table.descriptors.
StreamTableDescriptor
(j_stream_table_descriptor, in_append_mode=False, in_retract_mode=False, in_upsert_mode=False)[source]¶ Descriptor for specifying a table source and/or sink in a streaming environment.
See also
parent class:
ConnectTableDescriptor
-
in_append_mode
()[source]¶ Declares how to perform the conversion between a dynamic table and an external connector.
In append mode, a dynamic table and an external connector only exchange INSERT messages.
- Returns
This object.
-
in_retract_mode
()[source]¶ Declares how to perform the conversion between a dynamic table and an external connector.
In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for the updating (new) row.
In contrast to upsert mode, a key must not be defined in this mode. However, every update consists of two messages, which is less efficient.
- Returns
This object.
-
in_upsert_mode
()[source]¶ Declares how to perform the conversion between a dynamic table and an external connector.
In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
This mode requires a (possibly composite) unique key by which updates can be propagated. The external connector needs to be aware of the unique key attribute in order to apply messages correctly. INSERT and UPDATE changes are encoded as UPSERT messages; DELETE changes are encoded as DELETE messages.
The main difference to a retract stream is that UPDATE changes are encoded with a single message and are therefore more efficient.
- Returns
This object.
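An illustrative sketch of where an update mode fits into a descriptor chain (the connector type “kafka”, its property version, and the schema fields are placeholders, not verified connector properties; only one update mode should be declared per descriptor, and upsert mode additionally requires a unique key known to the connector):
>>> table_env \
...     .connect(CustomConnectorDescriptor("kafka", 1, True)) \
...     .with_format(Json().derive_schema()) \
...     .with_schema(Schema()
...                  .field("user", "BIGINT")
...                  .field("message", "VARCHAR")) \
...     .in_upsert_mode() \
...     .create_temporary_table("MyUpsertTable")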
-
-
class
pyflink.table.descriptors.
BatchTableDescriptor
(j_batch_table_descriptor)[source]¶ Descriptor for specifying a table source and/or sink in a batch environment.
See also
parent class:
ConnectTableDescriptor
-
class
pyflink.table.descriptors.
CustomConnectorDescriptor
(type, version, format_needed)[source]¶ Describes a custom connector to another system.
-
class
pyflink.table.descriptors.
CustomFormatDescriptor
(type, version)[source]¶ Describes the custom format of data.
pyflink.table.catalog module¶
-
class
pyflink.table.catalog.
Catalog
(j_catalog)[source]¶ Catalog is responsible for reading and writing metadata such as databases/tables/views/UDFs from a registered catalog. It connects a registered catalog to Flink’s Table API.
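The methods below are typically invoked on a catalog instance obtained from the table environment; a minimal sketch (“default_catalog” is Flink’s built-in default name, but any registered catalog name works):
>>> catalog = table_env.get_catalog("default_catalog")
>>> catalog.list_databases()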
-
alter_database
(name, new_database, ignore_if_not_exists)[source]¶ Modify an existing database.
- Parameters
name – Name of the database to be modified.
new_database – The new database
CatalogDatabase
definition.ignore_if_not_exists – Flag to specify behavior when the given database does not exist: if set to false, throw an exception, if set to true, do nothing.
- Raise
CatalogException in case of any runtime exception. DatabaseNotExistException if the given database does not exist.
-
alter_function
(function_path, new_function, ignore_if_not_exists)[source]¶ Modify an existing function.
- Parameters
function_path – Path
ObjectPath
of the function.new_function – The function
CatalogFunction
to be modified.ignore_if_not_exists – Flag to specify behavior if the function does not exist: if set to false, throw an exception; if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. FunctionNotExistException if the function does not exist.
-
alter_partition
(table_path, partition_spec, new_partition, ignore_if_not_exists)[source]¶ Alter a partition.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition to alter.new_partition – New partition
CatalogPartition
to replace the old one.ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. PartitionNotExistException thrown if the target partition does not exist.
-
alter_partition_column_statistics
(table_path, partition_spec, column_statistics, ignore_if_not_exists)[source]¶ Update the column statistics of a table partition.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition.column_statistics – New column statistics
CatalogColumnStatistics
to update.ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.
-
alter_partition_statistics
(table_path, partition_spec, partition_statistics, ignore_if_not_exists)[source]¶ Update the statistics of a table partition.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition.partition_statistics – New statistics
CatalogTableStatistics
to update.ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.
-
alter_table
(table_path, new_table, ignore_if_not_exists)[source]¶ Modify an existing table or view. Note that the new and old CatalogBaseTable must be of the same type. For example, it is not possible to alter a regular table into a partitioned table, or a view into a table, and vice versa.
- Parameters
table_path – Path
ObjectPath
of the table or view to be modified.new_table – The new table definition
CatalogBaseTable
.ignore_if_not_exists – Flag to specify behavior when the table or view does not exist: if set to false, throw an exception, if set to true, do nothing.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the table does not exist.
-
alter_table_column_statistics
(table_path, column_statistics, ignore_if_not_exists)[source]¶ Update the column statistics of a table.
- Parameters
table_path – Path
ObjectPath
of the table.column_statistics – New column statistics
CatalogColumnStatistics
to update.ignore_if_not_exists – Flag to specify behavior if the table does not exist: if set to false, throw an exception, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.
-
alter_table_statistics
(table_path, table_statistics, ignore_if_not_exists)[source]¶ Update the statistics of a table.
- Parameters
table_path – Path
ObjectPath
of the table.table_statistics – New statistics
CatalogTableStatistics
to update.ignore_if_not_exists – Flag to specify behavior if the table does not exist: if set to false, throw an exception, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.
-
create_database
(name, database, ignore_if_exists)[source]¶ Create a database.
- Parameters
name – Name of the database to be created.
database – The
CatalogDatabase
database definition.ignore_if_exists – Flag to specify behavior when a database with the given name already exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do nothing.
- Raise
CatalogException in case of any runtime exception. DatabaseAlreadyExistException if the given database already exists and ignore_if_exists is false.
-
create_function
(function_path, function, ignore_if_exists)[source]¶ Create a function.
- Parameters
function_path – Path
ObjectPath
of the function.function – The function
CatalogFunction
to be created.ignore_if_exists – Flag to specify behavior if a function with the given name already exists: if set to false, it throws a FunctionAlreadyExistException, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. FunctionAlreadyExistException if the function already exists. DatabaseNotExistException if the given database does not exist.
-
create_partition
(table_path, partition_spec, partition, ignore_if_exists)[source]¶ Create a partition.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition.partition – The partition
CatalogPartition
to add.ignore_if_exists – Flag to specify behavior if a partition with the given spec already exists: if set to false, it throws a PartitionAlreadyExistsException, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. TableNotExistException thrown if the target table does not exist. TableNotPartitionedException thrown if the target table is not partitioned. PartitionSpecInvalidException thrown if the given partition spec is invalid. PartitionAlreadyExistsException thrown if the target partition already exists.
-
create_table
(table_path, table, ignore_if_exists)[source]¶ Create a new table or view.
- Parameters
table_path – Path
ObjectPath
of the table or view to be created.table – The table definition
CatalogBaseTable
.ignore_if_exists – Flag to specify behavior when a table or view already exists at the given path: if set to false, it throws a TableAlreadyExistException, if set to true, do nothing.
- Raise
CatalogException in case of any runtime exception. DatabaseNotExistException if the database in table_path doesn’t exist. TableAlreadyExistException if the table already exists and ignore_if_exists is false.
-
database_exists
(database_name)[source]¶ Check if a database exists in this catalog.
- Parameters
database_name – Name of the database.
- Returns
true if the given database exists in the catalog, false otherwise.
- Raise
CatalogException in case of any runtime exception.
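A usage sketch pairing the existence check with a lookup via get_database (documented below) so the exception path is avoided; the database name is illustrative, and catalog is obtained via get_catalog as shown above:
>>> if catalog.database_exists("my_db"):
...     database = catalog.get_database("my_db")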
-
drop_database
(name, ignore_if_exists)[source]¶ Drop a database.
- Parameters
name – Name of the database to be dropped.
ignore_if_exists – Flag to specify behavior when the database does not exist: if set to false, throw an exception, if set to true, do nothing.
- Raise
CatalogException in case of any runtime exception. DatabaseNotExistException if the given database does not exist.
-
drop_function
(function_path, ignore_if_not_exists)[source]¶ Drop a function.
- Parameters
function_path – Path
ObjectPath
of the function to be dropped.ignore_if_not_exists – Flag to specify behavior if the function does not exist: if set to false, throw an exception; if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. FunctionNotExistException if the function does not exist.
-
drop_partition
(table_path, partition_spec, ignore_if_not_exists)[source]¶ Drop a partition.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition to drop.ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.
- Raise
CatalogException in case of any runtime exception. PartitionNotExistException thrown if the target partition does not exist.
-
drop_table
(table_path, ignore_if_not_exists)[source]¶ Drop a table or view.
- Parameters
table_path – Path
ObjectPath
of the table or view to be dropped.ignore_if_not_exists – Flag to specify behavior when the table or view does not exist: if set to false, throw an exception, if set to true, do nothing.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the table or view does not exist.
-
function_exists
(function_path)[source]¶ Check whether a function exists or not.
- Parameters
function_path – Path
ObjectPath
of the function.
- Returns
true if the function exists in the catalog, false otherwise.
- Raise
CatalogException in case of any runtime exception.
-
get_database
(database_name)[source]¶ Get a database from this catalog.
- Parameters
database_name – Name of the database.
- Returns
The requested database
CatalogDatabase
.
- Raise
CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.
-
get_default_database
()[source]¶ Get the name of the default database for this catalog. The default database will be the current database for the catalog when the user’s session doesn’t specify a current database. The value typically comes from configuration and will not change for the lifetime of the catalog instance.
- Returns
The name of the current database.
- Raise
CatalogException in case of any runtime exception.
-
get_function
(function_path)[source]¶ Get the function.
- Parameters
function_path – Path
ObjectPath
of the function.
- Returns
The requested function
CatalogFunction
.
- Raise
CatalogException in case of any runtime exception. FunctionNotExistException if the function does not exist in the catalog.
-
get_partition
(table_path, partition_spec)[source]¶ Get a partition of the given table. The given partition spec keys and values need to be matched exactly for a result.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – The partition spec
CatalogPartitionSpec
of partition to get.
- Returns
The requested partition
CatalogPartition
.
- Raise
CatalogException in case of any runtime exception. PartitionNotExistException thrown if the partition doesn’t exist.
-
get_partition_column_statistics
(table_path, partition_spec)[source]¶ Get the column statistics of a partition.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition.
- Returns
The column statistics
CatalogColumnStatistics
of the given partition.
- Raise
CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.
-
get_partition_statistics
(table_path, partition_spec)[source]¶ Get the statistics of a partition.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition.
- Returns
The statistics
CatalogTableStatistics
of the given partition.
- Raise
CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.
-
get_table
(table_path)[source]¶ Get a CatalogTable or CatalogView identified by the given table_path.
- Parameters
table_path – Path
ObjectPath
of the table or view.
- Returns
The requested table or view
CatalogBaseTable
.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the target does not exist.
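A guarded-lookup sketch combining get_table with table_exists (documented below); the database and table names are illustrative:
>>> from pyflink.table.catalog import ObjectPath
>>> path = ObjectPath("my_db", "my_table")
>>> if catalog.table_exists(path):
...     table = catalog.get_table(path)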
-
get_table_column_statistics
(table_path)[source]¶ Get the column statistics of a table.
- Parameters
table_path – Path
ObjectPath
of the table.
- Returns
The column statistics
CatalogColumnStatistics
of the given table.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.
-
get_table_statistics
(table_path)[source]¶ Get the statistics of a table.
- Parameters
table_path – Path
ObjectPath
of the table.
- Returns
The statistics
CatalogTableStatistics
of the given table.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.
-
list_databases
()[source]¶ Get the names of all databases in this catalog.
- Returns
A list of the names of all databases.
- Raise
CatalogException in case of any runtime exception.
-
list_functions
(database_name)[source]¶ List the names of all functions in the given database. An empty list is returned if none is registered.
- Parameters
database_name – Name of the database.
- Returns
A list of the names of the functions in this database.
- Raise
CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.
-
list_partitions
(table_path, partition_spec=None)[source]¶ Get the CatalogPartitionSpec of all partitions of the table, or of all partitions matching the given partition spec if one is provided.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – The partition spec
CatalogPartitionSpec
to list.
- Returns
A list of
CatalogPartitionSpec
of the table.
- Raise
CatalogException in case of any runtime exception. TableNotExistException thrown if the table does not exist in the catalog. TableNotPartitionedException thrown if the table is not partitioned.
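A sketch of listing all partitions and then narrowing by a partial spec (the table name, partition columns, and values are illustrative):
>>> from pyflink.table.catalog import ObjectPath, CatalogPartitionSpec
>>> path = ObjectPath("my_db", "my_partitioned_table")
>>> all_partitions = catalog.list_partitions(path)
>>> june_partitions = catalog.list_partitions(
...     path, CatalogPartitionSpec({"year": "2020", "month": "06"}))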
-
list_tables
(database_name)[source]¶ Get names of all tables and views under this database. An empty list is returned if none exists.
- Parameters
database_name – Name of the given database.
- Returns
A list of the names of all tables and views in this database.
- Raise
CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.
-
list_views
(database_name)[source]¶ Get names of all views under this database. An empty list is returned if none exists.
- Parameters
database_name – Name of the given database.
- Returns
A list of the names of all views in the given database.
- Raise
CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.
-
partition_exists
(table_path, partition_spec)[source]¶ Check whether a partition exists or not.
- Parameters
table_path – Path
ObjectPath
of the table.partition_spec – Partition spec
CatalogPartitionSpec
of the partition to check.
- Returns
true if the partition exists, false otherwise.
- Raise
CatalogException in case of any runtime exception.
-
rename_table
(table_path, new_table_name, ignore_if_not_exists)[source]¶ Rename an existing table or view.
- Parameters
table_path – Path
ObjectPath
of the table or view to be renamed.new_table_name – The new name of the table or view.
ignore_if_not_exists – Flag to specify behavior when the table or view does not exist: if set to false, throw an exception, if set to true, do nothing.
- Raise
CatalogException in case of any runtime exception. TableNotExistException if the table does not exist.
-
table_exists
(table_path)[source]¶ Check if a table or view exists in this catalog.
- Parameters
table_path – Path
ObjectPath
of the table or view.
- Returns
true if the given table exists in the catalog, false otherwise.
- Raise
CatalogException in case of any runtime exception.
-
-
class
pyflink.table.catalog.
CatalogDatabase
(j_catalog_database)[source]¶ Represents a database object in a catalog.
-
copy
()[source]¶ Get a deep copy of the CatalogDatabase instance.
- Returns
A copy of the CatalogDatabase instance.
-
get_description
()[source]¶ Get a brief description of the database.
- Returns
An optional short description of the database.
-
-
class
pyflink.table.catalog.
CatalogBaseTable
(j_catalog_base_table)[source]¶ CatalogBaseTable is the common parent of table and view. It has a map of key-value pairs defining the properties of the table.
-
copy
()[source]¶ Get a deep copy of the CatalogBaseTable instance.
- Returns
A copy of the CatalogBaseTable instance.
-
get_description
()[source]¶ Get a brief description of the table or view.
- Returns
An optional short description of the table/view.
-
get_detailed_description
()[source]¶ Get a detailed description of the table or view.
- Returns
An optional long description of the table/view.
-
get_options
()[source]¶ Returns a map of string-based options.
In case of CatalogTable, these options may determine the kind of connector and its configuration for accessing the data in the external system.
- Returns
Property map of the table/view.
New in version 1.11.0.
-
-
class
pyflink.table.catalog.
CatalogPartition
(j_catalog_partition)[source]¶ Represents a partition object in a catalog.
-
copy
()[source]¶ Get a deep copy of the CatalogPartition instance.
- Returns
A copy of the CatalogPartition instance.
-
get_comment
()[source]¶ Get the comment of the partition.
- Returns
Comment of the partition.
- Return type
str
-
get_description
()[source]¶ Get a brief description of the partition object.
- Returns
An optional short description of the partition object.
-
-
class
pyflink.table.catalog.
CatalogFunction
(j_catalog_function)[source]¶ Interface for a function in a catalog.
-
get_class_name
()[source]¶ Get the full name of the class backing the function.
- Returns
The full name of the class.
-
get_description
()[source]¶ Get a brief description of the function.
- Returns
An optional short description of the function.
-
get_detailed_description
()[source]¶ Get a detailed description of the function.
- Returns
An optional long description of the function.
-
-
class
pyflink.table.catalog.
ObjectPath
(database_name=None, object_name=None, j_object_path=None)[source]¶ A database name and object (table/view/function) name combo in a catalog.
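A minimal construction sketch (the names are illustrative, and the get_full_name accessor is assumed to mirror Java’s ObjectPath.getFullName):
>>> from pyflink.table.catalog import ObjectPath
>>> path = ObjectPath("my_db", "my_table")
>>> full_name = path.get_full_name()  # assumed accessor, e.g. "my_db.my_table"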
-
class
pyflink.table.catalog.
CatalogPartitionSpec
(partition_spec)[source]¶ Represents a partition spec object in a catalog. Partition columns and values are NOT in strict order, and they need to be re-arranged into the correct order by comparing against a list of strictly ordered partition keys.
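A construction sketch, assuming partition_spec is a dict mapping partition column names to their string values (names and values illustrative):
>>> from pyflink.table.catalog import CatalogPartitionSpec
>>> spec = CatalogPartitionSpec({"region": "emea", "dt": "2020-06-01"})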
-
class
pyflink.table.catalog.
CatalogTableStatistics
(row_count=None, field_count=None, total_size=None, raw_data_size=None, properties=None, j_catalog_table_statistics=None)[source]¶ Statistics for a non-partitioned table or a partition of a partitioned table.
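A construction sketch using a subset of the keyword parameters from the signature above (the figures are illustrative):
>>> from pyflink.table.catalog import CatalogTableStatistics
>>> stats = CatalogTableStatistics(row_count=1000, total_size=1024 * 1024)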