ReactiveExtensions
RxJS
Rx
RxJSDay 17

RxJS における ConnectableObservable を返す (Cold / Hot 変換) Operators

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 17 日目かつ RxJS Advent Calendar 2015 の 17 日目です。


はじめに

今日は ConnectableObservable を返す、いわゆる Cold / Hot 変換と呼ばれている Operator を見ていきます。

詳細は RxJS 4.0.7 を参照してください。

サンプルコードは以下のコマンドで試しています。

$ npm i rx babel-cli babel-preset-es2015

$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js


おさらい (ConnectableObservableSubjectpublishreplay)


ConnectableObservable & publish & replay

ConnectableObservable については 13 日目に書きました。

RxJS の Operators (11) - Connectable Observable Operators - Qiita

登場したのは以下の要素です。


  • Observable.prototype.publish

  • Observable.prototype.replay

  • ConnectableObservable

  • ConnectableObservable.prototype.connect

  • ConnectableObservable.prototype.refCount

publishreplay などで ConnectableObservable を生成します。

publishnew ConnectableObservable(this, new Subject()) です。

replaynew ConnectableObservable(this, new ReplaySubject()) です。

ConnectableObservable はコンストラクタで Observable (source) と Subject (subject) とを取ります。source から subject に流します。

ConnectableObservable.prototype.connect は event を流しはじめるタイミングを制御します。多くの Operator で subscribe を呼び出したタイミングで流しはじめてしまうのに対して、ConnectableObservable を返す Operator は connect を呼び出したタイミングで流しはじめます。

ConnectableObservable.prototype.refCountsubscribedispose のカウントにあわせて、自動で connect および dispose してくれるように RefCountObservable で wrap して返すものです。


Subject

Subject については 16 日目に書きました。

RxJS の Subject とその種類 - Qiita

登場したのは以下の要素です。


  • Subject

  • AsyncSubject

  • BehaviorSubject

  • ReplaySubject

各 Subject は特徴として「複数の Observer 」を持ち、それらに値を流すことができます。

Subject は渡された event をそのまま observers へ流します。

AsyncSubject は完了時に最後の値を observers へ流します。

BehaviorSubject は直近の値を保持して subscribe した際にも直近の値を流します。

ReplaySubject は直近の N 個の値を保持して subscribe した際にそれらの値を流します。

上記の記事を組み合わせたうえで publish / replay を整理すると、

Observable.prototype.publish は……


  • new ConnectableObservable(this, new Subject())

  • 1 つの Observable で複数の Observer を取ります


  • connect で値を流しはじめます


  • subscribe 以前に流れた値は捨てられます

Observable.prototype.replay は……


  • new ConnectableObservable(this, new ReplaySubject())

  • 1 つの Observable で複数の Observer を取ります


  • connect で値を流しはじめます


  • subscribe 以前に流れた値も ReplaySubjectbufferSize におさまっていれば流れます


他の ConnectableObservable を返す Operators

他の ConnectableObservable を返す Operator を紹介していきます。


Observable.prototype.multicast

まず押さえるべきは Observable.prototype.multicast です。

multicast は引数に subject を取ります。multicast の実装は new ConnectableObservable(this, subject) です。

前述の通り、最悪これだけあれば publishreplay も不要です。

ここまで書いていませんでしたが publishthis.multicast(new Subject()) ですし、replaythis.multicast(new ReplaySubject(...)) です。


Observable.prototype.share

次は Observable.prototype.share です。

Observable.prototype.share

これは厳密には ConnectableObservable を返すわけではないのですが、refCount で返すので性質的には似たようなものになります。実装は new ConnectableObservable(this, new Subject()).refCount() です。実際には this.publish().refCount() です。refCount を使用することで connect を不要にしています。ConnectableObservable.prototype.refCount についても既に説明しているので難しいことはないですね。


一覧表

ここまでの Operator と紹介しなかったものを表でまとめます。

Operator
実装イメージ (引数は厳密ではない)

multicast
new ConnectableObservable(this, subject)

publish
new ConnectableObservable(this, Subject())

publishLast
new ConnectableObservable(this, AsyncSubject())

publishValue
new ConnectableObservable(this, BehaviorSubject())

replay
new ConnectableObservable(this, ReplaySubject())

share
new ConnectableObservable(this, Subject()).refCount()

-
new ConnectableObservable(this, AsyncSubject()).refCount()

shareValue
new ConnectableObservable(this, BehaviorSubject()).refCount()

shareReplay
new ConnectableObservable(this, ReplaySubject()).refCount()


おわりに

今日は ConnectableObservableRefCountObservable を返す、 Cold / Hot 変換と呼ばれている Operator について確認しました。

Operator の名前から動作を想像するよりも実装を見るほうが早いです。