47
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

posted at

updated at

Spark 1.4.0 の SparkR を動かす

先日、Spark 1.4.0 がリリースされ、多数のアップデートがある

  • SparkR
  • 運用モニタリングとDAGのビジュアライゼーション
  • REST API
  • DataFrame API

この中でも、SparkR という、統計言語 R から Sparkを利用できる拡張を今回は試したい。他のHadoop関連記事では、無視されやすい Windows も取り扱う。

R には、以前から、SparkR-pkg(https://github.com/amplab-extras/SparkR-pkg/) というプロジェクトが Github上にあり、今回、これが本家に統合された形のようだ。

ビルド済みパッケージの入手

まずビルドからはじめるが、ビルドが面倒ならば、Windows にも対応したビルド済みパッケージを以下から入手できる。

Spark 1.4.0 のビルド

以前のこの記事シリーズと同じく、まずは、ビルドから始める。Linux、Mac は、ビルドツールさえ揃っていれば、何も考えずにビルド可能だが、Windows では、多少、手間がかかる。
今回は、複数のマシンに展開しやすいよう配布パッケージを作りたい。Spark では、ビルドそのものは Maven でできるのだが、なぜかパッケージには、以前から専用のスクリプト make-distribution.sh を使う。

また、今回 SparkR を使う上で、ビルド時にも、R にパスが通っていなければならない。Windows では、R をパスに追加する(システムのプロパティでグローバルに追加したほうがよい)。

set PATH=C:\Progra~1\R\R-3.1.3\bin\x64;%PATH%

配布パッケージの作成

make-distribution.sh に以下のオプションをつけてビルドする。

sh ./make-distribution.sh --tgz --skip-java-test -Phadoop-2.6 -Phive -Psparkr -DskipTests

初めてビルドするときは、ネットワークとマシンによっては、40-50分かかることもあるようだ。
ビルドが成功すれば、カレントディレクトリに spark-1.4.0-bin-2.6.0.tgz のようなファイル名でパッケージができている。また、dist ディレクトリは、そのパッケージの展開した内容そのものなので、このディレクトリをどこかへコピーしてもよい。

Windows では、別途、hadoop のネイティブライブラリも必要になるが、以下より入手できるようにしてある。

Windows (mingw) 向けの修正

このビルドスクリプトを Windows の mingw で動かすとエラーするので、これを修正する

make-distribution.diff
--- make-distribution.sh    2015-06-03 10:07:24.000000000 +0900
+++ make-distribution.mingw.sh  2015-06-17 18:58:31.000000000 +0900
@@ -139 +139 @@
-    | fgrep --count "<id>hive</id>";\
+    | grep -f --count "<id>hive</id>";\
@@ -146 +146,2 @@
-if [[ ! "$JAVA_VERSION" =~ "1.6" && -z "$SKIP_JAVA_TEST" ]]; then
+echo "$JAVA_VERSION" | grep "1.6" && true
+if [[ $? -ne 0 && -z "$SKIP_JAVA_TEST" ]]; then
@@ -154 +155 @@
-  if [[ ! "$REPLY" =~ ^[Yy]$ ]]; then
+  if [[ ! "$REPLY" == y ]]; then

Spark の展開と設定

配布パッケージができたら、任意の場所(Linux,Macであれば、/opt/spark など、Windows であれば C:\Apache\Spark など)に展開する。

Hadoop 2.6 を別途展開していない Windows では、winutils.zip を入手し、展開したディレクトリに、 null というディレクトリを作成し、bin ディレクトリに、 winutils の中身をコピーする。

正しい利用には、conf/spark-env.sh をいろいろ設定する必要があるが、今回は、HADOOP_HOME と SPARK_HOME、それと SPARK_PUBLIC_DNS だけを設定しておく。

SPARK_PUBLIC_DNS は、SparkR を動かすホストの IPアドレスを設定しておく。これは、ちょっとしたワークアラウンドで、これを設定せずに SparkR を動かすと、SparkR の WebUI がマシン上の任意のインターフェイス上に展開されてしまい、場合によっては、うまくアクセスできない場合があるためだ。今回は、ローカルマシンで動かすので、127.0.0.1 を設定しておく。

Hadoop 2.6を別途展開していない Windows で、Spark をマスタースレーブで起動する場合、HADOOP_HOME を SPARK_HOME\null に設定しておく。ローカルモードだけで SparkR を使う場合は、null\bin ディレクトリに winutils があれば、HADOOP_HOME を設定しなくてもよい。

SparkR の起動と調整(ローカルモード)

R に PATH が通っている事を確認し、
カレントディレクトリを SPARK_HOME にし、以下のコマンドで起動する。

bin/sparkR
# Windows では、CMD.EXE のコマンドプロンプトから実行する
bin\sparkR

ログの抑制

ログがずらずらと表示されると思うが、最初は、動作を確認するのに役に立つ。慣れてくると、邪魔になるので、conf/log4j.properties ファイルの最初のほうに、以下のようにしてログメッセージを抑制する。

log4.properties
log4j.rootCategory=WARN, console

R のメッセージの文字化け抑制

また、Windows では、コマンドプロンプトから R を起動すると、文字化けするので、R のプロンプトが表示されたら、以下のようにして、メッセージを英語に設定する。

R
Sys.setlocale("LC_ALL","English_US.437")

毎回、入力するのが面倒なので、%SPARK_HOME%\R\lib\SparkR\profile\shell.R をエディタで開き、.First <- function() { の直後に、上記のコマンドを書いておくとよい。

SparkR を試用する

R のプロンプトが表示されたら、Spark のコンテキストも設定されて利用できるようになっている。
とりあえず、データをSparkRで作成し、フィルタしてみる。

R
library(magrittr)

dq1 <- createDataFrame(sqlContext,quakes)
dq1 %>% 
  select("depth","mag","stations") %>%
  filter(dq1$mag > 5.5) %>%
  collect

ここでは、R に最初からついている地震データ quakes から SparkR の DataFrame オブジェクトを作り、深さ、マグニチュード、観測所数のカラムだけにし、マグニチュードが 5.5 以上のデータを求めている。

SparkR の DataFrame オブジェクトとは、R の data.frame とは、単に表形式であるというのが似ているだけで、全く関係がないオブジェクトなので注意する。
Spark 1.4 の SparkR は、RDD ではなく、SparkSQL 用に作られた新しい DataFrame オブジェクトに対して、データ処理をするようになっている。DataFrame オブジェクトは内部で RDD を持っていて、これを処理するプロトコルになっている。

演算には、magrittr というパイプ演算子ライブラリを使っているが、これをみると、おかしいことに気づく人もいるだろう。
Spark 1.4 の SparkR は、 dplyr というデータ変換ライブラリとほぼ同様のプロトコルを持っていて、dplyr を使い慣れた人なら、簡単に利用できる。だが、その実装は、まだ十分でなく、dplyr + magrittr のようなエレガントな書き方ができない。このため、select で指定するカラムは、文字列にして、filter で指定するカラムは、明示的にデータオブジェクトを指定した dq1$mag としている。
ここから、本家の dplyr をロードするとさらに混乱した状態になり、同じ関数名を持つ dplyr のライブラリが優先されて、上記の式が動作しなくなる。この場合、直接、名前空間を指定して、以下のようにする。

R
dq1 %>% 
  SparkR::select("depth","mag","stations") %>%
  SparkR::filter(dq1$mag > 5.5) %>%
  SparkR::collect()

ほか、外部データを読み込む例として、Sparkのサイトから、借りてくると、以下のような例がある。

R
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")

registerTempTable(people, "people")

teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")

head(teenagers)

jsonデータを読み込んで、それを SparkSQL上のテーブルに変換し、SQLでテータを検索するもので、興味深い処理ができている。R から、SQLを使って、分散データ処理ができるというのは、いろいろ応用ができるだろう。

read.df は、今のところ、json だけを読み込めるようだ。
write.df は、json と parquet のどちらかで、書き出すこともできるが、HADOOPでの書き出しのような形になるので、通常のファイルとして書き出したいなら、collect してから、R の write.table を使う方がよいだろう。

people %>% write.df(path="people.parquet", source="parquet", mode="overwrite")

parquet を扱うとき、Windows では Hadoop のネイティブライブラリを見つけられないとエラーする。このときは、HADOOP_HOME を確認する。

また、今回は扱っていないが、Hive のデータを読み込むこともできるので、Hadoop と連携したRを活用できる場面が増えるだろう。

マスタースレーブで SparkR を利用する

ローカルモードで、動作が確認できたら、マスタースレーブで利用してみる。
まずは、Spark のマスターとワーカーを起動する必要がある。

マスターは以下のようにして起動する。

bin/spark-class org.apache.spark.deploy.master.Master --ip 127.0.0.1 --port 7077 --webui-port 4000

ここで、WebUI のポートはデフォルトから 4000 に変更している。Windows では、bash ではなく、コマンドプロンプトから起動する。最初に start をつけて起動すれば、別ウィンドウで起動する。

ワーカーは、次のようにして起動する。

bin/spark-class org.apache.spark.deploy.worker.Worker --ip 127.0.0.1 --port 7100 --webui-port 4001 spark://127.0.0.1:7077

マスターの URL は、spark://127.0.0.1:7077 であるが、最後に / (スラッシュ)をつけるとエラーするので注意する。
ここでワーカーのポートは、ランダムから 7100に固定している。複数のワーカーを起動するときにはポートを連続した領域で管理したほうが都合がよいこともあるだろう(セキュリティ的に)。

Windows でワーカーを起動するときの注意として、HADOOP_HOME を設定すること(Hadoop 2.6を展開していない場合は、%SPARK_HOME%\nullに設定する)と、R に PATHが通っていることを確認しておく。

さて、このアプリケーションとなる SparkR は、次のようにして起動する。

export SPARK_PUBLIC_DNS=127.0.0.1 
bin/sparkR --master spark://127.0.0.1:7077 --conf spark.cores.max=1

SPARK_PUBLIC_DNS は、前述したワークアラウンドで、SparkR の WebUI のためのものである。残りのオプション --conf spark.cores.max=1 は、SparkR のシェル(ドライバー)そのものがワーカーのCPUを占有しないように制限するためのもので、これをしないと、SparkR の中から、Spark のジョブが動作しない。
SparkR の WebUI ポートを指定していないが、デフォルトで 4040 に割り当てられている。

ここまで、起動して、すべてに WebUI があることに気づいただろう。ブラウザから、http://localhost:4000/ を開くと、Spark のマスターのWebUI を確認できる。
起動しているワーカーや、アプリケーションを確認でき、また、それぞれをクリックすると、ワーカーの状態や、アプリケーションの状態(http://localhost:4040/) も確認できる。特に、アプリケーションの WebUI は、ジョブが、実行された時間や、そのDAGなど、詳細に情報が確認できるようになっているので、非常に便利である。

RStudio から使う

コマンドプロンプトの R から使えるだけでも便利だが、R ならば、はやり RStudio から使いたい。
Spark をマスタースレーブで起動していれば、以下のようにして、RStudio からも利用できる。

R
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
Sys.setenv(NOAWT=1)
oldPkg <- getOption("defaultPackages")
options(defaultPackages = c(oldPkg, "SparkR"))

detach("package:SparkR", unload=T)
library(SparkR, verbose=T)   # need SPARK_HOME

sc <- SparkR::sparkR.init(master="spark://127.0.0.1:7077",sparkEnv=list(spark.cores.max="1"))
sqlContext <- SparkR::sparkRSQL.init(sc)

SparkR から RDD を利用する

Spark 1.4 で追加された SparkR は、本家だった SparkR-pkg が RDDのサポートをメインとしていたのと異なり、SparkSQL の DataFrame オブジェクトに対する dplyr 互換のプロトコルのサポートにとどまっている。
このため、今ひとつに思うところもあるが、実は、SparkR-pkg でサポートされていた RDD に対するデータ処理も、関数がエクスポートされていないだけで、実装は残されていて、利用できる。

試しに、RDD でベクターを数えてみる。

R
sc %>% SparkR:::parallelize(1:100000) %>% count

エクスポートされていない関数への名前空間の指定は、トリプルコロン(:::)を使う。
うまく動作すれば、あとは、関数名をアサインしておけば、便利に利用できる。

R
assign("parallelize",SparkR:::parallelize,pos=.GlobalEnv)
assign("map",SparkR:::map,pos=.GlobalEnv)
assign("reduceByKey",SparkR:::reduceByKey,pos=.GlobalEnv)

おなじみワードカウントをしてみる。

R
ss <- strsplit("grape orange grape apple orange",split=" ")
wl <- list()
for (i in 1:length(ss[[1]])) { wl[i] <- ss[[1]][i] }

sc %>% 
  parallelize(wl) %>%
  map(function(x) list(x,1)) %>%
  reduceByKey(function(x,y) x+y,1L) %>%
  collect

もしうまく動作しないときは、parallelize, map, reduceByKey に直接名前空間を指定してみてほしい。

SparkR から MLlib, GraphX をどうやって使うか

上記のような情況なので、Spark 1.4 の SparkR は、基本的には、DataFrame オブジェクトを使った、分散データ処理ができる R という位置づけになるだろう。
このため、現状は、MLlib や GraphX のインターフェイスもないので、こうしたものを利用するには、別途ビルドした scala/Java クラスを実行するのがよいだろう。
そのための、例として、SparkPageRank サンプルを動作させてみる。
(といっても、要は、spark-submit しているだけなのだが)

R
SparkR:::launchBackend(
  sparkSubmitOpts="--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar",
  jars="",
  args="data/mllib/pagerank_data.txt 5",
  sparkHome=Sys.getenv("SPARK_HOME"))

中身は、system を呼び出しているだけなので、引数を覚えるのが面倒ならば、直接 system を呼び出すだけだ。

R
system2("bin/spark-submit","--class org.apache.spark.examples.SparkPageRank lib/spark-examples-1.4.0-hadoop2.6.0.jar data/mllib/pagerank_data.txt 5",wait=T)

なお、launchBackend は、バックグランド処理として呼び出される。system で同様にバックグランド処理として扱うには、 wait引数を FALSEに設定する。

結局 Spark 1.4 の SparkR とは

SparkSQL の DataFrame オブジェクトを使った分散処理ができ、そのプロトコルは、dplyr 互換になっている。
また、データは、SQL で処理する事もでき、読み書きに、Hadoop を利用できるので、Hive や HDFS 上のデータも利用できる。
こうした、R への分散データ処理環境が簡単に利用できることは、魅力的だろう。

Spark 本来の RDD を扱うには、エクスポートされていないプロトコルを、名前空間を指定することで呼び出して処理できる。

また、大きく扱わなかったが、Spark 1.4 の WebUI は、よくできていて、データ処理ジョブを把握するもの容易になので、このためだけに Spark 1.4 を利用してもよいだろう。

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
47
Help us understand the problem. What are the problem?