ReactiveExtensions
RxJS
Rx
RxJSDay 13

RxJS の Operators (11) - Connectable Observable Operators

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 13 日目かつ RxJS Advent Calendar 2015 の 13 日目です。


はじめに

今日は ReactiveX の Connectable Observable Operators について RxJS の API ドキュメントを読んだりサンプルコードを書いたりしていきます。

また RxJS 4.0.7 を対象にしています。


Connectable Observable


ConnectableObservable

Rx.ConnectableObservable は Cold / Hot 変換メソッドの要となる重要なクラスです。しかし、RxJS には ConnectableObservable のメソッドのドキュメントはありますが、 ConnectableObservable 自体のドキュメントはありません (なぜ!) 。

ConnectableObservableObservable を継承したクラスです。指定された sourceconnect 呼び出しタイミングで subscribe します。後続には指定された subjectsubscribe できるようにします。connect できる Observable なので ConnectableObservable です。

コンストラクタは ConnectableObservable(source, subject) です。まだ Subject については書いていないので、詳細は割愛します。

独自のメソッドとしては


  • ConnectableObservable.prototype.connect

  • ConnectableObservable.prototype.refCount

の 2 つを持ちます。各メソッドの詳細は後述します。

ConnectableObservable の生成は Observable.prototype.publish をはじめ多くのメソッドがあります。ですから Subject を明示する必要はほとんどありません。この記事内では次の 2 つのメソッドを紹介します。


  • Observable.prototype.publish

  • Observable.prototype.replay

さて、見ていきましょう。


ConnectableObservable.prototype.connect / Observable.prototype.publish

Observable.prototype.publishConnectableObservable のインスタンス (subject として Subject クラスを使用) を返します。ConnectableObservable.prototype.connect は wrap している sourcesubscribe を開始します。

いわゆる Cold / Hot 変換のメソッドです。Cold Observable / Hot Observable についてはまた別の機会に書きます。いままでの Operator とは挙動が大きく違うので注意してください。

import { Observable } from 'rx';

const connectableObservable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.publish();

connectableObservable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');

connectableObservable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');

connectableObservable.connect();

// observer1 subscribed
// observer2 subscribed
// do: 1
// observer1 onNext: 1
// observer2 onNext: 1
// do: 2
// observer1 onNext: 2
// observer2 onNext: 2
// do: 3
// observer1 onNext: 3
// observer2 onNext: 3
// observer1 onCompleted
// observer2 onCompleted

この例だけを見ても何がしたいのか分かりづらいと思うので説明します。

まず注目すべきは connect が呼び出されるまでデータが流れていない点です。これまで紹介してきた Operator は subscribe の瞬間にデータが流れはじめていました。しかし、この例では違っていて、subscribe したあと connect を呼ぶまで流れません。connect の行をコメントにするとまったく値は流れません。出力は次のようになります。

observer1 subscribed

observer2 subscribed

次に注目すべきは do の呼び出されている回数です。元のソース ([1, 2, 3]) のデータの個数分 (3 回) しか呼び出されていません。これまで紹介してきた Operator であれば subscribe ごとに別のソース扱いになっていたため、元のソースのデータの個数 * Observer の個数 (3 * 2 の 6 回) 呼び出されていたでしょう。

上記の 2 点に注目しつつ、publish なしの例をご覧ください。

import { Observable } from 'rx';

const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`));

observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');

observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// do: 1
// observer2 onNext: 1
// do: 2
// observer2 onNext: 2
// do: 3
// observer2 onNext: 3
// observer2 onCompleted
// observer2 subscribed

publish の挙動が分かると思います。おそらく複数の Observerpublish するという意味だと思います。イメージを「分岐」で説明されることも多いです。

         +----> (observer1)

|
---->(publish)
|
+----> (observer2)

実装についてはまた別の機会に説明します。

Cold / Hot に関しては他の Qiita 記事 『RxのHotとColdについて - Qiita』を参照すると良いです。


ConnectableObservable.prototype.refCount

subscribe されるたびインクリメント dispose されるたびデクリメントして、参照カウント (refCount) し、最初の subscribe でソースに connect し、最後の dispose でソースを dispose します。

import { Observable } from 'rx';

const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.publish() // returns ConnectableObservable
.refCount(); // call ConnectableObservable.prototype.refCount

observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');

observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// observer2 onCompleted
// observer2 subscribed

また分かりづらい例なので説明します。

まず注目すべきは publish しているにも関わらず connect の呼び出しがない点です。代わりに refCount を呼び出しています。

前述のとおり publishconnect するまで後続に値を流しません。しかし refCount は最初の subscribe で自動的に connect します。そして今回のソースは同期的 (from([1, 2, 3])) です。

つまり、最初の subscribe で自動的に connect し同期的にすべての値を流しきってしまいます。ですから connect したタイミングで subscribe していない observer2 には値は流れません。onCompleted が流れておしまいです。

もうひとつ別の例を挙げます。ソースが非同期の場合の例を挙げておきましょう。

import { Observable } from 'rx';

const observable = Observable
.interval(100) // async
.take(3) // -0-1-2->
.do(value => console.log(`do: ${value}`))
.publish() // returns ConnectableObservable
.refCount(); // call ConnectableObservable.prototype.refCount

observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');

observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');

// observer1 subscribed
// observer2 subscribed
// do: 0
// observer1 onNext: 0
// observer2 onNext: 0
// do: 1
// observer1 onNext: 1
// observer2 onNext: 1
// do: 2
// observer1 onNext: 2
// observer2 onNext: 2
// observer1 onCompleted
// observer2 onCompleted

ソースが非同期であるため、observer1subscribeconnect が呼び出されているのですが、その時点では値は流れずに observer2subscribe のあとで値が流れています。

2 つの例から refCount の注意点をまとめます。

refCountsubscribe で自動 connect するので、connect 呼び出しのタイミングを考えずに済みますが、代わりに同期的なソースの場合には値を取りこぼす可能性が高いです (非同期でももちろん値を取りこぼす可能性はあります) 。


Observable.prototype.replay

Observable.prototype.replayConnectableObservable のインスタンス (subject として ReplaySubject クラスを使用) を返します。publish との違いは、publishSubject を使うのに対し replayReplaySubject を使う点です。subscribe したタイミングで指定した個数の直近に流れたデータが再度流 (replay) されます。

import { Observable } from 'rx';

const observable = Observable
.from([1, 2, 3])
.do(value => console.log(`do: ${value}`))
.replay(null, 3) // null = selector, 3 = ReplaySubject buffer
.refCount();

observable
.subscribe(
value => console.log(`observer1 onNext: ${value}`),
error => console.log(`observer1 onError: ${error}`),
() => console.log('observer1 onCompleted')
);
console.log('observer1 subscribed');

observable
.subscribe(
value => console.log(`observer2 onNext: ${value}`),
error => console.log(`observer2 onError: ${error}`),
() => console.log('observer2 onCompleted')
);
console.log('observer2 subscribed');
// do: 1
// observer1 onNext: 1
// do: 2
// observer1 onNext: 2
// do: 3
// observer1 onNext: 3
// observer1 onCompleted
// observer1 subscribed
// observer2 onNext: 1
// do: 1
// observer2 onNext: 2
// do: 2
// observer2 onNext: 3
// do: 3
// observer2 onCompleted
// observer2 subscribed

上記の例は、refCount の最初の例とほとんど同じですが、publish ではなく replay(null, 2) を使用しています。replay(null, 3)bufferSize3ReplaySubject です。直近 3 回分を replay します。

replay(null, 3)nullselector です。実は publish()selector を取ることができます。ただ、selector を指定した場合には ConnectableObservable を返しません。説明が難しくなるのでここでは割愛します。


おわりに

ReactiveX の Connectable Observable Operators を見ました。

ConnectableObservable とそのメソッド connectrefCount 。そして ConnectableObservable を生成するメソッド publishreplay です。

今回の Cold / Hot 変換メソッドは、挙動の理解に注意が必要なので、これらの Operator をまとめた shareshareReplay やもっと低レイヤーな multicast などを合わせて紹介することを控えました。ConnectableObservable の実装を確認する際に触れたいと思います。

昨日とは打って変わって難しい動きでした。