This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

Conversions between PyFlink Table and Pandas DataFrame

PyFlink supports conversion between PyFlink Table and Pandas DataFrame in both directions.

PyFlink supports creating a PyFlink Table from a Pandas DataFrame. Internally, the Pandas DataFrame is serialized on the client side using the Arrow columnar format, and the serialized data is deserialized and processed by an Arrow source during execution. The Arrow source can also be used in streaming jobs, where it handles checkpointing properly and provides exactly-once guarantees.

The following example shows how to create a PyFlink Table from a Pandas DataFrame:

import pandas as pd
import numpy as np
from pyflink.table import DataTypes

# t_env is assumed to be an existing TableEnvironment

# Create a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 2))

# Create a PyFlink Table from a Pandas DataFrame
table = t_env.from_pandas(pdf)

# Create a PyFlink Table from a Pandas DataFrame with the specified column names
table = t_env.from_pandas(pdf, ['f0', 'f1'])

# Create a PyFlink Table from a Pandas DataFrame with the specified column types
table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

# Create a PyFlink Table from a Pandas DataFrame with the specified row type
table = t_env.from_pandas(pdf,
                          DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
                                         DataTypes.FIELD("f1", DataTypes.DOUBLE())]))

PyFlink also supports converting a PyFlink Table to a Pandas DataFrame. Internally, the results of the table are materialized and serialized on the client side into multiple Arrow batches in the Arrow columnar format. The maximum Arrow batch size is determined by the config option python.fn-execution.arrow.batch.size. The serialized data is then converted to a Pandas DataFrame. Because this collects the entire content of the table to the client side, make sure that the content of the table fits in memory before calling this method.

The following example shows how to convert a PyFlink Table to a Pandas DataFrame:

import pandas as pd
import numpy as np

# t_env is assumed to be an existing TableEnvironment

# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")

# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()