This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
User-defined functions are an important feature because they significantly extend the expressiveness of Python Table API programs.
NOTE: Python UDF execution requires Python 3.5, 3.6, or 3.7 with PyFlink installed, on both the client side and the cluster side.
Python scalar functions can be used in Python Table API programs. To define a Python scalar function,
extend the base class ScalarFunction in pyflink.table.udf and implement an evaluation method.
The behavior of a Python scalar function is defined by this evaluation method, which must be named eval.
The evaluation method can accept variable arguments, such as eval(*args).
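A minimal sketch of a scalar function whose eval accepts any number of arguments. SumNonNull is a hypothetical name, and the try/except fallback exists only so the eval logic can be tried without PyFlink installed; in a real job, import ScalarFunction directly.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Fallback only so this sketch runs without PyFlink installed.
    ScalarFunction = object


class SumNonNull(ScalarFunction):
    """Hypothetical scalar function that sums any number of arguments."""

    def eval(self, *args):
        # SQL NULL values arrive as Python None; skip them.
        return sum(a for a in args if a is not None)
```

For example, SumNonNull().eval(1, None, 3) returns 4.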
The following example shows how to define your own Python hash code function, register it in the TableEnvironment, and call it in a query.
Note that you can configure your scalar function via a constructor before it is registered:
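A sketch of such a hash code function, assuming the udf(f, input_types, result_type) signature and TableEnvironment.register_function of the PyFlink version this page documents. The factor argument is set through the constructor before registration; register_hash_code, use_hash_code, my_table, MyTable, and the columns a and b are hypothetical. PyFlink is imported inside the helpers only so the eval logic can be tried without it.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    ScalarFunction = object  # fallback so eval can be tried without PyFlink


class HashCode(ScalarFunction):
    def __init__(self, factor=12):
        # Configuration passed to the constructor is captured when the
        # function object is created, i.e. before it is registered.
        self.factor = factor

    def eval(self, s):
        return hash(s) * self.factor


def register_hash_code(t_env, factor=12):
    """Hypothetical helper: register HashCode under the name 'hash_code'."""
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    t_env.register_function(
        "hash_code",
        udf(HashCode(factor),
            input_types=[DataTypes.BIGINT()],
            result_type=DataTypes.BIGINT()))


def use_hash_code(t_env, my_table):
    # Assumes my_table has a BIGINT column b and is registered as MyTable.
    via_table_api = my_table.select("a, hash_code(b)")
    via_sql = t_env.sql_query("SELECT a, hash_code(b) FROM MyTable")
    return via_table_api, via_sql
```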
Note: If RocksDB is not used as the state backend, you can also configure the Python
worker to use the managed memory of the TaskManager by setting python.fn-execution.memory.managed to true.
In that case, there is no need to set taskmanager.memory.task.off-heap.size for the Python worker.
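A minimal sketch of both options applied through the table configuration, assuming t_env is an existing TableEnvironment. The helper names and the 80m size are illustrative only, not recommended values.

```python
def use_managed_memory_for_python(t_env):
    """Let the Python worker use the TaskManager's managed memory.

    Assumes RocksDB is not the state backend.
    """
    config = t_env.get_config().get_configuration()
    config.set_string("python.fn-execution.memory.managed", "true")


def use_off_heap_memory_for_python(t_env, size="80m"):
    """Alternative: reserve task off-heap memory for the Python worker."""
    config = t_env.get_config().get_configuration()
    config.set_string("taskmanager.memory.task.off-heap.size", size)
```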
Java/Scala scalar functions can also be used in Python Table API programs.
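A sketch of registering a Java scalar function by its fully qualified class name, assuming TableEnvironment.register_java_function(name, function_class_name). The class my.java.function.HashCode is hypothetical and must be available on the job's classpath.

```python
def register_java_hash_code(t_env):
    # "my.java.function.HashCode" is a hypothetical Java ScalarFunction class.
    t_env.register_java_function("hash_code", "my.java.function.HashCode")


def use_java_hash_code(t_env):
    # Once registered, it is called like a Python UDF; MyTable is assumed.
    return t_env.sql_query("SELECT a, hash_code(b) FROM MyTable")
```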
Besides extending the base class ScalarFunction, a Python scalar function can also be defined as a Python function,
a lambda function, a callable object, or a partial function, giving five ways in total. The following examples show
these ways for a function that takes two BIGINT columns as input parameters and returns their sum.
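A sketch of the five variants, assuming the udf(f, input_types, result_type) signature of the PyFlink version this page documents. Add, CallableAdd, partial_add, and make_add_udfs are illustrative names; the partial variant fixes k=0 so it still returns the sum of the two inputs. PyFlink is imported inside make_add_udfs, and ScalarFunction has a fallback, only so the plain-Python parts can be tried without PyFlink.

```python
import functools

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    ScalarFunction = object  # fallback so the plain-Python parts run without PyFlink


# Option 1: extend the base class ScalarFunction.
class Add(ScalarFunction):
    def eval(self, i, j):
        return i + j


# Option 4: a callable object.
class CallableAdd(object):
    def __call__(self, i, j):
        return i + j


# Option 5 starts from a three-argument function; partial() fixes k.
def partial_add(i, j, k):
    return i + j + k


def make_add_udfs():
    """Wrap each variant with udf(); all take two BIGINTs and return a BIGINT."""
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    in_types = [DataTypes.BIGINT(), DataTypes.BIGINT()]
    out_type = DataTypes.BIGINT()

    # Option 2: a Python function wrapped with the @udf decorator.
    @udf(input_types=in_types, result_type=out_type)
    def add(i, j):
        return i + j

    return {
        "option1": udf(Add(), in_types, out_type),
        "option2": add,
        # Option 3: a lambda function.
        "option3": udf(lambda i, j: i + j, in_types, out_type),
        "option4": udf(CallableAdd(), in_types, out_type),
        # Fixing k=0 keeps the result equal to i + j.
        "option5": udf(functools.partial(partial_add, k=0), in_types, out_type),
    }
```

Any of the returned objects can then be registered, for example with t_env.register_function("add", make_add_udfs()["option2"]), and called as add(a, b) in a query, where t_env and the columns a and b are assumed.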
Table Functions
Similar to a Python user-defined scalar function, a user-defined table function takes zero, one, or
multiple scalar values as input parameters. However, in contrast to a scalar function, it can return
an arbitrary number of rows as output instead of a single value. The return value of a Python UDTF
can be an Iterable, an Iterator, or a generator.
The following example shows how to define your own Python multi-emit function, register it in the
TableEnvironment, and call it in a query.
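A sketch of a Split table function that emits one (word, length) row per word, assuming the udtf(f, input_types, result_types) signature and the string-expression join_lateral and left_outer_join_lateral methods of the PyFlink version this page documents. register_and_use_split, my_table, MyTable, and column a are hypothetical; the TableFunction fallback exists only so eval can be tried without PyFlink.

```python
try:
    from pyflink.table.udf import TableFunction
except ImportError:
    TableFunction = object  # fallback so eval can be tried without PyFlink


class Split(TableFunction):
    def eval(self, string):
        # A generator: each yielded tuple becomes one output row (word, length).
        for s in string.split(" "):
            yield s, len(s)


def register_and_use_split(t_env, my_table):
    """Hypothetical helper; assumes my_table has a STRING column a and is
    also registered as MyTable."""
    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf

    t_env.register_function(
        "split",
        udtf(Split(),
             input_types=[DataTypes.STRING()],
             result_types=[DataTypes.STRING(), DataTypes.INT()]))

    # Table API: inner and left outer lateral joins.
    joined = my_table.join_lateral("split(a) as (word, length)")
    left_joined = my_table.left_outer_join_lateral("split(a) as (word, length)")

    # SQL: LATERAL TABLE in the FROM clause.
    via_sql = t_env.sql_query(
        "SELECT a, word, length FROM MyTable, "
        "LATERAL TABLE(split(a)) AS T(word, length)")
    return joined, left_joined, via_sql
```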
Java/Scala table functions can also be used in Python Table API programs.
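A sketch of registering a Java table function and using it in a lateral join, assuming register_java_function(name, function_class_name). The class my.java.function.Split is hypothetical, must be on the job's classpath, and is assumed to emit (word, length) rows.

```python
def register_and_use_java_split(t_env, my_table):
    # "my.java.function.Split" is a hypothetical Java/Scala TableFunction class.
    t_env.register_java_function("split", "my.java.function.Split")
    # It is then joined laterally exactly like a Python UDTF.
    return my_table.join_lateral("split(a) as (word, length)")
```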
As with Python scalar functions, Python table functions can be defined in any of the five ways described above.
Note: The only difference is that the return value of a Python table function must be an iterable, an iterator, or a generator.
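A sketch of table functions that return each of the allowed kinds of value, wrapped in the non-class ways, assuming the udtf(f, input_types, result_types) signature of the PyFlink version this page documents. All names are illustrative; PyFlink is imported inside make_split_udtfs only so the plain-Python parts can be tried without it.

```python
import functools


def split_generator(s):
    # Generator: rows are produced lazily with yield.
    for w in s.split(" "):
        yield w, len(w)


def split_iterable(s):
    # Iterable: a list of (word, length) rows.
    return [(w, len(w)) for w in s.split(" ")]


class CallableSplit(object):
    # Callable object returning an iterator over the rows.
    def __call__(self, s):
        return iter(split_iterable(s))


def split_on(s, sep):
    # Used with functools.partial to fix the separator.
    return [(w, len(w)) for w in s.split(sep)]


def make_split_udtfs():
    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf

    in_types = [DataTypes.STRING()]
    out_types = [DataTypes.STRING(), DataTypes.INT()]
    return [
        udtf(split_generator, in_types, out_types),
        udtf(split_iterable, in_types, out_types),
        # Lambda returning an iterator.
        udtf(lambda s: iter(split_iterable(s)), in_types, out_types),
        udtf(CallableSplit(), in_types, out_types),
        udtf(functools.partial(split_on, sep=" "), in_types, out_types),
    ]
```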