PyFlink Table 和 Pandas DataFrame 互转
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

PyFlink Table 和 Pandas DataFrame 互转 #

PyFlink 支持 PyFlink Table 和 Pandas DataFrame 之间进行互转。

PyFlink 支持将 Pandas DataFrame 转换成 PyFlink Table。在内部实现上,会在客户端将 Pandas DataFrame 序列化成 Arrow 列存格式,序列化后的数据 在作业执行期间,在 Arrow 源中会被反序列化,并进行处理。Arrow 源除了可以用在批作业中外,还可以用于流作业,它将正确处理检查点并提供恰好一次的保证。

以下示例显示如何从 Pandas DataFrame 创建 PyFlink Table:

from pyflink.table import DataTypes

import pandas as pd
import numpy as np

# 创建一个Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 2))

# 由Pandas DataFrame创建PyFlink表
table = t_env.from_pandas(pdf)

# 由Pandas DataFrame创建指定列名的PyFlink表
table = t_env.from_pandas(pdf, ['f0', 'f1'])

# 由Pandas DataFrame创建指定列类型的PyFlink表
table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

# 由Pandas DataFrame创建列名和列类型的PyFlink表
table = t_env.from_pandas(pdf,
                          DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
                                         DataTypes.FIELD("f1", DataTypes.DOUBLE())]))

除此之外,还支持将 PyFlink Table 转换为 Pandas DataFrame。在内部实现上,它将执行表的计算逻辑,得到物化之后的表的执行结果,并 在客户端将其序列化为 Arrow 列存格式,最大 Arrow 批处理大小 由配置选项python.fn-execution.arrow.batch.size 确定。 序列化后的数据将被转换为 Pandas DataFrame。这意味着需要把表的内容收集到客户端,因此在调用此函数之前,请确保表的内容可以容纳在内存中。 可以通过 Table.limit ,设置收集到客户端的数据的条数。

以下示例显示了如何将 PyFlink Table 转换为 Pandas DataFrame:

from pyflink.table.expressions import col

import pandas as pd
import numpy as np

# 创建PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)

# 转换PyFlink Table为Pandas DataFrame
pdf = table.limit(100).to_pandas()