H2Oが出しているApache Sparkの拡張、Sparkling Water。
残念ながら、Spark組み込みの機械学習ライブラリMLlibには、Deep Learningは実装されていないわけですが、ちょうどそれを補完するように、Sparkiling Waterの方には実装がありますね。
Sparkling Water - 0xdata
h2oai/sparkling-water - github
で、Exampleを試してたのですが、イマイチしっくり来ず。
いろいろ眺めていたところ「Kaggle Digit Recognizer」のデータに対する分類が試しやすそうだったので、これをSparkling Waterでやるとどうなるか、試した結果を書いてみたいと思います。今回は、Windowsのローカル環境でのお話です。
環境構築
OS
Windows 8 64bit (無印版, not pro)
必要なモノ
Kaggle Digit Recognizerのデータ
- train.csv
- test.csv
ダウンロードには、アカウントの作成が必要です
JDK8 & SBT
- JDK 8: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
- SBT: http://www.scala-sbt.org/
JDKとSBTについては、インストーラーでさくっと入りますね。
Apache Spark 1.2.0
- Chose a Spark release: 1.2.0
- Chose a package type: PreBuild for hadoop 2.4 and later
- Chose a download type: Direct Download
とすると、4.で「spark-1.2.0-bin-hadoop2.4.tgz」のダウンロードリンクが出てきます。
[Note]
現時点では、Spark 1.2.1以降はサポートされていません。なんか動かなくてrevertしたcommitが見当たりますね。。。
https://github.com/h2oai/sparkling-water/commit/bbb777498e7587a3513f7e4d45937f593f540400
h2o Sparkling-Water
readmeの中ほどに、「Downloads of binaries -> Sparkling Water - Latest version」というリンクがあると思うので、そこから。
これ書いてる時点のversionは、0.2.12-92
追記(2015/05/06):
0.2.14-97でいろいろ変わってます。
hadoopのwinutils.exeと環境変数HADOOP_PATH
Windows特有の問題ですが、実行時、winutils.exeが無いよってエラーが出てきます。
WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
これ、無視しても問題無いのですが、気になる方はこちらを参考に。
リソースの作成 & 配置
フォルダ作成
適当にフォルダを掘っていきます、こんな感じで。
> mkdir c:\scala\kaggle
> mkdir c:\scala\kaggle\libs
> mkdir c:\scala\kaggle\data\digit
build.sbt
c:\scala\kaggle
にbuild.sbt
を作ります
name := "Kaggle"
version := "0.0.1"
organization := "my"
scalaVersion := "2.10.4"
scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")
unmanagedBase := baseDirectory.value / "libs"
fork :=true
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0",
"org.apache.spark" %% "spark-sql" % "1.2.0",
"org.apache.commons" % "commons-lang3" % "3.4"
)
[Note]
commons-lang3はDeepLearningParameterのdump用。ToStringしてもダメなのでReflectionToStringBuilderで。
sparkling-waterのjarを、外部jarフォルダに配置
sparkling-waterのzipを解凍して、assembly\build\libsにあるsparkling-water-assembly-0.2.12-92-all.jarを、c:\scala\kaggle\libs
にコピーしてください
[Note]
build.sbtでunmanagedBase := baseDirectory.value / "libs"
という行を入れてありますが、これは外部jarを使うための設定で、sparkling-waterはそこから読ませる方針です。本当はMaven Repoから引っ張ってきたいんですが、バージョン古いし依存関係めちゃくちゃだしで無理でした。。。
Data フォルダにKaggle Digitのデータを配置
c:\scala\kaggle\data\digit
に、kaggleからダウンロードした2ファイルを置きます
- train.csv
- test.csv
DigitRecognizer.scala
c:\scala\kaggle
にDigitRecognizer.scala
を作成。やってることを要約すると、
- SparkContext, H20Contextを起動
- train.csvをh2oのDataFrameとして読み込む
- DataFrameを「train:valid = 8:2」で分割
- deep learningのprediction modelを作成
- validデータに対するpredictionのsummaryを出力
というような流れです。
package my
import org.apache.spark
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.h2o._
import org.apache.spark.examples.h2o._
import water.fvec.H2OFrame
import hex.deeplearning.DeepLearning
import hex.deeplearning.DeepLearningModel.DeepLearningParameters
import hex.deeplearning.DeepLearningModel.DeepLearningParameters.{Activation, Loss, InitialWeightDistribution}
import hex.{FrameSplitter, SplitFrame}
import org.apache.commons.lang.builder._
object DigitRecognizer
{
val trainCsv = "C:/scala/kaggle/data/digit/train.csv"
val testCsv = "C:/scala/kaggle/data/digit/test.csv"
def main(args: Array[String]): Unit =
{
val dlParams = new DeepLearningParameters()
//dlParams._epochs = 0.01
Execute(dlParams, false)
}
def Execute(dlParams: DeepLearningParameters, useTestCsv: Boolean): Unit =
{
logging("Start")
val startTime = System.currentTimeMillis
val sparkConf = new SparkConf()
.setAppName("Kaggle-Digit")
.setMaster("local")
logging("Initialize SparkContext")
val sc = new SparkContext(sparkConf)
logging("Initialize H2OContext")
val h2oContext = new H2OContext(sc)
logging("Start H2OContext")
h2oContext.start()
logging(h2oContext.toString)
logging("Load TrainData")
val trainData = new H2OFrame(new java.io.File(trainCsv))
// For Classification, we need to convert the label value to Enum
trainData.replace(0, trainData.vec(0).toEnum) // Column 0 is "label"
logging("Split Data for Train/Test")
val sf = new FrameSplitter(trainData, Array(0.8), Array("train", "valid").map(water.Key.make(_)), null)
water.H2O.submitTask(sf)
val splits = sf.getResult
val spTrain = splits(0)
val spValid = splits(1)
logging("DeepLearning - Set Train / Valid data to param")
val resColName = "label"
dlParams._response_column = resColName
if(useTestCsv)
{
// All data for train
dlParams._train = h2oContext.dataFrameToKey(trainData)
}
else
{
// Use splitted data for train/valid
dlParams._train = h2oContext.dataFrameToKey(spTrain)
dlParams._valid = h2oContext.dataFrameToKey(spValid)
}
logging("DeepLearning - \"new\" object")
val dl = new DeepLearning(dlParams)
logging("DeepLearning - Training data, and generating model")
val dlModel = dl.trainModel.get
logging("dlModel")
println(dlModel)
logging("dlModel.score(train)")
println(dlModel.score(spTrain))
if(useTestCsv)
{
logging("dlModel.score(testCsv)")
val testData = new DataFrame(new java.io.File(testCsv))
val predRes = dlModel.score(testData, resColName)
logging("Convert predicted values to array")
val sqlContext = new SQLContext(sc)
val predAry =
(h2oContext.asSchemaRDD(new org.apache.spark.h2o.DataFrame(predRes))(sqlContext))
.collect().map(row => row.getString(0))
logging("Output predicted values to text")
//predAry.foreach(println)
val file = new java.io.File("submission.txt")
val pw = new java.io.PrintWriter(new java.io.FileWriter(file))
pw.println("ImageId,Label")
var i = 1
for (line <- predAry)
{
pw.println(i.toString + "," + line)
i += 1
}
pw.close()
}
else
{
logging("dlModel.score(valid)")
println(dlModel.score(spValid))
}
logging("Parameters for model")
println(dumpDlParams(dlParams))
val execTime = System.currentTimeMillis - startTime
logging("Done, ExecTime (sec): " + execTime / 1000)
logging("Stop SparkContext / H2O")
sc.stop()
water.H2O.shutdown()
}
def logging(msg: String) =
{
val currntDateTime = "%tF-%<tT" format new java.util.Date
val logStr = currntDateTime + ": " + msg
println("####################")
println(logStr)
println("####################")
}
def dumpDlParams(dlParams: DeepLearningParameters): String =
{
val builder = ReflectionToStringBuilder.toString(dlParams, ToStringStyle.MULTI_LINE_STYLE)
builder.toString
}
}
追記(2015/05/06):
0.2.14-97でwater.fvec.DataFrameがH2OFrameと名前変更。また、FrameSpliterのコンストラクタの引数も変わっております。
配置図
最終的に、こんなフォルダ構成になってるはず。
c:\scala\kaggle
| build.sbt
| DigitRecognizer.scala
└libs
| sparkling-water-assembly-0.2.12-92-all.jar
└data\digit
train.csv
test.csv
実行 & 結果
sbt runで実行
PowerShellだとTee-Objectが使えるので、そちらで出力を保存しつつ、画面に出しつつ。
> cd c:\scala\kaggle
> sbt run | Tee-Object -file out.txt
結果
validデータに対してpredictionしてみた結果のsummaryと、実行時間等々が出てくるはず。
####################
dlModel.score(valid)
####################
INFO: Confusion Matrix:
INFO: Act/Pred 0 1 2 3 4 5 6 7 8 9 Error
INFO: 0 842 0 1 0 2 0 2 0 3 2 0.0117 = 10 / 852
INFO: 1 0 936 3 0 2 1 0 4 4 0 0.0147 = 14 / 950
INFO: 2 6 1 771 4 1 0 4 7 4 3 0.0375 = 30 / 801
INFO: 3 3 1 8 845 0 9 0 7 4 5 0.0420 = 37 / 882
INFO: 4 1 2 0 0 786 0 4 2 4 12 0.0308 = 25 / 811
INFO: 5 6 2 1 9 5 708 3 1 5 7 0.0522 = 39 / 747
INFO: 6 10 3 2 0 2 2 812 0 0 0 0.0229 = 19 / 831
INFO: 7 2 0 2 1 3 1 0 873 0 2 0.0124 = 11 / 884
INFO: 8 3 6 1 4 4 7 1 0 771 5 0.0387 = 31 / 802
INFO: 9 6 1 2 4 8 0 0 15 2 802 0.0452 = 38 / 840
INFO: Totals 879 952 791 867 813 728 826 909 797 838 0.0302 = 254 / 8,400
####################
Parameters
####################
hex.deeplearning.DeepLearningModel$DeepLearningParameters@469d003c[
_n_folds=0
_keep_cross_validation_splits=false
_checkpoint=<null>
_override_with_best_model=true
_autoencoder=false
_use_all_factor_levels=true
_activation=Rectifier
_hidden={200,200}
_epochs=10.0
_train_samples_per_iteration=-2
_target_ratio_comm_to_comp=0.02
_seed=-4159351283397432880
_adaptive_rate=true
_rho=0.99
_epsilon=1.0E-8
_rate=0.005
_rate_annealing=1.0E-6
_rate_decay=1.0
_momentum_start=0.0
_momentum_ramp=1000000.0
_momentum_stable=0.0
_nesterov_accelerated_gradient=true
_input_dropout_ratio=0.0
_hidden_dropout_ratios=<null>
_l1=0.0
_l2=0.0
_max_w2=Infinity
_initial_weight_distribution=UniformAdaptive
_initial_weight_scale=1.0
_loss=CrossEntropy
_score_interval=5.0
_score_training_samples=10000
_score_validation_samples=0
_score_duty_cycle=0.1
_classification_stop=0.0
_regression_stop=1.0E-6
_quiet_mode=false
_score_validation_sampling=Uniform
_diagnostics=true
_variable_importances=false
_fast_mode=true
_ignore_const_cols=true
_force_load_balance=true
_replicate_training_data=false
_single_node_mode=false
_shuffle_training_data=false
_missing_values_handling=MeanImputation
_sparse=false
_col_major=false
_average_activation=0.0
_sparsity_beta=0.0
_max_categorical_features=2147483647
_reproducible=false
_response_column=label
_balance_classes=false
_max_after_balance_size=5.0
_class_sampling_factors=<null>
_max_hit_ratio_k=10
_destination_key=<null>
_train=train_part0.hex
_valid=train_part1.hex
_ignored_columns=<null>
_dropNA20Cols=false
_dropConsCols=true
_score_each_iteration=false
_max_confusion_matrix_size=20
_ice_id=58
]####################
Done, ExecTime (sec): 1601
####################
パラメーター全部デフォルトで、trainデータを8:2でtrain/validに割った場合、「0.0302 = 254 / 8,400」、正解率96.9%程度。
なお、これを実行した私のノートPC、CPUが「AMD E1-1200 APU」というネットブック仕様の代物なので、実行時間は通常もっと早いはず。
Kaggle に提出するsubmissionを出力する
DigitRecognizer.scalaのmainメソッドに、Execute(dlParams, false)
という行があります。これをExecute(dlParams, true)
に変更して実行すると、以下の流れでsubmission.txt
を出力するようにしてあります。
- train.csvを分割せず、全てtrainデータとして扱う
- それで出来たモデルに対して、test.csvへのpredictionの結果を吐く
実際に出来たものをsubmitしてみると、「0.96914」で207位。流石にデフォルトだと、もうすこしがんばりましょうって感じになりますな。
Next Step
正直なところ、Windowsのローカル環境でやる分には、RからH2Oを叩けばいいんですが、EMR上での動作を見据えつつ、ということで。