RxJava multicastについて

More than 1 year has passed since last update.

RxJavaには, stream上のアイテムを複数のSubscriberに渡せるmulticast(再利用のような感じ)の機能があります.

multicastは主に「キャッシュとして使う」, イベントをが発火したときに複数で処理をおこなう」 として使います.

RxJavaでは2つの方法でmulticastを実現します.



  • publishを使い, ConnectableObservableを作る


  • Subjectを使う

また, RxJava2だとFlowableがありますが, Flowableの場合は Processorを使います.

説明だけだと分かりにくいので, サンプルコードも合わせてどうぞ.


ConnectableObservableについて

ConnectableObservableはpublishがコールされたタイミングで生成されます.

ここがStreamの開始地点, 再利用する地点になります. なので, mapfilterなど, 共通の処理があるのなら,

publishのUpstreamに書いて共通化したほうが効率が良いです.

ConnectableObservableはconnectメソッドがコールされたタイミングで, Streamの開始をします.

従来はSubscriberを登録した(subscribeメソッドをコールした)タイミングなので, 注意が必要です.

そして, publish + connectを組みわせることで1つのStreamに対して複数のSubscriberを登録することが出来ます.

Publish Connect

(source: https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators)

他にも, sharerefcountがありますが, 基本的な考え方は変わりません.


Subjectを使う

Subjectは, ObservableとObserverの機能を同時に使うことができる実装です.

1例としては, Observerの機能を使い, 複数のObservableからデータを取得し, Observableの機能を使いそのデータをObserverに渡す, いわゆるBridgeのような事ができます.

BehaviorSubject, ReplaySubjectなどのバラエティがあるので, 必要に応じて使い分ける必要があります. (https://github.com/ReactiveX/RxJava/wiki/subject)


まとめ

multicastを上手に使いこなすことで, データの取得回数を減らし, アプリがパフォーマンスに優れた構造になることが期待できます.

そのためには, どこにmulticastを入れるか, どこからConnectableObservableにするか(publishをコールするか)を決定する必要があります.

アプリごとに適切なポイント, 書き方は異なると思うので, サンプルを参考に頂けたら幸いです.

Happy RxJava2 life!!


参考