ブラウザやDeno、Node.js、Cloudflare Workersで実装されているWeb Streamは便利で性能もいいのですが、まだ新しいAPIということで最小限の機能しか存在しません。
そのため、Web Streamの仕様策定を行っているwhatwg/streamsでは、ストリーミング処理を簡潔に読み書きするための新しいメソッドの導入を進めています。
将来的に、以下で紹介するような新しいメソッドが導入されると、シンプルにストリーミング処理が書けるようになります。
ReadableStream.prototype[Symbol.asyncIterator]
このメソッドはReadableStreamを非同期イテラブルにするものです。
この機能の便利さは導入前後の記述量を比較してもらえれば一目瞭然だと思います。
const reader = readableStream.getReader();
while (true) {
const { done, value } = reader.read();
if (done) break;
console.log(value)
}
for await (const chunk of readableStream) { // 非同期イテレーターなのでfor-await-of文で読める
console.log(chunk);
}
コードからわかるように、ReadableStreamをfor-await-of
文に突っ込むと細切れのデータを読み出すことができます。
非同期イテレーターを配列に変換する関数であるArray.fromAsync提案(Stage 2)と組み合わせると、簡単に配列に変換できて便利そうです。
const { body } = await fetch("https://qiita.com/");
const readableStream = body.pipeThrough(new TextDecoderStream())
const chunkList = await Array.fromAsync(readableStream);
console.log(chunkList)
そもそも非同期イテレーターは
- Promiseベース
- 一度だけ読み出せる
- 遅延評価
という点でWeb Streamと非常に相性がいいものです。これらを相互変換することで処理が書きやすくなるのは間違いないですね。
ステータス(2022.5)
- 仕様に入っています。 https://streams.spec.whatwg.org/#rs-asynciterator
- Denoで実装されています https://doc.deno.land/deno/stable/~/ReadableStream#[Symbol.asyncIterator]
- Node.jsで実装されています https://nodejs.org/api/webstreams.html#async-iteration
- ブラウザでは実装されていません https://github.com/whatwg/streams/issues/778#issuecomment-461341033
ブラウザの対応状況が悪いのはWeb IDLの非同期イテレーターへの対応を待っている?ようです。
ReadableStream.prototype.values()
これは実はReadableStream.prototype[Symbol.asyncIterator]
と同じことをするのですが、iterator-helpers提案(Stage 2)との組み合わせで効果を発揮します。
readableStream
.values()
.map(chunk => chunk ** 2) // すべてのchunkを2倍する
.forEach(chunk => console.log(chunk))
例では.values()
で非同期イテレーターに変換し、それをiterator-helpers提案の.map()
で変換しています。
iterator-helpersには.map()
以外にも.filter()
、.take()
、.drop()
、.find()
などの様々な便利メソッドが含まれており、メソッドチェーンでデータを加工していくことができます。
しかも非同期イテレーターなので遅延評価され、メモリの使用量を抑えることができます。
ステータス(2022.5)
- 仕様に入っています。 https://streams.spec.whatwg.org/#rs-asynciterator
- Denoで実装されています https://doc.deno.land/deno/stable/~/ReadableStream が、ドキュメントに記載がありません。
- Node.jsで実装されています https://nodejs.org/api/webstreams.html#readablestreamvaluesoptions
- ブラウザでは実装されていません https://github.com/whatwg/streams/issues/778#issuecomment-461341033
ReadableStream.from(asyncIterable)
これまで紹介したのはReadableStreamを非同期イテレーターに変換する方法でしたが、この関数は逆で、非同期イテレーターをReadableStreamに変換します。
これはインスタンスのメソッドではないため、ReadableStream.from
のようにインスタンスを生成せずに使います。
const r1 = ReadableStream.from(function* () {
yield 0;
yield 1;
yield 2;
});
const r2 = ReadableStream.from([0, 1, 2]);
ReadableStream.from
が無い現状では、WritableStreamにデータを書き込みたいとき、writer.write(data)
のような低レベルのAPIを使用しなければなりません。
const data = ["a", "b", "c"] // 書き込みたいデータ
const writer = writableStream.getWriter();
for (const chunk of data) {
await writer.ready.then(() => writer.write(data));
}
// または
const data = ["a", "b", "c"] // 書き込みたいデータ
const readable = new ReadableStream({
start(controller) {
for (const chunk of data) {
controller.enqueue(chunk);
}
controller.close();
}
})
await readable.pipeTo(writableStream); // writableStreamへ書き込み
このように現状の書き方はfor文で愚直にwriteするかenqueueするしかなく、めんどくさいのですが、ReadableStream.from
を使うと短く書くことができます。
await ReadableStream.from(["a", "b", "c"]).pipeTo(writableStream); // writableStreamへ書き込み
実際書き込むときはUint8Arrayへの変換などの処理が挟まりますが、それも.pipeThrough(new TextEncoderStream())
のようにメソッドチェーンできます。
for文を使った手続き的な書き方をやめて、JSらしいメソッドチェーンを書けるのが非常にうれしいポイントです。
ステータス(2022.5)
- 仕様策定中です。 https://github.com/whatwg/streams/pull/1083
- 実装しているランタイムはありません。
- Deno標準ライブラリには同様のことを行うユーティリティー関数があります。 https://doc.deno.land/https://deno.land/std@0.140.0/streams/mod.ts/~/readableStreamFromIterable
まとめ
Web Streamに追加される新しい機能の中から、便利そうなものをピックアップして紹介しました。
- ReadableStream.prototype[Symbol.asyncIterator]:ReadableStreamを非同期イテラブルにする
- ReadableStream.prototype.values():ReadableStreamを非同期イテラブルにしてメソッドチェーンできる
- ReadableStream.from(asyncIterable):非同期イテラブルをReadableStreamにする
ただし、これらはまだブラウザに実装されていません。仕様書には定義されているのに実装されていないため使えないというのはなんとも歯がゆいですが、これらが実装される日を待ちましょう。
polyfill
上記の通り、これらのメソッドはまだブラウザに実装されていません。これらのメソッドがない状態でWeb Streamを読み書きしていくのは辛い感じがあるので、ポリフィルを入れるのがオススメです。
以下をコピペして使います。
ReadableStream.prototype[Symbol.asyncIterator]
// https://bugs.chromium.org/p/chromium/issues/detail?id=929585#c10
if (typeof ReadableStream.prototype[Symbol.asyncIterator] !== "function") {
Object.defineProperty(ReadableStream.prototype, Symbol.asyncIterator, {
async *value() {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
},
writable: true,
enumerable: false,
configurable: true,
});
}
ReadableStream.prototype.values()
// https://bugs.chromium.org/p/chromium/issues/detail?id=929585#c10
if (typeof ReadableStream.prototype.values !== "function") {
Object.defineProperty(ReadableStream.prototype, "values", {
async *value() {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
},
writable: true,
enumerable: false,
configurable: true,
});
}
ReadableStream.from(asyncIterable)
Deno標準ライブラリの実装
https://deno.land/std@0.140.0/streams/conversion.ts#L174