Flink SVM

2017/07/08 Flink

Flink SVM

SVM

支持向量机是监督式学习框架,通过分析输入数据来解决分类和聚类问题。SVM是非线性分类算法,常见的能解决的问题是:

  • 常规分类问题
  • 文本和超文本的分类问题
  • 图像处理问题
  • 生物和其它的科学问题 Flink使用了一个高效的分布式算法。 解决最小化问题,Flink使用了Stochastic Dual Coordinate Ascent(SDCA)算法,通过使用CoCoA算法使得SDCA计算的结果可以分布式计算进行合并。

SVM 示例

示例中使用鸢尾花数据。如下的四种数据决定了鸢尾花的种类,样例数据如下:

萼片长度 萼片宽度 花瓣长度 花瓣宽度 品种
5.1 3.5 1.4 0.2 1
5.6 2.9 3.6 1.3 2
5.8 2.7 5.1 1.9 3

其中品种数据如下: | 品种编号 | 品种名称 | | —- | —- | | 1 | Iris Setosa | | 2 | Iris Versicolor | | 3 | Iris Virginica | 在处理的时候,可以使用一些开源代码将CSV数据转化为LibSVM的数据。

Scala代码如下:

import org.apache.flink.api.scala._ 
import org.apache.flink.ml.math.Vector 
import org.apache.flink.ml.common.LabeledVector 
import org.apache.flink.ml.classification.SVM 
import org.apache.flink.ml.RichExecutionEnvironment 
 
object MySVMApp { 
  def main(args: Array[String]) { 
    // set up the execution environment 
    val pathToTrainingFile: String = "iris-train.txt" 
    val pathToTestingFile: String = "iris-train.txt" 
    val env = ExecutionEnvironment.getExecutionEnvironment 
 
    // Read the training dataset, from a LibSVM formatted file 
    val trainingDS: DataSet[LabeledVector] = 
    env.readLibSVM(pathToTrainingFile) 
 
    // Create the SVM learner 
    val svm = SVM() 
      .setBlocks(10) 
 
    // Learn the SVM model 
    svm.fit(trainingDS) 
 
    // Read the testing dataset 
    val testingDS: DataSet[Vector] = 
    env.readLibSVM(pathToTestingFile).map(_.vector) 
 
    // Calculate the predictions for the testing dataset 
    val predictionDS: DataSet[(Vector, Double)] = 
    svm.predict(testingDS) 

    predictionDS.writeAsText("out") 
 
    env.execute("Flink SVM App") 
  } 
}

程序输出的结果如下:

(SparseVector((0,5.1), (1,3.5), (2,1.4), (3,0.2)),1.0) 
(SparseVector((0,4.9), (1,3.0), (2,1.4), (3,0.2)),1.0) 
(SparseVector((0,4.7), (1,3.2), (2,1.3), (3,0.2)),1.0) 
(SparseVector((0,4.6), (1,3.1), (2,1.5), (3,0.2)),1.0) 
(SparseVector((0,5.0), (1,3.6), (2,1.4), (3,0.2)),1.0) 
(SparseVector((0,5.4), (1,3.9), (2,1.7), (3,0.4)),1.0) 
(SparseVector((0,4.6), (1,3.4), (2,1.4), (3,0.3)),1.0) 
(SparseVector((0,5.0), (1,3.4), (2,1.5), (3,0.2)),1.0) 
(SparseVector((0,4.4), (1,2.9), (2,1.4), (3,0.2)),1.0) 
(SparseVector((0,4.9), (1,3.1), (2,1.5), (3,0.1)),1.0) 
(SparseVector((0,5.4), (1,3.7), (2,1.5), (3,0.2)),1.0) 
(SparseVector((0,4.8), (1,3.4), (2,1.6), (3,0.2)),1.0) 
(SparseVector((0,4.8), (1,3.0), (2,1.4), (3,0.1)),1.0) 

可以通过调优程序的参数来调整结果

参数 描述
Blocks 输入数据分区的数量,默认是None,可以用来配置并行度
Iterations 每个循环迭代的次数,默认是10
LocalIterations SDCA算法内部迭代次数,默认10
Regularization 值越大,算法值向量越小,默认是1
StepSize 算法向量初始化的步进值,默认1.0
ThresholdValue 默认是0.0,定义了决策函数的极值
OutputDesicionnFunction 会输出样例数据在向量空间的距离
Seed Random函数的种子
Show Disqus Comments

Search

    Table of Contents