LiveDataはViewにデータを反映させるためのStreamとして非常に便利でシンプルな仕組みです。
ViewModelにLiveDataを配置し
val hoge: MutableLiveData<Hoge>
ActivityなどでobserveすることでLifecycleを考慮しつつリアクティブにViewへデータを反映させることができます。
hoge.observe(this) {
hogeView.applyHoge(it)
}
これはこれで十分便利ではあるのですが、シンプルであるが故に少し融通が利かないところがあったり、公式にはあまりユーティリティが充実していないので、少し変わったことをしようとすると、自作したり、サードパーティライブラリを導入する必要があります。
一方、Coroutines Flowは拡張関数などユーティリティが充実していて、代わりにこちらを使った方が便利そうなので置き換えたいなーと思っていましたが、同等の使い方をするにはどうすれば良いか、というところで結構悩んでしまったので記事にまとめておきます。
結論
先に結論書いておきます。
完全に同じ動作をさせることはできなさそうなので、状態通知とイベント通知の2種類に分けて考える必要があります。(LiveDataで正しくイベント通知するのは一筋縄ではいかず、いろいろ工夫が必要ですが)
状態通知で使っている場合はMutableSharedFlow
、初期値がある場合はMutableStateFlow
、イベント通知で使っている場合はChannel
を使います。
// 状態通知(初期値なし)
val flow = MutableSharedFlow<Hoge>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
// 状態通知(初期値あり)
val flow = MutableStateFlow(Hoge.EMPTY)
// イベント通知
val channel = Channel<Hoge>(Channel.CONFLATED)
// 以下と等価
// val channel = Channel<Hoge>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
通知を受け取る側では以下のような拡張関数を用意すれば、LiveDataと同様のインターフェースで扱えますね。
fun <T> Flow<T>.observe(
owner: LifecycleOwner,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
action: suspend (T) -> Unit,
) = flowWithLifecycle(owner.lifecycle, minActiveState)
.distinctUntilChanged()
.onEach(action)
.launchIn(owner.lifecycleScope)
fun <T> Channel<T>.observe(
owner: LifecycleOwner,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
action: suspend (T) -> Unit,
) = receiveAsFlow()
.flowWithLifecycle(owner.lifecycle, minActiveState)
.onEach(action)
.launchIn(owner.lifecycleScope)
LiveDataの特徴
LiveDataってどんな挙動だったっけと整理してみると
- 初期値は指定しなければ空だが、最後の値を保持している
- observeした時点で値があればその値が通知される
- Started状態でのみ値を受け取る
- setValueされた値は、変化がなくても受け取る
- postValueされた値はすべてが通知されるわけではなく、observerが受け取るまでの間に更新された、最後の値が通知される
- Stop~Startの間にsetが行われた場合にのみ、再Startで値を受け取る
こんなところでしょうか?
状態の通知にはMutableSharedFlow or MutableStateFlow
「初期値は指定しなければ空だが、最後の値を保持している」「observeした時点で値があればその値が通知される」
という性質は、MutableSharedFlow
にreplay=1
を設定した場合のものと同じですね。(初期値を指定する場合はMutableStateFlow
)
また、「postValueされた値はすべてが通知されるわけではなく、observerが受け取るまでの間に更新された、最後の値が通知される」性質は、同様にMutableSharedFlowにonBufferOverflow = BufferOverflow.DROP_OLDEST
を設定した場合のものですね
val flow = MutableSharedFlow<Hoge>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
「Started状態でのみ値を受け取る」という性質は受け取る際、repeatOnLifecycle
を使って以下
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
flow.collect {
...
}
}
}
ネストが深くなってしまうので、チェーンメソッドで使えるようにするための拡張関数、flowWithLifecycle
とlaunchIn
を使って以下のように書くのがよいですね。
flow
.flowWithLifecycle(lifecycle)
.onEach {
...
}
.launchIn(lifecycleScope)
概ねこれで所望の動作となりますが、「Stop~Startの間にsetが行われた場合にのみ、再Startで値を受け取る」については、値が更新されたかどうかにかかわらず、START時に現在値を受け取ってしまうという問題があります。
更新されていなくても同じ値が通知されてしまいます。
それを回避するため、distinctUntilChanged
を追加して以下のように受けとるのが良さそうです。
flow
.flowWithLifecycle(lifecycle)
.distinctUntilChanged()
.onEach {
...
}
.launchIn(lifecycleScope)
この場合、distinctUntilChangedを使っているため「setValueされた値は、変化がなくても受け取る」という性質は満たせなくなります。ただ、状態を受け取ると考えれば、同じ値を受け取る必要は無いので、むしろこれで良いです。
イベント通知にはChannel
一方で、イベント通知として使いたいので、値に変化がなくても受け取ってほしい、という場合もあります。
その場合は、Channelを使いましょう。Channelは言ってしまえばCoroutines版BlockingQueueなので、受信者が一つだけに限定されてしまいますが、まさにイベント通知に適したStreamです。
受信者が一つに限定されるという性質も、イベント通知なら当然のこと(LiveDataでも受信者を一つに限定して諸々工夫しなければイベント通知できないですし)なので割り切ります。
デフォルトのChannelは受信側が受信状態でなければ、送信できない動作になっているため、capacity = 1
とonBufferOverflow = BufferOverflow.DROP_OLDEST
を指定して、LiveData風に使えるようにします。
val channel = Channel<Hoge>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
ちなみに、この設定のショートカットが用意されていて、capacity
にChannel.CONFLATED
を指定しても同じです。
val channel = Channel<Hoge>(Channel.CONFLATED)
受信処理はFlow同様にrepeatOnLifecycle
を使って以下のように書けます
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
for (i in viewModel.channel) {
...
}
}
}
チェーメソッドで受けたいところですが、channelには同様の拡張関数は用意されていません、代わりにreceiveAsFlow
というFlowとして受け取るための拡張関数が用意されていますので、これを利用し
channel
.receiveAsFlow()
.flowWithLifecycle(lifecycle)
.onEach {
...
}
.launchIn(lifecycleScope)
こうすることで、onStart ~ onStopの間にのみイベントを受け取り、onStop~onStartの間に発生したイベントは、onStart時に最後のイベントを受け取る。イベントが発生していなければ受け取らない。というLiveDataを使ったイベント通知と同様の動作を差せることができます。
observe拡張関数
flowを使う場合、Activeな状態をSTARTEDだけでなく、RESUMEDなど他の状態を指定するなど、より柔軟に使うことができます。それを指定可能にしつつも、デフォルト動作であれば、LiveDataと同じメソッドが使えるように以下のように拡張関数を定義しておくと同じ感覚で使えるかと思います。
fun <T> Flow<T>.observe(
owner: LifecycleOwner,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
action: suspend (T) -> Unit,
) = flowWithLifecycle(owner.lifecycle, minActiveState)
.distinctUntilChanged()
.onEach(action)
.launchIn(owner.lifecycleScope)
fun <T> Channel<T>.observe(
owner: LifecycleOwner,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
action: suspend (T) -> Unit,
) = receiveAsFlow()
.flowWithLifecycle(owner.lifecycle, minActiveState)
.onEach(action)
.launchIn(owner.lifecycleScope)
まとめ
というわけで、LiveDataをFlow/Channelに置き換える方法でした。
穴がありましたらご指摘いただけるとありがたいです。