この記事は bouzuya's RxJS Advent Calendar 2015 の 13 日目かつ RxJS Advent Calendar 2015 の 13 日目です。
はじめに
今日は ReactiveX の Connectable Observable Operators について RxJS の API ドキュメントを読んだりサンプルコードを書いたりしていきます。
また RxJS 4.0.7 を対象にしています。
Connectable Observable
ConnectableObservable
Rx.ConnectableObservable
は Cold / Hot 変換メソッドの要となる重要なクラスです。しかし、RxJS には ConnectableObservable
のメソッドのドキュメントはありますが、 ConnectableObservable
自体のドキュメントはありません (なぜ!) 。
ConnectableObservable
は Observable
を継承したクラスです。指定された source
を connect
呼び出しタイミングで subscribe
します。後続には指定された subject
を subscribe
できるようにします。connect
できる Observable
なので ConnectableObservable
です。
コンストラクタは ConnectableObservable(source, subject)
です。まだ Subject
については書いていないので、詳細は割愛します。
独自のメソッドとしては
ConnectableObservable.prototype.connect
ConnectableObservable.prototype.refCount
の 2 つを持ちます。各メソッドの詳細は後述します。
ConnectableObservable
の生成は Observable.prototype.publish
をはじめ多くのメソッドがあります。ですから Subject
を明示する必要はほとんどありません。この記事内では次の 2 つのメソッドを紹介します。
Observable.prototype.publish
Observable.prototype.replay
さて、見ていきましょう。
ConnectableObservable.prototype.connect
/ Observable.prototype.publish
- ReactiveX - Connect operator
- ReactiveX - Publish operator
ConnectableObservable.prototype.connect
API DocumentConnectableObservable.prototype.connect
Source CodeObservable.prototype.publish
API DocumentObservable.prototype.publish
Source Code
Observable.prototype.publish
は ConnectableObservable
のインスタンス (subject
として Subject
クラスを使用) を返します。ConnectableObservable.prototype.connect
は wrap している source
の subscribe
を開始します。
いわゆる Cold / Hot 変換のメソッドです。Cold Observable / Hot Observable についてはまた別の機会に書きます。いままでの Operator とは挙動が大きく違うので注意してください。
import { Observable } from 'rx';
const connectableObservable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.publish();
connectableObservable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
connectableObservable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
connectableObservable.connect();
// observer1 subscribed
// observer2 subscribed
// do: 1
// observer1 onNext: 1
// observer2 onNext: 1
// do: 2
// observer1 onNext: 2
// observer2 onNext: 2
// do: 3
// observer1 onNext: 3
// observer2 onNext: 3
// observer1 onCompleted
// observer2 onCompleted
この例だけを見ても何がしたいのか分かりづらいと思うので説明します。
まず注目すべきは connect
が呼び出されるまでデータが流れていない点です。これまで紹介してきた Operator は subscribe
の瞬間にデータが流れはじめていました。しかし、この例では違っていて、subscribe
したあと connect
を呼ぶまで流れません。connect
の行をコメントにするとまったく値は流れません。出力は次のようになります。
observer1 subscribed
observer2 subscribed
次に注目すべきは do
の呼び出されている回数です。元のソース ([1, 2, 3]
) のデータの個数分 (3 回) しか呼び出されていません。これまで紹介してきた Operator であれば subscribe
ごとに別のソース扱いになっていたため、元のソースのデータの個数 * Observer の個数 (3 * 2 の 6 回) 呼び出されていたでしょう。
上記の 2 点に注目しつつ、publish
なしの例をご覧ください。
import { Observable } from 'rx';
const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`));
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// do: 1
// observer2 onNext: 1
// do: 2
// observer2 onNext: 2
// do: 3
// observer2 onNext: 3
// observer2 onCompleted
// observer2 subscribed
publish
の挙動が分かると思います。おそらく複数の Observer
に publish
するという意味だと思います。イメージを「分岐」で説明されることも多いです。
+----> (observer1)
|
---->(publish)
|
+----> (observer2)
実装についてはまた別の機会に説明します。
Cold / Hot に関しては他の Qiita 記事 『RxのHotとColdについて - Qiita』を参照すると良いです。
ConnectableObservable.prototype.refCount
- ReactiveX - RefCount operator
ConnectableObservable.prototype.refCount
API DocumentConnectableObservable.prototype.refCount
Source Code
subscribe
されるたびインクリメント dispose
されるたびデクリメントして、参照カウント (refCount
) し、最初の subscribe
でソースに connect
し、最後の dispose
でソースを dispose
します。
import { Observable } from 'rx';
const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.publish() // returns ConnectableObservable
.refCount(); // call ConnectableObservable.prototype.refCount
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// observer2 onCompleted
// observer2 subscribed
また分かりづらい例なので説明します。
まず注目すべきは publish
しているにも関わらず connect
の呼び出しがない点です。代わりに refCount
を呼び出しています。
前述のとおり publish
は connect
するまで後続に値を流しません。しかし refCount
は最初の subscribe
で自動的に connect
します。そして今回のソースは同期的 (from([1, 2, 3])
) です。
つまり、最初の subscribe
で自動的に connect
し同期的にすべての値を流しきってしまいます。ですから connect
したタイミングで subscribe
していない observer2
には値は流れません。onCompleted
が流れておしまいです。
もうひとつ別の例を挙げます。ソースが非同期の場合の例を挙げておきましょう。
import { Observable } from 'rx';
const observable = Observable
.interval(100) // async
.take(3) // -0-1-2->
.do(value => console.log(`do: ${value}`))
.publish() // returns ConnectableObservable
.refCount(); // call ConnectableObservable.prototype.refCount
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// observer1 subscribed
// observer2 subscribed
// do: 0
// observer1 onNext: 0
// observer2 onNext: 0
// do: 1
// observer1 onNext: 1
// observer2 onNext: 1
// do: 2
// observer1 onNext: 2
// observer2 onNext: 2
// observer1 onCompleted
// observer2 onCompleted
ソースが非同期であるため、observer1
の subscribe
で connect
が呼び出されているのですが、その時点では値は流れずに observer2
の subscribe
のあとで値が流れています。
2 つの例から refCount
の注意点をまとめます。
refCount
は subscribe
で自動 connect
するので、connect
呼び出しのタイミングを考えずに済みますが、代わりに同期的なソースの場合には値を取りこぼす可能性が高いです (非同期でももちろん値を取りこぼす可能性はあります) 。
Observable.prototype.replay
- ReactiveX - Replay operator
Observable.prototype.replay
API DocumentObservable.prototype.replay
Source Code
Observable.prototype.replay
は ConnectableObservable
のインスタンス (subject
として ReplaySubject
クラスを使用) を返します。publish
との違いは、publish
が Subject
を使うのに対し replay
が ReplaySubject
を使う点です。subscribe
したタイミングで指定した個数の直近に流れたデータが再度流 (replay
) されます。
import { Observable } from 'rx';
const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.replay(null, 3) // null = selector, 3 = ReplaySubject buffer
.refCount();
observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');
observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// observer2 onNext: 1
// do: 1
// observer2 onNext: 2
// do: 2
// observer2 onNext: 3
// do: 3
// observer2 onCompleted
// observer2 subscribed
上記の例は、refCount
の最初の例とほとんど同じですが、publish
ではなく replay(null, 2)
を使用しています。replay(null, 3)
は bufferSize
が 3
の ReplaySubject
です。直近 3
回分を replay
します。
replay(null, 3)
の null
は selector
です。実は publish()
も selector
を取ることができます。ただ、selector
を指定した場合には ConnectableObservable
を返しません。説明が難しくなるのでここでは割愛します。
おわりに
ReactiveX の Connectable Observable Operators を見ました。
ConnectableObservable
とそのメソッド connect
と refCount
。そして ConnectableObservable
を生成するメソッド publish
と replay
です。
今回の Cold / Hot 変換メソッドは、挙動の理解に注意が必要なので、これらの Operator をまとめた share
や shareReplay
やもっと低レイヤーな multicast
などを合わせて紹介することを控えました。ConnectableObservable
の実装を確認する際に触れたいと思います。
昨日とは打って変わって難しい動きでした。