34
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

素人のnode.js デプロイ 大量データの逐次処理

Last updated at Posted at 2014-04-23

node.jsのコールバック地獄が楽しく感じられるようになりました

#やりたいこと
テキストファイルを改行で区切り MongoDBにinsertする

簡単ですよね。行区切りには readline使います

var mongo = require('mongodb'),
    fs = require('fs'),
    readline = require('readline');

    rs = fs.ReadStream('test.txt');
    rl = readline.createInterface({'input': rs, 'output': {}});

    rl.on( 'line', function (line) {    
      var json ={};
      json['value'] = line.trim();
      json['hoge'] = 'hoge';

      mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
        db.collection( "table", function(err, collection) {
          collection.insert( json, {safe:true}, function(err, result) {
          console.log( "insert:" + line);
            db.close();
          });
      });
    });

  });
  rl.resume();

わりとイージーですよね。
fs を使いファイルから読み込み
readline を使いストリーム入力する
1行ごとに readline::on が呼ばれるので 取得したデータをjsonにし
MongoDBに接続し
collectionを取得し
insertする

以下 全ての行が終わるまで繰り返す。

はい、うまくいきましたね!
__ データ件数が少ないときは! __

#データが多いとエラーになる
10万件のデータを流したところ、ログをみるかぎり
DBのコネクト処理を行わず、一気に10万行のレコードを処理し
タスクキューに詰んでるようでした

__ そりゃパンクしますよね!! __

要は、行読み込みを非同期で行わず
DBへのinsertまでを逐次処理すれば
nodeの良さを潰してますが、希望の処理が出来そうです

#逐次処理どうする?
readlineはソースを追っかけたところ、非同期固定のようなので
使用をあきらめ、ファイルを全部読み込み 改行でsplitし
foreachで回す事にします

var mongo = require('mongodb'),
    fs = require('fs');

    fs.readFileSync('test.txt').toString().split('\n').forEach(function (line) {
      var json ={};
      json['value'] = line.trim();
      json['hoge'] = 'hoge';

      mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
        db.collection( "table", function(err, collection) {
          collection.insert( json, {safe:true}, function(err, result) {
          console.log( "insert:" + line);
            db.close();
          });
      });
    });

  });

__ しかしダメです!! __

どうも、readFileSyncはファイルの読み込みは同期だが その後のforEachが非同期なので
先ほどと同じ事になり
しかも ストリーミングを使ってないのでメモり圧迫も多いダメダメです

#asyncってのがあるよ
名前からするに、非同期処理のライブラリだと思ってたんですが
非同期も同期もサポートするライブラリのようです!! すごいじゃん!!
ってことで これの forEachSeries を使います
名前からして ばっちりですね!

var mongo = require('mongodb'),
    fs = require('fs');

  async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
      var json ={};
      json['value'] = line.trim();
      json['hoge'] = 'hoge';

      mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
        db.collection( "table", function(err, collection) {
          collection.insert( json, {safe:true}, function(err, result) {
          console.log( "insert:" + line);
            db.close();
            cb();
          });
      });
    });

  });

この async::forEachSeries ですが、コールバックの2番目のパラメータ(ここではcb)を呼ぶまでブロックするので
1件insertが終わりDBをcloseした後 cb(); をすることで 逐次処理が出来
タスクがあふれる事がなくなりそうです

__ 前より長持ちしたがダメだった・・・ __

#Why?
仮説ですが db.close() し、cb();を呼び 次のデータ処理を行いますが
db.close()が非同期?ガーベージコレクター????
ちょっと 理由はわかってませんが
db.close() と コールバックを呼ぶ間に sleepもどきを挟んだら
なんとか動きました
(このあたり また調査したいですが 知っている方 教えてください!!)

###最終ソース

var mongo = require('mongodb'),
    fs = require('fs');

  function sleep(milliSeconds) {
    var startTime = new Date().getTime();
    while (new Date().getTime() < startTime + milliSeconds);
  }

  async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
    var json ={};
    json['value'] = line.trim();
    json['hoge'] = 'hoge';

    mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
      db.collection( "table", function(err, collection) {
        collection.insert( json, {safe:true}, function(err, result) {
        console.log( "insert:" + line);
          db.close();
          sleep(50);
          cb();
        });
      });
    })
  });

#ファイナルアンサー?
shuheiさんの指摘より、DBへのconnect closeは毎回必要なのか。
そう、nodeの場合、forEachは非同期で走るため、毎回別のセッションなので
connectと closeは必要なのですが
最終形態は、forEachSeriesを使い 同期処理になってます

・・・という事は connect closeは1回でよい
って事で ファイナルアンサー!

そして、insert時にエラーが発生した場合、callbackにerrを渡すとその時点で中断してくれるらしい
凄い便利!

あと closeのタイミングは forEachSeriesの第三引数に ループ終了時の処理を書けるので
そこで行うと

すごい! nodeがどんどん実用的に! 皆様ありがとうございます!

var mongo = require('mongodb'),
    fs = require('fs');
  
  mongo.Db.connect( "mongodb://localhost/test", function (err, db) {
    db.collection( "table", function(err, collection) {
      async.forEachSeries( fs.readFileSync('test.txt').toString().split('\n'), function (line, cb) {
      var json ={};
      json['value'] = line.trim();
      json['hoge'] = 'hoge';

        collection.insert( json, {safe:true}, function(err, result) {      
          console.log( "insert:" + line);
          cb(err);
        });
      }, 
        function(){db.close();
      });
    });
  });

Sleepも不要、コネクションは1回張ればOK
ソースもシンプルになり
30分かかったinsertも 10秒程でおわるようになりました

願わくは readlineに同期メソッド欲しい(あるかもしれないからもう一度しらべてみよう)
本当にありがとうございました。

34
39
8

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?