業務でnode.jsを使ったシステムを保守エンハンスをしていますが。
長い間理解できなかった、重い腰を上げてStreamAPIについて勉強したので、覚え書きをします。
Node.jsの世界で有名な格言として
「Stream を制するものは Node.jsを制す」
と言われております。さあNodeを我が物にしましょう。
※ StreamAPIのバージョンはStream1/Stream2/Stream3とありますが、今回はStream1の話になります。
※ 業務で使っているnode.jsのバージョンがv0.8なので。
※ v0.10系からStream2。v0.12系からStream3になっています。背景違いなどは追ってキャッチアップする予定。
StreamAPIの情報
勉強に用いたソース。
公式Doc
- Stream - Node.js v0.8.26 マニュアル & ドキュメンテーション
-
Events - Node.js v0.8.26 マニュアル & ドキュメンテーション
- EventEmitterがわかっていないとStreamは理解できません、(私がそうでした。)
参考記事
- Node.js の Stream API で「データの流れ」を扱う方法 - Block Rockin’ Codes
- Stream今昔物語-from scratch
- Node.jsのStream APIで大量プッシュ通知を高速化するテクニック-@IT<
Streamとは
データの流れを扱うAPIです。
Node.jsが出る前に存在した概念で、もろもろ調べたが下記問題を解決する概念らしい。
- 大量データをメモリ上に展開する際に生じる問題
- 一連のI/O処理の最適化問題
- コールバック地獄 ※ node.jsに限る
アイディアとしては、データを一気に処理するのではなく、ある単位(破片)で読み込み、
後工程に回して処理するようにしたこと。
メモリ上に展開する読み込みデータも抑えられ、かつI/O待ちをしながらも後続の処理を実行できるので
処理全体のスピードを最適化できるのがStreamAPIのスゴイところです。
要はバケツリレー的な感じです。
Node.js における StreamAPIの実装方法
Node.js本体にStreamインターフェースが用意されております。
もちろん、インターフェースなので必要なイベント/プロパティ/メソッドをStreamAPIの概念を理解して実装する必要があります。
実装に必要なメソッドについて書いていきます。
データの読み込みを行う ReadableStream
全てのストリームはEventEmitterのインスタンスです。
基本的に覚えておくのは下記イベントとメソッド
- Event:data
-
function (data) {}
データ読み込み時に発生するdata
イベント。断片的に読み込んだデータを扱います。
-
- Event:end
-
function () {}
データ読み込み完了時に発生するend
イベント。データ読み込み完了時に行いたい処理、リソースの開放などを記述します。
-
- Stream.pause()
- データの読み込みを止める時に呼び出します。
- Stream.resume()
- データの読み込みを再開する時に呼び出します。
- Stream.pipe(writableStream)
- 自身の読み込みストリームを書き込みストリームに接続する時に呼び出します。Unixの
|
(パイプ)でデータを次に渡して処理する概念を踏襲している。 (例:$ cat hoge.txt | grep HOGE
)
- 自身の読み込みストリームを書き込みストリームに接続する時に呼び出します。Unixの
WritableStream
Readable Stream で断片的に読み込まれるデータを応用する時に用いるストリーム。
書き込み可能なストリームと言うらしいが、個人的にはピンときていないので、読み込んだデータを使って応用するものだと解釈している。
基本的に覚えておくのは下記イベントとメソッド
- Event:drain
-
function () {}
読み込みストリームをpauseしていた状態から、メモリが空いたなどでデータを読み込み可能になった状態を示す。
-
- Event:pipe
-
function (src) {}
読み込みストリームsrc
が自身のwritableストリームにpipe()された時に発生するイベント。
-
- Stream.write(data)
- 読み込みストリームで
data
イベントが発生した時に呼ばれるメソッド。このメソッドでデータを加工します。これ以上読み込みたくない時はfalseを返す。引き続きデータを読み込むときにはtrueを返すのが決まり。
- 読み込みストリームで
練習問題
実際コードを書いてみないと理解できないので、練習問題を作りました。
ldjson形式の読み込みファイルを用意。
練習1. ReadableStream
問題
fs.createReadStream を用いて、sample.ldjson を10byteづつ読み込んで、
dataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。
Readableストリームを使ってみましょうという問題。
解答例
"use strict";
var fs = require('fs');
var fileName = '100.ldjson';
var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');
var count = 0;
fileReaderStream.on('data', function(data) {
count++;
console.log(count + ': ' + data);
});
fileReaderStream.resume();
出力
断片的にデータが読み込まれることがわかると思います。
1: {"gender":
2: "male", "
3: age": 51,
4: "weapon":
5: "HokoYari"
6: , "power":
7: 222347}
{
...略...
練習2. ReadableStream -> WritableStream
問題
練習1で作成したStream に1行ずつ改行毎にDataイベントを発生させるWritableなLineStreamを接続(pipe)してください。LineStreamのdataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。
ReadableストリームとWritableストリームとの連結方法、Writableストリームの使い方を掴みましょう。
解答例
var LineStream = require('./line_stream');
var fileName = 'sample.ldjson'
var fileReaderStream = require('fs').createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');
var lineStream = new LineStream()
var index = 0;
lineStream.on('data', function(data) {
index++;
console.log(index + ': ' + data);
});
fileReaderStream.pipe(lineStream);
lineStream.resume();
"use strict";
module.exports = LineStream;
var stream = require('stream');
var util = require('util');
function LineStream() {
this.writable = true;
this.readable = true;
this.ended = false;
this.paused = true;
this.encoding = 'utf8';
this.buf = '';
};
util.inherits(LineStream, stream.Stream);
/* readable stream function */
LineStream.prototype.setEncoding = function (encoding) {
this.encoding = encoding;
};
LineStream.prototype.pause = function () {
this.paused = true;
};
LineStream.prototype.resume = function () {
this.paused = false;
/* TODO: 'drain' */
};
/* writable stream function */
LineStream.prototype.write = function (data) {
if (this.ended || this.paused) {
return false;
}
if (Buffer.isBuffer(data)) {
this.buf += data.toString(this.encoding);
} else {
this.buf += data;
}
this.searchLine();
return true;
};
LineStream.prototype.searchLine = function () {
while(true) {
var pos = 0;
var index = this.buf.indexOf('\n');
var line;
if (index > -1) {
line = this.buf.substring(pos, index);
this.emit('data', line);
this.buf = this.buf.substring(index + 1);
continue;
}
break;
}
};
LineStream.prototype.pipe = function (dest) {
this.on('data', function(data) {
dest.write(data);
});
return dest;
};
LineStream.prototype.end = function () {
if (this.buf) {
this.emit('data', this.buf);
}
this.ended = true;
this.destroy();
};
LineStream.prototype.destroy = function () {
this.ended = true;
this.readable = false;
this.writable = false;
this.emit('end');
this.emit('close');
};
出力
1: {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}
2: {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
3: {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
4: {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
... 略 ...
練習3. ReadableStream -> 流入制限したWritableStream
問題
練習2のLineStreamを改良して、1秒間に1行ずつ`data`イベントを発生するストリームに変更しましょう。
ストリームの肝である、pause()/resume()/drain
イベントの一連の流れを掴みましょう。
解答例
"use strict";
var fs = require('fs');
var LineRestrictStream = require('./line_restrict_stream');
var fileName = '100.ldjson';
var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');
var lineRestrictStream = new LineRestrictStream(100);
var count = 0;
lineRestrictStream.on('data', function(data) {
count++;
console.log(count + ' : ' + data);
});
lineRestrictStream.on('drain', function() {
//console.log(' LineRestrictStream Event \'drain\' occurred.')
fileReaderStream.resume();
});
lineRestrictStream.on('pipe', function(src) {
lineRestrictStream.setSrcStream(src);
});
fileReaderStream.pipe(lineRestrictStream);
lineRestrictStream.resume();
"use strict";
module.exports = LineRestrictStream;
var stream = require('stream');
var util = require('util');
function LineRestrictStream(interval) {
this.srcStream;
this.writable = true;
this.readable = true;
this.ended = false;
this.paused = true;
this.encoding = 'utf8';
this.bufStr = '';
this.buf = [];
// For Stream Restriction
this.interval = interval || 1000;
this.lastDequeuedAt = 0;
this.hasTimer = false;
};
util.inherits(LineRestrictStream, stream.Stream);
/* readable stream function */
LineRestrictStream.prototype.setEncoding = function (encoding) {
this.encoding = encoding;
};
LineRestrictStream.prototype.pause = function () {
this.paused = true;
};
LineRestrictStream.prototype.resume = function () {
this.paused = false;
this.dequeue();
};
LineRestrictStream.prototype.setSrcStream = function (srcStream) {
this.srcStream = srcStream;
};
/* writable stream function */
LineRestrictStream.prototype.write = function (data) {
if (this.ended || this.paused) {
return false;
}
if (Buffer.isBuffer(data)) {
data = data.toString(this.encoding);
}
this.bufStr += data;
this.searchLine();
if (this.buf.length > 5) {
return false;
}
this.dequeue();
return true;
};
LineRestrictStream.prototype.searchLine = function () {
while(true) {
var pos = 0;
var index = this.bufStr.indexOf('\n');
var line;
if (index > -1) {
line = this.bufStr.substring(pos, index);
this.enqueue(line);
this.bufStr = this.bufStr.substring(index + 1);
continue;
}
break;
}
};
LineRestrictStream.prototype.enqueue = function (line) {
this.buf.push(line);
};
LineRestrictStream.prototype.dequeue = function (isTimerProcess) {
if (!isTimerProcess && this.hasTimer) {
return;
}
if (this.buf.length == 0) {
this.emit('drain');
return;
}
this.startTimer();
};
LineRestrictStream.prototype.startTimer = function () {
var now = Date.now();
var diff = now - this.lastDequeuedAt;
if (diff < this.interval) {
if (!this.hasTimer) {
this.setTimer(this.interval - diff);
}
return;
}
console.log(' \tYou waited ' + diff + 'ms.' + ' (queue_size: ' + this.buf.length +')');
this.lastDequeuedAt = now;
this.emit('data', this.buf.shift());
this.setDequeueTimer(this.interval);
};
LineRestrictStream.prototype.setDequeueTimer = function (time) {
var self = this;
setTimeout(function() {
self.dequeue(true);
this.hasTimer = false;
if (self.buf.length == 0) {
self.emit('drain');
}
}, time);
this.hasTimer = true;
};
LineRestrictStream.prototype.pipe = function (dest) {
this.on('data', function(data) {
dest.write(data);
});
return dest;
};
LineRestrictStream.prototype.end = function () {
if (this.buf) {
this.dequeue();
}
this.ended = true;
this.destroy();
};
LineRestrictStream.prototype.destroy = function () {
this.ended = true;
this.readable = false;
this.writable = false;
this.emit('end');
this.emit('close');
};
出力
You waited 1434933713380ms. (queue_size: 1)
1 : {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}
You waited 119ms. (queue_size: 6)
2 : {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
You waited 101ms. (queue_size: 5)
3 : {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
You waited 101ms. (queue_size: 4)
4 : {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
You waited 101ms. (queue_size: 3)
5 : {"gender": "male", "age": 74, "weapon": "sword", "power": 549572}
You waited 101ms. (queue_size: 2)
... 略 ...
まとめ
当初想定していたより、記述量が多い印象を持った。Stream2/Stream3ではどの程度改善されたのかは追って調べたい。
Node.jsは随所にstreamを用いた実装がされているので、この少しnode.jsと仲良くなれた気がします。
理解に相当時間かかってしまった。。ふぅー。