Streams 雑感
WHATWG Streams の仕様を読みました。
雑感です。
第1印象は**「常識に囚われてはいけないのですね!」**でした。
仕様は ES6 で実装してから書いてるようで、既に動く実装があります。
ES6 Classes を使って書かれており、主要なクラスはReadableStream
, ReadableStreamReader
, WritableStream
の3つ。
Promise
まず特筆すべき事は、 Streams はイベントを一切持たないという点です。
ワオ。
従来の JavaScript はイベント駆動です。DOM も XHR も WebSockets も EventSource もイベントを通知します。しかし、この新しい Streams はaddEventListener
を持っていません。代わりに、Promise
をイベントのように使っています。
これにはちゃんと利点があって、従来のイベントと異なり、既に発行されたイベントを後から受信できます。
例えばWritableStream
のready
は、自身の書込準備が整った時にFulfilled
になるPromise
を返すGetterプロパティです。
ws.ready
.then(() => ws.write("Hello!")) // write は完了したら Fulfilled になる Promise を返す
.then(() => ws.close()) // close は同上
.catch(err => console.error("Ouch! ", err.message));
もしこれがイベントだったら、準備完了時に1回通知されるだけです。
しかしPromise
なので、準備完了した後ならいつ.then
を呼んでも、コールバックが実行されます。
実装はコンストラクタで渡す
次に驚いたのが、抽象クラス的なものであるにもかかわらず、継承して使うことを想定していない事です。
各自で特化 Streams を定義する場合、その実装はコンストラクタで渡します。
次の例は、Server-Sent Events のメッセージを流すReadableStream
実装です (エラーハンドリングは省略)。
function createEventStream(url, options) {
const source = new EventSource(url, options);
return new ReadableStream({
start(enqueue, close, error) {
source.onmessage = e => enqueue(e.data);
}
close() {
source.close();
}
});
}
extends
なんて使わせるものか! て感じですね。
もちろん理由があって、ES6 Classesにはアクセス制御(private
, protected
)が存在しないので、こういった内部処理を継承で実装させる事には向いていないのです。
しかしながら、ES6 Object Literal Enhancements のおかげでスッキリした書き心地ですね。
読み出せるのは1人だけ
面白いことに、ReadableStream
はデータを読み出すメソッドを持っていません。
データ受信を知らせるイベントもPromise
もありません。
原則的にはWritableStream
にパイプして使うことになります。
rs.pipeTo(ws);
パイプ以外の方法でデータを読み出すには、ReadableStreamReader
を使います。
const reader = rs.getReader();
reader.read().then(chunk => ...);
何故そんなことになっているかというと、読み出しは1人だけしかできないようにするためです。
みんなで1つの Stream から読み出しを行おうとすると、流れるデータが分断されて、断片しか読み取ることができません。
これを防ぐために、パイプしたり、ReadableStreamReader
を生成したりすると Stream はロックされ、他の人が読むことはできなくなります。ReadableStreamReader
は読み取り権限を保管するためのオブジェクト、というわけですね。
他方で、WritableStream
はwrite()
メソッドを持っていて、誰でも書き込めます。
バッファ戦略
Strategy Pattern で各ストリームのバッファ戦略をカスタマイズできるようになっていました。
現在仕様に書かれているのはByteLengthQueuingStrategy
とCountQueuingStrategy
の2つです。
前者はバイナリ/文字列ストリーム用、後者はオブジェクトストリーム用、でしょうか?
もちろん、バッファ溢れ対策の backpressure の仕組みも忘れずに入っています。
TransformStream
Streams の中を流れるデータを変換するには、変換用の Streams を使います。
それは、TypeScript 風に書くと次のようなオブジェクトです。
{readable: ReadableStream, writable: WritableStream}
パイプすると writable
プロパティに対してデータが書き込まれ、readable
プロパティから読み出されるイメージですね。
現在は実験的な実装として、変換関数を与えてこのようなオブジェクトを生成する TransformStream
クラスが作られています。
まとめ
-
EventTarget
じゃない -
Promise
乱舞 - 継承しないで特化する
- 読み込めるのは1人だけ
- Backpressure あるよ
- まだまだ仕様策定中!
詳しいインターフェイスやサンプルコードは仕様をご覧下さい。
それでは最後に、Chrome 42 で Ship されちゃう Fetch API と、この Streams API が組み合わさるとこんな感じかな、というコードを掲載します。
fetch("http://example.com")
.then(response => {
return response.body
.pipeThrough(parseJSONTransform) // pipeThrough はチェインできる
.pipeThrough(yourMagicTransform)
.pipeTo(anyWritableStream); // pipeTo は、データをすべて送信したら Fulfilled になる Promise を返す。もちろんエラーがあったら Rejected になる。
})
// ここで catch すると、fetch 自体の失敗も Body 受信中のエラーもカバーできる
.catch(err => console.error("Oops!"));
最後の最後に、ES7 を見据えてみます。
try {
const response = await fetch("http://example.com");
await response.body
.pipeThrough(parseJSONTransform)
.pipeThrough(yourMagicTransform)
.pipeTo(anyWritableStream);
}
catch (err) {
console.error("Oops!");
}