Help us understand the problem. What is going on with this article?

nodejs v12(LTS)におけるasync, awaitを用いたstream処理

nodejs v12(LTS)におけるasync, awaitを用いたstream処理

QualiArts Advent Calendar 2019、3日目の記事になります。

はじめに

2019年10月21日にnodejs v12のLTS版が公開されました。

nodeは奇数バージョンが開発版、偶数バージョンが安定版となるため、v11以降の今まで実プロジェクトだと利用しにくかった機能がこれによりいくつか使えるようになりました。

そのなかでもasync-generatorsやfor-await-of loops構文が大手を振って使えるようになったことにより、stream関連の処理が大きく変わると感じたため、すでにいくつか紹介されている記事も有りますが、この機会に改めて紹介したいと思います。
また、最後に簡単なサンプル処理も記述しているので、ご参考いただければ幸いです。

for-await-of loops

今までstreamはeventEmitterを利用し、発火したeventをトリガーに処理を記述していました。
for-await-of loopsを用いると下記のようにわかりやすくかけるようになります。
for-await-of自体は単純にfor-ofのasyncも利用可能になったものとなります。

for-await-of_loops1.js
const fs = require('fs');

const reader = async (rs) => {
  for await (const chunk of rs) {
    console.log(chunk);
  }
};
(async function () {
  const rs = fs.createReadStream('./input.txt', { encoding: 'utf8' });
  await reader(rs);
})();
input.txt
abcde
fghij
klmno
pqrxy
z

実行結果

terminal
abcde
fghij
klmno
pqrxy
z

従来だと、下記のように終了イベントをpromise化するなどで、全体のpromise化などは簡単にできますが、イテレータ内部のchunk単位でpromise化を行う場合非常に可読性が悪くなってしまっていました。
(v10以降であれば下記のようにstream.finishedをpromisifyすることで全体のpromise化は簡略可能です)

これが上記のように簡単に記述できるようになったのは非常にやりやすくなったと感じます。

for-await-of_loops2.js
const fs = require('fs');
const stream = require('stream');
const util = require('util');

const finished = util.promisify(stream.finished);
const msleep = util.promisify(setTimeout);

// streamを用いた場合の処理(全体終了部分のみのpromise化)
const reader1 = async (rs) => {
  rs.on('data', (chunk) => {
    console.log(chunk);
  });

  await finished(rs);
  // stream.finishedを使わない場合下記のようなpromiseを生成する
  // await new Promise((resolve, reject) => {
  //   rs.once('finished', (chunk) => {
  //     return resolve(data);
  //   });
  //   rs.once('error', (err) => {
  //     return reject(err);
  //   });
  // });
};

// streamを用いた場合の処理(chunk単位でのpromise化)
const reader2 = async (rs, iterator) => {
  let buffer = '';
  rs.on('data', (chunk) => {
    buffer = chunk;
    rs.pause();
  });

  let isEnd = false;
  rs.once('end', () => {
    isEnd = true;
  });
  let error;
  rs.once('error', (err) => {
    error = err;
  });

  while (true) {
    if (error) {
      throw error;
    }
    if (buffer) {
      await iterator(buffer);
      buffer = '';
    } else if (isEnd) {
      return;
    } else if (rs.isPaused()) {
      rs.resume();
    }
    // 非同期メソッドがないと無限ループしてしまうため
    await msleep(0);
  }
}

(async function () {
  const rs1 = fs.createReadStream('./input.txt', { encoding: 'utf8' });
  await reader1(rs1);
  const rs2 = fs.createReadStream('./input.txt', { encoding: 'utf8' });
  await reader2(rs2, async (chunk) => {
    console.log(chunk);
  });
})();

実行結果

terminal
abcde
fghij
klmno
pqrxy
z
abcde
fghij
klmno
pqrxy
z

async-generators

今までは同期メソッドでしか使えなかったyieldがasyncにも対応しました。
async function*でasyncIteratorのジェネレータメソッドを生成でき、await対応したnextメソッドを呼び出すことができます。
(nextで呼び出した場合返り値はObjectになります)

async-generators1.js
const util = require('util');
const msleep = util.promisify(setTimeout);

async function* generate() {
  for (let i = 1; i <= 3; i++) {
    await msleep(1000);
    yield i;
  }
}

const asyncIterator = generate();
(async () => {
  console.log(await asyncIterator.next());
  console.log(await asyncIterator.next());
  console.log(await asyncIterator.next());
  console.log(await asyncIterator.next());
})();

実行結果

{ value: 1, done: false }
{ value: 2, done: false }
{ value: 3, done: false }
{ value: undefined, done: true }

こちらはfor-await-of loopsも利用可能です。
こちらを利用すると簡単にラグのあるstreamデータの生成が可能になります。

async-generators2.js
const util = require('util');
const msleep = util.promisify(setTimeout);

async function* generate() {
  for (let i = 1; i <= 3; i++) {
    await msleep(1000);
    yield i;
  }
}

const asyncIterator = generate();
(async () => {
  for await (const v of asyncIterator) {
    console.log(v);
  }
})();

実行結果

1
2
3

行ごとにawait処理を行うサンプル

上記の機能が実装されたことで、行ごとのように一定windowずつstreamで非同期メソッドを実行する処理が非常に簡単にかけるようになりました。
下記は得られたstreamを、行ごとに非同期メソッドを実行する場合のサンプルになります。
可読性重視&行単位でeventループが回るため、パフォーマンスがシビアな場合は別途実装することをおすすめします。

readlineモジュールを利用した場合

line-reader1.js
const fs = require('fs');
const readline = require('readline');
const util = require('util');

const msleep = util.promisify(setTimeout);

const asyncLineReader = async (iterater) => {
  const rl = readline.createInterface({
    input: fs.createReadStream('input.txt', { encoding: 'utf8' }),
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    await iterater(line);
  }
}

(async () => {
  await asyncLineReader(async (line) => {
    await msleep(100);
    console.log(line);
  });
})();

実行結果

terminal
abcde
fghij
klmno
pqrxy
z

解説

こちらはreadlineモジュールを利用したものになります。
以前も行ごとに処理を行えたのですが、非同期メソッドの実行はできませんでした。
v1.12からasyncIteratorに対応したことで、上記のように簡単に非同期メソッドが実行できるようになりました。

stream(for-await-of利用)

line-reader2.js
const fs = require('fs');
const util = require('util');

const msleep = util.promisify(setTimeout);

// streamを用いた場合の処理(for-await-of使用)
const asyncLineReader = async (iterater) => {
  const rs = fs.createReadStream('./input.txt', { encoding: 'utf8' });

  let buffer = '';
  for await (const chunk of rs) {
    buffer += chunk;
    const list = buffer.split('\n');
    // 最後の要素は改行が含まれているわけではないため、bufferに戻す
    buffer = list.pop();
    for (let i = 0; i < list.length; i++) {
      await iterater(list[i]);
    }
  }
  if (buffer) {
    // 終了時にbufferに残っている文字列もiteratorにわたす
    await iterater(buffer);
  }
}

(async () => {
  await asyncLineReader(async (line) => {
    await msleep(100);
    console.log(line);
  });
})();

実行結果

terminal
abcde
fghij
klmno
pqrxy
z

解説

こちらはstreamのfor-await-ofを利用したものになります。
イベントループの回数が他と比べて半分以下なので、このなかでは一番パフォーマンスが良いです。
実装を合わせるために1行ずつ処理していますが、こちらで複数行制御してlistをiteratorに渡すような実装が実利用だと良いかもしれません。
比較的簡単に可読性良く記述できるようになっているかと思います。

stream(for-await-of未使用)

line-reader2.js
const fs = require('fs');
const util = require('util');

const msleep = util.promisify(setTimeout);

// streamを用いた場合の処理(for-await-of未使用)
const asyncLineReader = async (iterator) => {
  const rs = fs.createReadStream('./input.txt', { encoding: 'utf8' });

  let buffer = '';
  let rows = [];
  rs.on('data', (chunk) => {
    buffer += chunk;
    const list = buffer.split('\n');
    buffer = list.pop();
    if (list.length) {
      rows.push(...list);
      rs.pause();
    }
  });

  let isEnd = false;
  rs.once('end', () => {
    isEnd = true;
  });
  let error;
  rs.once('error', (err) => {
    error = err;
  });

  while (true) {
    if (error) {
      // errorがあれば終了
      throw error;
    }

    if (rows.length) {
      for (let i = 0; i < rows.length; i++) {
        await iterator(rows[i]);
      }
      rows = [];
    } else if (isEnd) {
      if (buffer) {
        // 終了時にbufferに残っている文字列もiteratorにわたす
        await iterator(buffer);
      }
      return;
    } else if (rs.isPaused()) {
      rs.resume();
    }
    // 非同期メソッドがないと無限ループしてしまうため、setImmediate代わりに実行
    await msleep(0);
  }
}

(async () => {
  await asyncLineReader(async (line) => {
    await msleep(100);
    console.log(line);
  });
})();

実行結果

terminal
abcde
fghij
klmno
pqrxy
z

解説

こちらはstreamのfor-await-ofの未使用版になります。
かなり複雑になり可読性も落ちている事がわかります。
ただ、async, awaitが対応しているv8以降であれば利用可能なため、nodeのバージョン次第では利用できるかもしれません。

まとめ

これらの機能の追加により、async, awaitを用いたstream処理が非常に簡潔に記述できるようになりました。
v1.12LTSに上げることで非常にstream周りが記述しやすくなっているため、この機会にぜひ試してみてはどうでしょうか?

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away