この記事は bouzuya's RxJS Advent Calendar 2015 の 13 日目かつ RxJS Advent Calendar 2015 の 13 日目です。
はじめに
今日は ReactiveX の Connectable Observable Operators について RxJS の API ドキュメントを読んだりサンプルコードを書いたりしていきます。
また RxJS 4.0.7 を対象にしています。
Connectable Observable
ConnectableObservable
Rx.ConnectableObservable は Cold / Hot 変換メソッドの要となる重要なクラスです。しかし、RxJS には ConnectableObservable のメソッドのドキュメントはありますが、 ConnectableObservable 自体のドキュメントはありません (なぜ!) 。
ConnectableObservable は Observable を継承したクラスです。指定された source を connect 呼び出しタイミングで subscribe します。後続には指定された subject を subscribe できるようにします。connect できる Observable なので ConnectableObservable です。
コンストラクタは ConnectableObservable(source, subject) です。まだ Subject については書いていないので、詳細は割愛します。
独自のメソッドとしては
ConnectableObservable.prototype.connectConnectableObservable.prototype.refCount
の 2 つを持ちます。各メソッドの詳細は後述します。
ConnectableObservable の生成は Observable.prototype.publish をはじめ多くのメソッドがあります。ですから Subject を明示する必要はほとんどありません。この記事内では次の 2 つのメソッドを紹介します。
Observable.prototype.publishObservable.prototype.replay
さて、見ていきましょう。
ConnectableObservable.prototype.connect / Observable.prototype.publish
- ReactiveX - Connect operator
- ReactiveX - Publish operator
ConnectableObservable.prototype.connectAPI DocumentConnectableObservable.prototype.connectSource CodeObservable.prototype.publishAPI DocumentObservable.prototype.publishSource Code
Observable.prototype.publish は ConnectableObservable のインスタンス (subject として Subject クラスを使用) を返します。ConnectableObservable.prototype.connect は wrap している source の subscribe を開始します。
いわゆる Cold / Hot 変換のメソッドです。Cold Observable / Hot Observable についてはまた別の機会に書きます。いままでの Operator とは挙動が大きく違うので注意してください。
import { Observable } from 'rx';
const connectableObservable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.publish();
connectableObservable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
connectableObservable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
connectableObservable.connect();
// observer1 subscribed
// observer2 subscribed
// do: 1
// observer1 onNext: 1
// observer2 onNext: 1
// do: 2
// observer1 onNext: 2
// observer2 onNext: 2
// do: 3
// observer1 onNext: 3
// observer2 onNext: 3
// observer1 onCompleted
// observer2 onCompleted
この例だけを見ても何がしたいのか分かりづらいと思うので説明します。
まず注目すべきは connect が呼び出されるまでデータが流れていない点です。これまで紹介してきた Operator は subscribe の瞬間にデータが流れはじめていました。しかし、この例では違っていて、subscribe したあと connect を呼ぶまで流れません。connect の行をコメントにするとまったく値は流れません。出力は次のようになります。
observer1 subscribed
observer2 subscribed
次に注目すべきは do の呼び出されている回数です。元のソース ([1, 2, 3]) のデータの個数分 (3 回) しか呼び出されていません。これまで紹介してきた Operator であれば subscribe ごとに別のソース扱いになっていたため、元のソースのデータの個数 * Observer の個数 (3 * 2 の 6 回) 呼び出されていたでしょう。
上記の 2 点に注目しつつ、publish なしの例をご覧ください。
import { Observable } from 'rx';
const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`));
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// do: 1
// observer2 onNext: 1
// do: 2
// observer2 onNext: 2
// do: 3
// observer2 onNext: 3
// observer2 onCompleted
// observer2 subscribed
publish の挙動が分かると思います。おそらく複数の Observer に publish するという意味だと思います。イメージを「分岐」で説明されることも多いです。
+----> (observer1)
|
---->(publish)
|
+----> (observer2)
実装についてはまた別の機会に説明します。
Cold / Hot に関しては他の Qiita 記事 『RxのHotとColdについて - Qiita』を参照すると良いです。
ConnectableObservable.prototype.refCount
- ReactiveX - RefCount operator
ConnectableObservable.prototype.refCountAPI DocumentConnectableObservable.prototype.refCountSource Code
subscribe されるたびインクリメント dispose されるたびデクリメントして、参照カウント (refCount) し、最初の subscribe でソースに connect し、最後の dispose でソースを dispose します。
import { Observable } from 'rx';
const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.publish() // returns ConnectableObservable
.refCount(); // call ConnectableObservable.prototype.refCount
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// observer2 onCompleted
// observer2 subscribed
また分かりづらい例なので説明します。
まず注目すべきは publish しているにも関わらず connect の呼び出しがない点です。代わりに refCount を呼び出しています。
前述のとおり publish は connect するまで後続に値を流しません。しかし refCount は最初の subscribe で自動的に connect します。そして今回のソースは同期的 (from([1, 2, 3])) です。
つまり、最初の subscribe で自動的に connect し同期的にすべての値を流しきってしまいます。ですから connect したタイミングで subscribe していない observer2 には値は流れません。onCompleted が流れておしまいです。
もうひとつ別の例を挙げます。ソースが非同期の場合の例を挙げておきましょう。
import { Observable } from 'rx';
const observable = Observable
.interval(100) // async
.take(3) // -0-1-2->
.do(value => console.log(`do: ${value}`))
.publish() // returns ConnectableObservable
.refCount(); // call ConnectableObservable.prototype.refCount
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// observer1 subscribed
// observer2 subscribed
// do: 0
// observer1 onNext: 0
// observer2 onNext: 0
// do: 1
// observer1 onNext: 1
// observer2 onNext: 1
// do: 2
// observer1 onNext: 2
// observer2 onNext: 2
// observer1 onCompleted
// observer2 onCompleted
ソースが非同期であるため、observer1 の subscribe で connect が呼び出されているのですが、その時点では値は流れずに observer2 の subscribe のあとで値が流れています。
2 つの例から refCount の注意点をまとめます。
refCount は subscribe で自動 connect するので、connect 呼び出しのタイミングを考えずに済みますが、代わりに同期的なソースの場合には値を取りこぼす可能性が高いです (非同期でももちろん値を取りこぼす可能性はあります) 。
Observable.prototype.replay
- ReactiveX - Replay operator
Observable.prototype.replayAPI DocumentObservable.prototype.replaySource Code
Observable.prototype.replay は ConnectableObservable のインスタンス (subject として ReplaySubject クラスを使用) を返します。publish との違いは、publish が Subject を使うのに対し replay が ReplaySubject を使う点です。subscribe したタイミングで指定した個数の直近に流れたデータが再度流 (replay) されます。
import { Observable } from 'rx';
const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.replay(null, 3) // null = selector, 3 = ReplaySubject buffer
.refCount();
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// observer2 onNext: 1
// do: 1
// observer2 onNext: 2
// do: 2
// observer2 onNext: 3
// do: 3
// observer2 onCompleted
// observer2 subscribed
上記の例は、refCount の最初の例とほとんど同じですが、publish ではなく replay(null, 2) を使用しています。replay(null, 3) は bufferSize が 3 の ReplaySubject です。直近 3 回分を replay します。
replay(null, 3) の null は selector です。実は publish() も selector を取ることができます。ただ、selector を指定した場合には ConnectableObservable を返しません。説明が難しくなるのでここでは割愛します。
おわりに
ReactiveX の Connectable Observable Operators を見ました。
ConnectableObservable とそのメソッド connect と refCount 。そして ConnectableObservable を生成するメソッド publish と replay です。
今回の Cold / Hot 変換メソッドは、挙動の理解に注意が必要なので、これらの Operator をまとめた share や shareReplay やもっと低レイヤーな multicast などを合わせて紹介することを控えました。ConnectableObservable の実装を確認する際に触れたいと思います。
昨日とは打って変わって難しい動きでした。