This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.
$$ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \newcommand\rfrac[2]{^{#1}\!/_{#2}} \newcommand{\norm}[1]{\left\lVert#1\right\rVert} $$

k-Nearest Neighbors Join


Implements an exact k-nearest neighbors join algorithm. Given a training set $A$ and a testing set $B$, the algorithm returns

The brute-force approach is to compute the distance between every training and testing point. To ease the brute-force computation of computing the distance between every training point a quadtree is used. The quadtree scales well in the number of training points, though poorly in the spatial dimension. The algorithm will automatically choose whether or not to use the quadtree, though the user can override that decision by setting a parameter to force use or not use a quadtree.


KNN is a Predictor. As such, it supports the fit and predict operation.


KNN is trained by a given set of Vector:

  • fit[T <: Vector]: DataSet[T] => Unit


KNN predicts for all subtypes of FlinkML’s Vector the corresponding class label:

  • predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])], where the (T, Array[Vector]) tuple corresponds to (test point, K-nearest training points)


The KNN implementation can be controlled by the following parameters:

Parameters Description

Defines the number of nearest-neighbors to search for. That is, for each test point, the algorithm finds the K-nearest neighbors in the training set (Default value: 5)


Sets the distance metric we use to calculate the distance between two points. If no metric is specified, then [[]] is used. (Default value: EuclideanDistanceMetric)


Sets the number of blocks into which the input data will be split. This number should be set at least to the degree of parallelism. If no value is specified, then the parallelism of the input [[DataSet]] is used as the number of blocks. (Default value: None)


A boolean variable that whether or not to use a quadtree to partition the training set to potentially simplify the KNN search. If no value is specified, the code will automatically decide whether or not to use a quadtree. Use of a quadtree scales well with the number of training and testing points, though poorly with the dimension. (Default value: None)


Specifies whether the training set or test set is small to optimize the cross product operation needed for the KNN search. If the training set is small this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` if the test set is small. (Default value: None)


import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// prepare data
val trainingSet: DataSet[Vector] = ...
val testingSet: DataSet[Vector] = ...

val knn = KNN()

// run knn join
val result = knn.predict(testingSet).collect()

For more details on the computing KNN with and without and quadtree, here is a presentation: