※ソース記事はこちら
suspend関数は非同期に単一の値を返却するが、非同期に計算される複数の値を返却することはできるのか?これはKotlin Flowが役立つところである。
複数の値の表現
複数の値はKotlinではコレクションを使って表現される。例えば、3つの数字のリストを返却するsimple
関数を持つことができ、そしてforEachを使ってそれらをすべて出力する。
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
このコードは以下を出力する。
1
2
3
シーケンス
もしいくらかCPUを消費するブロッキングコード(それぞれの計算は100msかかる)を持つ、数値の計算がある場合、Sequenceを使って数値を表現することができる。
fun simple(): Sequence<Int> = sequence { // シーケンスビルダー
for (i in 1..3) {
Thread.sleep(100) // 計算しているふりをする
yield(i) // 次の値を生み出す
}
}
fun main() {
simple().forEach { value -> println(value) }
}
このコードは同じ数値を出力するが、それぞれを出力する前に100ms待機する。
suspend関数
しかし、この計算はコードを実行しているスレッドをブロックする。これらの値が、simple
関数をsuspend
修飾子で印をつけることができる非同期コードで計算すると、ブロッキング無しで作業を行うことができ、リストとして結果を返却する。
import kotlinx.coroutines.*
suspend fun simple(): List<Int> {
delay(1000) // ここで非同期なことをするふりをする
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
このコードは1秒待って数字を出力する。
Flow
List<Int>
の結果型を使うことは、一度にすべての値を返却することしかできない。非同期に計算される値のストリームを表現するため、Flow<Int>
型をつかうことができ、同期的に値を計算するためのSequence<Int>
とちょうど同じように使うことができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow { // flow ビルダー
for (i in 1..3) {
delay(100) // ここで何か役に立つことをするふりをする
emit(i) // 次の値を放出する
}
}
fun main() = runBlocking<Unit> {
// mainスレッドがブロックされるか確認するため、並列にコルーチンを起動する
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// flowを収集する
simple().collect { value -> println(value) }
}
このコードは、mainスレッドをブロックせず、それぞれの数を出力する前に100ms待機する。このことは、mainスレッドで実行する別のコルーチンによって、"I'm not blocked"が100msおきに出力されることによって確認できる。
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
Flowを使ったコードと、以前の例では次の違いがあることに注意すること。
- Flow型のビルダー関数は、flowと呼ばれる。
-
flow { ... }
ビルダーブロック内部のコードは、一時停止されることがある。 -
simple
関数は今ではsuspend
修飾子はついていない。 - 値はemit関数により、放出される。
- 値はcollect関数により、収集される。
simple
のflow { ... }
のボディにあるdelayをThread.sleep
に置き換えることができ、その場合、mainスレッドがブロックされることがわかる。
flowはcoldである
flowはsequenceに似てcoldストリームである。つまりflowビルダー内のコードは、flowが収集されるまで、実行されない。このことは次の例で明らかになる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
これは次の出力となる。
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
これは(flowを返却する)simple
関数が、suspend
修飾子がついていない主な理由である。それ自体、simple()
はすぐに返却され、何も待つことはしない。flowは収集されるときはいつでも開始する。それは再度collect
を呼び出したとき、"Flow started"が表示される理由である。
flowのキャンセルの基本
flowはコルーチンの一般的な協調的キャンセルに従う。通常、flowの収集は、delayのようなキャンセルできるsuspend関数のおいてflowがsuspendされるとき、キャンセルすることができる。次の例では、flowがwithTimeoutOrNullブロックで実行しているときにどのようにキャンセルされ、コードの実行が中止する方法を見ることができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 250ms後、タイムアウト
simple().collect { value -> println(value) }
}
println("Done")
}
次の出力を生成し、flowにより二つの数字だけが放出されることに注意すること。
Emitting 1
1
Emitting 2
2
Done
より詳細はflowのキャンセルチェックを参照のこと。
flowビルダー
以前の例からのflow { ... }
ビルダーは、最も基本的なものである。より簡単なflowの宣言のための他のビルダーが存在する。
- flowOfビルダーは固定の値を放出するflowを定義する
- 様々なコレクションやシーケンスは、
asFlow()
拡張関数を使うことで、flowに変換できる
こうして1から3の数字を出力する例は次のように書くことができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
}
中間のflow演算子
flowはコレクションやシーケンスと全く同じように、演算子で変形することができ、中間の演算子は上流のflowに適用され、下流のflowを返却する。これらの演算子は、flowと全く同じようにcoldである。そのような演算子の呼び出しは、それ自体suspend関数ではない。それは素早く動作し、新しく変形されたflowの定義を返却する。
基本的な演算子は、mapやfilerのようななじみのある名前を持つ。シーケンスとの重要な違いは、これらの演算子の内部のコードブロックがsuspend関数を呼ぶことができるということである。
例えば、着信要求のflowはmap演算子を使って結果がマップされることができる。たとえ要求の実行が、suspend関数で実装された長時間動作する操作であってもである。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // 長時間の非同期動作のふりをする
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 要求のflow
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
これは、以下の3行を生成し、それぞれの行はそれぞれ1秒後に現れる。
response 1
response 2
response 3
変形演算子
flowの変形演算子のうち、もっとも一般的なものはtransformと呼ばれる。それはより複雑な変形と同様に、mapやfilerのような簡単な変形の真似をすることに使われる。transform
演算子を使って、任意の回数、任意の値をemitすることができる。
例えば、transform
を使って、長時間実行する非同期要求の前に一つの文字列を放出し、応答でそれに続けることができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // 長時間の非同期処理のふりをする
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 要求のflow
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
このコードの出力は次のようになる。
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
サイズを制限する演算子
takeのようなサイズを制限する中間演算子は、対応する制限に達したとき、flowの実行をキャンセルする。コルーチンでのキャンセルは常に例外をスローすることによって実行されるため、すべてのリソース管理関数(try { ... }
やfinally { ... }
ブロック)はキャンセルの場合、通常に動作する。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 最初の二つだけ取る
.collect { value -> println(value) }
}
終端flow演算子
flowにおける終端演算子は、flowの収集を開始するsuspend関数である。collect演算子は最も基本的であるが、他の終端演算子も存在し、それらは以下のことを簡単にすることができる。
例えば以下の場合、
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it } // 1から5までの数値を二乗する
.reduce { a, b -> a + b } // それらを合計する (終端演算子)
println(sum)
}
単一の値を出力する。
35
flowはシーケンシャルである
flowfの個々の収集は、使われているる複数のflowを操作する特別な演算子無しで、シーケンシャルに実行される。収集は終端演算子を呼ぶコルーチン内で直接作用する。デフォルトでは新しいコルーチンは起動されない。それぞれの放出された値は、上流から下流にかけてすべての中間演算子によって処理され、その後、後から終端演算子に届けられる。
偶数の整数をフィルターして、それらを文字列にmapする次の例を参照してほしい。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
次のようにを生成される。
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
flowコンテキスト
flowの収集は、コルーチンを呼ぶコンテキストで常に発生する。例えば、simple
flowがある場合、次のコードはこのコードの著者によって指定されたコンテキストにおいて実行され、それはsimple
flowの実装の詳細に無関係である。
withContext(context) {
simple().collect { value ->
println(value) // 指定されたコンテキストで実行される
}
}
このflow属性はコンテキスト保存と呼ばれる。
このように、flow { ... }
ビルダー内のコードは、デフォルトでflowに対するコレクターによって提供されるコンテキストで実行される。例えば、呼び出されたスレッドを表示して、3つの数値を放出するsimple
関数の実装を考えてみる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
このコードを実行すると次の結果となる。
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect
は、メインスレッドから呼び出されるため、simple
のflow本体もメインスレッドで呼び出される。これは早く動かすため、あるいは、実行コンテキストについて気にせず、呼び出しをブロックしない非同期コードのための、完全なデフォルトである。
withContextでの不正な放出
しかし、長時間動作するCPUを消費するコードは、Dispatcher.Defaultのコンテキストで実行する必要性があるかもしれず、UIを更新するコードは、Dispatchers.Mainのコンテキストで実行する必要があるかもしれない。たいてい、withContextが、Kotlinコルーチンを使っているコードでコンテキストを変更するために使われるが、flow { ... }
ビルダー内のコードは、コンテキスト保存属性が与えられているため、異なるコンテキストからemitすることは許可されていない。
次のコードを試してみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
// flowビルダー内でCPUを消費するコードのため、コンテキストを変更する間違った方法
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // CPUを消費する方法で計算するふりをする
emit(i) // 次の値を放出する
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
このコードは次の例外を生成する。
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn演算子
その例外は、flowの放出のコンテキストを変更するには、flowOn関数を使うべきだと言っている。flowのコンテキストを変更する正しい方法は、以下の例でみることができ、それが全くどのように動作するかを見せるために、対応するスレッドの名前も出力する。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // CPUを消費する方法で計算するふりをする
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // flowビルダー内でCPUを消費するコードのためにコンテキストを変更する正しい方法
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
収集はメインスレッドで起こる一方、flow { ... }
はバックグラウンドスレッドで動作することに注意すること。
ここで観察するもう一つのことは、flowOn演算子はflowのデフォルトのシーケンシャルな性質を変更したということである。今では収集は、あるコルーチン("coroutine#1")で発生し、放出は収集のコルーチンと並行な別のスレッドで動作するもう一つのコルーチン("coroutine#2")で発生している。flowOn演算子は、それがコンテキスト内でCoroutineDispatcherを変更する必要がある場合は、上流のflowのために別のコルーチンを作る。
バッファリング
様々なコルーチンの中で、様々なflowの部分が実行されることは、flowの収集にかかかる時間という観点から、特に長時間実行される非同期の操作が関係すると、役立つことがありうる。例えば、simple
のflowの放出が遅く、一つの要素を生成するのに100msかかり、コレクターも遅く、一つの要素を処理するのに300msかかる場合を考えてみよう。3つの数値を持つそのようなflowの収集にどのくらい時間がかかるか、見てみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 非同期に100ms待つふりをする
emit(i) // 次の値を放出する
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 300msでそれを処理するふりをする
println(value)
}
}
println("Collected in $time ms")
}
それはこのようなものを出力し、全体の収集はおおよそ1200ms(3つの数値でそれぞれ400ms)かかる。
1
2
3
Collected in 1220 ms
simple
flowの放出するコードを、シーケンシャルで実行するのとは対照的に、収集するコードで並列に実行するために、flowにおいてbuffer演算子を使うことができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 非同期に100ms待つふりをする
emit(i) // 次の値を放出する
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // 放出をバッファリングし、待機しない
.collect { value ->
delay(300) // 300ms処理するふりをする
println(value)
}
}
println("Collected in $time ms")
}
最初の数値のために100ms待機する必要があり、その後、それぞれの数値を処理するため、300ms秒だけ一時的する、効果的に処理経路を作ったため、それは同じ数値をまさにより早く生成する。この方法では、実行に1000ms前後かかる。
1
2
3
Collected in 1071 ms
留意すべきは、flowOn演算子は、CoroutineDispatcherを変更する必要があるとき、同じバッファリングの仕組みを使うが、ここでは、実行コンテキストを変えずに明示的にバッファリング要求をしている。
合成
flowが操作の一部の結果あるいは、操作の状態の更新を表す場合、それぞれの値を処理する必要がなく、代わりに最新のものだけでよいかもしれない。この場合、コレクタが処理するのが遅すぎるとき、中間の値をスキップするのに、conflate演算子を使うことができる。前回の例を足場にしてみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 非同期に100ms待つふりをする
emit(i) // 次の値を放出する
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate() // 放出を合成する。それぞれの値を処理しない
.collect { value ->
delay(300) // 300msの間、処理するふりをする
println(value)
}
}
println("Collected in $time ms")
}
最初の値はまだ処理されるが、2つめと3つめはすでに生成され、2つめは合成され、最新の(3つめのもの)だけが、コレクタに配達される。
1
3
Collected in 758 ms
最新の値の処理
合成はエミッタとコレクタが両方とも遅い場合、処理を高速化する一つの方法である。それは放出された値をやめることで行われる。他の方法は、遅いコレクタをキャンセルし、新しい値が放出されたらいつでも再起動することである。xxx
演算子と基本的に同じロジックを実行するが、新しい値に基づいて、ブロック内のコードをキャンセルするxxxLatest
演算子ファミリーが存在する。前回の例におけるconflateをcollectLatestに変えてみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 非同期に100ms待つふりをする
emit(i) //次の値を放出する
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 最近の値について、キャンセルかつ再起動する
println("Collecting $value")
delay(300) // 300msの間、処理するふりをする
println("Done $value")
}
}
println("Collected in $time ms")
}
collectLatestの本体は300msかかるが、新しい値は100msごとに放出されるため、すべての値においてブロックは実行されるが、最後の値のためにのみ、完了することが、見て取れる。
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
複数のflowの構成
複数のflowを構成にするにはたくさんの方法がある。
Zip
Kotlin標準ライブラリのSequence.zip拡張関数のように、flowには対応する二つのflowの値を合成するzip演算子がある。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow() // 数値 1..3
val strs = flowOf("one", "two", "three") // 文字列
nums.zip(strs) { a, b -> "$a -> $b" } // 単一の文字列に構成する
.collect { println(it) } // 収集と表示
}
この例は次の出力をする。
1 -> one
2 -> two
3 -> three
Combine
flowが、変数または操作の最も最近の値を表すとき(合成における関連する章も参照のこと)、対応するflowの最新の値に依存して計算を実行し、上流のflowが値を放出するときは常に再計算する必要があるかもしれない。対応する演算子ファミリーはcombineと呼ばれる。
例えば、前回の例で数値が300msごとに更新されるが、文字列は400msごとに更新される場合、zip演算子を使ってzipすると、結果を400msおきに出力するにも関わらず、引き続き同じ結果を生成する。
この例では、それぞれの要素を遅延するために、onEach中間演算子を使用し、同一のflowを放出するコードをより宣言的、簡潔ににする。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // 数値 1..3 300 msごと
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 文字列 400msごと
val startTime = System.currentTimeMillis() // 開始時刻を記録
nums.zip(strs) { a, b -> "$a -> $b" } // "zip"で単一の文字列を構成
.collect { value -> // 収集と表示
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
しかし、zipの代わりに、ここでcombile演算子を使う。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // 数値 1..3 300 msごと
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 文字列 400msごと
val startTime = System.currentTimeMillis() // 開始時刻を記録
nums.combine(strs) { a, b -> "$a -> $b" } // "combine"で単一の文字列を構成
.collect { value -> // 収集と表示
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
かなり異なる出力が得られ、nums
またはstr
のflowからのそれぞれの放出で行が表示される。
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
flowの平準化
flowは非同期に受けっとた値のシーケンスを表すため、それぞれの値が、他の値のシーケンス要求を引き起こすような状況に非常に簡単に陥る。例えば、500msおきに二つの文字列のflowを返却する次の関数がある。
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 500 ms待つ
emit("$i: Second")
}
そこで、このように三つの整数のフローがあり、それぞれのためにrequestFlow
が呼び出される。
(1..3).asFlow().map { requestFlow(it) }
その後、flowのflow(Flow<Flow<String>>
)になってしまい、さらなる処理のため、単一のflowに平準化する必要がある。コレクションやシーケンスではこのために、flattenやflatMap演算子がある。しかし、flowの非同期の性質のため、平準化の異なるモードが必要とされ、そういうものとして、flowの平準化演算子ファミリーがある。
flatMapConcat
連結モードはflatMapConcatとflattenConcat演算子によって実装されている。次の例で見えるように、それらは、次の値を収集する前に、内部のflowが完了するまで待機する。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 開始時刻を記録する
(1..3).asFlow().onEach { delay(100) } // 100msごとの数値
.flatMapConcat { requestFlow(it) }
.collect { value -> // 収集と表示
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
出力にはflatMapConcatのシーケンシャルな性質が明らかに見える。
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge
もう一つの平準化モードは、着信するすべてのflowを並列に収集し、それらの値を単一のflowにマージするため、値はできるだけ早く放出される。それはflatMapMergeとflattenMerge演算子により、実装されている。それらは両方オプションのconcurrency
パラメータを持ち、同時に収集するflowの並列数を制限する。(デフォルトはDEFAULT_CONCURRENCYに等しい)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 開始時刻を記録する
(1..3).asFlow().onEach { delay(100) } // 100msごとの数値
.flatMapMerge { requestFlow(it) }
.collect { value -> // 収集と表示
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
flatMapMergeの並列な性質が明らかである。
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
留意すべきは、flatMapMergeはそのコードブロック(この例では{ requestFlow(it) }
)をシーケンシャルに呼ぶが、結果のflowを並列に収集する。それは最初にシーケンシャルなmap { requestFlow(it) }
を実行し、その後で結果においてflattenMergeを呼ぶことに等しい。
flatMapLatest
最新の値の処理のセクションで見た、collectLatest演算子と似た方法で、対応する"最近の"平準化モードがあり、それは新しいflowが放出されるとすぐに、前回のflowの収集がキャンセルされるというものである。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 500ms待機
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 開始時刻を記録する
(1..3).asFlow().onEach { delay(100) } // 100msごとの数値
.flatMapLatest { requestFlow(it) }
.collect { value -> // 収集と表示
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
この例のここの出力は、flatMapLatestがどのように動作するかの良い実演である。
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
留意すべきは、flatMapLatestは、新しい値が来るとすぐにそのブロック({ requestFlow(it) }
)内のすべてのコードをキャンセルする。この特殊な例では違いがない。なぜなら、requestFlow
自体への呼び出しは早く、suspendもせず、キャンセルすることはできない。ただし、そこでdelay
のようなsuspend関数を使うような場合、違いが際立つようになる。
flow例外
flowの収集は、放出側あるいは、演算子の内側のコードが例外をスローする場合、例外で完了することがある。これらの例外を扱ういくつかの方法がある。
コレクタのtry catch
コレクタは例外を扱うために、Kotlinのtry/catch
を使うことができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 次の値を放出する
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
このコードは、collect終端演算子で例外を正常にキャッチし、ご覧のようにその後では、もう値は放出されない。
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
すべてはキャッチされる
前回の例では、放出側や、中間あるいは終端演算子で発生したどんな例外も実際キャッチする。例えば、値を放出するコードを文字列にmapするようにコードを変更してみよう。しかし、対応するコードでは例外が発生する。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 次の値を放出する
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
この例外は依然としてキャッチされ、収集は停止する。
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
例外の透過性
しかし、どのように放出者のコードを例外を扱うふるまいにカプセル化することができるか?
flowは例外に対して透明でなければならず、try/catch
ブロック内部からflow { ... }
ビルダーで値を放出することは、例外の透過性に違反することである。これは前回の例のように、コレクタがスローされた例外を常にtry/catch
ブロックでキャッチできることを保証している。
放出側は、この例外の透過性を保存し、例外ハンドリングをカプセル化するためにcatch演算子を使うことができる。catch
演算子の本体で、例外を分析でき、キャッチされた例外の種類に応じて、様々な方法で反応することができる。
- 例外は
throw
で再スローすることができる - 例外はcatchの本体から、emitを使うことで、値の放出に変えることができる
- 例外は無視、あるいはログ、あるいはその他のコードによって処理されることができる
例えば、例外をキャッチするとテキストを放出するようにしてみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 次の値を放出する
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("Caught $e") } // 例外時の放出
.collect { value -> println(value) }
}
この例の出力は、コードの周りにもうtry/catch
は無いにも関わらず、同じである。
透過的なキャッチ
catch中間演算子は、例外の透過性のおかげで、上流の例外(それはcatch
の上のすべての演算子からの例外であり、その下は対象外)だけをキャッチする。もしcollect { ... }
内のブロック(catch
の下に置かれている)で例外がスローされた場合、それは漏れ出る。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // 下流の例外はキャッチしない
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
catch
演算子があるにも関わらず、"Caught..."メッセージは出力されない。
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at ...
宣言的なcatch
catch演算子の宣言的な性質と、すべての例外を扱いたいという欲望を合成することができる、つまりcollect演算子の本体をonEachに移動し、それをcatch
演算子の前に置く。このflowの収集は、パラメータ無しでcollect()
を呼び出すことによって引き起こされなければならない。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}
これで、"Caught..."メッセージが出力されるのを見ることができ、明示的なtry/catch
ブロックを使うことなく、すべての例外をキャッチすることができる。
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
flowの完了
flowの収集が(正常あるいは例外で)完了するとき、動作を実行する必要があるかもしれない。すでに気づいているかもしれないが、二つの方法、つまり命令的あるいは宣言的にそれが可能である。
命令的なfinallyブロック
try/catch
に加えて、コレクターはcollect
の完了時に、動作を実行するためにfinally
ブロックも使うことができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
このコードは、simple
flowによって生成された3つの数字と、つづいて"Done"文字列を出力する。
1
2
3
Done
宣言的ハンドリング
宣言的なアプローチのため、flowにはflowが完全に収集したとき、呼び出されるonCompletion中間演算子がある。
前回の例をonCompletion演算子を使って書き直し、同じ出力を生成することができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
onCompletionの重要な利点は、flowの収集が正常か例外で完了したのか、決めるために使うことができる、ラムダのnull許容のThrowable
パラメータである。次の例で、simple
flowは数字の1を放出した後で例外をスローする。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
予想しているかもしれないように、以下の出力をする。
1
Flow completed exceptionally
Caught exception
onCompletion演算子は、catchと異なり、例外を扱わない。上記の例のコードからわかるように、例外は引き続き下流へ流れる。それはさらにonCompletion
演算子に渡され、catch
演算子でハンドルすることもできる。
正常な完了
catch演算子とのもう一つの違いは、onCompletionはすべての例外が見えており、(キャンセルや失敗以外の)上流のflowの正常終了においてのみ、null
の例外を受け取るということである。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
完了の理由がnullではないことがわかる。なぜなら、flowは下流の例外のために異常終了したからである。
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
命令的か宣言的か
今では、flowを収集し、命令的と宣言的両方で、完了と例外を扱う方法を知っている。ここで自然な質問として、どちらのアプローチがなぜ好ましいかがある。ライブラリとして、ある特定のアプローチを提唱することはせず、両方の選択肢が有効であると信じており、個人の好みとコードスタイルに従って選択するべきである。
flowの起動
いくつかの源泉からやってくる非同期なイベントを表現するのに、flowを使うことは簡単である。この場合、コードの断片にやってくるイベントに対する反応を登録し、さらに作業を続けるaddListener
関数の類似物が必要になる。onEach演算子がこの役割を担うことができる。しかし、onEach
は中間演算子である。flowを収集するには、終端演算子も必要である。そうでないと、onEach
の呼び出しだけでは、何も効果が無い。
onEach
の後に、collect終端演算子を使う場合、その後のコードはflowが収集されるまで待機する。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// イベントのflowの真似をする
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- flowの収集を待機
println("Done")
}
ご覧の通り、以下の出力となる。
Event: 1
Event: 2
Event: 3
Done
ここでlaunchIn終端演算子が役に立ってくる。collect
をlaunchIn
に置き換えると、別のコルーチンでflowの収集を起動することができ、さらなるコードの実行を、すぐに続けることができる。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 別のコルーチンでflowを起動する
println("Done")
}
これは以下を出力する。
Done
Event: 1
Event: 2
Event: 3
launchIn
に対する必須のパラメータとして、CoroutineScopeを指定しなければならず、そこで、flowを収集するコルーチンが起動する。上記の例では、このスコープはrunBlockingコルーチンビルダーからやってきており、flowが実行する一方、runBlockingのスコープは、子のコルーチンの完了を待機し、main関数がリターンして、この例が終了しないようにしている。
実際のアプリケーションでは、スコープは制限された一生を持つ実体からやってくるだろう。この実体の一生が終了するとすぐ、対応するスコープはキャンセルされ、対応するflowの収集はキャンセルされる。このようにして、onEach { ... }.launchIn(scope)
のペアはaddEventListener
のように動作する。しかし、removeEventListener
関数に対応するものは必要ない。キャンセルと構造化並列性がこの目的を提供するためである。
留意すべきは、launchInはJobも返却し、それは全体のスコープをキャンセルせずにflowを収集するコルーチンのみをcancelしたり、joinするために使うことができる。
flowのキャンセルチェック
便宜上、flowビルダーは、それぞれの放出された値における追加のensureActiveキャンセルチェックを実行する。それはflow { ... }
から放出しているビジーなループはキャンセル可能であること意味する。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i) // ※事前にキャンセルチェックをしている
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
3までの数値と、4の数値を放出しようとしたのち、CancellationExceptionを取得するだけである。
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
しかし、他のほとんどのflow演算子は、性能理由から、自分自身で追加のキャンセル確認はしていない。例えば、IntRange.asFlowを使って同じビジーなループでどこでもsuspendしないコードを書くと、キャンセルのチェックは行われない。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
1から5のすべての数値は、収集され、キャンセルはrunBlocking
からリターンする前にだけ、検知される。
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
ビジーなflowをキャンセルできるようにする
コルーチンを持つビジーなループがある場合、明示的にキャンセルの確認をしなければならない。.onEach { currentCoroutineContext().ensureActive() }
を追加することができるが、使いやすいcancellable演算子がそれを提供している。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
cancellable
演算子があると、数値は1から3までのみ収集される。
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
flowとReactive Streams
Reactive Streamsあるいは、RxJavaとReactorプロジェクトのようなリアクティブフレームワークに精通している人にとって、flowの設計は非常になじみがあるかもしれない。
まさに、その設計はReactive Streamsとそのいろいろな実装から影響を受けていた。しかしflowの主な目的は、可能な限りシンプルな設計にし、Kotlinらしく、suspendに親和性があり、構造的並列性を尊重することである。その目的に達するには、リアクティブの先駆者とその巨大な仕事無しでは不可能だったろう。Reactive StreamsとKotlin Flowの記事で、完全な物語を読むことができる。
概念的に異なる一方で、flowはリアクティブストリームであり、それをリアクティブな(仕様とTCK準拠の)Publisherに変換、逆もまた同様、が可能である。そのようなコンバーターはkotlinx.coroutines
ですぐに使えるものが提供され、対応するリアクティブモジュール(Reactive Streamsのためのkotlinx-coroutines-reactive
、Project Reactorのためのkotlinx-coroutines-reactor
、RxJava2/RxJava3のためのkotlinx-coroutines-rx2
/kotlinx-coroutines-rx3
)で見つけることができる。統合モジュールには、Flow
からとFlowへのコンバータ、ReactorのContext
と様々なリアクティブ実体と連携する、suspendに親和性のある方法との統合が含まれている。