この記事は bouzuya's RxJS Advent Calendar 2015 の 17 日目かつ RxJS Advent Calendar 2015 の 17 日目です。
はじめに
今日は ConnectableObservable
を返す、いわゆる Cold / Hot 変換と呼ばれている Operator を見ていきます。
詳細は RxJS 4.0.7 を参照してください。
サンプルコードは以下のコマンドで試しています。
$ npm i rx babel-cli babel-preset-es2015
$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js
おさらい (ConnectableObservable
と Subject
と publish
と replay
)
ConnectableObservable
& publish
& replay
ConnectableObservable
については 13 日目に書きました。
RxJS の Operators (11) - Connectable Observable Operators - Qiita
登場したのは以下の要素です。
Observable.prototype.publish
Observable.prototype.replay
ConnectableObservable
ConnectableObservable.prototype.connect
ConnectableObservable.prototype.refCount
publish
や replay
などで ConnectableObservable
を生成します。
publish
は new ConnectableObservable(this, new Subject())
です。
replay
は new ConnectableObservable(this, new ReplaySubject())
です。
ConnectableObservable
はコンストラクタで Observable
(source
) と Subject
(subject
) とを取ります。source
から subject
に流します。
ConnectableObservable.prototype.connect
は event を流しはじめるタイミングを制御します。多くの Operator で subscribe
を呼び出したタイミングで流しはじめてしまうのに対して、ConnectableObservable
を返す Operator は connect
を呼び出したタイミングで流しはじめます。
ConnectableObservable.prototype.refCount
は subscribe
と dispose
のカウントにあわせて、自動で connect
および dispose
してくれるように RefCountObservable
で wrap して返すものです。
Subject
Subject
については 16 日目に書きました。
登場したのは以下の要素です。
Subject
AsyncSubject
BehaviorSubject
ReplaySubject
各 Subject は特徴として「複数の Observer
」を持ち、それらに値を流すことができます。
Subject
は渡された event をそのまま observers
へ流します。
AsyncSubject
は完了時に最後の値を observers
へ流します。
BehaviorSubject
は直近の値を保持して subscribe
した際にも直近の値を流します。
ReplaySubject
は直近の N 個の値を保持して subscribe
した際にそれらの値を流します。
上記の記事を組み合わせたうえで publish
/ replay
を整理すると、
Observable.prototype.publish
は……
new ConnectableObservable(this, new Subject())
- 1 つの
Observable
で複数のObserver
を取ります -
connect
で値を流しはじめます -
subscribe
以前に流れた値は捨てられます
Observable.prototype.replay
は……
new ConnectableObservable(this, new ReplaySubject())
- 1 つの
Observable
で複数のObserver
を取ります -
connect
で値を流しはじめます -
subscribe
以前に流れた値もReplaySubject
のbufferSize
におさまっていれば流れます
他の ConnectableObservable を返す Operators
他の ConnectableObservable
を返す Operator を紹介していきます。
Observable.prototype.multicast
まず押さえるべきは Observable.prototype.multicast
です。
multicast
は引数に subject
を取ります。multicast
の実装は new ConnectableObservable(this, subject)
です。
前述の通り、最悪これだけあれば publish
も replay
も不要です。
ここまで書いていませんでしたが publish
は this.multicast(new Subject())
ですし、replay
は this.multicast(new ReplaySubject(...))
です。
Observable.prototype.share
次は Observable.prototype.share
です。
これは厳密には ConnectableObservable
を返すわけではないのですが、refCount
で返すので性質的には似たようなものになります。実装は new ConnectableObservable(this, new Subject()).refCount()
です。実際には this.publish().refCount()
です。refCount
を使用することで connect
を不要にしています。ConnectableObservable.prototype.refCount
についても既に説明しているので難しいことはないですね。
一覧表
ここまでの Operator と紹介しなかったものを表でまとめます。
Operator | 実装イメージ (引数は厳密ではない)
-------------+----------------------------------------------------------------
multicast | new ConnectableObservable(this, subject)
publish | new ConnectableObservable(this, Subject())
publishLast | new ConnectableObservable(this, AsyncSubject())
publishValue | new ConnectableObservable(this, BehaviorSubject())
replay | new ConnectableObservable(this, ReplaySubject())
share | new ConnectableObservable(this, Subject()).refCount()
-
| `new ConnectableObservable(this, AsyncSubject()).refCount()`
shareValue | new ConnectableObservable(this, BehaviorSubject()).refCount()
shareReplay | new ConnectableObservable(this, ReplaySubject()).refCount()
おわりに
今日は ConnectableObservable
や RefCountObservable
を返す、 Cold / Hot 変換と呼ばれている Operator について確認しました。
Operator の名前から動作を想像するよりも実装を見るほうが早いです。