LoginSignup
62
46

More than 3 years have passed since last update.

SharedFlow、StateFlow、Channelなどを比較して、LiveDataやEvent通知の置き換えを考える

Last updated at Posted at 2020-11-15

(ちょっと早いですが、時間が取れなかった場合、これをアドベントカレンダーのネタにするかもです)

このパターンでこれだとうまく動かない、こういう方法があるなど、何か指摘などあればぜひ教えて下さい。

どうやら、これからはStateFlowやSharedFlowの時代のようです。

新しいコードで意図的にLiveDataを選択する理由はありません。ということが書かれていました。

Twitterなどで話題になっていました。
https://twitter.com/codinginflow/status/1326628159077412866
(Kotlin LangのSlackより)

image.png
image.png

最初にまとめ

以下にLifecycleCoroutineScope.launchWhenStarted()を組み合わせて使います。

LiveData.postValueの挙動に近い

MutableStateFlow<String>("initial value")
最新のデータをViewに反映したいだけならこれです。

LiveData.setValueでの挙動に近い

MutableSharedFlow(replay = 1)
ちゃんと一つ一つのデータを流す必要があり、Viewに反映したいなら MutableSharedFlow(replay = 1)

Event通知

Flowで素直にやる方法が今の所ないみたいです。
以下の2つのやり方が一応あります。

  • Eventクラスでラップして、このEventが消費されたかを保持する方法
  • 通知後にEmptyのオブジェクトを入れてを入れてなんとかする方法
  • Channel(capacity = UNLIMITED).receiveAsFlow()

他にもこんな方法でいけるとか、間違っているみたいなのがあれば教えていただきたいです。

以下考察

どんな動きが理想か?

LiveData

  • observe時に今STARTED以降の状態であれば値を流し、そうでなければ次にSTARTED以降の状態になったときに値を流します。これは確実にViewに値を反映させつつ、FragmentなどonStart以降でしか動かすことができない(Exceptionが投げられる)AndroidのAPIがあってもうまく動かすためです。
liveData.observe(viewLifecycleOwer) { value ->
...
}
  • LiveData#postValue(value)では複数回投げられると最新の1つの値で動きます。もし完全な表示のためのデータを持っているのであれば最新のデータを反映するだけでいいので、この仕様でもうまく動くはずです。

postValueのJavaDoc

If you called this method multiple times before a main thread executed a posted task, only the last value would be dispatched.

つまり以下ではobserverに"b"だけが通知されます。

liveData.postValue("a")
liveData.postValue("b")
  • LiveData#setValue(value)ではpostValue()のような制約なしに以下で"a"、"b"両方をobserverで実行することができます。しかし、"a"を流す必要ない場合もあるので、良し悪しがあります。
liveData.setValue("a")
liveData.setValue("b")
  • 値が一緒でも流すという性質があります。これに関してはイベント通知として使っていると必要になるパターンがあるかもしれません。

Event

例えばSnackbarを表示したい、ダイアログを表示したいといったものです。

これに対応するAndroidのコンポーネントはないので、いろんなパターンで実装されてきたため、正確にこうというのはないですが、以下の仕様が良いででしょう。

  • observe時には通知されない。例えば画面回転時にダイアログやSnackbarがまた出てしまうとつらいです。
  • onStart以降で動かす。
  • observeしていないときのイベントもobserve時に流れる。画面回転時にイベントが失われないようにするため。
  • 複数回postしても、LiveData.postValueのように消えずに、ちゃんと全部が流れる。
  • 同じ値を2回入れたときでも2回流れる。

FlowをSTARTED以降の状態で動かすには

LifecycleCoroutineScope.launchWhenStarted()を使うとCoroutinesの処理をpause、resumeしてくれます。

そのため、onStart以降でしか動かないようにする方法としてKotlin LangのSlackでIan Lakeさんが以下の方法を紹介していました。 これを使えば、onStart以降だけ動かすようにできそうです。

/**
 * Similar to [kotlinx.coroutines.flow.launchIn] but using
 * [androidx.lifecycle.LifecycleCoroutineScope.launchWhenStarted].
 */
fun <T> Flow<T>.launchWhenStartedIn(
        lifecycleOwner: LifecycleOwner
) = lifecycleOwner.lifecycleScope.launchWhenStarted {
    collect()
}
viewModel.yourFlow.onEach { items ->
  // Do something with the items
}.launchWhenStartedIn(viewLifecycleOwner)

検証コード

基本的に複数の種類のFlowに対してそれぞれforEach{}で同じプログラムで、チェックしているだけです。

ここから動作見られます。
https://pl.kotl.in/9lm5AT-8N

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.*

class App {
    /**
     * public fun <T> MutableSharedFlow(
     * replay: Int = 0,
     * extraBufferCapacity: Int = 0,
     * onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    )
     */

    val sharedFlow1 = MutableSharedFlow<String>()

    val sharedFlow2 = MutableSharedFlow<String>(replay = 1)

    val sharedFlow3 = MutableSharedFlow<String>(extraBufferCapacity = 1)
    val sharedFlow4 = MutableSharedFlow<String>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    val sharedFlow5 = MutableSharedFlow<String>(
        replay = 1,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val sharedFlow6 = MutableSharedFlow<String>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

    // The resulting flow can be collected just once and throws
    // * [IllegalStateException] when trying to collect it more than once.
//    val channelConsumeAsFlow = Channel<String>(capacity = UNLIMITED)
//    val flowChannelConsumeAsFlow = channelConsumeAsFlow.consumeAsFlow()

    val channelReceiveAsFlow = Channel<String>(capacity = UNLIMITED)
    val flowChannelReceiveAsFlow = channelReceiveAsFlow.receiveAsFlow()

    data class Event<T>(var isConsumed: Boolean, val value: T)

    val mutableSharedFlowForWrapEvent = MutableSharedFlow<Event<String>>(replay = 1)
    val flowWrapWithEvent = mutableSharedFlowForWrapEvent

    val sharedFlowSetEmptyWhenCollect = MutableSharedFlow<String>(replay = 1)

    val stateFlow = MutableStateFlow("initial value")
}

open class FlowCheck<T>(
    open val name: String,
    open val collect: suspend ((T) -> Unit) -> Unit,
    open val emit: suspend (String) -> Unit
)

class DefaultFlowCheck<T>(
    name: String,
    flow: Flow<T>
) : FlowCheck<T>(
    name = name,
    collect = { collectFun ->
        flow.collect { value -> collectFun(value) }
    },
    emit = { value ->
        flow as MutableSharedFlow<String>
        flow.emit(value)
    }
)

suspend fun main(args: Array<String>) {
    val app = App()
    val flows = listOf<FlowCheck<*>>(
        DefaultFlowCheck(
            "MutableSharedFlow(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND) default MutableSharedFlow",
            app.sharedFlow1
        ),
        DefaultFlowCheck(
            "MutableSharedFlow(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)",
            app.sharedFlow2
        ),
        DefaultFlowCheck(
            "MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)",
            app.sharedFlow3
        ),
        DefaultFlowCheck(
            "MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)",
            app.sharedFlow6
        ),
        DefaultFlowCheck(
            "MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)",
            app.sharedFlow4
        ),
        DefaultFlowCheck(
            "MutableSharedFlow(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)",
            app.sharedFlow5
        ),
        FlowCheck(
            "Event",
            collect = { function: (App.Event<String>) -> Unit ->
                app.flowWrapWithEvent.collect { event ->
                    if (!event.isConsumed) {
                        event.isConsumed = true
                        function(event)
                    }
                }
            },
            emit = {
                app.mutableSharedFlowForWrapEvent.emit(App.Event(false, it))
            }
        ),
//        DefaultFlowCheck(
//            "channel.consumeAsFlow()",
//            app.flowChannelConsumeAsFlow,
//            { s: String -> app.channelConsumeAsFlow.send(s) }
//        ),
        FlowCheck<String>(
            "channel.receiveAsFlow()",
            collect = { collectFun ->
                app.flowChannelReceiveAsFlow.collect { collectFun(it) }
            },
            emit = { s: String -> app.channelReceiveAsFlow.send(s) }
        ),

        FlowCheck<String>(
            "SharedFlowSetEmptyWhenCollect",
            collect = { collectFun ->
                app.sharedFlowSetEmptyWhenCollect.collect {
                    if (it.isNotEmpty()) {
                        collectFun(it)
                        app.sharedFlowSetEmptyWhenCollect.tryEmit("")
                    }
                }
            },
            emit = { s: String -> app.sharedFlowSetEmptyWhenCollect.emit(s) }
        ),


        DefaultFlowCheck(
            "MutableStateFlow<String>(\"initial value\")",
            app.stateFlow
        ),
    )
    flows.forEach { flowCheck ->
        println("----")
        println("start ${flowCheck.name}")
        println("#1 collect and emit")
        val job = GlobalScope.launch {
            flowCheck.collect {
                println("- #1 received: $it")
            }
        }
        delay(30)
        flowCheck.emit("hello 1")
        delay(30)
        job.cancel()
        delay(30)
        println("#2 emit and collect")
        flowCheck.emit("hello 2")
        delay(30)
        val job2 = GlobalScope.launch {
            flowCheck.collect {
                println("- #2 received: $it")
            }
        }
        delay(30)
        job2.cancel()
        delay(30)
        println("#3 collect and emit × 3")
        delay(30)
        val job3 = GlobalScope.launch {
            flowCheck.collect {
                println("- #3 received: $it")
            }
        }
        delay(30)
        flowCheck.emit("hello 3-1")
        flowCheck.emit("hello 3-2")
        flowCheck.emit("hello 3-3")
        delay(30)
        job3.cancel()
        println("#4 just collect")
        delay(30)
        val job4 = GlobalScope.launch {
            flowCheck.collect {
                println("- #4 received: $it")
            }
        }
        delay(30)
        job4.cancel()
        delay(30)
        println("#5 collect and same emit × 2")
        val job5 = GlobalScope.launch {
            flowCheck.collect {
                println("- #5 received: $it")
            }
        }
        delay(30)
        flowCheck.emit("hello 5")
        delay(30)
        flowCheck.emit("hello 5")
        delay(30)
        job5.cancel()
        delay(30)

        println("#6 same emit × 2 and collect")
        flowCheck.emit("hello 6")
        delay(30)
        flowCheck.emit("hello 6")
        delay(30)
        val job6 = GlobalScope.launch {
            flowCheck.collect {
                println("- #6 received: $it")
            }
        }
        delay(30)
        job6.cancel()
        delay(30)
    }
    delay(30)
}
----
start MutableSharedFlow(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND) default MutableSharedFlow
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
#3 collect and emit × 3
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
----
start MutableSharedFlow(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 2
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
- #4 received: hello 3-3
#5 collect and same emit × 2
- #5 received: hello 3-3
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6
----
start MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
#3 collect and emit × 3
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
----
start MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
#3 collect and emit × 3
- #3 received: hello 3-3
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
----
start MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
#3 collect and emit × 3
- #3 received: hello 3-1
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
----
start MutableSharedFlow(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 2
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
- #4 received: hello 3-3
#5 collect and same emit × 2
- #5 received: hello 3-3
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6
----
start Event
#1 collect and emit
- #1 received: Event(isConsumed=true, value=hello 1)
#2 emit and collect
- #2 received: Event(isConsumed=true, value=hello 2)
#3 collect and emit × 3
- #3 received: Event(isConsumed=true, value=hello 3-1)
- #3 received: Event(isConsumed=true, value=hello 3-2)
- #3 received: Event(isConsumed=true, value=hello 3-3)
#4 just collect
#5 collect and same emit × 2
- #5 received: Event(isConsumed=true, value=hello 5)
- #5 received: Event(isConsumed=true, value=hello 5)
#6 same emit × 2 and collect
- #6 received: Event(isConsumed=true, value=hello 6)
----
start channel.receiveAsFlow()
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6
- #6 received: hello 6
----
start SharedFlowSetEmptyWhenCollect
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6
----
start MutableStateFlow<String>("initial value")
#1 collect and emit
- #1 received: initial value
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 2
- #3 received: hello 3-3
#4 just collect
- #4 received: hello 3-3
#5 collect and same emit × 2
- #5 received: hello 3-3
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6

結果

LiveData.postValueに近いもの(observe時に値が流れ、かつ、最新以外が失われるもの)

MutableSharedFlow(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)MutableStateFlow<String>("initial value")ですが、この結果であれば、MutableStateFlowで良さそうに思えます。

MutableSharedFlow(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)ではreplay=1によって、observe時(collect時)に保持している値が流れ、BufferOverflow.DROP_OLDESTによって最新の値が使われることによって、LiveData.postValue()に近い動きが実現されています。

MutableStateFlow<String>("initial value")はデフォルトでそのような動きになっているようで、理想的な動きになっています。

start MutableSharedFlow(replay = 1, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 2
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
- #4 received: hello 3-3
#5 collect and same emit × 2
- #5 received: hello 3-3
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6
---
start MutableStateFlow<String>("initial value")
#1 collect and emit
- #1 received: initial value
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 2
- #3 received: hello 3-3
#4 just collect
- #4 received: hello 3-3
#5 collect and same emit × 2
- #5 received: hello 3-3
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6

LiveData.setValueに近いもの(observe時に値が流れ、かつ、情報が失われないもの)

MutableSharedFlow(replay = 1)がそのような挙動になります。

start MutableSharedFlow(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 2
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
- #4 received: hello 3-3
#5 collect and same emit × 2
- #5 received: hello 3-3
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6

イベント通知(observe時に値が流れず、observeしていないときのイベントも流れる)

これは以下が考えられます。Channel APIがちょっと複雑というか理解するべきことが増えるので、厄介だというのがあるので、Eventがいいかもしれません。

  • Eventクラスでラップして、このEventが消費されたかを保持する方法
  • 通知後にEmptyのオブジェクトを入れてなんとかする方法
  • Channel(capacity = UNLIMITED).receiveAsFlow()

しかし、一つ問題があって、Channelを使うやり方でないとcollectされていない状態で複数イベントが通知された場合に1つしか動きません。
Channel(capacity = UNLIMITED)を使うとLinkedListChannelが使われ、linked-listで管理されているBufferがうまく動いてくれるようです。

start Event
#1 collect and emit
- #1 received: Event(isConsumed=true, value=hello 1)
#2 emit and collect
- #2 received: Event(isConsumed=true, value=hello 2)
#3 collect and emit × 3
- #3 received: Event(isConsumed=true, value=hello 3-1)
- #3 received: Event(isConsumed=true, value=hello 3-2)
- #3 received: Event(isConsumed=true, value=hello 3-3)
#4 just collect
#5 collect and same emit × 2
- #5 received: Event(isConsumed=true, value=hello 5)
- #5 received: Event(isConsumed=true, value=hello 5)
#6 same emit × 2 and collect
- #6 received: Event(isConsumed=true, value=hello 6)
----
start channel.receiveAsFlow()
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6
- #6 received: hello 6
----
start SharedFlowSetEmptyWhenCollect
#1 collect and emit
- #1 received: hello 1
#2 emit and collect
- #2 received: hello 2
#3 collect and emit × 3
- #3 received: hello 3-1
- #3 received: hello 3-2
- #3 received: hello 3-3
#4 just collect
#5 collect and same emit × 2
- #5 received: hello 5
- #5 received: hello 5
#6 same emit × 2 and collect
- #6 received: hello 6
62
46
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
62
46