Help us understand the problem. What is going on with this article?

さよならStream

More than 1 year has passed since last update.

この記事はNode.js Advent Calendar 2018の10日目の記事です。

Node.jsと非同期処理

Node.js の特徴は非同期処理です。

Node.js では非同期処理をうまく扱うためにいくつかのインターフェースを提供しています。

  1. Callback
  2. Promise (async/await)
  3. EventEmitter (Stream)

Callback は言わずもがな、JavaScript の世界で古くから存在する非同期を処理する方法です。JavaScript を書いていて Callback 関数をみたことがない、という人はいないでしょう。Callback は JavaScript を発展させてきた素晴らしいインターフェースですが、Callback Hell など問題も多く抱えています。そこで新しく Promise というインターフェースが開発されました。これは Callback Hell やエラーハンドリングなど、Callback が多く抱えている問題点を解消し JavaScript の非同期処理に革命を起こしました。さらに現代 JavaScript では Promise でも残っていたループ処理や分岐処理などの弱点を解消し、同期風な記述を可能とした async/await が導入され、JavaScript は非同期処理をさらに馴染みやすいものへ進化させました。

EventEmitter

Node.js ではそんな Callback に加えて、古くから EventEmitter というインターフェースを実装しています。これは、あるイベントが起きた時に起きる処理をあらかじめ記録しておく、というものです。ブラウザコードでいうと click event にハンドラーを追加するイメージのインターフェースです。

const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();

// 発火したイベントの受け取り
myEmitter.on('myevent', () => {
  console.log('event 発火!');
});

// イベントの発火
myEmitter.emit('myevent');

Node.js 内のモジュールの多くは、この EventEmitter を下敷きに作られています。Node.js はシングルスレッドなので、設計上 IO などの停止処理はなるべく細切れにすることが重要です。そのような「処理を細かく、なんども分断する」という点で EventEmitter は優れています。

Stream

さらにその EventEmitter の特性を利用し、Event をつなぎ合わせたりデータのハンドリングや変換処理をしやすくしてくれるインターフェースが Stream です。
Node.js では下記4つの Stream が利用できます。

  • Writable - データの書き込みに利用する Stream(ex: fs.createWriteStream())
  • Readable - データを読み取りに利用する Stream(ex: fs.createReadStream())
  • Duplex - 書き込み/読み取りの両方に対応した Stream(ex: net.Socket)
  • Transform - Duplex を継承し読み書きしたデータを変換する Stream(ex: zlib.createDeflate())

http モジュールを例にとると、リクエストを受け取る処理は Readable Stream、リクエストを送る処理は Writable Stream で管理されています。このように連続するデータの流れを効率的に扱えるようにすることで、Node.js はシングルスレッドで多くの処理を同時に捌くことができます。

また、Streamはパイプしてつなげたり一度に流れるデータの容量を調整する highWaterMark を利用することでメモリを効率的に利用できるというメリットがあります。

しかし、Stream はいくつか難易度が高い点があります。

  • Promise におけるフロー制御に Stream を導入することが難しい。
  • 独自の Stream を作成するには Stream に精通する必要がある。
  • エラーハンドリングを忘れるとプロセスがクラッシュする。
  • pause が難しい。

上記の難易度からか Node.js のドキュメント上では「Stream の仕組みを知ることは重要だが、モジュールから利用するのを推奨する」と書かれています。
https://nodejs.org/api/stream.html#stream_stream

公式のドキュメントですらもStreamの有用性は認めつつ難易度が高いことを認めているのです。

対して async/await は同期的な見た目にできることで、利用のハードルが低くJavaScriptに明るくない技術者に対しても可読性を損ないづらいという点で優位にたっています。

多くの場合は async/await だけでなんとかなります。どうしてもパフォーマンスがきになる部分(例えばRedisのkey listを取得する処理など)にのみ Stream を導入するのが可読性を下げないポイントでしょう。

個人的にはフロー制御は以下の優先度で採用するのがよいと考えます。

  1. async/await (Promise)
  2. Stream
  3. Callback

このうち、async/await と Callback は util.promisify などを利用することで、組み合わせることが可能ですが、Stream は単体で利用するしかありません。

そこで導入されたものが AsyncIterator です。これは Experimental 扱いですが Node.js v10 でも動作します。

AsyncIterator の使い方

例えば以下のような要件のバッチ処理を考えてみます。

  1. ファイルを読み込む
  2. スリープする
  3. 続きを読み込む

メモリの少ないサーバで実行する時に効率的に実行したいというシチュエーションです。

素直に上から順番に実装していきましょう。

fs モジュールには createReadStream というファイルから ReadStream を生成する関数があります。これを利用して対象のファイルから読み込み用の Stream を生成します。(本当はreadFileなどを使うと1行読み込むのは楽ですが、今回はあえてcreaateReadStreamで説明しています。)

読み込みが終わるまでなんども data イベントが呼ばれます。この時注意しなければならないのが、data イベントは1行ごとに発火するわけではなく、 highWaterMark で指定されたバイト数読み込んだら発火するということです。例えば下記の例ではデフォルトの16KB読み込んだ時に data イベントが発火します。これくらいのコード量であれば一回で全て読み込んでしまいますが、highWaterMark の容量を下げるとより細かく読み込みます。highWaterMark を64に設定してみると僕の環境では6回 data イベントが発生しました。

const fs = require('fs');

const stream = fs.createReadStream(__filename, { encoding: 'utf8', highWaterMark: 64 * 1024 });

let counter = 0
stream.on('data', (chunk) => {
  counter++
  console.log(chunk);
});

stream.on('close', () => {
  console.log('close', counter);
});

stream.on('error', (e) => {
  console.log('error', e);
});

仕様の2は非同期の処理なので async/await でフロー制御をしたくなります。この程度であれば同期処理でも大した見た目にはならないと思いますが、この前後にAPIを叩くなどの処理が加わった場合、 async/await で書きたくなるきもちがわかると思います。ここで、単純に data イベントのハンドラを async function にすればいいんだ!とすると罠にハマります。

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const fs = require('fs');

const stream = fs.createReadStream(__filename, { encoding: 'utf8', highWaterMark: 64 * 1024 });

let counter = 0;
stream.on('data', async (chunk) => {
  counter++
  await sleep(1000)
  console.log(chunk);
});

stream.on('close', () => {
  console.log('close', counter);
});

stream.on('error', (e) => {
  console.log('error', e);
});

data イベントのハンド内のみで async/await によるフロー制御をすることは可能ですが、data イベントそのものの処理を止めることはできないため、data イベントは post してる間にも連続して発生してしまいます。じゃあ pause すればいいか、としてみると内部的には htighWaterMark までは読み込み続けてしまうらしく意図通りには止まってくれません。なにより処理がぱっと理解しづらいです。

stream.on('data', (chunk) => {
  stream.pause();
  sleep(1000).then(() => stream.resume());
});

そこでついに AsyncIterator が登場です。上記のコードを AsyncIterator で書き直すと次のようになります。下記のコードを実際に実行してみると、少しずつコードが表示される様子が見て取れると思います。

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const fs = require('fs');

async function main() {
  const stream = fs.createReadStream(__filename, { encoding: 'utf8', highWaterMark: 64 });

  let counter = 0;
  for await (const chunk of stream) {
    counter++;
    console.log(chunk);
    await sleep(1000);
  }

  console.log(counter);
}

main().then(() => console.log('done')).catch((e) => console.error(e));

この部分がキモのコードで、data イベントを for で待ち受けることができるようになります。async/await になることによって可読性もかなり上がりいいことづくめです。

for await (const chunk of stream) {
  counter++;
  console.log(chunk);
  await sleep(1000);
}

AsyncIterator のメリット・デメリット

AsyncIterator のメリットとデメリットをまとめると下記のような表にまとめられます。

メリット デメリット
async/await のフロー制御に組み込める Experimental である(まだテストやlintとの相性が悪い)
try/catch で包括的なエラーハンドリングができる
可読性が高い

まとめ

Stream は AsyncIterator のおかげで async/await のフロー制御に組み込みやすくなります。Stream は非常に有用なインターフェースですが難易度も高く、可読性の面でも async/await に優位性があります。直近では Stream を完全になくすことはできないですが、未来においては AsyncIterator を取り込むことでより綺麗な形で効率的なフロー制御が行えるようになります。読みやすく効率的ならば採用しない理由がありません。
難しいインターフェースから卒業してメンテナブルなコードを手にいれましょう!

おまけ

たまたま AsyncIterator を使わずにファイルを1行ずつ読み込んで async/await に組み込めむコードを書く必要があって今回の記事を思いつきました。はやく AsyncIterator も stable になってほしいですね。

ちなみにこのコードを書いた時点では ReadLine は AsyncIterator 対応がされていませんでした。ReadLine の内部実装はfsのreadStreamの継承だと思っていたので、AsyncIteratorで回せると思い込んでいました。

お、これは Node.js 本体へコミットチャンス、と思ったら同じことを考える人はたくさんいるもので、すでに対応されていました。#18904
このプルリクを参考にすると他の部分にコミットチャンスがあるかもしれないですね。

main.js
const axios = require('axios')
const { readLine } = require('./asyncread')

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

const main = async () => {
  const path = __filename

  const handler = async (line, counter) => {
    const param = JSON.parse(line)
    await axios.post(`https://api.com`, { param: param })
    await sleep(1000)
  }

  await readline(path, handler)
}

main().then(() => console.log('done')).catch((e) => console.error(e))
asyncread.js
const util = require('util')
const fs = require('fs')
const open = util.promisify(fs.open)
const read = util.promisify(fs.read)

exports.readLine = async (filepath, readLineHandler, options = {}) => {
  const encoding = options.encoding || 'utf8'
  const offset = options.hasOwnProperty('offset') ? options.offset : 0
  const fd = await open(filepath, 'r')
  const chunkSize = 64 * 1024
  const buffer = Buffer.alloc(chunkSize)

  let position = 0
  let line = ''

  let counter = 0
  while (true) {
    const res = await read(fd, buffer, 0, chunkSize, position)
    line += buffer.toString(encoding, 0, res.bytesRead)

    if (line.includes('\n')) {
      let splited = line.split('\n')
      line = splited[splited.length - 1]
      if (!/\n$/.test(line)) {
        splited = splited.slice(0, splited.length - 1)
      }

      for (const l of splited) {
        counter++
        if (counter <= offset) continue

        await readLineHandler(l, counter)
      }
    }

    if (res.bytesRead <= 0) {
      break
    }

    position += chunkSize
  }

  await readLineHandler(line, counter)

  await close(fd)
}
koh110
JavaScript系エンジニアです
http://koh.dev
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした