Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.
Unit Testing Stateless, Timeless UDFs
For example, take a stateless MapFunction such as IncrementMapFunction, which increments every record it receives.
It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.
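A minimal sketch of such a test is shown below. It assumes that IncrementMapFunction adds one to each record. The MapFunction interface here is a local stand-in that mirrors the shape of org.apache.flink.api.common.functions.MapFunction, so the snippet compiles without Flink on the classpath. The IncrementMapFunctionCheck class is a hypothetical, framework-free substitute for a test method in your preferred testing framework.

```java
// Local stand-in for org.apache.flink.api.common.functions.MapFunction (an
// assumption of this sketch); in a real project, import the Flink interface.
interface MapFunction<T, O> {
    O map(T value) throws Exception;
}

// The stateless function under test: adds one to every record.
class IncrementMapFunction implements MapFunction<Long, Long> {
    @Override
    public Long map(Long record) {
        return record + 1;
    }
}

// Hypothetical framework-free check; with JUnit this would be a @Test method.
class IncrementMapFunctionCheck {
    public static void main(String[] args) {
        // Instantiate the function directly; no Flink runtime is involved.
        IncrementMapFunction incrementer = new IncrementMapFunction();

        // Call the function with a suitable argument and verify the output.
        Long result = incrementer.map(2L);
        if (result != 3L) {
            throw new AssertionError("expected 3 but got " + result);
        }
        System.out.println("IncrementMapFunction OK");
    }
}
```

Because the function keeps no state and uses no timers, the test needs neither a cluster nor a test harness. Replacing the stand-in interface with the real Flink import should leave the test itself unchanged.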
Similarly, a user-defined function which uses an org.apache.flink.util.Collector (e.g. a FlatMapFunction or ProcessFunction) can be easily tested by providing a mock object instead of a real collector. A FlatMapFunction with the same functionality as the IncrementMapFunction could be unit tested as follows.
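The sketch below illustrates the idea with a hand-written recording collector in place of a mock created by a mocking library. The Collector and FlatMapFunction interfaces are local stand-ins that mirror the shape of the Flink interfaces so that the snippet compiles on its own. IncrementFlatMapFunction and RecordingCollector are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins for org.apache.flink.util.Collector and
// org.apache.flink.api.common.functions.FlatMapFunction (assumptions of this sketch).
interface Collector<T> {
    void collect(T record);
    void close();
}

interface FlatMapFunction<T, O> {
    void flatMap(T value, Collector<O> out) throws Exception;
}

// Same behaviour as IncrementMapFunction, but emitting through a Collector.
class IncrementFlatMapFunction implements FlatMapFunction<Long, Long> {
    @Override
    public void flatMap(Long record, Collector<Long> out) {
        out.collect(record + 1);
    }
}

// Hand-written test double that remembers everything the function emits.
class RecordingCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();
    boolean closed = false;

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical framework-free check; with JUnit this would be a @Test method.
class IncrementFlatMapFunctionCheck {
    public static void main(String[] args) {
        RecordingCollector<Long> collector = new RecordingCollector<>();
        new IncrementFlatMapFunction().flatMap(2L, collector);

        // Exactly one record should have been emitted, with the value 3.
        if (collector.records.size() != 1 || collector.records.get(0) != 3L) {
            throw new AssertionError("unexpected output: " + collector.records);
        }
        System.out.println("IncrementFlatMapFunction OK");
    }
}
```

A mock from a mocking library would verify the same interaction (one call to collect with the value 3). The recording collector is used here only to keep the sketch free of third-party dependencies.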
Unit Testing Stateful or Timely UDFs & Custom Operators
Testing a user-defined function that makes use of managed state or timers is more difficult, because it involves testing the interaction between the user code and Flink’s runtime.
For this purpose, Flink comes with a collection of so-called test harnesses, which can be used to test such user-defined functions as well as custom operators:
OneInputStreamOperatorTestHarness (for operators on DataStreams)
KeyedOneInputStreamOperatorTestHarness (for operators on KeyedStreams)
TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)
To use the test harnesses, a set of additional test-scoped dependencies is needed.
With these in place, the test harnesses can be used to push records and watermarks into your user-defined functions or custom operators, control processing time, and finally assert on the output of the operator (including side outputs).
KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector including TypeInformation for the class of the key.
Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest is a good example of testing operators and user-defined functions that depend on processing or event time.
org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest shows how to test a custom sink with the AbstractStreamOperatorTestHarness. Specifically, it uses AbstractStreamOperatorTestHarness.snapshot and AbstractStreamOperatorTestHarness.initializeState to test its interaction with Flink’s checkpointing mechanism.
Note: AbstractStreamOperatorTestHarness and its derived classes are currently not part of the public API and can be subject to change.
Unit Testing ProcessFunction
Given the importance of ProcessFunction, Flink provides a test harness factory named ProcessFunctionTestHarnesses that makes test harness instantiation easier. It complements the test harnesses above, which can also be used directly to test a ProcessFunction. Consider this example:
Note: To use this test harness, you also need the dependencies mentioned in the previous section.
It is very easy to unit test such a function with ProcessFunctionTestHarnesses by passing suitable arguments and verifying the output.
For more examples of how to use ProcessFunctionTestHarnesses to test the different flavours of ProcessFunction (e.g. KeyedProcessFunction, KeyedCoProcessFunction, BroadcastProcessFunction), see ProcessFunctionTestHarnessesTest.
Testing Flink Jobs
JUnit Rule MiniClusterWithClientResource
Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.
To use MiniClusterWithClientResource, one additional test-scoped dependency is needed.
Let us take the same simple MapFunction as in the previous sections.
A simple pipeline using this MapFunction can now be tested in a local Flink cluster as follows.
A few remarks on integration testing with MiniClusterWithClientResource:
In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
The static variable in CollectSink is used here because Flink serializes all operators before distributing them across a cluster.
Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
Alternatively, you could write the data to files in a temporary directory with your test sink.
You can implement a custom parallel source function for emitting watermarks if your job uses event time timers.
It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when pipelines are executed in parallel.
Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.
If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline.
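The remark above about the static variable in CollectSink can be illustrated without Flink. The following sketch uses plain Java serialization to mimic what happens when an operator is shipped to a task: the running copy is a different object from the one created in the test, so values written to an instance field never reach the test, while values written to a static field do (as long as everything runs in one JVM, as with a local mini cluster). RecordingSink and SerializationRoundTrip are hypothetical names and do not implement any Flink interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sink-like class used only to illustrate the serialization issue.
class RecordingSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Shared by every instance in the same JVM, including deserialized copies.
    static final List<Long> STATIC_VALUES =
            Collections.synchronizedList(new ArrayList<>());

    // Belongs to a single instance; a deserialized copy gets its own list.
    final List<Long> instanceValues = new ArrayList<>();

    void invoke(Long value) {
        STATIC_VALUES.add(value);
        instanceValues.add(value);
    }
}

class SerializationRoundTrip {
    // Serializes and deserializes an object, producing an independent copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        RecordingSink createdInTest = new RecordingSink();
        RecordingSink runningCopy = copy(createdInTest);

        // Only the copy receives records, as it would inside the cluster.
        runningCopy.invoke(2L);
        runningCopy.invoke(22L);

        System.out.println("instance field seen by test: " + createdInTest.instanceValues);
        System.out.println("static field seen by test:   " + RecordingSink.STATIC_VALUES);
    }
}
```

Running the main method prints an empty list for the instance field and `[2, 22]` for the static field. Because a static list persists across tests in the same JVM, it is usually wise to clear it at the start of each test.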