[RxSwift] shareReplayをちゃんと書いてお行儀良くストリームを購読しよう

More than 1 year has passed since last update.

RxSwiftのサンプルコードを見てみると、頻繁にshareReplayというオペレーターが登場することに気づくと思います。しかし、このshareReplayを書かなくともコードは動くし、ホントにこれ必要なの?てか、コイツは何をしてるの?という疑問が湧いたので詳しく調べてみました。


shareReplayがないとき

まずは、shareReplayがなぜ必要なのか考えるためにRxSwiftのExampleを多少シンプルにした下記の様な画面の実装について考えてみます。

shareReplay_1.png

↓のような感じで3つのIBOutletで接続していて、

    @IBOutlet weak var usernameOutlet: UITextField! // ユーザー名を入力するTextView

@IBOutlet weak var usernameValidOutlet: UILabel! // 赤い文字で警告を表示するUILabel
@IBOutlet weak var registerButtonOutlet: UIButton! // 登録ボタン

入力テキストが5文字以上かどうかをValidateして


  • 5文字以上入れてねという警告を表示するUILabel(usernameValidOutlet)をhiddenにするかどうか

  • 登録(registerButtonOutlet)ボタンをenableにするかどうか

のハンドリングを実施したい、という要件です。これをRxSwiftを使って実装すると

let usernameValid = usernameOutlet.rx_text

.map{ text -> Bool in
print("[execute validation]")
return text.characters.count >= minimalUsernameLength
}

usernameValid
.bindTo(usernameValidOutlet.rx_hidden)
.addDisposableTo(disposeBag)

usernameValid
.bindTo(registerButtonOutlet.rx_enabled)
.addDisposableTo(disposeBag)

みたいな感じになると思います。printログ出すために本来は不要な処理を若干書いてますが。。。さて、この実装の問題点はどこでしょうか?実は、この実装ではテキストを1文字入力するたびに、[execute validation]が2回出力されてしまうという問題があります。

これは、usernameValidusernameValidOutlet.rx_hiddenregisterButton.rx_enableの2つのプロパティに対してbindし購読しているためです。今回は2つだけですが、それでも無駄な処理が走ってしまっているのは気になります。さらに、今回はmapして中で簡単なvalidationを実施してるだけですが、もしもこれがIOを伴う処理だったり、通信を伴う処理だったらと考えるとあまり良い気持ちにはなりません。


shareReplayがあるとき

そこで、shareReplayの出番です!usernameValidのコードを修正して、最後にshareReplayを呼び出すよう変更してみましょう。

let usernameValid = usernameOutlet.rx_text

.map{ text -> Bool in
print("[execute validation]")
return text.characters.count >= minimalUsernameLength
}
.shareReplay(1) // 最後にshareReplayを呼び出す

この修正を加えた上で文字を入力すると、テキストを1文字入力すると1度だけ[execute validation]が出力されるようになっているはずです。


なぜ?

なぜ、shareReplayを追加すると、[execute validation]の出力が1回だけになったのでしょうか?それを知るためには、実は、replayrefCountという2つのオペレーターについて知る必要があります。

なぜならshareReplay(1)は、内部的にはreplay(1).refCountを呼び出してるだけのショートカットだからです(厳密には頻繁に使われるであろう<引数が1のとき>に限っては内部で最適化した別の処理を実行しているみたいです)。つまり、さきほどのコードを下記の様に書いても実行される処理は全く同じです。

let usernameValid = usernameOutlet.rx_text

.map{ text -> Bool in
print("[execute validation]")
return text.characters.count >= minimalUsernameLength
}
.replay(1)
.refCount()

ということは、replayrefCountが何者かわかれば、shareReplayが何かもわかる、というわけですね。こいつらは何者なのでしょうか?


ConnectableObservableへの変換

ここでreplayが果たしている大きな役割の一つが、ConnectableObservableの変換です。replay(1)により、usernameValidConnectableObservableと呼ばれるObservableに変換され、通常のObservableとは少し異なる挙動をするようになります。

ConnectableObservableと通常のObservableの大きな違いは、ストリームの生成方法とタイミングです。

通常のObservableの場合はsubscribeした時点でObserver毎に計算リソースが割り当てられ、subscribeしているObserverが複数存在する場合は別々に値を通知するという挙動になっています。つまり、最初の例のように2つのオブジェクトにbindしsubscribeすると、別々の2本のストリームが生成されるということです。subscribe(bindTo)した数だけmap内の処理が走ってしまう、という意図せぬ動きになっていたのは、この性質のためです。

一方ConnectableObservablesubscribeしているObserverが複数いたとしても、全てのObserver間で共有の計算リソースが割り当てられ、同時に値を通知するという挙動になります。わかりやすく言えば、1本のストリームが分岐してそれぞれのsubscribe元に情報を届ける、というイメージです。つまり最初の例で言うならば、複数箇所でsubscribe(bindTo)されていてもmap内の処理は1文字入力したときには1度だけ走るようになる、というわけです。

なお、同じ様にConnectableObservableに変換するオペレーターとしてはpublishというものもあります。replaypublishの違いは直近のイベント(後述するconnect()を実行する前に既に発行されてしまったイベント)が再送されるかどうか、です。


publish との違いは、publish が Subject を使うのに対し replay が ReplaySubject を使う点です。subscribe したタイミングで指定した個数の直近 に流れたデータが再度流 (replay) されます。

RxJS の Operators (11) - Connectable Observable Operators より


さて、これで意図しない処理が走ることもなくなったしメデタシメデタシ、、、ではありません。refCount()をコメントアウトしてみればわかりますが、replay(1)で終わった状態だとsubscribeしているにもかかわらずストリームが何も流れてきません。これは、なぜでしょうか?

let usernameValid = usernameOutlet.rx_text

.map{ text -> Bool in
print("[execute validation]")
return text.characters.count >= minimalUsernameLength
}
.replay(1)
// .refCount() ここをコメントアウトすると、subscribeしてるのに何も流れてこない


refCountとは

なぜストリームが流れないのか?その答えのヒントはConnectableObservableという名前です。実は、ConnectableObservableconnectというオペレータを呼び出すまでは、イベントが発生していてもそれをストリームに流すことはしません。つまり、connectしない限りはsubscribeしていてもストリームは流れてこないということです。

じゃあconnectすればええやん、となるのですが一つ面倒なことがあります。それはconnectした場合、そのソースを適切なタイミングでdisposeする必要があるという点です。複数のObserverにsubscribeされている場合、このdisposeを行うタイミングを適切に考えて実装するのはなかかなか手間です。

そこで役立つのが、refCountです。refCountは、subscribeされるとconnectすると同時に内部のカウンターをインクリメントし、Observerがdisposeされると内部のカウンターをデクリメントします。そして、カウンターの値が0になると同時に、connectしたソースをdisposeします。

iOSのエンジニアならARCでお馴染みのリファレンスカウンタと同じですね。おそらく名前も、referenceCountの略だと思われます。ConnectableObservableの末尾に.refCount()をつけておくことで、複数subscribeしても適切にconnectしてdisposeしてくれる、というわけです!


というわけで

複数のObserverが購読してるストリームで、最初の例の様に2本のストリームを作るのではなく、1本ストリームから同じ値を購読したいんだ、という場合には不必要な処理や意図しない処理が走らないように、shareReplay(share)をつけておきましょう!


HotなObservable とColdなObservable / Hot変換

ちなみに、上述したConnectableObservableの特徴は Hot Observableと呼ばれる種類のObservableの性質の一部です。Rxの世界ではObservableはHotとColdの2種に区別され、それぞれに異なった性質を持っています。これまでに説明した特徴以外も含めて、下記のRxSwiftのDocumentではわかりやすく対比した表を用意してくれています。

Rx Swift Hot and Cold Observables

また、replayを利用して行ったConnectableObservableへの変換は、Rxの世界ではHot変換などと呼ばれます。このあたりの理解については(自分自信まだあまり理解できたとは言えないですが)、下記の様な記事たちが大変参考になりました。

RxJS の Operators (11) - Connectable Observable Operators

RxのHotとColdについて

Rx入門 (13) - HotとCold

Rx入門 (14) - Cold to Hot変換

Connectable Observable Operators