NOTE: The only difference is that the return type of a Python table function must be an iterable, an iterator, or a generator.
Aggregate Functions (AggregateFunction)
A user-defined aggregate function (UDAGG) maps scalar values of multiple rows to a new scalar value.
NOTE: Currently, general user-defined aggregate functions are only supported in GroupBy aggregation with the blink planner in streaming mode. They are not yet supported in batch mode or for windowed aggregation; the Vectorized Aggregate Functions are recommended in those cases.
The behavior of an aggregate function is centered around the concept of an accumulator. The accumulator
is an intermediate data structure that stores the aggregated values until a final aggregation result
is computed.
For each set of rows that needs to be aggregated, the runtime creates an empty accumulator by calling
create_accumulator(). Subsequently, the accumulate(...) method of the aggregate function is called for each input
row to update the accumulator. Currently, after each row has been processed, the get_value(...) method of the
aggregate function is called to compute the aggregated result.
The following example illustrates the aggregation process:
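As a minimal, framework-free sketch of this process (the beverage rows below are hypothetical sample data, and the function names mirror the AggregateFunction methods described later):

```python
# Hypothetical beverage rows: (id, name, price).
beverages = [
    (1, "Latte", 6),
    (2, "Milk", 3),
    (3, "Breve", 5),
    (4, "Mocha", 8),
    (5, "Tea", 4),
]

def create_accumulator():
    # The accumulator holds the running maximum; None until the first row arrives.
    return [None]

def accumulate(accumulator, price):
    # Called once per input row to update the intermediate state.
    if accumulator[0] is None or price > accumulator[0]:
        accumulator[0] = price

def get_value(accumulator):
    # Called to compute the aggregated result from the accumulator.
    return accumulator[0]

accumulator = create_accumulator()
for _id, _name, price in beverages:
    accumulate(accumulator, price)

print(get_value(accumulator))  # 8, the highest price
```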
In the above example, we assume a table that contains data about beverages. The table consists of three columns (id, name,
and price) and 5 rows. We would like to find the highest price of all beverages in the table, i.e., perform
a max() aggregation.
In order to define an aggregate function, one has to extend the base class AggregateFunction in
pyflink.table and implement the evaluation method named accumulate(...).
The result type and accumulator type of the aggregate function can be specified by one of the following two approaches:
Implement the methods named get_result_type() and get_accumulator_type().
Wrap the function instance with the decorator udaf in pyflink.table.udf and specify the parameters result_type and accumulator_type.
The following example shows how to define your own aggregate function and call it in a query.
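A sketch of such a function is shown below. The fallback stand-ins for Row and AggregateFunction only exist so the class can be exercised without a PyFlink installation; with PyFlink installed, the real classes are imported and the stand-ins are skipped.

```python
# Sketch of a weighted-average UDAF (assumed example, not the official one).
try:
    from pyflink.common import Row
    from pyflink.table import AggregateFunction
except ImportError:  # minimal stand-ins so the sketch runs without PyFlink
    class AggregateFunction(object):
        pass

    def Row(*fields):
        # A positional stand-in for pyflink.common.Row; supports indexing.
        return list(fields)


class WeightedAvg(AggregateFunction):

    def create_accumulator(self):
        # Row(weighted_sum, weight_count)
        return Row(0, 0)

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator, value, weight):
        # Needed when upstream operations may emit retraction messages.
        accumulator[0] -= value * weight
        accumulator[1] -= weight

    def get_value(self, accumulator):
        if accumulator[1] == 0:
            return None
        return accumulator[0] / accumulator[1]


# Simulate what the runtime does for one group of rows.
agg = WeightedAvg()
acc = agg.create_accumulator()
agg.accumulate(acc, 1, 2)   # value=1, weight=2
agg.accumulate(acc, 5, 2)   # value=5, weight=2
print(agg.get_value(acc))   # 3.0
```

With PyFlink installed, you would then wrap the instance with the udaf decorator, specifying result_type and accumulator_type (or implement get_result_type() and get_accumulator_type() on the class), and call it in a query, e.g. `t.group_by(t.name).select(weighted_avg(t.value, t.count))` (column names here are hypothetical).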
The accumulate(...) method of our WeightedAvg class takes three input arguments. The first one is the accumulator
and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
needs to store the weighted sum and count of all the data that have already been accumulated. In our example, we
use a Row object as the accumulator. Accumulators will be managed
by Flink’s checkpointing mechanism and are restored in case of failover to ensure exactly-once semantics.
Mandatory and Optional Methods
The following methods are mandatory for each AggregateFunction:
create_accumulator()
accumulate(...)
get_value(...)
The following methods of AggregateFunction are required depending on the use case:
retract(...) is required when there are operations that could generate retraction messages before the current aggregation operation, e.g. group aggregate, outer join.
This method is optional, but implementing it is strongly recommended so that the UDAF can be used in any use case.
get_result_type() and get_accumulator_type() are required if the result type and accumulator type are not specified in the udaf decorator.
ListView and MapView
If an accumulator needs to store large amounts of data, pyflink.table.ListView and pyflink.table.MapView
could be used instead of list and dict. These two data structures provide functionality similar to that of list and dict,
but usually offer better performance by leveraging Flink's state backend to eliminate unnecessary state access.
You can use them by declaring DataTypes.LIST_VIEW(...) and DataTypes.MAP_VIEW(...) in the accumulator type, e.g.:
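The sketch below concatenates string inputs while keeping them in a ListView (ConcatAgg is a hypothetical name, and the stand-in classes are only fallbacks so the sketch runs without a PyFlink installation):

```python
# Sketch: a UDAF whose accumulator embeds a ListView as a first-level
# field of a Row, with the view declared via DataTypes.LIST_VIEW(...).
try:
    from pyflink.common import Row
    from pyflink.table import AggregateFunction, ListView
except ImportError:  # minimal stand-ins so the sketch runs without PyFlink
    class AggregateFunction(object):
        pass

    class ListView(object):
        def __init__(self):
            self._data = []

        def add(self, value):
            self._data.append(value)

        def __iter__(self):
            return iter(self._data)

    def Row(*fields):
        return list(fields)


class ConcatAgg(AggregateFunction):
    """Concatenates string inputs with a separator stored in the accumulator."""

    def create_accumulator(self):
        # The accumulator must be a Row, and the ListView must be a
        # first-level field of it.
        return Row(ListView(), ",")

    def accumulate(self, accumulator, value):
        accumulator[0].add(value)

    def get_value(self, accumulator):
        return accumulator[1].join(accumulator[0])

    def get_accumulator_type(self):
        # Only called by the framework, so PyFlink is imported lazily here.
        from pyflink.table import DataTypes
        return DataTypes.ROW([
            DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
            DataTypes.FIELD("f1", DataTypes.STRING())])

    def get_result_type(self):
        from pyflink.table import DataTypes
        return DataTypes.STRING()


agg = ConcatAgg()
acc = agg.create_accumulator()
for word in ["a", "b", "c"]:
    agg.accumulate(acc, word)
print(agg.get_value(acc))  # a,b,c
```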
Currently there are two limitations when using ListView and MapView:
The accumulator must be a Row.
The ListView and MapView must be the first level children of the Row accumulator.
NOTE: To reduce the data transmission cost between the Python UDF worker and the Java process caused by accessing data in Flink state (e.g. accumulators and data views),
there is a caching layer between the raw state handler and the Python state backend. You can adjust the following configuration options to tune the cache layer for best performance:
python.state.cache-size, python.map-state.read-cache-size, python.map-state.write-cache-size and python.map-state.iterate-response-batch-size.
For more details please refer to the Python Configuration Documentation.
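For instance, these options might be set through the table config as sketched below (the values are purely illustrative, not recommendations, and a blink-planner streaming TableEnvironment is assumed):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

# Illustrative values only; tune them against your own workload.
config = table_env.get_config().get_configuration()
config.set_string("python.state.cache-size", "1000")
config.set_string("python.map-state.read-cache-size", "1000")
config.set_string("python.map-state.write-cache-size", "1000")
config.set_string("python.map-state.iterate-response-batch-size", "1000")
```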