はじめに
ElasticSearch(+Kibana) はデータの見える化、特にログ解析の分野では絶大な人気を得ています。
しかし、ログだけの可視化ではモッタイナイ。
MongoDBに保存される全データも、ElasticSearch(+Kibana) で小粋にビジュアライゼーションしたいですよね。
今まではRiverプラグインで Mongo-ES(Kibana)連携していた人も多いかもしれませんが、残念ながらElasticSearch2.0からは、当該プラグインは廃止されてしまいました。
そこで、
- MongoDBの特徴を活かした
- もっと簡単、シンプルで
- リアルタイムに、無駄なく動作する
MongoDB - ElasticSearch連携を、お伝えしていきます。
MongoDB
決め手は OpLog
OpLog
OracleのREDOログ、MySQLのバイナリログと同じように、MongoDBにもOpLogというトランザクションログがあります。
他と違って、MongoDBのOpLogは「操作によって引き起こされたデータの更新(差分)履歴」が記録されます。
つまり、いくらデータを操作しても、実際に更新(差分)が発生した箇所だけが、OpLogに記録されるのです。
これが、抜群に使いやすい!
OpLog 設定
こんなに便利なOpLogですが、インストール直後はOFFになっています。レプリケーション用なので、レプリカセットを設定する必要がありますが、1台構成でも大丈夫です。
設定手順
- mongod.conf を編集。レプリカの名前はrepltestとしました。
replSet=repltest
- Mongoシェルから、レプリカの設定を入れます。
$ mongo
> config = {_id: 'repltest', members: [
{_id: 0, host: '127.0.0.1:27017'}]
};
> rs.initiate(config);
> exit
- 再起動。上手く行けば Mongoシェルのプロンプトが変わります。
$ mongo
repltest:PRIMARY>
- OpLog はlocalデータベースにあるので、確認しておきます。
repltest:PRIMARY> use local
repltest:PRIMARY> show collections
me
oplog.rs ← ありました
startup_log
system.indexes
system.replset
ローカルにMongoDBとElasticSearchが入っていれば
コード一式
https://github.com/exabugs/mongo-es
node index.js
ポート27017で稼働するMongoDBの全コレクションを監視し、更新があれば、ポート9200で稼働するElasticSearchに通知します。
インデックス名は「MongoのDB名.Mongoのコレクション名」です。
重要なポイント
MongoDB の OpLog監視部分が以下になります。
- tailable オプションでカーソルを開きます。
- 接続が切れても、err.tailable が true なら、引き続き監視可能です。
- 本当に切れた場合は、find からやり直します。
- 更新データが無い場合の待ち時間は1秒にしています。
function loop(oplog, ts, callback) {
var condition = {ts: {$gt: ts}};
var option = {tailable: true};
oplog.find(condition, option, function (err, cursor) {
function processItem(err, op) {
if (op) {
// 更新処理
update(oplog.s.db, op, function (err) {
if (err) {
log(err.message);
setTimeout(function () {
loop(oplog, ts, callback);
}, WAIT);
} else {
log("Update ElasticSearch");
ts = op.ts;
fs.writeFileSync(posfile, ts); // どこまで処理したか記憶する
setImmediate(function () {
cursor.next(processItem);
});
}
});
} else if (err && err.tailable) {
// tailable=true なら引き続き監視可能
log(err.message);
setTimeout(function () {
cursor.next(processItem);
}, WAIT);
} else {
// 本当に切れた場合(MongoDB再起動等)は、findからやり直し
log(err.message);
setTimeout(function () {
loop(oplog, ts, callback);
}, WAIT);
}
}
cursor.next(processItem);
});
}
ElasticSearch を更新する部分が以下になります。
- 簡単のために、フィールド単位での差分更新はしていません。
- MongoDBの1コレクションはElasticSearchの1インデックスに対応。
- typeは一律で'default'という名前にしています。
function update(db, op, callback) {
var tags = op.ns.split(/^([^.]+)\./);
var coll = db.db(tags[1]).collection(tags[2]);
var _id = op.o._id || op.o2._id;
async.waterfall([
function (next) {
if (op.op === "d") {
next(null, _id, "DELETE", null);
} else {
coll.findOne({_id: _id}, function (err, obj) {
obj && (delete obj._id);
next(err, _id, "PUT", obj);
});
}
},
function (_id, method, obj, next) {
var options = {
uri: [esearch_url, op.ns, "default", _id].join("/"),
method: method,
json: obj
};
request(options, function (err) {
next(err);
});
}
], function (err) {
callback(err);
});
}
まとめ
- MongoDB の OpLog は、変更の差分だけを伝えてくれるので、処理に無駄がありません。
- OpLog を使って、MongoDB の全コレクションを ElasticSearch に、更新時リアルタイムに反映させています。
- ログだけでなく、データそのものがカジュアルに可視化できるのは便利で気持ち良いです。
- OpLog のフックは、トリガー、または、Outgoing-Webhook として応用できます。
注意
- OpLog は Capped-Collection (サイズが固定) です。常に先端を追っていれば問題ありませんが、何日も前のログは、なくなっている可能性があります。
- 本プログラムでは、OpLogの処理ポインタをローカルファイルに記憶するようにしています。nodeやMongoDBの障害/再起動には耐えますが、マスタが切り替わった場合の処理を省いています。マスタを判断(localDBに接続できること)し、処理ポインタはネットワーク上のファイル、あるいは、MongoDBのコレクションに記憶する必要があります。