LoginSignup
28
28

More than 5 years have passed since last update.

Sparkling Water (H2O + Spark)で、Deep Learningを試す (ローカル環境 - Windows編)

Last updated at Posted at 2015-04-14

H2Oが出しているApache Sparkの拡張、Sparkling Water。

残念ながら、Spark組み込みの機械学習ライブラリMLlibには、Deep Learningは実装されていないわけですが、ちょうどそれを補完するように、Sparkiling Waterの方には実装がありますね。

Sparkling Water - 0xdata
h2oai/sparkling-water - github

で、Exampleを試してたのですが、イマイチしっくり来ず。

いろいろ眺めていたところ「Kaggle Digit Recognizer」のデータに対する分類が試しやすそうだったので、これをSparkling Waterでやるとどうなるか、試した結果を書いてみたいと思います。今回は、Windowsのローカル環境でのお話です。

環境構築

OS

Windows 8 64bit (無印版, not pro)

必要なモノ

Kaggle Digit Recognizerのデータ

  • train.csv
  • test.csv

ダウンロードには、アカウントの作成が必要です

JDK8 & SBT

JDKとSBTについては、インストーラーでさくっと入りますね。

Apache Spark 1.2.0

  1. Chose a Spark release: 1.2.0
  2. Chose a package type: PreBuild for hadoop 2.4 and later
  3. Chose a download type: Direct Download

とすると、4.で「spark-1.2.0-bin-hadoop2.4.tgz」のダウンロードリンクが出てきます。

[Note]
現時点では、Spark 1.2.1以降はサポートされていません。なんか動かなくてrevertしたcommitが見当たりますね。。。
https://github.com/h2oai/sparkling-water/commit/bbb777498e7587a3513f7e4d45937f593f540400

h2o Sparkling-Water

readmeの中ほどに、「Downloads of binaries -> Sparkling Water - Latest version」というリンクがあると思うので、そこから。

これ書いてる時点のversionは、0.2.12-92

追記(2015/05/06):
0.2.14-97でいろいろ変わってます。

hadoopのwinutils.exeと環境変数HADOOP_PATH

Windows特有の問題ですが、実行時、winutils.exeが無いよってエラーが出てきます。

WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)

これ、無視しても問題無いのですが、気になる方はこちらを参考に。

リソースの作成 & 配置

フォルダ作成

適当にフォルダを掘っていきます、こんな感じで。

> mkdir c:\scala\kaggle
> mkdir c:\scala\kaggle\libs
> mkdir c:\scala\kaggle\data\digit

build.sbt

c:\scala\kagglebuild.sbtを作ります

build.sbt
name := "Kaggle"

version := "0.0.1"

organization := "my"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")

unmanagedBase := baseDirectory.value / "libs"

fork :=true

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0",
  "org.apache.spark" %% "spark-sql" % "1.2.0",
  "org.apache.commons" % "commons-lang3" % "3.4"
)

[Note]
commons-lang3はDeepLearningParameterのdump用。ToStringしてもダメなのでReflectionToStringBuilderで。

sparkling-waterのjarを、外部jarフォルダに配置

sparkling-waterのzipを解凍して、assembly\build\libsにあるsparkling-water-assembly-0.2.12-92-all.jarを、c:\scala\kaggle\libsにコピーしてください

[Note]
build.sbtでunmanagedBase := baseDirectory.value / "libs"という行を入れてありますが、これは外部jarを使うための設定で、sparkling-waterはそこから読ませる方針です。本当はMaven Repoから引っ張ってきたいんですが、バージョン古いし依存関係めちゃくちゃだしで無理でした。。。

Data フォルダにKaggle Digitのデータを配置

c:\scala\kaggle\data\digitに、kaggleからダウンロードした2ファイルを置きます

  • train.csv
  • test.csv

DigitRecognizer.scala

c:\scala\kaggleDigitRecognizer.scalaを作成。やってることを要約すると、

  1. SparkContext, H20Contextを起動
  2. train.csvをh2oのDataFrameとして読み込む
  3. DataFrameを「train:valid = 8:2」で分割
  4. deep learningのprediction modelを作成
  5. validデータに対するpredictionのsummaryを出力

というような流れです。

DigitRecognizer.scala


package my

import org.apache.spark
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

import org.apache.spark.h2o._
import org.apache.spark.examples.h2o._
import water.fvec.H2OFrame
import hex.deeplearning.DeepLearning
import hex.deeplearning.DeepLearningModel.DeepLearningParameters
import hex.deeplearning.DeepLearningModel.DeepLearningParameters.{Activation, Loss, InitialWeightDistribution}
import hex.{FrameSplitter, SplitFrame}

import org.apache.commons.lang.builder._


object DigitRecognizer
{
  val trainCsv = "C:/scala/kaggle/data/digit/train.csv"
  val testCsv = "C:/scala/kaggle/data/digit/test.csv"

  def main(args: Array[String]): Unit =
  {
    val dlParams = new DeepLearningParameters()
    //dlParams._epochs = 0.01

    Execute(dlParams, false)
  }

  def Execute(dlParams: DeepLearningParameters, useTestCsv: Boolean): Unit =
  {
    logging("Start")
    val startTime = System.currentTimeMillis

    val sparkConf = new SparkConf()
      .setAppName("Kaggle-Digit")
      .setMaster("local")

    logging("Initialize SparkContext")
    val sc = new SparkContext(sparkConf)

    logging("Initialize H2OContext")
    val h2oContext = new H2OContext(sc)

    logging("Start H2OContext")
    h2oContext.start()
    logging(h2oContext.toString)

    logging("Load TrainData")
    val trainData = new H2OFrame(new java.io.File(trainCsv))

    // For Classification, we need to convert the label value to Enum
    trainData.replace(0, trainData.vec(0).toEnum) // Column 0 is "label"

    logging("Split Data for Train/Test")
    val sf = new FrameSplitter(trainData, Array(0.8), Array("train", "valid").map(water.Key.make(_)), null)
    water.H2O.submitTask(sf)
    val splits = sf.getResult
    val spTrain = splits(0)
    val spValid = splits(1)

    logging("DeepLearning - Set Train / Valid data to param")
    val resColName = "label"
    dlParams._response_column = resColName
    if(useTestCsv)
    {
      // All data for train
      dlParams._train = h2oContext.dataFrameToKey(trainData)
    }
    else
    {
      // Use splitted data for train/valid
      dlParams._train = h2oContext.dataFrameToKey(spTrain)
      dlParams._valid = h2oContext.dataFrameToKey(spValid)
    }

    logging("DeepLearning - \"new\" object")
    val dl = new DeepLearning(dlParams)

    logging("DeepLearning - Training data, and generating model")
    val dlModel = dl.trainModel.get

    logging("dlModel")
    println(dlModel)

    logging("dlModel.score(train)")
    println(dlModel.score(spTrain))

    if(useTestCsv)
    {
      logging("dlModel.score(testCsv)")
      val testData = new DataFrame(new java.io.File(testCsv))
      val predRes = dlModel.score(testData, resColName)

      logging("Convert predicted values to array")
      val sqlContext = new SQLContext(sc)
      val predAry =
        (h2oContext.asSchemaRDD(new org.apache.spark.h2o.DataFrame(predRes))(sqlContext))
        .collect().map(row => row.getString(0))

      logging("Output predicted values to text")
      //predAry.foreach(println)
      val file = new java.io.File("submission.txt")
      val pw = new java.io.PrintWriter(new java.io.FileWriter(file))
      pw.println("ImageId,Label")
      var i = 1
      for (line <- predAry)
      {
        pw.println(i.toString + "," + line)
        i += 1
      }
      pw.close()
    }
    else
    {
      logging("dlModel.score(valid)")
      println(dlModel.score(spValid))
    }

    logging("Parameters for model")
    println(dumpDlParams(dlParams))

    val execTime = System.currentTimeMillis - startTime
    logging("Done, ExecTime (sec): " + execTime / 1000)

    logging("Stop SparkContext / H2O")
    sc.stop()
    water.H2O.shutdown()
  }

  def logging(msg: String) =
  {
    val currntDateTime = "%tF-%<tT" format new java.util.Date
    val logStr = currntDateTime + ": " + msg
    println("####################")
    println(logStr)
    println("####################")
  }

  def dumpDlParams(dlParams: DeepLearningParameters): String =
  {
    val builder = ReflectionToStringBuilder.toString(dlParams, ToStringStyle.MULTI_LINE_STYLE)
    builder.toString
  }
}


追記(2015/05/06):
0.2.14-97でwater.fvec.DataFrameがH2OFrameと名前変更。また、FrameSpliterのコンストラクタの引数も変わっております。

配置図

最終的に、こんなフォルダ構成になってるはず。

c:\scala\kaggle
 | build.sbt
 | DigitRecognizer.scala
 └libs
 |  sparkling-water-assembly-0.2.12-92-all.jar
 └data\digit
    train.csv
    test.csv

実行 & 結果

sbt runで実行

PowerShellだとTee-Objectが使えるので、そちらで出力を保存しつつ、画面に出しつつ。

> cd c:\scala\kaggle
> sbt run | Tee-Object -file out.txt

結果

validデータに対してpredictionしてみた結果のsummaryと、実行時間等々が出てくるはず。

####################
dlModel.score(valid)
####################
INFO: Confusion Matrix:
INFO: Act/Pred   0   1   2   3   4   5   6   7   8   9  Error              
INFO:        0 842   0   1   0   2   0   2   0   3   2 0.0117 =    10 / 852
INFO:        1   0 936   3   0   2   1   0   4   4   0 0.0147 =    14 / 950
INFO:        2   6   1 771   4   1   0   4   7   4   3 0.0375 =    30 / 801
INFO:        3   3   1   8 845   0   9   0   7   4   5 0.0420 =    37 / 882
INFO:        4   1   2   0   0 786   0   4   2   4  12 0.0308 =    25 / 811
INFO:        5   6   2   1   9   5 708   3   1   5   7 0.0522 =    39 / 747
INFO:        6  10   3   2   0   2   2 812   0   0   0 0.0229 =    19 / 831
INFO:        7   2   0   2   1   3   1   0 873   0   2 0.0124 =    11 / 884
INFO:        8   3   6   1   4   4   7   1   0 771   5 0.0387 =    31 / 802
INFO:        9   6   1   2   4   8   0   0  15   2 802 0.0452 =    38 / 840
INFO:   Totals 879 952 791 867 813 728 826 909 797 838 0.0302 = 254 / 8,400
####################
Parameters
####################
hex.deeplearning.DeepLearningModel$DeepLearningParameters@469d003c[
  _n_folds=0
  _keep_cross_validation_splits=false
  _checkpoint=<null>
  _override_with_best_model=true
  _autoencoder=false
  _use_all_factor_levels=true
  _activation=Rectifier
  _hidden={200,200}
  _epochs=10.0
  _train_samples_per_iteration=-2
  _target_ratio_comm_to_comp=0.02
  _seed=-4159351283397432880
  _adaptive_rate=true
  _rho=0.99
  _epsilon=1.0E-8
  _rate=0.005
  _rate_annealing=1.0E-6
  _rate_decay=1.0
  _momentum_start=0.0
  _momentum_ramp=1000000.0
  _momentum_stable=0.0
  _nesterov_accelerated_gradient=true
  _input_dropout_ratio=0.0
  _hidden_dropout_ratios=<null>
  _l1=0.0
  _l2=0.0
  _max_w2=Infinity
  _initial_weight_distribution=UniformAdaptive
  _initial_weight_scale=1.0
  _loss=CrossEntropy
  _score_interval=5.0
  _score_training_samples=10000
  _score_validation_samples=0
  _score_duty_cycle=0.1
  _classification_stop=0.0
  _regression_stop=1.0E-6
  _quiet_mode=false
  _score_validation_sampling=Uniform
  _diagnostics=true
  _variable_importances=false
  _fast_mode=true
  _ignore_const_cols=true
  _force_load_balance=true
  _replicate_training_data=false
  _single_node_mode=false
  _shuffle_training_data=false
  _missing_values_handling=MeanImputation
  _sparse=false
  _col_major=false
  _average_activation=0.0
  _sparsity_beta=0.0
  _max_categorical_features=2147483647
  _reproducible=false
  _response_column=label
  _balance_classes=false
  _max_after_balance_size=5.0
  _class_sampling_factors=<null>
  _max_hit_ratio_k=10
  _destination_key=<null>
  _train=train_part0.hex
  _valid=train_part1.hex
  _ignored_columns=<null>
  _dropNA20Cols=false
  _dropConsCols=true
  _score_each_iteration=false
  _max_confusion_matrix_size=20
  _ice_id=58
]####################
Done, ExecTime (sec): 1601
####################

パラメーター全部デフォルトで、trainデータを8:2でtrain/validに割った場合、「0.0302 = 254 / 8,400」、正解率96.9%程度。

なお、これを実行した私のノートPC、CPUが「AMD E1-1200 APU」というネットブック仕様の代物なので、実行時間は通常もっと早いはず。

Kaggle に提出するsubmissionを出力する

DigitRecognizer.scalaのmainメソッドに、Execute(dlParams, false)という行があります。これをExecute(dlParams, true)に変更して実行すると、以下の流れでsubmission.txtを出力するようにしてあります。

  1. train.csvを分割せず、全てtrainデータとして扱う
  2. それで出来たモデルに対して、test.csvへのpredictionの結果を吐く

実際に出来たものをsubmitしてみると、「0.96914」で207位。流石にデフォルトだと、もうすこしがんばりましょうって感じになりますな。

Next Step

正直なところ、Windowsのローカル環境でやる分には、RからH2Oを叩けばいいんですが、EMR上での動作を見据えつつ、ということで。

28
28
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
28
28