Streamとはファイルの読み書きやSocketでの通信など、主にI/Oにおけるデータの流れを制御するAPIです。
Streamは0,1,2,3のバージョンがありますが、Node.js v0.12からは3が使われているのでそれについて書きます。
公式ドキュメントNode.js v5.2.0 Documentation Streamがわかりやすいですが、特に重要だと思った部分だけ大雑把にまとめてみました。(ドキュメントの日本語訳はv0.11.11のものがあります。多少異なりますが参考までに)
##利用する側
ReadableStream
例えばReadableStreamによるファイル読み込み処理は次のように書きます。
全てのまとまったデータが一度に取れるのではなく'data'
イベントで少しずつデータが流れてくるのが特徴的です。
var readableStream = fs.createReadStream('filename');
readableStream.on('data', function(data) {
console.log(data);
});
readableStream.on('end', function() {
console.log('end');
});
また、read([size])
メソッドを使い
var chunk;
while (null !== (chunk = readable.read())) {
console.log(chunk);
}
のようにsize
バイトを指定(任意)して直接データを取得することもできます。
WritableStream
WritableStreamに対してはwrite(chunk[,encoding][,callback])
を使いデータを書き込むことができます。
- chunk {String | Buffer} 書き込まれるデータ
- encoding {String} もし chunk が文字列なら、そのエンコーディング
- callback {Function} データが掃き出された時に呼び出されるコールバック
- Returns: {Boolean} データが完全に処理された場合は true。
このメソッドはデータを下層のシステムに書き込み、データが完全に処理されると 与えられたコールバックを一度だけ呼び出します。
戻り値は書き込みをすぐに続けていいかどうかを示します。 もしデータが内部にバッファリングされなければならないなら false を返します。 そうでなければ true を返します。
この戻り値は完全にアドバイス的です。 もしこれが false を返しても、あなたは書き込みを続けることが「できます」。 しかしながら、書き込まれたデータはメモリにバッファリングされるため、 これを過剰にしないことが最善です。 代わりに、より多くのデータを書く前に 'drain' イベントを待機してください。
次のコードは100万回data
を書き込む関数の例です。
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write();
function write() {
var ok = true;
do {
i -= 1;
if (i === 0) {
// last time!
writer.write(data, encoding, callback);
} else {
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// 書き込みを再開してもよい適切なタイミングでdrainが発行される。
writer.once('drain', write);
}
}
}
readable.pipe()
Readableから流れてくるデータをそのままWritableに書き込みたい場合はpipe
を使うことができます。
例えばHTTPで受け取ったレスポンスをファイルに書き込みたい場合はrequest(url).pipe(fs.createWriteStream(fileName))
というように書けます。
ストリーム、特に pipe() メソッドの目的は、データのバッファリングを 許容できるレベルに制限することです。そのため、様々な速度の入力元と出力先で、 利用可能なメモリを圧迫しません。
上の例で言えば、受信するスピードが書き込むスピードより非常に早かったとしてもpipe
を使えばメモリを圧迫せずよしなにデータの流れを制御してくれます。
内部的には、上述の100万回書き込みのようにwrite
の返り値や'drain'
を監視しながら、Readableの流れを止めたり再開したりしています。
他のストリーム
Writable, Readableの他にDuplex, Transformがあります。
DuplexはWritableとReadableの両方の機能を持つストリームです。TCPのソケットやHTTPではクライアント側にしてもサーバ側にしても受信、送信ができるわけなので実装にはDuplexがよく使われています。
TransformはDuplexの子クラスで、データを変形して次に渡すストリームです。例えば、gzipの圧縮はTransformStreamであるgzip
を使えば次のように出来ます。
var inp = fs.createReadStream('file.txt');
var gzip = zlib.createGzip();
var out = fs.createWriteStream('file.txt.gz');
inp.pipe(gzip).pipe(out);
##実装する側
Streamのクラスを作りたい場合は、各クラスを継承し必要なメソッドを実装しなければなりません。
(参考:Node.js v5.2.0 Documentation Stream#API for Stream Implementors)
継承するクラス | 実装しなければならないメソッド |
---|---|
Readable | _read |
Writable | _write |
Duplex |
_read , _write
|
Transform | _transform |
これらのメソッドは子クラスが実装し、Streamの内部から良い感じのタイミングで呼び出されるもので、クラスの利用者や実装者が直接呼んではいけません。
ReadableStream
_read(size)
は次のデータが読み込まれる時に呼ばれます。return chunk
のように返り値にしてデータを渡したり、this.emit('data', chunk)
のようにイベントを発行してデータを渡したり するのではなく 、this.push(chunk)
として内部で管理されているキューにプッシュします。こうすることで内部実装が良い感じのタイミングに'data'
イベントを発行してくれたり、read([size])
の返り値として適切にデータを返してくれます。
WritableStream
データが渡されてきたときに_write(chunk, encoding, callback)
が呼ばれます。
chunk
にはBufferかStringで書き込まれるチャンクが渡されます。
encoding
はStringで、chunk
がStringである場合にエンコーディング方式が指定されます。
callback
は、書き込みの処理が終わった時に呼ばなければならない関数です。
DuplexStream
_read
, _write
の各メソッドをReadable, Writableと同じように実装します。
TransformStream
データが渡されてきたときに_transform(chunk, encoding, callback)
が呼ばれるので、この入力チャンクを処理したものをpush(outputChunk)
として出力し、処理が完了したときにcallback
を呼びます。