Edited at

Node.js StreamAPI から逃げちゃダメだ。逃げちゃダメだ。。

More than 1 year has passed since last update.

業務でnode.jsを使ったシステムを保守エンハンスをしていますが。

長い間理解できなかった、重い腰を上げてStreamAPIについて勉強したので、覚え書きをします。

Node.jsの世界で有名な格言として

「Stream を制するものは Node.jsを制す」

と言われております。さあNodeを我が物にしましょう。

※ StreamAPIのバージョンはStream1/Stream2/Stream3とありますが、今回はStream1の話になります。

※ 業務で使っているnode.jsのバージョンがv0.8なので。

※ v0.10系からStream2。v0.12系からStream3になっています。背景違いなどは追ってキャッチアップする予定。


StreamAPIの情報

勉強に用いたソース。


公式Doc


参考記事


Streamとは

データの流れを扱うAPIです。

Node.jsが出る前に存在した概念で、もろもろ調べたが下記問題を解決する概念らしい。


  • 大量データをメモリ上に展開する際に生じる問題

  • 一連のI/O処理の最適化問題

  • コールバック地獄 ※ node.jsに限る

アイディアとしては、データを一気に処理するのではなく、ある単位(破片)で読み込み、

後工程に回して処理するようにしたこと。

メモリ上に展開する読み込みデータも抑えられ、かつI/O待ちをしながらも後続の処理を実行できるので

処理全体のスピードを最適化できるのがStreamAPIのスゴイところです。

要はバケツリレー的な感じです。


Node.js における StreamAPIの実装方法

Node.js本体にStreamインターフェースが用意されております。

もちろん、インターフェースなので必要なイベント/プロパティ/メソッドをStreamAPIの概念を理解して実装する必要があります。

実装に必要なメソッドについて書いていきます。


データの読み込みを行う ReadableStream

全てのストリームはEventEmitterのインスタンスです。

基本的に覚えておくのは下記イベントとメソッド


  • Event:data



    • function (data) {} データ読み込み時に発生するdataイベント。断片的に読み込んだデータを扱います。



  • Event:end



    • function () {} データ読み込み完了時に発生する endイベント。データ読み込み完了時に行いたい処理、リソースの開放などを記述します。



  • Stream.pause()


    • データの読み込みを止める時に呼び出します。



  • Stream.resume()


    • データの読み込みを再開する時に呼び出します。



  • Stream.pipe(writableStream)


    • 自身の読み込みストリームを書き込みストリームに接続する時に呼び出します。Unixの|(パイプ)でデータを次に渡して処理する概念を踏襲している。 (例: $ cat hoge.txt | grep HOGE )




WritableStream

Readable Stream で断片的に読み込まれるデータを応用する時に用いるストリーム。

書き込み可能なストリームと言うらしいが、個人的にはピンときていないので、読み込んだデータを使って応用するものだと解釈している。

基本的に覚えておくのは下記イベントとメソッド


  • Event:drain



    • function () {} 読み込みストリームをpauseしていた状態から、メモリが空いたなどでデータを読み込み可能になった状態を示す。



  • Event:pipe



    • function (src) {} 読み込みストリームsrcが自身のwritableストリームにpipe()された時に発生するイベント。



  • Stream.write(data)


    • 読み込みストリームでdataイベントが発生した時に呼ばれるメソッド。このメソッドでデータを加工します。これ以上読み込みたくない時はfalseを返す。引き続きデータを読み込むときにはtrueを返すのが決まり。




練習問題

実際コードを書いてみないと理解できないので、練習問題を作りました。

ldjson形式の読み込みファイルを用意。


練習1. ReadableStream


問題


paractice1.txt

fs.createReadStream を用いて、sample.ldjson を10byteづつ読み込んで、

dataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。

Readableストリームを使ってみましょうという問題。


解答例


practice1.js

"use strict";

var fs = require('fs');
var fileName = '100.ldjson';

var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var count = 0;
fileReaderStream.on('data', function(data) {
count++;
console.log(count + ': ' + data);
});

fileReaderStream.resume();



出力

断片的にデータが読み込まれることがわかると思います。


練習1.out

1: {"gender":

2: "male", "
3: age": 51,
4: "weapon":
5: "HokoYari"
6: , "power":
7: 222347}
{
...略...


練習2. ReadableStream -> WritableStream


問題


練習2.txt

練習1で作成したStream に1行ずつ改行毎にDataイベントを発生させるWritableなLineStreamを接続(pipe)してください。LineStreamのdataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。


ReadableストリームとWritableストリームとの連結方法、Writableストリームの使い方を掴みましょう。


解答例


practice2.js

var LineStream = require('./line_stream');

var fileName = 'sample.ldjson'
var fileReaderStream = require('fs').createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var lineStream = new LineStream()
var index = 0;
lineStream.on('data', function(data) {
index++;
console.log(index + ': ' + data);
});

fileReaderStream.pipe(lineStream);
lineStream.resume();



line_stream.js

"use strict";

module.exports = LineStream;

var stream = require('stream');
var util = require('util');

function LineStream() {
this.writable = true;
this.readable = true;
this.ended = false;
this.paused = true;
this.encoding = 'utf8';
this.buf = '';
};

util.inherits(LineStream, stream.Stream);

/* readable stream function */
LineStream.prototype.setEncoding = function (encoding) {
this.encoding = encoding;
};

LineStream.prototype.pause = function () {
this.paused = true;
};

LineStream.prototype.resume = function () {
this.paused = false;
/* TODO: 'drain' */
};

/* writable stream function */
LineStream.prototype.write = function (data) {
if (this.ended || this.paused) {
return false;
}
if (Buffer.isBuffer(data)) {
this.buf += data.toString(this.encoding);
} else {
this.buf += data;
}
this.searchLine();
return true;
};

LineStream.prototype.searchLine = function () {
while(true) {
var pos = 0;
var index = this.buf.indexOf('\n');
var line;
if (index > -1) {
line = this.buf.substring(pos, index);
this.emit('data', line);
this.buf = this.buf.substring(index + 1);
continue;
}
break;
}
};

LineStream.prototype.pipe = function (dest) {
this.on('data', function(data) {
dest.write(data);
});
return dest;
};

LineStream.prototype.end = function () {
if (this.buf) {
this.emit('data', this.buf);
}
this.ended = true;
this.destroy();
};

LineStream.prototype.destroy = function () {
this.ended = true;
this.readable = false;
this.writable = false;
this.emit('end');
this.emit('close');
};



出力


練習2.out

1: {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}

2: {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
3: {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
4: {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
... 略 ...


練習3. ReadableStream -> 流入制限したWritableStream


問題


練習3.txt

練習2のLineStreamを改良して、1秒間に1行ずつ`data`イベントを発生するストリームに変更しましょう。


ストリームの肝である、pause()/resume()/drainイベントの一連の流れを掴みましょう。


解答例


practice3.js

"use strict";

var fs = require('fs');
var LineRestrictStream = require('./line_restrict_stream');

var fileName = '100.ldjson';
var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var lineRestrictStream = new LineRestrictStream(100);

var count = 0;
lineRestrictStream.on('data', function(data) {
count++;
console.log(count + ' : ' + data);
});

lineRestrictStream.on('drain', function() {
//console.log(' LineRestrictStream Event \'drain\' occurred.')
fileReaderStream.resume();
});

lineRestrictStream.on('pipe', function(src) {
lineRestrictStream.setSrcStream(src);
});

fileReaderStream.pipe(lineRestrictStream);
lineRestrictStream.resume();



line_restrict_stream.js

"use strict";

module.exports = LineRestrictStream;

var stream = require('stream');
var util = require('util');

function LineRestrictStream(interval) {
this.srcStream;
this.writable = true;
this.readable = true;
this.ended = false;
this.paused = true;
this.encoding = 'utf8';
this.bufStr = '';
this.buf = [];

// For Stream Restriction
this.interval = interval || 1000;
this.lastDequeuedAt = 0;
this.hasTimer = false;
};

util.inherits(LineRestrictStream, stream.Stream);

/* readable stream function */
LineRestrictStream.prototype.setEncoding = function (encoding) {
this.encoding = encoding;
};

LineRestrictStream.prototype.pause = function () {
this.paused = true;
};

LineRestrictStream.prototype.resume = function () {
this.paused = false;
this.dequeue();
};

LineRestrictStream.prototype.setSrcStream = function (srcStream) {
this.srcStream = srcStream;
};

/* writable stream function */
LineRestrictStream.prototype.write = function (data) {
if (this.ended || this.paused) {
return false;
}
if (Buffer.isBuffer(data)) {
data = data.toString(this.encoding);
}
this.bufStr += data;
this.searchLine();
if (this.buf.length > 5) {
return false;
}
this.dequeue();
return true;
};

LineRestrictStream.prototype.searchLine = function () {
while(true) {
var pos = 0;
var index = this.bufStr.indexOf('\n');
var line;
if (index > -1) {
line = this.bufStr.substring(pos, index);
this.enqueue(line);
this.bufStr = this.bufStr.substring(index + 1);
continue;
}
break;
}
};

LineRestrictStream.prototype.enqueue = function (line) {
this.buf.push(line);
};

LineRestrictStream.prototype.dequeue = function (isTimerProcess) {
if (!isTimerProcess && this.hasTimer) {
return;
}
if (this.buf.length == 0) {
this.emit('drain');
return;
}
this.startTimer();
};

LineRestrictStream.prototype.startTimer = function () {
var now = Date.now();
var diff = now - this.lastDequeuedAt;

if (diff < this.interval) {
if (!this.hasTimer) {
this.setTimer(this.interval - diff);
}
return;
}
console.log(' \tYou waited ' + diff + 'ms.' + ' (queue_size: ' + this.buf.length +')');
this.lastDequeuedAt = now;
this.emit('data', this.buf.shift());
this.setDequeueTimer(this.interval);
};

LineRestrictStream.prototype.setDequeueTimer = function (time) {
var self = this;
setTimeout(function() {
self.dequeue(true);
this.hasTimer = false;
if (self.buf.length == 0) {
self.emit('drain');
}
}, time);
this.hasTimer = true;
};

LineRestrictStream.prototype.pipe = function (dest) {
this.on('data', function(data) {
dest.write(data);
});
return dest;
};

LineRestrictStream.prototype.end = function () {
if (this.buf) {
this.dequeue();
}
this.ended = true;
this.destroy();
};

LineRestrictStream.prototype.destroy = function () {
this.ended = true;
this.readable = false;
this.writable = false;
this.emit('end');
this.emit('close');
};



出力


練習3.out

    You waited 1434933713380ms. (queue_size: 1)

1 : {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}
You waited 119ms. (queue_size: 6)
2 : {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
You waited 101ms. (queue_size: 5)
3 : {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
You waited 101ms. (queue_size: 4)
4 : {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
You waited 101ms. (queue_size: 3)
5 : {"gender": "male", "age": 74, "weapon": "sword", "power": 549572}
You waited 101ms. (queue_size: 2)
... 略 ...


まとめ

当初想定していたより、記述量が多い印象を持った。Stream2/Stream3ではどの程度改善されたのかは追って調べたい。

Node.jsは随所にstreamを用いた実装がされているので、この少しnode.jsと仲良くなれた気がします。

理解に相当時間かかってしまった。。ふぅー。