xstream を開発する動機
xstream の開発者である Staltz さんは RxJS のコントリビューターでもあり、RxJS を使ったフレームワークである Cycle.js の作者です。Staltz さんは WHY WE BUILT XSTREAM の記事のなかで xstream を開発する動機を述べています。
ストリームとは何か、リアクティブプログラミングとは何かについては「あなたが求めていたリアクティブプログラミング入門」の記事が参考になります。「Everything is stream」(すべてがストリーム) がマントラです。
xstream の位置づけは RxJS を補うものであり、一般的なリアクティブプログラミングの用途では RxJS のほうがすぐれているとのことです。
xstream のオペレーターの数は RxJS と比べて少なく、初心者にわかりやすい直感的な名前にしたとのことです、RxJS と比べてファイルサイズがとても小さくなります。
RxJS の Observable (ストリーム) がホット (hot) もしくはコールド (cold) の2種類にわかれるのに対して、xstream のストリームはホットに統一され、デバッグの際にストリームがどちらのモードなのか悩まなくてすみます (以前、ホットをコールドと誤記していました。podhmo さんありがとうございます)。
RxJS のマニュアルによれば、Cold Observable は動画、Hot Observable はライブパフォーマンスに例えられます。動画を再生する場合、動画の再生は見る人から独立していて、すべての効果を見ることができるのに対して、ライブパフォーマンスの場合、複数の観客に共有され、会場に到達することが遅れれば、内容の一部を見逃してしまいます。
一般的には、xsteam が適している状況は、アプリケーションでの subscribe() の呼び出し回数がとても少なく、ロジックの大半がオペレーターのチェーンのなかにある場合とのことです。逆に RxJS のほうが適している状況は、複数の Observable の subscribe()
をたくさん呼び出す場合や、手続き型、オブジェクト指向のプログラミングや多様な設定が混在する場合とのことです。
セットアップ
xstream
npm でパッケージを導入します。
npm install --save xstream
babel-node
ターミナルで ES6 (ES2015) のコードを試す場合、babel-node
が便利です。
npm install -i babel-cli
プリセットも導入する必要があります。@
の後ろのバージョンは Node.js のバージョン番号です。
npm install --save-dev babel-preset-es2015-node@6
スクリプトを実行する際にプリセットを指定する方法は次のとおりです。
babel-node client.js --presets es2015-node
プレセットは package.json
で指定することができます。
"babel": {
"presets": ["es2015-node"]
}
練習
Hello World
Hello World の文字列からストリームをつくってみましょう。of
メソッドを使います。ストリームの個々の値を表示するには addListener
メソッドを使います。
import xs from 'xstream';
let stream = xs.of('Hello World');
stream.addListener({
next: i => console.log(i),
error: err => console.error(err),
complete: () => console.log('completed'),
});
ターミナルからスクリプトを実行してみましょう。
> babel-node client.js
Hello World
completed
複数の値からストリームをつくる
今度は複数の値からストリームをつくってみましょう。同じく of
メソッドを使います。
import xs from 'xstream';
let stream = xs.of(1,2,3);
stream.addListener({
next: i => console.log(i),
error: err => console.error(err),
complete: () => console.log('completed'),
});
ターミナルからスクリプトを実行してみましょう。順番に表示されます。
> babel-node client.js
1
2
3
completed
1秒ごとに値を表示する
import xs from 'xstream';
let stream = xs.periodic(1000).take(5);
stream.addListener({
next: i => console.log(i),
error: err => console.error(err),
complete: () => console.log('completed'),
});
実行結果です。
> babel-node client.js
0
1
2
3
4
completed
ストリームの生成を次のように書いても同じ結果になる。
let stream = xs
.periodic(1000)
.endWhen(xs.periodic(6000)
.take(1));
その他
RxJS の練習記録を別の記事で投稿しました。
xstream の基本的な4つの概念
マニュアルの説明を翻訳しました。
Stream
Stream は複数の Listener をもつイベントエミッターです。Stream でイベントが起きたとき、すべての Listerner に同時に広められます。
Stream は map
、filter
、fold
、take
などのオペレーターと呼ばれるメソッドをもっています。呼び出されたとき、オペレーターは別の Stream をつくり、返します。戻り値の Stream は実際にはソース Stream の Listener です (Stream が Listener になる可能性についてお伝えしていませんでした)。いったんソース Stream がイベントをブロードキャストすると、イベントはオペレーターのロジックを通過し、戻り値の Stream は ソース Stream をもとに独自のイベントをブロードキャストすることができます。
shamefullySend メソッドを使うことで、Stream 上のイベントを発動させることもできます。しかしそのようなことはしたくないでしょう。実際、リアクティブな方法ではないからそれを避けると、このライブラリの要点を見逃すことになります。
Listener
Listener は3つの関数: next
、 error
、と complete
が加えられたオブジェクトです。Stream がエミットできるそれぞれのイベントの種類に対して1つの関数があります。
-
next
イベントは典型的なタイプで、値を配信します。 -
error
イベントは Stream の実行を中断 (停止)させます。Stream のなかで何がおかしいときに起きます (もしくはオペレーターのチェーンのなかでの上流のどこか)。 -
complete
イベントは Stream の実行の穏やかな停止を発します。
典型的な Listener の例です。
var listener = {
next: (value) => {
console.log('The Stream gave me a value: ', value);
},
error: (err) => {
console.error('The Stream gave me an error: ', err);
},
complete: () => {
console.log('The Stream told me it is done.');
},
}
Stream に Listener をアタッチする方法は次のようになります。
stream.addListener(listener)
Listener の役目が終わったと思ったら、Stream から取り除くことができます。
stream.removeListener(listener)
Producer
Producer は Stream でブロードキャストされるイベントを生み出すマシンのようなものです。
Stream からのイベントは何らかの場所から来なければなりません。Producer を必要とする理由はそういうわけです。これは2つの関数が添付されたオブジェクトです: start(listener) と stop()。Listener で呼び出すことを始めると、Producer はイベントを生成することを始め、Listener にそれらを送信します。stop()
を呼び出すとき、Producer は自分の活動を辞めます。
Stream は Listener なので、start(stream)
に Stream を Listener として提供すれば、必然的に Producer は Stream にブロードキャストされるイベントを生成します。すばらしいでしょう?一連の Listener は Stream に付属させることが加納で、これらは Producer からもともとはやってきたイベントをすべて手に入れます。それが xs.create(producer)
が新しい Stream の中心になる Producer を受け取る理由です。確認してみましょう。
var producer = {
start: function (listener) {
this.id = setInterval(() => listener.next('yo'), 1000)
},
stop: function () {
clearInterval(this.id)
},
id: 0,
}
// 次のイベントで1秒ごとに 'yo' を配信します
var stream = xs.create(producer)
Producer は1つの Listener だけをもつのに対して、Stream は複数の Listener をもつ可能性があることを覚えておいてください。
「start
と stop
が呼び出されるときはいつなのか」ということに疑問をもつかもしれません。それは実際にかなりトリッキーなトピックなので、すぐその話に戻りましょう。MemoryStream について説明させてください。
MemoryStream
MemoryStream は Stream のようなものです。オペレーターをもち、Listener が付属し、shamefull にイベントを送信できるなどです。しかし、1つの特別なプロパティ、memory をもちます。このプロパティは Listener に送り出す直前のイベント (1つ) を記憶します。
なぜそれが便利なのか?次のイベントが送信された後に新しい Listener が追加されると、MemoryStream はその値を memory に保存し、新たに添付された Listener に送信します。
これは重要なことなので MemoryStream は一定の時間が経過した後でも、関連する値もしくは状態をあらわすことができます。これらを失いたくないのであれば、これらをキープし、もとのイベントがつくられた後で、後で到達する Listener に送るとよいでしょう。
Stream はどのように開始と停止するのか
Stream は Listener の数に応じて Producer をコントロールします。同期的な start とキャンセル可能な非同期的な stop で参照カウントを使います。これが Stream がどのように開始し停止するのかの基本です。通常、アプリケーションを構築する際に xstream のこの部分はあまり関係性がありませんが、デバッグもしくは好奇心から理解したい方のために、平易な説明をします。
xs.create(producer)
で Stream をつくるとき、Producer の start()
関数はまだ呼び出されていません。Stream は「アイドル」の状態です。Stream は Producer をもっていますが、Producer はオンになっていません。
最初の Listener が Stream に追加されると、付属するたくさんの数の Listener が 0 から 1 に突然変化します。これが Stream が start を呼び出すときです。 結局のところ、この Stream に関連する Listener が少なくとも1つ存在するからです。
将来においてより多くの Listener が追加されることがありますが、これらは Producer が動き続けるもしくは停止することには影響を与えません。最初の Listener は Stream がいつ始まるのかを指示するだけです。
Producer を停止するために重要なものは stream.removeListener
です。最後の Listener が去るとき (もしくは言い換えると、たくさんの数の Listener が 1 から 0 に突然変化するとき)、Stream は次のイベントループで起きる producer.stop()
のスケジュールを組みます。すなわち、非同期処理であるということです。しかしながら、そのスケジュールされた瞬間の前に新しい Listener が追加される場合 (数値が 0 から 1 になる)、producer.stop()
はキャンセルされ、Producer は Stream に対するイベントの生成を通常どおりに継続します。
Producer が (同期的に) 急に停止しない理由は Stream の Listener をスワップするが、進行形の実行を保つことが頻繁に必要だからです。
var listenerA = {/* ... */}
var listenerB = {/* ... */}
// 数値は 0 から 1 に変わり、Stream の Producer は開始します
stream.addListener(listenerA)
// ...
// 数値は 1 から 0 に移るが、
// listenerB が追加されたので、即座に 1 に戻る
stream.removeListener(listenerA)
stream.addListener(listenerB)
// Stream の Producer は停止しないが、すべては以前通りに継続する
無意味な計算を望まないのであれば、Stream 内部の Producer を (非同期的に) やがて停止させるために便利です。