Help us understand the problem. What is going on with this article?

Node.jsのStream APIの概要

More than 3 years have passed since last update.

Streamとはファイルの読み書きやSocketでの通信など、主にI/Oにおけるデータの流れを制御するAPIです。
Streamは0,1,2,3のバージョンがありますが、Node.js v0.12からは3が使われているのでそれについて書きます。

公式ドキュメントNode.js v5.2.0 Documentation Streamがわかりやすいですが、特に重要だと思った部分だけ大雑把にまとめてみました。(ドキュメントの日本語訳はv0.11.11のものがあります。多少異なりますが参考までに)

利用する側

ReadableStream

例えばReadableStreamによるファイル読み込み処理は次のように書きます。
全てのまとまったデータが一度に取れるのではなく'data'イベントで少しずつデータが流れてくるのが特徴的です。

var readableStream = fs.createReadStream('filename');
readableStream.on('data', function(data) {
  console.log(data);
});
readableStream.on('end', function() {
  console.log('end');
});

また、read([size])メソッドを使い

  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log(chunk);
  }

のようにsizeバイトを指定(任意)して直接データを取得することもできます。

WritableStream

WritableStreamに対してはwrite(chunk[,encoding][,callback])を使いデータを書き込むことができます。

  • chunk {String | Buffer} 書き込まれるデータ
  • encoding {String} もし chunk が文字列なら、そのエンコーディング
  • callback {Function} データが掃き出された時に呼び出されるコールバック
  • Returns: {Boolean} データが完全に処理された場合は true。

このメソッドはデータを下層のシステムに書き込み、データが完全に処理されると 与えられたコールバックを一度だけ呼び出します。

戻り値は書き込みをすぐに続けていいかどうかを示します。 もしデータが内部にバッファリングされなければならないなら false を返します。 そうでなければ true を返します。

この戻り値は完全にアドバイス的です。 もしこれが false を返しても、あなたは書き込みを続けることが「できます」。 しかしながら、書き込まれたデータはメモリにバッファリングされるため、 これを過剰にしないことが最善です。 代わりに、より多くのデータを書く前に 'drain' イベントを待機してください。

次のコードは100万回dataを書き込む関数の例です。

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 書き込みを再開してもよい適切なタイミングでdrainが発行される。
      writer.once('drain', write);
    }
  }
}

readable.pipe()

Readableから流れてくるデータをそのままWritableに書き込みたい場合はpipeを使うことができます。
例えばHTTPで受け取ったレスポンスをファイルに書き込みたい場合はrequest(url).pipe(fs.createWriteStream(fileName))というように書けます。

ストリーム、特に pipe() メソッドの目的は、データのバッファリングを 許容できるレベルに制限することです。そのため、様々な速度の入力元と出力先で、 利用可能なメモリを圧迫しません。

上の例で言えば、受信するスピードが書き込むスピードより非常に早かったとしてもpipeを使えばメモリを圧迫せずよしなにデータの流れを制御してくれます。
内部的には、上述の100万回書き込みのようにwriteの返り値や'drain'を監視しながら、Readableの流れを止めたり再開したりしています。

他のストリーム

Writable, Readableの他にDuplex, Transformがあります。

DuplexはWritableとReadableの両方の機能を持つストリームです。TCPのソケットやHTTPではクライアント側にしてもサーバ側にしても受信、送信ができるわけなので実装にはDuplexがよく使われています。

TransformはDuplexの子クラスで、データを変形して次に渡すストリームです。例えば、gzipの圧縮はTransformStreamであるgzipを使えば次のように出来ます。

var inp = fs.createReadStream('file.txt');
var gzip = zlib.createGzip();
var out = fs.createWriteStream('file.txt.gz');
inp.pipe(gzip).pipe(out);

実装する側

Streamのクラスを作りたい場合は、各クラスを継承し必要なメソッドを実装しなければなりません。
(参考:Node.js v5.2.0 Documentation Stream#API for Stream Implementors)

継承するクラス 実装しなければならないメソッド
Readable _read
Writable _write
Duplex _read, _write
Transform _transform

これらのメソッドは子クラスが実装し、Streamの内部から良い感じのタイミングで呼び出されるもので、クラスの利用者や実装者が直接呼んではいけません。

ReadableStream

_read(size)は次のデータが読み込まれる時に呼ばれます。return chunkのように返り値にしてデータを渡したり、this.emit('data', chunk)のようにイベントを発行してデータを渡したり するのではなくthis.push(chunk)として内部で管理されているキューにプッシュします。こうすることで内部実装が良い感じのタイミングに'data'イベントを発行してくれたり、read([size])の返り値として適切にデータを返してくれます。

WritableStream

データが渡されてきたときに_write(chunk, encoding, callback)が呼ばれます。
chunkにはBufferかStringで書き込まれるチャンクが渡されます。
encodingはStringで、chunkがStringである場合にエンコーディング方式が指定されます。
callbackは、書き込みの処理が終わった時に呼ばなければならない関数です。

DuplexStream

_read, _writeの各メソッドをReadable, Writableと同じように実装します。

TransformStream

データが渡されてきたときに_transform(chunk, encoding, callback)が呼ばれるので、この入力チャンクを処理したものをpush(outputChunk)として出力し、処理が完了したときにcallbackを呼びます。

takaaki7
https://plaid.co.jp/
https://plaid.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away