search
LoginSignup
166

More than 3 years have passed since last update.

posted at

updated at

Node.js StreamAPI から逃げちゃダメだ。逃げちゃダメだ。。

業務でnode.jsを使ったシステムを保守エンハンスをしていますが。
長い間理解できなかった、重い腰を上げてStreamAPIについて勉強したので、覚え書きをします。

Node.jsの世界で有名な格言として
「Stream を制するものは Node.jsを制す」
と言われております。さあNodeを我が物にしましょう。

※ StreamAPIのバージョンはStream1/Stream2/Stream3とありますが、今回はStream1の話になります。
※ 業務で使っているnode.jsのバージョンがv0.8なので。
※ v0.10系からStream2。v0.12系からStream3になっています。背景違いなどは追ってキャッチアップする予定。

StreamAPIの情報

勉強に用いたソース。

公式Doc

参考記事

Streamとは

データの流れを扱うAPIです。
Node.jsが出る前に存在した概念で、もろもろ調べたが下記問題を解決する概念らしい。

  • 大量データをメモリ上に展開する際に生じる問題
  • 一連のI/O処理の最適化問題
  • コールバック地獄 ※ node.jsに限る

アイディアとしては、データを一気に処理するのではなく、ある単位(破片)で読み込み、
後工程に回して処理するようにしたこと。

メモリ上に展開する読み込みデータも抑えられ、かつI/O待ちをしながらも後続の処理を実行できるので
処理全体のスピードを最適化できるのがStreamAPIのスゴイところです。

要はバケツリレー的な感じです。

Node.js における StreamAPIの実装方法

Node.js本体にStreamインターフェースが用意されております。
もちろん、インターフェースなので必要なイベント/プロパティ/メソッドをStreamAPIの概念を理解して実装する必要があります。
実装に必要なメソッドについて書いていきます。

データの読み込みを行う ReadableStream

全てのストリームはEventEmitterのインスタンスです。

基本的に覚えておくのは下記イベントとメソッド

  • Event:data
    • function (data) {} データ読み込み時に発生するdataイベント。断片的に読み込んだデータを扱います。
  • Event:end
    • function () {} データ読み込み完了時に発生する endイベント。データ読み込み完了時に行いたい処理、リソースの開放などを記述します。
  • Stream.pause()
    • データの読み込みを止める時に呼び出します。
  • Stream.resume()
    • データの読み込みを再開する時に呼び出します。
  • Stream.pipe(writableStream)
    • 自身の読み込みストリームを書き込みストリームに接続する時に呼び出します。Unixの|(パイプ)でデータを次に渡して処理する概念を踏襲している。 (例: $ cat hoge.txt | grep HOGE )

WritableStream

Readable Stream で断片的に読み込まれるデータを応用する時に用いるストリーム。
書き込み可能なストリームと言うらしいが、個人的にはピンときていないので、読み込んだデータを使って応用するものだと解釈している。

基本的に覚えておくのは下記イベントとメソッド

  • Event:drain
    • function () {} 読み込みストリームをpauseしていた状態から、メモリが空いたなどでデータを読み込み可能になった状態を示す。
  • Event:pipe
    • function (src) {} 読み込みストリームsrcが自身のwritableストリームにpipe()された時に発生するイベント。
  • Stream.write(data)
    • 読み込みストリームでdataイベントが発生した時に呼ばれるメソッド。このメソッドでデータを加工します。これ以上読み込みたくない時はfalseを返す。引き続きデータを読み込むときにはtrueを返すのが決まり。

練習問題

実際コードを書いてみないと理解できないので、練習問題を作りました。
ldjson形式の読み込みファイルを用意。

練習1. ReadableStream

問題

paractice1.txt
fs.createReadStream を用いて、sample.ldjson を10byteづつ読み込んで、
dataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。

Readableストリームを使ってみましょうという問題。

解答例

practice1.js
"use strict";

var fs = require('fs');
var fileName = '100.ldjson';

var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var count = 0;
fileReaderStream.on('data', function(data) {
  count++;
  console.log(count + ': ' + data);
});

fileReaderStream.resume();

出力

断片的にデータが読み込まれることがわかると思います。

練習1.out
1: {"gender":
2:  "male", "
3: age": 51, 
4: "weapon": 
5: "HokoYari"
6: , "power":
7:  222347}
{
...略...

練習2. ReadableStream -> WritableStream

問題

練習2.txt
練習1で作成したStream に1行ずつ改行毎にDataイベントを発生させるWritableなLineStreamを接続(pipe)してください。LineStreamのdataイベント発生時に、読み込んだ内容を 標準出力(console.log) してください。

ReadableストリームとWritableストリームとの連結方法、Writableストリームの使い方を掴みましょう。

解答例

practice2.js
var LineStream = require('./line_stream');

var fileName = 'sample.ldjson'
var fileReaderStream = require('fs').createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var lineStream = new LineStream()
var index = 0;
lineStream.on('data', function(data) {
  index++;
  console.log(index + ': ' + data);
});

fileReaderStream.pipe(lineStream);
lineStream.resume();
line_stream.js
"use strict";

module.exports = LineStream;

var stream = require('stream');
var util = require('util');

function LineStream() {
    this.writable = true;
    this.readable = true;
    this.ended = false;
    this.paused = true;
    this.encoding = 'utf8';
    this.buf = '';
};

util.inherits(LineStream, stream.Stream);

/* readable stream function */
LineStream.prototype.setEncoding = function (encoding) {
  this.encoding = encoding;
};

LineStream.prototype.pause = function () {
  this.paused = true;
};

LineStream.prototype.resume = function () {
  this.paused = false;
  /* TODO: 'drain' */
};

/* writable stream function */
LineStream.prototype.write = function (data) {
  if (this.ended || this.paused) {
    return false;
  }
  if (Buffer.isBuffer(data)) {
    this.buf += data.toString(this.encoding); 
  } else {
    this.buf += data;
  }
  this.searchLine();
  return true;
};

LineStream.prototype.searchLine = function () {
  while(true) {
    var pos = 0;
    var index = this.buf.indexOf('\n');
    var line;
    if (index > -1) {
      line = this.buf.substring(pos, index);
      this.emit('data', line);
      this.buf = this.buf.substring(index + 1);
      continue;
    }
    break;
  }
};

LineStream.prototype.pipe = function (dest) {
  this.on('data', function(data) {
    dest.write(data);
  });
  return dest;
};

LineStream.prototype.end = function () {
  if (this.buf) {
    this.emit('data', this.buf);
  }
  this.ended = true;
  this.destroy();
};

LineStream.prototype.destroy = function () {
  this.ended = true;
  this.readable = false;
  this.writable = false;
  this.emit('end');
  this.emit('close');
};

出力

練習2.out
1: {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}
2: {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
3: {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
4: {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
... 略 ...

練習3. ReadableStream -> 流入制限したWritableStream

問題

練習3.txt
練習2のLineStreamを改良して、1秒間に1行ずつ`data`イベントを発生するストリームに変更しましょう。

ストリームの肝である、pause()/resume()/drainイベントの一連の流れを掴みましょう。

解答例

practice3.js
"use strict";
var fs = require('fs');
var LineRestrictStream = require('./line_restrict_stream');

var fileName = '100.ldjson';
var fileReaderStream = fs.createReadStream(fileName, {bufferSize: 10});
fileReaderStream.setEncoding('utf8');

var lineRestrictStream = new LineRestrictStream(100);

var count = 0;
lineRestrictStream.on('data', function(data) {
  count++;
  console.log(count + ' : ' + data);
});

lineRestrictStream.on('drain', function() {
  //console.log('  LineRestrictStream Event \'drain\' occurred.')
  fileReaderStream.resume();
});

lineRestrictStream.on('pipe', function(src) {
  lineRestrictStream.setSrcStream(src);
});

fileReaderStream.pipe(lineRestrictStream);
lineRestrictStream.resume();
line_restrict_stream.js
"use strict";

module.exports = LineRestrictStream;

var stream = require('stream');
var util = require('util');

function LineRestrictStream(interval) {
    this.srcStream;
    this.writable = true;
    this.readable = true;
    this.ended = false;
    this.paused = true;
    this.encoding = 'utf8';
    this.bufStr = '';
    this.buf = [];

    // For Stream Restriction
    this.interval = interval || 1000;
    this.lastDequeuedAt = 0;
    this.hasTimer = false;
};

util.inherits(LineRestrictStream, stream.Stream);

/* readable stream function */
LineRestrictStream.prototype.setEncoding = function (encoding) {
  this.encoding = encoding;
};

LineRestrictStream.prototype.pause = function () {
  this.paused = true;
};

LineRestrictStream.prototype.resume = function () {
  this.paused = false;
  this.dequeue();
};

LineRestrictStream.prototype.setSrcStream = function (srcStream) {
  this.srcStream = srcStream;
};

/* writable stream function */
LineRestrictStream.prototype.write = function (data) {
  if (this.ended || this.paused) {
    return false;
  }
  if (Buffer.isBuffer(data)) {
    data = data.toString(this.encoding);
  }
  this.bufStr += data;
  this.searchLine();
  if (this.buf.length > 5) {
    return false;
  }
  this.dequeue();
  return true;
};

LineRestrictStream.prototype.searchLine = function () {
  while(true) {
    var pos = 0;
    var index = this.bufStr.indexOf('\n');
    var line;
    if (index > -1) {
      line = this.bufStr.substring(pos, index);
      this.enqueue(line);
      this.bufStr = this.bufStr.substring(index + 1);
      continue;
    }
    break;
  }
};

LineRestrictStream.prototype.enqueue = function (line) {
  this.buf.push(line);
};

LineRestrictStream.prototype.dequeue = function (isTimerProcess) {
  if (!isTimerProcess && this.hasTimer) {
      return;
  }
  if (this.buf.length == 0) {
    this.emit('drain');
    return;
  }
  this.startTimer();
};

LineRestrictStream.prototype.startTimer = function () {
  var now = Date.now();
  var diff = now - this.lastDequeuedAt;

  if (diff < this.interval) {
    if (!this.hasTimer) {
      this.setTimer(this.interval - diff);
    }
    return;
  }
  console.log('  \tYou waited ' + diff + 'ms.' + ' (queue_size: ' + this.buf.length +')');
  this.lastDequeuedAt = now;
  this.emit('data', this.buf.shift());
  this.setDequeueTimer(this.interval);
};

LineRestrictStream.prototype.setDequeueTimer = function (time) {
  var self = this;
  setTimeout(function() {
    self.dequeue(true);
    this.hasTimer = false;
    if (self.buf.length == 0) {
      self.emit('drain');
    }
  }, time);
  this.hasTimer = true;
};


LineRestrictStream.prototype.pipe = function (dest) {
  this.on('data', function(data) {
    dest.write(data);
  });
  return dest;
};

LineRestrictStream.prototype.end = function () {
  if (this.buf) {
    this.dequeue();
  }
  this.ended = true;
  this.destroy();
};

LineRestrictStream.prototype.destroy = function () {
  this.ended = true;
  this.readable = false;
  this.writable = false;
  this.emit('end');
  this.emit('close');
};

出力

練習3.out
    You waited 1434933713380ms. (queue_size: 1)
1 : {"gender": "male", "age": 51, "weapon": "HokoYari", "power": 222347}
    You waited 119ms. (queue_size: 6)
2 : {"gender": "male", "age": 69, "weapon": "HokoYari", "power": 562426}
    You waited 101ms. (queue_size: 5)
3 : {"gender": "female", "age": 70, "weapon": "pike", "power": 871983}
    You waited 101ms. (queue_size: 4)
4 : {"gender": "female", "age": 94, "weapon": "HokoYari", "power": 204004}
    You waited 101ms. (queue_size: 3)
5 : {"gender": "male", "age": 74, "weapon": "sword", "power": 549572}
    You waited 101ms. (queue_size: 2)
... 略 ...

まとめ

当初想定していたより、記述量が多い印象を持った。Stream2/Stream3ではどの程度改善されたのかは追って調べたい。

Node.jsは随所にstreamを用いた実装がされているので、この少しnode.jsと仲良くなれた気がします。
理解に相当時間かかってしまった。。ふぅー。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
166