Scala
Spark
DeepLearning
Sparkling-Water
h2o.ai

Sparkling Water (H2O + Spark)で、Deep Learningを試す (ローカル環境 - Windows編)

More than 3 years have passed since last update.

H2Oが出しているApache Sparkの拡張、Sparkling Water。

残念ながら、Spark組み込みの機械学習ライブラリMLlibには、Deep Learningは実装されていないわけですが、ちょうどそれを補完するように、Sparkiling Waterの方には実装がありますね。

Sparkling Water - 0xdata

h2oai/sparkling-water - github

で、Exampleを試してたのですが、イマイチしっくり来ず。

いろいろ眺めていたところ「Kaggle Digit Recognizer」のデータに対する分類が試しやすそうだったので、これをSparkling Waterでやるとどうなるか、試した結果を書いてみたいと思います。今回は、Windowsのローカル環境でのお話です。


環境構築


OS

Windows 8 64bit (無印版, not pro)


必要なモノ


Kaggle Digit Recognizerのデータ

https://www.kaggle.com/c/digit-recognizer/data


  • train.csv

  • test.csv

ダウンロードには、アカウントの作成が必要です


JDK8 & SBT

JDKとSBTについては、インストーラーでさくっと入りますね。


Apache Spark 1.2.0

https://spark.apache.org/downloads.html


  1. Chose a Spark release: 1.2.0

  2. Chose a package type: PreBuild for hadoop 2.4 and later

  3. Chose a download type: Direct Download

とすると、4.で「spark-1.2.0-bin-hadoop2.4.tgz」のダウンロードリンクが出てきます。

[Note]

現時点では、Spark 1.2.1以降はサポートされていません。なんか動かなくてrevertしたcommitが見当たりますね。。。

https://github.com/h2oai/sparkling-water/commit/bbb777498e7587a3513f7e4d45937f593f540400


h2o Sparkling-Water

https://github.com/h2oai/sparkling-water

readmeの中ほどに、「Downloads of binaries -> Sparkling Water - Latest version」というリンクがあると思うので、そこから。

これ書いてる時点のversionは、0.2.12-92

追記(2015/05/06):

0.2.14-97でいろいろ変わってます。


hadoopのwinutils.exeと環境変数HADOOP_PATH

Windows特有の問題ですが、実行時、winutils.exeが無いよってエラーが出てきます。

WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)

これ、無視しても問題無いのですが、気になる方はこちらを参考に。


リソースの作成 & 配置


フォルダ作成

適当にフォルダを掘っていきます、こんな感じで。

> mkdir c:\scala\kaggle

> mkdir c:\scala\kaggle\libs
> mkdir c:\scala\kaggle\data\digit


build.sbt

c:\scala\kagglebuild.sbtを作ります


build.sbt

name := "Kaggle"

version := "0.0.1"

organization := "my"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")

unmanagedBase := baseDirectory.value / "libs"

fork :=true

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0",
"org.apache.spark" %% "spark-sql" % "1.2.0",
"org.apache.commons" % "commons-lang3" % "3.4"
)


[Note]

commons-lang3はDeepLearningParameterのdump用。ToStringしてもダメなのでReflectionToStringBuilderで。


sparkling-waterのjarを、外部jarフォルダに配置

sparkling-waterのzipを解凍して、assembly\build\libsにあるsparkling-water-assembly-0.2.12-92-all.jarを、c:\scala\kaggle\libsにコピーしてください

[Note]

build.sbtでunmanagedBase := baseDirectory.value / "libs"という行を入れてありますが、これは外部jarを使うための設定で、sparkling-waterはそこから読ませる方針です。本当はMaven Repoから引っ張ってきたいんですが、バージョン古いし依存関係めちゃくちゃだしで無理でした。。。


Data フォルダにKaggle Digitのデータを配置

c:\scala\kaggle\data\digitに、kaggleからダウンロードした2ファイルを置きます


  • train.csv

  • test.csv


DigitRecognizer.scala

c:\scala\kaggleDigitRecognizer.scalaを作成。やってることを要約すると、


  1. SparkContext, H20Contextを起動

  2. train.csvをh2oのDataFrameとして読み込む

  3. DataFrameを「train:valid = 8:2」で分割

  4. deep learningのprediction modelを作成

  5. validデータに対するpredictionのsummaryを出力

というような流れです。


DigitRecognizer.scala

package my

import org.apache.spark
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

import org.apache.spark.h2o._
import org.apache.spark.examples.h2o._
import water.fvec.H2OFrame
import hex.deeplearning.DeepLearning
import hex.deeplearning.DeepLearningModel.DeepLearningParameters
import hex.deeplearning.DeepLearningModel.DeepLearningParameters.{Activation, Loss, InitialWeightDistribution}
import hex.{FrameSplitter, SplitFrame}

import org.apache.commons.lang.builder._

object DigitRecognizer
{
val trainCsv = "C:/scala/kaggle/data/digit/train.csv"
val testCsv = "C:/scala/kaggle/data/digit/test.csv"

def main(args: Array[String]): Unit =
{
val dlParams = new DeepLearningParameters()
//dlParams._epochs = 0.01

Execute(dlParams, false)
}

def Execute(dlParams: DeepLearningParameters, useTestCsv: Boolean): Unit =
{
logging("Start")
val startTime = System.currentTimeMillis

val sparkConf = new SparkConf()
.setAppName("Kaggle-Digit")
.setMaster("local")

logging("Initialize SparkContext")
val sc = new SparkContext(sparkConf)

logging("Initialize H2OContext")
val h2oContext = new H2OContext(sc)

logging("Start H2OContext")
h2oContext.start()
logging(h2oContext.toString)

logging("Load TrainData")
val trainData = new H2OFrame(new java.io.File(trainCsv))

// For Classification, we need to convert the label value to Enum
trainData.replace(0, trainData.vec(0).toEnum) // Column 0 is "label"

logging("Split Data for Train/Test")
val sf = new FrameSplitter(trainData, Array(0.8), Array("train", "valid").map(water.Key.make(_)), null)
water.H2O.submitTask(sf)
val splits = sf.getResult
val spTrain = splits(0)
val spValid = splits(1)

logging("DeepLearning - Set Train / Valid data to param")
val resColName = "label"
dlParams._response_column = resColName
if(useTestCsv)
{
// All data for train
dlParams._train = h2oContext.dataFrameToKey(trainData)
}
else
{
// Use splitted data for train/valid
dlParams._train = h2oContext.dataFrameToKey(spTrain)
dlParams._valid = h2oContext.dataFrameToKey(spValid)
}

logging("DeepLearning - \"new\" object")
val dl = new DeepLearning(dlParams)

logging("DeepLearning - Training data, and generating model")
val dlModel = dl.trainModel.get

logging("dlModel")
println(dlModel)

logging("dlModel.score(train)")
println(dlModel.score(spTrain))

if(useTestCsv)
{
logging("dlModel.score(testCsv)")
val testData = new DataFrame(new java.io.File(testCsv))
val predRes = dlModel.score(testData, resColName)

logging("Convert predicted values to array")
val sqlContext = new SQLContext(sc)
val predAry =
(h2oContext.asSchemaRDD(new org.apache.spark.h2o.DataFrame(predRes))(sqlContext))
.collect().map(row => row.getString(0))

logging("Output predicted values to text")
//predAry.foreach(println)
val file = new java.io.File("submission.txt")
val pw = new java.io.PrintWriter(new java.io.FileWriter(file))
pw.println("ImageId,Label")
var i = 1
for (line <- predAry)
{
pw.println(i.toString + "," + line)
i += 1
}
pw.close()
}
else
{
logging("dlModel.score(valid)")
println(dlModel.score(spValid))
}

logging("Parameters for model")
println(dumpDlParams(dlParams))

val execTime = System.currentTimeMillis - startTime
logging("Done, ExecTime (sec): " + execTime / 1000)

logging("Stop SparkContext / H2O")
sc.stop()
water.H2O.shutdown()
}

def logging(msg: String) =
{
val currntDateTime = "%tF-%<tT" format new java.util.Date
val logStr = currntDateTime + ": " + msg
println("####################")
println(logStr)
println("####################")
}

def dumpDlParams(dlParams: DeepLearningParameters): String =
{
val builder = ReflectionToStringBuilder.toString(dlParams, ToStringStyle.MULTI_LINE_STYLE)
builder.toString
}
}


追記(2015/05/06):

0.2.14-97でwater.fvec.DataFrameがH2OFrameと名前変更。また、FrameSpliterのコンストラクタの引数も変わっております。


配置図

最終的に、こんなフォルダ構成になってるはず。

c:\scala\kaggle

| build.sbt
| DigitRecognizer.scala
└libs
| sparkling-water-assembly-0.2.12-92-all.jar
└data\digit
train.csv
test.csv


実行 & 結果


sbt runで実行

PowerShellだとTee-Objectが使えるので、そちらで出力を保存しつつ、画面に出しつつ。

> cd c:\scala\kaggle

> sbt run | Tee-Object -file out.txt


結果

validデータに対してpredictionしてみた結果のsummaryと、実行時間等々が出てくるはず。

####################

dlModel.score(valid)
####################
INFO: Confusion Matrix:
INFO: Act/Pred 0 1 2 3 4 5 6 7 8 9 Error
INFO: 0 842 0 1 0 2 0 2 0 3 2 0.0117 = 10 / 852
INFO: 1 0 936 3 0 2 1 0 4 4 0 0.0147 = 14 / 950
INFO: 2 6 1 771 4 1 0 4 7 4 3 0.0375 = 30 / 801
INFO: 3 3 1 8 845 0 9 0 7 4 5 0.0420 = 37 / 882
INFO: 4 1 2 0 0 786 0 4 2 4 12 0.0308 = 25 / 811
INFO: 5 6 2 1 9 5 708 3 1 5 7 0.0522 = 39 / 747
INFO: 6 10 3 2 0 2 2 812 0 0 0 0.0229 = 19 / 831
INFO: 7 2 0 2 1 3 1 0 873 0 2 0.0124 = 11 / 884
INFO: 8 3 6 1 4 4 7 1 0 771 5 0.0387 = 31 / 802
INFO: 9 6 1 2 4 8 0 0 15 2 802 0.0452 = 38 / 840
INFO: Totals 879 952 791 867 813 728 826 909 797 838 0.0302 = 254 / 8,400
####################
Parameters
####################
hex.deeplearning.DeepLearningModel$DeepLearningParameters@469d003c[
_n_folds=0
_keep_cross_validation_splits=false
_checkpoint=<null>
_override_with_best_model=true
_autoencoder=false
_use_all_factor_levels=true
_activation=Rectifier
_hidden={200,200}
_epochs=10.0
_train_samples_per_iteration=-2
_target_ratio_comm_to_comp=0.02
_seed=-4159351283397432880
_adaptive_rate=true
_rho=0.99
_epsilon=1.0E-8
_rate=0.005
_rate_annealing=1.0E-6
_rate_decay=1.0
_momentum_start=0.0
_momentum_ramp=1000000.0
_momentum_stable=0.0
_nesterov_accelerated_gradient=true
_input_dropout_ratio=0.0
_hidden_dropout_ratios=<null>
_l1=0.0
_l2=0.0
_max_w2=Infinity
_initial_weight_distribution=UniformAdaptive
_initial_weight_scale=1.0
_loss=CrossEntropy
_score_interval=5.0
_score_training_samples=10000
_score_validation_samples=0
_score_duty_cycle=0.1
_classification_stop=0.0
_regression_stop=1.0E-6
_quiet_mode=false
_score_validation_sampling=Uniform
_diagnostics=true
_variable_importances=false
_fast_mode=true
_ignore_const_cols=true
_force_load_balance=true
_replicate_training_data=false
_single_node_mode=false
_shuffle_training_data=false
_missing_values_handling=MeanImputation
_sparse=false
_col_major=false
_average_activation=0.0
_sparsity_beta=0.0
_max_categorical_features=2147483647
_reproducible=false
_response_column=label
_balance_classes=false
_max_after_balance_size=5.0
_class_sampling_factors=<null>
_max_hit_ratio_k=10
_destination_key=<null>
_train=train_part0.hex
_valid=train_part1.hex
_ignored_columns=<null>
_dropNA20Cols=false
_dropConsCols=true
_score_each_iteration=false
_max_confusion_matrix_size=20
_ice_id=58
]####################
Done, ExecTime (sec): 1601
####################

パラメーター全部デフォルトで、trainデータを8:2でtrain/validに割った場合、「0.0302 = 254 / 8,400」、正解率96.9%程度。

なお、これを実行した私のノートPC、CPUが「AMD E1-1200 APU」というネットブック仕様の代物なので、実行時間は通常もっと早いはず。


Kaggle に提出するsubmissionを出力する

DigitRecognizer.scalaのmainメソッドに、Execute(dlParams, false)という行があります。これをExecute(dlParams, true)に変更して実行すると、以下の流れでsubmission.txtを出力するようにしてあります。


  1. train.csvを分割せず、全てtrainデータとして扱う

  2. それで出来たモデルに対して、test.csvへのpredictionの結果を吐く

実際に出来たものをsubmitしてみると、「0.96914」で207位。流石にデフォルトだと、もうすこしがんばりましょうって感じになりますな。


Next Step

正直なところ、Windowsのローカル環境でやる分には、RからH2Oを叩けばいいんですが、EMR上での動作を見据えつつ、ということで。