背景
例えば株価をリアルタイムに画面に表示することを考えてみます。
ネットワークの調子が悪いときに現在の価格を消して「取得できません」と出したいようなときどう書くのがシンプルか考えてみました。
方法
suspend funの場合のタイムアウト
まず普通のsuspend funであれば簡単です。
withTimeoutやwithTimeoutOrNullで囲んでやればタイムアウト処理を書けます。
withTimeout(1000L) {
delay(2000)
}
Flowの場合のタイムアウト
ではFlowの場合はどうすればよいでしょうか?
ぱっと見てぴったりな標準関数は見当たりませんが、select { } を使えば簡単に実現できます。
以下説明用のため汎用性はありませんがFlow版のwithTimeoutを作ってみました。
fun Flow<String>.withTimeout(coroutineScope: CoroutineScope, timeMills: Long) = flow {
val receiveChannel = produceIn(coroutineScope)
while (coroutineScope.isActive) {
select<Unit> {
onTimeout(timeMills) {
emit("タイムアウトしました")
}
receiveChannel.onReceive {
emit(it)
}
}
}
}
サンプルコードと実行結果
全体のコードとしては以下のようになります。
package example
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withTimeout
import kotlin.coroutines.EmptyCoroutineContext
suspend fun main() {
val job = SupervisorJob()
val coroutineScope = CoroutineScope(EmptyCoroutineContext + job)
coroutineScope.launch {
val exampleFlow: Flow<String> = flow {
delay(5_000)
emit("1")
delay(1_000)
emit("2")
delay(5_000)
emit("3")
emit("END")
}
exampleFlow
.withTimeout(coroutineScope, 2_000)
.distinctUntilChanged()
.collect {
println(it)
if (it == "END") {
coroutineScope.cancel()
}
}
}.join()
}
fun Flow<String>.withTimeout(coroutineScope: CoroutineScope, timeMills: Long) = flow {
val receiveChannel = produceIn(coroutineScope)
while (coroutineScope.isActive) {
select<Unit> {
onTimeout(timeMills) {
emit("タイムアウトしました")
}
receiveChannel.onReceive {
emit(it)
}
}
}
}
suspend fun aa() {
withTimeout(1000L) {
delay(2000)
}
}
出力のサンプルは以下です。
タイムアウトしました
1
2
タイムアウトしました
3
END