LoginSignup
2
5

More than 5 years have passed since last update.

WHATWG Streams の Back pressure

Posted at

概要

WHATWG Streams のことを何回かに分けて書く。Kyoto.js #11 の『 WHATWG Streams をためした 』という発表のために調べたものだ。発表だけで終わらせるのはもったいないので、再整理して共有する。

前回は『 WHATWG Streams の Lock - Qiita 』。今回は Back pressure のことを書く。

Back pressure (バックプレッシャー) とは?

  • 背圧
  • 流量制御
  • 例: 書き込み速度よりはやく読み込まれ続けると詰まるので、読み込みを止める

WHATWG Streams におけるバックプレッシャー

  • Promise で待たせる
    • Source や Sink のメソッドで Promise を返して解決を待たせる
  • QueuingStrategy で待たせる
    • queue size で Source の pull() を待たせる

Promise で待たせる

  • Source / Sink の各メソッドは Promise を返しても良い
  • Promise を返した場合は、それに続く処理は Promise の解決まで待つ
    • 例: pull() が Promise を返した場合、次の pull() は Promise の解決後に呼び出される
let index = 0;
new ReadableStream({
  pull(controller) {
    index += 1;
    controller.enqueue(index);
    // pull() は Promise の解決を待って (50+ ms 待って) 呼ばれる
    return new Promise((ok) => setTimeout(ok, 50));
  }
})
  .pipeTo(new WritableStream({
    write(chunk) {
      console.log(chunk);
      // write() は Promise の解決を待って (100+ ms 待って) 呼ばれる
      return new Promise((ok) => setTimeout(ok, 100));
    }
  }));

QueuingStrategy で待たせる

  • ReadableStream の内部キューをどうためるかという戦略
  • get highWaterMark(): numbersize(chunk: any): number を持つ
    • highWaterMark プロパティ (HWM) は queue size の上限
    • size() メソッドは chunk の size を計算する
  • QueueingStrategy で指定された上限を超えると ReadableStream が Source の pull() を呼ばなくなる
  • Source は ReadableStream の内部キューの余裕を無視して enqueue し続けることもできる
    • Back pressure の考慮は Source の実装者の責任
class CountQueuingStrategy {
  constructor(options: { highWaterMark: number; });
  get highWaterMark(): number; // queue size の限界。
  size(_chunk: any): number; // chunk size の計算。このクラスでは常に 1 。
}

class ByteLengthQueuingStrategy {
  constructor(options: { highWaterMark: number; });
  get highWaterMark(): number;
  size(chunk: { byteLength: number; }): number;
}
// Source が queue の size を気にしながら enqueue する例
new ReadableStream({
  start(controller) {
    setTimeout(() => {
      let index = 0;
      while (controller.desiredSize > 0) { // queue に余裕がある限り enqueue
        controller.enqueue(index++);
      }
      controller.close();
    });
  }
}, new CountQueuingStrategy({ highWaterMark: 10 })) // queue は 10 個まで
  .pipeTo(new WritableStream(...));

参考

2
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
5