24
20

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Coroutinesガイド】非同期Flow

Posted at

※ソース記事はこちら
suspend関数は非同期に単一の値を返却するが、非同期に計算される複数の値を返却することはできるのか?これはKotlin Flowが役立つところである。

複数の値の表現

複数の値はKotlinではコレクションを使って表現される。例えば、3つの数字のリストを返却するsimple関数を持つことができ、そしてforEachを使ってそれらをすべて出力する。

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}

このコードは以下を出力する。

1
2
3

シーケンス

もしいくらかCPUを消費するブロッキングコード(それぞれの計算は100msかかる)を持つ、数値の計算がある場合、Sequenceを使って数値を表現することができる。

fun simple(): Sequence<Int> = sequence { // シーケンスビルダー
    for (i in 1..3) {
        Thread.sleep(100) // 計算しているふりをする
        yield(i) // 次の値を生み出す
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

このコードは同じ数値を出力するが、それぞれを出力する前に100ms待機する。

suspend関数

しかし、この計算はコードを実行しているスレッドをブロックする。これらの値が、simple関数をsuspend修飾子で印をつけることができる非同期コードで計算すると、ブロッキング無しで作業を行うことができ、リストとして結果を返却する。

import kotlinx.coroutines.*                 
                           
suspend fun simple(): List<Int> {
    delay(1000) // ここで非同期なことをするふりをする
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

このコードは1秒待って数字を出力する。

Flow

List<Int>の結果型を使うことは、一度にすべての値を返却することしかできない。非同期に計算される値のストリームを表現するため、Flow<Int>型をつかうことができ、同期的に値を計算するためのSequence<Int>とちょうど同じように使うことができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

              
fun simple(): Flow<Int> = flow { // flow ビルダー
    for (i in 1..3) {
        delay(100) // ここで何か役に立つことをするふりをする
        emit(i) // 次の値を放出する
    }
}

fun main() = runBlocking<Unit> {
    // mainスレッドがブロックされるか確認するため、並列にコルーチンを起動する
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // flowを収集する
    simple().collect { value -> println(value) } 
}

このコードは、mainスレッドをブロックせず、それぞれの数を出力する前に100ms待機する。このことは、mainスレッドで実行する別のコルーチンによって、"I'm not blocked"が100msおきに出力されることによって確認できる。

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

Flowを使ったコードと、以前の例では次の違いがあることに注意すること。

  • Flow型のビルダー関数は、flowと呼ばれる。
  • flow { ... }ビルダーブロック内部のコードは、一時停止されることがある。
  • simple関数は今ではsuspend修飾子はついていない。
  • 値はemit関数により、放出される。
  • 値はcollect関数により、収集される。

simpleflow { ... }のボディにあるdelayThread.sleepに置き換えることができ、その場合、mainスレッドがブロックされることがわかる。

flowはcoldである

flowはsequenceに似てcoldストリームである。つまりflowビルダー内のコードは、flowが収集されるまで、実行されない。このことは次の例で明らかになる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

     
fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

これは次の出力となる。

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

これは(flowを返却する)simple関数が、suspend修飾子がついていない主な理由である。それ自体、simple()はすぐに返却され、何も待つことはしない。flowは収集されるときはいつでも開始する。それは再度collectを呼び出したとき、"Flow started"が表示される理由である。

flowのキャンセルの基本

flowはコルーチンの一般的な協調的キャンセルに従う。通常、flowの収集は、delayのようなキャンセルできるsuspend関数のおいてflowがsuspendされるとき、キャンセルすることができる。次の例では、flowがwithTimeoutOrNullブロックで実行しているときにどのようにキャンセルされ、コードの実行が中止する方法を見ることができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // 250ms後、タイムアウト 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

次の出力を生成し、flowにより二つの数字だけが放出されることに注意すること。

Emitting 1
1
Emitting 2
2
Done

より詳細はflowのキャンセルチェックを参照のこと。

flowビルダー

以前の例からのflow { ... }ビルダーは、最も基本的なものである。より簡単なflowの宣言のための他のビルダーが存在する。

  • flowOfビルダーは固定の値を放出するflowを定義する
  • 様々なコレクションやシーケンスは、asFlow()拡張関数を使うことで、flowに変換できる

こうして1から3の数字を出力する例は次のように書くことができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) } 
}

中間のflow演算子

flowはコレクションやシーケンスと全く同じように、演算子で変形することができ、中間の演算子は上流のflowに適用され、下流のflowを返却する。これらの演算子は、flowと全く同じようにcoldである。そのような演算子の呼び出しは、それ自体suspend関数ではない。それは素早く動作し、新しく変形されたflowの定義を返却する。
基本的な演算子は、mapfilerのようななじみのある名前を持つ。シーケンスとの重要な違いは、これらの演算子の内部のコードブロックがsuspend関数を呼ぶことができるということである。
例えば、着信要求のflowはmap演算子を使って結果がマップされることができる。たとえ要求の実行が、suspend関数で実装された長時間動作する操作であってもである。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
suspend fun performRequest(request: Int): String {
    delay(1000) // 長時間の非同期動作のふりをする
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // 要求のflow
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

これは、以下の3行を生成し、それぞれの行はそれぞれ1秒後に現れる。

response 1
response 2
response 3

変形演算子

flowの変形演算子のうち、もっとも一般的なものはtransformと呼ばれる。それはより複雑な変形と同様に、mapfilerのような簡単な変形の真似をすることに使われる。transform演算子を使って、任意の回数、任意の値をemitすることができる。
例えば、transformを使って、長時間実行する非同期要求の前に一つの文字列を放出し、応答でそれに続けることができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // 長時間の非同期処理のふりをする
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // 要求のflow
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }
}

このコードの出力は次のようになる。

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

サイズを制限する演算子

takeのようなサイズを制限する中間演算子は、対応する制限に達したとき、flowの実行をキャンセルする。コルーチンでのキャンセルは常に例外をスローすることによって実行されるため、すべてのリソース管理関数(try { ... }finally { ... }ブロック)はキャンセルの場合、通常に動作する。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // 最初の二つだけ取る
        .collect { value -> println(value) }
}            

終端flow演算子

flowにおける終端演算子は、flowの収集を開始するsuspend関数である。collect演算子は最も基本的であるが、他の終端演算子も存在し、それらは以下のことを簡単にすることができる。

  • toListtoSetのように様々なコレクションへの変換
  • 最初の値を取得したり、flowが単一な値を放出することを保証する演算子
  • reducefoldを使い、flowを一つの値に減らすこと

例えば以下の場合、

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

    val sum = (1..5).asFlow()
        .map { it * it } // 1から5までの数値を二乗する                           
        .reduce { a, b -> a + b } // それらを合計する (終端演算子)
    println(sum)     
}

単一の値を出力する。

35

flowはシーケンシャルである

flowfの個々の収集は、使われているる複数のflowを操作する特別な演算子無しで、シーケンシャルに実行される。収集は終端演算子を呼ぶコルーチン内で直接作用する。デフォルトでは新しいコルーチンは起動されない。それぞれの放出された値は、上流から下流にかけてすべての中間演算子によって処理され、その後、後から終端演算子に届けられる。
偶数の整数をフィルターして、それらを文字列にmapする次の例を参照してほしい。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }                      
}

次のようにを生成される。

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

flowコンテキスト

flowの収集は、コルーチンを呼ぶコンテキストで常に発生する。例えば、simpleflowがある場合、次のコードはこのコードの著者によって指定されたコンテキストにおいて実行され、それはsimpleflowの実装の詳細に無関係である。

withContext(context) {
    simple().collect { value ->
        println(value) // 指定されたコンテキストで実行される
    }
}

このflow属性はコンテキスト保存と呼ばれる。
このように、flow { ... }ビルダー内のコードは、デフォルトでflowに対するコレクターによって提供されるコンテキストで実行される。例えば、呼び出されたスレッドを表示して、3つの数値を放出するsimple関数の実装を考えてみる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}          

このコードを実行すると次の結果となる。

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collectは、メインスレッドから呼び出されるため、simpleのflow本体もメインスレッドで呼び出される。これは早く動かすため、あるいは、実行コンテキストについて気にせず、呼び出しをブロックしない非同期コードのための、完全なデフォルトである。

withContextでの不正な放出

しかし、長時間動作するCPUを消費するコードは、Dispatcher.Defaultのコンテキストで実行する必要性があるかもしれず、UIを更新するコードは、Dispatchers.Mainのコンテキストで実行する必要があるかもしれない。たいてい、withContextが、Kotlinコルーチンを使っているコードでコンテキストを変更するために使われるが、flow { ... }ビルダー内のコードは、コンテキスト保存属性が与えられているため、異なるコンテキストからemitすることは許可されていない。
次のコードを試してみよう。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
                      
fun simple(): Flow<Int> = flow {
    // flowビルダー内でCPUを消費するコードのため、コンテキストを変更する間違った方法
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // CPUを消費する方法で計算するふりをする
            emit(i) // 次の値を放出する
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}   

このコードは次の例外を生成する。

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...

flowOn演算子

その例外は、flowの放出のコンテキストを変更するには、flowOn関数を使うべきだと言っている。flowのコンテキストを変更する正しい方法は、以下の例でみることができ、それが全くどのように動作するかを見せるために、対応するスレッドの名前も出力する。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // CPUを消費する方法で計算するふりをする
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // flowビルダー内でCPUを消費するコードのためにコンテキストを変更する正しい方法

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}       

収集はメインスレッドで起こる一方、flow { ... }はバックグラウンドスレッドで動作することに注意すること。
ここで観察するもう一つのことは、flowOn演算子はflowのデフォルトのシーケンシャルな性質を変更したということである。今では収集は、あるコルーチン("coroutine#1")で発生し、放出は収集のコルーチンと並行な別のスレッドで動作するもう一つのコルーチン("coroutine#2")で発生している。flowOn演算子は、それがコンテキスト内でCoroutineDispatcherを変更する必要がある場合は、上流のflowのために別のコルーチンを作る。

バッファリング

様々なコルーチンの中で、様々なflowの部分が実行されることは、flowの収集にかかかる時間という観点から、特に長時間実行される非同期の操作が関係すると、役立つことがありうる。例えば、simpleのflowの放出が遅く、一つの要素を生成するのに100msかかり、コレクターも遅く、一つの要素を処理するのに300msかかる場合を考えてみよう。3つの数値を持つそのようなflowの収集にどのくらい時間がかかるか、見てみよう。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 非同期に100ms待つふりをする
        emit(i) // 次の値を放出する
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // 300msでそれを処理するふりをする
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

それはこのようなものを出力し、全体の収集はおおよそ1200ms(3つの数値でそれぞれ400ms)かかる。

1
2
3
Collected in 1220 ms

simpleflowの放出するコードを、シーケンシャルで実行するのとは対照的に、収集するコードで並列に実行するために、flowにおいてbuffer演算子を使うことができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 非同期に100ms待つふりをする
        emit(i) // 次の値を放出する
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .buffer() // 放出をバッファリングし、待機しない
            .collect { value -> 
                delay(300) // 300ms処理するふりをする
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

最初の数値のために100ms待機する必要があり、その後、それぞれの数値を処理するため、300ms秒だけ一時的する、効果的に処理経路を作ったため、それは同じ数値をまさにより早く生成する。この方法では、実行に1000ms前後かかる。

1
2
3
Collected in 1071 ms

留意すべきは、flowOn演算子は、CoroutineDispatcherを変更する必要があるとき、同じバッファリングの仕組みを使うが、ここでは、実行コンテキストを変えずに明示的にバッファリング要求をしている。

合成

flowが操作の一部の結果あるいは、操作の状態の更新を表す場合、それぞれの値を処理する必要がなく、代わりに最新のものだけでよいかもしれない。この場合、コレクタが処理するのが遅すぎるとき、中間の値をスキップするのに、conflate演算子を使うことができる。前回の例を足場にしてみよう。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 非同期に100ms待つふりをする
        emit(i) // 次の値を放出する
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .conflate() // 放出を合成する。それぞれの値を処理しない
            .collect { value -> 
                delay(300) // 300msの間、処理するふりをする
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

最初の値はまだ処理されるが、2つめと3つめはすでに生成され、2つめは合成され、最新の(3つめのもの)だけが、コレクタに配達される。

1
3
Collected in 758 ms

最新の値の処理

合成はエミッタとコレクタが両方とも遅い場合、処理を高速化する一つの方法である。それは放出された値をやめることで行われる。他の方法は、遅いコレクタをキャンセルし、新しい値が放出されたらいつでも再起動することである。xxx演算子と基本的に同じロジックを実行するが、新しい値に基づいて、ブロック内のコードをキャンセルするxxxLatest演算子ファミリーが存在する。前回の例におけるconflatecollectLatestに変えてみよう。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 非同期に100ms待つふりをする
        emit(i) //次の値を放出する
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // 最近の値について、キャンセルかつ再起動する
                println("Collecting $value") 
                delay(300) // 300msの間、処理するふりをする
                println("Done $value") 
            } 
    }   
    println("Collected in $time ms")
}

collectLatestの本体は300msかかるが、新しい値は100msごとに放出されるため、すべての値においてブロックは実行されるが、最後の値のためにのみ、完了することが、見て取れる。

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

複数のflowの構成

複数のflowを構成にするにはたくさんの方法がある。

Zip

Kotlin標準ライブラリのSequence.zip拡張関数のように、flowには対応する二つのflowの値を合成するzip演算子がある。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 

    val nums = (1..3).asFlow() // 数値 1..3
    val strs = flowOf("one", "two", "three") // 文字列
    nums.zip(strs) { a, b -> "$a -> $b" } // 単一の文字列に構成する
        .collect { println(it) } // 収集と表示
}

この例は次の出力をする。

1 -> one
2 -> two
3 -> three

Combine

flowが、変数または操作の最も最近の値を表すとき(合成における関連する章も参照のこと)、対応するflowの最新の値に依存して計算を実行し、上流のflowが値を放出するときは常に再計算する必要があるかもしれない。対応する演算子ファミリーはcombineと呼ばれる。
例えば、前回の例で数値が300msごとに更新されるが、文字列は400msごとに更新される場合、zip演算子を使ってzipすると、結果を400msおきに出力するにも関わらず、引き続き同じ結果を生成する。

この例では、それぞれの要素を遅延するために、onEach中間演算子を使用し、同一のflowを放出するコードをより宣言的、簡潔ににする。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 

    val nums = (1..3).asFlow().onEach { delay(300) } // 数値 1..3 300 msごと
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // 文字列 400msごと
    val startTime = System.currentTimeMillis() // 開始時刻を記録
    nums.zip(strs) { a, b -> "$a -> $b" } // "zip"で単一の文字列を構成
        .collect { value -> // 収集と表示 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

しかし、zipの代わりに、ここでcombile演算子を使う。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 

    val nums = (1..3).asFlow().onEach { delay(300) } // 数値 1..3 300 msごと
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // 文字列 400msごと          
    val startTime = System.currentTimeMillis() // 開始時刻を記録 
    nums.combine(strs) { a, b -> "$a -> $b" } // "combine"で単一の文字列を構成
        .collect { value -> // 収集と表示 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

かなり異なる出力が得られ、numsまたはstrのflowからのそれぞれの放出で行が表示される。

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

flowの平準化

flowは非同期に受けっとた値のシーケンスを表すため、それぞれの値が、他の値のシーケンス要求を引き起こすような状況に非常に簡単に陥る。例えば、500msおきに二つの文字列のflowを返却する次の関数がある。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // 500 ms待つ
    emit("$i: Second")
}

そこで、このように三つの整数のフローがあり、それぞれのためにrequestFlowが呼び出される。

(1..3).asFlow().map { requestFlow(it) }

その後、flowのflow(Flow<Flow<String>>)になってしまい、さらなる処理のため、単一のflowに平準化する必要がある。コレクションやシーケンスではこのために、flattenflatMap演算子がある。しかし、flowの非同期の性質のため、平準化の異なるモードが必要とされ、そういうものとして、flowの平準化演算子ファミリーがある。

flatMapConcat

連結モードはflatMapConcatflattenConcat演算子によって実装されている。次の例で見えるように、それらは、次の値を収集する前に、内部のflowが完了するまで待機する。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // 開始時刻を記録する
    (1..3).asFlow().onEach { delay(100) } // 100msごとの数値 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // 収集と表示
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

出力にはflatMapConcatのシーケンシャルな性質が明らかに見える。

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

flatMapMerge

もう一つの平準化モードは、着信するすべてのflowを並列に収集し、それらの値を単一のflowにマージするため、値はできるだけ早く放出される。それはflatMapMergeflattenMerge演算子により、実装されている。それらは両方オプションのconcurrencyパラメータを持ち、同時に収集するflowの並列数を制限する。(デフォルトはDEFAULT_CONCURRENCYに等しい)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // 開始時刻を記録する
    (1..3).asFlow().onEach { delay(100) } // 100msごとの数値  
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // 収集と表示
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

flatMapMergeの並列な性質が明らかである。

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

留意すべきは、flatMapMergeはそのコードブロック(この例では{ requestFlow(it) })をシーケンシャルに呼ぶが、結果のflowを並列に収集する。それは最初にシーケンシャルなmap { requestFlow(it) }を実行し、その後で結果においてflattenMergeを呼ぶことに等しい。

flatMapLatest

最新の値の処理のセクションで見た、collectLatest演算子と似た方法で、対応する"最近の"平準化モードがあり、それは新しいflowが放出されるとすぐに、前回のflowの収集がキャンセルされるというものである。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // 500ms待機
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() //  開始時刻を記録する 
    (1..3).asFlow().onEach { delay(100) } // 100msごとの数値 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // 収集と表示
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

この例のここの出力は、flatMapLatestがどのように動作するかの良い実演である。

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

留意すべきは、flatMapLatestは、新しい値が来るとすぐにそのブロック({ requestFlow(it) })内のすべてのコードをキャンセルする。この特殊な例では違いがない。なぜなら、requestFlow自体への呼び出しは早く、suspendもせず、キャンセルすることはできない。ただし、そこでdelayのようなsuspend関数を使うような場合、違いが際立つようになる。

flow例外

flowの収集は、放出側あるいは、演算子の内側のコードが例外をスローする場合、例外で完了することがある。これらの例外を扱ういくつかの方法がある。

コレクタのtry catch

コレクタは例外を扱うために、Kotlinのtry/catchを使うことができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // 次の値を放出する
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}

このコードは、collect終端演算子で例外を正常にキャッチし、ご覧のようにその後では、もう値は放出されない。

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

すべてはキャッチされる

前回の例では、放出側や、中間あるいは終端演算子で発生したどんな例外も実際キャッチする。例えば、値を放出するコードを文字列にmapするようにコードを変更してみよう。しかし、対応するコードでは例外が発生する。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // 次の値を放出する
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}    

この例外は依然としてキャッチされ、収集は停止する。

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

例外の透過性

しかし、どのように放出者のコードを例外を扱うふるまいにカプセル化することができるか?
flowは例外に対して透明でなければならず、try/catchブロック内部からflow { ... }ビルダーで値を放出することは、例外の透過性に違反することである。これは前回の例のように、コレクタがスローされた例外を常にtry/catchブロックでキャッチできることを保証している。
放出側は、この例外の透過性を保存し、例外ハンドリングをカプセル化するためにcatch演算子を使うことができる。catch演算子の本体で、例外を分析でき、キャッチされた例外の種類に応じて、様々な方法で反応することができる。

  • 例外はthrowで再スローすることができる
  • 例外はcatchの本体から、emitを使うことで、値の放出に変えることができる
  • 例外は無視、あるいはログ、あるいはその他のコードによって処理されることができる
    例えば、例外をキャッチするとテキストを放出するようにしてみよう。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // 次の値を放出する
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // 例外時の放出
        .collect { value -> println(value) }
}

この例の出力は、コードの周りにもうtry/catchは無いにも関わらず、同じである。

透過的なキャッチ

catch中間演算子は、例外の透過性のおかげで、上流の例外(それはcatchの上のすべての演算子からの例外であり、その下は対象外)だけをキャッチする。もしcollect { ... }内のブロック(catchの下に置かれている)で例外がスローされた場合、それは漏れ出る。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // 下流の例外はキャッチしない
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

catch演算子があるにも関わらず、"Caught..."メッセージは出力されない。

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...

宣言的なcatch

catch演算子の宣言的な性質と、すべての例外を扱いたいという欲望を合成することができる、つまりcollect演算子の本体をonEachに移動し、それをcatch演算子の前に置く。このflowの収集は、パラメータ無しでcollect()を呼び出すことによって引き起こされなければならない。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
        .catch { e -> println("Caught $e") }
        .collect()
}

これで、"Caught..."メッセージが出力されるのを見ることができ、明示的なtry/catchブロックを使うことなく、すべての例外をキャッチすることができる。

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

flowの完了

flowの収集が(正常あるいは例外で)完了するとき、動作を実行する必要があるかもしれない。すでに気づいているかもしれないが、二つの方法、つまり命令的あるいは宣言的にそれが可能である。

命令的なfinallyブロック

try/catchに加えて、コレクターはcollectの完了時に、動作を実行するためにfinallyブロックも使うことができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}     

このコードは、simpleflowによって生成された3つの数字と、つづいて"Done"文字列を出力する。

1
2
3
Done

宣言的ハンドリング

宣言的なアプローチのため、flowにはflowが完全に収集したとき、呼び出されるonCompletion中間演算子がある。
前回の例をonCompletion演算子を使って書き直し、同じ出力を生成することができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}

onCompletionの重要な利点は、flowの収集が正常か例外で完了したのか、決めるために使うことができる、ラムダのnull許容のThrowableパラメータである。次の例で、simpleflowは数字の1を放出した後で例外をスローする。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}  

予想しているかもしれないように、以下の出力をする。

1
Flow completed exceptionally
Caught exception

onCompletion演算子は、catchと異なり、例外を扱わない。上記の例のコードからわかるように、例外は引き続き下流へ流れる。それはさらにonCompletion演算子に渡され、catch演算子でハンドルすることもできる。

正常な完了

catch演算子とのもう一つの違いは、onCompletionはすべての例外が見えており、(キャンセルや失敗以外の)上流のflowの正常終了においてのみ、nullの例外を受け取るということである。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

完了の理由がnullではないことがわかる。なぜなら、flowは下流の例外のために異常終了したからである。

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

命令的か宣言的か

今では、flowを収集し、命令的と宣言的両方で、完了と例外を扱う方法を知っている。ここで自然な質問として、どちらのアプローチがなぜ好ましいかがある。ライブラリとして、ある特定のアプローチを提唱することはせず、両方の選択肢が有効であると信じており、個人の好みとコードスタイルに従って選択するべきである。

flowの起動

いくつかの源泉からやってくる非同期なイベントを表現するのに、flowを使うことは簡単である。この場合、コードの断片にやってくるイベントに対する反応を登録し、さらに作業を続けるaddListener関数の類似物が必要になる。onEach演算子がこの役割を担うことができる。しかし、onEachは中間演算子である。flowを収集するには、終端演算子も必要である。そうでないと、onEachの呼び出しだけでは、何も効果が無い。
onEachの後に、collect終端演算子を使う場合、その後のコードはflowが収集されるまで待機する。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// イベントのflowの真似をする
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- flowの収集を待機
    println("Done")
}            

ご覧の通り、以下の出力となる。

Event: 1
Event: 2
Event: 3
Done

ここでlaunchIn終端演算子が役に立ってくる。collectlaunchInに置き換えると、別のコルーチンでflowの収集を起動することができ、さらなるコードの実行を、すぐに続けることができる。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- 別のコルーチンでflowを起動する
    println("Done")
} 

これは以下を出力する。

Done
Event: 1
Event: 2
Event: 3

launchInに対する必須のパラメータとして、CoroutineScopeを指定しなければならず、そこで、flowを収集するコルーチンが起動する。上記の例では、このスコープはrunBlockingコルーチンビルダーからやってきており、flowが実行する一方、runBlockingのスコープは、子のコルーチンの完了を待機し、main関数がリターンして、この例が終了しないようにしている。
実際のアプリケーションでは、スコープは制限された一生を持つ実体からやってくるだろう。この実体の一生が終了するとすぐ、対応するスコープはキャンセルされ、対応するflowの収集はキャンセルされる。このようにして、onEach { ... }.launchIn(scope)のペアはaddEventListenerのように動作する。しかし、removeEventListener関数に対応するものは必要ない。キャンセルと構造化並列性がこの目的を提供するためである。
留意すべきは、launchInJobも返却し、それは全体のスコープをキャンセルせずにflowを収集するコルーチンのみをcancelしたり、joinするために使うことができる。

flowのキャンセルチェック

便宜上、flowビルダーは、それぞれの放出された値における追加のensureActiveキャンセルチェックを実行する。それはflow { ... }から放出しているビジーなループはキャンセル可能であること意味する。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) // ※事前にキャンセルチェックをしている
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

3までの数値と、4の数値を放出しようとしたのち、CancellationExceptionを取得するだけである。

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

しかし、他のほとんどのflow演算子は、性能理由から、自分自身で追加のキャンセル確認はしていない。例えば、IntRange.asFlowを使って同じビジーなループでどこでもsuspendしないコードを書くと、キャンセルのチェックは行われない。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

1から5のすべての数値は、収集され、キャンセルはrunBlockingからリターンする前にだけ、検知される。

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

ビジーなflowをキャンセルできるようにする

コルーチンを持つビジーなループがある場合、明示的にキャンセルの確認をしなければならない。.onEach { currentCoroutineContext().ensureActive() }を追加することができるが、使いやすいcancellable演算子がそれを提供している。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

cancellable演算子があると、数値は1から3までのみ収集される。

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365

flowとReactive Streams

Reactive Streamsあるいは、RxJavaとReactorプロジェクトのようなリアクティブフレームワークに精通している人にとって、flowの設計は非常になじみがあるかもしれない。
まさに、その設計はReactive Streamsとそのいろいろな実装から影響を受けていた。しかしflowの主な目的は、可能な限りシンプルな設計にし、Kotlinらしく、suspendに親和性があり、構造的並列性を尊重することである。その目的に達するには、リアクティブの先駆者とその巨大な仕事無しでは不可能だったろう。Reactive StreamsとKotlin Flowの記事で、完全な物語を読むことができる。
概念的に異なる一方で、flowはリアクティブストリームであり、それをリアクティブな(仕様とTCK準拠の)Publisherに変換、逆もまた同様、が可能である。そのようなコンバーターはkotlinx.coroutinesですぐに使えるものが提供され、対応するリアクティブモジュール(Reactive Streamsのためのkotlinx-coroutines-reactive、Project Reactorのためのkotlinx-coroutines-reactor、RxJava2/RxJava3のためのkotlinx-coroutines-rx2/kotlinx-coroutines-rx3)で見つけることができる。統合モジュールには、FlowからとFlowへのコンバータ、ReactorのContextと様々なリアクティブ実体と連携する、suspendに親和性のある方法との統合が含まれている。

24
20
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
20

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?