はじめに
take()
や first()
は、条件に達したとき、CancellationException
を継承した AbortFlowException
が投げられていますので、キャンセル完了をハンドリングしている場合、想定外の補足してしまう場合があります。
環境
- kotlin 2.0.21
- org.jetbrains.kotlinx:kotlinx-coroutines-core 1.8.1
注意
take()
や first()
よりも後の flow に対しては CancellationException
は伝播してこないので、通常の使用で問題になることは少ないと思います。
CancellationException を捕捉してしまう例
val cancellationHandlingFlow: Flow<Unit> = flowOf(Unit)
.onCompletion { throwable ->
// 結果別の表示
when (throwable) {
null -> println("完了しました。")
is CancellationException -> println("キャンセルされました。 $throwable")
else -> println("エラーです。 $throwable")
}
}
上記のように、キャンセル完了もハンドリングしようとすると、catch()
では CancellationException
を捕捉できないので、onCompletion()
で処理することになると思います。
このような Flow に対して、first()
で結果を1つ得るようにします。
fun main() = runBlocking {
cancellationHandlingFlow.first()
}
すると、結果は、
キャンセルされました。 kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
cancellationHandlingFlow
の onCompletion()
では、通常の完了ではなく、キャンセルとして flow を終了したことが伝えられました。
Flow が1つの値の放出で終了するものであっても、先に first()
の条件を満たしてキャンセルで終了したことになります。
また、first()
ではなく take(1)
でも同様に AbortFlowException
となります。
fun main() {
runBlocking {
launch {
cancellationHandlingFlow
.take(1)
.collect {}
}
}
}
キャンセルされました。 kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
回避策
キャンセルハンドリングの onCompletion()
を collect()
や launchIn()
の直前に呼び出すように Flow を拡張するとよさそうです。
// collect() 直前で onCompletion() を呼ぶ拡張
suspend fun <T> Flow<T>.collectHandling(
collector: FlowCollector<T>
) = this
.onCompletion { throwable ->
// 結果別の表示
when (throwable) {
null -> println("完了しました。")
is CancellationException -> println("キャンセルされました。 $throwable")
else -> println("エラーです。 $throwable")
}
}
.collect(collector)
// launchIn() 直前で onCompletion() を呼ぶ拡張
fun <T> Flow<T>.launchInHandling(
scope: CoroutineScope
) = this
.onCompletion { throwable ->
// 結果別の表示
when (throwable) {
null -> println("完了しました。")
is CancellationException -> println("キャンセルされました。 $throwable")
else -> println("エラーです。 $throwable")
}
}
.launchIn(scope)
fun main() {
runBlocking {
launch {
flowOf(Unit)
.take(1)
.collectHandling {}
}
}
}
fun main() = runBlocking {
flowOf(Unit)
.map { flowOf(Unit).first() }
.launchInHandling(this)
.join()
}
完了しました。