5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

spark mllib 分散SGD実装調査メモ

Last updated at Posted at 2015-11-14

をここに書いていく

  • 分散でどうやってgradient updateやっているか
  • workerの結果を平均している。
  • どこで、broadcastで配って, driverで平均とっている
  • shuffleで平均している?
  • 学習係数はどんな感じ?
  • AdaGradとかに更新できそうか?
  • BSPとの違いは?
  • SSPにするには何がいる?
  • 非効率って言われているけど、よさげにできるか?
  • いろいろimprovementが提案、実装されているけど、それってどうなの
  • LRでの SGD, L-BFGSの比較
  • 前の実験結果だとLR-SGDは精度がでてなかった。十分反復させて収束するかみる。分散環境で
  • AdaGrad, ADAMをプロトタイプしてみて目的関数の下がり具合を見てみる。するなら、今週末しかない。

なので、書くことは

  • 現状のSGD実装のソースコードリーディング

  • jiraチケットの関連話題の紹介

  • 特にSGDの効率化、高速化のチケット

  • LRでのSGD, L-BFGSとの収束率の測定

  • ソースいじって、iter毎の目的関数値を出力させるようにする。

  • AdaGrad, ADAMプロトタイピング + 実験

最後までいけたら良いな。。。

jira tickect links

論文 links

@DeveloperApi
class L1Updater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter) ※
    // Take gradient step
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)

thisIterStepSizeは※

GradientDescent.scala

/**
     * For the first iteration, the regVal will be initialized as sum of weight squares
     * if it's L2 updater; for L1 updater, the same logic is followed.
     */
    var regVal = updater.compute(
      weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

    var converged = false // indicates whether converged based on convergenceTol
    var i = 1
    while (!converged && i <= numIterations) {
      val bcWeights = data.context.broadcast(weights)
      // Sample a subset (fraction miniBatchFraction) of the total data
      // compute and sum up the subgradients on this subset (this is one map-reduce)
      val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
        .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
          seqOp = (c, v) => {
            // c: (grad, loss, count), v: (label, features)
            val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
            (c._1, c._2 + l, c._3 + 1)
          },
          combOp = (c1, c2) => {
            // c: (grad, loss, count)
            (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
          })

      if (miniBatchSize > 0) {
        /**
         * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
         * and regVal is the regularization value computed in the previous iteration as well.
         */
        stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
        val update = updater.compute(
          weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
          stepSize, i, regParam)
        weights = update._1
        regVal = update._2

        previousWeights = currentWeights
        currentWeights = Some(weights)
        if (previousWeights != None && currentWeights != None) {
          converged = isConverged(previousWeights.get,
            currentWeights.get, convergenceTol)
        }
      } else {
        logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
      }
      i += 1
    }

    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights, stochasticLossHistory.toArray)

  }

step sizeはdefault 1.0,

stepSize = stepSize / sqrt(iteration)

で減衰

/**
   * Set the initial step size of SGD for the first step. Default 1.0.
   * In subsequent steps, the step size will decrease with stepSize/sqrt(t)
   */
  def setStepSize(step: Double): this.type = {
    this.stepSize = step
    this
  }

updaterは

  • gradient, weightvectorを引数にとって、weightvectorを更新して、L1,L2の正則化項を計算する。

gradientは

  • exampleとcurrent weightvectorを引数にとってgradientを計算して、Lossを計算して返す。

treeAggregationは

  • partition毎にaggregateする。
  • さらにscale個のpartionをaggregateして、新しい numPartition/scaleのpartiallyAggregated partionを作る
  • これを繰り返し scale + numPartition /scale以下になったら、最後に一回aggregateして、結果を返す。
  • 目的は、一発のaggregationはreducerが1つになって処理が1workerに偏るのを防ぐために、tree構造で段階的、分散でにaggregationする。

stochasticLossHistory.append(lossSum / miniBatchSize + regVal)

でlossの履歴を追加。minBatchSizeで割っているのは一応全体のロスにするため?

収束判定は

  • lossで見てない

  • weighvectorの変化がconvergenceTolelance以下だったら収束している判断

  • runMiniBatchSGDははweightvectorのlossの変化の履歴を返す。

  • なので、終わったあとに全lossの履歴を表示、プロットすれば収束率はだせる。

  • これを使った、AdaGrad, ADAMで収束率を比較できる

  • treeaggregateは reduceと同じなので、毎回、更新したweightvectorはdriverに帰ってきて、それを毎回broadecastで配っている。

  • driver側で、adagradでweightvectorの成分を変化させて、broadcastで配るのは簡単

  • adamも同じだろうけど、必要な情報がちょっと増える。

  • なんである程度簡易で実装できるが、GradientDescent.scalaでなくて

  • AdaGrad.scalaとかADAM.scalaとか作って処理させるのが良さそう

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?