In this guide we will start from scratch and go from setting up a Flink Python project to running a Python Table API program.
First, fire up your favorite IDE and create a Python project. Then install the PyFlink
package: you can install the latest release from PyPI via pip install apache-flink.
If you want to build and install PyFlink from source code, please see Build PyFlink
for more details.
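You can check from Python whether the installation succeeded. The sketch below uses only the standard library (importlib.metadata, Python 3.8+). It assumes the distribution name is apache-flink, as in the pip command above. The helper name installed_pyflink_version is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_pyflink_version() -> Optional[str]:
    """Return the installed apache-flink version, or None if it is missing.

    Hypothetical helper for illustration: it only reads package metadata
    and does not import PyFlink itself.
    """
    try:
        return version("apache-flink")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    v = installed_pyflink_version()
    if v is None:
        print("PyFlink is not installed; try: pip install apache-flink")
    else:
        print(f"PyFlink {v} is installed")
```

Checking metadata rather than importing pyflink keeps the check fast. It also avoids pulling in the JVM-backed parts of the package.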
The first step in a Flink Python Table API program is to create a BatchTableEnvironment
(or a StreamTableEnvironment if you are writing a streaming job). It is the main entry point
for Python Table API jobs.
The ExecutionEnvironment (or StreamExecutionEnvironment if you are writing a streaming job)
can be used to set execution parameters, such as the restart strategy and the default parallelism.
The TableConfig can be used to set parameters such as the name of the built-in catalog and
the threshold above which generated code is split into sub-function calls.
Next we will create a source table and a sink table.
This registers a table named mySource and a table named mySink in the table environment.
The table mySource has only one column, word, which represents the words read from the file
/tmp/input. The table mySink has two columns, word and count, and writes its data to a file
with \t as the field delimiter.
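The plain-Python sketch below makes the sink's layout concrete. It uses only the standard library and is not PyFlink code. It writes (word, count) rows the way a tab-delimited file with the columns word and count would hold them. The function name write_word_counts and the sample rows are hypothetical.

```python
import csv
import io
from typing import Iterable, TextIO, Tuple


def write_word_counts(rows: Iterable[Tuple[str, int]], out: TextIO) -> None:
    """Write (word, count) rows as tab-separated lines, one row per line.

    Hypothetical helper: it mimics the layout of a sink with the columns
    word and count and a tab as the field delimiter; it is not PyFlink API.
    """
    writer = csv.writer(out, delimiter="\t", lineterminator="\n")
    for word, count in rows:
        writer.writerow([word, count])


buf = io.StringIO()
write_word_counts([("flink", 2), ("python", 1)], buf)
print(repr(buf.getvalue()))  # 'flink\t2\npython\t1\n'
```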
Then we need to create a job that reads input from table mySource, performs some
operations, and writes the results to table mySink.
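The sink's columns (word and count) and the script name WordCount.py suggest that these operations form a word count: group the rows of mySource by word and count each group. The sketch below shows what that aggregation computes on a small in-memory list. It uses collections.Counter rather than PyFlink, and the sample words are made up.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def count_words(words: Iterable[str]) -> List[Tuple[str, int]]:
    """Group identical words and count each group (like GROUP BY word, COUNT).

    Plain-Python illustration only; in the Flink job this logic would run as a
    Table API query over the source table. Results are sorted by word so the
    output order is deterministic.
    """
    counts = Counter(words)
    return sorted(counts.items())


print(count_words(["flink", "python", "flink"]))
# [('flink', 2), ('python', 1)]
```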
The last thing is to start the actual Flink Python Table API job. All operations, such as
creating sources, transformations and sinks, only build up a graph of internal operations.
Only when t_env.execute(job_name) is called is this graph of operations submitted to a cluster
or executed on your local machine.
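The difference between building a graph and running it can be sketched in plain Python. The LazyPipeline class below is hypothetical and unrelated to PyFlink's internals. Each transformation only records a step, and nothing is read or computed until execute() is called. This mirrors the role t_env.execute(job_name) plays in the job.

```python
from typing import Any, Callable, Iterable, List


class LazyPipeline:
    """Hypothetical toy pipeline: transformations are recorded, not run."""

    def __init__(self, source: Callable[[], Iterable[Any]]) -> None:
        self._source = source
        self._steps: List[Callable[[Iterable[Any]], Iterable[Any]]] = []

    def map(self, fn: Callable[[Any], Any]) -> "LazyPipeline":
        # Only record the step; no data is touched yet.
        self._steps.append(lambda rows: (fn(r) for r in rows))
        return self

    def filter(self, pred: Callable[[Any], bool]) -> "LazyPipeline":
        # Only record the step; no data is touched yet.
        self._steps.append(lambda rows: (r for r in rows if pred(r)))
        return self

    def execute(self) -> List[Any]:
        # Only now is the source read and every recorded step applied in order.
        rows: Iterable[Any] = self._source()
        for step in self._steps:
            rows = step(rows)
        return list(rows)


calls = []


def read_source() -> List[str]:
    calls.append("read")
    return ["Flink", "", "Python"]


p = LazyPipeline(read_source).filter(bool).map(str.lower)
print(calls)        # [] (nothing has run yet)
print(p.execute())  # ['flink', 'python']
print(calls)        # ['read']
```

Deferring work this way lets the whole plan be inspected or optimized before any data moves. That is the general motivation for building a graph first.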
The complete code so far is as follows:
You can run this example in your IDE, or on the command line with python WordCount.py (assuming the job script file is WordCount.py).
This command builds and runs the Python Table API program in a local mini cluster. You can also submit the program to a remote cluster; refer to Job Submission Examples for more details.
This should get you started with writing your own Flink Python Table API programs. To learn more about the Python Table API, refer to the Flink Python Table API Docs.