8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Coroutines FlowのHot Streamのバッファー周りの挙動を完全理解する

Last updated at Posted at 2023-06-16

概要

Coroutines FlowのHot Streamでは、バッファー状態によって動作が変わるものがたくさんあります。
たとえば、MutableSharedFlowコンストラクタのパラメーター、tryEmit()もそうです。
この辺りをまとめて理解したいという記事です。自分用にまとめてるだけなので、わかりづらいかもしれません。

ソースは、こちらにあります。
https://github.com/ko2ic/spike-kotlin-flatmap

flatMapConcat/flatMapMerge/flatMapLatestの違い

まずは、この3つのメソッドの違いから説明します。
一見するとバッファーとは関係ないように思えますが、ここを理解しておかないと混乱します。そのために、先に説明しておきます。
結論から言うとflatMapConcat()だけバッファーに関係してきます。

よく以下のシンプルなコードで説明されますが、ここでは、複数のクラスに分けて、複雑な実装で説明します。なぜかというと、もっと具体的に本番で利用されそうなコードにした方が利用シーンをイメージできるかなと思うためです。(ただし、Androidのクラスは一切使いません)

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapConcat { param ->
            flowOf(1, 2, 3)
                .onEach {
                    delay(500)
                }.map {
                    "${param}_${it} "
                }
        }.collect {
            println(it)
        }
}

まずは、Repositoryクラスを作っておきます。
Repositoryは、APIのダミーとして実装しています。ここでのイメージはload()のなかで3つのAPIが呼ばれていて、終わるたびに返している感じです(実際はそんな実装はしないと思いますが...すでに例が苦しい)。
このクラスはこれから説明するサンプルで全て同じクラスを使います。

class Repository {
    fun load(param: String) = flowOf(1, 2, 3)
        .onEach { delay(500) }
        .map { "${param}_${it}" }
}

次は、Emitterです。一般的に値を送信するクラスはEmitter(もしくはProducer)と呼ばれます。受信する側は、Subscriber(Consumer)と呼ばれます。
基本的にFlowは、Flowに対して、emit()するとSubscriberに配信されるイメージです。

最初は、MutableSharedFlowのパラメーターは、extraBufferCapacity=2にしています(これは後ほど説明します)。
changeText()メソッドで、emit()しています。

class Emitter {
    private val _shared =
        MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)
    val sharedFlow: SharedFlow<String> = _shared.asSharedFlow()

    suspend fun changeText(text: String) {
        delay(100)
        _shared.emit(text)
    }
}

この記事全般では、main() メソッドがSubscriberの役割になります。
イメージでは、emitter.changeText()の部分がユーザ操作です。
0.1秒の間に連続で3回ユーザがテキストを変更していると想像してください。
テキストを変更したら、その入力したテキストを使って、APIに問い合わせて結果を取得しているイメージです。

fun main() {
    runBlocking {
        val repository = Repository()
        val emitter = Emitter()

        launch {
            emitter.changeText("A")
            emitter.changeText("B")
            emitter.changeText("C")
        }

        emitter.sharedFlow.flatMapXXXXX { // ここのflatMapが3種類ある
            repository.load(it)
        }.collect {
            println(it)
        }

flatMapConcat()の場合

        emitter.sharedFlow.flatMapConcat { 
            repository.load(it)
        }.collect {
            println(it)
        }

flatMapConcat()は直列に動作すると言われます。
結果です。

A_1
A_2
A_3
B_1
B_2
B_3
C_1
C_2
C_3

これは、最初のAはすぐに repository.load(it)に入ります。全てのAのloadが終わるまで、BとCはSharedFlowのバッファーで待機しています。

図にするとこうなります。

-- A_1 -- A_2 -- A_3 -->
                        -- B_1 -- B_2 -- B_3 -->
                                                -- C_1 -- C_2 -- C_3 -->  

flatMapMerge()の場合

        emitter.sharedFlow.flatMapMerge { 
            repository.load(it)
        }.collect {
            println(it)
        }

flatMapMerge() は並列に動作します。
結果です。

A_1
B_1
C_1
A_2
B_2
C_2
A_3
B_3
C_3

これは、0.1秒おきに repository.load(it)が呼ばれます。(何度もテキストを変更すると一気にサーバーの負荷が上がりますね)
入ったらすぐに次にいくので、SharedFlowにはバッファーされません。

図にするとこうなります。

-- A_1 -- A_2 -- A_3 -->
 -- B_1 -- B_2 -- B_3 -->
  -- C_1 -- C_2 -- C_3 -->

flatMapLatest()の場合

        emitter.sharedFlow.flatMapLatest { 
            repository.load(it)
        }.collect {
            println(it)
        }

flatMapLatest() は新しい値がemitされると以前の値はキャンセルされます。
結果です。
入ったらすぐに次の値が処理されるので、SharedFlowにはバッファーされません。

C_1
C_2
C_3

図にするとこうなります。
0.1秒後に次の値が入り、キャンセルされるので最後だけ表示されます。

-- A_1
 -- B_1 
   -- C_1 -- C_2 -- C_3 -->

3つのflatMap系メソッドのまとめ

  • flatMapConcat()だけが Flow内のパッファーに保持される
  • それは直列で処理が動き、終わるまで次にいかないため
  • 最初にemitされた値(ここでは"A")は、すぐに次の処理にいくのでバッファーに保持されるのは、 "B" と "C"
  • 他の二つはすぐに次の処理に入るため、バッファーは関係ない

MutableSharedFlowコンストラクタのパラメーターについて

ここでは全て、上記サンプルのflatMapConcat()を利用して、パラメーターによってどう変わるかをみます。

onBufferOverflowパラメーター

次の3つが指定できます。

  • BufferOverflow.SUSPEND
  • BufferOverflow.DROP_OLDEST // バッファーが1以上必要
  • BufferOverflow.DROP_LATEST // バッファーが1以上必要

BufferOverflow.SUSPEND

パッファーがあふれた時にemit()が一時停止します。上の例でextraBufferCapacity=2の場合にはemit()メソッドは呼ばれて、その次の行に処理は移ります。(パッファーに格納されるためです)
しかし、extraBufferCapacity=0 にするとemit()が呼ばれたあと、次の行の処理には行きません。購読処理が終わらないとemit()は待機状態になります。

つまり、コードに以下を追加するとわかります。

    suspend fun changeText(text: String) {
        delay(100)
        _shared.emit(text)
+       println("emitの次の処理") // ここ追加
    }

extraBufferCapacity=2の場合

emitの次の処理
emitの次の処理 // バッファーに入るので処理は継続される
emitの次の処理 // バッファーに入るので処理は継続される
A_1
A_2
A_3
B_1
・・・

extraBufferCapacity=0の場合

emitの次の処理
A_1
A_2
A_3
emitの次の処理  // 処理が終わるまでemitで待機しているため、終わった後に表示される
B_1
・・・

BufferOverflow.DROP_OLDEST

名称の通り、バッファーが溢れたときは、一番古い値を落とします。これを使う場合は、バッファーが一つ以上必要になるので、extraBufferCapacity=1 にします。

結果は以下です。
Aはすぐに実行されるので、バッファーはBとCが関係します。

Aが終わるまで1500msかかるので、その間にBがemitされてバッファーに入ります。100ms後にCがemitされて、Bが消えて、Cがパッファーに入ります。

emitの次の処理
emitの次の処理
emitの次の処理
A_1
A_2
A_3
C_1
C_2
C_3

BufferOverflow.DROP_LATEST

名称の通り、バッファーが溢れた時は、一番新しい値を落とします。これを使う場合は、バッファーが一つ以上必要になるので、extraBufferCapacity=1 にします。

結果は以下です。
Aはすぐに実行されるので、バッファーはBとCが関係します。

Aが終わるまで1500msかかるので、その間にBがemitされてバッファーに入ります。100ms後にCがemitされますが、すでにBがバッファーにいるので、Cはバッファーに入りません。

emitの次の処理
emitの次の処理
emitの次の処理
A_1
A_2
A_3
B_1
B_2
B_3

extraBufferCapacityパラメーター

これは上記の例でもわかるようにバッファーの数を指定できるパラメーターです。

replayパラメーター

replayCacheとも呼ばれますが、これもバッファーと考えていいでしょう。

extraBufferCapacityとの違いは、
replayの場合は、Subscriberがcollectしていない状態で、emitされた値を保持してくれます。

試しに以下のように collect()が始まる前にemitが終わるようにdelayを入れてみます。

        launch {
            emitter.changeText("A")
            emitter.changeText("B")
            emitter.changeText("C")
        }
+       delay(400)
        emitter.sharedFlow.flatMapConcat {
            repository.load(it)
        }.collect {
            println(it)
        }

replay = 0, extraBufferCapacity= 1, onBufferOverflow = BufferOverflow.SUSPENDの場合

emitがcollectがされる前に呼ばれてているので、その後にcollectされてもFlowには何も存在しません。

emitの次の処理
emitの次の処理
emitの次の処理

一方、replay = 1, extraBufferCapacity= 0, onBufferOverflow = BufferOverflow.SUSPENDの場合

collectされる前にemitされた最後の値が保持され、collectされた瞬間にFlowから取り出されます。

emitの次の処理
emitの次の処理
emitの次の処理
C_1
C_2
C_3

MutableSharedFlowコンストラクタのパラメーターのまとめ

  • バッファーが溢れた場合にどういう動作をするかを指定するためのパラメーター
  • replayとextraBufferCapacityの違いは、replayがcollect前のemitを利用できる

tryEmit()とは

emit()はsuspend関数なので、Coroutine Scopeから出ないと呼び出せませんが、tryEmit():Booleanは普通の関数ですので普通に呼び出せます。成功した場合がtrueで失敗した場合がfalseを返します。
バッファーが溢れた場合に失敗します。
ちなみに失敗するのはBufferOverflow.SUSPENDの場合だけになります。SUSPENDを指定していても一時停止するのではなくて、emitをするのが注意点です。

replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPENDの場合

    suspend fun changeText(text: String) {
        delay(100)
-       _shared.emit(text)
+       println(_shared.tryEmit(text))
        println("emitの次の処理")
    }

tryEmit()をした時点でバッファーが0なので、全て失敗します。バッファーを使わないAも失敗しているのが注意です。
tryEmit()を使う場合は必ずバッファーが必要になります。これは何故かというとバッファーがあることで、emitで放出した値を全てのSubscriber(suspendしている)に渡すことができるからです。バッファーがないと放出した値は同期的に全てのSubscriberに渡す必要が出てしまいます。suspendなコルーチンに対してはバッファーが一つでもないと送信できないことは想像できるでしょう。

false
emitの次の処理
false
emitの次の処理
false
emitの次の処理

replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.SUSPENDの場合

Aはバッファーを使わないで処理され、Bがバッファーに格納されます。Cは失敗します。
emit()のときは、一時中断することで最後まで全てのemitした値を処理していたのに対して、tryEmit()はバッファー溢れはdropするので、注意が必要になります。ほぼ、BufferOverflow.DROP_LATESTと同じです。

true
emitの次の処理
true
emitの次の処理
false
emitの次の処理
A_1
A_2
A_3
B_1
B_2
B_3

replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPENDの場合

「replayパラメーター」のサンプルと同じようにcollectされる前にemitを全て終わらせておいた時の結果は以下です。
全部、trueで成功になっていますが、実際に値を購読できたのは、最後の一つだけです。

つまり、falseになるのは、バッファーが足りない場合だけと覚えておくと良いでしょう。

true
emitの次の処理
true
emitの次の処理
true
emitの次の処理
C_1
C_2
C_3

tryEmit()結論

  • 通常関数。戻り値で成功か失敗かを判断。
  • onBufferOverflow = BufferOverflow.SUSPENDの場合だけfalseが返ってくる
  • trueの場合でも実際に購読者に届くかは別。Subscriberがcollect前にtryEmitした場合でもtrueが返る

おまけ

MutableStateFlowとは

こんな実装になりますが、

    private val _state: MutableStateFlow<String> = MutableStateFlow("Z")
    val stateFlow: StateFlow<String> = _state.asStateFlow()

// emitするときは、`_state.value="A"`のようにする
・・・・・・・・・・・・・・・

    emitter.stateFlow.flatMapConcat {
        repository.load(it)
    }.collect {
        println(it)

これは、以下の実装と同じになります。

    private val _shared =
        MutableSharedFlow<String>(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val sharedFlow: SharedFlow<String>
        get() {
            _shared.tryEmit("Z")
            return _shared.asSharedFlow()
        }

・・・・・・・・・・・・・・・

        emitter.sharedFlow.distinctUntilChanged().flatMapConcat {
            repository.load(it)
        }.collect {
            println(it)

結果は以下です。

Z_1
Z_2
Z_3
C_1
C_2
C_3

つまり、次の動作になります。

  • 初期値があり、
  • collectした瞬間に初期値が渡ってきて、
  • それ以降のemitの最新だけが渡ってきて、
  • emitするにはCoroutine Scope内の必要がなく、
  • 連続した同じ値は渡ってこない
  • そして、Subscriberのcollectの前にemitしても最後の1つが返ってくる

これを見るとわかりますが、StateFlowは、画面表示に適切なFlowと言われています。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?