$$ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \newcommand\rfrac[2]{^{#1}\!/_{#2}} \newcommand{\norm}[1]{\left\lVert#1\right\rVert} $$

Optimization

Mathematical Formulation

The optimization framework in FlinkML is a developer-oriented package that can be used to solve optimization problems common in Machine Learning (ML) tasks. In the supervised learning context, this usually involves finding a model, defined by a set of parameters $\wv$, that minimizes a function $f(\wv)$ given a set of $(\x, y)$ examples, where $\x$ is a feature vector and $y$ is a real number that represents either a real value in the regression case or a class label in the classification case. In supervised learning, the function to be minimized is usually of the form:

\begin{equation} \label{eq:objectiveFunc} f(\wv) := \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) + \lambda\, R(\wv) \ . \end{equation}

where $L$ is the loss function and $R(\wv)$ the regularization penalty. We use $L$ to measure how well the model fits the observed data, and $R$ to impose a complexity cost on the model, with $\lambda > 0$ being the regularization parameter.
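As an illustration, the objective $\eqref{eq:objectiveFunc}$ can be evaluated directly in plain Scala. This is a minimal standalone sketch, not FlinkML code: the names `ObjectiveSketch`, `Example`, `objective`, `squaredLoss` and `l2Penalty` are hypothetical, and the loss and penalty used are the squared loss and $L_2$ penalty defined in the following sections.

```scala
object ObjectiveSketch {
  // One training example: feature vector x and target y (hypothetical type).
  final case class Example(x: Vector[Double], y: Double)

  def dot(a: Vector[Double], b: Vector[Double]): Double =
    a.zip(b).map { case (ai, bi) => ai * bi }.sum

  // f(w) = (1/n) * sum_i L(w; x_i, y_i) + lambda * R(w)
  def objective(
      w: Vector[Double],
      data: Seq[Example],
      loss: (Vector[Double], Example) => Double,
      penalty: Vector[Double] => Double,
      lambda: Double): Double = {
    require(data.nonEmpty, "need at least one example")
    val meanLoss = data.map(e => loss(w, e)).sum / data.size
    meanLoss + lambda * penalty(w)
  }

  // Squared loss: 1/2 * (w^T x - y)^2
  val squaredLoss: (Vector[Double], Example) => Double =
    (w, e) => 0.5 * math.pow(dot(w, e.x) - e.y, 2)

  // L2 penalty: 1/2 * ||w||_2^2
  val l2Penalty: Vector[Double] => Double =
    w => 0.5 * dot(w, w)
}
```

Passing the loss and the penalty as functions mirrors the structure of the formula: the data term and the complexity term can be swapped independently.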

Loss Functions

In supervised learning, we use loss functions in order to measure the model fit, by penalizing errors in the predictions $p$ made by the model compared to the true $y$ for each example. Different loss functions can be used for regression (e.g. Squared Loss) and classification (e.g. Hinge Loss) tasks.

Some common loss functions are:

  • Squared Loss: $ \frac{1}{2} \left(\wv^T \cdot \x - y\right)^2, \quad y \in \R $
  • Hinge Loss: $ \max \left(0, 1 - y ~ \wv^T \cdot \x\right), \quad y \in \{-1, +1\} $
  • Logistic Loss: $ \log\left(1+\exp\left( -y ~ \wv^T \cdot \x\right)\right), \quad y \in \{-1, +1\}$
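The three losses listed above translate almost literally into code. The following is a standalone Scala sketch for illustration only; `LossSketch` and its method names are hypothetical and are not part of FlinkML.

```scala
object LossSketch {
  def dot(a: Vector[Double], b: Vector[Double]): Double =
    a.zip(b).map { case (ai, bi) => ai * bi }.sum

  // Squared loss: 1/2 * (w^T x - y)^2, with y a real number
  def squaredLoss(w: Vector[Double], x: Vector[Double], y: Double): Double = {
    val err = dot(w, x) - y
    0.5 * err * err
  }

  // Hinge loss: max(0, 1 - y * w^T x), with y in {-1, +1}
  def hingeLoss(w: Vector[Double], x: Vector[Double], y: Double): Double =
    math.max(0.0, 1.0 - y * dot(w, x))

  // Logistic loss: log(1 + exp(-y * w^T x)), with y in {-1, +1}
  def logisticLoss(w: Vector[Double], x: Vector[Double], y: Double): Double =
    math.log(1.0 + math.exp(-y * dot(w, x)))
}
```

Note that the hinge loss is exactly zero once an example is classified correctly with a margin of at least 1, while the logistic loss only approaches zero.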

Regularization Types

Regularization in machine learning imposes penalties on the estimated models in order to reduce overfitting. The most common penalties are the $L_1$ and $L_2$ penalties, defined as:

  • $L_1$: $R(\wv) = \norm{\wv}_1$
  • $L_2$: $R(\wv) = \frac{1}{2}\norm{\wv}_2^2$

The $L_2$ penalty penalizes large weights, favoring solutions with many small weights over solutions with a few large ones. The $L_1$ penalty can be used to drive a number of the solution coefficients to 0, thereby producing sparse solutions. The regularization constant $\lambda$ in $\eqref{eq:objectiveFunc}$ determines the amount of regularization applied to the model, and is usually determined through cross-validation. A good comparison of regularization types can be found in this paper by Andrew Ng. Which regularization types are supported depends on the optimization algorithm used.
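To make the difference between the two penalties concrete, here is a small standalone Scala sketch (the object and method names are hypothetical, not FlinkML API) that evaluates both on a weight vector.

```scala
object PenaltySketch {
  // L1 penalty: ||w||_1, the sum of absolute values
  def l1(w: Vector[Double]): Double =
    w.map(v => math.abs(v)).sum

  // L2 penalty: 1/2 * ||w||_2^2, half the sum of squares
  def l2(w: Vector[Double]): Double =
    0.5 * w.map(v => v * v).sum
}
```

For the vectors (2, 0) and (1, 1) the $L_1$ penalty is 2.0 in both cases, whereas the $L_2$ penalty is 2.0 for the first and 1.0 for the second. At equal $L_1$ norm, the $L_2$ penalty therefore prefers weight that is spread over several coefficients, which matches the description above.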

Stochastic Gradient Descent

To find a (local) minimum of a function, Gradient Descent methods take steps in the direction opposite to the gradient of the function $\eqref{eq:objectiveFunc}$ with respect to the current parameters (weights). Computing the exact gradient requires one pass through all the points in the dataset, which makes the process computationally expensive. An alternative is Stochastic Gradient Descent (SGD), where at each iteration we sample one point from the complete dataset and update the parameters based on that point alone, in an online manner.

In mini-batch SGD we instead sample random subsets of the dataset, and compute the gradient over each batch. At each iteration of the algorithm we update the weights once, based on the average of the gradients computed from each mini-batch.

An important parameter is the learning rate $\eta$, or step size, which can be determined by one of five methods, listed below. The setting of the initial step size can significantly affect the performance of the algorithm. For some practical tips on tuning SGD see Leon Bottou’s “Stochastic Gradient Descent Tricks”.

The current implementation of SGD uses the whole partition, making it effectively a batch gradient descent. Once a sampling operator has been introduced in Flink, true mini-batch SGD will be performed.
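For intuition, mini-batch SGD as described in this section can be sketched in a few lines of plain, single-machine Scala. This is an assumption-laden illustration and not the FlinkML implementation: it uses the squared loss with no regularization and a constant step size, and all names (`MiniBatchSgdSketch`, `Example`, `step`, `train`) are hypothetical.

```scala
import scala.util.Random

object MiniBatchSgdSketch {
  final case class Example(x: Vector[Double], y: Double)

  def dot(a: Vector[Double], b: Vector[Double]): Double =
    a.zip(b).map { case (ai, bi) => ai * bi }.sum

  // Gradient of the squared loss with respect to w: (w^T x - y) * x
  def gradient(w: Vector[Double], e: Example): Vector[Double] = {
    val err = dot(w, e.x) - e.y
    e.x.map(_ * err)
  }

  // One update: average the gradients over the batch, then step against them.
  def step(w: Vector[Double], batch: Seq[Example], eta: Double): Vector[Double] = {
    require(batch.nonEmpty, "batch must not be empty")
    val sum = batch.map(gradient(w, _)).reduce { (a, b) =>
      a.zip(b).map { case (u, v) => u + v }
    }
    w.zip(sum).map { case (wi, gi) => wi - eta * gi / batch.size }
  }

  // Repeatedly sample a random mini-batch and apply one update per iteration.
  def train(
      data: Vector[Example],
      dim: Int,
      batchSize: Int,
      iterations: Int,
      eta: Double,
      seed: Long): Vector[Double] = {
    val rng = new Random(seed)
    (1 to iterations).foldLeft(Vector.fill(dim)(0.0)) { (w, _) =>
      val batch = rng.shuffle(data).take(batchSize)
      step(w, batch, eta)
    }
  }
}
```

Setting `batchSize` to the size of the dataset turns `train` into batch gradient descent, which corresponds to the behavior the current implementation is described as having.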

Regularization

FlinkML supports Stochastic Gradient Descent with L1, L2 and no regularization. The following list contains a mapping between the implementing classes and the regularization function.

  • SimpleGradient: $R(\wv) = 0$
  • GradientDescentL1: $R(\wv) = \norm{\wv}_1$
  • GradientDescentL2: $R(\wv) = \frac{1}{2}\norm{\wv}_2^2$

Parameters

The stochastic gradient descent implementation can be controlled by the following parameters:

  • LossFunction: The loss function to be optimized. (Default value: None)

  • RegularizationConstant: The amount of regularization to apply. (Default value: 0.1)

  • Iterations: The maximum number of iterations. (Default value: 10)

  • LearningRate: Initial learning rate for the gradient descent method. This value controls how far the gradient descent method moves in the opposite direction of the gradient. (Default value: 0.1)

  • ConvergenceThreshold: When set, iterations stop if the relative change in the value of the objective function $\eqref{eq:objectiveFunc}$ is less than the provided threshold, $\tau$. The convergence criterion is defined as follows: $\left| \frac{f(\wv)_{i-1} - f(\wv)_i}{f(\wv)_{i-1}}\right| < \tau$. (Default value: None)

  • LearningRateMethod: The method used to calculate the effective learning rate at each iteration; see Effective Learning Rate. (Default value: LearningRateMethod.Default)

  • Decay: (Default value: 0.0)
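The relative-change criterion given for ConvergenceThreshold can be written as a one-line check. The following standalone Scala sketch is illustrative only; `ConvergenceSketch` and `hasConverged` are hypothetical names rather than FlinkML API.

```scala
object ConvergenceSketch {
  // True when |(f_prev - f_curr) / f_prev| < tau.
  // If previous is 0.0 the ratio is NaN or infinite, so the check returns false.
  def hasConverged(previous: Double, current: Double, tau: Double): Boolean =
    math.abs((previous - current) / previous) < tau
}
```

Because the change is measured relative to the previous objective value, the same threshold behaves consistently regardless of the scale of the loss.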

Loss Function

The loss function to be minimized has to implement the LossFunction interface, which defines methods to compute the loss and its gradient. You can either define your own LossFunction or use the GenericLossFunction class, which constructs the loss function from an outer loss function and a prediction function. For example:

val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

The supported outer loss functions are listed under Partial Loss Function Values, and the supported prediction functions under Prediction Function Values.

Partial Loss Function Values

  • SquaredLoss
      Description: Loss function most commonly used for regression tasks.
      Loss: $\frac{1}{2} (\wv^T \cdot \x - y)^2$
      Loss Derivative: $\wv^T \cdot \x - y$

Prediction Function Values

  • LinearPrediction
      Description: The function most commonly used for linear models, such as linear regression and linear classifiers.
      Prediction: $\x^T \cdot \wv$
      Prediction Gradient: $\x$
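One way to read the combination of an outer loss with a prediction function is through the chain rule: the gradient with respect to the weights is the loss derivative multiplied by the prediction gradient. The standalone Scala sketch below illustrates this under that assumption. The traits `OuterLoss` and `Prediction` and the objects `Squared` and `Linear` are hypothetical stand-ins, not the FlinkML interfaces.

```scala
object LossCompositionSketch {
  // Hypothetical stand-in for an outer (partial) loss on a prediction p and label y.
  trait OuterLoss {
    def loss(p: Double, y: Double): Double
    def derivative(p: Double, y: Double): Double
  }

  // Hypothetical stand-in for a prediction function of features x and weights w.
  trait Prediction {
    def predict(x: Vector[Double], w: Vector[Double]): Double
    def gradient(x: Vector[Double], w: Vector[Double]): Vector[Double]
  }

  // Squared loss: 1/2 * (p - y)^2, derivative p - y
  object Squared extends OuterLoss {
    def loss(p: Double, y: Double): Double = 0.5 * (p - y) * (p - y)
    def derivative(p: Double, y: Double): Double = p - y
  }

  // Linear prediction: x^T w, gradient x
  object Linear extends Prediction {
    def predict(x: Vector[Double], w: Vector[Double]): Double =
      x.zip(w).map { case (xi, wi) => xi * wi }.sum
    def gradient(x: Vector[Double], w: Vector[Double]): Vector[Double] = x
  }

  // Chain rule: dL/dw = L'(p, y) * dp/dw
  def lossAndGradient(
      outer: OuterLoss,
      pred: Prediction,
      w: Vector[Double],
      x: Vector[Double],
      y: Double): (Double, Vector[Double]) = {
    val p = pred.predict(x, w)
    val d = outer.derivative(p, y)
    (outer.loss(p, y), pred.gradient(x, w).map(_ * d))
  }
}
```

Separating the two pieces means a new outer loss can be paired with an existing prediction function without rederiving the full gradient.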

Effective Learning Rate

The step size functions listed in this section use the following symbols:

  • $j$ is the iteration number

  • $\eta_j$ is the step size on step $j$

  • $\eta_0$ is the initial step size

  • $\lambda$ is the regularization constant

  • $\tau$ is the decay constant, which causes the learning rate to be a decreasing function of $j$, that is to say as iterations increase, learning rate decreases. The exact rate of decay is function specific, see Inverse Scaling and Wei Xu’s Method (which is an extension of the Inverse Scaling method).

  • Default
      Description: The default method used for determining the step size. This is equivalent to the inverse scaling method for $\tau = 0.5$. This special case is kept as the default to maintain backwards compatibility.
      Function: $\eta_j = \eta_0/\sqrt{j}$
      Called As: LearningRateMethod.Default

  • Constant
      Description: The step size is constant throughout the learning task.
      Function: $\eta_j = \eta_0$
      Called As: LearningRateMethod.Constant

  • Leon Bottou's Method
      Description: This is the 'optimal' method of sklearn. The optimal initial value $t_0$ has to be provided. Sklearn uses the following heuristic: $t_0 = \max(1.0, L^\prime(-\beta, 1.0) / (\alpha \cdot \beta))$ with $\beta = \sqrt{\frac{1}{\sqrt{\alpha}}}$, where $\alpha$ is sklearn's regularization constant and $L^\prime(prediction, truth)$ is the derivative of the loss function.
      Function: $\eta_j = 1 / (\lambda \cdot (t_0 + j - 1))$
      Called As: LearningRateMethod.Bottou

  • Inverse Scaling
      Description: A very common method for determining the step size.
      Function: $\eta_j = \eta_0 / j^{\tau}$
      Called As: LearningRateMethod.InvScaling

  • Wei Xu's Method
      Description: Method proposed by Wei Xu in Towards Optimal One Pass Large Scale Learning with Averaged Stochastic Gradient Descent.
      Function: $\eta_j = \eta_0 \cdot (1 + \lambda \cdot \eta_0 \cdot j)^{-\tau}$
      Called As: LearningRateMethod.Xu
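The five step size functions above can be compared side by side in a short standalone Scala sketch. It transcribes the formulas only; the object `LearningRateSketch` and its method names are hypothetical and do not reflect how the FlinkML classes are structured.

```scala
object LearningRateSketch {
  // Default: eta_j = eta_0 / sqrt(j)
  def default(eta0: Double, j: Int): Double =
    eta0 / math.sqrt(j.toDouble)

  // Constant: eta_j = eta_0
  def constant(eta0: Double, j: Int): Double =
    eta0

  // Bottou: eta_j = 1 / (lambda * (t_0 + j - 1))
  def bottou(lambda: Double, t0: Double, j: Int): Double =
    1.0 / (lambda * (t0 + j - 1))

  // Inverse scaling: eta_j = eta_0 / j^tau
  def invScaling(eta0: Double, tau: Double, j: Int): Double =
    eta0 / math.pow(j.toDouble, tau)

  // Xu: eta_j = eta_0 * (1 + lambda * eta_0 * j)^(-tau)
  def xu(eta0: Double, lambda: Double, tau: Double, j: Int): Double =
    eta0 * math.pow(1.0 + lambda * eta0 * j, -tau)
}
```

With $\tau = 0.5$, `invScaling` agrees with `default`, as stated in the description of the default method. For the learning rate to decrease with $j$ in `invScaling` and `xu`, $\tau$ has to be positive.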

Examples

In the Flink implementation of SGD, given a set of examples in a DataSet[LabeledVector] and optionally some initial weights, we can use GradientDescentL1.optimize() in order to optimize the weights for the given data.

The user can provide an initial DataSet[WeightVector], which contains one WeightVector element, or use the default weights which are all set to 0. A WeightVector is a container class for the weights, which separates the intercept from the weight vector. This allows us to avoid applying regularization to the intercept.
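To illustrate why keeping the intercept separate is useful, the standalone Scala sketch below applies one $L_2$-regularized gradient step in which the penalty term touches only the weights. The `Weights` case class and `l2Step` are hypothetical, loosely modeled on the description of WeightVector above, and are not the FlinkML update rule.

```scala
object InterceptSketch {
  // Hypothetical container that keeps the intercept apart from the weights.
  final case class Weights(weights: Vector[Double], intercept: Double)

  // One gradient step with an L2 penalty: the gradient of lambda * 1/2 * ||w||^2
  // is lambda * w, and it is added to the weight gradient only.
  def l2Step(current: Weights, gradient: Weights, eta: Double, lambda: Double): Weights = {
    val newWeights = current.weights.zip(gradient.weights).map {
      case (w, g) => w - eta * (g + lambda * w)
    }
    // The intercept follows the plain loss gradient, with no shrinkage.
    val newIntercept = current.intercept - eta * gradient.intercept
    Weights(newWeights, newIntercept)
  }
}
```

With a zero loss gradient, a step shrinks the weights toward 0 while leaving the intercept unchanged.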

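import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.optimization._

// Create stochastic gradient descent solver with L1 regularization
val sgd = GradientDescentL1()
  .setLossFunction(GenericLossFunction(SquaredLoss, LinearPrediction)) // squared loss on a linear prediction
  .setRegularizationConstant(0.2)
  .setIterations(100)
  .setLearningRate(0.01)
  .setLearningRateMethod(LearningRateMethod.Xu(0.75)) // positive decay, so the step size shrinks over iterations


// Obtain data
val trainingDS: DataSet[LabeledVector] = ...

// Optimize the weights, according to the provided data
val weightDS = sgd.optimize(trainingDS)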