LoginSignup
3
2

More than 1 year has passed since last update.

Kotlin CoroutineのFlowで一定期間データが取得できない場合のタイムアウト処理を書くには

Posted at

背景

例えば株価をリアルタイムに画面に表示することを考えてみます。
ネットワークの調子が悪いときに現在の価格を消して「取得できません」と出したいようなときどう書くのがシンプルか考えてみました。

方法

suspend funの場合のタイムアウト

まず普通のsuspend funであれば簡単です。
withTimeoutやwithTimeoutOrNullで囲んでやればタイムアウト処理を書けます。

    withTimeout(1000L) {
        delay(2000)
    }

Flowの場合のタイムアウト

ではFlowの場合はどうすればよいでしょうか?

ぱっと見てぴったりな標準関数は見当たりませんが、select { } を使えば簡単に実現できます。

以下説明用のため汎用性はありませんがFlow版のwithTimeoutを作ってみました。

fun Flow<String>.withTimeout(coroutineScope: CoroutineScope, timeMills: Long) = flow {
    val receiveChannel = produceIn(coroutineScope)

    while (coroutineScope.isActive) {
        select<Unit> {
            onTimeout(timeMills) {
                emit("タイムアウトしました")
            }

            receiveChannel.onReceive {
                emit(it)
            }
        }
    }
}

サンプルコードと実行結果

全体のコードとしては以下のようになります。

package example

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withTimeout
import kotlin.coroutines.EmptyCoroutineContext

suspend fun main() {
    val job = SupervisorJob()
    val coroutineScope = CoroutineScope(EmptyCoroutineContext + job)

    coroutineScope.launch {
        val exampleFlow: Flow<String> = flow {
            delay(5_000)
            emit("1")
            delay(1_000)
            emit("2")
            delay(5_000)
            emit("3")
            emit("END")
        }

        exampleFlow
            .withTimeout(coroutineScope, 2_000)
            .distinctUntilChanged()
            .collect {
                println(it)

                if (it == "END") {
                    coroutineScope.cancel()
                }
            }
    }.join()
}

fun Flow<String>.withTimeout(coroutineScope: CoroutineScope, timeMills: Long) = flow {
    val receiveChannel = produceIn(coroutineScope)

    while (coroutineScope.isActive) {
        select<Unit> {
            onTimeout(timeMills) {
                emit("タイムアウトしました")
            }

            receiveChannel.onReceive {
                emit(it)
            }
        }
    }
}

suspend fun aa() {
    withTimeout(1000L) {
        delay(2000)
    }
}

出力のサンプルは以下です。

タイムアウトしました
1
2
タイムアウトしました
3
END
3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2