Node.jsのstreamのReadableを非同期ジェネレーターに変換する方法を紹介します。
Node.jsでは、child_process.spawnのstdoutなど様々なI/OでReadableオブジェクトが返されます。Readableオブジェクトからデータを取得するには、コールバック関数を使ったイベントドリブンなAPIを使うのが一般的です。
let dataSize = 0;
readable
.on("data", (chunk) => { dataSize += chunk.length; })
.on("end", () => { console.log(dataSize); };
イベントドリブンな書き方だと読みづらかったりもするので、非同期ジェネレーター(AsyncGenerator)に変換して手続き型の書き方にすると、コードが馴染みのある読みやすい形になることがあります。
AsyncGeneratorを使った場合
let dataSize = 0;
for await (const chunk of toIterator(readable)) {
dataSize += chunk.length;
}
console.log(dataSize);
ReadableをAsyncGeneratorに変換する関数
次がReadableをAsyncGeneratorに変換する関数です。一からイテレータークラスを実装するなど、他にもいろいろなやり方があると思いますが、これが一番短く手っ取り早く書ける形だと思います。
import { type Readable } from "stream";
const toIterator = async function* (readable: Readable): AsyncGenerator<any> {
let resolve: (reason?: Error) => void;
let reject: (value: any) => void;
let promise: Promise<any> | undefined = new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
readable
.on("data", (chunk) => {
resolve(chunk);
promise = new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
})
.on("error", (err) => {
reject(err);
})
.on("end", () => {
promise = undefined;
});
while (promise) {
yield await promise;
}
};
使用例: 個プロセスの標準出力をループする
read.ts
import child_process from "child_process";
import { type Readable } from "stream";
const toIterator = /* ...上の実装... */;
const ping = child_process.spawn("ping", ["-c", "5", "127.0.0.1"], {
stdio: ["ignore", "pipe", "inherit"],
});
(async () => {
for await (const chunk of toIterator(ping.stdout)) {
console.log({ chunk: chunk.toString() });
}
})();