- 应用开发
- Python API
- Table API用户指南
- Python Table API 简介
Python Table API 简介
本文档是 Apache Flink 的旧版本。建议访问 最新的稳定版本。
本文档是对 PyFlink Table API 的简要介绍,用于帮助新手用户快速理解 PyFlink Table API 的基本用法。
关于高级用法,请参阅用户指南中的其他文档。
Python Table API 程序的基本结构
所有的 Table API 和 SQL 程序,不管批模式,还是流模式,都遵循相同的结构。下面代码示例展示了 Table API 和 SQL 程序的基本结构。
Back to top
创建 TableEnvironment
TableEnvironment
是 Table API 和 SQL 集成的核心概念。下面代码示例展示了如何创建一个 TableEnvironment
:
关于创建 TableEnvironment
的更多细节,请查阅 TableEnvironment 文档。
TableEnvironment
可以用来:
目前有2个可用的执行器 : flink 执行器 和 blink 执行器。
你应该在当前程序中显式地设置使用哪个执行器,建议尽可能使用 blink 执行器。
Back to top
创建表
Table
是 Python Table API 的核心组件。Table
是 Table API 作业中间结果的逻辑表示。
一个 Table
实例总是与一个特定的 TableEnvironment
相绑定。不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。
通过列表类型的对象创建
你可以使用一个列表对象创建一张表:
结果为:
你也可以创建具有指定列名的表:
结果为:
默认情况下,表结构是从数据中自动提取的。
如果自动生成的表模式不符合你的要求,你也可以手动指定:
结果为:
通过 DDL 创建
你可以通过 DDL 创建一张表:
结果为:
通过 Catalog 创建
TableEnvironment
维护了一个使用标识符创建的表的 catalogs 映射。
Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。
通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …“,都存储在 catalog 中。
你可以通过 SQL 直接访问 catalog 中的表。
如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:
结果为:
Back to top
查询
Table API 查询
Table
对象有许多方法,可以用于进行关系操作。
这些方法返回新的 Table
对象,表示对输入 Table
应用关系操作之后的结果。
这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)
。
Table API 文档描述了流和批处理上所有支持的 Table API 操作。
以下示例展示了一个简单的 Table API 聚合查询:
结果为:
SQL 查询
Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。
SQL 文档描述了 Flink 对流和批处理所支持的 SQL。
下面示例展示了一个简单的 SQL 聚合查询:
结果为:
实际上,上述输出展示了 print 结果表所接收到的 change log。
change log 的格式为:
例如,”2> +I(4,11)” 表示这条消息来自第二个 subtask,其中 “+I” 表示这是一条插入的消息,”(4, 11)” 是这条消息的内容。
另外,”-U” 表示这是一条撤回消息 (即更新前),这意味着应该在 sink 中删除或撤回该消息。
“+U” 表示这是一条更新的记录 (即更新后),这意味着应该在 sink 中更新或插入该消息。
所以,从上面的 change log,我们可以得到如下结果:
Table API 和 SQL 的混合使用
Table API 中的 Table
对象和 SQL 中的 Table 可以自由地相互转换。
下面例子展示了如何在 SQL 中使用 Table
对象:
结果为:
下面例子展示了如何在 Table API 中使用 SQL 表:
结果为:
Back to top
将结果写出
将结果数据收集到客户端
你可以调用 “to_pandas” 方法来 将一个 Table
对象转化成 pandas DataFrame:
结果为:
Note “to_pandas” 会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 Table.limit 来限制收集数据的条数是一种很好的做法。
Note flink planner 不支持 “to_pandas”,并且,并不是所有的数据类型都可以转换为 pandas DataFrames。
将结果写入到一张 Sink 表中
你可以调用 “execute_insert” 方法来将 Table
对象中的数据写入到一张 sink 表中:
结果为:
也可以通过 SQL 来完成:
将结果写入多张 Sink 表中
你也可以使用 StatementSet
在一个作业中将 Table
中的数据写入到多张 sink 表中:
结果为:
Explain 表
Table API 提供了一种机制来查看 Table
的逻辑查询计划和优化后的查询计划。
这是通过 Table.explain()
或者 StatementSet.explain()
方法来完成的。Table.explain()
可以返回一个 Table
的执行计划。StatementSet.explain()
则可以返回含有多个 sink 的作业的执行计划。这些方法会返回一个字符串,字符串描述了以下三个方面的信息:
- 关系查询的抽象语法树,即未经优化的逻辑查询计划,
- 优化后的逻辑查询计划,
- 物理执行计划。
TableEnvironment.explain_sql()
和 TableEnvironment.execute_sql()
支持执行 EXPLAIN
语句获得执行计划。更多细节请查阅 EXPLAIN。
以下代码展示了如何使用 Table.explain()
方法:
结果为:
以下代码展示了如何使用 StatementSet.explain()
方法:
结果为