RxJavaをAndroid上で扱うに当たって、LifecycleとかLiveDataとかと併せて使うと便利そうだよね。という何番煎じだか分からないネタです。
Disposableをライフサイクルに合わせて自動的にdisposeしたい
AndroidでRxJava/RxKotlinを使う場合、以下のようにonDestoryなどでDisposableのdispose処理をしないといけないのが面倒ですね。
hogeObserver
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({}, {})
.addTo(compositeDisposable)
...
override fun onDestroy() {
compositeDisposable.dispose()
}
しかし、Lifecycle系の機能を使えば自動化できそうだよねーと誰もが考えるでしょう。
安直にはこんなところでしょうか?
fun Disposable.disposeOnDestroy(owner: LifecycleOwner) {
if (owner.lifecycle.currentState == State.DESTROYED) {
dispose()
return
}
DisposeOnDestroy(owner, this).observe()
}
private class DisposeOnDestroy(
private val owner: LifecycleOwner,
private val disposable: Disposable
) : LifecycleEventObserver {
fun observe() {
owner.lifecycle.addObserver(this)
}
override fun onStateChanged(source: LifecycleOwner, event: Event) {
if (event == Event.ON_DESTROY) {
disposable.dispose()
owner.lifecycle.removeObserver(this)
}
}
}
先の例だと、こんな風に使います。
hogeObserver
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({}, {})
.disposeOnDestroy(this)
disposeしたいタイミングはonDestroy
ではなく、onStop
だという場合は、disposeOnStop()
などを作ればよいでしょう。
RxJavaのObservableをLiveDataに変換したい
前述のように、onDestroyでのdisposeが省略できれば十分かもしれませんが、データを受け取り、UIに反映させる、という使い方ではRxJavaよりもLiveDataを使うのが便利ですね。
そのため、RxJavaのObservableをLiveDataに変換したいという要望はよくあるでしょう。
安直にはlifecycle-reactivestreams
を使って、こうでしょうか。
fun <T> Observable<T>.toLiveData(): LiveData<T> =
LiveDataReactiveStreams.fromPublisher(toFlowable(BackpressureStrategy.LATEST))
lifecycle-reactivestreams-ktx
ならPublisher.toLiveData()
が使えるので以下でも良いですね。
fun <T> Observable<T>.toLiveData(): LiveData<T> =
toFlowable(BackpressureStrategy.LATEST).toLiveData()
ReaciveStreamsを挟まないで変換したいという場合は以下のようにすれば同等でしょうか?
fun <T> Observable<T>.toLiveData(): LiveData<T> = RxLiveData(this)
private class RxLiveData<T>(
private val observable: Observable<T>
) : LiveData<T>() {
private var disposable: Disposable? = null
override fun onActive() {
disposable = observable.subscribe({
postValue(it)
}) {
postValue(null)
}
}
override fun onInactive() {
disposable?.dispose()
}
}
いずれもLifecycleに併せてdisposeも行ってくれます。
以上!
...と考えていた時期が、俺にもありました。
LiveDataReactiveStreamsも前述のRxLiveDataを使った場合も同じですが、LiveDataがActiveな状態でのみupstreamのsubscribeを行っています。つまり、inactive状態のイベントを拾うことができません。
inactive状態ってLiveDataでは状態を通知されないのでいらないようにも思いますが、MutableLiveDataの場合、inactive状態でもsetValue/postValueが可能で、inactive状態で値の更新があれば、次のonActiveのタイミングで最新の値が通知されるという仕組みになっています。
つまり、stop状態で何らかの更新があれば、次のonStartで最新の値を拾いたいという用途では前述の方法では不十分ということになります。
ただそうなると、disposeするタイミングをどこにするのかとかが難しくなってくるので、ひとまずobserverがいる間はsubscribe状態で、observerがいなくなったときにdisposeすればいいんじゃない?ということで以下のようにすればよさそうです。
private class RxLiveData<T>(
private val observable: Observable<T>
) : LiveData<T>() {
private var disposable: Disposable? = null
private fun ensureSubscribe() {
if (disposable != null) return
disposable = observable.subscribe({
postValue(it)
}) {
postValue(null)
}
}
override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
super.observe(owner, observer)
ensureSubscribe()
}
override fun observeForever(observer: Observer<in T>) {
super.observeForever(observer)
ensureSubscribe()
}
override fun removeObserver(observer: Observer<in T>) {
super.removeObserver(observer)
if (!hasObservers()) {
disposable?.dispose()
disposable = null
}
}
}
最初のdisposeOnDestroyを使えば良さそうにも思いますが、あちらはinactive状態でも通知されるという違いがあるので、UIへの反映とかに使うにはこっちの方が良いのではないかと思います。
以上です。