This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

$$
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\newcommand\rfrac[2]{^{#1}\!/_{#2}}
\newcommand{\norm}[1]{\left\lVert#1\right\rVert}
$$
# SVM using CoCoA

**This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.**
## Description

## Operations

### Fit

### Predict

## Parameters

## Examples

Implements an SVM with soft-margin using the communication-efficient distributed dual coordinate ascent algorithm with hinge-loss function. The algorithm solves the following minimization problem:

with $\mathbf{w}$ being the weight vector, $\lambda$ being the regularization constant, being the data points and being the convex loss functions, which can also depend on the labels . In the current implementation the regularizer is the $\ell_2$-norm and the loss functions are the hinge-loss functions:

With these choices, the problem definition is equivalent to a SVM with soft-margin. Thus, the algorithm allows us to train a SVM with soft-margin.

The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA). In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm calculates several iterations of SDCA locally on a data block before merging the local updates into a valid global state. This state is redistributed to the different data partitions where the next round of local SDCA iterations is then executed. The number of outer iterations and local SDCA iterations control the overall network costs, because there is only network communication required for each outer iteration. The local SDCA iterations are embarrassingly parallel once the individual data partitions have been distributed across the cluster.

The implementation of this algorithm is based on the work of Jaggi et al.

`SVM`

is a `Predictor`

.
As such, it supports the `fit`

and `predict`

operation.

SVM is trained given a set of `LabeledVector`

:

`fit: DataSet[LabeledVector] => Unit`

SVM predicts for all subtypes of FlinkML’s `Vector`

the corresponding class label:

`predict[T <: Vector]: DataSet[T] => DataSet[(T, Double)]`

, where the`(T, Double)`

tuple corresponds to (original_features, label)

If we call evaluate with a `DataSet[(Vector, Double)]`

, we make a prediction on the class label
for each example, and return a `DataSet[(Double, Double)]`

. In each tuple the first element
is the true value, as was provided from the input `DataSet[(Vector, Double)]`

and the second element
is the predicted value. You can then use these `(truth, prediction)`

tuples to evaluate
the algorithm’s performance.

`predict: DataSet[(Vector, Double)] => DataSet[(Double, Double)]`

The SVM implementation can be controlled by the following parameters:

Parameters | Description |
---|---|

Blocks |
Sets the number of blocks into which the input data will be split.
On each block the local stochastic dual coordinate ascent method is executed.
This number should be set at least to the degree of parallelism.
If no value is specified, then the parallelism of the input DataSet is used as the number of blocks.
(Default value: |

Iterations |
Defines the maximum number of iterations of the outer loop method.
In other words, it defines how often the SDCA method is applied to the blocked data.
After each iteration, the locally computed weight vector updates have to be reduced to update the global weight vector value.
The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
(Default value: |

LocalIterations |
Defines the maximum number of SDCA iterations.
In other words, it defines how many data points are drawn from each local data block to calculate the stochastic dual coordinate ascent.
(Default value: |

Regularization |
Defines the regularization constant of the SVM algorithm.
The higher the value, the smaller will the 2-norm of the weight vector be.
In case of a SVM with hinge loss this means that the SVM margin will be wider even though it might contain some false classifications.
(Default value: |

Stepsize |
Defines the initial step size for the updates of the weight vector.
The larger the step size is, the larger will be the contribution of the weight vector updates to the next weight vector value.
The effective scaling of the updates is $\frac{stepsize}{blocks}$.
This value has to be tuned in case that the algorithm becomes unstable.
(Default value: |

ThresholdValue |
Defines the limiting value for the decision function above which examples are labeled as
positive (+1.0). Examples with a decision function value below this value are classified
as negative (-1.0). In order to get the raw decision function values you need to indicate it by
using the OutputDecisionFunction parameter. (Default value: |

OutputDecisionFunction |
Determines whether the predict and evaluate functions of the SVM should return the distance
to the separating hyperplane, or binary class labels. Setting this to true will
return the raw distance to the hyperplane for each example. Setting it to false will
return the binary class label (+1.0, -1.0) (Default value: |

Seed |
Defines the seed to initialize the random number generator.
The seed directly controls which data points are chosen for the SDCA method.
(Default value: |

```
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.RichExecutionEnvironment
val pathToTrainingFile: String = ???
val pathToTestingFile: String = ???
val env = ExecutionEnvironment.getExecutionEnvironment
// Read the training data set, from a LibSVM formatted file
val trainingDS: DataSet[LabeledVector] = env.readLibSVM(pathToTrainingFile)
// Create the SVM learner
val svm = SVM()
.setBlocks(10)
// Learn the SVM model
svm.fit(trainingDS)
// Read the testing data set
val testingDS: DataSet[Vector] = env.readLibSVM(pathToTestingFile).map(_.vector)
// Calculate the predictions for the testing data set
val predictionDS: DataSet[(Vector, Double)] = svm.predict(testingDS)
```