先日、Spark 1.4.0 がリリースされ、多数のアップデートがある
- SparkR
- 運用モニタリングとDAGのビジュアライゼーション
- REST API
- DataFrame API
この中でも、SparkR という、統計言語 R から Sparkを利用できる拡張を今回は試したい。他のHadoop関連記事では、無視されやすい Windows も取り扱う。
R には、以前から、SparkR-pkg(https://github.com/amplab-extras/SparkR-pkg/) というプロジェクトが Github上にあり、今回、これが本家に統合された形のようだ。
ビルド済みパッケージの入手
まずビルドからはじめるが、ビルドが面倒ならば、Windows にも対応したビルド済みパッケージを以下から入手できる。
Spark 1.4.0 のビルド
以前のこの記事シリーズと同じく、まずは、ビルドから始める。Linux、Mac は、ビルドツールさえ揃っていれば、何も考えずにビルド可能だが、Windows では、多少、手間がかかる。
今回は、複数のマシンに展開しやすいよう配布パッケージを作りたい。Spark では、ビルドそのものは Maven でできるのだが、なぜかパッケージには、以前から専用のスクリプト make-distribution.sh を使う。
また、今回 SparkR を使う上で、ビルド時にも、R にパスが通っていなければならない。Windows では、R をパスに追加する(システムのプロパティでグローバルに追加したほうがよい)。
set PATH=C:\Progra~1\R\R-3.1.3\bin\x64;%PATH%
配布パッケージの作成
make-distribution.sh に以下のオプションをつけてビルドする。
sh ./make-distribution.sh --tgz --skip-java-test -Phadoop-2.6 -Phive -Psparkr -DskipTests
初めてビルドするときは、ネットワークとマシンによっては、40-50分かかることもあるようだ。
ビルドが成功すれば、カレントディレクトリに spark-1.4.0-bin-2.6.0.tgz のようなファイル名でパッケージができている。また、dist ディレクトリは、そのパッケージの展開した内容そのものなので、このディレクトリをどこかへコピーしてもよい。
Windows では、別途、hadoop のネイティブライブラリも必要になるが、以下より入手できるようにしてある。
Windows (mingw) 向けの修正
このビルドスクリプトを Windows の mingw で動かすとエラーするので、これを修正する
--- make-distribution.sh 2015-06-03 10:07:24.000000000 +0900
+++ make-distribution.mingw.sh 2015-06-17 18:58:31.000000000 +0900
@@ -139 +139 @@
- | fgrep --count "<id>hive</id>";\
+ | grep -f --count "<id>hive</id>";\
@@ -146 +146,2 @@
-if [[ ! "$JAVA_VERSION" =~ "1.6" && -z "$SKIP_JAVA_TEST" ]]; then
+echo "$JAVA_VERSION" | grep "1.6" && true
+if [[ $? -ne 0 && -z "$SKIP_JAVA_TEST" ]]; then
@@ -154 +155 @@
- if [[ ! "$REPLY" =~ ^[Yy]$ ]]; then
+ if [[ ! "$REPLY" == y ]]; then
Spark の展開と設定
配布パッケージができたら、任意の場所(Linux,Macであれば、/opt/spark など、Windows であれば C:\Apache\Spark など)に展開する。
Hadoop 2.6 を別途展開していない Windows では、winutils.zip を入手し、展開したディレクトリに、 null というディレクトリを作成し、bin ディレクトリに、 winutils の中身をコピーする。
正しい利用には、conf/spark-env.sh をいろいろ設定する必要があるが、今回は、HADOOP_HOME と SPARK_HOME、それと SPARK_PUBLIC_DNS だけを設定しておく。
SPARK_PUBLIC_DNS は、SparkR を動かすホストの IPアドレスを設定しておく。これは、ちょっとしたワークアラウンドで、これを設定せずに SparkR を動かすと、SparkR の WebUI がマシン上の任意のインターフェイス上に展開されてしまい、場合によっては、うまくアクセスできない場合があるためだ。今回は、ローカルマシンで動かすので、127.0.0.1 を設定しておく。
Hadoop 2.6を別途展開していない Windows で、Spark をマスタースレーブで起動する場合、HADOOP_HOME を SPARK_HOME\null に設定しておく。ローカルモードだけで SparkR を使う場合は、null\bin ディレクトリに winutils があれば、HADOOP_HOME を設定しなくてもよい。
SparkR の起動と調整(ローカルモード)
R に PATH が通っている事を確認し、
カレントディレクトリを SPARK_HOME にし、以下のコマンドで起動する。
bin/sparkR
# Windows では、CMD.EXE のコマンドプロンプトから実行する
bin\sparkR
ログの抑制
ログがずらずらと表示されると思うが、最初は、動作を確認するのに役に立つ。慣れてくると、邪魔になるので、conf/log4j.properties ファイルの最初のほうに、以下のようにしてログメッセージを抑制する。
log4j.rootCategory=WARN, console
R のメッセージの文字化け抑制
また、Windows では、コマンドプロンプトから R を起動すると、文字化けするので、R のプロンプトが表示されたら、以下のようにして、メッセージを英語に設定する。
Sys.setlocale("LC_ALL","English_US.437")
毎回、入力するのが面倒なので、%SPARK_HOME%\R\lib\SparkR\profile\shell.R をエディタで開き、.First <- function() { の直後に、上記のコマンドを書いておくとよい。
SparkR を試用する
R のプロンプトが表示されたら、Spark のコンテキストも設定されて利用できるようになっている。
とりあえず、データをSparkRで作成し、フィルタしてみる。
library(magrittr)
dq1 <- createDataFrame(sqlContext,quakes)
dq1 %>%
select("depth","mag","stations") %>%
filter(dq1$mag > 5.5) %>%
collect
ここでは、R に最初からついている地震データ quakes から SparkR の DataFrame オブジェクトを作り、深さ、マグニチュード、観測所数のカラムだけにし、マグニチュードが 5.5 以上のデータを求めている。
SparkR の DataFrame オブジェクトとは、R の data.frame とは、単に表形式であるというのが似ているだけで、全く関係がないオブジェクトなので注意する。
Spark 1.4 の SparkR は、RDD ではなく、SparkSQL 用に作られた新しい DataFrame オブジェクトに対して、データ処理をするようになっている。DataFrame オブジェクトは内部で RDD を持っていて、これを処理するプロトコルになっている。
演算には、magrittr というパイプ演算子ライブラリを使っているが、これをみると、おかしいことに気づく人もいるだろう。
Spark 1.4 の SparkR は、 dplyr というデータ変換ライブラリとほぼ同様のプロトコルを持っていて、dplyr を使い慣れた人なら、簡単に利用できる。だが、その実装は、まだ十分でなく、dplyr + magrittr のようなエレガントな書き方ができない。このため、select で指定するカラムは、文字列にして、filter で指定するカラムは、明示的にデータオブジェクトを指定した dq1$mag としている。
ここから、本家の dplyr をロードするとさらに混乱した状態になり、同じ関数名を持つ dplyr のライブラリが優先されて、上記の式が動作しなくなる。この場合、直接、名前空間を指定して、以下のようにする。
dq1 %>%
SparkR::select("depth","mag","stations") %>%
SparkR::filter(dq1$mag > 5.5) %>%
SparkR::collect()
ほか、外部データを読み込む例として、Sparkのサイトから、借りてくると、以下のような例がある。
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
registerTempTable(people, "people")
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
jsonデータを読み込んで、それを SparkSQL上のテーブルに変換し、SQLでテータを検索するもので、興味深い処理ができている。R から、SQLを使って、分散データ処理ができるというのは、いろいろ応用ができるだろう。
read.df は、今のところ、json だけを読み込めるようだ。
write.df は、json と parquet のどちらかで、書き出すこともできるが、HADOOPでの書き出しのような形になるので、通常のファイルとして書き出したいなら、collect してから、R の write.table を使う方がよいだろう。
people %>% write.df(path="people.parquet", source="parquet", mode="overwrite")
parquet を扱うとき、Windows では Hadoop のネイティブライブラリを見つけられないとエラーする。このときは、HADOOP_HOME を確認する。
また、今回は扱っていないが、Hive のデータを読み込むこともできるので、Hadoop と連携したRを活用できる場面が増えるだろう。
マスタースレーブで SparkR を利用する
ローカルモードで、動作が確認できたら、マスタースレーブで利用してみる。
まずは、Spark のマスターとワーカーを起動する必要がある。
マスターは以下のようにして起動する。
bin/spark-class org.apache.spark.deploy.master.Master --ip 127.0.0.1 --port 7077 --webui-port 4000
ここで、WebUI のポートはデフォルトから 4000 に変更している。Windows では、bash ではなく、コマンドプロンプトから起動する。最初に start をつけて起動すれば、別ウィンドウで起動する。
ワーカーは、次のようにして起動する。
bin/spark-class org.apache.spark.deploy.worker.Worker --ip 127.0.0.1 --port 7100 --webui-port 4001 spark://127.0.0.1:7077
マスターの URL は、spark://127.0.0.1:7077 であるが、最後に / (スラッシュ)をつけるとエラーするので注意する。
ここでワーカーのポートは、ランダムから 7100に固定している。複数のワーカーを起動するときにはポートを連続した領域で管理したほうが都合がよいこともあるだろう(セキュリティ的に)。
Windows でワーカーを起動するときの注意として、HADOOP_HOME を設定すること(Hadoop 2.6を展開していない場合は、%SPARK_HOME%\nullに設定する)と、R に PATHが通っていることを確認しておく。
さて、このアプリケーションとなる SparkR は、次のようにして起動する。
export SPARK_PUBLIC_DNS=127.0.0.1
bin/sparkR --master spark://127.0.0.1:7077 --conf spark.cores.max=1
SPARK_PUBLIC_DNS は、前述したワークアラウンドで、SparkR の WebUI のためのものである。残りのオプション --conf spark.cores.max=1 は、SparkR のシェル(ドライバー)そのものがワーカーのCPUを占有しないように制限するためのもので、これをしないと、SparkR の中から、Spark のジョブが動作しない。
SparkR の WebUI ポートを指定していないが、デフォルトで 4040 に割り当てられている。
ここまで、起動して、すべてに WebUI があることに気づいただろう。ブラウザから、http://localhost:4000/ を開くと、Spark のマスターのWebUI を確認できる。
起動しているワーカーや、アプリケーションを確認でき、また、それぞれをクリックすると、ワーカーの状態や、アプリケーションの状態(http://localhost:4040/) も確認できる。特に、アプリケーションの WebUI は、ジョブが、実行された時間や、そのDAGなど、詳細に情報が確認できるようになっているので、非常に便利である。
RStudio から使う
コマンドプロンプトの R から使えるだけでも便利だが、R ならば、はやり RStudio から使いたい。
Spark をマスタースレーブで起動していれば、以下のようにして、RStudio からも利用できる。
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
Sys.setenv(NOAWT=1)
oldPkg <- getOption("defaultPackages")
options(defaultPackages = c(oldPkg, "SparkR"))
detach("package:SparkR", unload=T)
library(SparkR, verbose=T) # need SPARK_HOME
sc <- SparkR::sparkR.init(master="spark://127.0.0.1:7077",sparkEnv=list(spark.cores.max="1"))
sqlContext <- SparkR::sparkRSQL.init(sc)
SparkR から RDD を利用する
Spark 1.4 で追加された SparkR は、本家だった SparkR-pkg が RDDのサポートをメインとしていたのと異なり、SparkSQL の DataFrame オブジェクトに対する dplyr 互換のプロトコルのサポートにとどまっている。
このため、今ひとつに思うところもあるが、実は、SparkR-pkg でサポートされていた RDD に対するデータ処理も、関数がエクスポートされていないだけで、実装は残されていて、利用できる。
試しに、RDD でベクターを数えてみる。
sc %>% SparkR:::parallelize(1:100000) %>% count
エクスポートされていない関数への名前空間の指定は、トリプルコロン(:::)を使う。
うまく動作すれば、あとは、関数名をアサインしておけば、便利に利用できる。
assign("parallelize",SparkR:::parallelize,pos=.GlobalEnv)
assign("map",SparkR:::map,pos=.GlobalEnv)
assign("reduceByKey",SparkR:::reduceByKey,pos=.GlobalEnv)
おなじみワードカウントをしてみる。
ss <- strsplit("grape orange grape apple orange",split=" ")
wl <- list()
for (i in 1:length(ss[[1]])) { wl[i] <- ss[[1]][i] }
sc %>%
parallelize(wl) %>%
map(function(x) list(x,1)) %>%
reduceByKey(function(x,y) x+y,1L) %>%
collect
もしうまく動作しないときは、parallelize, map, reduceByKey に直接名前空間を指定してみてほしい。
SparkR から MLlib, GraphX をどうやって使うか
上記のような情況なので、Spark 1.4 の SparkR は、基本的には、DataFrame オブジェクトを使った、分散データ処理ができる R という位置づけになるだろう。
このため、現状は、MLlib や GraphX のインターフェイスもないので、こうしたものを利用するには、別途ビルドした scala/Java クラスを実行するのがよいだろう。
そのための、例として、SparkPageRank サンプルを動作させてみる。
(といっても、要は、spark-submit しているだけなのだが)
SparkR:::launchBackend(
sparkSubmitOpts="--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar",
jars="",
args="data/mllib/pagerank_data.txt 5",
sparkHome=Sys.getenv("SPARK_HOME"))
中身は、system を呼び出しているだけなので、引数を覚えるのが面倒ならば、直接 system を呼び出すだけだ。
system2("bin/spark-submit","--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar data/mllib/pagerank_data.txt 5",wait=T)
なお、launchBackend は、バックグランド処理として呼び出される。system で同様にバックグランド処理として扱うには、 wait引数を FALSEに設定する。
結局 Spark 1.4 の SparkR とは
SparkSQL の DataFrame オブジェクトを使った分散処理ができ、そのプロトコルは、dplyr 互換になっている。
また、データは、SQL で処理する事もでき、読み書きに、Hadoop を利用できるので、Hive や HDFS 上のデータも利用できる。
こうした、R への分散データ処理環境が簡単に利用できることは、魅力的だろう。
Spark 本来の RDD を扱うには、エクスポートされていないプロトコルを、名前空間を指定することで呼び出して処理できる。
また、大きく扱わなかったが、Spark 1.4 の WebUI は、よくできていて、データ処理ジョブを把握するもの容易になので、このためだけに Spark 1.4 を利用してもよいだろう。