1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Node.js】検証と実践:Writable.write()の普通の使い方

Last updated at Posted at 2022-08-11

fs.Readablefs.Writableを使って画像ファイルをコピーするプログラムを作り

ストリームの基本的な使い方を検証、理解していきます。

利用するNode.jsのバージョンはv16.xです。

この記事は何?

次のような方向けになると思います。

  • Node.jsのstream APIを一通り目を通した人。
  • 具体的にwritable.write()を使った最低限レベルでのストリームを使った実装方法を知りたい人。

fs.writeFile()のような一旦ファイルの内容をすべてメモリを展開するメソッドを使わずに
メモリが節約できるストリームを使っていく方法を模索していきます。

ストリームの基本的な使い方といいつつ、
trasformduplexストリームについてはこの記事で扱いません。
ReadableWritableのみ扱います。

この記事の流れですが、
まず誤解したままでざっくり実装してみて、その後間違っている部分を公式の説明をみたりして解消していき、改善したコードに作り直していきます。

なげーです。

実践

画像ファイル./dist/in/cat.pngから./dist/out/cat.pngへコピーしたファイルを生成します。
Node.jsのstream, FileSystem APIを一通り読んだ人がとにかく書いてみたプログラムになります。

大いに誤解と間違いを含んでいるコードになります。

このコードが期待した通りに動かないことを確認し、
その原因解明をこの記事でまとめていき、最終的に正しい利用方法で改善されたコードに直します。

コードは次の思い込みで作られています。

  • WritableストリームはautoClose:trueで作成されたので読み取るデータがなくなったら勝手にWritableは閉じられる
  • Readableストリームは読み取るデータがなくなったら勝手にReadableを閉じる
  • 各ストリームはhighWaterMarkを1024byteで指定しているので毎度ストリームは1024byte読み取って1024byte書き込む
  • 各ストリームの各イベントハンドラはひとまず追加しているだけでどうすればいいのかわかっていない
  • Readabledataイベントで取得したchunkはそのままwritable.write()へわたしていい
  • Readableはflowingモードで運用されるからreadable.readableFlowingはtrueになるはず

画像ファイルはMicrosoftのDirectXのチュートリアルページからダウンロードした画像で約15kbサイズになります。

import * as stream from 'node:stream';
import * as fs from 'node:fs';
import * as path from 'node:path';
import * as crypto from 'node:crypto';

const outPath = path.join(__dirname, "out");
const inPath = path.join(__dirname, "in");

// ランダムな文字列を生成する
const randomString = (upto: number): string => {
    // ...
    return randomCharactors;
}


// 指定のパスにディレクトリは存在するのか確認する関数
const isDirExist = (path: string): boolean => {
    // ...
    return existOrNotExistBoolean;
}

// fs.Readableを生成する
const createRfs = (): fs.ReadStream => {
    if(!isDirExist(inPath)) throw new Error(`The path: ${inPath} does not exist.`);

    return fs.createReadStream(
        path.join(inPath, "cat.png"), 
        {
            encoding: 'binary',     /* default: 'utf8' */
            autoClose: true,
            emitClose: true,
            highWaterMark: 1024     /* default: 64 * 1024 */
        }
    );
}

// fs.Writableを生成する
const createWfs = (): fs.WriteStream => {
    if(!isDirExist(outPath)) throw new Error(`The path: ${outPath} does not exist.`);

    return fs.createWriteStream(
        path.join(outPath, "cat" + randomString(5) + ".png"), 
        { 
            encoding: 'binary',     /* default: 'utf8' */
            autoClose: true,
            emitClose: true,
            highWaterMark: 1024     /* default: 64 * 1024 */
    });
}

(function() {
    const rfs: fs.ReadStream = createRfs();
    const wfs: fs.WriteStream = createWfs();

    // --- Readable Event Handlers ------------

    rfs.on('open', () => {
        console.log("readable stream has been opened");
    });

    rfs.on('ready', () => {
        console.log("readable stream is ready");
    });

    rfs.on('close', () => {
        console.log('readable stream has been closed');
    });

    rfs.on('data', (chunk: string | Buffer) => {
        console.log('data read!');
        console.log(`state: ${rfs.readableFlowing}`);
        console.log(`Received ${chunk.length} bytes of data.`);

        wfs.write(chunk, (e: Error | null | undefined) => {
            if(e) console.error(e.message);
            else console.log("Write data has been completed");
        })
    });

    rfs.on('end', () => {
        console.log('End read stream');
        console.log('There is no more data to be consumed from the stream');
    });

    rfs.on('resume', () => {
        console.log('There is no more data to be consumed from the stream');
    });

    rfs.on('error', (e: Error) => {
        console.error(e.message);
        rfs.resume();
    });

    rfs.on('pause', () => {
        console.log("readable paused");
    })

    // --- Writable Event Handlers -----

    wfs.on('close', () => {
        console.log("Writable stream closed");
    });

    wfs.on('drain', () => {
        console.log("Drained");
    });

    wfs.on('finish', () => {
        console.log("Finished");
    });

    wfs.on('pipe', () => {
        console.log("piped");
    });

    wfs.on('unpiped', () => {
        console.log("unpiped");
    })

    wfs.on('error', (e: Error) => {
        console.error(e.message);
        if(wfs.destroyed) wfs.destroy();
    });
})();

結果

readable stream has been opened
readable stream is ready

data read!
state: true
Received 1024 bytes of data.

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 1024 bytes of data.

data read!
state: true
Received 1024 bytes of data.
Write data has been completed

data read!
state: true
Received 905 bytes of data.

End read stream
There is no more data to be consumed from the stream

Write data has been completed
Write data has been completed

readable stream has been closed

Drained

Write data has been completed

clean exit - waiting for changes before restart

コピーファイルは画像ファイルとして開くことができました...しかし

期待通りじゃなかったこと:

  • Writableストリームが閉じられていない

    closeイベントはWritableにおいて発生していないことが確認できます。
    自動的に閉じると勘違いしているからなのですが。

  • drainイベントが一度しか起こらなかった

    highWaterMarkの閾値にまで内部バッファがたまったらdrainイベントまで書込みはできないと思っていました。
    しかし既にhighWaterMarkで1024byteを何度も受け取っているのに一度しかdrainが発生しないのはおかしいです。

期待通りだったこと:

  • ReadableはhighWaterMarkで指定した通りのサイズを読み取ってきた
  • Readableは読み取るデータがなくなったら自動的に閉じられた

ここ以降、実践でうまくいかなかった原因を追究してその学習内容をまとめ、
最終的にはAPIを正しく使用できているプログラムに改善します。

Writableストリームを適切に閉じる方法

結論:書き込みストリームの破棄は明示的に実行しなくてはならない

次のような勘違いをしていました。

Writableインスタンスを生成するときにコンストラクタにautoClose: true, emitClose: trueを渡したら、Writableは自動的に閉じてくれるはず」

これは当然ですが間違いでした。

このままではWritableが閉じられない理由は2つあって、

  • そもそもReadableが閉じられたことを自動的にWritableが知る仕組みはないから。
  • Writableにおいてerrorまたはcloseまたはfinishiイベントが発行されていないから。

公式を見てみる:

fs.createWriteStream()より

「error」または「finish」時に autoClose が true (デフォルトの動作) に設定されている場合、ファイル記述子は自動的に閉じられます。

デフォルトでは、ストリームは破棄された後に「close」イベントを発行します。この動作を変更するには、emitClose オプションを false に設定します。

fs.Writable::Event:'close'より

「close」イベントは、ストリームとその基になるリソース (ファイル記述子など) が閉じられたときに発行されます。このイベントは、これ以上イベントが発行されず、それ以上の計算が行われないことを示します。

つまりcloseイベントはWritableストリームの仕事が完全に終わったときに発行されるべき。

fs.Writable::Event'finish'より

「finish」イベントは、stream.end() メソッドが呼び出された後に発行され、すべてのデータが基盤となるシステムにフラッシュされます。

ということは、

実はWritableautoClose: trueにしても勝手に閉じるのではなくて、閉じるためにイベントを発行させなくてはならないのである。

autoClose: trueerrorfinishイベントが発行されたら書き込みストリームを閉じるよという意味で、
emitClose: trueは「ストリームが破棄されたときにcloseイベントを発行するよ」という意味で、

「書き込みストリームの破棄」は自動で行うよとは一言も言っていないのである。
なので書き込みストリームの破棄は明示的に実行しなくてはならない

そして書き込みストリームが自動的に閉じてもらうには、
autoClose: trueを前提にerrorfinishイベントを発行しなくてはならないのです。

Writableはいつ閉じればいいのか

Writableは明示的に閉じる処理を実行しない限り閉じられることはないことがわかりました。

(readble.pipe()を使えばその限りではないですが。)

では実際にいつ、どうやって閉じればいいのでしょうか。

そこは各ストリームの内部バッファについて考慮しなくてはなりません。

公式の説明から、ストリームは読み取り先からデータを吸い取ったら各ストリームが持つ内部バッファへ一旦保存することがわかります。

内部読み取りバッファーの合計サイズが、highWaterMark で指定されたしきい値に達すると、ストリームは、現在バッファーされているデータが消費されるまで、基になるリソースからのデータの読み取りを一時的に停止します

内部書き込みバッファの合計サイズが highWaterMark によって設定されたしきい値を下回っている間、writable.write() への呼び出しは true を返します。内部バッファーのサイズが highWaterMark に達するか超えると、false が返されます。

いずれのストリームもhighWaterMarkで指定した閾値まで内部バッファにデータがたまると一旦データの吸い取りを停止し、消費されるまで止まったままになる(またはすべき)ということが書いてあります。

この内部バッファにデータを残した状態でストリームが閉じられると、内部バッファデータはメモリに残り続けていずれガベージコレクションに登録されます。

ということで、

ストリームを閉じるには常に内部バッファが空であるかを考慮しなくてはなりません。

正常動作の後に閉じたいときは内部バッファは空っぽにしておくべきで、
エラーが起こった場合はこれ以上データを吸い取らないようにすぐに破棄すべきです。
その方法を次でまとめます。

writable.end()writable.destroy()の使い分け

Writableを閉じるには明示的に閉じる処理を呼び出すこと、内部バッファは空にしてから閉じることが正しい使い方であることを知りました。

では具体的に何を呼び出せばいいのかと、どこで呼び出せばいいのかが分かれば正しい実装が実現できそうです。
ここでは何を呼び出せばいいかをまとめます。
(Readableは読み取り先からもう読み取るものがないと自動的に閉じるので割愛しました。)

  1. Writableを破棄する前に内部バッファをフラッシュしたいときはwritable.end()を呼び出そう。

なぜならば、writable.end()ならばcloseイベントの前にfinishイベントを発行させることができるからである。

公式より:

  • closeイベントが発行されるとこれ以降の書き込みは受け付けなくなる。

  • finishイベントが発行されると内部バッファのデータがすべて書き込まれる(フラッシュされる)。

  • もしもWritableautoClose: trueで作成されてあったら、finishイベント時にWritableが破棄される(fs.createWritableStream()より)

つまり、

書き込みストリームを破棄する前に、内部バッファにあるデータを書き込み先に書き込んじゃいたいときにはfinishイベントを呼び出さない限り書き込む方法は失われるのである。

finishイベントを呼び出す前にcloseイベントが発行されると内部バッファにデータがあってもこれ以上の書き込みは受け付けなくなっているので、そのデータは行き先を失ってガベージコレクションに追加される。

たとえば、

内部バッファに残ったデータ量が大きいときにcloseイベントを発行してしまうと大きなメモリリークになりかねない。
イベント発行タイミングと内部バッファがちょうどクリアされているタイミングが一致するのはあんまり期待できない。

なので、
普段使うときはwritable.end(),finsih,closeのながれでWritableを閉じていくのが推奨の流れといえるでしょう。

  1. エラーが起こったけど内部バッファをフラッシュしたいときはwritable.end()か次のdrainイベントを待つ

やはり内部バッファを出し切るためにはfinishイベントが発行されなくてはならないので。

データを閉じる前にフラッシュする必要がある場合は、destroy の代わりに end() を使用するか、ストリームを破棄する前に「drain」イベントを待ちます。

この使い方は、errorイベントハンドラが発火したらコールバックでwritable.end()を呼び出すことになります。

次のdrainイベントを待つ場合、

drainイベントが発行されると「書き込み可能になった」わけなので、

データを吸い取るwriteble.write()が呼び出される前に直ちにwritable.end()を呼び出すことになります。

  1. ただちにWritableを破棄しなくてはならないならwritable.destoryを呼び出そう

なぜならば、writable.destroy()は呼び出されると直ちに書き込みストリームが破棄されるからである。

つまりこれ以上の書き込みがなされないので、内部バッファにデータが残ったままになります。

Writableはこのときに閉じます

Readableからもうデータがおくられてくることが最早ないならWritableを閉じればいい。

その適切なタイミングといえば、Readableendイベントが発行された時である。

「終了」イベントは、ストリームから消費されるデータがなくなると発行されます。 データが完全に消費されない限り、「終了」イベントは発行されません。

ということでReadableendイベントが発行されたら、

Readableで内部バッファにあるデータは完全に消費しきったというお墨付きということなので

endイベントハンドラ内部でwritable.end()を呼び出すのが適切となります。

まぁそうなるよねという結論ですが、

自信をもってこうであるというにはAPIドキュメントを何度も読み返して先のように順序だてる必要がありました。

ここまでで、

Writableストリームを適切に閉じる方法の解答を導き出すことができました。

drainイベントの適切な処理の仕方

先の実践のところでのコードではdrainイベントハンドラで何もしていませんでした。

しかしwritable.write()を使う以上drainイベントは適切に処理しなくてはなりません。

drainwritable.write()の仕組み

重要:writable.write()falseを返したらdrainイベントが発行されるまでデータ書き込みは停止せよ

参考:

stream.write()呼出がfalseを返したら、ストリームへのデータの書き込みを再開するのが適切なときに「drain」イベントが発行されます。

(読み取りストリームは)ひとた内部バッファに保存しているデータ量がhighWaterMarkで指定した閾値に到達したら、読取ストリームはデータが消費されるまで一時的にデータを読み取るのを停止する。

(書込みストリームは)writable.write()が継続的に呼び出されるとデータはWritableストリームへバッファされる。

(書込みストリームの)内部バッファがhighWaterMarkで指定した閾値へ保存データ量が到達まで、writable.write()はtrueを返し、到達したらfalseを返す。

(writable.write()は)内部バッファが、チャンクを受け入れた後にストリームが作成されたときに構成された highWaterMarkよりも小さい場合、戻り値はtrueです。

falseが返された場合、drainイベントが発行されるまで、ストリームへのデータ書き込みのさらなる試みは停止する必要があります。

ひとたびバッファへ保存されたchunkがドレインしたら(内部バッファに保存されたデータが書き込み可能になったら)drainイベントが発行されます。

writable.write()falseを返したら、drainイベントが発行されるまでchunkの書込みを停止するのが推奨されます。

ということで、まとめると...

  • highWaterMarkは内部バッファの「満タン」を(形式的に)定義してストリームを制御する

  • writable.write()falseを返したらdrainイベントが発行されるまでデータ書き込みは即座に停止しなくてはならない

  • drainイベントが発行されてからchunkの書き込みを再開せよ

ナルホド。

ではwrtiable.write()の戻り値を毎回チェックするとして、falseが返されたら実際にどう処理すればいいのか?

公式では次を示されている

書き込むデータをオンデマンドで生成またはフェッチできる場合は、ロジックを Readable にカプセル化し、stream.pipe() を使用することをお勧めします。

ただし、write() の呼び出しが優先される場合は、「drain」イベントを使用してバックプレッシャーを尊重し、メモリの問題を回避することができます。

つまり上でいうところの推奨はradable.pipe()を使えである。

const rfs: fs.Readable = createReadStream(/**/);
const wfs: fs.Writable = createWriteStream(/**/);
rfs.pipe(wfs);

なんでかというと、pipeは内部的にdrainを制御しているからです。

参考:


Readable.prototype.pipe = function(dest) {
  var src = this;
  src.on('data', (chunk) => {
    var ret = dest.write(chunk); // 読み込んだデータを dest に書き込む
    if (false === ret) { // highWaterMark に達していたら
      src.pause(); // 読み込み一時停止
    }
  });

  dest.on('drain', () => { // highWaterMark を下回る
    flow(src); // 読み込み再開
  });
};

つまりpipeを使えばdrainイベントに関する制御を丸投げできるのです。

それでもpipe()を使えない事情があるとか完全に使用メモリ量を制御したいなどの事情があるならば、

writable.write()を直接呼出す実装を自分で定義することになります。

さて、

この記事ではwritable.write()を適切に使う方法を追求するので、

pipe()がやってくれている内容を自分で実装しなくてはなりません。

つまり、writable.write(chunk)falseを返したらデータ読み取りを一時停止して、

次回drainイベントが起こったらデータ読み取りを再開する

そんな処理をどうやって実現すればいいでしょうか。

参考:

公式や参考のサイトではprocess.nextTick(), flow()みたいな関数を使われていた

writable.once('drain')みたいなイベントハンドラもあるみたい

しかし、readable.pause()readable.resume()を使えばいいのです。

Readableのモード切替方法

Readableには2つのモードがある

  • flowingモード:データの取得はシステムが自動的に行ってくれてデータはイベントハンドラで取得できる
  • pausedモード:stream.read()を明示的に呼び出してストリームのチャンクを取得する

この2つのモードを切り替える方法

flowingモードに切り替える方法:

  • dataイベントハンドラを追加する
  • stream.resume()を呼び出す
  • stream.pipe()Writableへデータを送信する

pausedモードに切り替える方法:

  • pipeの到達地点がないときに、stream.pause()を呼出したとき
  • pipeの到達地点があるときにstream.unpipe()を呼び出すと起こりうる

ということで、

明示的に両モードを切り替えられるメソッドがreadable.resume()readable.pause()であようです。

stream.resume()はflowingモードに切り替えて、stream.pause()はflowingモードから解除をします。

readable.pause() メソッドは、フロー モードのストリームに「データ」イベントの発行を停止させ、フロー モードから切り替えます。利用可能になったデータは内部バッファに残ります。

readable.resume() メソッドは、明示的に一時停止された Readable ストリームに「データ」イベントの発行を再開させ、ストリームをフロー モードに切り替えます。

今回の実践のところではReadabledataイベントハンドラをアタッチしているのでFlowingモードで動作させています。

なので、

writable.write(chunk)falseが帰ってきたらreadable.pause()ReadableをPausedモードにして、

drainイベントが発行されたらreadable.resume()で明示的にFlowingモードに戻せば、

そのままdataイベントハンドラが自動的にデータ取得を再開してくれる。

という実装をすればいいということになる。

実装:改善版

ここまでで学習して導き出した適切な実装方法を先のコードに適用してみます。

以下のプログラムは次の動作をするはずである:

highWaterMarkが1024で指定しているので

毎回読み取りストリームがhighWaterMarkマックスまでデータを読み取って

その都度writable.write()が内部バッファへ1024byte書き込み

こちらもhighWaterMarkへ到達するのでfalseを返すはずである。

そしたらreadable.pause()で一時停止してdrainイベントまで読み取りを停止する。

読み取りストリームがendイベントを発行したら、

書き込みストリームは明示的にwritable.end()するのでfinishイベントが発行され、

その時点の書き込みストリームの内部バッファがフラッシュされる(ファイルへ書き込まれる)

そうしたのち書き込みストリームは閉じられて、以降書き込みは許されなくなる。


import * as fs from 'node:fs';
import * as path from 'node:path';
import * as crypto from 'node:crypto';
import { readFileSync } from 'node:fs';

const outPath = path.join(__dirname, "out");
const inPath = path.join(__dirname, "in");

// ランダムな文字列を生成する
const randomString = (upto: number): string => {
    // ...
    return randomCharactors;
}


// 指定のパスにディレクトリは存在するのか確認する関数
const isDirExist = (path: string): boolean => {
    // ...
    return existOrNotExistBoolean;
}

const createRfs = (): fs.ReadStream => {
    if(!isDirExist(inPath)) throw new Error(`The path: ${inPath} does not exist.`);

    return fs.createReadStream(
        path.join(inPath, "cat.png"), 
        {
            encoding: 'binary',     /* default: 'utf8' */
            autoClose: true,
            emitClose: true,
            highWaterMark: 1024     /* default: 64 * 1024 */
        }
    );
}


const createWfs = (): fs.WriteStream => {
    if(!isDirExist(outPath)) throw new Error(`The path: ${outPath} does not exist.`);

    return fs.createWriteStream(
        path.join(outPath, "cat" + randomString(4) + ".png"), 
        { 
            encoding: 'binary',     /* default: 'utf8' */
            autoClose: true,
            emitClose: true,
            highWaterMark: 1024     /* default: 64 * 1024 */
    });
}



(function() {
    const rfs: fs.ReadStream = createRfs();
    const wfs: fs.WriteStream = createWfs();

    // Writableへ書き込み可能ならtrue。そうでないならfalse。
    let draining: boolean = true;

    rfs.on('data', (chunk) => {
        console.log(`Readable read ${chunk.length} byte of data`)
        draining = wfs.write(chunk, (e: Error | null | undefined) => {
            if(e) {
                // ここでエラーが起こったら`error`イベント前にこの
                // コールバックが実行される
                // で、`error`イベントが発行される
                console.error(e.message);
            }
        });
        // chunkを書き込んだ後のwriteの戻り値がfalseなら
        // 読取ストリームはすぐに停止する
        if(!draining) {
            console.log('Paused Readable because of reaching highWaterMark');
            rfs.pause();
        }
    });

    // `drain`イベントは書込みが再開できるときに発行される
    wfs.on('drain', () => {
        console.log('Drained and resume Readable again.');
        // drainイベントが発行されたら読取ストリームの読取を再開する
        draining = true;
        rfs.resume();
    });

    wfs.on('end', () => {
        console.log('End Writable');
    });

    wfs.on('finish', () => {
        console.log('Finished');
    });

    wfs.on('close', () => {
        console.log('Writable closed');
    });

    wfs.on('error', (e: Error) => {
        if(!rfs.destroyed) rfs.destroy(e);
        if(!wfs.destroyed) wfs.destroy();
    });

    rfs.on('end', () => {
        console.log('there is no more data to be consumed from Readable');
        // Readableでendイベントが発行されたら、完全に内部バッファにデータがない証拠なので
        // ここでWritableを閉じる
        wfs.end();
    })

    rfs.on('error', (e: Error) => {
        if(!rfs.destroyed) rfs.destroy();
        if(!wfs.destroyed) wfs.destroy(e);
    });
})();

結果

[nodemon] starting `node ./dist/index.js`

# highWaterMarkで指定したデータ量いっぱいまでデータを読み取って
Readable read 1024 byte of data
# そのデータをそのままWritableへ渡す。
# WritableもhighWaterMarkいっぱいまでデータ取得したので
# writable.write()がfalseを返し、readable.pause()で一時停止
Paused Readable because of reaching highWaterMark
# 書き込み再開できるようになったのでdrainイベントが発行され
# readable.resume()して再開。
Drained and resume Readable again.

# 以降同じ展開が続く...

Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
Readable read 1024 byte of data
Paused Readable because of reaching highWaterMark
Drained and resume Readable again.
# 最後に読み取ったデータはhighWaterMark未満なので
# そのまま流れていった
Readable read 905 byte of data
# 最後のデータを読み取ってReadableでendイベントが発行されて
# Writable.end()が実行される。
there is no more data to be consumed from Readable
# Writableでfinishイベントが発行されて内部バッファがフラッシュされて
Finished
# 自動的にWritableは閉じられた。
Writable closed
[nodemon] clean exit - waiting for changes before restart

出力先のディレクトリに正しく画像ファイルのコピーが生成されているのを確認できました。

読み込み・書き込み両ストリームは閉じられているし、drainイベントと読み取りストリームの一時停止と再開が正常に動いているのも確認できます。

完全に期待通りに動作しました。

先の実践コードから改善ができたと思います。

【余談】APIスタイルは一つだけにして複数のAPIを使わないこと

開発者はデータ消費の為に一つの方法だけを選択し、一つのストリームに対して複数の方法をデータ消費のために使ってはならない。

特に、on('data'),on('readable'), pipe()を併用

してはならないそうです。

例えば、

pipe()を使っている最中にdataイベントハンドラを追加してはならないということです。

そう考えると、pipe()を使っているときはdataイベントは監視できないといえます。

まぁpipe()を使うなら監視する必要もないでしょうが。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?