Hadoop Streamingという技術を使ってR言語で並列分散処理をやって見たメモです。
Hadoop Streamingとは
Hadoopの分散処理フレームワークMapReduceは基本的にJavaで記述して実行します。でも「Javaやだー」という人も安心、他言語で動かすことができる仕組みが提供されています。
MapReduceフーレムワークでは名前の通りMap処理とReduce処理が順に実行されるわけですが、そのデータのやり取りを標準入出力でやれば何の言語で書いてもいいじゃん!というのがHadoopStreamingです。
ビッグデータとR言語
「ビッグデータ」とか「データマイニング」というキーワードとともにHadoopが登場することがよくあります。巨大データを効率よく処理しようというのがHadoopなのでまぁ当然です。
で、Hadoopと同様にR言語が取り上げられることがあります。データを分析する、という観点だとR言語が強いです。でもR言語って巨大データを扱うには秀でていないです。基本的にメモリー上にデータ読み込んで集計や分析を行うので、ビッグデータだとメモリーがパンクします。
それじゃぁ、HadoopStreamingで分散&R言語で分析したらいいかも、というのが今回の動機です。
定番のワードカウント
ワードカウントの概要だけ。
Is this a pen?
No, this is a Map-Reduce.
のような訳の分からない英単語があった場合に
this,2
a,2
pen,1
・・・
のように単語ごとに出現回数をカウントする、というものです。
Map処理
#!/usr/local/bin/Rscript
con = file(description="stdin",open="r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
str<-unlist(strsplit(line," "))
for(word in str){
cat(sprintf("%s\t%s\r\n", word, 1),sep="")
}
}
close(con)
標準入力から受け取り、標準出力に出力しています。
出力する際はタブ区切りでKeyとValueを指定しますが、今回はKeyを単語にし、Valueは数字の"1"にしています。
そうするとReduceには単語ごとに"1"が流れてくるので、合計すればOKという仕組みです。
Reduce処理
#!/usr/local/bin/Rscript
con = file(description="stdin",open="r")
key<-""; key.old<-""; cnt<-0;
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
str<-unlist(strsplit(line,"\t"))
key<-str[1]
v<-as.numeric(str[2])
if(key!=key.old && cnt!=0){
cat(sprintf("%s\t%s\n", key.old, cnt),sep="")
cnt<-1
}else{
cnt<-cnt+v
}
key.old<-key
}
close(con)
if(cnt!=0){
cat(sprintf("%s\t%s\n", key.old, cnt),sep="")
}
もう少しスマートに書けそうな気もしますが、Reduceはこんな感じです。Map処理の後はソートが走るのでKeyが変わるタイミングで出力しています。
ネット上のサンプルではHashMapや連想配列のようにKey-Valueで情報を保管できる仕組みに入れてカウントしているものも見かけますが、以下の理由からあまり好きじゃないです。
- ソートされた順番が崩れる
- ストリームなデータの処理なのでできるだけデータを溜めたくない
まぁやりたい処理次第ですが。
実行
Hadoop1.2.1で実行した例です。
hadoop jar /usr/share/hadoop/contrib/streaming/hadoop-streaming-1.2.1.jar \
-file map.R \ #読み込むファイルを列挙します
-file reduce.R \
-input /stream/input/input \ #HDFS上の入力パス指定
-output /stream/out/ \ #HDFS上の出力パス指定
-mapper map.R \ #Map処理として動かすプログラム指定
-reducer reduce.R #Reduce処理として動かすプログラム指定
プログラムの動作確認したい場合は以下のようにすると楽です。
cat input | Rscript map.R | sort | Rscript reduce.R
感想
正直、R言語でやるメリットがあまり感じられませんでした。
上記の通り、R言語が本領発揮するのはある程度データを読み込んだ状態で豊富な統計・分析機能を行うような場面なので、MapReduce上で実現しようと思うとそれなりにきっちり考えて作らないといけない印象。
それならデータの分割&前処理はHadoop⇒その後R、で十分かなと。
しかしJavaに比べてテキスト周りの処理などは手軽にできるので、その点は良いですね。
…次HadoopStreamingするならPythonかな。