0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

RとSparkで並列分散処理をやってみた

Last updated at Posted at 2021-11-15

Rの並列・分散処理ができる方法を探してみたところ、Apache SparkとSparkRなるものを発見。
とりあえずやってみたら思った以上に早くなりました。
しかし、日本語の資料があまりにも少なくて苦労したので、何かの役に立つかと思って投稿してみました。

#参考ページ
Sparkについてはこちら
その日本語訳
最初に見つけてすごく役に立ったのがこちら
こちらも、概要を把握するのに役立ちます。

#やってみたこと
 RStudioを使ってVAR(Vector Auto-Regression:ベクトル自己回帰)分析を分散処理してみた。
 処理の方法はlapplyでコトたりたので、mapとかreduceとかは使っていません。
 よくわからないままいろいろ試した結果なので、間違っているところもあると思います。間違ったところがあるとか、もっといいやり方があるとかあれば、コメントください。

#やってみた環境
インターネットに繋がっていないLAN環境
端末はWindows7(32bit)とWindows10(64bit)が混在
Rは3.5.2にパッケージ「vars」をインストール
Sparkは2.0.2を使いました

#単体マシンで試してみる
###1 インターネットにつながっている端末で必要なファイルをダウンロード

  1. ここからSparkをダウンロード。

    私はこちらから旧バージョンのspark-2.0.2-bin-hadoop2.7.tgz をとってきました。
  2. ここからHADOOPwin-utilのダウンロード

    今回はhadoop-2.7.1を使いました。
  3. JAVAはインストール済みのJRE8.0.202を使いました。

    入れてない人はこちらからインストールしてください。
  4. RとRStudioはインストール済みのR-3.5.2を使いました。

    あと、Rのパッケージの「vars」をRにインストールしておいてください。

    必要であればこのへんからどうぞ。

    (CRANにパッケージ「SparkR」があるけど、これは不要)

###2 環境設定

  1. ダウンロードしたSparkとHADOOPwin-utilを端末のどこかに保存します。
  2. Cドライブ直下に「Apache」フォルダを作り、その中にspark-2.0.2-bin-hadoop2.7.tgzを解凍します。

    解凍すると「spark-2.0.2-bin-hadoop2.7」という長い名前のフォルダができるので、フォルダ名を「spark」に変えます。
  3. C:¥Apache¥spark直下に「null」フォルダを作り、その中にHADOOPwin-utilの中身(REASME.mdとbinフォルダ)を全てコピーします。
  4. Java.exeのパスを確認しておきます。

     (私の環境では、C:¥Program Files¥Java¥jre1.8.0_202¥binでした)
  5. C:¥Apache¥spark¥confの中のファイルspark-env.sh.templateのファイル名をspark-env.shに変えます。

    C:¥Apache¥spark¥conf¥spark-env.shをメモ帳などで開き、どこでもいいので以下の4行を追加する。
spark-env.sh
SPARK_HOME,C:¥Apache¥spark
SPARK_PUBLIC_DNS,XXX.XXX.XXX.XXX  # XXX.XXX.XXX.XXXは自端末のIPアドレス
HADOOP_HOME,SPARK_HOME¥null
JAVA_HOME,C:¥Program Files¥Java¥jre1.8.0_202   # 端末のJava.exeのパスに合わせて修正

6.システム環境変数のパスに、Rscript.exeのパスを追加する。
 Rscript.exeは、C:¥Program Files¥R¥binなどにあると思いますが、フルパス(例 C:¥.....¥bin)を確認します。
 環境変数の追加の方法はこのへんのサイトを参考にしてください。

ここまでで、フォルダ構成はこのようになっています。

フォルダ構成
C:¥Apache¥spark¥bin
               ¥conf   <- この中にspark-env.sh
               ¥data
           :
           :
         ¥null    <- この中にhadoop-2.7.1のREASME.mdとbinフォルダ
           :

###3 単体での動作確認

  1. マスタの立ち上げ

    コマンドプロンプトを開いて
コマンドプロンプト
C:
cd ¥Apache¥spark
bin/spark-class org.apache.spark.deploy.master.Master --ip XXX.XXX.XXX.XXX --port 7077 --webui-port 4000
rem XXX.XXX.XXX.XXXは自端末のIPアドレス

を実行。
 しばらく待つと、何やらメッセージがぞろぞろ表示される。
 その中に「INFO Master:Running Spark version....」「INFO Utils:Successfully started service.....」と表示されていれば立ち上げ成功。
 コマンドプロンプトは閉じないでそのまま表示させておく。
 (Sparkを切るときは、[CTRL]+[C]でバッチジョブを終了させる)
 メッセージに「Error ...」と表示されていたら立ち上げ失敗なので、設定や立ち上げコマンドを見直します。
2. webUIで確認
 ブラウザで http://localhost:4040/ を開く。
 ApacheのwebUIが表示されたら、マスタが立ち上がっている証拠です。
3. ワーカの立ち上げ
 もう一枚コマンドプロンプトを開いて

コマンドプロンプト
C:
cd ¥Apache¥spark
bin/spark-class org.apache.spark.deploy.worker.Worker --ip XXX.XXX.XXX.XXX --port 7100 --webui-port 4001 spark://XXX.XXX.XXX.XXX:7077
rem XXX.XXX.XXX.XXXは自端末のIPアドレス

を実行。
 しばらく待つと、またもやメッセージがぞろぞろ表示される。
 その中に「INFO Worker:Successfully refistered with...」と表示されていれば立ち上げ成功。
 コマンドプロンプトは閉じないで表示させておく。
 「Error....」と表示されていたら立ち上げ失敗なので、設定か立ち上げコマンドを見直す。
3. ブラウザのSpark WebUI の表示を更新してみる。
 Worker IDに端末が追加されていることを確認できます。

###4 RStudioからSparkのセッションを起動
RStudioを起動して、以下を実行

RStudio
Sys.setenv(NOAWT=1)
Sys.setenv(SPARK_HOME=normalizePath('C:¥¥Apache¥¥spark'))
Sys.setenv(HADOOP_HOME=normalizePath('C:¥¥Apache¥¥spark¥¥null'))
Sys.setenv(JAVA_HOME=normalizePath('C:¥¥Program Files¥¥Java¥¥jre1.8.0_202'))   #端末のJava.exeのパスに合わせて修正
library(SparkR,lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R" , "lib")))  
sparkR.session(enableHiveSupport=FALSE,master="spark://XXX.XXX.XXX.XXX:7077",appName = "testSPARK")   #XXX.XXX.XXX.XXXは端末のIPアドレス
#------Sparkセッション開始

 ここでブラウザのWebUIの表示を更新してみて、Application IDとアプリケーション名「testSPARK」が表示されていれば、Spark Sessionの立ち上げ成功です。
 確認できたら、いったんSpark Sessionを閉じます。

RStudio
sparkR.session.stop()		#-----Sparkセッション終了

###5 単体で動作確認

  1. sparkの動作確認

    とりあえずの動作確認のため、Sparkデータフレームを作ってみる。
RStudio
#-------Sparkの準備
Sys.setenv(NOAWT=1)
Sys.setenv(SPARK_HOME=normalizePath('C:¥¥Apache¥¥spark'))
Sys.setenv(HADOOP_HOME=normalizePath('C:¥¥Apache¥¥spark¥¥null'))
Sys.setenv(JAVA_HOME=normalizePath('C:¥¥Program Files¥¥Java¥¥jre1.8.0_202'))   #端末のJava.exeのパスに合わせて修正
library(SparkR,lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R" , "lib")))  
sparkR.session(enableHiveSupport=FALSE,master="spark://XXX.XXX.XXX.XXX:7077",appName = "testSPARK")   #XXX.XXX.XXX.XXXは端末のIPアドレス
#------Sparkセッション開始
localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))	#Rのデータフレームを作成
df <- createDataFrame(localDF)		#Rのデータフレームを元にsparkのデータフレームを作成
printSchema(df)		#sparkデータフレームの構成を表示
showDF(df)		#sparkデータフレームの内容を表示

 エラーが出ずに結果が表示されたら成功。
2. spark.lapplyを使ってみる
 ベクトルの要素を一つずつ取り出して1を加算してリストに渡すという処理を考えます。
 普通のRならこんな感じのことです。

普通のR
v_1<- c(1,3,5,7,9,0,8,6,4,2)
ret<-list()
for (i in 1:length(v_1)){
	ret[i] <- v_1[i] + 1
}
head(ret)		# 結果確認

 これをsparkのspark.lapplyを使ってやってみます。
 これ以降は、Spark Sessionは開始しているものとします。

SparkR
v_1 <- as.list(c(1,3,5,7,9,0,8,6,4,2))
ret <- spark.laply(v_1,function(x){
	spRet <- x + 1
})
head(ret)		# 結果確認

 実行させると、「あれ、なんかSpark遅いぞ」と思いますが、とりあえずエラーが出なければいいのです。
 詳しくは後ほど。
3. VAR分析をしてみる
 Sparkを使ってRパッケージ「vars」のVAR分析をやってみる。
 あくまでも試しということで、無駄にVARを10回繰り返してみます。

普通のR
library(vars)
a <- c(1,3,5,7,9,0,8,6,4,2)
b <- c(2,4,6,8,0,9,7,5,3,1)
lab <- cbind(col1=a,col2=b)
lab2 <- list(lab,lab,lab,lab,lab,lab,lab,lab,lab,lab)
ret <- list()
i <- 0
for (x in lab2){
  i <- i + 1
  varRet <- vars:::VAR(x,p=2)
  ret[i] <- causality(varRet,cause="col1")[[1]][[1]][[1]]
}
head(ret)

sparkでの処理

SparkR
sc <- sparkR.session()
SparkR:::includePackage(sc,vars)  #ワーカで使うパッケージの宣言
a <- c(1,3,5,7,9,0,8,6,4,2)
b <- c(2,4,6,8,0,9,7,5,3,1)
lab <- cbind(col1=a,col2=b)
lab2 <- list(lab,lab,lab,lab,lab,lab,lab,lab,lab,lab)
ret <- spark.lapply(lab2,function(x){
  varRet <- vars:::VAR(x,p=2)
  varRet2 <- causality(varRet,cause="col1")[[1]][[1]][[1]]
})
head(ret)

実行させると、「やっぱりSpark遅いぞ」と思います。
では次に、「けっこうSpark速いぞ」と感じるために、無駄に処理量を増やしてみます。

普通のR
library(vars)
a <- c(1,3,5,7,9,0,8,6,4,2)
a <- c(a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a)  #無駄に増やした
b <- c(2,4,6,8,0,9,7,5,3,1)
b <- c(b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b)  #無駄に増やした
lab <- cbind(col1=a,col2=b)
lab2 <- list(lab,lab,lab,lab,lab,lab,lab,lab,lab,lab)
ret <- list()
i <- 0
for (x in lab2){
  for(cnt in 1:500){
    tmp_p <- vars:::VAR(x,p=2)
  }  #無駄にループしてみた
  i <- i + 1
  varRet <- vars:::VAR(x,p=2)
  ret[i] <- causality(varRet,cause="col1")[[1]][[1]][[1]]
}
head(ret)

sparkでの処理

SparkR
sc <- sparkR.session()
SparkR:::includePackage(sc,vars)  #ワーカで使うパッケージの宣言
a <- c(1,3,5,7,9,0,8,6,4,2)
a <- c(a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a)  #無駄に増やした
b <- c(2,4,6,8,0,9,7,5,3,1)
b <- c(b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b)  #無駄に増やした
lab <- cbind(col1=a,col2=b)
lab2 <- list(lab,lab,lab,lab,lab,lab,lab,lab,lab,lab)
ret <- spark.lapply(lab2,function(x){
  for(cnt in 1:500){
    tmp_p <- vars:::VAR(x,p=2)
  }  #無駄にループしてみた
  varRet <- vars:::VAR(x,p=2)
  varRet2 <- causality(varRet,cause="col1")[[1]][[1]][[1]]
})
head(ret)

ちょっとSparkの方が早くなっているはずです。
処理速度はCPUの能力によりますが、CPUコア数が多い端末であれば、一台でも結構な速度改善を体感できます。

#複数の端末で分散処理
###1 ワーカ端末の設定
マスタ端末の環境設定と同じ様に、ワーカ端末を設定します。
IPアドレスやパスは、ワーカ端末のものに変えてください。
忘れやすいのが、
 ・システム環境設定のPATHにRscript.exeのパスをとおす
 ・Rにパッケージをインストールする
 ・spark-env.shの設定(SPARK_PUBLIC_DNSを自端末のIPアドレスに変えるなど)
なので、これもちゃんと確認します。

###2 ワーカの起動
 マスタ端末でSparkマスタを起動しておきます。(単体での動作確認の「1.マスタの立ち上げ」参照)
 ワーカ端末でコマンドプロンプトを開いて

コマンドプロンプト
c:
cd ¥Apache¥spark
bin/spark-class org.apache.spark.deploy.worker.Worker --ip YYY.YYY.YYY.YYY --port 7100 --webui-port 4001 spark://XXX.XXX.XXX.XXX:7077
rem YYY.YYY.YYY.YYYはワーカ端末のIPアドレス XXX.XXX.XXX.XXXはマスタ端末のIPアドレス

を実行。
しばらく待つと、メッセージがぞろぞろ表示されるのはこれまでどおり。
 「INFO Worker:Successfully refistered with...」と表示されていれば立ち上げ成功。
 Spark WebUI の表示を更新して、Worker IDに端末が追加されていることを確認する。

###3 Sparkを試してみる
マスタ端末のRStudioで、先程の無駄に処理量を増やしたコードを実行してみる。

SparkR
sc <- sparkR.session()
SparkR:::includePackage(sc,vars)  #ワーカで使うパッケージの宣言
a <- c(1,3,5,7,9,0,8,6,4,2)
a <- c(a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a)  #無駄に増やした
b <- c(2,4,6,8,0,9,7,5,3,1)
b <- c(b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b,b)  #無駄に増やした
lab <- cbind(col1=a,col2=b)
lab2 <- list(lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab,lab)  #無駄に増やした
ret <- spark.lapply(lab2,function(x){
  for(cnt in 1:500){
    tmp_p <- vars:::VAR(x,p=2)
  }  #無駄にループしてみた
  varRet <- vars:::VAR(x,p=2)
  varRet2 <- causality(varRet,cause="col1")[[1]][[1]][[1]]
})
head(ret)

コードは全く変えていないけど、さっきより速くなっているはずです。

#Spark lapplyの使い方(多分こんな感じ)
lapplyは,引数として指定されたリストの要素を1件ずつ取り出し、何らかの処理をして、その最後の式の値をリストにして結果に返します。

SparkR
結果のリスト <- spark.lapply(引数のリスト,function(x){・・・何らかの処理・・・})

function(x)以下の { から } までの処理(ジョブ)を各ワーカに分散して処理します。
1件ずつの処理スピードは各ワーカのCPU能力によります。
分散に際しては、マスタから各ワーカにジョブを渡してその結果を回収・集約するためにオーバーヘッドが生じ、その分だけ通常の処理よりも時間が多くかかります。
1件のジョブの処理時間がオーバーヘッドよりも短いような単純なものであれば、Sparkで処理する方が遅くなってしまいます。
逆に、ある程度まとまった処理で、オーバーヘッドの時間よりもジョブ1件の処理時間が大きいものであれば、Sparkの効果が出てきます。

また、lapplyでの各ジョブはワーカに分散して処理されるため、{ から } までで完全に閉じた処理にする必要があります。例えば、

SparkR
i <- 0
ret <- spark.lapply(list_1,function(x){ i <- i + 1  ・・・})

のように,ジョブの外にある変数「i」を書き換えることはできません。別のやり方を考える必要があります。
次のように

Spark
ret <- spark.lapply(list_1,function(x){ i <- 0
                                        i <- i + 1  ・・・})

ジョブの中でなら書き換え可能です。

・・・多分そういうことなんだろうと思います。(いろいろ試した結果で書いてます。違っているところあれば指摘してください。)

#最後に
SparkRは、もっと使われて、もっといろんな情報が日本語で共有されたらいいなと思っています。
いつかRを使う理由に「Sparkが使えて速いから」と答えてみたいですね。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?