Spark 1.4.0 の SparkR を動かす

  • 44
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

先日、Spark 1.4.0 がリリースされ、多数のアップデートがある

  • SparkR
  • 運用モニタリングとDAGのビジュアライゼーション
  • REST API
  • DataFrame API

この中でも、SparkR という、統計言語 R から Sparkを利用できる拡張を今回は試したい。他のHadoop関連記事では、無視されやすい Windows も取り扱う。

R には、以前から、SparkR-pkg(https://github.com/amplab-extras/SparkR-pkg/) というプロジェクトが Github上にあり、今回、これが本家に統合された形のようだ。

ビルド済みパッケージの入手

まずビルドからはじめるが、ビルドが面倒ならば、Windows にも対応したビルド済みパッケージを以下から入手できる。

http://osdn.jp/projects/win-hadoop/downloads/63406/spark-1.4.0-bin-hadoop2.6-sparkr-windows.tgz/

Spark 1.4.0 のビルド

以前のこの記事シリーズと同じく、まずは、ビルドから始める。Linux、Mac は、ビルドツールさえ揃っていれば、何も考えずにビルド可能だが、Windows では、多少、手間がかかる。
今回は、複数のマシンに展開しやすいよう配布パッケージを作りたい。Spark では、ビルドそのものは Maven でできるのだが、なぜかパッケージには、以前から専用のスクリプト make-distribution.sh を使う。

また、今回 SparkR を使う上で、ビルド時にも、R にパスが通っていなければならない。Windows では、R をパスに追加する(システムのプロパティでグローバルに追加したほうがよい)。

set PATH=C:\Progra~1\R\R-3.1.3\bin\x64;%PATH%

配布パッケージの作成

make-distribution.sh に以下のオプションをつけてビルドする。

sh ./make-distribution.sh --tgz --skip-java-test -Phadoop-2.6 -Phive -Psparkr -DskipTests

初めてビルドするときは、ネットワークとマシンによっては、40-50分かかることもあるようだ。
ビルドが成功すれば、カレントディレクトリに spark-1.4.0-bin-2.6.0.tgz のようなファイル名でパッケージができている。また、dist ディレクトリは、そのパッケージの展開した内容そのものなので、このディレクトリをどこかへコピーしてもよい。

Windows では、別途、hadoop のネイティブライブラリも必要になるが、以下より入手できるようにしてある。

http://osdn.jp/projects/win-hadoop/downloads/62852/hadoop-winutils-2.6.0.zip/

Windows (mingw) 向けの修正

このビルドスクリプトを Windows の mingw で動かすとエラーするので、これを修正する

make-distribution.diff
--- make-distribution.sh    2015-06-03 10:07:24.000000000 +0900
+++ make-distribution.mingw.sh  2015-06-17 18:58:31.000000000 +0900
@@ -139 +139 @@
-    | fgrep --count "<id>hive</id>";\
+    | grep -f --count "<id>hive</id>";\
@@ -146 +146,2 @@
-if [[ ! "$JAVA_VERSION" =~ "1.6" && -z "$SKIP_JAVA_TEST" ]]; then
+echo "$JAVA_VERSION" | grep "1.6" && true
+if [[ $? -ne 0 && -z "$SKIP_JAVA_TEST" ]]; then
@@ -154 +155 @@
-  if [[ ! "$REPLY" =~ ^[Yy]$ ]]; then
+  if [[ ! "$REPLY" == y ]]; then

Spark の展開と設定

配布パッケージができたら、任意の場所(Linux,Macであれば、/opt/spark など、Windows であれば C:\Apache\Spark など)に展開する。

Hadoop 2.6 を別途展開していない Windows では、winutils.zip を入手し、展開したディレクトリに、 null というディレクトリを作成し、bin ディレクトリに、 winutils の中身をコピーする。

正しい利用には、conf/spark-env.sh をいろいろ設定する必要があるが、今回は、HADOOP_HOME と SPARK_HOME、それと SPARK_PUBLIC_DNS だけを設定しておく。

SPARK_PUBLIC_DNS は、SparkR を動かすホストの IPアドレスを設定しておく。これは、ちょっとしたワークアラウンドで、これを設定せずに SparkR を動かすと、SparkR の WebUI がマシン上の任意のインターフェイス上に展開されてしまい、場合によっては、うまくアクセスできない場合があるためだ。今回は、ローカルマシンで動かすので、127.0.0.1 を設定しておく。

Hadoop 2.6を別途展開していない Windows で、Spark をマスタースレーブで起動する場合、HADOOP_HOME を SPARK_HOME\null に設定しておく。ローカルモードだけで SparkR を使う場合は、null\bin ディレクトリに winutils があれば、HADOOP_HOME を設定しなくてもよい。

SparkR の起動と調整(ローカルモード)

R に PATH が通っている事を確認し、
カレントディレクトリを SPARK_HOME にし、以下のコマンドで起動する。

bin/sparkR
# Windows では、CMD.EXE のコマンドプロンプトから実行する
bin\sparkR

ログの抑制

ログがずらずらと表示されると思うが、最初は、動作を確認するのに役に立つ。慣れてくると、邪魔になるので、conf/log4j.properties ファイルの最初のほうに、以下のようにしてログメッセージを抑制する。

log4.properties
log4j.rootCategory=WARN, console

R のメッセージの文字化け抑制

また、Windows では、コマンドプロンプトから R を起動すると、文字化けするので、R のプロンプトが表示されたら、以下のようにして、メッセージを英語に設定する。

R
Sys.setlocale("LC_ALL","English_US.437")

毎回、入力するのが面倒なので、%SPARK_HOME%\R\lib\SparkR\profile\shell.R をエディタで開き、.First <- function() { の直後に、上記のコマンドを書いておくとよい。

SparkR を試用する

R のプロンプトが表示されたら、Spark のコンテキストも設定されて利用できるようになっている。
とりあえず、データをSparkRで作成し、フィルタしてみる。

R
library(magrittr)

dq1 <- createDataFrame(sqlContext,quakes)
dq1 %>% 
  select("depth","mag","stations") %>%
  filter(dq1$mag > 5.5) %>%
  collect

ここでは、R に最初からついている地震データ quakes から SparkR の DataFrame オブジェクトを作り、深さ、マグニチュード、観測所数のカラムだけにし、マグニチュードが 5.5 以上のデータを求めている。

SparkR の DataFrame オブジェクトとは、R の data.frame とは、単に表形式であるというのが似ているだけで、全く関係がないオブジェクトなので注意する。
Spark 1.4 の SparkR は、RDD ではなく、SparkSQL 用に作られた新しい DataFrame オブジェクトに対して、データ処理をするようになっている。DataFrame オブジェクトは内部で RDD を持っていて、これを処理するプロトコルになっている。

演算には、magrittr というパイプ演算子ライブラリを使っているが、これをみると、おかしいことに気づく人もいるだろう。
Spark 1.4 の SparkR は、 dplyr というデータ変換ライブラリとほぼ同様のプロトコルを持っていて、dplyr を使い慣れた人なら、簡単に利用できる。だが、その実装は、まだ十分でなく、dplyr + magrittr のようなエレガントな書き方ができない。このため、select で指定するカラムは、文字列にして、filter で指定するカラムは、明示的にデータオブジェクトを指定した dq1$mag としている。
ここから、本家の dplyr をロードするとさらに混乱した状態になり、同じ関数名を持つ dplyr のライブラリが優先されて、上記の式が動作しなくなる。この場合、直接、名前空間を指定して、以下のようにする。

R
dq1 %>% 
  SparkR::select("depth","mag","stations") %>%
  SparkR::filter(dq1$mag > 5.5) %>%
  SparkR::collect()

ほか、外部データを読み込む例として、Sparkのサイトから、借りてくると、以下のような例がある。

R
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")

registerTempTable(people, "people")

teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")

head(teenagers)

jsonデータを読み込んで、それを SparkSQL上のテーブルに変換し、SQLでテータを検索するもので、興味深い処理ができている。R から、SQLを使って、分散データ処理ができるというのは、いろいろ応用ができるだろう。

read.df は、今のところ、json だけを読み込めるようだ。
write.df は、json と parquet のどちらかで、書き出すこともできるが、HADOOPでの書き出しのような形になるので、通常のファイルとして書き出したいなら、collect してから、R の write.table を使う方がよいだろう。

people %>% write.df(path="people.parquet", source="parquet", mode="overwrite")

parquet を扱うとき、Windows では Hadoop のネイティブライブラリを見つけられないとエラーする。このときは、HADOOP_HOME を確認する。

また、今回は扱っていないが、Hive のデータを読み込むこともできるので、Hadoop と連携したRを活用できる場面が増えるだろう。

マスタースレーブで SparkR を利用する

ローカルモードで、動作が確認できたら、マスタースレーブで利用してみる。
まずは、Spark のマスターとワーカーを起動する必要がある。

マスターは以下のようにして起動する。

bin/spark-class org.apache.spark.deploy.master.Master --ip 127.0.0.1 --port 7077 --webui-port 4000

ここで、WebUI のポートはデフォルトから 4000 に変更している。Windows では、bash ではなく、コマンドプロンプトから起動する。最初に start をつけて起動すれば、別ウィンドウで起動する。

ワーカーは、次のようにして起動する。

bin/spark-class org.apache.spark.deploy.worker.Worker --ip 127.0.0.1 --port 7100 --webui-port 4001 spark://127.0.0.1:7077

マスターの URL は、spark://127.0.0.1:7077 であるが、最後に / (スラッシュ)をつけるとエラーするので注意する。
ここでワーカーのポートは、ランダムから 7100に固定している。複数のワーカーを起動するときにはポートを連続した領域で管理したほうが都合がよいこともあるだろう(セキュリティ的に)。

Windows でワーカーを起動するときの注意として、HADOOP_HOME を設定すること(Hadoop 2.6を展開していない場合は、%SPARK_HOME%\nullに設定する)と、R に PATHが通っていることを確認しておく。

さて、このアプリケーションとなる SparkR は、次のようにして起動する。

export SPARK_PUBLIC_DNS=127.0.0.1 
bin/sparkR --master spark://127.0.0.1:7077 --conf spark.cores.max=1

SPARK_PUBLIC_DNS は、前述したワークアラウンドで、SparkR の WebUI のためのものである。残りのオプション --conf spark.cores.max=1 は、SparkR のシェル(ドライバー)そのものがワーカーのCPUを占有しないように制限するためのもので、これをしないと、SparkR の中から、Spark のジョブが動作しない。
SparkR の WebUI ポートを指定していないが、デフォルトで 4040 に割り当てられている。

ここまで、起動して、すべてに WebUI があることに気づいただろう。ブラウザから、http://localhost:4000/ を開くと、Spark のマスターのWebUI を確認できる。
起動しているワーカーや、アプリケーションを確認でき、また、それぞれをクリックすると、ワーカーの状態や、アプリケーションの状態(http://localhost:4040/) も確認できる。特に、アプリケーションの WebUI は、ジョブが、実行された時間や、そのDAGなど、詳細に情報が確認できるようになっているので、非常に便利である。

RStudio から使う

コマンドプロンプトの R から使えるだけでも便利だが、R ならば、はやり RStudio から使いたい。
Spark をマスタースレーブで起動していれば、以下のようにして、RStudio からも利用できる。

R
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
Sys.setenv(NOAWT=1)
oldPkg <- getOption("defaultPackages")
options(defaultPackages = c(oldPkg, "SparkR"))

detach("package:SparkR", unload=T)
library(SparkR, verbose=T)   # need SPARK_HOME

sc <- SparkR::sparkR.init(master="spark://127.0.0.1:7077",sparkEnv=list(spark.cores.max="1"))
sqlContext <- SparkR::sparkRSQL.init(sc)

SparkR から RDD を利用する

Spark 1.4 で追加された SparkR は、本家だった SparkR-pkg が RDDのサポートをメインとしていたのと異なり、SparkSQL の DataFrame オブジェクトに対する dplyr 互換のプロトコルのサポートにとどまっている。
このため、今ひとつに思うところもあるが、実は、SparkR-pkg でサポートされていた RDD に対するデータ処理も、関数がエクスポートされていないだけで、実装は残されていて、利用できる。

試しに、RDD でベクターを数えてみる。

R
sc %>% SparkR:::parallelize(1:100000) %>% count

エクスポートされていない関数への名前空間の指定は、トリプルコロン(:::)を使う。
うまく動作すれば、あとは、関数名をアサインしておけば、便利に利用できる。

R
assign("parallelize",SparkR:::parallelize,pos=.GlobalEnv)
assign("map",SparkR:::map,pos=.GlobalEnv)
assign("reduceByKey",SparkR:::reduceByKey,pos=.GlobalEnv)

おなじみワードカウントをしてみる。

R
ss <- strsplit("grape orange grape apple orange",split=" ")
wl <- list()
for (i in 1:length(ss[[1]])) { wl[i] <- ss[[1]][i] }

sc %>% 
  parallelize(wl) %>%
  map(function(x) list(x,1)) %>%
  reduceByKey(function(x,y) x+y,1L) %>%
  collect

もしうまく動作しないときは、parallelize, map, reduceByKey に直接名前空間を指定してみてほしい。

SparkR から MLlib, GraphX をどうやって使うか

上記のような情況なので、Spark 1.4 の SparkR は、基本的には、DataFrame オブジェクトを使った、分散データ処理ができる R という位置づけになるだろう。
このため、現状は、MLlib や GraphX のインターフェイスもないので、こうしたものを利用するには、別途ビルドした scala/Java クラスを実行するのがよいだろう。
そのための、例として、SparkPageRank サンプルを動作させてみる。
(といっても、要は、spark-submit しているだけなのだが)

R
SparkR:::launchBackend(
  sparkSubmitOpts="--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar",
  jars="",
  args="data/mllib/pagerank_data.txt 5",
  sparkHome=Sys.getenv("SPARK_HOME"))

中身は、system を呼び出しているだけなので、引数を覚えるのが面倒ならば、直接 system を呼び出すだけだ。

R
system2("bin/spark-submit","--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar data/mllib/pagerank_data.txt 5",wait=T)

なお、launchBackend は、バックグランド処理として呼び出される。system で同様にバックグランド処理として扱うには、 wait引数を FALSEに設定する。

結局 Spark 1.4 の SparkR とは

SparkSQL の DataFrame オブジェクトを使った分散処理ができ、そのプロトコルは、dplyr 互換になっている。
また、データは、SQL で処理する事もでき、読み書きに、Hadoop を利用できるので、Hive や HDFS 上のデータも利用できる。
こうした、R への分散データ処理環境が簡単に利用できることは、魅力的だろう。

Spark 本来の RDD を扱うには、エクスポートされていないプロトコルを、名前空間を指定することで呼び出して処理できる。

また、大きく扱わなかったが、Spark 1.4 の WebUI は、よくできていて、データ処理ジョブを把握するもの容易になので、このためだけに Spark 1.4 を利用してもよいだろう。