向量化Python用户自定义函数,是在执行时,通过在JVM和Python VM之间以Arrow列存格式批量传输数据,来执行的函数。
向量化Python用户自定义函数的性能通常比非向量化Python用户自定义函数要高得多,因为向量化Python用户自定义函数可以大大减少序列化/反序列化的开销和调用开销。
此外,用户可以利用流行的Python库(例如Pandas,Numpy等)来实现向量化Python用户自定义函数的逻辑。这些Python库通常经过高度优化,并提供了高性能的数据结构和功能。
向量化用户自定义函数的定义,与非向量化用户自定义函数具有相似的方式,
用户只需要在调用udf
装饰器时添加一个额外的参数udf_type="pandas"
,将其标记为一个向量化用户自定义函数即可。
注意:要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6或3.7)。客户端和群集端都需要安装它。
向量化Python标量函数以pandas.Series
类型的参数作为输入,并返回与输入长度相同的pandas.Series
。
在内部实现中,Flink会将输入数据拆分为多个批次,并将每一批次的输入数据转换为Pandas.Series
类型,
然后为每一批输入数据调用用户自定义的向量化Python标量函数。请参阅配置选项
python.fn-execution.arrow.batch.size,
以获取有关如何配置批次大小的更多详细信息。
向量化Python标量函数可以在任何可以使用非向量化Python标量函数的地方使用。
以下示例显示了如何定义自己的向量化Python标量函数,该函数计算两列的总和,并在查询中使用它:
注意如果不使用RocksDB作为状态后端,则还可以通过 将python.fn-execution.memory.managed设置为true ,来配置python worker以使用taskmanager的托管内存, 则无需配置taskmanager.memory.task.off-heap.size 。