この記事は、筆者のメモを基に、AIが構成・加筆を行ったものです。
Kotlin Coroutines Flow入門:非同期データストリームを理解する
Kotlin Coroutinesの一部であるFlowは、非同期に処理されるデータストリームを扱うための強力な仕組みです。リアクティブプログラミングの概念を取り入れつつ、コルーチンのシンプルさと効率性を活かしています。
この記事では、Kotlin Flowの基本的な概念から、Cold FlowとHot Flowの違い、そして重要な制約であるContext preservationとException transparencyについて、公式ドキュメントを参考に解説します。さらに、代表的なHot FlowであるSharedFlowとStateFlowについても詳しく見ていきます。
Flowとは何か?
Flow は、非同期に計算される値のシーケンスを表します。値が必要になったタイミングで(遅延的に)計算され、一度に複数の値を生成できる点が特徴です。
非同期データストリーム
Flowの核となる概念は「非同期データストリーム」です。これは、時間経過とともに順次利用可能になる値のストリームを、コルーチンを使って効率的かつ安全に扱うための仕組みです。
-
非同期性: Flowはコルーチンをベースにしているため、
suspend
関数と同様に、実行を中断・再開できます。これにより、メインスレッドをブロックすることなく、時間のかかる処理(ネットワークリクエスト、データベースアクセスなど)の結果をストリームとして扱えます。 - ストリーム: データが生成されるたびに、それを待っている処理(コレクター)へ値を送り届けます。一度にすべてのデータをメモリに保持する必要がないため、大量のデータや無限に続く可能性のあるデータ(例: ユーザー入力、位置情報の更新)も効率的に扱えます。
Flowの構成要素
Flowの操作は、主に3つの役割に分類されます。
-
ビルダー (Builder): Flowを生成する役割です。
-
flow { ... }
: 最も基本的なビルダー。ラムダ内でemit()
を呼び出して値を生成します。 -
flowOf(...)
: 固定された既知の値セットからFlowを生成します。 -
asFlow()
:List
やSequence
などのコレクションや配列をFlowに変換します。 -
channelFlow { ... }
: 内部でChannelを使用し、複数のコルーチンから安全に値をsend()
できるビルダーです(後述)。 -
MutableStateFlow
,MutableSharedFlow
: Hot Flowを生成するためのビルダー(後述)。
-
-
中間演算子 (Intermediate Operator): Flowから受け取った値を変換したり、フィルタリングしたりする役割です。これら自身もFlowを返します。
-
map { ... }
: 各値を別の値に変換します。 -
filter { ... }
: 条件に合致する値のみを通します。 -
take(count)
: 最初のcount
個の値だけを通します。 -
zip(otherFlow) { a, b -> ... }
: 2つのFlowから値を1つずつ組み合わせて新しい値を生成します。 - その他多数 (
transform
,onEach
,debounce
,distinctUntilChanged
など)
-
-
終端演算子 (Terminal Operator): Flowの処理を開始し、結果を受け取る役割です。これは
suspend
関数であり、Flowの実行をトリガーします。-
collect { ... }
: Flowから放出される各値を順に受け取って処理します。Flowの実行を開始する最も基本的な方法です。 -
single()
: Flowが単一の値のみを放出して正常終了することを表明し、その値を返します(そうでなければ例外)。 -
reduce { accumulator, value -> ... }
: Flowの値を累積計算します。 -
toList()
,toSet()
: Flowのすべての値をコレクションに集約します。
-
これらの要素を組み合わせることで、複雑な非同期データ処理パイプラインを構築できます。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 1. ビルダー: 1から5までの数値を生成するFlow
val numberFlow = flow {
for (i in 1..5) {
delay(100) // 非同期処理をシミュレート
emit(i)
println("Emitted: $i")
}
}
// 2. 中間演算子: 偶数のみをフィルタリングし、数値を文字列に変換
val processedFlow = numberFlow
.filter { it % 2 == 0 }
.map { "Processed value: $it" }
// 3. 終端演算子: Flowの処理を開始し、結果を出力
println("Collecting...")
processedFlow.collect { value ->
println("Collected: $value")
}
println("Collection finished.")
}
/* 出力例:
Collecting...
Emitted: 1
Emitted: 2
Collected: Processed value: 2
Emitted: 3
Emitted: 4
Collected: Processed value: 4
Emitted: 5
Collection finished.
*/
Cold Flow vs Hot Flow
Flowには大きく分けて「Cold Flow」と「Hot Flow」の2種類が存在します。これは、Flowがいつデータ生成を開始し、どのようにコレクター間で共有されるかという点で異なります。
Cold Flow
- 特徴: 「コレクターが現れたら、そのコレクターのためにデータ生成を開始する」フローです。
-
動作:
collect
などの終端演算子が呼び出されるたびに、Flowビルダー内のコードが最初から実行されます。各コレクターは独立したデータストリームを受け取ります。 -
例:
flow { ... }
,flowOf()
,asFlow()
など、標準的なFlowビルダーで作成されるFlowは基本的にCold Flowです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val coldFlow = flow {
println("Flow started for ${currentCoroutineContext()[CoroutineName]?.name}")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking {
println("Collecting first time...")
launch(CoroutineName("Collector A")) {
coldFlow.collect { println("Collector A received: $it") }
}
delay(500) // 少し待ってから2回目のcollectを開始
println("\nCollecting second time...")
launch(CoroutineName("Collector B")) {
coldFlow.collect { println("Collector B received: $it") }
}
}
/* 出力例:
Collecting first time...
Flow started for Collector A
Collector A received: 1
Collector A received: 2
Collector A received: 3
Collecting second time...
Flow started for Collector B
Collector B received: 1
Collector B received: 2
Collector B received: 3
*/
上記の例では、Collector A
とCollector B
がそれぞれcollect
を呼び出すたびに、"Flow started..."
が出力され、独立したデータシーケンス(1, 2, 3)を受け取っていることがわかります。
Hot Flow
- 特徴: 「コレクターの存在に関わらず、常にデータが流れている可能性がある」フローです。
-
動作: Flowのインスタンスが生成された時点、または特定のスコープがアクティブな間、データ生成が開始・継続されます。複数のコレクターは、同じデータストリームを共有します。コレクターが収集を開始した時点で既に流れてしまったデータは、通常受け取れません(ただし、
replay
機能で過去のデータをキャッシュすることは可能)。 -
例:
SharedFlow
,StateFlow
が代表的なHot Flowです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// MutableSharedFlowはHot Flowを作成するビルダーの一つ
val hotFlow = MutableSharedFlow<Int>(replay = 0) // replay=0: 新しいコレクターは過去の値を受け取らない
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.Default + CoroutineName("EmitterScope"))
// データ送信側 (Emitter)
scope.launch {
println("Emitter starting...")
hotFlow.emit(1) // Collector A はまだいないので、これは誰にも届かない (replay=0のため)
delay(200)
hotFlow.emit(2) // Collector A が受け取る
delay(200)
hotFlow.emit(3) // Collector A と B が受け取る
delay(200)
hotFlow.emit(4) // Collector A と B が受け取る
println("Emitter finished.")
}
println("Collector A starting...")
val jobA = launch(CoroutineName("Collector A")) {
delay(100) // Emitterが1をemitした後に収集開始
hotFlow.collect { println("Collector A received: $it") }
}
println("Collector B starting...")
val jobB = launch(CoroutineName("Collector B")) {
delay(300) // Emitterが2をemitした後に収集開始
hotFlow.collect { println("Collector B received: $it") }
}
delay(1000) // すべての処理が終わるのを待つ
jobA.cancel()
jobB.cancel()
scope.cancel()
}
/* 出力例:
Collector A starting...
Collector B starting...
Emitter starting...
Collector A received: 2
Collector B received: 3 // Collector Bは3から受け取る
Collector A received: 3
Collector B received: 4
Collector A received: 4
Emitter finished.
*/
この例では、hotFlow
はscope
内で生成され、コレクターの存在とは無関係にemit
が実行されます。Collector A
はemit(1)
の後、Collector B
はemit(2)
の後に収集を開始するため、それぞれ受け取る値が異なります。また、emit(3)
とemit(4)
は両方のコレクターに配信されていることがわかります。
-
比喩:
-
Cold Flow: 自動販売機。ボタン(
collect
)を押した人に対して、その都度商品(データ)を出す。 -
Hot Flow: ラジオ放送。放送局(Flow)は常に電波(データ)を発信しており、リスナー(コレクター)はチューニング(
collect
)した時点から放送を聞くことができる。
-
Cold Flow: 自動販売機。ボタン(
Flowの重要な制約
Flowを安全かつ予測可能に利用するためには、以下の2つの重要な制約を理解し、遵守する必要があります。これらの制約は、Flowの内部実装を知らなくても、利用者が安心してFlowを使えるように設計されています。
1. Context Preservation (コンテキストの維持)
原則: Flowのemit
は、そのFlowを収集しているコレクターのCoroutineContextで実行されなければならない。flow { ... }
ビルダー内でwithContext
などを使って勝手にコンテキストを切り替えてemit
することは禁止されています。
目的: 「どこで何が実行されるか」を明確にし、意図しないスレッド変更やコンテキストの混在を防ぐためです。これにより、Flowの利用者は、どのコンテキストでデータを受け取るかを予測しやすくなります。
違反例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val myFlow = flow {
// ダメな例: flowビルダー内でコンテキストを切り替えている
withContext(Dispatchers.IO) {
println("Emitting in ${currentCoroutineContext()}") // IO Dispatcherで実行される
emit(1) // ここで例外が発生する (IllegalStateException)
}
}
fun main() = runBlocking {
try {
// デフォルトのコンテキスト (この場合はrunBlockingのコンテキスト) で収集
println("Collecting in ${currentCoroutineContext()}")
myFlow.collect { value ->
println("Collected $value in ${currentCoroutineContext()}")
}
} catch (e: Exception) {
println("Caught exception: $e") // IllegalStateException: Flow invariant is violated...
}
}
正しい方法: flowOn
演算子
Flowの実行コンテキスト(データ生成側のコンテキスト)を変更したい場合は、flowOn
演算子を使用します。flowOn
は、それより上流の演算子(ビルダーを含む)を指定されたコンテキストで実行し、下流(flowOn
以降の演算子やcollect
)は元のコレクターのコンテキストで実行します。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val myFlow = flow {
// このブロックは flowOn で指定されたコンテキスト (Dispatchers.IO) で実行される
println("Emitting in ${currentCoroutineContext()}") // IO Dispatcher
emit(1)
delay(100)
emit(2)
}.flowOn(Dispatchers.IO) // 上流の処理をIOディスパッチャで実行するよう指定
fun main() = runBlocking(CoroutineName("MainContext")) {
println("Collecting in ${currentCoroutineContext()}") // MainContext
myFlow
.map { // このmapはコレクターのコンテキスト (MainContext) で実行される
println("Mapping in ${currentCoroutineContext()}")
it * 10
}
.collect { value -> // collectもコレクターのコンテキスト (MainContext)
println("Collected $value in ${currentCoroutineContext()}")
}
}
/* 出力例:
Collecting in CoroutineName(MainContext)
Emitting in Dispatchers.IO // flowブロック内はIO
Mapping in CoroutineName(MainContext) // mapはMainContext
Collected 10 in CoroutineName(MainContext) // collectもMainContext
Emitting in Dispatchers.IO
Mapping in CoroutineName(MainContext)
Collected 20 in CoroutineName(MainContext)
*/
flowOn
を使うことで、データ生成(時間のかかる可能性のある処理)とデータ消費(UI更新など)でコンテキストを安全に分離できます。
coroutineScope
との違い:
flow { ... }
内でcoroutineScope { ... }
を使うことは許可されています。これは、coroutineScope
が親のコンテキストを継承し、新しいコンテキストを作らないため、Context Preservationの原則に違反しないからです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flow {
emit(1) // OK
coroutineScope {
// このスコープは親 (collectを呼び出したコルーチン) のコンテキストを引き継ぐ
println("Inside coroutineScope: ${currentCoroutineContext()}")
emit(2) // OK -- 同じコンテキストで実行される
}
}.collect { println("Collected $it in ${currentCoroutineContext()}") }
}
複数のコンテキストでemit
したい場合: channelFlow
もし、Flowの生成ロジック内で、どうしても複数の異なるコンテキストから値をemit
(正確にはsend
)したい場合は、channelFlow
ビルダーを使用します。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val channelBasedFlow = channelFlow {
// channelFlow内では ProducerScope が提供され、send() を使う
launch(Dispatchers.IO + CoroutineName("SenderIO")) {
println("Sending 1 from ${currentCoroutineContext()}")
send(1) // IOコンテキストから送信可能
}
launch(Dispatchers.Default + CoroutineName("SenderDefault")) {
println("Sending 2 from ${currentCoroutineContext()}")
send(2) // Defaultコンテキストから送信可能
}
}.flowOn(Dispatchers.Main + CoroutineName("ReceiverMain")) // channelFlow自体もflowOnでコンテキストを指定できる
// ※ AndroidなどMainディスパッチャが必要な環境で実行してください
// fun main() = runBlocking { ... } ではMainがないためエラーになります
/*
fun main() = runBlocking { // 仮の実行 (Mainがないため実際には動かない可能性)
println("Collecting channelBasedFlow in ${currentCoroutineContext()}")
channelBasedFlow.collect {
println("Collected $it in ${currentCoroutineContext()}")
}
}
*/
channelFlow
は内部でChannelを使っており、異なるコンテキストからのsend
を安全に処理する仕組みを提供します。ただし、flow
ビルダーよりも若干オーバーヘッドがあるため、Context Preservationを守れるならflow
を使う方が効率的です。
2. Exception Transparency (例外の透過性)
原則: Flowの生成側(ビルダーや中間演算子)で発生した例外は、そのまま下流のコレクターに伝播されるべきであり、Flow内部でtry-catch
を使って握りつぶしてはいけない(ただし、特定の例外を別の値に変換するcatch
演算子は除く)。
目的: 例外処理の責任をFlowの利用者に委ねることで、Flowの実装をシンプルに保ち、例外発生時の挙動を予測可能にするためです。
違反例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val badFlow = flow {
try {
emit(1)
throw RuntimeException("Something went wrong in the flow")
emit(2) // これは実行されない
} catch (e: Exception) {
println("Caught exception inside flow: $e")
// 例外を握りつぶして、正常終了したかのように見せかけるのはNG
}
}
fun main() = runBlocking {
println("Collecting badFlow...")
badFlow.collect { println("Collected: $it") } // 例外が発生したことがコレクターに伝わらない
println("badFlow finished.")
}
/* 出力例:
Collecting badFlow...
Collected: 1
Caught exception inside flow: java.lang.RuntimeException: Something went wrong in the flow
badFlow finished. // コレクターは例外を知らないまま終了
*/
正しい方法: コレクター側での例外処理 or catch
演算子
例外はFlowの外、つまり終端演算子(collect
など)を呼び出す側でtry-catch
するか、Flowチェーンの下流でcatch
演算子を使って処理します。
コレクター側でtry-catch
:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val goodFlow = flow {
emit(1)
throw RuntimeException("Something went wrong in the flow")
emit(2)
}
fun main() = runBlocking {
println("Collecting goodFlow...")
try {
goodFlow.collect { println("Collected: $it") }
} catch (e: Exception) {
println("Caught exception during collection: $e") // コレクターが例外を捕捉
}
println("goodFlow finished.")
}
/* 出力例:
Collecting goodFlow...
Collected: 1
Caught exception during collection: java.lang.RuntimeException: Something went wrong in the flow
goodFlow finished.
*/
catch
演算子:
catch
演算子は、自身より上流で発生した例外を捕捉し、代替の値を発行したり、特定の処理を行ったりすることができます。例外はcatch
ブロックで消費され、下流には伝播しません(catch
内で再度throw
しない限り)。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val resilientFlow = flow {
emit("Value 1")
throw IllegalStateException("Error occurred")
emit("Value 2") // 実行されない
}
fun main() = runBlocking {
resilientFlow
.map { "Processed: $it" } // 例外発生前の値は処理される
.catch { e ->
println("Caught exception in catch operator: $e")
emit("Fallback value due to error") // 代替の値を発行
}
.collect { value ->
println("Collected: $value")
}
}
/* 出力例:
Collected: Processed: Value 1
Caught exception in catch operator: java.lang.IllegalStateException: Error occurred
Collected: Fallback value due to error
*/
重要: catch
演算子は、下流で発生した例外は捕捉できません。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
try {
flow { emit(1) }
.catch { /* ここは呼ばれない */ }
.map { throw RuntimeException("Exception in map") }
.collect { /* ここも呼ばれない */ } // 例外はここで発生し、外部に伝播する
} catch (e: Exception) {
println("Caught exception outside: $e") // RuntimeException: Exception in map
}
}
上流と下流の両方で例外が発生した場合:
もしFlowの処理中に、上流(例: onCompletion
内)と下流(例: collect
ブロック内)の両方で例外が発生した場合、原則として下流(コレクター側)の例外が優先され、上流の例外はそれにaddSuppressed
される形で扱われます。これは、finally
ブロックで例外がスローされた場合のJavaの挙動に似ています。コレクターでの処理中の問題が根本原因と見なされるべき、という考え方に基づいています。
これらの制約を守ることで、Flowを使ったコードはより堅牢で、理解しやすくなります。
SharedFlow: Hot Flowの代表格
SharedFlow は、Hot Flowのインターフェースであり、その実装として MutableSharedFlow
が提供されています。コレクター間で値を共有(ブロードキャスト)する目的で使われます。
特徴と利点
-
ホットストリーム: コレクターの存在に関わらず、値が
emit
される可能性があります。 -
ブロードキャスト:
emit
された値は、その時点でアクティブなすべてのコレクターに配信されます。 - 設定可能なリプレイキャッシュ: 新しいコレクターに対して、直近のいくつかの値を再送(リプレイ)するように設定できます。
-
バッファリング:
emit
の速度がcollect
の処理速度を上回った場合に備えて、値を一時的に保持するバッファを設定できます。
ユースケース例: イベントのブロードキャスト
UIイベント、バックグラウンド処理の進捗通知など、アプリケーション内の複数の場所で同じイベントを受け取りたい場合に便利です。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// ViewModelなどでイベントを発行する例
class EventViewModel(private val scope: CoroutineScope) {
// プライベートなMutableSharedFlowでイベントを発行
private val _events = MutableSharedFlow<String>()
// 公開用には読み取り専用のSharedFlowを公開
val events: SharedFlow<String> = _events.asSharedFlow()
fun triggerEvent(message: String) {
scope.launch {
println("ViewModel emitting: $message")
_events.emit(message)
}
}
}
// 複数の場所でイベントを購読する例
fun main() = runBlocking {
val viewModel = EventViewModel(this)
val job1 = launch(CoroutineName("Observer 1")) {
viewModel.events.collect { event ->
println("Observer 1 received: $event")
}
}
val job2 = launch(CoroutineName("Observer 2")) {
delay(50) // 少し遅れて購読開始
viewModel.events.collect { event ->
println("Observer 2 received: $event")
}
}
delay(10)
viewModel.triggerEvent("Event A") // Observer 1 のみが受け取る
delay(100)
viewModel.triggerEvent("Event B") // Observer 1 と 2 が受け取る
delay(100)
job1.cancel()
job2.cancel()
}
/* 出力例:
ViewModel emitting: Event A
Observer 1 received: Event A
ViewModel emitting: Event B
Observer 1 received: Event B
Observer 2 received: Event B // Observer 2もEvent Bを受け取る
*/
MutableSharedFlow
の主要パラメータ
MutableSharedFlow(...)
コンストラクタやshareIn
演算子でSharedFlowを作成する際には、以下の重要なパラメータを設定できます。
-
replay
: (デフォルト: 0)-
新しいコレクターが購読を開始したときに、過去に
emit
された値のうち、直近の何個を即座に受け取るか を指定します。 -
replay = 1
なら最新の1つ、replay = 5
なら最新の5つを受け取ります。 -
0
の場合は、購読開始後にemit
された値のみを受け取ります。 - これはあくまで「新しいコレクター向けのキャッシュ」であり、既に購読中のコレクターには影響しません。
-
新しいコレクターが購読を開始したときに、過去に
-
extraBufferCapacity
: (デフォルト: 0)-
replay
キャッシュとは別に、emit
された値を一時的に保持するための追加バッファサイズを指定します。 - コレクターの処理が追いつかない場合に、
emit
された値がこのバッファに格納されます。 -
replay
とextraBufferCapacity
の合計が、SharedFlowが内部的に保持できる値の最大数になります(onBufferOverflow
の設定によります)。
-
-
onBufferOverflow
: (デフォルト:BufferOverflow.SUSPEND
)-
replay
キャッシュとextraBufferCapacity
を合わせたバッファが満杯になったときのemit
側の挙動を決定します。-
SUSPEND
:emit
しようとしているコルーチンを一時停止し、バッファに空きができるまで待機します。最も一般的な挙動です。 -
DROP_OLDEST
: バッファが満杯の場合、バッファ内で最も古い値を捨てて、新しい値を格納します。emit
は中断しません。 -
DROP_LATEST
: バッファが満杯の場合、新しくemit
されようとしている値を捨てます。emit
は中断しません。
-
-
これらのパラメータを調整することで、ユースケースに応じたSharedFlowの挙動を細かく制御できます。
shareIn
演算子
既存のCold FlowをHotなSharedFlowに変換するための演算子です。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val coldFlow = flow {
println("Cold flow started")
emit("Data from cold flow")
delay(100)
println("Cold flow finished")
}
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.Default)
// coldFlowをSharedFlowに変換
val sharedFlow = coldFlow.shareIn(
scope = scope, // SharedFlowがアクティブになるスコープ
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 500), // 購読者がいなくなって0.5秒後に上流のFlowを停止
replay = 1 // 最新の値を1つリプレイ
)
println("Collector 1 subscribing...")
val job1 = launch { sharedFlow.collect { println("Collector 1: $it") } }
delay(50) // Collector 1が購読開始 -> coldFlowが開始される
println("Collector 2 subscribing...")
val job2 = launch { sharedFlow.collect { println("Collector 2: $it (from replay)") } } // replayキャッシュから値を受け取る
delay(200) // coldFlowが完了するのを待つ
println("Collector 1 unsubscribing...")
job1.cancel()
delay(100)
println("Collector 2 unsubscribing...")
job2.cancel()
delay(600) // stopTimeoutMillis (500ms) を超える -> coldFlowが停止するはず (ログは出ないが内部的に)
println("SharedFlow should be stopped now.")
scope.cancel()
}
/* 出力例:
Collector 1 subscribing...
Cold flow started // Collector 1の購読により開始
Collector 1: Data from cold flow
Collector 2 subscribing...
Collector 2: Data from cold flow (from replay) // Collector 2はリプレイを受け取る
Cold flow finished
Collector 1 unsubscribing...
Collector 2 unsubscribing...
SharedFlow should be stopped now.
*/
-
scope
: SharedFlowの共有を開始・管理するためのコルーチンスコープ。 -
started
: 共有(上流のCold Flowの実行)をいつ開始・停止するかを定義するポリシー。-
SharingStarted.Eagerly
:scope
がアクティブになるとすぐに共有を開始し、scope
がキャンセルされるまで継続。 -
SharingStarted.Lazily
: 最初のコレクターが現れたら共有を開始し、scope
がキャンセルされるまで継続(コレクターがいなくなっても停止しない)。 -
SharingStarted.WhileSubscribed(...)
: 最初のコレクターが現れたら共有を開始し、最後のコレクターがいなくなったら(オプションのタイムアウト後に)共有を停止する。リソース効率が良い場合に推奨されます。
-
-
replay
:MutableSharedFlow
と同様、新しいコレクターにリプレイする値の数。
shareIn
は、一度だけ実行したい高コストな処理の結果を、複数のコレクターで効率的に共有したい場合に特に有用です。
StateFlow: 状態管理に特化したSharedFlow
StateFlow は、SharedFlow
をベースにした、現在の状態を保持・通知することに特化したHot Flowです。UIの状態管理などで広く利用されています。
特徴
-
常に単一の最新状態を持つ:
value
プロパティを通じて、現在の状態に同期的にアクセスできます。 -
replay = 1
相当の動作: 新しいコレクターは、購読を開始するとすぐに現在の状態を受け取ります。 -
onBufferOverflow = DROP_OLDEST
相当の動作: 状態の更新が非常に速い場合、中間的な状態はコレクターに通知されずに破棄され、最新の状態のみが配信されることが保証されます(Conflation)。 -
distinctUntilChanged
相当の動作: 同じ値が連続してemit
(設定)されても、コレクターには通知されません。状態が実際に変化した場合のみ通知されます。
MutableStateFlow
は、内部的には以下のようなMutableSharedFlow
とほぼ等価と考えることができます。
// MutableStateFlow(initialValue) は conceptually には以下に近い
val shared = MutableSharedFlow<StateType>(
replay = 1, // 常に最新の値を1つリプレイ
onBufferOverflow = BufferOverflow.DROP_OLDEST // 中間値は破棄 (Conflation)
)
// 初期値を設定 (tryEmitはsuspendしないemit)
shared.tryEmit(initialValue)
// StateFlowとして振る舞うように、連続する同一値の通知を抑制
val state: StateFlow<StateType> = shared.distinctUntilChanged()
MutableStateFlow
StateFlow
の更新可能なバージョンです。value
プロパティに新しい値を代入することで状態を更新します。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// ViewModelでの状態管理例
class CounterViewModel(private val scope: CoroutineScope) {
private val _count = MutableStateFlow(0) // 初期値0で作成
val count: StateFlow<Int> = _count.asStateFlow() // 読み取り専用を公開
fun increment() {
// valueプロパティを更新して状態を変更
// スレッドセーフな更新が必要な場合は update { current -> current + 1 } を使う
_count.value += 1
println("ViewModel count updated to: ${_count.value}")
}
fun decrement() {
_count.value -= 1
println("ViewModel count updated to: ${_count.value}")
}
}
// UI層などでの状態購読例
fun main() = runBlocking {
val viewModel = CounterViewModel(this)
val job = launch {
println("Collector starting, initial value: ${viewModel.count.value}")
viewModel.count.collect { currentCount ->
// 状態が変化した場合のみcollectが呼ばれる
println("Collector received update: $currentCount")
}
}
delay(100)
viewModel.increment() // count: 1
delay(100)
viewModel.increment() // count: 2
delay(100)
viewModel.decrement() // count: 1
delay(100)
viewModel.increment() // count: 2
delay(100)
// 同じ値を設定してもコレクターには通知されない
println("Setting value to 2 again...")
viewModel._count.value = 2 // distinctUntilChangedにより通知されない
delay(100)
job.cancel()
}
/* 出力例:
Collector starting, initial value: 0
Collector received update: 0
ViewModel count updated to: 1
Collector received update: 1
ViewModel count updated to: 2
Collector received update: 2
ViewModel count updated to: 1
Collector received update: 1
ViewModel count updated to: 2
Collector received update: 2
Setting value to 2 again... // コレクターへの通知はなし
*/
StateFlow利用時の注意点 (Conflation)
StateFlow
の重要な特性としてConflation(合流)があります。これは、emit
(状態更新)が非常に速く行われた場合、コレクターが処理する前に中間的な状態が破棄され、最新の状態のみが通知されることを意味します。
例: 状態が 1 -> 2 -> 3
と非常に短時間で変化した場合、コレクターは 1
を受け取った後、次に 3
を受け取り、2
はスキップされる可能性があります。
これは通常、UIの状態表示など「最新の状態が重要」なケースでは問題ありません。しかし、状態の変化そのものをトリガーとして副作用(例: ネットワークリクエスト、ログ送信)を実行したい場合には注意が必要です。なぜなら、スキップされた状態変化に対応する副作用が実行されない可能性があるからです。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val state = MutableStateFlow(0)
suspend fun collectAndLog() {
state.collect { value ->
println("Collected: $value. Performing side effect...")
delay(100) // 副作用の処理に時間がかかると仮定
println("Side effect for $value done.")
}
}
fun main() = runBlocking {
val collectorJob = launch { collectAndLog() }
delay(10)
println("Updating state to 1")
state.value = 1
println("Updating state to 2")
state.value = 2 // 1の副作用が終わる前に2に更新
println("Updating state to 3")
state.value = 3 // 2の副作用が終わる前に3に更新 (2はスキップされる可能性)
delay(500) // 処理完了を待つ
collectorJob.cancel()
}
/* 出力例の可能性の一つ:
Collected: 0. Performing side effect...
Updating state to 1
Updating state to 2
Updating state to 3
Side effect for 0 done.
Collected: 3. Performing side effect... // 1と2がスキップされた!
Side effect for 3 done.
*/
この例のように、collect
内の処理(副作用)が完了する前にStateFlow
の値が複数回更新されると、中間的な値(この場合は1と2)がcollect
に届かない可能性があります。
対策: 状態変化ごとに確実に副作用を実行したい場合は、StateFlow
ではなくSharedFlow(replay=0)
を使うか、Flow.onEach
と組み合わせて副作用をcollect
の外で処理するなどの工夫が必要です。Jetpack ComposeのLaunchedEffect
でStateFlow
をキーにする場合も同様の注意が必要です。
まとめ
Kotlin Flowは、コルーチンベースの強力な非同期データストリーム処理ライブラリです。
-
Cold Flowは
collect
されるたびに実行され、各コレクターは独立したストリームを受け取ります。 -
Hot Flow(
SharedFlow
,StateFlow
)はコレクターとは独立してデータが流れる可能性があり、複数のコレクターでストリームを共有します。 -
Context PreservationとException Transparencyという2つの制約により、Flowは安全で予測可能な動作を提供します。コンテキスト切り替えには
flowOn
、例外処理にはcatch
演算子やコレクター側でのtry-catch
を使用します。 -
SharedFlowはイベントのブロードキャストに適しており、
replay
やバッファリングを細かく設定できます。 -
StateFlowは状態管理に特化し、常に最新の値を保持し、
distinctUntilChanged
とConflationの特性を持ちます。
これらの概念を理解することで、Kotlinアプリケーションにおける非同期処理をより効率的かつ宣言的に記述できるようになります。