45
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Streams 仕様読んだ

Last updated at Posted at 2015-03-20

Streams 雑感

WHATWG Streams の仕様を読みました。
雑感です。

第1印象は**「常識に囚われてはいけないのですね!」**でした。

仕様は ES6 で実装してから書いてるようで、既に動く実装があります
ES6 Classes を使って書かれており、主要なクラスはReadableStream, ReadableStreamReader, WritableStreamの3つ。

Promise

まず特筆すべき事は、 Streams はイベントを一切持たないという点です。
ワオ。
従来の JavaScript はイベント駆動です。DOM も XHR も WebSockets も EventSource もイベントを通知します。しかし、この新しい Streams はaddEventListenerを持っていません。代わりに、Promiseをイベントのように使っています。
これにはちゃんと利点があって、従来のイベントと異なり、既に発行されたイベントを後から受信できます。

例えばWritableStreamreadyは、自身の書込準備が整った時にFulfilledになるPromiseを返すGetterプロパティです。

ws.ready
  .then(() => ws.write("Hello!")) // write は完了したら Fulfilled になる Promise を返す
  .then(() => ws.close())         // close は同上
  .catch(err => console.error("Ouch! ", err.message));

もしこれがイベントだったら、準備完了時に1回通知されるだけです。
しかしPromiseなので、準備完了した後ならいつ.thenを呼んでも、コールバックが実行されます。

実装はコンストラクタで渡す

次に驚いたのが、抽象クラス的なものであるにもかかわらず、継承して使うことを想定していない事です。
各自で特化 Streams を定義する場合、その実装はコンストラクタで渡します。

次の例は、Server-Sent Events のメッセージを流すReadableStream実装です (エラーハンドリングは省略)。

function createEventStream(url, options) {
  const source = new EventSource(url, options);

  return new ReadableStream({
    start(enqueue, close, error) {
      source.onmessage = e => enqueue(e.data);
    }
    close() {
      source.close();
    }
  });
}

extendsなんて使わせるものか! て感じですね。
もちろん理由があって、ES6 Classesにはアクセス制御(private, protected)が存在しないので、こういった内部処理を継承で実装させる事には向いていないのです。
しかしながら、ES6 Object Literal Enhancements のおかげでスッキリした書き心地ですね。

読み出せるのは1人だけ

面白いことに、ReadableStreamはデータを読み出すメソッドを持っていません。
データ受信を知らせるイベントもPromiseもありません。
原則的にはWritableStreamにパイプして使うことになります。

rs.pipeTo(ws);

パイプ以外の方法でデータを読み出すには、ReadableStreamReaderを使います。

const reader = rs.getReader();
reader.read().then(chunk => ...);

何故そんなことになっているかというと、読み出しは1人だけしかできないようにするためです。
みんなで1つの Stream から読み出しを行おうとすると、流れるデータが分断されて、断片しか読み取ることができません。
これを防ぐために、パイプしたり、ReadableStreamReaderを生成したりすると Stream はロックされ、他の人が読むことはできなくなります。ReadableStreamReaderは読み取り権限を保管するためのオブジェクト、というわけですね。

他方で、WritableStreamwrite()メソッドを持っていて、誰でも書き込めます。

バッファ戦略

Strategy Pattern で各ストリームのバッファ戦略をカスタマイズできるようになっていました。
現在仕様に書かれているのはByteLengthQueuingStrategyCountQueuingStrategyの2つです。
前者はバイナリ/文字列ストリーム用、後者はオブジェクトストリーム用、でしょうか?

もちろん、バッファ溢れ対策の backpressure の仕組みも忘れずに入っています。

TransformStream

Streams の中を流れるデータを変換するには、変換用の Streams を使います。
それは、TypeScript 風に書くと次のようなオブジェクトです。

{readable: ReadableStream, writable: WritableStream}

パイプすると writable プロパティに対してデータが書き込まれ、readable プロパティから読み出されるイメージですね。
現在は実験的な実装として、変換関数を与えてこのようなオブジェクトを生成する TransformStream クラスが作られています。

まとめ

  • EventTarget じゃない
  • Promise 乱舞
  • 継承しないで特化する
  • 読み込めるのは1人だけ
  • Backpressure あるよ
  • まだまだ仕様策定中!

詳しいインターフェイスやサンプルコードは仕様をご覧下さい。

それでは最後に、Chrome 42 で Ship されちゃう Fetch API と、この Streams API が組み合わさるとこんな感じかな、というコードを掲載します。

fetch("http://example.com")
  .then(response => {
    return response.body
      .pipeThrough(parseJSONTransform) // pipeThrough はチェインできる
      .pipeThrough(yourMagicTransform)
      .pipeTo(anyWritableStream); // pipeTo は、データをすべて送信したら Fulfilled になる Promise を返す。もちろんエラーがあったら Rejected になる。
  })
  // ここで catch すると、fetch 自体の失敗も Body 受信中のエラーもカバーできる
  .catch(err => console.error("Oops!"));

最後の最後に、ES7 を見据えてみます。

try {
  const response = await fetch("http://example.com");
  await response.body
    .pipeThrough(parseJSONTransform)
    .pipeThrough(yourMagicTransform)
    .pipeTo(anyWritableStream);
}
catch (err) {
  console.error("Oops!");
}
45
39
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
45
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?