ストリームを行ごとに処理~readline編~

  • 33
    Like
  • 0
    Comment
More than 1 year has passed since last update.

ストリームを行ごとに処理~Readline編~

はじめに

本投稿は、自分がログファイルなどのでかい容量のファイルにおいて、一定行ごとに処理を上手く実装できないかいろいろな方法を模索している最中のメモ帳みたいなものです。
グダグダですが参考になれば幸いです。
一応Readline、Stream2のReadable・Transformについて調査する予定です。

readlineについて

ストリームを行ごとに読み込むことができるモジュール
主な使用用途としてターミナルからの入力を想定していそうです。
詳細は下記マニュアル参照
http://nodejs.org/api/readline.html

1行ごとに処理

基本的な例として、ストリームを1行ごとに読み込み、読み込んだ行ごとに処理をしていく例を作ってみました。

例1:行を逆順にして出力

main.js
var fs = require('fs');
var rs = fs.createReadStream('./hoge.txt');
var readline = require('readline');

var rl = readline.createInterface(rs, {});

rl.on('line', function(line) {
  // 受け取ったlineを逆順にして出力
  console.log(line.split('').revers().join(''));
});
hoge.txt
abcde
fghij
klmno
pqrst
uvwxy
z

実行結果

edcba
jihgf
tsrqp
yxwvu
z

説明

5行目の第一引数はreadStream,第二引数にwriteStreamを渡す必要があり、今回は読み込みのみのため第二引数には空オブジェクトを渡しています。
7行目でlineイベントをセットしており、lineイベントは['\n']を見つけたとき発火し、その行を引数として取得しています。
今回は受け取った文字列を逆順にしてコンソールに出力する関数をlineイベントに登録しています。
(lineイベントを登録した段階で初めてstreamの読み込みが始まるため、すぐ出力されます)

複数行ごとに処理

1行ごとの処理なら、上記のlineイベントを適切に設定することで上手くかけそうでした。
しかし、個人的にやりたいことは一定行ごとにまとめてDBに保存するなりなんなり処理することだったため、ストリームを指定行ごとに取得できるような例も作ってみました。

例2:複数行ごとに処理

linereader.js
var readline = require('readline');

function Linereader(stream) {
  var self = this;
  self.rl = readline.createInterface({
    input: stream,
    output: {},
  });
  self.list = [];
}

module.exports = Linereader;

/**
 * @param {Number} unit // unit行ごとに取得する
 * @param {Function} fn // unit行取得した際に実行する関数
 * @param {Function} callback // ストリームの読み込みが終わった際のコールバック
 */
Linereader.prototype.forEach = function(unit, fn, callback) {
  var self = this;

  self.rl.on('line', function(line) {
    self.list.push(line);
    if (self.list.length >= unit) {
      fn(self.list);
      self.list = [];
    }
  });
  self.rl.on('close', function(line) {
    if (self.list.length) {
      fn(self.list);
    }
    callback();
  });
};
main.js
var fs = require('fs');
var rs = fs.createReadStream('./hoge.txt');
var Linereader = require('./linereader');

var linereader = new Linereader(rs);

linereader.forEach(5, function(list) {
  console.log(list);
}, function() {
  console.log('end');
});

実行結果

[ 'abcde', 'fghij', 'klmno', 'pqrst', 'uvwxy' ]
[ 'z' ]
end

説明

上記のプログラムは、hoge.txtを5行(指定行)ごとの配列として取得し、コンソールに出力しています。
また、全て取得し終わった際にendを出力させています。
linereader.forEachの第二引数に処理を書くことで、複数行ごとに処理を実行することができます。
また、第三引数に処理をかくと、ストリーム読み込み後に処理を実行することができます。

検討

例2のプログラムで、自分がしたかった複数行ごとに処理を挟むことができるようになりました。
ただ、このままだとlinereader.forEachの第二引数に非同期の処理を入れた場合、処理を待たずに次の処理が開始されてしまいます。
自分的には大きなファイルを複数行ごとに対してDBにどんどん入れていくような処理がしたいため、callbackできるようにできないか調べてみたのですが、Readlineのソース( https://github.com/joyent/node/blob/master/lib/readline.js )を確認したところ、以下の2点の理由からReadlineを使うことを断念しました。
・冒頭でも述べたようにそもそもReadlineが端末からの入力などのようなものを想定して組まれていること
・改行ごとにlineイベントをcallbackではなくfor文で回して発火しているため、非同期にできないこと

以上から、次は非同期に対応できるようStream2のReadableを用いて、Bufferから適切に行を取得できるようなプログラムについて検討したいと思います。