概要
WHATWG Streams のことを何回かに分けて書く。Kyoto.js #11 の『 WHATWG Streams をためした 』という発表のために調べたものだ。発表だけで終わらせるのはもったいないので、再整理して共有する。
前回は『 WHATWG Streams の Lock - Qiita 』。今回は Back pressure のことを書く。
Back pressure (バックプレッシャー) とは?
- 背圧
- 流量制御
- 例: 書き込み速度よりはやく読み込まれ続けると詰まるので、読み込みを止める
WHATWG Streams におけるバックプレッシャー
- Promise で待たせる
- Source や Sink のメソッドで Promise を返して解決を待たせる
- QueuingStrategy で待たせる
- queue size で Source の
pull()
を待たせる
- queue size で Source の
Promise で待たせる
- Source / Sink の各メソッドは Promise を返しても良い
- Promise を返した場合は、それに続く処理は Promise の解決まで待つ
- 例:
pull()
が Promise を返した場合、次のpull()
は Promise の解決後に呼び出される
- 例:
let index = 0;
new ReadableStream({
pull(controller) {
index += 1;
controller.enqueue(index);
// pull() は Promise の解決を待って (50+ ms 待って) 呼ばれる
return new Promise((ok) => setTimeout(ok, 50));
}
})
.pipeTo(new WritableStream({
write(chunk) {
console.log(chunk);
// write() は Promise の解決を待って (100+ ms 待って) 呼ばれる
return new Promise((ok) => setTimeout(ok, 100));
}
}));
QueuingStrategy で待たせる
- ReadableStream の内部キューをどうためるかという戦略
-
get highWaterMark(): number
とsize(chunk: any): number
を持つ-
highWaterMark
プロパティ (HWM) は queue size の上限 -
size()
メソッドは chunk の size を計算する
-
- QueueingStrategy で指定された上限を超えると ReadableStream が Source の
pull()
を呼ばなくなる -
Source は ReadableStream の内部キューの余裕を無視して enqueue し続けることもできる
- Back pressure の考慮は Source の実装者の責任
class CountQueuingStrategy {
constructor(options: { highWaterMark: number; });
get highWaterMark(): number; // queue size の限界。
size(_chunk: any): number; // chunk size の計算。このクラスでは常に 1 。
}
class ByteLengthQueuingStrategy {
constructor(options: { highWaterMark: number; });
get highWaterMark(): number;
size(chunk: { byteLength: number; }): number;
}
// Source が queue の size を気にしながら enqueue する例
new ReadableStream({
start(controller) {
setTimeout(() => {
let index = 0;
while (controller.desiredSize > 0) { // queue に余裕がある限り enqueue
controller.enqueue(index++);
}
controller.close();
});
}
}, new CountQueuingStrategy({ highWaterMark: 10 })) // queue は 10 個まで
.pipeTo(new WritableStream(...));