Edited at
MongoDBDay 21

MongoでMapReduceする

More than 5 years have passed since last update.

この記事はMongoDB Advent Calender2013の21日目です。

MongoDBで手軽にMapReduceする方法について書かせていただきます。


MongoDBでMapReduce

世間的にMongoでM/Rするのは情弱、世間知らず、自殺行為などと色々disられてはおりますが、やはりスキーマレスで好きなデータを突っ込んでおいて、あとで集計をかけるというお手軽さからいうとMongoのM/Rも充分選択肢としてありだと思います。


前準備

実際に自分が今やっているプロジェクトのうちの1つに、apacheのアクセスログをfluentd経由でMongoに書き出しているものがあります。

ちなみにデフォルトのfluentdのプラグインだとpathが1フィールドに登録されてM/Rしずらいので、クエリストリングスをパースして1クエリを1フィールドに入れるout_exec_filterを自作しています。


parser.rb

#!/usr/lib64/fluent/ruby/bin/ruby

require 'json'
require 'uri'
require 'cgi'
require 'msgpack'

STDOUT.sync = true
while line = STDIN.gets
parsed_hash = {}
line_to_json = JSON.parse(line)
line_to_json.each do |key,l|
parsed_hash[key] = l
if key == "path"
uri = URI.split(URI.escape(l))
query_string = CGI::parse(uri[7])
query_string.map{ |k,v|
parsed_hash[k] = v[0]
}
end
end
puts parsed_hash.to_msgpack
end


ちなみにこちらは有名なperl版パクリrubyに書き換えた版です。

正常に動くと、mongoのコレクションにこのようにログが溜まり始めます(今回はaccess_logsコレクションにApacheのログを溜め込んでいます)。


access_logs

> db.access_logs.find()

{ "_id" : ObjectId("50f666eb0c27772733000001"), "host" : "xxx.xxx.xxx.xxx", "user" : "-", "method" : "GET", "path" : "/front?medium_id=1&site_id=1&guid=ON", "site_id" : "1", "guid" : "ON", "code" : "200", "size" : "11", "referer" : "-", "agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:18.0) Gecko/20100101 Firefox/18.0", "dcmguid" : "-", "subno" : "-", "jpone_uid" : "-",, "medium_id" : "1", "response_time" : "0.014", "time" : ISODate("2013-01-16T08:37:12Z") }
{ "_id" : ObjectId("50f666eb0c27772733000002"), "host" : "xxx.xxx.xxx.xxx", "user" : "-", "method" : "GET", "path" : "/front.js?medium_id=1&site_id=1&guid=ON", "site_id" : "1", "guid" : "ON", "code" : "200", "size" : "341", "referer" : "-", "agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:18.0) Gecko/20100101 Firefox/18.0", "dcmguid" : "-", "subno" : "-", "jpone_uid" : "-", "medium_id" : "1", "response_time" : "0.021", "time" : ISODate("2013-01-16T08:37:16Z") }
{ "_id" : ObjectId("50f66b720c27772733000003"), "host" : "xxx.xxx.xxx.xxx", "user" : "-", "method" : "GET", "path" : "/front.js?medium_id=3&site_id=2", "site_id" : "2", "code" : "200", "size" : "343", "referer" : "-", "agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:18.0) Gecko/20100101 Firefox/18.0", "dcmguid" : "-", "subno" : "-", "jpone_uid" : "-", "medium_id" : "3", "response_time" : "0.016", "time" : ISODate("2013-01-16T08:57:14Z") }

/frontのクエリストリングが別々のカラムとして登録されているのが確認できます。


MapReduceしてみる

データを扱いやすくMongoDBに格納したところで、実際にMapReduceしてみたいと思います。

MapReduceはmongo shellから、MapReduce関数を使用して行います。


map_reduce

> db.access_logs.mapReduce(

"function() {
emit(this.site_id, {count:1});
}
",
"function(key,values) {
var result = {count:0};
values.forEach(function(v){result.count += v.count;});
return result;
}
",
{query:{medium_id:"10"},out:{inline:1}})

実行結果


result

{

"results" : [
{
"_id" : "1",
"value" : {
"count" : 4
}
},
{
"_id" : "4",
"value" : {
"count" : 646513"
}
}
],
"timeMillis" : 26719,
"counts" : {
"input" : 646517,
"emit" : 646517,
"reduce" : 6467,
"output" : 2
},
"ok" : 1,
}

アクセスログのうち、medium_id="10"かつsite_id="1"のレコードが4件、site_id="4"のレコードが646513件見つかりました。

MapReduceに渡した引数のうち、第一引数がMap関数と呼ばれるものです。

Reduce関数で使用するためのkeyとvaludeをemit関数を使用して作成します。


map

function() {

emit(this.site_id, {count:1});
}

第二引数にはReduce処理を行う関数を渡します。

Map関数で準備されたKey/Valueを集計する処理です。


reduce

function(key,values) {

var result = {count:0};
values.forEach(function(v){result.count += v.count;});
return result;
}

同一のkey値(site_id )を持ったドキュメントが加算されます。

第三引数はオプション引数を渡します。


option

{query:{medium_id:"10"},out:{inline:1}}


今回は出力結果をmongo shellにインライン出力したので、{out:{inline:1}}を指定しました。

また、{query:{medium_id:"10"}}で、予めMapするデータをmedium_id="10"でフィルタリングしています。

さらに詳しいことを知りたい方は公式ドキュメントをご参考ください。


状態を見る

このように簡単にMapReduceできるのがMongoDBの強みですが、MongoのMapReduceはクラスタリングできなかったり、負荷が重かったりで、一回実行するとアプリケーションに影響をおよぼす可能性があります。

MapReduce走らせたけど、応答なくて怖ええええええみたいな状況に陥ったら、currentOpで現在の状況を確認しましょう。


currentOp

> db.currentOp()

{
"inprog" : [
{
"opid" : 106542343,
"active" : true,
"secs_running" : 1,
"op" : "query",
"ns" : "hogehoge.access_logs",
"query" : {
"mapreduce" : "access_logs",
"map" : "function() { emit(this.site_id, {count:1}); }",
"reduce" : "function(key,values) { var result = {count:0}; values.forEach(function(v){result.count += v.count;}); return result; }",
"query" : {
"medium_id" : "10"
},
"out" : {
"inline" : 1
}
},
"client" : "127.0.0.1:56233",
"desc" : "conn1776",
"threadId" : "0x7fdb5f7e5700",
"connectionId" : 1776,
"locks" : {
"^" : "r",
"^ranking_production" : "R"
},
"waitingForLock" : false,
"msg" : "m/r: (1/3) emit phase 23366/646517 3%",
"progress" : {
"done" : 23367,
"total" : 646517
},
"numYields" : 234,
"lockStats" : {
"timeLockedMicros" : {
"r" : NumberLong(2315600),
"w" : NumberLong(0)
},
"timeAcquiringMicros" : {
"r" : NumberLong(1062271),
"w" : NumberLong(0)
}
}
}
]
}

currentOp()の結果のうち、msgprogress でMapReduceの進行状況が確認できます。

上記の場合


msg

            "msg" : "m/r: (1/3) emit phase 23366/646517 3%",

"progress" : {
"done" : 23367,
"total" : 646517
},

ということなのでこのクエリは現在emitフェーズで、M/Rのまだ1/3で、全データ(646517件)のうち、3%のMapが終わってますよという意味になります。

これでいつまでもパーセンテージが進まない、時間がかかりそうな場合はkillOp(opid)でオペレーションを殺してしまいましょう。


killOp

> db.killOp(106542343)


ところでこのmsg、いつやっても"m/r: (1/3) emit phase"ばかりで、他のメッセージを見たことがないんですが、果たしてどんなメッセージがあるのか、ソースを調べてみました。


mr.cpp

"m/r: merge post processing"

"m/r: reduce post processing"
"m/r: merge sort and reduce"
"m/r: (1/3) emit phase"
"m/r: (2/3) final reduce in memory",
"m/r: (3/3) final reduce to collection"

重いreduce処理なんかを流すと2/3も計測できるんでしょうか。


さいごに

いかがでしたでしょうか。MongoでMapReduce。生き恥をさらす覚悟で書きましたが、皆さん楽しんでいただけましたでしょうか。

今後ともMongoDBをよろしくお願い致します。