23
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Mahoutをsparkで実行する

Last updated at Posted at 2014-06-18

Spark上で実行されるmahout

sparkのモジュールに、mllibと呼ばれる機械学習ライブラリが含まれていますが、mahoutもsparkに対応しました。
ただし、mahoutサイトにも記載されていますが、

Please keep in mind that this code is still in a very early experimental stage

まだ実験段階のようですので感じだけでもつかめたらと思います。

今回、Playing with Mahout's Spark Shellを参考にspark-shell上でのmahoutの実行環境を構築しました。

Install

予めインストールしておくもの

oracle JDK 7以上
maven 3.2.x以上
subversion

Apache sparkのインストール

今回はcdh5のApache sparkをインストールしました。
ここで、インストールするsparkバージョンは0.9.xです。
(1.0.xでためしたところ、サンプルの実行時にエラーが出力されました。)

// CDH5のリポジトリを登録
$ sudo wget http://archive.cloudera.com/cdh5/one-click-install/redhat/6/x86_64/cloudera-cdh-5-0.x86_64.rpm
$ sudo yum --nogpgcheck localinstall cloudera-cdh-5-0.x86_64.rpm

// sparkのインストール
$ sudo yum install spark-core spark-master spark-worker spark-python

installが完了したらsparkを起動しておきます。

$ sudo service spark-master start
$ sudo service spark-worker start

http://<hostname>:18080にアクセスすることで、以下のような画面が確認できると思います。
spark1.JPG

mahoutのビルド

最新のリポジトリをsvnを利用してチェックアウトし、ビルドを実行します.
mahoutは、任意の場所で構いません。

cd ~/
svn co https://svn.apache.org/repos/asf/mahout/trunk/ mahout
cd mahout/
mvn -DskipTests clean install

mahout shellの起動

環境変数を設定します。

//チェックアウトしたmahoutディレクトリの場所
$ export MAHOUT_HOME=/home/<user>/mahout
// "spark://"から始まるmaster url
$ export MASTER=spark://ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal:7077
// apache sparkのインストールディレクトリ 
$ export SPARK_HOME=/usr/lib/spark

shellの実行

cd $MAHOUT_HOME
bin/mahout spark-shell
MAHOUT_LOCAL is not set; adding HADOOP_CONF_DIR to classpath.
14/06/17 11:29:35 INFO spark.HttpServer: Starting HTTP Server
14/06/17 11:29:35 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/06/17 11:29:35 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50964
...
Type :help for more information.
mahout>

エラーが発生せず、mahoutシェルが起動すればOKです。

回帰分析処理を実行してみる。

Playing with Mahout's Spark Shellのexampleを実行してみました。
このexampleは、dataset about cerealsに穀類についてのデータの一部を使用しています。

DRM (線形代数処理)用の関数にデータを取り込みます。
drmParallelizeと呼ばれる関数に渡すことで分散処理をさせることができるみたいです。

mahout > val drmData = drmParallelize(dense(
  (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
  (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
  (1, 1, 12,   13, 22.736446),  // Cocoa Puffs
  (2, 1, 11,   13, 32.207582),  // Froot Loops
  (1, 2, 12,   11, 21.871292),  // Honey Graham Ohs
  (2, 1, 16,   8,  36.187559),  // Wheaties Honey Gold
  (6, 2, 17,   1,  50.764999),  // Cheerios
  (3, 2, 13,   7,  40.400208),  // Clusters
  (3, 3, 13,   4,  45.811716)), // Great Grains Pecan
  numPartitions = 2);
drmData: org.apache.mahout.sparkbindings.drm.CheckpointedDrm[Int] = org.apache.mahout.sparkbindings.drm.CheckpointedDrmBase@2bc41091
.
.
mahout> val drmX = drmData(::, 0 until 4)
drmX: org.apache.mahout.sparkbindings.drm.DrmLike[Int] = org.apache.mahout.sparkbindings.drm.plan.OpMapBlock@3605a553
.
.
mahout> val y = drmData.collect(::, 4)
y: org.apache.mahout.math.Vector = {0:29.509541,1:18.042851,2:22.736446,3:32.207582,4:21.871292,5:36.187559,6:50.764999,7:40.400208,8:45.811716}
.
.
mahout> val drmXtX = drmX.t %*% drmX                                                                                                            
drmXtX: org.apache.mahout.sparkbindings.drm.DrmLike[Int] = OpAB(OpAt(org.apache.mahout.sparkbindings.drm.plan.OpMapBlock@3605a553),org.apache.mahout.sparkbindings.drm.plan.OpMapBlock@3605a553)
.
.
mahout> val drmXval drmXty = drmX.t %*% y                                                                                                               
drmXty: org.apache.mahout.sparkbindings.drm.DrmLike[Int] = OpAx(OpAt(org.apache.mahout.sparkbindings.drm.plan.OpMapBlock@3605a553),{0:29.509541,1:18.042851,2:22.736446,3:32.207582,4:21.871292,5:36.187559,6:50.764999,7:40.400208,8:45.811716})
.
.
mahout> val XtX = drmXtX.collect
XtX: org.apache.mahout.math.Matrix = 
{
  0  => {0:69.0,1:40.0,2:291.0,3:137.0}
  1  => {0:40.0,1:32.0,2:207.0,3:128.0}
  2  => {0:291.0,1:207.0,2:1546.25,3:968.0}
  3  => {0:137.0,1:128.0,2:968.0,3:833.0}
}
.
.
mahout> val Xty = drmXty.collect(::, 0)
Xty: org.apache.mahout.math.Vector = {0:821.6857190000001,1:549.744517,2:3978.7015894999995,3:2272.779989}
.
.
mahout> val beta = solve(XtX, Xty)
beta: org.apache.mahout.math.Vector = {0:5.247349465378446,1:2.750794578467531,2:1.1527813010791554,3:0.10312017617608908}
.
.
mahout> val yFitted = (drmX %*% beta).collect(::, 0)
yFitted: org.apache.mahout.math.Vector = {0:29.131693510783975,1:25.819756349376444,2:23.172081947084997,3:27.266650111384287,4:25.716636173200357,5:32.514955735899626,6:56.68608824372747,7:36.95163570033205,8:39.393069750271316}
.
.
mahout> (y - yFitted).norm(2)
res1: Double = 14.200396723606845
.
.
mahout> def ols(drmX: DrmLike[Int], y: Vector) = {
     |   val XtX = (drmX.t %*% drmX).collect
     |   val Xty = (drmX.t %*% y).collect(::, 0)
     |   solve(XtX, Xty)
     | }
ols: (drmX: org.apache.mahout.sparkbindings.drm.DrmLike[Int], y: org.apache.mahout.math.Vector)org.apache.mahout.math.Vector
.
.
mahout> def goodnessOfFit(drmX: DrmLike[Int], beta: Vector, y: Vector) = {                                                                      
     |   val fittedY = (drmX %*% beta).collect(::, 0)
     |   (y - fittedY).norm(2)
     | }
goodnessOfFit: (drmX: org.apache.mahout.sparkbindings.drm.DrmLike[Int], beta: org.apache.mahout.math.Vector, y: org.apache.mahout.math.Vector)Double
.
.
mahout> val drmXwithBiasColumn = drmX.mapBlock(ncol = drmX.ncol + 1) {
     |   case(keys, block) =>
     |     val blockWithBiasColumn = block.like(block.nrow, block.ncol + 1)
     |     blockWithBiasColumn(::, 0 until block.ncol) := block
     |     blockWithBiasColumn(::, block.ncol) := 1
     |     keys -> blockWithBiasColumn
     | }
drmXwithBiasColumn: org.apache.mahout.sparkbindings.drm.DrmLike[Int] = org.apache.mahout.sparkbindings.drm.plan.OpMapBlock@19170713
.
.
mahout> val betaWithBiasTerm = ols(drmXwithBiasColumn, y)
betaWithBiasTerm: org.apache.mahout.math.Vector = {0:-1.3362653883272289,1:-13.15770132067483,2:-4.152654199020216,3:-5.679908094232256,4:163.1793268784127}
.
.
mahout> 14/06/17 12:35:09 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 23.0 from pool 
.
.
mahout> goodnessOfFit(drmXwithBiasColumn, betaWithBiasTerm, y)
res2: Double = 7.623280714561956

最後、 7.62... という数字がでました。
この数値が0に近ければ近いほど分析の結果(線の形?)が良いということになる。

感想

  • R言語に近い感覚で式を書くことができるということだが、SparkRと呼ばれるものがあるので、どちらが解析などに向いているのか気になる
  • mahoutのことも勉強しないと・・・。
23
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?