Hot と Cold
4つめの記事です。
1つめの記事
2つめの記事
3つめの記事
HotなObservableとColdなObservableという用語があります。ここまでの説明の中で差に気づいた方もいるかもしれません。
簡単に言えば、HotなObservableはもともとあるものに対するObservable、ColdなObservableはsubscribeした瞬間にはじめて処理が実行されるObservableです。(←ここ大事です)
例えば、APIリクエストはColdなObservable、textFieldの値などは、HotなObservableです。
オペレータによる変換によって、Hot/Coldが変わることもあります。
例えば
let fooObservable = textField.rx.text
.map { $0?.isEmpty ?? false }
のようにすると、textField.rx.textはHotなObservableですが、mapの中の処理はsubscribeするまで実行されないので、fooObservable自体はColdなObservableです。
複数のsubscribe
以下のコードのように、二箇所でsubscribeするとどうなるでしょうか
_ = observable.subscribe(onNext: { print($0) })
_ = observable.subscribe(onNext: { print($0) })
observableがHotであれば、イベントごとに2つ共に通知されるだけなので、問題ありません。
しかし、observableがColdであると、処理が二回行われてしまいます。例えばAPIリクエストによってデータを取得するようなObservableだと、subscribeごとに、リクエストが行われてしまいます。
HotなObservableは分岐できますが、ColdなObservableは分岐できません。それでは困るので、Cold/Hot変換オペレータがあります。
shareReplay(n)
便利なCold -> Hot変換オペレータとして、 shareReplay(n)
があります。
let sharedObservable = coldObservable.shareReplay(1)
_ = sharedObservable.subscribe(onNext: { print($0) })
_ = sharedObservable.subscribe(onNext: { print($0) })
.shareReplay(n)
は、ColdをHotに変換するオペレータです。新しい値が流れてくるまでn個前までのイベントをキャッシュします。subscribe時には、キャッシュされた値を流します。
上のようにキャッシュを1つだけにすると、すべてのobserverにイベントが通知されます。
Hot -> Cold変換オペレータ
let coldObservable = hotObservable.map { $0 * 2 }
上のように、例えもとがhotなObservableでも、mapやfilterなど変換処理を挟むと、coldになります。
小さい処理なら複数行われても問題ないと思いますが、flatMap等による大きな変換は注意が必要です。
その他 Cold -> Hot変換に関わるオペレータについて
以降は少し細かいHot / Coldに関するオペレータなのと、Subjectについて本記事で書いてないのとで、難しめかもしれません(慣れてから読み直していただければ!)
publish, connect
これらもCold -> Hot変換に関わるオペレータです。
shareReplay(n)は便利ですが、以下のようなケースがあります。
let sharedObservable = coldObservable.shareReplay(1)
_ = sharedObservable.subscribe(onNext: { print($0) })
// 何か処理
_ = sharedObservable.subscribe(onNext: { print($0) })
ひとつめのsubscribeの時点で、coldObservableは始まります。下のsubscribe処理の前に、複数のイベントが発行されてしまうと、下のsubscribeでは全てのイベントを受け取れません。
publish, connectは以下のように使います。
let connectableObservable = coldObservable.publish()
_ = connectableObservable.subscribe(onNext: { print($0) })
// 何か処理
_ = connectableObservable.subscribe(onNext: { print($0) })
connectableObservable.connect()
publish()はConnectableObservableに変換します。ConnectableObservableはconnect()するまで処理を開始しません。逆に、connect()した瞬間に処理が開始されます。
observerをConnectableObservableにたくさん登録して、connect()で一気に処理 & イベント発行ができます。
ConnectableObservableのsourceのObservableと、ConnectableObservableのobserver群をconnect() するわけですね。
refCount
connect()と同じく、ConnectableObservableから生えたオペレータです。
ConnectableObservable自身がsourceのObservableをsubscribeしているので、どこかのタイミングでdisposeせねばなりません。
.refCount()
オペレータで変換しておくと、subscribeされている数に合わせて自動的に .connect()
, .dispose()
を行ってくれます。
- はじめにsubscribeされた瞬間にconnectします(refcount=1)
- すべてのobserverがunsubscribeすると自動的にdisposeします(refCount=0)
その他ConnectableObservableに変換するオペレータ
multicast
publish()
は、
multicast(PublishSubject())
のショートハンドです
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Multicast.swift#L62-L64
ConnectableObservableの仕組みとして、sourceとなるObservableのイベントをsubscribeして、Subjectにそのまま通知。subscribeはsubjectをsubscribeする仕組みです。
好きなSubjectを使って変換する場合に用います。
replay(bufferSize)
multicast(ReplaySubject.create(bufferSize: bufferSize))
のショートハンドです
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Multicast.swift#L79-L82
replayAll()
multicast(ReplaySubject.createUnbounded())
のショートハンドです
すべての値をキャッシュするので、どのタイミングでsubscribeを開始しても、すべてのイベントが通知されます。
その他関連オペレータ
share()
.publish().refCount()
のショートハンドです
shareReplay(bufferSize) は・・・
.replay(bufferSize).refCount()
のショートハンドです
参考資料
以下のukitakaさんのslideshareがとてもわかり易く書かれています。