1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

今更ながらKotlin Flow の docをちゃんとみる

Posted at

この記事は、筆者のメモを基に、AIが構成・加筆を行ったものです。

Kotlin Coroutines Flow入門:非同期データストリームを理解する

Kotlin Coroutinesの一部であるFlowは、非同期に処理されるデータストリームを扱うための強力な仕組みです。リアクティブプログラミングの概念を取り入れつつ、コルーチンのシンプルさと効率性を活かしています。

この記事では、Kotlin Flowの基本的な概念から、Cold FlowとHot Flowの違い、そして重要な制約であるContext preservationとException transparencyについて、公式ドキュメントを参考に解説します。さらに、代表的なHot FlowであるSharedFlowとStateFlowについても詳しく見ていきます。

Flowとは何か?

Flow は、非同期に計算される値のシーケンスを表します。値が必要になったタイミングで(遅延的に)計算され、一度に複数の値を生成できる点が特徴です。

非同期データストリーム

Flowの核となる概念は「非同期データストリーム」です。これは、時間経過とともに順次利用可能になる値のストリームを、コルーチンを使って効率的かつ安全に扱うための仕組みです。

  • 非同期性: Flowはコルーチンをベースにしているため、suspend関数と同様に、実行を中断・再開できます。これにより、メインスレッドをブロックすることなく、時間のかかる処理(ネットワークリクエスト、データベースアクセスなど)の結果をストリームとして扱えます。
  • ストリーム: データが生成されるたびに、それを待っている処理(コレクター)へ値を送り届けます。一度にすべてのデータをメモリに保持する必要がないため、大量のデータや無限に続く可能性のあるデータ(例: ユーザー入力、位置情報の更新)も効率的に扱えます。

Flowの構成要素

Flowの操作は、主に3つの役割に分類されます。

  1. ビルダー (Builder): Flowを生成する役割です。
    • flow { ... }: 最も基本的なビルダー。ラムダ内でemit()を呼び出して値を生成します。
    • flowOf(...): 固定された既知の値セットからFlowを生成します。
    • asFlow(): ListSequenceなどのコレクションや配列をFlowに変換します。
    • channelFlow { ... }: 内部でChannelを使用し、複数のコルーチンから安全に値をsend()できるビルダーです(後述)。
    • MutableStateFlow, MutableSharedFlow: Hot Flowを生成するためのビルダー(後述)。
  2. 中間演算子 (Intermediate Operator): Flowから受け取った値を変換したり、フィルタリングしたりする役割です。これら自身もFlowを返します。
    • map { ... }: 各値を別の値に変換します。
    • filter { ... }: 条件に合致する値のみを通します。
    • take(count): 最初のcount個の値だけを通します。
    • zip(otherFlow) { a, b -> ... }: 2つのFlowから値を1つずつ組み合わせて新しい値を生成します。
    • その他多数 (transform, onEach, debounce, distinctUntilChanged など)
  3. 終端演算子 (Terminal Operator): Flowの処理を開始し、結果を受け取る役割です。これはsuspend関数であり、Flowの実行をトリガーします。
    • collect { ... }: Flowから放出される各値を順に受け取って処理します。Flowの実行を開始する最も基本的な方法です。
    • single(): Flowが単一の値のみを放出して正常終了することを表明し、その値を返します(そうでなければ例外)。
    • reduce { accumulator, value -> ... }: Flowの値を累積計算します。
    • toList(), toSet(): Flowのすべての値をコレクションに集約します。

これらの要素を組み合わせることで、複雑な非同期データ処理パイプラインを構築できます。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // 1. ビルダー: 1から5までの数値を生成するFlow
    val numberFlow = flow {
        for (i in 1..5) {
            delay(100) // 非同期処理をシミュレート
            emit(i)
            println("Emitted: $i")
        }
    }

    // 2. 中間演算子: 偶数のみをフィルタリングし、数値を文字列に変換
    val processedFlow = numberFlow
        .filter { it % 2 == 0 }
        .map { "Processed value: $it" }

    // 3. 終端演算子: Flowの処理を開始し、結果を出力
    println("Collecting...")
    processedFlow.collect { value ->
        println("Collected: $value")
    }
    println("Collection finished.")
}

/* 出力例:
Collecting...
Emitted: 1
Emitted: 2
Collected: Processed value: 2
Emitted: 3
Emitted: 4
Collected: Processed value: 4
Emitted: 5
Collection finished.
*/

Cold Flow vs Hot Flow

Flowには大きく分けて「Cold Flow」と「Hot Flow」の2種類が存在します。これは、Flowがいつデータ生成を開始し、どのようにコレクター間で共有されるかという点で異なります。

Cold Flow

  • 特徴:コレクターが現れたら、そのコレクターのためにデータ生成を開始する」フローです。
  • 動作: collectなどの終端演算子が呼び出されるたびに、Flowビルダー内のコードが最初から実行されます。各コレクターは独立したデータストリームを受け取ります。
  • 例: flow { ... }, flowOf(), asFlow() など、標準的なFlowビルダーで作成されるFlowは基本的にCold Flowです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val coldFlow = flow {
    println("Flow started for ${currentCoroutineContext()[CoroutineName]?.name}")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    println("Collecting first time...")
    launch(CoroutineName("Collector A")) {
        coldFlow.collect { println("Collector A received: $it") }
    }

    delay(500) // 少し待ってから2回目のcollectを開始

    println("\nCollecting second time...")
    launch(CoroutineName("Collector B")) {
        coldFlow.collect { println("Collector B received: $it") }
    }
}

/* 出力例:
Collecting first time...
Flow started for Collector A
Collector A received: 1
Collector A received: 2
Collector A received: 3

Collecting second time...
Flow started for Collector B
Collector B received: 1
Collector B received: 2
Collector B received: 3
*/

上記の例では、Collector ACollector Bがそれぞれcollectを呼び出すたびに、"Flow started..."が出力され、独立したデータシーケンス(1, 2, 3)を受け取っていることがわかります。

Hot Flow

  • 特徴:コレクターの存在に関わらず、常にデータが流れている可能性がある」フローです。
  • 動作: Flowのインスタンスが生成された時点、または特定のスコープがアクティブな間、データ生成が開始・継続されます。複数のコレクターは、同じデータストリームを共有します。コレクターが収集を開始した時点で既に流れてしまったデータは、通常受け取れません(ただし、replay機能で過去のデータをキャッシュすることは可能)。
  • 例: SharedFlow, StateFlow が代表的なHot Flowです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// MutableSharedFlowはHot Flowを作成するビルダーの一つ
val hotFlow = MutableSharedFlow<Int>(replay = 0) // replay=0: 新しいコレクターは過去の値を受け取らない

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + CoroutineName("EmitterScope"))

    // データ送信側 (Emitter)
    scope.launch {
        println("Emitter starting...")
        hotFlow.emit(1) // Collector A はまだいないので、これは誰にも届かない (replay=0のため)
        delay(200)
        hotFlow.emit(2) // Collector A が受け取る
        delay(200)
        hotFlow.emit(3) // Collector A と B が受け取る
        delay(200)
        hotFlow.emit(4) // Collector A と B が受け取る
        println("Emitter finished.")
    }

    println("Collector A starting...")
    val jobA = launch(CoroutineName("Collector A")) {
        delay(100) // Emitterが1をemitした後に収集開始
        hotFlow.collect { println("Collector A received: $it") }
    }

    println("Collector B starting...")
    val jobB = launch(CoroutineName("Collector B")) {
        delay(300) // Emitterが2をemitした後に収集開始
        hotFlow.collect { println("Collector B received: $it") }
    }

    delay(1000) // すべての処理が終わるのを待つ
    jobA.cancel()
    jobB.cancel()
    scope.cancel()
}

/* 出力例:
Collector A starting...
Collector B starting...
Emitter starting...
Collector A received: 2
Collector B received: 3 // Collector Bは3から受け取る
Collector A received: 3
Collector B received: 4
Collector A received: 4
Emitter finished.
*/

この例では、hotFlowscope内で生成され、コレクターの存在とは無関係にemitが実行されます。Collector Aemit(1)の後、Collector Bemit(2)の後に収集を開始するため、それぞれ受け取る値が異なります。また、emit(3)emit(4)は両方のコレクターに配信されていることがわかります。

  • 比喩:
    • Cold Flow: 自動販売機。ボタン(collect)を押した人に対して、その都度商品(データ)を出す。
    • Hot Flow: ラジオ放送。放送局(Flow)は常に電波(データ)を発信しており、リスナー(コレクター)はチューニング(collect)した時点から放送を聞くことができる。

Flowの重要な制約

Flowを安全かつ予測可能に利用するためには、以下の2つの重要な制約を理解し、遵守する必要があります。これらの制約は、Flowの内部実装を知らなくても、利用者が安心してFlowを使えるように設計されています。

1. Context Preservation (コンテキストの維持)

原則: Flowのemitは、そのFlowを収集しているコレクターのCoroutineContextで実行されなければならないflow { ... }ビルダー内でwithContextなどを使って勝手にコンテキストを切り替えてemitすることは禁止されています。

目的: 「どこで何が実行されるか」を明確にし、意図しないスレッド変更やコンテキストの混在を防ぐためです。これにより、Flowの利用者は、どのコンテキストでデータを受け取るかを予測しやすくなります。

違反例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val myFlow = flow {
    // ダメな例: flowビルダー内でコンテキストを切り替えている
    withContext(Dispatchers.IO) {
        println("Emitting in ${currentCoroutineContext()}") // IO Dispatcherで実行される
        emit(1) // ここで例外が発生する (IllegalStateException)
    }
}

fun main() = runBlocking {
    try {
        // デフォルトのコンテキスト (この場合はrunBlockingのコンテキスト) で収集
        println("Collecting in ${currentCoroutineContext()}")
        myFlow.collect { value ->
            println("Collected $value in ${currentCoroutineContext()}")
        }
    } catch (e: Exception) {
        println("Caught exception: $e") // IllegalStateException: Flow invariant is violated...
    }
}

正しい方法: flowOn演算子

Flowの実行コンテキスト(データ生成側のコンテキスト)を変更したい場合は、flowOn演算子を使用します。flowOnは、それより上流の演算子(ビルダーを含む)を指定されたコンテキストで実行し、下流(flowOn以降の演算子やcollect)は元のコレクターのコンテキストで実行します。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val myFlow = flow {
    // このブロックは flowOn で指定されたコンテキスト (Dispatchers.IO) で実行される
    println("Emitting in ${currentCoroutineContext()}") // IO Dispatcher
    emit(1)
    delay(100)
    emit(2)
}.flowOn(Dispatchers.IO) // 上流の処理をIOディスパッチャで実行するよう指定

fun main() = runBlocking(CoroutineName("MainContext")) {
    println("Collecting in ${currentCoroutineContext()}") // MainContext
    myFlow
        .map { // このmapはコレクターのコンテキスト (MainContext) で実行される
            println("Mapping in ${currentCoroutineContext()}")
            it * 10
        }
        .collect { value -> // collectもコレクターのコンテキスト (MainContext)
            println("Collected $value in ${currentCoroutineContext()}")
        }
}

/* 出力例:
Collecting in CoroutineName(MainContext)
Emitting in Dispatchers.IO // flowブロック内はIO
Mapping in CoroutineName(MainContext) // mapはMainContext
Collected 10 in CoroutineName(MainContext) // collectもMainContext
Emitting in Dispatchers.IO
Mapping in CoroutineName(MainContext)
Collected 20 in CoroutineName(MainContext)
*/

flowOnを使うことで、データ生成(時間のかかる可能性のある処理)とデータ消費(UI更新など)でコンテキストを安全に分離できます。

coroutineScopeとの違い:

flow { ... }内でcoroutineScope { ... }を使うことは許可されています。これは、coroutineScope親のコンテキストを継承し、新しいコンテキストを作らないため、Context Preservationの原則に違反しないからです。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flow {
        emit(1) // OK
        coroutineScope {
            // このスコープは親 (collectを呼び出したコルーチン) のコンテキストを引き継ぐ
            println("Inside coroutineScope: ${currentCoroutineContext()}")
            emit(2) // OK -- 同じコンテキストで実行される
        }
    }.collect { println("Collected $it in ${currentCoroutineContext()}") }
}

複数のコンテキストでemitしたい場合: channelFlow

もし、Flowの生成ロジック内で、どうしても複数の異なるコンテキストから値をemit(正確にはsend)したい場合は、channelFlowビルダーを使用します。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val channelBasedFlow = channelFlow {
    // channelFlow内では ProducerScope が提供され、send() を使う
    launch(Dispatchers.IO + CoroutineName("SenderIO")) {
        println("Sending 1 from ${currentCoroutineContext()}")
        send(1) // IOコンテキストから送信可能
    }
    launch(Dispatchers.Default + CoroutineName("SenderDefault")) {
        println("Sending 2 from ${currentCoroutineContext()}")
        send(2) // Defaultコンテキストから送信可能
    }
}.flowOn(Dispatchers.Main + CoroutineName("ReceiverMain")) // channelFlow自体もflowOnでコンテキストを指定できる

// ※ AndroidなどMainディスパッチャが必要な環境で実行してください
// fun main() = runBlocking { ... } ではMainがないためエラーになります
/*
fun main() = runBlocking { // 仮の実行 (Mainがないため実際には動かない可能性)
    println("Collecting channelBasedFlow in ${currentCoroutineContext()}")
    channelBasedFlow.collect {
        println("Collected $it in ${currentCoroutineContext()}")
    }
}
*/

channelFlowは内部でChannelを使っており、異なるコンテキストからのsendを安全に処理する仕組みを提供します。ただし、flowビルダーよりも若干オーバーヘッドがあるため、Context Preservationを守れるならflowを使う方が効率的です。

2. Exception Transparency (例外の透過性)

原則: Flowの生成側(ビルダーや中間演算子)で発生した例外は、そのまま下流のコレクターに伝播されるべきであり、Flow内部でtry-catchを使って握りつぶしてはいけない(ただし、特定の例外を別の値に変換するcatch演算子は除く)。

目的: 例外処理の責任をFlowの利用者に委ねることで、Flowの実装をシンプルに保ち、例外発生時の挙動を予測可能にするためです。

違反例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val badFlow = flow {
    try {
        emit(1)
        throw RuntimeException("Something went wrong in the flow")
        emit(2) // これは実行されない
    } catch (e: Exception) {
        println("Caught exception inside flow: $e")
        // 例外を握りつぶして、正常終了したかのように見せかけるのはNG
    }
}

fun main() = runBlocking {
    println("Collecting badFlow...")
    badFlow.collect { println("Collected: $it") } // 例外が発生したことがコレクターに伝わらない
    println("badFlow finished.")
}
/* 出力例:
Collecting badFlow...
Collected: 1
Caught exception inside flow: java.lang.RuntimeException: Something went wrong in the flow
badFlow finished. // コレクターは例外を知らないまま終了
*/

正しい方法: コレクター側での例外処理 or catch演算子

例外はFlowの外、つまり終端演算子(collectなど)を呼び出す側でtry-catchするか、Flowチェーンの下流catch演算子を使って処理します。

コレクター側でtry-catch:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val goodFlow = flow {
    emit(1)
    throw RuntimeException("Something went wrong in the flow")
    emit(2)
}

fun main() = runBlocking {
    println("Collecting goodFlow...")
    try {
        goodFlow.collect { println("Collected: $it") }
    } catch (e: Exception) {
        println("Caught exception during collection: $e") // コレクターが例外を捕捉
    }
    println("goodFlow finished.")
}
/* 出力例:
Collecting goodFlow...
Collected: 1
Caught exception during collection: java.lang.RuntimeException: Something went wrong in the flow
goodFlow finished.
*/

catch演算子:

catch演算子は、自身より上流で発生した例外を捕捉し、代替の値を発行したり、特定の処理を行ったりすることができます。例外はcatchブロックで消費され、下流には伝播しません(catch内で再度throwしない限り)。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val resilientFlow = flow {
    emit("Value 1")
    throw IllegalStateException("Error occurred")
    emit("Value 2") // 実行されない
}

fun main() = runBlocking {
    resilientFlow
        .map { "Processed: $it" } // 例外発生前の値は処理される
        .catch { e ->
            println("Caught exception in catch operator: $e")
            emit("Fallback value due to error") // 代替の値を発行
        }
        .collect { value ->
            println("Collected: $value")
        }
}
/* 出力例:
Collected: Processed: Value 1
Caught exception in catch operator: java.lang.IllegalStateException: Error occurred
Collected: Fallback value due to error
*/

重要: catch演算子は、下流で発生した例外は捕捉できません

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    try {
        flow { emit(1) }
            .catch { /* ここは呼ばれない */ }
            .map { throw RuntimeException("Exception in map") }
            .collect { /* ここも呼ばれない */ } // 例外はここで発生し、外部に伝播する
    } catch (e: Exception) {
        println("Caught exception outside: $e") // RuntimeException: Exception in map
    }
}

上流と下流の両方で例外が発生した場合:

もしFlowの処理中に、上流(例: onCompletion内)と下流(例: collectブロック内)の両方で例外が発生した場合、原則として下流(コレクター側)の例外が優先され、上流の例外はそれにaddSuppressedされる形で扱われます。これは、finallyブロックで例外がスローされた場合のJavaの挙動に似ています。コレクターでの処理中の問題が根本原因と見なされるべき、という考え方に基づいています。

これらの制約を守ることで、Flowを使ったコードはより堅牢で、理解しやすくなります。

SharedFlow: Hot Flowの代表格

SharedFlow は、Hot Flowのインターフェースであり、その実装として MutableSharedFlow が提供されています。コレクター間で値を共有(ブロードキャスト)する目的で使われます。

特徴と利点

  • ホットストリーム: コレクターの存在に関わらず、値がemitされる可能性があります。
  • ブロードキャスト: emitされた値は、その時点でアクティブなすべてのコレクターに配信されます。
  • 設定可能なリプレイキャッシュ: 新しいコレクターに対して、直近のいくつかの値を再送(リプレイ)するように設定できます。
  • バッファリング: emitの速度がcollectの処理速度を上回った場合に備えて、値を一時的に保持するバッファを設定できます。

ユースケース例: イベントのブロードキャスト

UIイベント、バックグラウンド処理の進捗通知など、アプリケーション内の複数の場所で同じイベントを受け取りたい場合に便利です。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// ViewModelなどでイベントを発行する例
class EventViewModel(private val scope: CoroutineScope) {
    // プライベートなMutableSharedFlowでイベントを発行
    private val _events = MutableSharedFlow<String>()
    // 公開用には読み取り専用のSharedFlowを公開
    val events: SharedFlow<String> = _events.asSharedFlow()

    fun triggerEvent(message: String) {
        scope.launch {
            println("ViewModel emitting: $message")
            _events.emit(message)
        }
    }
}

// 複数の場所でイベントを購読する例
fun main() = runBlocking {
    val viewModel = EventViewModel(this)

    val job1 = launch(CoroutineName("Observer 1")) {
        viewModel.events.collect { event ->
            println("Observer 1 received: $event")
        }
    }

    val job2 = launch(CoroutineName("Observer 2")) {
        delay(50) // 少し遅れて購読開始
        viewModel.events.collect { event ->
            println("Observer 2 received: $event")
        }
    }

    delay(10)
    viewModel.triggerEvent("Event A") // Observer 1 のみが受け取る
    delay(100)
    viewModel.triggerEvent("Event B") // Observer 1 と 2 が受け取る

    delay(100)
    job1.cancel()
    job2.cancel()
}
/* 出力例:
ViewModel emitting: Event A
Observer 1 received: Event A
ViewModel emitting: Event B
Observer 1 received: Event B
Observer 2 received: Event B // Observer 2もEvent Bを受け取る
*/

MutableSharedFlowの主要パラメータ

MutableSharedFlow(...)コンストラクタやshareIn演算子でSharedFlowを作成する際には、以下の重要なパラメータを設定できます。

  • replay: (デフォルト: 0)
    • 新しいコレクターが購読を開始したときに、過去にemitされた値のうち、直近の何個を即座に受け取るか を指定します。
    • replay = 1なら最新の1つ、replay = 5なら最新の5つを受け取ります。
    • 0の場合は、購読開始後にemitされた値のみを受け取ります。
    • これはあくまで「新しいコレクター向けのキャッシュ」であり、既に購読中のコレクターには影響しません。
  • extraBufferCapacity: (デフォルト: 0)
    • replayキャッシュとは別に、emitされた値を一時的に保持するための追加バッファサイズを指定します。
    • コレクターの処理が追いつかない場合に、emitされた値がこのバッファに格納されます。
    • replayextraBufferCapacityの合計が、SharedFlowが内部的に保持できる値の最大数になります(onBufferOverflowの設定によります)。
  • onBufferOverflow: (デフォルト: BufferOverflow.SUSPEND)
    • replayキャッシュとextraBufferCapacityを合わせたバッファが満杯になったときのemit側の挙動を決定します。
      • SUSPEND: emitしようとしているコルーチンを一時停止し、バッファに空きができるまで待機します。最も一般的な挙動です。
      • DROP_OLDEST: バッファが満杯の場合、バッファ内で最も古い値を捨てて、新しい値を格納します。emitは中断しません。
      • DROP_LATEST: バッファが満杯の場合、新しくemitされようとしている値を捨てます。emitは中断しません。

これらのパラメータを調整することで、ユースケースに応じたSharedFlowの挙動を細かく制御できます。

shareIn演算子

既存のCold FlowをHotなSharedFlowに変換するための演算子です。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val coldFlow = flow {
    println("Cold flow started")
    emit("Data from cold flow")
    delay(100)
    println("Cold flow finished")
}

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    // coldFlowをSharedFlowに変換
    val sharedFlow = coldFlow.shareIn(
        scope = scope, // SharedFlowがアクティブになるスコープ
        started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 500), // 購読者がいなくなって0.5秒後に上流のFlowを停止
        replay = 1 // 最新の値を1つリプレイ
    )

    println("Collector 1 subscribing...")
    val job1 = launch { sharedFlow.collect { println("Collector 1: $it") } }
    delay(50) // Collector 1が購読開始 -> coldFlowが開始される

    println("Collector 2 subscribing...")
    val job2 = launch { sharedFlow.collect { println("Collector 2: $it (from replay)") } } // replayキャッシュから値を受け取る
    delay(200) // coldFlowが完了するのを待つ

    println("Collector 1 unsubscribing...")
    job1.cancel()
    delay(100)
    println("Collector 2 unsubscribing...")
    job2.cancel()

    delay(600) // stopTimeoutMillis (500ms) を超える -> coldFlowが停止するはず (ログは出ないが内部的に)
    println("SharedFlow should be stopped now.")

    scope.cancel()
}
/* 出力例:
Collector 1 subscribing...
Cold flow started        // Collector 1の購読により開始
Collector 1: Data from cold flow
Collector 2 subscribing...
Collector 2: Data from cold flow (from replay) // Collector 2はリプレイを受け取る
Cold flow finished
Collector 1 unsubscribing...
Collector 2 unsubscribing...
SharedFlow should be stopped now.
*/
  • scope: SharedFlowの共有を開始・管理するためのコルーチンスコープ。
  • started: 共有(上流のCold Flowの実行)をいつ開始・停止するかを定義するポリシー。
    • SharingStarted.Eagerly: scopeがアクティブになるとすぐに共有を開始し、scopeがキャンセルされるまで継続。
    • SharingStarted.Lazily: 最初のコレクターが現れたら共有を開始し、scopeがキャンセルされるまで継続(コレクターがいなくなっても停止しない)。
    • SharingStarted.WhileSubscribed(...): 最初のコレクターが現れたら共有を開始し、最後のコレクターがいなくなったら(オプションのタイムアウト後に)共有を停止する。リソース効率が良い場合に推奨されます。
  • replay: MutableSharedFlowと同様、新しいコレクターにリプレイする値の数。

shareInは、一度だけ実行したい高コストな処理の結果を、複数のコレクターで効率的に共有したい場合に特に有用です。

StateFlow: 状態管理に特化したSharedFlow

StateFlow は、SharedFlowをベースにした、現在の状態を保持・通知することに特化したHot Flowです。UIの状態管理などで広く利用されています。

特徴

  • 常に単一の最新状態を持つ: valueプロパティを通じて、現在の状態に同期的にアクセスできます。
  • replay = 1相当の動作: 新しいコレクターは、購読を開始するとすぐに現在の状態を受け取ります。
  • onBufferOverflow = DROP_OLDEST相当の動作: 状態の更新が非常に速い場合、中間的な状態はコレクターに通知されずに破棄され、最新の状態のみが配信されることが保証されます(Conflation)。
  • distinctUntilChanged相当の動作: 同じ値が連続してemit(設定)されても、コレクターには通知されません。状態が実際に変化した場合のみ通知されます。

MutableStateFlowは、内部的には以下のようなMutableSharedFlowとほぼ等価と考えることができます。

// MutableStateFlow(initialValue) は conceptually には以下に近い
val shared = MutableSharedFlow<StateType>(
    replay = 1, // 常に最新の値を1つリプレイ
    onBufferOverflow = BufferOverflow.DROP_OLDEST // 中間値は破棄 (Conflation)
)
// 初期値を設定 (tryEmitはsuspendしないemit)
shared.tryEmit(initialValue)
// StateFlowとして振る舞うように、連続する同一値の通知を抑制
val state: StateFlow<StateType> = shared.distinctUntilChanged()

MutableStateFlow

StateFlowの更新可能なバージョンです。valueプロパティに新しい値を代入することで状態を更新します。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// ViewModelでの状態管理例
class CounterViewModel(private val scope: CoroutineScope) {
    private val _count = MutableStateFlow(0) // 初期値0で作成
    val count: StateFlow<Int> = _count.asStateFlow() // 読み取り専用を公開

    fun increment() {
        // valueプロパティを更新して状態を変更
        // スレッドセーフな更新が必要な場合は update { current -> current + 1 } を使う
        _count.value += 1
        println("ViewModel count updated to: ${_count.value}")
    }

    fun decrement() {
        _count.value -= 1
        println("ViewModel count updated to: ${_count.value}")
    }
}

// UI層などでの状態購読例
fun main() = runBlocking {
    val viewModel = CounterViewModel(this)

    val job = launch {
        println("Collector starting, initial value: ${viewModel.count.value}")
        viewModel.count.collect { currentCount ->
            // 状態が変化した場合のみcollectが呼ばれる
            println("Collector received update: $currentCount")
        }
    }

    delay(100)
    viewModel.increment() // count: 1
    delay(100)
    viewModel.increment() // count: 2
    delay(100)
    viewModel.decrement() // count: 1
    delay(100)
    viewModel.increment() // count: 2
    delay(100)
    // 同じ値を設定してもコレクターには通知されない
    println("Setting value to 2 again...")
    viewModel._count.value = 2 // distinctUntilChangedにより通知されない
    delay(100)

    job.cancel()
}
/* 出力例:
Collector starting, initial value: 0
Collector received update: 0
ViewModel count updated to: 1
Collector received update: 1
ViewModel count updated to: 2
Collector received update: 2
ViewModel count updated to: 1
Collector received update: 1
ViewModel count updated to: 2
Collector received update: 2
Setting value to 2 again... // コレクターへの通知はなし
*/

StateFlow利用時の注意点 (Conflation)

StateFlowの重要な特性としてConflation(合流)があります。これは、emit(状態更新)が非常に速く行われた場合、コレクターが処理する前に中間的な状態が破棄され、最新の状態のみが通知されることを意味します。

例: 状態が 1 -> 2 -> 3 と非常に短時間で変化した場合、コレクターは 1 を受け取った後、次に 3 を受け取り、2 はスキップされる可能性があります。

これは通常、UIの状態表示など「最新の状態が重要」なケースでは問題ありません。しかし、状態の変化そのものをトリガーとして副作用(例: ネットワークリクエスト、ログ送信)を実行したい場合には注意が必要です。なぜなら、スキップされた状態変化に対応する副作用が実行されない可能性があるからです。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val state = MutableStateFlow(0)

suspend fun collectAndLog() {
    state.collect { value ->
        println("Collected: $value. Performing side effect...")
        delay(100) // 副作用の処理に時間がかかると仮定
        println("Side effect for $value done.")
    }
}

fun main() = runBlocking {
    val collectorJob = launch { collectAndLog() }

    delay(10)
    println("Updating state to 1")
    state.value = 1
    println("Updating state to 2")
    state.value = 2 // 1の副作用が終わる前に2に更新
    println("Updating state to 3")
    state.value = 3 // 2の副作用が終わる前に3に更新 (2はスキップされる可能性)

    delay(500) // 処理完了を待つ
    collectorJob.cancel()
}
/* 出力例の可能性の一つ:
Collected: 0. Performing side effect...
Updating state to 1
Updating state to 2
Updating state to 3
Side effect for 0 done.
Collected: 3. Performing side effect... // 1と2がスキップされた!
Side effect for 3 done.
*/

この例のように、collect内の処理(副作用)が完了する前にStateFlowの値が複数回更新されると、中間的な値(この場合は1と2)がcollectに届かない可能性があります。

対策: 状態変化ごとに確実に副作用を実行したい場合は、StateFlowではなくSharedFlow(replay=0)を使うか、Flow.onEachと組み合わせて副作用をcollectの外で処理するなどの工夫が必要です。Jetpack ComposeのLaunchedEffectStateFlowをキーにする場合も同様の注意が必要です。

まとめ

Kotlin Flowは、コルーチンベースの強力な非同期データストリーム処理ライブラリです。

  • Cold Flowcollectされるたびに実行され、各コレクターは独立したストリームを受け取ります。
  • Hot FlowSharedFlow, StateFlow)はコレクターとは独立してデータが流れる可能性があり、複数のコレクターでストリームを共有します。
  • Context PreservationException Transparencyという2つの制約により、Flowは安全で予測可能な動作を提供します。コンテキスト切り替えにはflowOn、例外処理にはcatch演算子やコレクター側でのtry-catchを使用します。
  • SharedFlowはイベントのブロードキャストに適しており、replayやバッファリングを細かく設定できます。
  • StateFlowは状態管理に特化し、常に最新の値を保持し、distinctUntilChangedとConflationの特性を持ちます。

これらの概念を理解することで、Kotlinアプリケーションにおける非同期処理をより効率的かつ宣言的に記述できるようになります。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?