node.jsのコールバック地獄が楽しく感じられるようになりました
#やりたいこと
テキストファイルを改行で区切り MongoDBにinsertする
簡単ですよね。行区切りには readline使います
var mongo = require('mongodb'),
fs = require('fs'),
readline = require('readline');
rs = fs.ReadStream('test.txt');
rl = readline.createInterface({'input': rs, 'output': {}});
rl.on( 'line', function (line) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
});
});
});
});
rl.resume();
わりとイージーですよね。
fs を使いファイルから読み込み
readline を使いストリーム入力する
1行ごとに readline::on が呼ばれるので 取得したデータをjsonにし
MongoDBに接続し
collectionを取得し
insertする
以下 全ての行が終わるまで繰り返す。
はい、うまくいきましたね!
__ データ件数が少ないときは! __
#データが多いとエラーになる
10万件のデータを流したところ、ログをみるかぎり
DBのコネクト処理を行わず、一気に10万行のレコードを処理し
タスクキューに詰んでるようでした
__ そりゃパンクしますよね!! __
要は、行読み込みを非同期で行わず
DBへのinsertまでを逐次処理すれば
nodeの良さを潰してますが、希望の処理が出来そうです
#逐次処理どうする?
readlineはソースを追っかけたところ、非同期固定のようなので
使用をあきらめ、ファイルを全部読み込み 改行でsplitし
foreachで回す事にします
var mongo = require('mongodb'),
fs = require('fs');
fs.readFileSync('test.txt').toString().split('\n').forEach(function (line) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
});
});
});
});
__ しかしダメです!! __
どうも、readFileSyncはファイルの読み込みは同期だが その後のforEachが非同期なので
先ほどと同じ事になり
しかも ストリーミングを使ってないのでメモり圧迫も多いダメダメです
#asyncってのがあるよ
名前からするに、非同期処理のライブラリだと思ってたんですが
非同期も同期もサポートするライブラリのようです!! すごいじゃん!!
ってことで これの forEachSeries を使います
名前からして ばっちりですね!
var mongo = require('mongodb'),
fs = require('fs');
async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
cb();
});
});
});
});
この async::forEachSeries ですが、コールバックの2番目のパラメータ(ここではcb)を呼ぶまでブロックするので
1件insertが終わりDBをcloseした後 cb(); をすることで 逐次処理が出来
タスクがあふれる事がなくなりそうです
__ 前より長持ちしたがダメだった・・・ __
#Why?
仮説ですが db.close() し、cb();を呼び 次のデータ処理を行いますが
db.close()が非同期?ガーベージコレクター????
ちょっと 理由はわかってませんが
db.close() と コールバックを呼ぶ間に sleepもどきを挟んだら
なんとか動きました
(このあたり また調査したいですが 知っている方 教えてください!!)
###最終ソース
var mongo = require('mongodb'),
fs = require('fs');
function sleep(milliSeconds) {
var startTime = new Date().getTime();
while (new Date().getTime() < startTime + milliSeconds);
}
async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
db.close();
sleep(50);
cb();
});
});
})
});
#ファイナルアンサー?
shuheiさんの指摘より、DBへのconnect closeは毎回必要なのか。
そう、nodeの場合、forEachは非同期で走るため、毎回別のセッションなので
connectと closeは必要なのですが
最終形態は、forEachSeriesを使い 同期処理になってます
・・・という事は connect closeは1回でよい
って事で ファイナルアンサー!
そして、insert時にエラーが発生した場合、callbackにerrを渡すとその時点で中断してくれるらしい
凄い便利!
あと closeのタイミングは forEachSeriesの第三引数に ループ終了時の処理を書けるので
そこで行うと
すごい! nodeがどんどん実用的に! 皆様ありがとうございます!
var mongo = require('mongodb'),
fs = require('fs');
mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
db.collection( "table", function(err, collection) {
async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
var json ={};
json['value'] = line.trim();
json['hoge'] = 'hoge';
collection.insert( json, {safe:true}, function(err, result) {
console.log( "insert:" + line);
cb(err);
});
},
function(){db.close();
});
});
});
Sleepも不要、コネクションは1回張ればOK
ソースもシンプルになり
30分かかったinsertも 10秒程でおわるようになりました
願わくは readlineに同期メソッド欲しい(あるかもしれないからもう一度しらべてみよう)
本当にありがとうございました。