Windows
R
hadoop
Spark

Spark 1.4.0 の SparkR を動かす

More than 3 years have passed since last update.

先日、Spark 1.4.0 がリリースされ、多数のアップデートがある


  • SparkR

  • 運用モニタリングとDAGのビジュアライゼーション

  • REST API

  • DataFrame API

この中でも、SparkR という、統計言語 R から Sparkを利用できる拡張を今回は試したい。他のHadoop関連記事では、無視されやすい Windows も取り扱う。

R には、以前から、SparkR-pkg(https://github.com/amplab-extras/SparkR-pkg/) というプロジェクトが Github上にあり、今回、これが本家に統合された形のようだ。


ビルド済みパッケージの入手

まずビルドからはじめるが、ビルドが面倒ならば、Windows にも対応したビルド済みパッケージを以下から入手できる。

http://osdn.jp/projects/win-hadoop/downloads/63406/spark-1.4.0-bin-hadoop2.6-sparkr-windows.tgz/


Spark 1.4.0 のビルド

以前のこの記事シリーズと同じく、まずは、ビルドから始める。Linux、Mac は、ビルドツールさえ揃っていれば、何も考えずにビルド可能だが、Windows では、多少、手間がかかる。

今回は、複数のマシンに展開しやすいよう配布パッケージを作りたい。Spark では、ビルドそのものは Maven でできるのだが、なぜかパッケージには、以前から専用のスクリプト make-distribution.sh を使う。

また、今回 SparkR を使う上で、ビルド時にも、R にパスが通っていなければならない。Windows では、R をパスに追加する(システムのプロパティでグローバルに追加したほうがよい)。

set PATH=C:\Progra~1\R\R-3.1.3\bin\x64;%PATH%


配布パッケージの作成

make-distribution.sh に以下のオプションをつけてビルドする。

sh ./make-distribution.sh --tgz --skip-java-test -Phadoop-2.6 -Phive -Psparkr -DskipTests

初めてビルドするときは、ネットワークとマシンによっては、40-50分かかることもあるようだ。

ビルドが成功すれば、カレントディレクトリに spark-1.4.0-bin-2.6.0.tgz のようなファイル名でパッケージができている。また、dist ディレクトリは、そのパッケージの展開した内容そのものなので、このディレクトリをどこかへコピーしてもよい。

Windows では、別途、hadoop のネイティブライブラリも必要になるが、以下より入手できるようにしてある。

http://osdn.jp/projects/win-hadoop/downloads/62852/hadoop-winutils-2.6.0.zip/


Windows (mingw) 向けの修正

このビルドスクリプトを Windows の mingw で動かすとエラーするので、これを修正する


make-distribution.diff

--- make-distribution.sh    2015-06-03 10:07:24.000000000 +0900

+++ make-distribution.mingw.sh 2015-06-17 18:58:31.000000000 +0900
@@ -139 +139 @@
- | fgrep --count "<id>hive</id>";\
+ | grep -f --count "<id>hive</id>";\
@@ -146 +146,2 @@
-if [[ ! "$JAVA_VERSION" =~ "1.6" && -z "$SKIP_JAVA_TEST" ]]; then
+echo "$JAVA_VERSION" | grep "1.6" && true
+if [[ $? -ne 0 && -z "$SKIP_JAVA_TEST" ]]; then
@@ -154 +155 @@
- if [[ ! "$REPLY" =~ ^[Yy]$ ]]; then
+ if [[ ! "$REPLY" == y ]]; then


Spark の展開と設定

配布パッケージができたら、任意の場所(Linux,Macであれば、/opt/spark など、Windows であれば C:\Apache\Spark など)に展開する。

Hadoop 2.6 を別途展開していない Windows では、winutils.zip を入手し、展開したディレクトリに、 null というディレクトリを作成し、bin ディレクトリに、 winutils の中身をコピーする。

正しい利用には、conf/spark-env.sh をいろいろ設定する必要があるが、今回は、HADOOP_HOME と SPARK_HOME、それと SPARK_PUBLIC_DNS だけを設定しておく。

SPARK_PUBLIC_DNS は、SparkR を動かすホストの IPアドレスを設定しておく。これは、ちょっとしたワークアラウンドで、これを設定せずに SparkR を動かすと、SparkR の WebUI がマシン上の任意のインターフェイス上に展開されてしまい、場合によっては、うまくアクセスできない場合があるためだ。今回は、ローカルマシンで動かすので、127.0.0.1 を設定しておく。

Hadoop 2.6を別途展開していない Windows で、Spark をマスタースレーブで起動する場合、HADOOP_HOME を SPARK_HOME\null に設定しておく。ローカルモードだけで SparkR を使う場合は、null\bin ディレクトリに winutils があれば、HADOOP_HOME を設定しなくてもよい。


SparkR の起動と調整(ローカルモード)

R に PATH が通っている事を確認し、

カレントディレクトリを SPARK_HOME にし、以下のコマンドで起動する。

bin/sparkR

# Windows では、CMD.EXE のコマンドプロンプトから実行する

bin\sparkR


ログの抑制

ログがずらずらと表示されると思うが、最初は、動作を確認するのに役に立つ。慣れてくると、邪魔になるので、conf/log4j.properties ファイルの最初のほうに、以下のようにしてログメッセージを抑制する。


log4.properties

log4j.rootCategory=WARN, console



R のメッセージの文字化け抑制

また、Windows では、コマンドプロンプトから R を起動すると、文字化けするので、R のプロンプトが表示されたら、以下のようにして、メッセージを英語に設定する。


R

Sys.setlocale("LC_ALL","English_US.437")


毎回、入力するのが面倒なので、%SPARK_HOME%\R\lib\SparkR\profile\shell.R をエディタで開き、.First <- function() { の直後に、上記のコマンドを書いておくとよい。


SparkR を試用する

R のプロンプトが表示されたら、Spark のコンテキストも設定されて利用できるようになっている。

とりあえず、データをSparkRで作成し、フィルタしてみる。


R

library(magrittr)

dq1 <- createDataFrame(sqlContext,quakes)
dq1 %>%
select("depth","mag","stations") %>%
filter(dq1$mag > 5.5) %>%
collect


ここでは、R に最初からついている地震データ quakes から SparkR の DataFrame オブジェクトを作り、深さ、マグニチュード、観測所数のカラムだけにし、マグニチュードが 5.5 以上のデータを求めている。

SparkR の DataFrame オブジェクトとは、R の data.frame とは、単に表形式であるというのが似ているだけで、全く関係がないオブジェクトなので注意する。

Spark 1.4 の SparkR は、RDD ではなく、SparkSQL 用に作られた新しい DataFrame オブジェクトに対して、データ処理をするようになっている。DataFrame オブジェクトは内部で RDD を持っていて、これを処理するプロトコルになっている。

演算には、magrittr というパイプ演算子ライブラリを使っているが、これをみると、おかしいことに気づく人もいるだろう。

Spark 1.4 の SparkR は、 dplyr というデータ変換ライブラリとほぼ同様のプロトコルを持っていて、dplyr を使い慣れた人なら、簡単に利用できる。だが、その実装は、まだ十分でなく、dplyr + magrittr のようなエレガントな書き方ができない。このため、select で指定するカラムは、文字列にして、filter で指定するカラムは、明示的にデータオブジェクトを指定した dq1$mag としている。

ここから、本家の dplyr をロードするとさらに混乱した状態になり、同じ関数名を持つ dplyr のライブラリが優先されて、上記の式が動作しなくなる。この場合、直接、名前空間を指定して、以下のようにする。


R

dq1 %>% 

SparkR::select("depth","mag","stations") %>%
SparkR::filter(dq1$mag > 5.5) %>%
SparkR::collect()

ほか、外部データを読み込む例として、Sparkのサイトから、借りてくると、以下のような例がある。


R

people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")

registerTempTable(people, "people")

teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")

head(teenagers)


jsonデータを読み込んで、それを SparkSQL上のテーブルに変換し、SQLでテータを検索するもので、興味深い処理ができている。R から、SQLを使って、分散データ処理ができるというのは、いろいろ応用ができるだろう。

read.df は、今のところ、json だけを読み込めるようだ。

write.df は、json と parquet のどちらかで、書き出すこともできるが、HADOOPでの書き出しのような形になるので、通常のファイルとして書き出したいなら、collect してから、R の write.table を使う方がよいだろう。

people %>% write.df(path="people.parquet", source="parquet", mode="overwrite")

parquet を扱うとき、Windows では Hadoop のネイティブライブラリを見つけられないとエラーする。このときは、HADOOP_HOME を確認する。

また、今回は扱っていないが、Hive のデータを読み込むこともできるので、Hadoop と連携したRを活用できる場面が増えるだろう。


マスタースレーブで SparkR を利用する

ローカルモードで、動作が確認できたら、マスタースレーブで利用してみる。

まずは、Spark のマスターとワーカーを起動する必要がある。

マスターは以下のようにして起動する。

bin/spark-class org.apache.spark.deploy.master.Master --ip 127.0.0.1 --port 7077 --webui-port 4000

ここで、WebUI のポートはデフォルトから 4000 に変更している。Windows では、bash ではなく、コマンドプロンプトから起動する。最初に start をつけて起動すれば、別ウィンドウで起動する。

ワーカーは、次のようにして起動する。

bin/spark-class org.apache.spark.deploy.worker.Worker --ip 127.0.0.1 --port 7100 --webui-port 4001 spark://127.0.0.1:7077

マスターの URL は、spark://127.0.0.1:7077 であるが、最後に / (スラッシュ)をつけるとエラーするので注意する。

ここでワーカーのポートは、ランダムから 7100に固定している。複数のワーカーを起動するときにはポートを連続した領域で管理したほうが都合がよいこともあるだろう(セキュリティ的に)。

Windows でワーカーを起動するときの注意として、HADOOP_HOME を設定すること(Hadoop 2.6を展開していない場合は、%SPARK_HOME%\nullに設定する)と、R に PATHが通っていることを確認しておく。

さて、このアプリケーションとなる SparkR は、次のようにして起動する。

export SPARK_PUBLIC_DNS=127.0.0.1 

bin/sparkR --master spark://127.0.0.1:7077 --conf spark.cores.max=1

SPARK_PUBLIC_DNS は、前述したワークアラウンドで、SparkR の WebUI のためのものである。残りのオプション --conf spark.cores.max=1 は、SparkR のシェル(ドライバー)そのものがワーカーのCPUを占有しないように制限するためのもので、これをしないと、SparkR の中から、Spark のジョブが動作しない。

SparkR の WebUI ポートを指定していないが、デフォルトで 4040 に割り当てられている。

ここまで、起動して、すべてに WebUI があることに気づいただろう。ブラウザから、http://localhost:4000/ を開くと、Spark のマスターのWebUI を確認できる。

起動しているワーカーや、アプリケーションを確認でき、また、それぞれをクリックすると、ワーカーの状態や、アプリケーションの状態(http://localhost:4040/) も確認できる。特に、アプリケーションの WebUI は、ジョブが、実行された時間や、そのDAGなど、詳細に情報が確認できるようになっているので、非常に便利である。


RStudio から使う

コマンドプロンプトの R から使えるだけでも便利だが、R ならば、はやり RStudio から使いたい。

Spark をマスタースレーブで起動していれば、以下のようにして、RStudio からも利用できる。


R

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

Sys.setenv(NOAWT=1)
oldPkg <- getOption("defaultPackages")
options(defaultPackages = c(oldPkg, "SparkR"))

detach("package:SparkR", unload=T)
library(SparkR, verbose=T) # need SPARK_HOME

sc <- SparkR::sparkR.init(master="spark://127.0.0.1:7077",sparkEnv=list(spark.cores.max="1"))
sqlContext <- SparkR::sparkRSQL.init(sc)



SparkR から RDD を利用する

Spark 1.4 で追加された SparkR は、本家だった SparkR-pkg が RDDのサポートをメインとしていたのと異なり、SparkSQL の DataFrame オブジェクトに対する dplyr 互換のプロトコルのサポートにとどまっている。

このため、今ひとつに思うところもあるが、実は、SparkR-pkg でサポートされていた RDD に対するデータ処理も、関数がエクスポートされていないだけで、実装は残されていて、利用できる。

試しに、RDD でベクターを数えてみる。


R

sc %>% SparkR:::parallelize(1:100000) %>% count


エクスポートされていない関数への名前空間の指定は、トリプルコロン(:::)を使う。

うまく動作すれば、あとは、関数名をアサインしておけば、便利に利用できる。


R

assign("parallelize",SparkR:::parallelize,pos=.GlobalEnv)

assign("map",SparkR:::map,pos=.GlobalEnv)
assign("reduceByKey",SparkR:::reduceByKey,pos=.GlobalEnv)

おなじみワードカウントをしてみる。


R

ss <- strsplit("grape orange grape apple orange",split=" ")

wl <- list()
for (i in 1:length(ss[[1]])) { wl[i] <- ss[[1]][i] }

sc %>%
parallelize(wl) %>%
map(function(x) list(x,1)) %>%
reduceByKey(function(x,y) x+y,1L) %>%
collect


もしうまく動作しないときは、parallelize, map, reduceByKey に直接名前空間を指定してみてほしい。


SparkR から MLlib, GraphX をどうやって使うか

上記のような情況なので、Spark 1.4 の SparkR は、基本的には、DataFrame オブジェクトを使った、分散データ処理ができる R という位置づけになるだろう。

このため、現状は、MLlib や GraphX のインターフェイスもないので、こうしたものを利用するには、別途ビルドした scala/Java クラスを実行するのがよいだろう。

そのための、例として、SparkPageRank サンプルを動作させてみる。

(といっても、要は、spark-submit しているだけなのだが)


R

SparkR:::launchBackend(

sparkSubmitOpts="--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar",
jars="",
args="data/mllib/pagerank_data.txt 5",
sparkHome=Sys.getenv("SPARK_HOME"))

中身は、system を呼び出しているだけなので、引数を覚えるのが面倒ならば、直接 system を呼び出すだけだ。


R

system2("bin/spark-submit","--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar data/mllib/pagerank_data.txt 5",wait=T)


なお、launchBackend は、バックグランド処理として呼び出される。system で同様にバックグランド処理として扱うには、 wait引数を FALSEに設定する。


結局 Spark 1.4 の SparkR とは

SparkSQL の DataFrame オブジェクトを使った分散処理ができ、そのプロトコルは、dplyr 互換になっている。

また、データは、SQL で処理する事もでき、読み書きに、Hadoop を利用できるので、Hive や HDFS 上のデータも利用できる。

こうした、R への分散データ処理環境が簡単に利用できることは、魅力的だろう。

Spark 本来の RDD を扱うには、エクスポートされていないプロトコルを、名前空間を指定することで呼び出して処理できる。

また、大きく扱わなかったが、Spark 1.4 の WebUI は、よくできていて、データ処理ジョブを把握するもの容易になので、このためだけに Spark 1.4 を利用してもよいだろう。