はじめに
take() や first() は、条件に達したとき、CancellationException を継承した AbortFlowException が投げられていますので、キャンセル完了をハンドリングしている場合、想定外の補足してしまう場合があります。
環境
- kotlin 2.0.21
- org.jetbrains.kotlinx:kotlinx-coroutines-core 1.8.1
注意
take() や first() よりも後の flow に対しては CancellationException は伝播してこないので、通常の使用で問題になることは少ないと思います。
CancellationException を捕捉してしまう例
val cancellationHandlingFlow: Flow<Unit> = flowOf(Unit)
.onCompletion { throwable ->
// 結果別の表示
when (throwable) {
null -> println("完了しました。")
is CancellationException -> println("キャンセルされました。 $throwable")
else -> println("エラーです。 $throwable")
}
}
上記のように、キャンセル完了もハンドリングしようとすると、catch()では CancellationException を捕捉できないので、onCompletion() で処理することになると思います。
このような Flow に対して、first() で結果を1つ得るようにします。
fun main() = runBlocking {
cancellationHandlingFlow.first()
}
すると、結果は、
キャンセルされました。 kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
cancellationHandlingFlow の onCompletion() では、通常の完了ではなく、キャンセルとして flow を終了したことが伝えられました。
Flow が1つの値の放出で終了するものであっても、先に first() の条件を満たしてキャンセルで終了したことになります。
また、first() ではなく take(1) でも同様に AbortFlowException となります。
fun main() {
runBlocking {
launch {
cancellationHandlingFlow
.take(1)
.collect {}
}
}
}
キャンセルされました。 kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
回避策
キャンセルハンドリングの onCompletion() を collect() や launchIn() の直前に呼び出すように Flow を拡張するとよさそうです。
// collect() 直前で onCompletion() を呼ぶ拡張
suspend fun <T> Flow<T>.collectHandling(
collector: FlowCollector<T>
) = this
.onCompletion { throwable ->
// 結果別の表示
when (throwable) {
null -> println("完了しました。")
is CancellationException -> println("キャンセルされました。 $throwable")
else -> println("エラーです。 $throwable")
}
}
.collect(collector)
// launchIn() 直前で onCompletion() を呼ぶ拡張
fun <T> Flow<T>.launchInHandling(
scope: CoroutineScope
) = this
.onCompletion { throwable ->
// 結果別の表示
when (throwable) {
null -> println("完了しました。")
is CancellationException -> println("キャンセルされました。 $throwable")
else -> println("エラーです。 $throwable")
}
}
.launchIn(scope)
fun main() {
runBlocking {
launch {
flowOf(Unit)
.take(1)
.collectHandling {}
}
}
}
fun main() = runBlocking {
flowOf(Unit)
.map { flowOf(Unit).first() }
.launchInHandling(this)
.join()
}
完了しました。