この記事は bouzuya's RxJS Advent Calendar 2015 の 17 日目かつ RxJS Advent Calendar 2015 の 17 日目です。
はじめに
今日は ConnectableObservable を返す、いわゆる Cold / Hot 変換と呼ばれている Operator を見ていきます。
詳細は RxJS 4.0.7 を参照してください。
サンプルコードは以下のコマンドで試しています。
$ npm i rx babel-cli babel-preset-es2015
$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js
おさらい (ConnectableObservable と Subject と publish と replay)
ConnectableObservable & publish & replay
ConnectableObservable については 13 日目に書きました。
RxJS の Operators (11) - Connectable Observable Operators - Qiita
登場したのは以下の要素です。
Observable.prototype.publishObservable.prototype.replayConnectableObservableConnectableObservable.prototype.connectConnectableObservable.prototype.refCount
publish や replay などで ConnectableObservable を生成します。
publish は new ConnectableObservable(this, new Subject()) です。
replay は new ConnectableObservable(this, new ReplaySubject()) です。
ConnectableObservable はコンストラクタで Observable (source) と Subject (subject) とを取ります。source から subject に流します。
ConnectableObservable.prototype.connect は event を流しはじめるタイミングを制御します。多くの Operator で subscribe を呼び出したタイミングで流しはじめてしまうのに対して、ConnectableObservable を返す Operator は connect を呼び出したタイミングで流しはじめます。
ConnectableObservable.prototype.refCount は subscribe と dispose のカウントにあわせて、自動で connect および dispose してくれるように RefCountObservable で wrap して返すものです。
Subject
Subject については 16 日目に書きました。
登場したのは以下の要素です。
SubjectAsyncSubjectBehaviorSubjectReplaySubject
各 Subject は特徴として「複数の Observer 」を持ち、それらに値を流すことができます。
Subject は渡された event をそのまま observers へ流します。
AsyncSubject は完了時に最後の値を observers へ流します。
BehaviorSubject は直近の値を保持して subscribe した際にも直近の値を流します。
ReplaySubject は直近の N 個の値を保持して subscribe した際にそれらの値を流します。
上記の記事を組み合わせたうえで publish / replay を整理すると、
Observable.prototype.publish は……
new ConnectableObservable(this, new Subject())- 1 つの
Observableで複数のObserverを取ります -
connectで値を流しはじめます -
subscribe以前に流れた値は捨てられます
Observable.prototype.replay は……
new ConnectableObservable(this, new ReplaySubject())- 1 つの
Observableで複数のObserverを取ります -
connectで値を流しはじめます -
subscribe以前に流れた値もReplaySubjectのbufferSizeにおさまっていれば流れます
他の ConnectableObservable を返す Operators
他の ConnectableObservable を返す Operator を紹介していきます。
Observable.prototype.multicast
まず押さえるべきは Observable.prototype.multicast です。
multicast は引数に subject を取ります。multicast の実装は new ConnectableObservable(this, subject) です。
前述の通り、最悪これだけあれば publish も replay も不要です。
ここまで書いていませんでしたが publish は this.multicast(new Subject()) ですし、replay は this.multicast(new ReplaySubject(...)) です。
Observable.prototype.share
次は Observable.prototype.share です。
これは厳密には ConnectableObservable を返すわけではないのですが、refCount で返すので性質的には似たようなものになります。実装は new ConnectableObservable(this, new Subject()).refCount() です。実際には this.publish().refCount() です。refCount を使用することで connect を不要にしています。ConnectableObservable.prototype.refCount についても既に説明しているので難しいことはないですね。
一覧表
ここまでの Operator と紹介しなかったものを表でまとめます。
Operator | 実装イメージ (引数は厳密ではない)
-------------+----------------------------------------------------------------
multicast | new ConnectableObservable(this, subject)
publish | new ConnectableObservable(this, Subject())
publishLast | new ConnectableObservable(this, AsyncSubject())
publishValue | new ConnectableObservable(this, BehaviorSubject())
replay | new ConnectableObservable(this, ReplaySubject())
share | new ConnectableObservable(this, Subject()).refCount()
-
| `new ConnectableObservable(this, AsyncSubject()).refCount()`
shareValue | new ConnectableObservable(this, BehaviorSubject()).refCount()
shareReplay | new ConnectableObservable(this, ReplaySubject()).refCount()
おわりに
今日は ConnectableObservable や RefCountObservable を返す、 Cold / Hot 変換と呼ばれている Operator について確認しました。
Operator の名前から動作を想像するよりも実装を見るほうが早いです。