Spark 1.4.0 から SparkR が正式採用されました。
さっそくやってみたという方からの報告があります。
SparkR の特徴として
- 主に DataFrame を扱う用
- RDD を扱う関数は隠蔽されている
-
magrittr
との相性が良い -
dplyr
ライクなデータハンドリング - しかし、NSE が使えない
という感じ。
インストールはめっちゃ簡単で、バイナリをダウンロード・解凍して適当なところ(ホームの下とか)に置き、bin/sparkR
を叩くだけで起動できます。
パスを通せば RStudio から使うこともできます。
私は同じサーバに RStudio Server を立てて実行しています。
Quick Start
とりあえず動くことを確認するために、Spark のホームページにある Quick Start を SparkR で実行してみます。
SparkR の RDD 操作関数はエクスポートされていないため、SparkR:::textFile()
のように、:::
を使って呼び出す必要があります。
まずは、RStudio で動かすために、パスを通して Spark を起動します。
.Rprofile
に書いておけば、R を起動した段階でこの状態にできます。
# Initialize for RStudio --------------------------------------------------
Sys.setenv(SPARK_HOME="/home/***/bin/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")
Basics
README.md
を読み込んで、行数を数えます。
# Basics ------------------------------------------------------------------
library(magrittr)
file <- file.path(Sys.getenv("SPARK_HOME"), "README.md")
textFile <- sc %>% SparkR:::textFile(file)
textFile %>% count
[1] 98
最初の行を取得します。
textFile %>% first
[1] "# Apache Spark"
Spark という文字列を含む行のみを抽出し、行数を数えます。
lineWithSpark <- textFile %>%
SparkR:::filterRDD(function(line) grepl("Spark", line))
lineWithSpark %>% count
[1] 19
More on RDD Operations
単語数の最も多い行の単語数を計算します。
# More on RDD Operations --------------------------------------------------
textFile %>%
SparkR:::map(function(line) length(strsplit(line, " ")[[1]])) %>%
SparkR:::reduce(function(a, b) ifelse(a > b, a, b))
[1] 14
上と同じことを、max
関数を用いて行います。
textFile %>%
SparkR:::map(function(line) length(strsplit(line, " ")[[1]])) %>%
SparkR:::reduce(max)
[1] 14
文章中の単語を数えます。
結果はそのままでは見にくいため、データフレームにしています。
wordCounts <- textFile %>%
SparkR:::flatMap(function(line) strsplit(line, " ")[[1]]) %>%
SparkR:::map(function(word) list(word, 1)) %>%
SparkR:::reduceByKey(function(a, b) a + b, 1)
# wordCounts %>% collect # この結果は R では見にくい
wordCounts %>% collect %>%
Map(function(x) data.frame(word=x[[1]], count=x[[2]]), .) %>%
Reduce(rbind, .) %>%
head
word count
1 programs 2
2 online 1
3 Thriftserver 1
4 against 1
5 Alternatively, 1
6 Running 1
Spark の停止は
sparkR.stop()
Lambdarize
とりあえず Quick Start の実行ができました。
しかし、上記のコードは function
をいっぱい書かなくてはならないので疲れますね。
そんなときは、lambdaR
パッケージを使って lambdarize した関数を作ることで、ラムダ式を使った簡潔な書き方ができるようになります。
# Lambdarize --------------------------------------------------------------
# devtools::install_github("hoxo-m/lambdaR") # インストール
library(lambdaR)
lambdarize <- function(rdd_func) function(rdd, ...) rdd_func(rdd, lambda(...))
filterRDD_ <- lambdarize(SparkR:::filterRDD)
map_ <- lambdarize(SparkR:::map)
reduce_ <- lambdarize(SparkR:::reduce)
flatMap_ <- lambdarize(SparkR:::flatMap)
reduceByKey_ <- function(rdd, ..., numPartitions) SparkR:::reduceByKey(rdd, lambda(...), numPartitions)
# Basics ------------------------------------------------------------------
library(magrittr)
file <- file.path(Sys.getenv("SPARK_HOME"), "README.md")
textFile <- sc %>% SparkR:::textFile(file)
textFile %>% count
textFile %>% first
lineWithSpark <- textFile %>%
filterRDD_(line: grepl("Spark", line))
lineWithSpark %>% count
# More on RDD Operations --------------------------------------------------
textFile %>%
map_(line: length(strsplit(line, " ")[[1]])) %>%
reduce_(a, b: ifelse(a > b, a, b))
textFile %>%
map_(line: length(strsplit(line, " ")[[1]])) %>%
reduce_(max)
wordCounts <- textFile %>%
flatMap_(line: strsplit(line, " ")[[1]]) %>%
map_(word: list(word, 1)) %>%
reduceByKey_(a, b: a + b, numPartitions=1)
wordCounts %>% collect %>%
Map_(x: data.frame(word=x[[1]], count=x[[2]])) %>%
Reduce_(rbind) %>%
head
めっちゃすっきりしました。
lambdaR
パッケージについては、下記記事をご参照ください。
Enjoy!