
Operators #

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

DataStream Transformations #

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping, filtering, reducing). Please see operators for an overview of the available stream transformations in the Python DataStream API.

Functions #

Most transformations require a user-defined function as input to define the functionality of the transformation. The following describes different ways of defining user-defined functions.

Implementing Function Interfaces #

Different Function interfaces are provided for different transformations in the Python DataStream API. For example, MapFunction is provided for the map transformation, FilterFunction is provided for the filter transformation, etc. Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for instance:

# Implementing MapFunction
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction


class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1


env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
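
The other Function interfaces can be implemented in the same way. The following is a minimal sketch (an illustration, not part of the original example) of a FilterFunction; it assumes the env and data_stream defined above:

# Implementing FilterFunction (illustrative sketch)
from pyflink.datastream import FilterFunction


class MyFilterFunction(FilterFunction):

    def filter(self, value):
        # keep only even values
        return value % 2 == 0


filtered_stream = data_stream.filter(MyFilterFunction())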

Note In the Python DataStream API, users can specify the output type information of the transformation explicitly. If it is not specified, the output type will be Types.PICKLED_BYTE_ARRAY, so that the data will be in the form of a byte array generated by the pickle serializer. For more details about pickle serialization, please refer to DataTypes.
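
For example, the following sketch (an illustration, not part of the original example) reuses MyMapFunction from above but omits the output type, so the resulting elements are transferred as pickled byte arrays:

# No output_type given: the result stream defaults to Types.PICKLED_BYTE_ARRAY,
# i.e. the elements are serialized with pickle between operators.
pickled_stream = data_stream.map(MyMapFunction())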

Lambda Function #

As shown in the following example, all the transformations can also accept a lambda function to define the functionality of the transformation:

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

Note Operations ConnectedStreams.map() and ConnectedStreams.flat_map() do not support lambda functions and must accept a CoMapFunction and a CoFlatMapFunction, respectively.
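
As an illustration (a minimal sketch, not part of the original examples, assuming the env and data_stream defined above), connecting two streams and mapping them with a CoMapFunction could look like this:

# Mapping a connected stream with a CoMapFunction (illustrative sketch)
from pyflink.datastream import CoMapFunction


class MyCoMapFunction(CoMapFunction):

    def map1(self, value):
        # applied to elements of the first stream
        return value + 1

    def map2(self, value):
        # applied to elements of the second stream
        return value * 2


other_stream = env.from_collection([10, 20, 30], type_info=Types.INT())
connected_stream = data_stream.connect(other_stream)
result_stream = connected_stream.map(MyCoMapFunction(), output_type=Types.INT())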

Python Function #

Users can also use a plain Python function:

def my_map_func(value):
    return value + 1

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())